/*******************************************************************************
 * Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH.
 * All rights reserved. This program and the accompanying materials are made available
 * under the terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors: Juergen Schumacher (Attensity Europe GmbH) - implementation
 *******************************************************************************/
package org.eclipse.smila.processing.worker.test; | |
import java.util.Collection; | |
import org.eclipse.smila.bulkbuilder.BulkbuilderService; | |
import org.eclipse.smila.datamodel.AnyMap; | |
import org.eclipse.smila.datamodel.DataFactory; | |
import org.eclipse.smila.datamodel.Record; | |
import org.eclipse.smila.jobmanager.test.JobManagerTestBase; | |
import org.eclipse.smila.objectstore.StoreObject; | |
import org.eclipse.smila.taskmanager.BulkInfo; | |
import org.eclipse.smila.taskworker.input.RecordInput; | |
/** | |
* Test for example pipeline with filter pipelet. | |
*/ | |
public class TestFilterPipeline extends JobManagerTestBase { | |
/** store name for test. */ | |
private static final String STORE_NAME = "filtering"; | |
/** final output bucket. */ | |
private static final String BUCKET_NAME = "outbulkBucket"; | |
/** service under test. */ | |
protected BulkbuilderService _builder; | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
protected void setUp() throws Exception { | |
super.setUp(); | |
_builder = getService(BulkbuilderService.class); | |
assertNotNull(_builder); | |
_objectStoreService.ensureStore(STORE_NAME); | |
_objectStoreService.clearStore(STORE_NAME); | |
} | |
/** | |
* Adds bulks and the FilterWorker reads the records and filters them if the record-id is between 0 and 5. | |
* | |
* @throws Exception | |
*/ | |
public void testFilterBulks() throws Exception { | |
final String jobName = "testFilterBulks"; | |
final AnyMap parameters = DataFactory.DEFAULT.createAnyMap(); | |
parameters.put("store", STORE_NAME); | |
parameters.put("tempStore", STORE_NAME); | |
addJob(jobName, "filterPipelineTest", parameters); | |
final String jobId = startJob(jobName); | |
final int noOfRecords = 12; | |
for (int i = 0; i < noOfRecords; i++) { | |
_builder.addRecord(jobName, DataFactory.DEFAULT.createRecord(Integer.toString(i))); | |
} | |
_builder.commitJob(jobName); | |
_jobRunEngine.finishJob(jobName, jobId); | |
waitForJobRunCompleted(jobName, jobId, 10000); | |
final Collection<StoreObject> storeObjects = _objectStoreService.getStoreObjectInfos(STORE_NAME, BUCKET_NAME); | |
assertEquals(1, storeObjects.size()); | |
final BulkInfo bulkInfo = new BulkInfo(BUCKET_NAME, STORE_NAME, storeObjects.iterator().next().getId()); | |
final RecordInput recordInput = new RecordInput(bulkInfo, _objectStoreService); | |
try { | |
int numberOfRecords = 0; | |
Record record = recordInput.getRecord(); | |
while (record != null) { | |
assertEquals(Integer.toString(numberOfRecords), record.getId()); | |
numberOfRecords++; | |
record = recordInput.getRecord(); | |
} | |
assertEquals(6, numberOfRecords); | |
} finally { | |
recordInput.close(); | |
} | |
} | |
/** | |
* Adds bulks and the FilterWorker reads the records and filters them if the record-id is between 0 and 5. But this | |
* time only record ids greater than 9 will be created, so no record will match. | |
* | |
* @throws Exception | |
*/ | |
public void testFilterBulksNoMatch() throws Exception { | |
final String jobName = "testFilterBulksNoMatch"; | |
final AnyMap parameters = DataFactory.DEFAULT.createAnyMap(); | |
parameters.put("store", STORE_NAME); | |
parameters.put("tempStore", STORE_NAME); | |
addJob(jobName, "filterPipelineTest", parameters); | |
final String jobId = startJob(jobName); | |
final int noOfRecords = 12; | |
for (int i = 0; i < noOfRecords; i++) { | |
_builder.addRecord(jobName, DataFactory.DEFAULT.createRecord(Integer.toString(10 + i))); | |
} | |
_builder.commitJob(jobName); | |
_jobRunEngine.finishJob(jobName, jobId); | |
waitForJobRunCompleted(jobName, jobId, 10000); | |
final Collection<StoreObject> storeObjects = | |
_objectStoreService.getStoreObjectInfos(STORE_NAME, BUCKET_NAME + "/*"); | |
assertEquals(0, storeObjects.size()); | |
} | |
} |