| /******************************************************************************* |
| * Copyright (c) 2008 empolis GmbH and brox IT Solutions GmbH. All rights reserved. This program and the accompanying |
| * materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this distribution, |
| * and is available at http://www.eclipse.org/legal/epl-v10.html |
| * |
| * Contributors: Thomas Menzel (brox IT Solution GmbH) - initial creator |
| *******************************************************************************/ |
| package org.eclipse.smila.processing.worker.test; |
| |
| import java.util.Collection; |
| |
| import org.eclipse.smila.datamodel.AnyMap; |
| import org.eclipse.smila.datamodel.DataFactory; |
| import org.eclipse.smila.datamodel.Record; |
| import org.eclipse.smila.objectstore.StoreObject; |
| import org.eclipse.smila.processing.worker.PipelineProcessorWorker; |
| import org.eclipse.smila.taskmanager.BulkInfo; |
| import org.eclipse.smila.taskworker.input.RecordInput; |
| |
| /** |
| * more tests for the pipeline processor worker. |
| */ |
| public class TestPipelineSelectedByRecord extends ProcessingWorkerTestBase { |
| |
| private static final String BPEL_TEST_PIPELINE = "TestPipeline"; |
| |
| @Override |
| protected void createJob(final String workflowName, final String jobName) throws Exception { |
| createJob(workflowName, jobName, 1); |
| } |
| |
| protected void createJob(final String workflowName, final String jobName, final int runBulkSize) throws Exception { |
| final AnyMap parameters = DataFactory.DEFAULT.createAnyMap(); |
| parameters.put(PipelineProcessorWorker.KEY_PIPELINERUN_BULKSIZE, runBulkSize); |
| parameters.put(PipelineProcessorWorker.KEY_PIPELINE_NAME, BPEL_TEST_PIPELINE); |
| parameters.put("tempStore", TEMP_STORE); |
| parameters.put("store", STORE); |
| addJob(jobName, workflowName, parameters); |
| } |
| |
| protected void executeJob(final String workflowName, final int runBulkSize, final Record... records) |
| throws Exception { |
| final String jobName = "testJob"; |
| createJob(workflowName, jobName, runBulkSize); |
| final String jobRunId = startJob(jobName); |
| for (final Record record : records) { |
| // System.out.println("push record: " + record); |
| _builder.addRecord(jobName, record); |
| } |
| commitAndWaitForJob(jobName, jobRunId, true); |
| } |
| |
| /** |
| * tests that 2 instances of the same pipelet within same pipeline, the configs dont get mixed up. |
| */ |
| public void testRecordSelectsPipeline() throws Exception { |
| final String workflowName = "testWithOutputBucket"; |
| final Record r0 = DataFactory.DEFAULT.createRecord("0"); |
| final Record r1 = DataFactory.DEFAULT.createRecord("1"); |
| r1.getMetadata().put(PipelineProcessorWorker.ATTR_PIPELINE_NAME, "TestPipeline2"); |
| final Record r2 = DataFactory.DEFAULT.createRecord("2"); |
| executeJob(workflowName, 1, r0, r1, r2); |
| |
| final Collection<StoreObject> storeObjects = _objectStoreService.getStoreObjectInfos(STORE, BUCKET_NAME); |
| assertEquals(1, storeObjects.size()); |
| |
| final BulkInfo bulkInfo = new BulkInfo(BUCKET_NAME, STORE, storeObjects.iterator().next().getId()); |
| final RecordInput recordInput = new RecordInput(bulkInfo, _objectStoreService); |
| try { |
| int i = 0; |
| for (Record record = recordInput.getRecord(); record != null; record = recordInput.getRecord()) { |
| if (i % 2 == 0) { |
| assertTrue(record.getMetadata().containsKey(BpelWorkerTestPipelet.ATTRIBUTE_RECORD_COUNT)); |
| assertFalse(record.getMetadata().containsKey(BpelWorkerTestPipelet2.SECOND_ATTRIBUTE)); |
| } else { |
| assertFalse(record.getMetadata().containsKey(BpelWorkerTestPipelet.ATTRIBUTE_RECORD_COUNT)); |
| assertTrue(record.getMetadata().containsKey(BpelWorkerTestPipelet2.SECOND_ATTRIBUTE)); |
| } |
| i++; |
| } |
| assertEquals(3, i); |
| } finally { |
| recordInput.close(); |
| _objectStoreService.removeObject(STORE, bulkInfo.getObjectName()); |
| } |
| } |
| |
| /** |
| * tests that a record specifying an undefined pipeline is processed by the default pipeline. |
| */ |
| public void testInvalidPipelineNameUsesDefault() throws Exception { |
| final String workflowName = "testWithOutputBucket"; |
| final Record r0 = DataFactory.DEFAULT.createRecord("0"); |
| final Record r1 = DataFactory.DEFAULT.createRecord("1"); |
| r1.getMetadata().put(PipelineProcessorWorker.ATTR_PIPELINE_NAME, "TestPipeline2"); |
| final Record r2 = DataFactory.DEFAULT.createRecord("2"); |
| r2.getMetadata().put(PipelineProcessorWorker.ATTR_PIPELINE_NAME, "NoSuchTestPipeline"); |
| executeJob(workflowName, 1, r0, r1, r2); |
| |
| final Collection<StoreObject> storeObjects = _objectStoreService.getStoreObjectInfos(STORE, BUCKET_NAME); |
| assertEquals(1, storeObjects.size()); |
| |
| final BulkInfo bulkInfo = new BulkInfo(BUCKET_NAME, STORE, storeObjects.iterator().next().getId()); |
| final RecordInput recordInput = new RecordInput(bulkInfo, _objectStoreService); |
| try { |
| int i = 0; |
| for (Record record = recordInput.getRecord(); record != null; record = recordInput.getRecord()) { |
| if (i % 2 == 0) { |
| assertTrue(record.getMetadata().containsKey(BpelWorkerTestPipelet.ATTRIBUTE_RECORD_COUNT)); |
| assertFalse(record.getMetadata().containsKey(BpelWorkerTestPipelet2.SECOND_ATTRIBUTE)); |
| } else { |
| assertFalse(record.getMetadata().containsKey(BpelWorkerTestPipelet.ATTRIBUTE_RECORD_COUNT)); |
| assertTrue(record.getMetadata().containsKey(BpelWorkerTestPipelet2.SECOND_ATTRIBUTE)); |
| } |
| i++; |
| } |
| assertEquals(3, i); |
| } finally { |
| recordInput.close(); |
| _objectStoreService.removeObject(STORE, bulkInfo.getObjectName()); |
| } |
| } |
| |
| /** |
| * tests that pipelines using same pipeline are processed in one call if runbulksize allows. |
| */ |
| public void testRunBulksCreated() throws Exception { |
| final String workflowName = "testWithOutputBucket"; |
| final Record[] records = new Record[9]; |
| for (int i = 0; i < 3; i++) { |
| records[i] = DataFactory.DEFAULT.createRecord("0" + i); |
| final Record r1 = DataFactory.DEFAULT.createRecord("1" + i); |
| r1.getMetadata().put(PipelineProcessorWorker.ATTR_PIPELINE_NAME, "TestPipeline2"); |
| records[i + 3] = r1; |
| records[i + 6] = DataFactory.DEFAULT.createRecord("2" + i); |
| } |
| executeJob(workflowName, 10, records); |
| |
| final Collection<StoreObject> storeObjects = _objectStoreService.getStoreObjectInfos(STORE, BUCKET_NAME); |
| assertEquals(1, storeObjects.size()); |
| |
| final BulkInfo bulkInfo = new BulkInfo(BUCKET_NAME, STORE, storeObjects.iterator().next().getId()); |
| final RecordInput recordInput = new RecordInput(bulkInfo, _objectStoreService); |
| try { |
| int i = 0; |
| for (Record record = recordInput.getRecord(); record != null; record = recordInput.getRecord()) { |
| final AnyMap md = record.getMetadata(); |
| // System.out.println("read record: " + md); |
| if (i < 3 || i > 5) { |
| assertEquals(3, md.getSeq(BpelWorkerTestPipelet.ATTRIBUTE_RECORD_COUNT).getLongValue(0).intValue()); |
| assertFalse(md.containsKey(BpelWorkerTestPipelet2.SECOND_ATTRIBUTE)); |
| } else { |
| assertFalse(md.containsKey(BpelWorkerTestPipelet.ATTRIBUTE_RECORD_COUNT)); |
| assertEquals(3, md.getSeq(BpelWorkerTestPipelet2.SECOND_ATTRIBUTE).getLongValue(0).intValue()); |
| } |
| i++; |
| } |
| assertEquals(9, i); |
| } finally { |
| recordInput.close(); |
| _objectStoreService.removeObject(STORE, bulkInfo.getObjectName()); |
| } |
| } |
| |
| /** |
| * tests that each record with different pipeline attribute is processed separalety even when runbulksize is bigger |
| * than 1 |
| */ |
| public void testNoRunBulksCreated() throws Exception { |
| final String workflowName = "testWithOutputBucket"; |
| final Record r0 = DataFactory.DEFAULT.createRecord("0"); |
| final Record r1 = DataFactory.DEFAULT.createRecord("1"); |
| r1.getMetadata().put(PipelineProcessorWorker.ATTR_PIPELINE_NAME, "TestPipeline2"); |
| final Record r2 = DataFactory.DEFAULT.createRecord("2"); |
| executeJob(workflowName, 10, r0, r1, r2); |
| |
| final Collection<StoreObject> storeObjects = _objectStoreService.getStoreObjectInfos(STORE, BUCKET_NAME); |
| assertEquals(1, storeObjects.size()); |
| |
| final BulkInfo bulkInfo = new BulkInfo(BUCKET_NAME, STORE, storeObjects.iterator().next().getId()); |
| final RecordInput recordInput = new RecordInput(bulkInfo, _objectStoreService); |
| try { |
| int i = 0; |
| for (Record record = recordInput.getRecord(); record != null; record = recordInput.getRecord()) { |
| final AnyMap md = record.getMetadata(); |
| if (i % 2 == 0) { |
| assertEquals(1, md.getSeq(BpelWorkerTestPipelet.ATTRIBUTE_RECORD_COUNT).getLongValue(0).intValue()); |
| assertFalse(md.containsKey(BpelWorkerTestPipelet2.SECOND_ATTRIBUTE)); |
| } else { |
| assertFalse(md.containsKey(BpelWorkerTestPipelet.ATTRIBUTE_RECORD_COUNT)); |
| assertEquals(1, md.getSeq(BpelWorkerTestPipelet2.SECOND_ATTRIBUTE).getLongValue(0).intValue()); |
| } |
| i++; |
| } |
| assertEquals(3, i); |
| } finally { |
| recordInput.close(); |
| _objectStoreService.removeObject(STORE, bulkInfo.getObjectName()); |
| } |
| } |
| |
| /** |
| * tests that selecting the default pipeline does not break run bulk creation |
| */ |
| public void testRunBulkCreated2() throws Exception { |
| final String workflowName = "testWithOutputBucket"; |
| final Record r0 = DataFactory.DEFAULT.createRecord("0"); |
| final Record r1 = DataFactory.DEFAULT.createRecord("1"); |
| r1.getMetadata().put(PipelineProcessorWorker.ATTR_PIPELINE_NAME, "TestPipeline"); |
| final Record r2 = DataFactory.DEFAULT.createRecord("2"); |
| executeJob(workflowName, 10, r0, r1, r2); |
| |
| final Collection<StoreObject> storeObjects = _objectStoreService.getStoreObjectInfos(STORE, BUCKET_NAME); |
| assertEquals(1, storeObjects.size()); |
| |
| final BulkInfo bulkInfo = new BulkInfo(BUCKET_NAME, STORE, storeObjects.iterator().next().getId()); |
| final RecordInput recordInput = new RecordInput(bulkInfo, _objectStoreService); |
| try { |
| int i = 0; |
| for (Record record = recordInput.getRecord(); record != null; record = recordInput.getRecord()) { |
| final AnyMap md = record.getMetadata(); |
| assertEquals(3, md.getSeq(BpelWorkerTestPipelet.ATTRIBUTE_RECORD_COUNT).getLongValue(0).intValue()); |
| assertFalse(md.containsKey(BpelWorkerTestPipelet2.SECOND_ATTRIBUTE)); |
| i++; |
| } |
| assertEquals(3, i); |
| } finally { |
| recordInput.close(); |
| _objectStoreService.removeObject(STORE, bulkInfo.getObjectName()); |
| } |
| } |
| |
| /** |
| * test if multiple records with same record ID are processed. |
| */ |
| public void testRecordsWithSameRecordId() throws Exception { |
| final String workflowName = "testWithOutputBucket"; |
| final Record r0 = DataFactory.DEFAULT.createRecord("0"); |
| r0.getMetadata().put("name", "r0"); |
| final Record r1 = DataFactory.DEFAULT.createRecord("0"); |
| r1.getMetadata().put("name", "r1"); |
| final Record r2 = DataFactory.DEFAULT.createRecord("0"); |
| r2.getMetadata().put("name", "r2"); |
| executeJob(workflowName, 10, r0, r1, r2); |
| |
| final Collection<StoreObject> storeObjects = _objectStoreService.getStoreObjectInfos(STORE, BUCKET_NAME); |
| assertEquals(1, storeObjects.size()); |
| |
| final BulkInfo bulkInfo = new BulkInfo(BUCKET_NAME, STORE, storeObjects.iterator().next().getId()); |
| final RecordInput recordInput = new RecordInput(bulkInfo, _objectStoreService); |
| try { |
| int i = 0; |
| for (Record record = recordInput.getRecord(); record != null; record = recordInput.getRecord()) { |
| final AnyMap md = record.getMetadata(); |
| assertEquals(1, md.getSeq(BpelWorkerTestPipelet.ATTRIBUTE_RECORD_COUNT).getLongValue(0).intValue()); |
| assertEquals("r" + i, md.getStringValue("name")); |
| i++; |
| } |
| assertEquals(3, i); |
| } finally { |
| recordInput.close(); |
| _objectStoreService.removeObject(STORE, bulkInfo.getObjectName()); |
| } |
| } |
| |
| /** |
| * test if multiple records with same record ID are processed. |
| */ |
| public void testRecordsWithSameRecordId2() throws Exception { |
| final String workflowName = "testWithOutputBucket"; |
| final Record r0 = DataFactory.DEFAULT.createRecord("0"); |
| r0.getMetadata().put("name", "r0"); |
| final Record r1 = DataFactory.DEFAULT.createRecord("1"); |
| r1.getMetadata().put("name", "r1"); |
| final Record r2 = DataFactory.DEFAULT.createRecord("0"); |
| r2.getMetadata().put("name", "r2"); |
| final Record r3 = DataFactory.DEFAULT.createRecord("1"); |
| r3.getMetadata().put("name", "r3"); |
| executeJob(workflowName, 10, r0, r1, r2, r3); |
| |
| final Collection<StoreObject> storeObjects = _objectStoreService.getStoreObjectInfos(STORE, BUCKET_NAME); |
| assertEquals(1, storeObjects.size()); |
| |
| final BulkInfo bulkInfo = new BulkInfo(BUCKET_NAME, STORE, storeObjects.iterator().next().getId()); |
| final RecordInput recordInput = new RecordInput(bulkInfo, _objectStoreService); |
| try { |
| int i = 0; |
| for (Record record = recordInput.getRecord(); record != null; record = recordInput.getRecord()) { |
| final AnyMap md = record.getMetadata(); |
| assertEquals(2, md.getSeq(BpelWorkerTestPipelet.ATTRIBUTE_RECORD_COUNT).getLongValue(0).intValue()); |
| assertEquals("r" + i, md.getStringValue("name")); |
| i++; |
| } |
| assertEquals(4, i); |
| } finally { |
| recordInput.close(); |
| _objectStoreService.removeObject(STORE, bulkInfo.getObjectName()); |
| } |
| } |
| |
| } |