/**********************************************************************************************************************
 * Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
 * accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
 * distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors: Juergen Schumacher, Andreas Weber, Drazen Cindric, Andreas Schank (all Attensity Europe GmbH) - initial
 * implementation
 **********************************************************************************************************************/
package org.eclipse.smila.bulkbuilder.test;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.HashSet;
import java.util.Set;

import org.apache.commons.httpclient.HttpMethod;
import org.apache.commons.httpclient.HttpStatus;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.io.IOUtils;

import org.eclipse.smila.bulkbuilder.BulkbuilderConstants;
import org.eclipse.smila.bulkbuilder.httphandler.BulkbuilderBurstHandler;
import org.eclipse.smila.bulkbuilder.httphandler.BulkbuilderHandler;
import org.eclipse.smila.bulkbuilder.internal.BulkbuilderServiceImpl;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.datamodel.AnySeq;
import org.eclipse.smila.datamodel.DataFactory;
import org.eclipse.smila.datamodel.Record;
import org.eclipse.smila.jobmanager.JobState;
import org.eclipse.smila.jobmanager.WorkflowRunInfo;
import org.eclipse.smila.taskmanager.Task;
import org.eclipse.smila.taskworker.input.RecordInput;

/**
 * Test case for the {@link BulkbuilderBurstHandler} in incremental mode.
 *
 * @author cind01
 */
public class TestBulkbuilderBurstHandler extends BulkbuilderTestBase {

  /**
   * Tests if the service can be accessed.
   *
   * @throws Exception
   *           if the test fails.
   */
  public void testService() throws Exception {
    assertTrue(_builder instanceof BulkbuilderServiceImpl);
  }

  /**
   * Tests if the error return codes are correct.
   *
   * @throws Exception
   *           if an unexpected exception occurs.
   */
  public void testErrorCodes() throws Exception {
    // test POST to a job that does not exist
    HttpMethod method = new PostMethod(BASE_URI + "job/iDoNotExist/bulk");
    assertEquals(HttpStatus.SC_NOT_FOUND, _httpClient.executeMethod(method));
    final String jobName = "testIncModeErrorCodesWithHandler";
    createJob(jobName, "bulkBuilderTest", "test");
    // test POST to a job that has not been started yet
    method = new PostMethod(BASE_URI + "job/" + jobName + "/bulk");
    assertEquals(HttpStatus.SC_NOT_FOUND, _httpClient.executeMethod(method));
    final String jobRunId = _jobRunEngine.startJob(jobName);
    _jobRunEngine.finishJob(jobName, jobRunId);
    // test POST to a job that has already been finished
    method = new PostMethod(BASE_URI + "job/" + jobName + "/bulk");
    assertEquals(HttpStatus.SC_NOT_FOUND, _httpClient.executeMethod(method));
  }

  /**
   * Tests bulk creation when adding micro bulks only.
   *
   * @throws Exception
   *           if the test fails.
   */
  public void testAddMicroBulk() throws Exception {
    final String jobName = "testAddBulkBurstInc";
    final String jobId = startJob(jobName, "tempstore");
    final int noOfRecords = 3;
    final int numberOfMicroBulks = 2;
    final byte[] jsonBulk = createBulk(0, noOfRecords);
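    // each POST of a micro bulk is expected to return info about the workflow run it was attached to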
    for (int j = 0; j < numberOfMicroBulks; j++) {
      final AnyMap result = addBulkStreamWithClient(jobName, new ByteArrayInputStream(jsonBulk)).asMap();
      assertTrue(result.containsKey(BulkbuilderBurstHandler.KEY_WORKFLOW_RUNS));
      final AnySeq workflows = result.getSeq(BulkbuilderBurstHandler.KEY_WORKFLOW_RUNS);
      assertEquals(1, workflows.size());
      final AnyMap res1 = workflows.getMap(0);
      assertTrue(res1.containsKey(WorkflowRunInfo.KEY_WORKFLOW_RUN_ID));
      assertTrue(res1.containsKey(BulkbuilderHandler.KEY_URL));
      assertTrue(res1.containsKey(WorkflowRunInfo.KEY_JOB_RUN_ID));
    }
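    // the explicit commit should close the collected micro bulks so that the bulk consumer worker gets a task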
    _builder.commitJob(jobName);
    assertNoTask(WORKER_DELETESCONSUMER);
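    // wait until the bulkbuilder's internal finishing tasks are processed before fetching the consumer task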
    waitForFinishingTasksProcessed();
    final Task task = getTask(WORKER_BULKCONSUMER);
    final RecordInput reader = readBulk(task, "bulk");
    try {
      int numberOfResultRecords = 0;
      final int numberOfAllRecords = numberOfMicroBulks * noOfRecords;
      for (int i = 0; i < numberOfAllRecords; i++) {
        final Record record = reader.getRecord();
        assertNotNull(record);
        assertEquals(CONTENT, record.getMetadata().getStringValue("content"));
        assertEquals(UMLAUTS, record.getMetadata().getStringValue("umlauts"));
        numberOfResultRecords++;
      }
      assertEquals(numberOfAllRecords, numberOfResultRecords);
      finishTaskWithSuccess(task);
      assertNoTask(WORKER_BULKCONSUMER);
      _jobRunEngine.finishJob(jobName, jobId);
      checkCounters(jobName, jobId, 1, numberOfAllRecords, 0, 0);
    } finally {
      reader.close();
    }
  }

  /**
   * Tests adding micro bulks where each micro bulk is bigger than limitIncSize.
   *
   * @throws Exception
   *           if the test fails.
   */
  public void testAddMicroBulkBySize() throws Exception {
    final String jobName = "testAddMicroBulkIncrementalSize";
    final String jobId = startJob(jobName, "tempstore");
    // 10 records are bigger than limitIncSize=10k
    final int noOfRecords = 10;
    final int numberOfMicroBulks = 5;
    final byte[] jsonBulk = createBulk(0, noOfRecords);
    for (int j = 0; j < numberOfMicroBulks; j++) {
      addBulkStreamWithClient(jobName, new ByteArrayInputStream(jsonBulk)).asMap();
    }
    _builder.commitJob(jobName);
    assertNoTask(WORKER_DELETESCONSUMER);
    waitForFinishingTasksProcessed();
    Task task = null;
    int numberOfResultRecords = 0;
    int taskCount = 0;
    final int numberOfAllRecords = numberOfMicroBulks * noOfRecords;
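    // each micro bulk exceeded the size limit, so bulks were committed automatically:
    // drain all resulting consumer tasks and count the contained records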
    do {
      task = _taskManager.getTask(WORKER_BULKCONSUMER, null);
      if (task != null) {
        taskCount++;
        final RecordInput reader = readBulk(task, "bulk");
        try {
          Record record = null;
          do {
            record = reader.getRecord();
            if (record != null) {
              numberOfResultRecords++;
            }
          } while (record != null);
          finishTaskWithSuccess(task);
        } finally {
          reader.close();
        }
      }
    } while (task != null);
    assertEquals(numberOfAllRecords, numberOfResultRecords);
    assertNoTask(WORKER_BULKCONSUMER);
    checkCounters(jobName, jobId, taskCount, numberOfAllRecords, 0, 0);
    _jobRunEngine.finishJob(jobName, jobId);
  }

  /**
   * Tests committing a bulk by pushing an empty micro bulk.
   *
   * @throws Exception
   *           if the test fails.
   */
  public void testMicroBulkCommitExplicitly() throws Exception {
    final String jobName = "testMicroBulkCommitExplicitly";
    final String jobId = startJobWithIncreasedLimits(jobName, "tempstore");
    // limits are increased, so the bulk is committed only by the explicit commit below
    final int noOfRecords = 5;
    final int numberOfMicroBulks = 2;
    for (int j = 0; j < numberOfMicroBulks; j++) {
      addBulkStreamWithClient(jobName, new ByteArrayInputStream(createBulk(j * noOfRecords, noOfRecords)));
    }
    // commit using an empty bulk
    commitBulkWithClient(jobName);
    checkBulks(jobName, jobId, 0, numberOfMicroBulks * noOfRecords, 0, 1, false);
  }

  /**
   * Tests the error on opening more than the allowed number of micro bulks (2 at the time of writing) in a single job.
   *
   * @throws Exception
   *           if the test fails.
   */
  public void testParallelMicroBulksLimitSingleJob() throws Exception {
    final String jobName = "testParallelMicroBulksLimitSingleJob";
    System.out.println("Testing " + jobName);
    final String jobId = startJobWithIncreasedLimits(jobName, "tempstore");
    final int noOfParallelBulks = 2; // configured limit.
    final int noOfRecords = 50;
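    // open one connection more than the configured limit: exactly one request is expected to be rejected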
    final HttpURLConnection[] conn = new HttpURLConnection[noOfParallelBulks + 1];
    final OutputStream[] out = new OutputStream[conn.length];
    for (int i = 0; i < conn.length; i++) {
      conn[i] = openBulkRequest(jobName);
      out[i] = conn[i].getOutputStream();
      out[i].write(createBulk(i * noOfRecords, noOfRecords));
      out[i].flush();
    }
    final int failedBulkNumber = finishConnections(conn, out);
    _builder.commitJob(jobName);
    checkBulkOfMicroBulks(jobName, jobId, noOfParallelBulks, noOfRecords, failedBulkNumber);
  }

  /**
   * Tests the error on opening more than the allowed number of micro bulks (2 at the time of writing) across multiple
   * jobs.
   *
   * @throws Exception
   *           if the test fails.
   */
  public void testParallelMicroBulksLimitMultipleJobs() throws Exception {
    final String baseJobName = "testParallelMicroBulksLimitMultipleJobs";
    System.out.println("Testing " + baseJobName);
    final int noOfParallelBulks = 2; // configured limit.
    final int noOfRecords = 50; // use many records to guarantee order of microbulk creation
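    // again one job (and connection) more than the limit, as the parallel micro bulk limit applies across jobs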
    final String[] jobName = new String[noOfParallelBulks + 1];
    final String[] jobId = new String[jobName.length];
    final HttpURLConnection[] conn = new HttpURLConnection[jobName.length];
    final OutputStream[] out = new OutputStream[jobName.length];
    for (int i = 0; i < jobName.length; i++) {
      jobName[i] = baseJobName + i;
      jobId[i] = startJobWithIncreasedLimits(jobName[i], "tempstore");
      conn[i] = openBulkRequest(jobName[i]);
      out[i] = conn[i].getOutputStream();
      out[i].write(createBulk(i * noOfRecords, noOfRecords));
    }
    final int failedBulkNumber = finishConnections(conn, out);
    for (int i = 0; i < jobName.length; i++) {
      if (i != failedBulkNumber) {
        _builder.commitJob(jobName[i]);
        checkBulks(jobName[i], jobId[i], i * noOfRecords, noOfRecords, 0, 1, false);
      } else {
        _builder.commitJob(jobName[i]);
      }
    }
    waitForRunningWorker();
  }

  /**
   * Test where one record in a micro bulk is broken, so none of this micro bulk's records should be added.
   *
   * @throws Exception
   *           if the test fails.
   */
  public void testAddBrokenMicroBulk() throws Exception {
    final String jobName = "testAddBulkBurst";
    final String jobId = startJob(jobName, "tempstore");
    final int noOfRecordsPerMicroBulk = 5;
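    // a micro bulk is sent as a sequence of JSON record objects, here one per line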
    // add first micro bulk
    StringBuilder jsonBuffer = new StringBuilder();
    for (int i = 0; i < noOfRecordsPerMicroBulk; i++) {
      jsonBuffer.append(String.format("{\"_recordid\": \"%d\", \"content\": \"%s\" }\n", i, CONTENT));
    }
    sendMicroBulk(jobName, jsonBuffer);
    // add second micro bulk with last record broken
    jsonBuffer = new StringBuilder();
    for (int i = 0; i < noOfRecordsPerMicroBulk; i++) {
      jsonBuffer.append(String.format("{\"_recordid\": \"%d\", \"content\": \"%s\" }\n", 10 + i, CONTENT));
    }
    jsonBuffer.append(String.format("{\"_recordid\": \"%d\", \"content: \"%s\" }}}}}}}\n", 4711,
      "I have no valid JSON format"));
    try {
      sendMicroBulk(jobName, jsonBuffer);
      fail("Invalid micro bulk should cause an error");
    } catch (final HttpClientException e) {
      assertEquals(HttpStatus.SC_BAD_REQUEST, e.getResponseCode());
    }
    // add third micro bulk
    jsonBuffer = new StringBuilder();
    for (int i = 0; i < noOfRecordsPerMicroBulk; i++) {
      jsonBuffer.append(String.format("{\"_recordid\": \"%d\", \"content\": \"%s\" }\n", 100 + i, CONTENT));
    }
    sendMicroBulk(jobName, jsonBuffer);
    // add fourth micro bulk with first record broken
    jsonBuffer = new StringBuilder();
    jsonBuffer.append(String.format("{\"_recordid\": \"%d\", \"content: \"%s\" }}}}}}}\n", 4711,
      "I have no valid JSON format"));
    for (int i = 0; i < noOfRecordsPerMicroBulk; i++) {
      jsonBuffer.append(String.format("{\"_recordid\": \"%d\", \"content\": \"%s\" }\n", 1000 + i, CONTENT));
    }
    try {
      sendMicroBulk(jobName, jsonBuffer);
      fail("Invalid micro bulk should cause an error");
    } catch (final HttpClientException e) {
      assertEquals(HttpStatus.SC_BAD_REQUEST, e.getResponseCode());
    }
    // commit bulk
    commitRecordWithClient(jobName);
    waitForFinishingTasksProcessed();
    final Task task = getTask(WORKER_BULKCONSUMER);
    // read resulting bulk: first and third micro bulk should be contained, second and fourth must not be contained
    final RecordInput reader = readBulk(task, "bulk");
    try {
      // first bulk:
      for (int i = 0; i < noOfRecordsPerMicroBulk; i++) {
        final Record record = reader.getRecord();
        assertNotNull(record);
        assertRecordId(record, i);
      }
      // third bulk:
      for (int i = 0; i < noOfRecordsPerMicroBulk; i++) {
        final Record record = reader.getRecord();
        assertNotNull(record);
        assertRecordId(record, 100 + i);
      }
      // no more
      final Record record = reader.getRecord();
      assertNull("No more records expected", record);
    } finally {
      reader.close();
    }
    finishTaskWithSuccess(task);
    assertNoTask(WORKER_BULKCONSUMER);
    _jobRunEngine.finishJob(jobName, jobId);
  }

  /**
   * Creates a test job in incremental mode with increased commit limits (time and size), so that the tests above can
   * control the committing of bulks explicitly.
   */
  private String startJobWithIncreasedLimits(final String jobName, final String store) throws Exception {
    final AnyMap parameters = DataFactory.DEFAULT.createAnyMap();
    parameters.put(BulkbuilderConstants.TASK_PARAM_LIMIT_TIME, 10);
    parameters.put(BulkbuilderConstants.TASK_PARAM_LIMIT_SIZE, "10m");
    return startJob(jobName, store, parameters);
  }

  /**
   * Opens an HttpURLConnection for pushing a bulk.
   */
  private HttpURLConnection openBulkRequest(final String jobName) throws Exception {
    final URL bulkUrl = new URL(BASE_URI + "job/" + jobName + "/bulk");
    final HttpURLConnection conn = (HttpURLConnection) bulkUrl.openConnection();
    conn.setRequestMethod("POST");
    conn.setDoOutput(true);
    conn.setDoInput(true);
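    // stream the request body in chunks so the micro bulk stays open while it is being written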
    conn.setChunkedStreamingMode(1000);
    conn.connect();
    return conn;
  }

  /**
   * Finishes the connections and checks that at most one request failed with SERVICE_UNAVAILABLE while all others
   * succeeded with ACCEPTED. Collects error messages for other failures and fails the test if any occurred.
   *
   * @return the index of the rejected connection, or -1 if all requests succeeded.
   */
  private int finishConnections(final HttpURLConnection[] conn, final OutputStream[] out) throws Exception {
    final StringBuilder error = new StringBuilder();
    int failedConn = -1;
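    // closing the output stream completes the request, making the response code available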
    for (int i = 0; i < conn.length; i++) {
      IOUtils.closeQuietly(out[i]);
      final int code = conn[i].getResponseCode();
      if (code != HttpStatus.SC_ACCEPTED) {
        if (failedConn < 0 && code == HttpStatus.SC_SERVICE_UNAVAILABLE) {
          failedConn = i;
        } else {
          final InputStream err = conn[i].getErrorStream();
          error.append("Error in Connection " + i + ": Code " + code + ", Message " + IOUtils.toString(err) + ".\n");
          IOUtils.closeQuietly(err);
        }
      }
      conn[i].disconnect();
    }
    if (error.length() > 0) {
      fail(error.toString());
    }
    return failedConn;
  }

  /**
   * Checks that one bulk was created consisting of all records from the non-failed micro bulks.
   */
  private void checkBulkOfMicroBulks(final String jobName, final String jobId, final int noOfBulks,
    final int noOfRecords, final int failedBulkNumber) throws Exception {
    final Set<Integer> foundBulkNumbers = new HashSet<Integer>();
    try {
      int recordCount = 0;
      Task task = null;
      waitForFinishingTasksProcessed();
      task = _taskManager.getTask(WORKER_BULKCONSUMER, null);
      assertNotNull(task);
      assertNoTask(WORKER_DELETESCONSUMER);
      final RecordInput reader = readBulk(task, "bulk");
      try {
        Record record = null;
        do {
          record = reader.getRecord();
          if (record != null) {
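            // record ids were created as bulkNumber * noOfRecords + offset,
            // so the bulk number can be recovered by integer division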
            final int recordid = Integer.parseInt(record.getId());
            foundBulkNumbers.add(recordid / noOfRecords);
            assertRecordId(record, recordid);
            recordCount++;
            for (int i = 1; i < noOfRecords; i++) {
              record = reader.getRecord();
              assertNotNull(record);
              assertRecordId(record, recordid + i);
              recordCount++;
            }
          }
        } while (record != null);
        finishTaskWithSuccess(task);
      } finally {
        reader.close();
      }
      assertNoTask(WORKER_BULKCONSUMER);
      assertEquals(noOfBulks, foundBulkNumbers.size());
      assertFalse(foundBulkNumbers.contains(failedBulkNumber));
      assertEquals(noOfRecords * noOfBulks, recordCount);
      checkCounters(jobName, jobId, 1, noOfRecords * noOfBulks, 0, 0);
    } finally {
      try {
        if (_jobRunDataProvider.getJobRunInfo(jobName).getState() == JobState.RUNNING) {
          _jobRunEngine.finishJob(jobName, jobId);
        }
      } catch (final Exception ex) {
        // forget about it.
      }
    }
  }

  /**
   * Sends a micro bulk via HTTP.
   */
  private void sendMicroBulk(final String jobName, final StringBuilder jsonBuffer) throws Exception {
    final String jsonBulk = jsonBuffer.toString();
    addBulkStreamWithClient(jobName, new ByteArrayInputStream(jsonBulk.getBytes("utf-8")));
  }
}