/*******************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH.
* All rights reserved. This program and the accompanying materials are made available
* under the terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Juergen Schumacher (Attensity Europe GmbH) - implementation
*******************************************************************************/
package org.eclipse.smila.processing.worker.test;
import java.util.Collection;
import org.eclipse.smila.bulkbuilder.BulkbuilderService;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.datamodel.DataFactory;
import org.eclipse.smila.datamodel.Record;
import org.eclipse.smila.jobmanager.test.JobManagerTestBase;
import org.eclipse.smila.objectstore.StoreObject;
import org.eclipse.smila.taskmanager.BulkInfo;
import org.eclipse.smila.taskworker.input.RecordInput;
/**
* Test for example pipeline with filter pipelet.
*/
public class TestFilterPipeline extends JobManagerTestBase {

  /** name of the object store used for both the temp and the final bulks in this test. */
  private static final String STORE_NAME = "filtering";

  /** final output bucket holding the filtered bulks. */
  private static final String BUCKET_NAME = "outbulkBucket";

  /** name of the workflow under test, as configured in the test job definitions. */
  private static final String WORKFLOW_NAME = "filterPipelineTest";

  /** number of records pushed into each test job. */
  private static final int NO_OF_RECORDS = 12;

  /** number of records with ids 0..11 that pass the filter (ids 0 to 5). */
  private static final int EXPECTED_MATCHES = 6;

  /** maximum time in milliseconds to wait for a job run to complete. */
  private static final long JOB_TIMEOUT_MS = 10000;

  /** service under test. */
  protected BulkbuilderService _builder;

  /**
   * {@inheritDoc}
   *
   * Looks up the bulkbuilder service and prepares an empty test store.
   */
  @Override
  protected void setUp() throws Exception {
    super.setUp();
    _builder = getService(BulkbuilderService.class);
    assertNotNull(_builder);
    // ensure the store exists and start from a clean slate so leftovers of
    // earlier runs cannot influence the object counts asserted below.
    _objectStoreService.ensureStore(STORE_NAME);
    _objectStoreService.clearStore(STORE_NAME);
  }

  /**
   * Creates and runs a filter job: adds {@link #NO_OF_RECORDS} records whose ids start at the given
   * offset, commits them, finishes the job run and waits for its completion.
   *
   * @param jobName
   *          name of the job to create and run.
   * @param idOffset
   *          offset added to the loop index to form each record id.
   * @throws Exception
   *           if job creation, record submission or job completion fails.
   */
  private void runFilterJob(final String jobName, final int idOffset) throws Exception {
    final AnyMap parameters = DataFactory.DEFAULT.createAnyMap();
    parameters.put("store", STORE_NAME);
    parameters.put("tempStore", STORE_NAME);
    addJob(jobName, WORKFLOW_NAME, parameters);
    final String jobId = startJob(jobName);
    for (int i = 0; i < NO_OF_RECORDS; i++) {
      _builder.addRecord(jobName, DataFactory.DEFAULT.createRecord(Integer.toString(idOffset + i)));
    }
    _builder.commitJob(jobName);
    _jobRunEngine.finishJob(jobName, jobId);
    waitForJobRunCompleted(jobName, jobId, JOB_TIMEOUT_MS);
  }

  /**
   * Adds bulks and the FilterWorker reads the records and filters them if the record-id is between 0
   * and 5. Expects exactly one output bulk containing the six matching records in order.
   *
   * @throws Exception
   *           if the job run or reading the result bulk fails.
   */
  public void testFilterBulks() throws Exception {
    final String jobName = "testFilterBulks";
    runFilterJob(jobName, 0);
    final Collection<StoreObject> storeObjects = _objectStoreService.getStoreObjectInfos(STORE_NAME, BUCKET_NAME);
    assertEquals(1, storeObjects.size());
    final BulkInfo bulkInfo = new BulkInfo(BUCKET_NAME, STORE_NAME, storeObjects.iterator().next().getId());
    final RecordInput recordInput = new RecordInput(bulkInfo, _objectStoreService);
    try {
      // read the whole bulk back: records must be the ids "0".."5" in insertion order.
      int numberOfRecords = 0;
      Record record = recordInput.getRecord();
      while (record != null) {
        assertEquals(Integer.toString(numberOfRecords), record.getId());
        numberOfRecords++;
        record = recordInput.getRecord();
      }
      assertEquals(EXPECTED_MATCHES, numberOfRecords);
    } finally {
      recordInput.close();
    }
  }

  /**
   * Adds bulks and the FilterWorker reads the records and filters them if the record-id is between 0
   * and 5. But this time only record ids greater than 9 will be created, so no record will match and
   * no output bulk must be written.
   *
   * @throws Exception
   *           if the job run or the store lookup fails.
   */
  public void testFilterBulksNoMatch() throws Exception {
    final String jobName = "testFilterBulksNoMatch";
    runFilterJob(jobName, 10);
    // NOTE(review): this test queries with a "/*" suffix while testFilterBulks queries the plain
    // bucket name — presumably both patterns match the same objects here; confirm against the
    // ObjectStoreService prefix semantics before unifying.
    final Collection<StoreObject> storeObjects =
        _objectStoreService.getStoreObjectInfos(STORE_NAME, BUCKET_NAME + "/*");
    assertEquals(0, storeObjects.size());
  }
}