/**********************************************************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Juergen Schumacher, Andreas Weber, Drazen Cindric, Andreas Schank (all Attensity Europe GmbH) - initial
* implementation
**********************************************************************************************************************/
package org.eclipse.smila.bulkbuilder.test;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.httpclient.HttpMethod;
import org.apache.commons.httpclient.HttpStatus;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.io.IOUtils;
import org.eclipse.smila.bulkbuilder.BulkbuilderConstants;
import org.eclipse.smila.bulkbuilder.httphandler.BulkbuilderBurstHandler;
import org.eclipse.smila.bulkbuilder.httphandler.BulkbuilderHandler;
import org.eclipse.smila.bulkbuilder.internal.BulkbuilderServiceImpl;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.datamodel.AnySeq;
import org.eclipse.smila.datamodel.DataFactory;
import org.eclipse.smila.datamodel.Record;
import org.eclipse.smila.jobmanager.JobState;
import org.eclipse.smila.jobmanager.WorkflowRunInfo;
import org.eclipse.smila.taskmanager.Task;
import org.eclipse.smila.taskworker.input.RecordInput;

/**
* Test case for BulkbuilderBurstHandler in incremental mode.
*
* @author cind01
*
*/
public class TestBulkbuilderBurstHandler extends BulkbuilderTestBase {
/**
* Tests if the service can be accessed.
*
* @throws Exception
* test fails.
*/
public void testService() throws Exception {
assertTrue(_builder instanceof BulkbuilderServiceImpl);
}
/**
* tests if the error return codes are correct.
*
* @throws Exception
* unexpected Exception
*/
public void testErrorCodes() throws Exception {
// test post to job that does not exist
HttpMethod method = new PostMethod(BASE_URI + "job/iDoNotExist/bulk");
assertEquals(HttpStatus.SC_NOT_FOUND, _httpClient.executeMethod(method));
final String jobName = "testIncModeErrorCodesWithHandler";
createJob(jobName, "bulkBuilderTest", "test");
// test post to job that does not run yet
method = new PostMethod(BASE_URI + "job/" + jobName + "/bulk");
assertEquals(HttpStatus.SC_NOT_FOUND, _httpClient.executeMethod(method));
final String jobRunId = _jobRunEngine.startJob(jobName);
_jobRunEngine.finishJob(jobName, jobRunId);
// test POST to job that has already been finished
method = new PostMethod(BASE_URI + "job/" + jobName + "/bulk");
assertEquals(HttpStatus.SC_NOT_FOUND, _httpClient.executeMethod(method));
}
/**
* test bulk creation by adding micro bulks only.
*
* @throws Exception
* test fails
*/
public void testAddMicroBulk() throws Exception {
final String jobName = "testAddBulkBurstInc";
final String jobId = startJob(jobName, "tempstore");
final int noOfRecords = 3;
final int numberOfMicroBulks = 2;
final byte[] jsonBulk = createBulk(0, noOfRecords);
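// push the same micro bulk several times; each push is expected to report exactly one workflow run for this job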
for (int j = 0; j < numberOfMicroBulks; j++) {
final AnyMap result = addBulkStreamWithClient(jobName, new ByteArrayInputStream(jsonBulk)).asMap();
assertTrue(result.containsKey(BulkbuilderBurstHandler.KEY_WORKFLOW_RUNS));
final AnySeq workflows = result.getSeq(BulkbuilderBurstHandler.KEY_WORKFLOW_RUNS);
assertEquals(1, workflows.size());
final AnyMap res1 = workflows.getMap(0);
assertTrue(res1.containsKey(WorkflowRunInfo.KEY_WORKFLOW_RUN_ID));
assertTrue(res1.containsKey(BulkbuilderHandler.KEY_URL));
assertTrue(res1.containsKey(WorkflowRunInfo.KEY_JOB_RUN_ID));
}
_builder.commitJob(jobName);
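// the explicit commit closes the open bulk: all added records should end up in a single task for the bulk
// consumer, and no task must be created for the deletes consumer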
assertNoTask(WORKER_DELETESCONSUMER);
waitForFinishingTasksProcessed();
final Task task = getTask(WORKER_BULKCONSUMER);
final RecordInput reader = readBulk(task, "bulk");
try {
int numberOfResultRecords = 0;
final int numberOfAllRecords = numberOfMicroBulks * noOfRecords;
for (int i = 0; i < numberOfAllRecords; i++) {
final Record record = reader.getRecord();
assertNotNull(record);
assertEquals(CONTENT, record.getMetadata().getStringValue("content"));
assertEquals(UMLAUTS, record.getMetadata().getStringValue("umlauts"));
numberOfResultRecords++;
}
assertEquals(numberOfAllRecords, numberOfResultRecords);
finishTaskWithSuccess(task);
assertNoTask(WORKER_BULKCONSUMER);
_jobRunEngine.finishJob(jobName, jobId);
checkCounters(jobName, jobId, 1, numberOfAllRecords, 0, 0);
} finally {
reader.close();
}
}
/**
* test for adding micro bulks where each micro bulk is bigger than limitIncSize.
*
* @throws Exception
* test fails
*/
public void testAddMicroBulkBySize() throws Exception {
final String jobName = "testAddMicroBulkIncrementalSize";
final String jobId = startJob(jobName, "tempstore");
// 10 records are bigger than limitIncSize=10k
final int noOfRecords = 10;
final int numberOfMicroBulks = 5;
final byte[] jsonBulk = createBulk(0, noOfRecords);
for (int j = 0; j < numberOfMicroBulks; j++) {
addBulkStreamWithClient(jobName, new ByteArrayInputStream(jsonBulk)).asMap();
}
_builder.commitJob(jobName);
assertNoTask(WORKER_DELETESCONSUMER);
waitForFinishingTasksProcessed();
Task task = null;
int numberOfResultRecords = 0;
int taskCount = 0;
final int numberOfAllRecords = numberOfMicroBulks * noOfRecords;
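// each micro bulk exceeds limitIncSize, so the records are spread over several bulks; drain all bulk consumer
// tasks and count the records they deliver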
do {
task = _taskManager.getTask(WORKER_BULKCONSUMER, null);
if (task != null) {
taskCount++;
final RecordInput reader = readBulk(task, "bulk");
try {
Record record = null;
do {
record = reader.getRecord();
if (record != null) {
numberOfResultRecords++;
}
} while (record != null);
finishTaskWithSuccess(task);
} finally {
reader.close();
}
}
} while (task != null);
assertEquals(numberOfAllRecords, numberOfResultRecords);
assertNoTask(WORKER_BULKCONSUMER);
checkCounters(jobName, jobId, taskCount, numberOfAllRecords, 0, 0);
_jobRunEngine.finishJob(jobName, jobId);
}
/**
* test committing a bulk by pushing an empty micro bulk.
*
* @throws Exception
* test fails
*/
public void testMicroBulkCommitExplicitly() throws Exception {
final String jobName = "testMicroBulkCommitExplicitly";
final String jobId = startJobWithIncreasedLimits(jobName, "tempstore");
// limits are increased, so these micro bulks are not committed implicitly; the commit happens explicitly below
final int noOfRecords = 5;
final int numberOfMicroBulks = 2;
for (int j = 0; j < numberOfMicroBulks; j++) {
addBulkStreamWithClient(jobName, new ByteArrayInputStream(createBulk(j * noOfRecords, noOfRecords)));
}
// commit using empty bulk
commitBulkWithClient(jobName);
checkBulks(jobName, jobId, 0, numberOfMicroBulks * noOfRecords, 0, 1, false);
}
/**
* test error on opening more than the allowed number of microbulks (2 at the time of writing) in a single job.
*
* @throws Exception
* test fails
*/
public void testParallelMicroBulksLimitSingleJob() throws Exception {
final String jobName = "testParallelMicroBulksLimitSingleJob";
System.out.println("Testing " + jobName);
final String jobId = startJobWithIncreasedLimits(jobName, "tempstore");
final int noOfParallelBulks = 2; // configured limit.
final int noOfRecords = 50;
final HttpURLConnection[] conn = new HttpURLConnection[noOfParallelBulks + 1];
final OutputStream[] out = new OutputStream[conn.length];
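// open one connection more than the configured limit of parallel micro bulks; exactly one request is expected
// to be rejected with SERVICE_UNAVAILABLE (checked in finishConnections)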
for (int i = 0; i < conn.length; i++) {
conn[i] = openBulkRequest(jobName);
out[i] = conn[i].getOutputStream();
out[i].write(createBulk(i * noOfRecords, noOfRecords));
out[i].flush();
}
final int failedBulkNumber = finishConnections(conn, out);
_builder.commitJob(jobName);
checkBulkOfMicroBulks(jobName, jobId, noOfParallelBulks, noOfRecords, failedBulkNumber);
}
/**
* test error on opening more than the allowed number of microbulks (2 at the time of writing) in multiple jobs.
*
* @throws Exception
* test fails
*/
public void testParallelMicroBulksLimitMultipleJobs() throws Exception {
final String baseJobName = "testParallelMicroBulksLimitMultipleJobs";
System.out.println("Testing " + baseJobName);
final int noOfParallelBulks = 2; // configured limit.
final int noOfRecords = 50; // use many records to guarantee order of microbulk creation
final String[] jobName = new String[noOfParallelBulks + 1];
final String[] jobId = new String[jobName.length];
final HttpURLConnection[] conn = new HttpURLConnection[jobName.length];
final OutputStream[] out = new OutputStream[jobName.length];
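// the limit of parallel micro bulks also applies across jobs: with one job more than the limit, exactly one
// push must be rejected, while the remaining jobs still get their complete bulks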
for (int i = 0; i < jobName.length; i++) {
jobName[i] = baseJobName + i;
jobId[i] = startJobWithIncreasedLimits(jobName[i], "tempstore");
conn[i] = openBulkRequest(jobName[i]);
out[i] = conn[i].getOutputStream();
out[i].write(createBulk(i * noOfRecords, noOfRecords));
}
final int failedBulkNumber = finishConnections(conn, out);
for (int i = 0; i < jobName.length; i++) {
if (i != failedBulkNumber) {
_builder.commitJob(jobName[i]);
checkBulks(jobName[i], jobId[i], i * noOfRecords, noOfRecords, 0, 1, false);
} else {
_builder.commitJob(jobName[i]);
}
}
waitForRunningWorker();
}
/** test where one record in a micro bulk is broken, so none of this micro bulk's records should be added. */
public void testAddBrokenMicroBulk() throws Exception {
final String jobName = "testAddBulkBurst";
final String jobId = startJob(jobName, "tempstore");
final int noOfRecordsPerMicroBulk = 5;
// add first micro bulk
StringBuilder jsonBuffer = new StringBuilder();
for (int i = 0; i < noOfRecordsPerMicroBulk; i++) {
jsonBuffer.append(String.format("{\"_recordid\": \"%d\", \"content\": \"%s\" }\n", i, CONTENT));
}
sendMicroBulk(jobName, jsonBuffer);
// add second micro bulk with last record broken
jsonBuffer = new StringBuilder();
for (int i = 0; i < noOfRecordsPerMicroBulk; i++) {
jsonBuffer.append(String.format("{\"_recordid\": \"%d\", \"content\": \"%s\" }\n", 10 + i, CONTENT));
}
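// this record is deliberately malformed (missing closing quote of the "content" attribute name, surplus
// braces), so the whole micro bulk must be rejected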
jsonBuffer.append(String.format("{\"_recordid\": \"%d\", \"content: \"%s\" }}}}}}}\n", 4711,
"I have no valid JSON format"));
try {
sendMicroBulk(jobName, jsonBuffer);
fail("Invalid micro bulk should cause an error");
} catch (final HttpClientException e) {
assertEquals(HttpStatus.SC_BAD_REQUEST, e.getResponseCode());
}
// add third micro bulk
jsonBuffer = new StringBuilder();
for (int i = 0; i < noOfRecordsPerMicroBulk; i++) {
jsonBuffer.append(String.format("{\"_recordid\": \"%d\", \"content\": \"%s\" }\n", 100 + i, CONTENT));
}
sendMicroBulk(jobName, jsonBuffer);
// add fourth micro bulk with first record broken
jsonBuffer = new StringBuilder();
jsonBuffer.append(String.format("{\"_recordid\": \"%d\", \"content: \"%s\" }}}}}}}\n", 4711,
"I have no valid JSON format"));
for (int i = 0; i < noOfRecordsPerMicroBulk; i++) {
jsonBuffer.append(String.format("{\"_recordid\": \"%d\", \"content\": \"%s\" }\n", 1000 + i, CONTENT));
}
try {
sendMicroBulk(jobName, jsonBuffer);
fail("Invalid micro bulk should cause an error");
} catch (final HttpClientException e) {
assertEquals(HttpStatus.SC_BAD_REQUEST, e.getResponseCode());
}
// commit bulk
commitRecordWithClient(jobName);
waitForFinishingTasksProcessed();
final Task task = getTask(WORKER_BULKCONSUMER);
// read resulting bulk: first and third micro bulk should be contained, second and fourth must not be contained
final RecordInput reader = readBulk(task, "bulk");
try {
// first bulk:
for (int i = 0; i < noOfRecordsPerMicroBulk; i++) {
final Record record = reader.getRecord();
assertNotNull(record);
assertRecordId(record, i);
}
// third bulk:
for (int i = 0; i < noOfRecordsPerMicroBulk; i++) {
final Record record = reader.getRecord();
assertNotNull(record);
assertRecordId(record, 100 + i);
}
// no more
final Record record = reader.getRecord();
assertNull("No more records expected", record);
} finally {
reader.close();
}
finishTaskWithSuccess(task);
assertNoTask(WORKER_BULKCONSUMER);
_jobRunEngine.finishJob(jobName, jobId);
}
/**
* create a test job in incremental mode with increased limits.
*/
private String startJobWithIncreasedLimits(final String jobName, final String store) throws Exception {
final AnyMap parameters = DataFactory.DEFAULT.createAnyMap();
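// raise the time and size limits so that micro bulks added during the test are not committed implicitly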
parameters.put(BulkbuilderConstants.TASK_PARAM_LIMIT_TIME, 10);
parameters.put(BulkbuilderConstants.TASK_PARAM_LIMIT_SIZE, "10m");
return startJob(jobName, store, parameters);
}
/**
* open an HttpURLConnection for pushing a bulk.
*/
private HttpURLConnection openBulkRequest(final String jobName) throws Exception {
final URL bulkUrl = new URL(BASE_URI + "job/" + jobName + "/bulk");
final HttpURLConnection conn = (HttpURLConnection) bulkUrl.openConnection();
conn.setRequestMethod("POST");
conn.setDoOutput(true);
conn.setDoInput(true);
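// use chunked streaming so the request body is not buffered completely; this keeps the connection open while
// further micro bulk data is written and allows several bulk requests to be open in parallel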
conn.setChunkedStreamingMode(1000);
conn.connect();
return conn;
}
/**
* finish the connections and check that only one request failed with SERVICE_UNAVAILABLE, all others must succeed
* with ACCEPTED. Collect error messages for other failures.
*/
private int finishConnections(final HttpURLConnection[] conn, final OutputStream[] out) throws Exception {
final StringBuilder error = new StringBuilder();
int failedConn = -1;
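// at most one connection may fail, and only with SERVICE_UNAVAILABLE; remember its index for the caller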
for (int i = 0; i < conn.length; i++) {
IOUtils.closeQuietly(out[i]);
final int code = conn[i].getResponseCode();
if (code != HttpStatus.SC_ACCEPTED) {
if (failedConn < 0 && code == HttpStatus.SC_SERVICE_UNAVAILABLE) {
failedConn = i;
} else {
final InputStream err = conn[i].getErrorStream();
error.append("Error in Connection " + i + ": Code " + code + ", Message " + IOUtils.toString(err) + ".\n");
IOUtils.closeQuietly(err);
}
}
conn[i].disconnect();
}
if (error.length() > 0) {
fail(error.toString());
}
return failedConn;
}
/**
* check if one bulk was created consisting of all records from the non-failed microbulks.
*/
private void checkBulkOfMicroBulks(final String jobName, final String jobId, final int noOfBulks,
final int noOfRecords, final int failedBulkNumber) throws Exception {
final Set<Integer> foundBulkNumbers = new HashSet<Integer>();
try {
int recordCount = 0;
Task task = null;
waitForFinishingTasksProcessed();
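// all non-failed micro bulks should have been combined into a single bulk, i.e. exactly one bulk consumer task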
task = _taskManager.getTask(WORKER_BULKCONSUMER, null);
assertNotNull(task);
assertNoTask(WORKER_DELETESCONSUMER);
final RecordInput reader = readBulk(task, "bulk");
try {
Record record = null;
do {
record = reader.getRecord();
if (record != null) {
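// createBulk(start, count) numbers its records consecutively starting at start (here: bulk number * noOfRecords),
// so the originating micro bulk can be derived from the record id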
final int recordid = Integer.parseInt(record.getId());
foundBulkNumbers.add(recordid / noOfRecords);
assertRecordId(record, recordid);
recordCount++;
for (int i = 1; i < noOfRecords; i++) {
record = reader.getRecord();
assertNotNull(record);
assertRecordId(record, recordid + i);
recordCount++;
}
}
} while (record != null);
finishTaskWithSuccess(task);
} finally {
reader.close();
}
assertNoTask(WORKER_BULKCONSUMER);
assertEquals(noOfBulks, foundBulkNumbers.size());
assertFalse(foundBulkNumbers.contains(failedBulkNumber));
assertEquals(noOfRecords * noOfBulks, recordCount);
checkCounters(jobName, jobId, 1, noOfRecords * noOfBulks, 0, 0);
} finally {
try {
if (_jobRunDataProvider.getJobRunInfo(jobName).getState() == JobState.RUNNING) {
_jobRunEngine.finishJob(jobName, jobId);
}
} catch (final Exception ex) {
; // forget about it.
}
}
}
/** send a micro bulk via HTTP. */
private void sendMicroBulk(final String jobName, final StringBuilder jsonBuffer) throws Exception {
final String jsonBulk = jsonBuffer.toString();
addBulkStreamWithClient(jobName, new ByteArrayInputStream(jsonBulk.getBytes("utf-8")));
}
}