/*******************************************************************************
* Copyright (c) 2008 empolis GmbH and brox IT Solutions GmbH. All rights reserved. This program and the accompanying
* materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Thomas Menzel (brox IT Solutions GmbH) - initial creator
*******************************************************************************/
package org.eclipse.smila.processing.worker.test;

import java.util.Collection;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.datamodel.DataFactory;
import org.eclipse.smila.datamodel.Record;
import org.eclipse.smila.objectstore.StoreObject;
import org.eclipse.smila.processing.worker.PipelineProcessorWorker;
import org.eclipse.smila.taskmanager.BulkInfo;
import org.eclipse.smila.taskworker.input.RecordInput;

/**
* Tests for the pipeline processor worker in which the processing pipeline is selected per record via a record
* attribute.
*/
public class TestPipelineSelectedByRecord extends ProcessingWorkerTestBase {
private static final String BPEL_TEST_PIPELINE = "TestPipeline";

@Override
protected void createJob(final String workflowName, final String jobName) throws Exception {
createJob(workflowName, jobName, 1);
}
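
/** Creates a job running the given workflow with the given pipeline run bulk size and the test stores. */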
protected void createJob(final String workflowName, final String jobName, final int runBulkSize) throws Exception {
final AnyMap parameters = DataFactory.DEFAULT.createAnyMap();
parameters.put(PipelineProcessorWorker.KEY_PIPELINERUN_BULKSIZE, runBulkSize);
parameters.put(PipelineProcessorWorker.KEY_PIPELINE_NAME, BPEL_TEST_PIPELINE);
parameters.put("tempStore", TEMP_STORE);
parameters.put("store", STORE);
addJob(jobName, workflowName, parameters);
}
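
/** Creates and starts a job, pushes the given records into it, and waits for the job run to complete. */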
protected void executeJob(final String workflowName, final int runBulkSize, final Record... records)
throws Exception {
final String jobName = "testJob";
createJob(workflowName, jobName, runBulkSize);
final String jobRunId = startJob(jobName);
for (final Record record : records) {
// System.out.println("push record: " + record);
_builder.addRecord(jobName, record);
}
commitAndWaitForJob(jobName, jobRunId, true);
}

/**
* Tests that each record can select the pipeline that processes it via the pipeline name attribute, and that the
* results of the two pipelines do not get mixed up.
*/
public void testRecordSelectsPipeline() throws Exception {
final String workflowName = "testWithOutputBucket";
final Record r0 = DataFactory.DEFAULT.createRecord("0");
final Record r1 = DataFactory.DEFAULT.createRecord("1");
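// route r1 to TestPipeline2 via its metadata; r0 and r2 use the pipeline configured for the worker.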
r1.getMetadata().put(PipelineProcessorWorker.ATTR_PIPELINE_NAME, "TestPipeline2");
final Record r2 = DataFactory.DEFAULT.createRecord("2");
executeJob(workflowName, 1, r0, r1, r2);
final Collection<StoreObject> storeObjects = _objectStoreService.getStoreObjectInfos(STORE, BUCKET_NAME);
assertEquals(1, storeObjects.size());
final BulkInfo bulkInfo = new BulkInfo(BUCKET_NAME, STORE, storeObjects.iterator().next().getId());
final RecordInput recordInput = new RecordInput(bulkInfo, _objectStoreService);
try {
int i = 0;
for (Record record = recordInput.getRecord(); record != null; record = recordInput.getRecord()) {
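// the output preserves record order: even positions were processed by the default pipeline, odd ones by TestPipeline2.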
if (i % 2 == 0) {
assertTrue(record.getMetadata().containsKey(BpelWorkerTestPipelet.ATTRIBUTE_RECORD_COUNT));
assertFalse(record.getMetadata().containsKey(BpelWorkerTestPipelet2.SECOND_ATTRIBUTE));
} else {
assertFalse(record.getMetadata().containsKey(BpelWorkerTestPipelet.ATTRIBUTE_RECORD_COUNT));
assertTrue(record.getMetadata().containsKey(BpelWorkerTestPipelet2.SECOND_ATTRIBUTE));
}
i++;
}
assertEquals(3, i);
} finally {
recordInput.close();
_objectStoreService.removeObject(STORE, bulkInfo.getObjectName());
}
}

/**
* Tests that a record specifying an undefined pipeline name is processed by the default pipeline.
*/
public void testInvalidPipelineNameUsesDefault() throws Exception {
final String workflowName = "testWithOutputBucket";
final Record r0 = DataFactory.DEFAULT.createRecord("0");
final Record r1 = DataFactory.DEFAULT.createRecord("1");
r1.getMetadata().put(PipelineProcessorWorker.ATTR_PIPELINE_NAME, "TestPipeline2");
final Record r2 = DataFactory.DEFAULT.createRecord("2");
r2.getMetadata().put(PipelineProcessorWorker.ATTR_PIPELINE_NAME, "NoSuchTestPipeline");
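// "NoSuchTestPipeline" does not exist, so r2 is expected to fall back to the default pipeline, like r0.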
executeJob(workflowName, 1, r0, r1, r2);
final Collection<StoreObject> storeObjects = _objectStoreService.getStoreObjectInfos(STORE, BUCKET_NAME);
assertEquals(1, storeObjects.size());
final BulkInfo bulkInfo = new BulkInfo(BUCKET_NAME, STORE, storeObjects.iterator().next().getId());
final RecordInput recordInput = new RecordInput(bulkInfo, _objectStoreService);
try {
int i = 0;
for (Record record = recordInput.getRecord(); record != null; record = recordInput.getRecord()) {
if (i % 2 == 0) {
assertTrue(record.getMetadata().containsKey(BpelWorkerTestPipelet.ATTRIBUTE_RECORD_COUNT));
assertFalse(record.getMetadata().containsKey(BpelWorkerTestPipelet2.SECOND_ATTRIBUTE));
} else {
assertFalse(record.getMetadata().containsKey(BpelWorkerTestPipelet.ATTRIBUTE_RECORD_COUNT));
assertTrue(record.getMetadata().containsKey(BpelWorkerTestPipelet2.SECOND_ATTRIBUTE));
}
i++;
}
assertEquals(3, i);
} finally {
recordInput.close();
_objectStoreService.removeObject(STORE, bulkInfo.getObjectName());
}
}

/**
* Tests that records selecting the same pipeline are processed in one pipeline call if the run bulk size allows it.
*/
public void testRunBulksCreated() throws Exception {
final String workflowName = "testWithOutputBucket";
final Record[] records = new Record[9];
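// build 9 records: positions 0-2 and 6-8 use the default pipeline, positions 3-5 select TestPipeline2.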
for (int i = 0; i < 3; i++) {
records[i] = DataFactory.DEFAULT.createRecord("0" + i);
final Record r1 = DataFactory.DEFAULT.createRecord("1" + i);
r1.getMetadata().put(PipelineProcessorWorker.ATTR_PIPELINE_NAME, "TestPipeline2");
records[i + 3] = r1;
records[i + 6] = DataFactory.DEFAULT.createRecord("2" + i);
}
executeJob(workflowName, 10, records);
final Collection<StoreObject> storeObjects = _objectStoreService.getStoreObjectInfos(STORE, BUCKET_NAME);
assertEquals(1, storeObjects.size());
final BulkInfo bulkInfo = new BulkInfo(BUCKET_NAME, STORE, storeObjects.iterator().next().getId());
final RecordInput recordInput = new RecordInput(bulkInfo, _objectStoreService);
try {
int i = 0;
for (Record record = recordInput.getRecord(); record != null; record = recordInput.getRecord()) {
final AnyMap md = record.getMetadata();
// System.out.println("read record: " + md);
if (i < 3 || i > 5) {
assertEquals(3, md.getSeq(BpelWorkerTestPipelet.ATTRIBUTE_RECORD_COUNT).getLongValue(0).intValue());
assertFalse(md.containsKey(BpelWorkerTestPipelet2.SECOND_ATTRIBUTE));
} else {
assertFalse(md.containsKey(BpelWorkerTestPipelet.ATTRIBUTE_RECORD_COUNT));
assertEquals(3, md.getSeq(BpelWorkerTestPipelet2.SECOND_ATTRIBUTE).getLongValue(0).intValue());
}
i++;
}
assertEquals(9, i);
} finally {
recordInput.close();
_objectStoreService.removeObject(STORE, bulkInfo.getObjectName());
}
}

/**
* Tests that records with different pipeline attributes are processed separately even when the run bulk size is
* greater than 1.
*/
public void testNoRunBulksCreated() throws Exception {
final String workflowName = "testWithOutputBucket";
final Record r0 = DataFactory.DEFAULT.createRecord("0");
final Record r1 = DataFactory.DEFAULT.createRecord("1");
r1.getMetadata().put(PipelineProcessorWorker.ATTR_PIPELINE_NAME, "TestPipeline2");
final Record r2 = DataFactory.DEFAULT.createRecord("2");
executeJob(workflowName, 10, r0, r1, r2);
final Collection<StoreObject> storeObjects = _objectStoreService.getStoreObjectInfos(STORE, BUCKET_NAME);
assertEquals(1, storeObjects.size());
final BulkInfo bulkInfo = new BulkInfo(BUCKET_NAME, STORE, storeObjects.iterator().next().getId());
final RecordInput recordInput = new RecordInput(bulkInfo, _objectStoreService);
try {
int i = 0;
for (Record record = recordInput.getRecord(); record != null; record = recordInput.getRecord()) {
final AnyMap md = record.getMetadata();
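// a record count of 1 shows that each record was processed in its own pipeline call despite the bulk size of 10.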
if (i % 2 == 0) {
assertEquals(1, md.getSeq(BpelWorkerTestPipelet.ATTRIBUTE_RECORD_COUNT).getLongValue(0).intValue());
assertFalse(md.containsKey(BpelWorkerTestPipelet2.SECOND_ATTRIBUTE));
} else {
assertFalse(md.containsKey(BpelWorkerTestPipelet.ATTRIBUTE_RECORD_COUNT));
assertEquals(1, md.getSeq(BpelWorkerTestPipelet2.SECOND_ATTRIBUTE).getLongValue(0).intValue());
}
i++;
}
assertEquals(3, i);
} finally {
recordInput.close();
_objectStoreService.removeObject(STORE, bulkInfo.getObjectName());
}
}

/**
* Tests that explicitly selecting the default pipeline does not break run bulk creation.
*/
public void testRunBulkCreated2() throws Exception {
final String workflowName = "testWithOutputBucket";
final Record r0 = DataFactory.DEFAULT.createRecord("0");
final Record r1 = DataFactory.DEFAULT.createRecord("1");
r1.getMetadata().put(PipelineProcessorWorker.ATTR_PIPELINE_NAME, "TestPipeline");
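// r1 named the default pipeline explicitly; all three records should still form a single run bulk of 3.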
final Record r2 = DataFactory.DEFAULT.createRecord("2");
executeJob(workflowName, 10, r0, r1, r2);
final Collection<StoreObject> storeObjects = _objectStoreService.getStoreObjectInfos(STORE, BUCKET_NAME);
assertEquals(1, storeObjects.size());
final BulkInfo bulkInfo = new BulkInfo(BUCKET_NAME, STORE, storeObjects.iterator().next().getId());
final RecordInput recordInput = new RecordInput(bulkInfo, _objectStoreService);
try {
int i = 0;
for (Record record = recordInput.getRecord(); record != null; record = recordInput.getRecord()) {
final AnyMap md = record.getMetadata();
assertEquals(3, md.getSeq(BpelWorkerTestPipelet.ATTRIBUTE_RECORD_COUNT).getLongValue(0).intValue());
assertFalse(md.containsKey(BpelWorkerTestPipelet2.SECOND_ATTRIBUTE));
i++;
}
assertEquals(3, i);
} finally {
recordInput.close();
_objectStoreService.removeObject(STORE, bulkInfo.getObjectName());
}
}

/**
* Tests that multiple records with the same record ID are all processed.
*/
public void testRecordsWithSameRecordId() throws Exception {
final String workflowName = "testWithOutputBucket";
final Record r0 = DataFactory.DEFAULT.createRecord("0");
r0.getMetadata().put("name", "r0");
final Record r1 = DataFactory.DEFAULT.createRecord("0");
r1.getMetadata().put("name", "r1");
final Record r2 = DataFactory.DEFAULT.createRecord("0");
r2.getMetadata().put("name", "r2");
executeJob(workflowName, 10, r0, r1, r2);
final Collection<StoreObject> storeObjects = _objectStoreService.getStoreObjectInfos(STORE, BUCKET_NAME);
assertEquals(1, storeObjects.size());
final BulkInfo bulkInfo = new BulkInfo(BUCKET_NAME, STORE, storeObjects.iterator().next().getId());
final RecordInput recordInput = new RecordInput(bulkInfo, _objectStoreService);
try {
int i = 0;
for (Record record = recordInput.getRecord(); record != null; record = recordInput.getRecord()) {
final AnyMap md = record.getMetadata();
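// a record count of 1 means records sharing an ID were processed in separate pipeline calls.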
assertEquals(1, md.getSeq(BpelWorkerTestPipelet.ATTRIBUTE_RECORD_COUNT).getLongValue(0).intValue());
assertEquals("r" + i, md.getStringValue("name"));
i++;
}
assertEquals(3, i);
} finally {
recordInput.close();
_objectStoreService.removeObject(STORE, bulkInfo.getObjectName());
}
}

/**
* Tests that multiple records with alternating duplicate record IDs are all processed.
*/
public void testRecordsWithSameRecordId2() throws Exception {
final String workflowName = "testWithOutputBucket";
final Record r0 = DataFactory.DEFAULT.createRecord("0");
r0.getMetadata().put("name", "r0");
final Record r1 = DataFactory.DEFAULT.createRecord("1");
r1.getMetadata().put("name", "r1");
final Record r2 = DataFactory.DEFAULT.createRecord("0");
r2.getMetadata().put("name", "r2");
final Record r3 = DataFactory.DEFAULT.createRecord("1");
r3.getMetadata().put("name", "r3");
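// with IDs alternating 0,1,0,1 the expected record count of 2 implies two run bulks of two records each.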
executeJob(workflowName, 10, r0, r1, r2, r3);
final Collection<StoreObject> storeObjects = _objectStoreService.getStoreObjectInfos(STORE, BUCKET_NAME);
assertEquals(1, storeObjects.size());
final BulkInfo bulkInfo = new BulkInfo(BUCKET_NAME, STORE, storeObjects.iterator().next().getId());
final RecordInput recordInput = new RecordInput(bulkInfo, _objectStoreService);
try {
int i = 0;
for (Record record = recordInput.getRecord(); record != null; record = recordInput.getRecord()) {
final AnyMap md = record.getMetadata();
assertEquals(2, md.getSeq(BpelWorkerTestPipelet.ATTRIBUTE_RECORD_COUNT).getLongValue(0).intValue());
assertEquals("r" + i, md.getStringValue("name"));
i++;
}
assertEquals(4, i);
} finally {
recordInput.close();
_objectStoreService.removeObject(STORE, bulkInfo.getObjectName());
}
}
}