/*******************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Andreas Schank, Andreas Weber (Attensity Europe GmbH) - implementation
*******************************************************************************/
package org.eclipse.smila.processing.worker.test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import org.eclipse.smila.binarystorage.BinaryStorageService;
import org.eclipse.smila.bulkbuilder.BulkbuilderException;
import org.eclipse.smila.bulkbuilder.BulkbuilderService;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.datamodel.DataFactory;
import org.eclipse.smila.datamodel.Record;
import org.eclipse.smila.jobmanager.JobManagerException;
import org.eclipse.smila.jobmanager.test.JobManagerTestBase;
import org.eclipse.smila.objectstore.ObjectStoreException;
import org.eclipse.smila.objectstore.StoreObject;
import org.eclipse.smila.processing.parameters.ParameterAccessor;
import org.eclipse.smila.processing.worker.PipelineProcessingWorker;
import org.eclipse.smila.taskmanager.BulkInfo;
import org.eclipse.smila.taskworker.input.RecordInput;
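/**
* Integration tests for the {@link PipelineProcessingWorker} executing BPEL pipelines: workflows with and without
* output buckets, varying numbers of parallel records, propagation of task parameters into records, handling of
* recoverable and non-recoverable ProcessingExceptions, and attachments.
*/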
public class TestBpelWorker extends JobManagerTestBase {
private static final String TEMP_STORE = "tempStore";
private static final String STORE = "store";
private static final String BPEL_TEST_PIPELINE = "TestPipeline";
private static final String BUCKET_NAME = "outbulkBucket";
/** bulkbuilder service used to feed records into the test jobs. */
protected BulkbuilderService _builder;
@Override
protected void setUp() throws Exception {
super.setUp();
_builder = getService(BulkbuilderService.class);
assertNotNull(_builder);
// wait for binary storage...
assertNotNull(getService(BinaryStorageService.class));
// create stores...
_objectStoreService.ensureStore(STORE);
_objectStoreService.ensureStore(TEMP_STORE);
BpelWorkerTestPipelet.s_throwExceptions = true;
}
/** Test with a pipeline name that doesn't exist. */
public void testErrorOnNonExistingPipeline() throws Exception {
final String workflowName = "testWithoutOutputBucket";
final int noOfRecords = 10;
final String jobName = "testJobError";
final AnyMap parameters = DataFactory.DEFAULT.createAnyMap();
parameters.put(PipelineProcessingWorker.KEY_PIPELINE_NAME, "PipelineThatDoesNotExist");
parameters.put("tempStore", TEMP_STORE);
parameters.put("store", STORE);
addJob(jobName, workflowName, parameters);
final String jobRunId = startJob(jobName);
for (int i = 0; i < noOfRecords; i++) {
_builder.addRecord(jobName, DataFactory.DEFAULT.createRecord(Integer.toString(i)));
}
_builder.commitJob(jobName);
_jobManager.finishJob(jobName, jobRunId);
waitForJobRunCompleted(jobName, jobRunId, 100000); // wait max. 100 seconds
assertJobRunFailed(jobName, jobRunId);
}
/** Test with a workflow containing a BPEL worker that has no output bucket. */
public void testWithoutOutputBucket() throws Exception {
final String workflowName = "testWithoutOutputBucket";
final int noOfRecords = 10;
executeJob(workflowName, noOfRecords, 1);
assertEquals(1, BpelWorkerTestPipelet._lastParallelRecordCount);
executeJob(workflowName, noOfRecords, 5);
assertEquals(5, BpelWorkerTestPipelet._lastParallelRecordCount);
executeJob(workflowName, noOfRecords, 10);
assertEquals(10, BpelWorkerTestPipelet._lastParallelRecordCount);
executeJob(workflowName, noOfRecords, 20);
// a request for 20 parallel records is capped at the 10 records actually in the bulk.
assertEquals(10, BpelWorkerTestPipelet._lastParallelRecordCount);
executeJob(workflowName, noOfRecords, 0);
assertEquals("Expecting default " + PipelineProcessingWorker.DEFAULT_NUMBER_OF_PARALLEL_RECORDS
+ " for value = 0", PipelineProcessingWorker.DEFAULT_NUMBER_OF_PARALLEL_RECORDS,
BpelWorkerTestPipelet._lastParallelRecordCount);
executeJob(workflowName, noOfRecords, -1);
assertEquals("Expecting default " + PipelineProcessingWorker.DEFAULT_NUMBER_OF_PARALLEL_RECORDS
+ " for value < 0", PipelineProcessingWorker.DEFAULT_NUMBER_OF_PARALLEL_RECORDS,
BpelWorkerTestPipelet._lastParallelRecordCount);
}
/** Test with a workflow containing a BPEL worker that has an output bucket. */
public void testWithOutputBucket() throws Exception {
final String workflowName = "testWithOutputBucket";
final int noOfRecords = 10;
executeJob(workflowName, noOfRecords, 1);
checkAndRemoveRecords(STORE, noOfRecords, 1);
executeJob(workflowName, noOfRecords, 5);
checkAndRemoveRecords(STORE, noOfRecords, 5);
executeJob(workflowName, noOfRecords, 10);
checkAndRemoveRecords(STORE, noOfRecords, 10);
executeJob(workflowName, noOfRecords, 20);
checkAndRemoveRecords(STORE, noOfRecords, 20);
executeJob(workflowName, noOfRecords, 0);
checkAndRemoveRecords(STORE, noOfRecords, PipelineProcessingWorker.DEFAULT_NUMBER_OF_PARALLEL_RECORDS);
executeJob(workflowName, noOfRecords, -1);
checkAndRemoveRecords(STORE, noOfRecords, PipelineProcessingWorker.DEFAULT_NUMBER_OF_PARALLEL_RECORDS);
}
/** Tests that the task parameters are written into each processed record. */
public void testTaskParametersInRecord() throws Exception {
final String workflowName = "testWithOutputBucket";
final int noOfRecords = 10;
executeJob(workflowName, noOfRecords, 1);
checkParametersAndRemoveRecords(STORE, noOfRecords, 1);
executeJob(workflowName, noOfRecords, 5);
checkParametersAndRemoveRecords(STORE, noOfRecords, 5);
executeJob(workflowName, noOfRecords, 10);
checkParametersAndRemoveRecords(STORE, noOfRecords, 10);
}
/** Test with a workflow containing two BPEL workers that have an output bucket. */
public void testTwoPipelinesWithOutputBucket() throws Exception {
final String workflowName = "testMoreThanOneWorkerWithOutputBucket";
final int noOfRecords = 10;
executeJob(workflowName, noOfRecords, 1);
checkAndRemoveRecordsForTwoPipelines(STORE, noOfRecords, 1);
executeJob(workflowName, noOfRecords, 5);
checkAndRemoveRecordsForTwoPipelines(STORE, noOfRecords, 5);
executeJob(workflowName, noOfRecords, 10);
checkAndRemoveRecordsForTwoPipelines(STORE, noOfRecords, 10);
executeJob(workflowName, noOfRecords, 20);
checkAndRemoveRecordsForTwoPipelines(STORE, noOfRecords, 20);
executeJob(workflowName, noOfRecords, 0);
checkAndRemoveRecordsForTwoPipelines(STORE, noOfRecords,
PipelineProcessingWorker.DEFAULT_NUMBER_OF_PARALLEL_RECORDS);
executeJob(workflowName, noOfRecords, -1);
checkAndRemoveRecordsForTwoPipelines(STORE, noOfRecords,
PipelineProcessingWorker.DEFAULT_NUMBER_OF_PARALLEL_RECORDS);
}
/** Tests handling of recoverable ProcessingExceptions. */
public void testProcessingRecoverableException() throws Exception {
// Recoverable ProcessingException
final String jobName = "testJobExceptionsRecoverable";
final int noOfRecordsProcessedParallel = 5;
final int noOfRecords = 10;
final int noOfErrorRecord = 5;
final String workflowName = "testWithOutputBucket";
createJob(workflowName, noOfRecordsProcessedParallel, jobName);
final String jobRunId = startJob(jobName);
for (int i = 0; i < noOfRecords; i++) {
final Record record = DataFactory.DEFAULT.createRecord(Integer.toString(i));
if (i == noOfErrorRecord) {
record.getMetadata().put(BpelWorkerTestPipelet.ATTRIBUTE_THROW_EXCEPTION, true);
record.getMetadata().put(BpelWorkerTestPipelet.ATTRIBUTE_THROW_RECOVERABLE_EXCEPTION, true);
}
_builder.addRecord(jobName, record);
}
commitAndWaitForJob(jobName, jobRunId, true);
// everything should be ok, no record will be missing...
final Collection<StoreObject> storeObjects = _objectStoreService.getStoreObjectInfos(STORE, BUCKET_NAME);
assertEquals(1, storeObjects.size());
final BulkInfo bulkInfo = new BulkInfo(BUCKET_NAME, STORE, storeObjects.iterator().next().getId());
final RecordInput recordInput = new RecordInput(bulkInfo, _objectStoreService);
final Collection<String> ids = new HashSet<String>();
try {
Record record = recordInput.getRecord();
while (record != null) {
ids.add(record.getId());
record = recordInput.getRecord();
}
} finally {
recordInput.close();
}
assertEquals(noOfRecords, ids.size());
}
/** Tests handling of a non-recoverable ProcessingException for a single record. */
public void testProcessingNonrecoverableExceptionOfOneRecord() throws Exception {
final String jobName = "testProcessingNonrecoverableException";
final int noOfRecordsProcessedParallel = 5;
final int noOfRecords = 10;
final int noOfErrorRecord = 5;
final String workflowName = "testWithOutputBucket";
createJob(workflowName, noOfRecordsProcessedParallel, jobName);
// nonrecoverable ProcessingException
final String jobRunId = startJob(jobName);
for (int i = 0; i < noOfRecords; i++) {
final Record record = DataFactory.DEFAULT.createRecord(Integer.toString(i));
if (i == noOfErrorRecord) {
record.getMetadata().put(BpelWorkerTestPipelet.ATTRIBUTE_THROW_EXCEPTION, true);
}
_builder.addRecord(jobName, record);
}
commitAndWaitForJob(jobName, jobRunId, true);
// the job run should still succeed, but one batch of records will be missing...
final Collection<StoreObject> storeObjects = _objectStoreService.getStoreObjectInfos(STORE, BUCKET_NAME);
assertEquals(1, storeObjects.size());
final BulkInfo bulkInfo = new BulkInfo(BUCKET_NAME, STORE, storeObjects.iterator().next().getId());
final RecordInput recordInput = new RecordInput(bulkInfo, _objectStoreService);
final Collection<String> ids = new HashSet<String>();
try {
Record record = recordInput.getRecord();
while (record != null) {
ids.add(record.getId());
record = recordInput.getRecord();
}
} finally {
recordInput.close();
}
// one batch of records processed in parallel will be missing...
assertEquals(noOfRecords - noOfRecordsProcessedParallel, ids.size());
}
/** Tests handling of non-recoverable ProcessingExceptions for all records. */
public void testProcessingNonrecoverableExceptionOfAllRecords() throws Exception {
final String jobName = "testProcessingNonrecoverableException";
final int noOfRecordsProcessedParallel = 5;
final int noOfRecords = 10;
final String workflowName = "testWithOutputBucket";
createJob(workflowName, noOfRecordsProcessedParallel, jobName);
// nonrecoverable ProcessingException
final String jobRunId = startJob(jobName);
for (int i = 0; i < noOfRecords; i++) {
final Record record = DataFactory.DEFAULT.createRecord(Integer.toString(i));
record.getMetadata().put(BpelWorkerTestPipelet.ATTRIBUTE_THROW_EXCEPTION, true);
_builder.addRecord(jobName, record);
}
commitAndWaitForJob(jobName, jobRunId, false); // false -> job run should be failed, no successful workflow run
// there should be no output objects
final Collection<StoreObject> storeObjects = _objectStoreService.getStoreObjectInfos(STORE, BUCKET_NAME);
assertEquals(0, storeObjects.size());
}
/** Tests handling of attachments. */
public void testProcessingWithAttachments() throws Exception {
final String jobName = "testProcessingWithAttachments";
final int noOfRecordsProcessedParallel = 5;
final int noOfRecords = 10;
final String workflowName = "testWithOutputBucket";
final AnyMap parameters = DataFactory.DEFAULT.createAnyMap();
final String attachmentName = "attachment";
parameters.put(PipelineProcessingWorker.KEY_NUMBER_OF_PARALLEL_RECORDS, noOfRecordsProcessedParallel);
parameters.put(PipelineProcessingWorker.KEY_PIPELINE_NAME, "TestAttachmentsPipeline");
parameters.put("tempStore", TEMP_STORE);
parameters.put("store", STORE);
addJob(jobName, workflowName, parameters);
final String jobRunId = startJob(jobName);
for (int i = 0; i < noOfRecords; i++) {
final Record record = DataFactory.DEFAULT.createRecord(Integer.toString(i));
record.getMetadata().put(BpelWorkerAttachmentTestPipelet.ATTRIBUTE_ATTACHMENT_NAME, attachmentName);
record.setAttachment(attachmentName, Integer.toString(i).getBytes("UTF-8"));
_builder.addRecord(jobName, record);
}
commitAndWaitForJob(jobName, jobRunId, true);
// everything should be ok, no record will be missing...
final Collection<StoreObject> storeObjects = _objectStoreService.getStoreObjectInfos(STORE, BUCKET_NAME);
assertEquals(1, storeObjects.size());
final BulkInfo bulkInfo = new BulkInfo(BUCKET_NAME, STORE, storeObjects.iterator().next().getId());
final RecordInput recordInput = new RecordInput(bulkInfo, _objectStoreService);
final Collection<String> ids = new HashSet<String>();
try {
Record record = recordInput.getRecord();
while (record != null) {
ids.add(record.getId());
final String idAsString = String.valueOf(record.getId());
assertEquals(idAsString,
record.getMetadata().getStringValue(BpelWorkerAttachmentTestPipelet.ATTRIBUTE_ATTACHMENT_TEXT));
record = recordInput.getRecord();
}
} finally {
recordInput.close();
}
assertEquals(noOfRecords, ids.size());
}
private void executeJob(final String workflowName, final int noOfRecords, final int noOfRecordsProcessedParallel)
throws Exception {
final String jobName = "testJob" + noOfRecordsProcessedParallel;
createJob(workflowName, noOfRecordsProcessedParallel, jobName);
final String jobRunId = startJob(jobName);
for (int i = 0; i < noOfRecords; i++) {
_builder.addRecord(jobName, DataFactory.DEFAULT.createRecord(Integer.toString(i)));
}
commitAndWaitForJob(jobName, jobRunId, true);
}
/**
* Commits the job's records, finishes the job run and waits up to 100 seconds for its completion.
*
* @param jobName name of the job to commit and finish
* @param jobRunId id of the job run to wait for
* @param expectJobSuccessful whether the job run is expected to succeed (else it must fail)
* @throws BulkbuilderException if committing the job fails
* @throws JobManagerException if finishing the job run fails
* @throws Exception if waiting for the job run to complete fails
*/
protected void commitAndWaitForJob(final String jobName, final String jobRunId, final boolean expectJobSuccessful)
throws BulkbuilderException, JobManagerException, Exception {
_builder.commitJob(jobName);
_jobManager.finishJob(jobName, jobRunId);
waitForJobRunCompleted(jobName, jobRunId, 100000); // wait max. 100 seconds
if (expectJobSuccessful) {
assertJobRunSucceeded(jobName, jobRunId, 1);
} else {
assertJobRunFailed(jobName, jobRunId);
}
}
/**
* Creates a job for the given workflow, configured with the test pipeline, the test stores and the given number of
* parallel records.
*
* @param workflowName name of the workflow the job is based on
* @param noOfRecordsProcessedParallel value for PipelineProcessingWorker.KEY_NUMBER_OF_PARALLEL_RECORDS
* @param jobName name of the job to add
* @throws Exception if adding the job fails
*/
protected void createJob(final String workflowName, final int noOfRecordsProcessedParallel, final String jobName)
throws Exception {
final AnyMap parameters = DataFactory.DEFAULT.createAnyMap();
parameters.put(PipelineProcessingWorker.KEY_NUMBER_OF_PARALLEL_RECORDS, noOfRecordsProcessedParallel);
parameters.put(PipelineProcessingWorker.KEY_PIPELINE_NAME, BPEL_TEST_PIPELINE);
parameters.put("tempStore", TEMP_STORE);
parameters.put("store", STORE);
addJob(jobName, workflowName, parameters);
}
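// For orientation: the helpers above compose as in executeJob(), e.g. (job name is illustrative only):
// createJob("testWithOutputBucket", 5, "someJob");
// final String jobRunId = startJob("someJob");
// _builder.addRecord("someJob", DataFactory.DEFAULT.createRecord("0"));
// commitAndWaitForJob("someJob", jobRunId, true);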
private void checkAndRemoveRecordsForTwoPipelines(final String storeName, final int noOfRecords,
final int noOfParallelRecords) throws ObjectStoreException, IOException {
final Collection<Long> parallelRecordNos = new ArrayList<Long>();
parallelRecordNos.add(Long.valueOf(Math.min(noOfParallelRecords, noOfRecords)));
// 1: constant from workflow.
parallelRecordNos.add(Long.valueOf(Math.min(1, noOfRecords)));
checkAndRemoveRecords(storeName, noOfRecords, parallelRecordNos, Math.min(noOfParallelRecords, noOfRecords));
}
private void checkAndRemoveRecords(final String storeName, final int noOfRecords, final int noOfParallelRecords)
throws ObjectStoreException, IOException {
checkAndRemoveRecords(storeName, noOfRecords, null, Math.min(noOfParallelRecords, noOfRecords));
}
/**
* Checks the output bulk for correctness and removes it afterwards.
*
* @param storeName store containing the output bulk
* @param noOfRecords expected number of records in the bulk
* @param noOfParallelRecords expected sequence of parallel record counts, or null to skip this check
* @param attributeRecordNo expected value of the record count attribute
* @throws ObjectStoreException if accessing the object store fails
* @throws IOException if reading the bulk fails
*/
private void checkAndRemoveRecords(final String storeName, final int noOfRecords,
final Collection<Long> noOfParallelRecords, final int attributeRecordNo) throws ObjectStoreException,
IOException {
final Collection<StoreObject> storeObjects = _objectStoreService.getStoreObjectInfos(storeName, BUCKET_NAME);
assertEquals(1, storeObjects.size());
final BulkInfo bulkInfo = new BulkInfo(BUCKET_NAME, storeName, storeObjects.iterator().next().getId());
final RecordInput recordInput = new RecordInput(bulkInfo, _objectStoreService);
final Collection<String> ids = new HashSet<String>();
try {
Record record = recordInput.getRecord();
while (record != null) {
ids.add(record.getId());
if (noOfParallelRecords != null) {
final Collection<Long> noOfParallelRecordsFound =
record.getMetadata().getSeq(BpelWorkerTestPipelet2.SECOND_ATTRIBUTE).asLongs();
assertEquals(noOfParallelRecords, noOfParallelRecordsFound);
}
assertEquals(attributeRecordNo, record.getMetadata().getSeq(BpelWorkerTestPipelet2.ATTRIBUTE_RECORD_COUNT)
.getLongValue(0).intValue());
record = recordInput.getRecord();
}
} finally {
recordInput.close();
_objectStoreService.removeObject(storeName, bulkInfo.getObjectName());
}
assertEquals(noOfRecords, ids.size());
}
/**
* Checks that the task parameters were written into each record of the output bulk and removes the bulk
* afterwards.
*
* @throws ObjectStoreException if accessing the object store fails
* @throws IOException if reading the bulk fails
*/
private void checkParametersAndRemoveRecords(final String storeName, final int noOfRecords,
final int noOfParallelRecords) throws ObjectStoreException, IOException {
final Collection<StoreObject> storeObjects = _objectStoreService.getStoreObjectInfos(storeName, BUCKET_NAME);
assertEquals(1, storeObjects.size());
final BulkInfo bulkInfo = new BulkInfo(BUCKET_NAME, storeName, storeObjects.iterator().next().getId());
final RecordInput recordInput = new RecordInput(bulkInfo, _objectStoreService);
int recordCount = 0;
try {
Record record = recordInput.getRecord();
while (record != null) {
recordCount++;
final AnyMap parameters = record.getMetadata().getMap(ParameterAccessor.DEFAULT_PARAMETERS_ATTRIBUTE);
assertNotNull(parameters);
// compare with #createJob() for expected parameter values.
assertEquals(BPEL_TEST_PIPELINE, parameters.getStringValue(PipelineProcessingWorker.KEY_PIPELINE_NAME));
assertEquals(Integer.toString(noOfParallelRecords),
parameters.getStringValue(PipelineProcessingWorker.KEY_NUMBER_OF_PARALLEL_RECORDS));
assertEquals(TEMP_STORE, parameters.getStringValue("tempStore"));
assertEquals(STORE, parameters.getStringValue("store"));
record = recordInput.getRecord();
}
} finally {
recordInput.close();
_objectStoreService.removeObject(storeName, bulkInfo.getObjectName());
}
assertEquals(noOfRecords, recordCount);
}
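/**
* Reads all record ids from the given bulk. A sketch only: it would consolidate the identical read loops in the
* tests above, but is not referenced by them.
*/
private Collection<String> readAllRecordIds(final BulkInfo bulkInfo) throws ObjectStoreException, IOException {
final RecordInput recordInput = new RecordInput(bulkInfo, _objectStoreService);
final Collection<String> ids = new HashSet<String>();
try {
Record record = recordInput.getRecord();
while (record != null) {
ids.add(record.getId());
record = recordInput.getRecord();
}
} finally {
recordInput.close();
}
return ids;
}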
}