/******************************************************************************* | |
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the | |
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this | |
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html | |
* | |
* Contributors: Andreas Schank, Andreas Weber(Attensity Europe GmbH) - implementation | |
*******************************************************************************/ | |
package org.eclipse.smila.processing.worker.test; | |
import java.io.IOException; | |
import java.util.ArrayList; | |
import java.util.Collection; | |
import java.util.HashSet; | |
import org.eclipse.smila.binarystorage.BinaryStorageService; | |
import org.eclipse.smila.bulkbuilder.BulkbuilderException; | |
import org.eclipse.smila.bulkbuilder.BulkbuilderService; | |
import org.eclipse.smila.datamodel.AnyMap; | |
import org.eclipse.smila.datamodel.DataFactory; | |
import org.eclipse.smila.datamodel.Record; | |
import org.eclipse.smila.jobmanager.JobManagerException; | |
import org.eclipse.smila.jobmanager.test.JobManagerTestBase; | |
import org.eclipse.smila.objectstore.ObjectStoreException; | |
import org.eclipse.smila.objectstore.StoreObject; | |
import org.eclipse.smila.processing.parameters.ParameterAccessor; | |
import org.eclipse.smila.processing.worker.PipelineProcessingWorker; | |
import org.eclipse.smila.taskmanager.BulkInfo; | |
import org.eclipse.smila.taskworker.input.RecordInput; | |
public class TestBpelWorker extends JobManagerTestBase { | |
private static final String TEMP_STORE = "tempStore"; | |
private static final String STORE = "store"; | |
private static final String BPEL_TEST_PIPELINE = "TestPipeline"; | |
private static final String BUCKET_NAME = "outbulkBucket"; | |
/** service under test. */ | |
protected BulkbuilderService _builder; | |
@Override | |
protected void setUp() throws Exception { | |
super.setUp(); | |
_builder = getService(BulkbuilderService.class); | |
assertNotNull(_builder); | |
// wait for binary storage... | |
assertNotNull(getService(BinaryStorageService.class)); | |
// create stores... | |
_objectStoreService.ensureStore(STORE); | |
_objectStoreService.ensureStore(TEMP_STORE); | |
BpelWorkerTestPipelet.s_throwExceptions = true; | |
} | |
/** Test with a pipeline name that doesn't exist. */ | |
public void testErrorOnNonExistingPipeline() throws Exception { | |
final String workflowName = "testWithoutOutputBucket"; | |
final int noOfRecords = 10; | |
final String jobName = "testJobError"; | |
final AnyMap parameters = DataFactory.DEFAULT.createAnyMap(); | |
parameters.put(PipelineProcessingWorker.KEY_PIPELINE_NAME, "PipelineThatDoesNotExist"); | |
parameters.put("tempStore", TEMP_STORE); | |
parameters.put("store", STORE); | |
addJob(jobName, workflowName, parameters); | |
final String jobRunId = startJob(jobName); | |
for (int i = 0; i < noOfRecords; i++) { | |
_builder.addRecord(jobName, DataFactory.DEFAULT.createRecord(Integer.toString(i))); | |
} | |
_builder.commitJob(jobName); | |
_jobManager.finishJob(jobName, jobRunId); | |
waitForJobRunCompleted(jobName, jobRunId, 100000); // wait max. 10 seconds | |
assertJobRunFailed(jobName, jobRunId); | |
} | |
/** Test with a workflow containing a BPEL worker that has no output bucket. */ | |
public void testWithoutOutputBucket() throws Exception { | |
final String workflowName = "testWithoutOutputBucket"; | |
final int noOfRecords = 10; | |
executeJob(workflowName, noOfRecords, 1); | |
assertEquals(1, BpelWorkerTestPipelet._lastParallelRecordCount); | |
executeJob(workflowName, noOfRecords, 5); | |
assertEquals(5, BpelWorkerTestPipelet._lastParallelRecordCount); | |
executeJob(workflowName, noOfRecords, 10); | |
assertEquals(10, BpelWorkerTestPipelet._lastParallelRecordCount); | |
executeJob(workflowName, noOfRecords, 20); | |
assertEquals(10, BpelWorkerTestPipelet._lastParallelRecordCount); | |
executeJob(workflowName, noOfRecords, 0); | |
assertEquals("Expecting default " + PipelineProcessingWorker.DEFAULT_NUMBER_OF_PARALLEL_RECORDS | |
+ " for value = 0", PipelineProcessingWorker.DEFAULT_NUMBER_OF_PARALLEL_RECORDS, | |
BpelWorkerTestPipelet._lastParallelRecordCount); | |
executeJob(workflowName, noOfRecords, -1); | |
assertEquals("Expecting default " + PipelineProcessingWorker.DEFAULT_NUMBER_OF_PARALLEL_RECORDS | |
+ " for value < 0", PipelineProcessingWorker.DEFAULT_NUMBER_OF_PARALLEL_RECORDS, | |
BpelWorkerTestPipelet._lastParallelRecordCount); | |
} | |
/** Test with a workflow containing a BPEL worker that has an output bucket. */ | |
public void testWithOutputBucket() throws Exception { | |
final String workflowName = "testWithOutputBucket"; | |
final int noOfRecords = 10; | |
executeJob(workflowName, noOfRecords, 1); | |
checkAndRemoveRecords(STORE, noOfRecords, 1); | |
executeJob(workflowName, noOfRecords, 5); | |
checkAndRemoveRecords(STORE, noOfRecords, 5); | |
executeJob(workflowName, noOfRecords, 10); | |
checkAndRemoveRecords(STORE, noOfRecords, 10); | |
executeJob(workflowName, noOfRecords, 20); | |
checkAndRemoveRecords(STORE, noOfRecords, 20); | |
executeJob(workflowName, noOfRecords, 0); | |
checkAndRemoveRecords(STORE, noOfRecords, PipelineProcessingWorker.DEFAULT_NUMBER_OF_PARALLEL_RECORDS); | |
executeJob(workflowName, noOfRecords, -1); | |
checkAndRemoveRecords(STORE, noOfRecords, PipelineProcessingWorker.DEFAULT_NUMBER_OF_PARALLEL_RECORDS); | |
} | |
public void testTaskParametersInRecord() throws Exception { | |
final String workflowName = "testWithOutputBucket"; | |
final int noOfRecords = 10; | |
executeJob(workflowName, noOfRecords, 1); | |
checkParametersAndRemoveRecords(STORE, noOfRecords, 1); | |
executeJob(workflowName, noOfRecords, 5); | |
checkParametersAndRemoveRecords(STORE, noOfRecords, 5); | |
executeJob(workflowName, noOfRecords, 10); | |
checkParametersAndRemoveRecords(STORE, noOfRecords, 10); | |
} | |
/** Test with a workflow containing two BPEL worker that has an output bucket. */ | |
public void testTwoPipelinesWithOutputBucket() throws Exception { | |
final String workflowName = "testMoreThanOneWorkerWithOutputBucket"; | |
final int noOfRecords = 10; | |
executeJob(workflowName, noOfRecords, 1); | |
checkAndRemoveRecordsForTwoPipelines(STORE, noOfRecords, 1); | |
executeJob(workflowName, noOfRecords, 5); | |
checkAndRemoveRecordsForTwoPipelines(STORE, noOfRecords, 5); | |
executeJob(workflowName, noOfRecords, 10); | |
checkAndRemoveRecordsForTwoPipelines(STORE, noOfRecords, 10); | |
executeJob(workflowName, noOfRecords, 20); | |
checkAndRemoveRecordsForTwoPipelines(STORE, noOfRecords, 20); | |
executeJob(workflowName, noOfRecords, 0); | |
checkAndRemoveRecordsForTwoPipelines(STORE, noOfRecords, | |
PipelineProcessingWorker.DEFAULT_NUMBER_OF_PARALLEL_RECORDS); | |
executeJob(workflowName, noOfRecords, -1); | |
checkAndRemoveRecordsForTwoPipelines(STORE, noOfRecords, | |
PipelineProcessingWorker.DEFAULT_NUMBER_OF_PARALLEL_RECORDS); | |
} | |
/** tests handling of recoverable ProcessingExceptions */ | |
public void testProcessingRecoverableException() throws Exception { | |
// Recoverable ProcessingException | |
final String jobName = "testJobExceptionsRecoverable"; | |
final int noOfRecordsProcessedParallel = 5; | |
final int noOfRecords = 10; | |
final int noOfErrorRecord = 5; | |
final String workflowName = "testWithOutputBucket"; | |
createJob(workflowName, noOfRecordsProcessedParallel, jobName); | |
final String jobRunId = startJob(jobName); | |
for (int i = 0; i < noOfRecords; i++) { | |
final Record record = DataFactory.DEFAULT.createRecord(Integer.toString(i)); | |
if (i == noOfErrorRecord) { | |
record.getMetadata().put(BpelWorkerTestPipelet.ATTRIBUTE_THROW_EXCEPTION, true); | |
record.getMetadata().put(BpelWorkerTestPipelet.ATTRIBUTE_THROW_RECOVERABLE_EXCEPTION, true); | |
} | |
_builder.addRecord(jobName, record); | |
} | |
commitAndWaitForJob(jobName, jobRunId, true); | |
// everything should be ok, no record will be missing... | |
final Collection<StoreObject> storeObjects = _objectStoreService.getStoreObjectInfos(STORE, BUCKET_NAME); | |
assertEquals(1, storeObjects.size()); | |
final BulkInfo bulkInfo = new BulkInfo(BUCKET_NAME, STORE, storeObjects.iterator().next().getId()); | |
final RecordInput recordInput = new RecordInput(bulkInfo, _objectStoreService); | |
final Collection<String> ids = new HashSet<String>(); | |
try { | |
Record record = recordInput.getRecord(); | |
while (record != null) { | |
ids.add(record.getId()); | |
record = recordInput.getRecord(); | |
} | |
} finally { | |
recordInput.close(); | |
} | |
assertEquals(noOfRecords, ids.size()); | |
} | |
/** tests handling of non-recoverable ProcessingExceptions of one record. */ | |
public void testProcessingNonrecoverableExceptionOfOneRecord() throws Exception { | |
final String jobName = "testProcessingNonrecoverableException"; | |
final int noOfRecordsProcessedParallel = 5; | |
final int noOfRecords = 10; | |
final int noOfErrorRecord = 5; | |
final String workflowName = "testWithOutputBucket"; | |
createJob(workflowName, noOfRecordsProcessedParallel, jobName); | |
// nonrecoverable ProcessingException | |
final String jobRunId = startJob(jobName); | |
for (int i = 0; i < noOfRecords; i++) { | |
final Record record = DataFactory.DEFAULT.createRecord(Integer.toString(i)); | |
if (i == noOfErrorRecord) { | |
record.getMetadata().put(BpelWorkerTestPipelet.ATTRIBUTE_THROW_EXCEPTION, true); | |
} | |
_builder.addRecord(jobName, record); | |
} | |
commitAndWaitForJob(jobName, jobRunId, true); | |
// everything should be ok, but one bunch of records will be missing... | |
final Collection<StoreObject> storeObjects = _objectStoreService.getStoreObjectInfos(STORE, BUCKET_NAME); | |
assertEquals(1, storeObjects.size()); | |
final BulkInfo bulkInfo = new BulkInfo(BUCKET_NAME, STORE, storeObjects.iterator().next().getId()); | |
final RecordInput recordInput = new RecordInput(bulkInfo, _objectStoreService); | |
final Collection<String> ids = new HashSet<String>(); | |
try { | |
Record record = recordInput.getRecord(); | |
while (record != null) { | |
ids.add(record.getId()); | |
record = recordInput.getRecord(); | |
} | |
} finally { | |
recordInput.close(); | |
} | |
// one bunch of parallely processed records will be missing... | |
assertEquals(noOfRecords - noOfRecordsProcessedParallel, ids.size()); | |
} | |
/** tests handling of non-recoverable ProcessingExceptions of all records. */ | |
public void testProcessingNonrecoverableExceptionOfAllRecords() throws Exception { | |
final String jobName = "testProcessingNonrecoverableException"; | |
final int noOfRecordsProcessedParallel = 5; | |
final int noOfRecords = 10; | |
final String workflowName = "testWithOutputBucket"; | |
createJob(workflowName, noOfRecordsProcessedParallel, jobName); | |
// nonrecoverable ProcessingException | |
final String jobRunId = startJob(jobName); | |
for (int i = 0; i < noOfRecords; i++) { | |
final Record record = DataFactory.DEFAULT.createRecord(Integer.toString(i)); | |
record.getMetadata().put(BpelWorkerTestPipelet.ATTRIBUTE_THROW_EXCEPTION, true); | |
_builder.addRecord(jobName, record); | |
} | |
commitAndWaitForJob(jobName, jobRunId, false); // false -> job run should be failed, no successful workflow run | |
// there should be no ouput objects | |
final Collection<StoreObject> storeObjects = _objectStoreService.getStoreObjectInfos(STORE, BUCKET_NAME); | |
assertEquals(0, storeObjects.size()); | |
} | |
/** tests handling of attachments */ | |
public void testProcessingWithAttachments() throws Exception { | |
// Recoverable ProcessingException | |
final String jobName = "testProcessingWithAttachments"; | |
final int noOfRecordsProcessedParallel = 5; | |
final int noOfRecords = 10; | |
final String workflowName = "testWithOutputBucket"; | |
final AnyMap parameters = DataFactory.DEFAULT.createAnyMap(); | |
final String attachmentName = "attachment"; | |
parameters.put(PipelineProcessingWorker.KEY_NUMBER_OF_PARALLEL_RECORDS, noOfRecordsProcessedParallel); | |
parameters.put(PipelineProcessingWorker.KEY_PIPELINE_NAME, "TestAttachmentsPipeline"); | |
parameters.put("tempStore", TEMP_STORE); | |
parameters.put("store", STORE); | |
addJob(jobName, workflowName, parameters); | |
final String jobRunId = startJob(jobName); | |
for (int i = 0; i < noOfRecords; i++) { | |
final Record record = DataFactory.DEFAULT.createRecord(Integer.toString(i)); | |
record.getMetadata().put(BpelWorkerAttachmentTestPipelet.ATTRIBUTE_ATTACHMENT_NAME, attachmentName); | |
record.setAttachment(attachmentName, Integer.toString(i).getBytes("UTF-8")); | |
_builder.addRecord(jobName, record); | |
} | |
commitAndWaitForJob(jobName, jobRunId, true); | |
// everything should be ok, no record will be missing... | |
final Collection<StoreObject> storeObjects = _objectStoreService.getStoreObjectInfos(STORE, BUCKET_NAME); | |
assertEquals(1, storeObjects.size()); | |
final BulkInfo bulkInfo = new BulkInfo(BUCKET_NAME, STORE, storeObjects.iterator().next().getId()); | |
final RecordInput recordInput = new RecordInput(bulkInfo, _objectStoreService); | |
final Collection<String> ids = new HashSet<String>(); | |
try { | |
Record record = recordInput.getRecord(); | |
while (record != null) { | |
ids.add(record.getId()); | |
final String idAsString = String.valueOf(record.getId()); | |
assertEquals(idAsString, | |
record.getMetadata().getStringValue(BpelWorkerAttachmentTestPipelet.ATTRIBUTE_ATTACHMENT_TEXT)); | |
record = recordInput.getRecord(); | |
} | |
} finally { | |
recordInput.close(); | |
} | |
assertEquals(noOfRecords, ids.size()); | |
} | |
private void executeJob(final String workflowName, final int noOfRecords, final int noOfRecordsProcessedParallel) | |
throws Exception { | |
final String jobName = "testJob" + noOfRecordsProcessedParallel; | |
createJob(workflowName, noOfRecordsProcessedParallel, jobName); | |
final String jobRunId = startJob(jobName); | |
for (int i = 0; i < noOfRecords; i++) { | |
_builder.addRecord(jobName, DataFactory.DEFAULT.createRecord(Integer.toString(i))); | |
} | |
commitAndWaitForJob(jobName, jobRunId, true); | |
} | |
/** | |
* @param jobName | |
* @param jobRunId | |
* @param expectJobSuccessful | |
* @throws BulkbuilderException | |
* @throws JobManagerException | |
* @throws Exception | |
*/ | |
protected void commitAndWaitForJob(final String jobName, final String jobRunId, final boolean expectJobSuccessful) | |
throws BulkbuilderException, JobManagerException, Exception { | |
_builder.commitJob(jobName); | |
_jobManager.finishJob(jobName, jobRunId); | |
waitForJobRunCompleted(jobName, jobRunId, 100000); // wait max. 10 seconds | |
if (expectJobSuccessful) { | |
assertJobRunSucceeded(jobName, jobRunId, 1); | |
} else { | |
assertJobRunFailed(jobName, jobRunId); | |
} | |
} | |
/** | |
* @param workflowName | |
* @param noOfRecordsProcessedParallel | |
* @param jobName | |
* @throws Exception | |
*/ | |
protected void createJob(final String workflowName, final int noOfRecordsProcessedParallel, final String jobName) | |
throws Exception { | |
final AnyMap parameters = DataFactory.DEFAULT.createAnyMap(); | |
parameters.put(PipelineProcessingWorker.KEY_NUMBER_OF_PARALLEL_RECORDS, noOfRecordsProcessedParallel); | |
parameters.put(PipelineProcessingWorker.KEY_PIPELINE_NAME, BPEL_TEST_PIPELINE); | |
parameters.put("tempStore", TEMP_STORE); | |
parameters.put("store", STORE); | |
addJob(jobName, workflowName, parameters); | |
} | |
private void checkAndRemoveRecordsForTwoPipelines(final String store2, final int noOfRecords, | |
final int noOfParallelRecords) throws ObjectStoreException, IOException { | |
final Collection<Long> parallelRecordNos = new ArrayList<Long>(); | |
parallelRecordNos.add(Long.valueOf(Math.min(noOfParallelRecords, noOfRecords))); | |
// 1: constant from workflow. | |
parallelRecordNos.add(Long.valueOf(Math.min(1, noOfRecords))); | |
checkAndRemoveRecords(STORE, noOfRecords, parallelRecordNos, Math.min(noOfParallelRecords, noOfRecords)); | |
} | |
private void checkAndRemoveRecords(final String store2, final int noOfRecords, final int noOfParallelRecords) | |
throws ObjectStoreException, IOException { | |
checkAndRemoveRecords(STORE, noOfRecords, null, Math.min(noOfParallelRecords, noOfRecords)); | |
} | |
/** | |
* checks the output bulk for correctness and removes it afterwards. | |
* | |
* @throws ObjectStoreException | |
* @throws IOException | |
*/ | |
private void checkAndRemoveRecords(final String storeName, final int noOfRecords, | |
final Collection<Long> noOfParallelRecords, final int attributeRecordNo) throws ObjectStoreException, | |
IOException { | |
final Collection<StoreObject> storeObjects = _objectStoreService.getStoreObjectInfos(storeName, BUCKET_NAME); | |
assertEquals(1, storeObjects.size()); | |
final BulkInfo bulkInfo = new BulkInfo(BUCKET_NAME, storeName, storeObjects.iterator().next().getId()); | |
final RecordInput recordInput = new RecordInput(bulkInfo, _objectStoreService); | |
final Collection<String> ids = new HashSet<String>(); | |
try { | |
Record record = recordInput.getRecord(); | |
while (record != null) { | |
ids.add(record.getId()); | |
if (noOfParallelRecords != null) { | |
final Collection<Long> noOfParallelRecordsFound = | |
record.getMetadata().getSeq(BpelWorkerTestPipelet2.SECOND_ATTRIBUTE).asLongs(); | |
assertEquals(noOfParallelRecords, noOfParallelRecordsFound); | |
} | |
assertEquals(attributeRecordNo, record.getMetadata().getSeq(BpelWorkerTestPipelet2.ATTRIBUTE_RECORD_COUNT) | |
.getLongValue(0).intValue()); | |
record = recordInput.getRecord(); | |
} | |
} finally { | |
recordInput.close(); | |
_objectStoreService.removeObject(storeName, bulkInfo.getObjectName()); | |
} | |
} | |
/** | |
* checks the output bulk for correctness and removes it afterwards. | |
* | |
* @throws ObjectStoreException | |
* @throws IOException | |
*/ | |
private void checkParametersAndRemoveRecords(final String storeName, final int noOfRecords, | |
final int noOfParallelRecords) throws ObjectStoreException, IOException { | |
final Collection<StoreObject> storeObjects = _objectStoreService.getStoreObjectInfos(storeName, BUCKET_NAME); | |
assertEquals(1, storeObjects.size()); | |
final BulkInfo bulkInfo = new BulkInfo(BUCKET_NAME, storeName, storeObjects.iterator().next().getId()); | |
final RecordInput recordInput = new RecordInput(bulkInfo, _objectStoreService); | |
int recordCount = 0; | |
try { | |
Record record = recordInput.getRecord(); | |
while (record != null) { | |
recordCount++; | |
final AnyMap parameters = record.getMetadata().getMap(ParameterAccessor.DEFAULT_PARAMETERS_ATTRIBUTE); | |
assertNotNull(parameters); | |
// compare with #createJob() for expected parameter values. | |
assertEquals(BPEL_TEST_PIPELINE, parameters.getStringValue(PipelineProcessingWorker.KEY_PIPELINE_NAME)); | |
assertEquals(Integer.toString(noOfParallelRecords), | |
parameters.getStringValue(PipelineProcessingWorker.KEY_NUMBER_OF_PARALLEL_RECORDS)); | |
assertEquals(TEMP_STORE, parameters.getStringValue("tempStore")); | |
assertEquals(STORE, parameters.getStringValue("store")); | |
record = recordInput.getRecord(); | |
} | |
} finally { | |
recordInput.close(); | |
_objectStoreService.removeObject(storeName, bulkInfo.getObjectName()); | |
} | |
assertEquals(noOfRecords, recordCount); | |
} | |
} |