| package org.eclipse.smila.integration.worker.test; |
| |
| import java.io.InputStream; |
| import java.util.Collection; |
| |
| import org.eclipse.smila.datamodel.Record; |
| import org.eclipse.smila.integration.worker.HelloWorldWorker; |
| import org.eclipse.smila.objectstore.StoreObject; |
| |
| public class TestHelloWorldWorker extends WorkerTestbase { |
| |
| /** |
| * Adds bulks and the HelloWorldWorker reads the records and modifies them. |
| * |
| * @throws Exception |
| */ |
| public void testModifyBulks() throws Exception { |
| final String jobName = "HelloWorldJob"; |
| // start job with workflow "HelloWorldTest" from workflows.json you have defined before |
| final String jobId = startJob(jobName, "HelloWorldTest", "tempstore"); |
| final int noOfRecords = 5; |
| // add 5 records |
| for (int i = 0; i < noOfRecords; i++) { |
| addRecord(jobName, createRecord(i)); |
| } |
| // commit job |
| commitJobBulk(jobName); |
| finishJob(jobName, jobId); |
| waitForJobRunFinished(jobName, jobId, MAX_WAIT_TIME); |
| // Now read the object from store |
| final Collection<StoreObject> objectInfos = |
| _objectStore.getStoreObjectInfos(STORE_NAME, "helloWorldExportBucket/"); |
| int readNoOfRecords = 0; |
| for (final StoreObject objectInfo : objectInfos) { |
| final InputStream inputStream = _objectStore.readObject(STORE_NAME, objectInfo.getId()); |
| Record readRecord = RECORDREADER.readBinaryStream(inputStream); |
| while (readRecord != null) { |
| try { |
| readNoOfRecords++; |
| // check if the test string is in the test attribute |
| System.err.println(getClass().getSimpleName() + ": Value of attribute '" |
| + HelloWorldWorker.TEST_ATTRIBUTE + "' = '" |
| + readRecord.getMetadata().getStringValue(HelloWorldWorker.TEST_ATTRIBUTE) + "'"); |
| assertEquals(HelloWorldWorker.TEST_STRING, |
| readRecord.getMetadata().getStringValue(HelloWorldWorker.TEST_ATTRIBUTE)); |
| readRecord = RECORDREADER.readBinaryStream(inputStream); |
| } catch (final IllegalStateException ise) { |
| readRecord = null; |
| } |
| } |
| } |
| // check if number of records is ok. |
| assertEquals(noOfRecords, readNoOfRecords); |
| } |
| |
| } |