/******************************************************************************* | |
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the | |
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this | |
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html | |
* | |
* Contributors: Andreas Schank (Attensity Europe GmbH) - initial implementation | |
**********************************************************************************************************************/ | |
package org.eclipse.smila.jobmanager.test; | |
import java.util.ArrayList; | |
import java.util.Collection; | |
import java.util.LinkedHashMap; | |
import java.util.Map; | |
import org.eclipse.smila.datamodel.AnyMap; | |
import org.eclipse.smila.datamodel.AnySeq; | |
import org.eclipse.smila.datamodel.AnyUtil; | |
import org.eclipse.smila.jobmanager.BucketDefinition; | |
import org.eclipse.smila.jobmanager.DataObjectTypeDefinition; | |
import org.eclipse.smila.jobmanager.InvalidConfigException; | |
import org.eclipse.smila.jobmanager.JobDefinition; | |
import org.eclipse.smila.jobmanager.JobManager; | |
import org.eclipse.smila.jobmanager.JobManagerException; | |
import org.eclipse.smila.jobmanager.WorkerDefinition; | |
import org.eclipse.smila.jobmanager.WorkflowDefinition; | |
import org.eclipse.smila.jobmanager.internal.AccessAny; | |
import org.eclipse.smila.jobmanager.internal.JobManagerImpl; | |
import org.eclipse.smila.jobmanager.util.ValueExpression; | |
import org.eclipse.smila.objectstore.ObjectStoreService; | |
import org.eclipse.smila.taskmanager.Task; | |
/**
 * Test class for JobManagerImpl.
 */
public class TestJobManagerWithPersistence extends JobManagerTestBase {
  /** Name of the persistent object store created in {@link #setUp()} and used as a job parameter. */
  private static final String STORE = "store";
  /** Name of the temporary object store created in {@link #setUp()} and used as a job parameter. */
  private static final String TEMPSTORE = "tempstore";
/** {@inheritDoc} */
@Override
protected void setUp() throws Exception {
  super.setUp();
  // make sure both stores referenced by the test jobs/buckets exist before each test
  _objectStoreService.ensureStore(TEMPSTORE);
  _objectStoreService.ensureStore(STORE);
}
/** {@inheritDoc} */
@Override
protected void tearDown() throws Exception {
  // clean up the stores created in setUp() before the base class tears down the services
  _objectStoreService.removeStore(STORE);
  _objectStoreService.removeStore(TEMPSTORE);
  super.tearDown();
}
/**
 * Creates a bucket-Any representation with the given name and type.
 *
 * NOTE(review): the {@code store} parameter is accepted but never written into the returned map —
 * all callers pass a store name that is silently ignored. Confirm whether a store entry was
 * intended here before relying on it.
 *
 * @param name bucket name, stored under {@link BucketDefinition#KEY_NAME}
 * @param type bucket type, stored under {@link BucketDefinition#KEY_TYPE}
 * @param store currently unused
 * @return the bucket description map
 */
private AnyMap createTestBucket(final String name, final String type, final String store) throws Exception {
  final AnyMap bucketAny = AccessAny.FACTORY.createAnyMap();
  bucketAny.put(BucketDefinition.KEY_NAME, name);
  bucketAny.put(BucketDefinition.KEY_TYPE, type);
  return bucketAny;
}
/** | |
* @return a (correct) simple workflow definition containing only a bulkbuilder. | |
*/ | |
private AnyMap createTestWorkflow(final String workflowName, final String insertBucketName, | |
final AnyMap parameters) { | |
final AnyMap workflowAny = AccessAny.FACTORY.createAnyMap(); | |
workflowAny.put(WorkflowDefinition.KEY_NAME, workflowName); | |
final AnyMap startActionAny = AccessAny.FACTORY.createAnyMap(); | |
startActionAny.put("worker", "inputWorker"); | |
final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap(); | |
parametersAny.put("index", "index"); | |
final AnyMap outputAny = AccessAny.FACTORY.createAnyMap(); | |
outputAny.put("output", (insertBucketName != null) ? insertBucketName : "insertBucket"); | |
startActionAny.put("output", outputAny); | |
workflowAny.put("startAction", startActionAny); | |
if (parameters != null) { | |
workflowAny.put(WorkflowDefinition.KEY_PARAMETERS, parameters); | |
} | |
return workflowAny; | |
} | |
/** | |
* @return a job definition for given parameters. | |
*/ | |
private AnyMap createTestJob(final String name, final String workflow, final AnyMap parameters) { | |
final AnyMap jobAny = AccessAny.FACTORY.createAnyMap(); | |
jobAny.put("name", name); | |
jobAny.put("workflow", workflow); | |
jobAny.put("parameters", parameters); | |
return jobAny; | |
} | |
/** | |
* Test if JobManager service was successfully started and registered. | |
* | |
* @throws Exception | |
* no service found. | |
*/ | |
public void testService() throws Exception { | |
final JobManager service = getService(JobManager.class); | |
assertNotNull(service); | |
assertTrue(service instanceof JobManagerImpl); | |
} | |
/** | |
* Tests if jobmanager can load configured workers. | |
* | |
* @throws Exception | |
* Exception while trying to access jobmanager. | |
*/ | |
public void testAccessPredefinedWorkers() throws Exception { | |
final Collection<String> workerNames = _defPersistence.getWorkers(); | |
assertNotNull(workerNames); | |
assertTrue(workerNames.size() > 1); | |
for (final String name : workerNames) { | |
final WorkerDefinition worker = _defPersistence.getWorker(name); | |
assertNotNull(worker); | |
assertEquals(name, worker.getName()); | |
} | |
} | |
/** | |
* Tests if jobmanager can load configured data object types. | |
* | |
* @throws Exception | |
* Exception while trying to access jobmanager. | |
*/ | |
public void testAccessPredefinedDataObjectTypes() throws Exception { | |
final Collection<String> dotNames = _defPersistence.getDataObjectTypes(); | |
assertNotNull(dotNames); | |
assertTrue(dotNames.size() > 0); | |
for (final String name : dotNames) { | |
final DataObjectTypeDefinition dot = _defPersistence.getDataObjectType(name); | |
assertNotNull(dot); | |
assertEquals(name, dot.getName()); | |
} | |
} | |
/** | |
* Tests if jobmanager can load configured buckets. | |
* | |
* @throws Exception | |
* Exception while trying to access jobmanager. | |
*/ | |
public void testAccessPredefinedBuckets() throws Exception { | |
final Collection<String> bucketNames = _defPersistence.getBuckets(); | |
assertNotNull(bucketNames); | |
assertTrue(bucketNames.size() > 0); | |
for (final String name : bucketNames) { | |
final BucketDefinition bucket = _defPersistence.getBucket(name); | |
assertNotNull(bucket); | |
assertEquals(name, bucket.getName()); | |
} | |
} | |
/** | |
* Tests if jobmanager can load configured workflows. | |
* | |
* @throws Exception | |
* Exception while trying to access jobmanager. | |
*/ | |
public void testAccessPredefinedWorkflows() throws Exception { | |
final Collection<String> workflowNames = _defPersistence.getWorkflows(); | |
assertNotNull(workflowNames); | |
assertTrue(workflowNames.size() > 0); | |
for (final String name : workflowNames) { | |
final WorkflowDefinition workflow = _defPersistence.getWorkflow(name); | |
assertNotNull(workflow); | |
assertEquals(name, workflow.getName()); | |
} | |
} | |
/** | |
* Tests if jobmanager correctly adds buckets. | |
* | |
* @throws Exception | |
* Exception while trying to access jobmanager or creating objects. | |
*/ | |
public void testAddBuckets() throws Exception { | |
final Collection<String> names = _defPersistence.getBuckets(); | |
assertNotNull(names); | |
assertFalse(names.isEmpty()); | |
final AnyMap bucketAny = AccessAny.FACTORY.createAnyMap(); | |
bucketAny.put("name", "testAddBuckets-1"); | |
bucketAny.put("type", "recordBulks"); | |
_defPersistence.addBucket(new BucketDefinition(bucketAny)); | |
bucketAny.put("name", "testAddBuckets-2"); | |
_defPersistence.addBucket(new BucketDefinition(bucketAny)); | |
final Collection<String> namesAfter = _defPersistence.getBuckets(); | |
assertEquals(names.size() + 2, namesAfter.size()); | |
} | |
/** | |
* Tests if jobmanager correctly adds workflows. | |
* | |
* @throws Exception | |
* Exception while trying to access jobmanager or creating objects. | |
*/ | |
public void testAddWorkflow() throws Exception { | |
final int noOfWorkflows = _defPersistence.getWorkflows().size(); | |
final AnyMap workflowAny = AccessAny.FACTORY.createAnyMap(); | |
workflowAny.put("name", "indexStat"); | |
final AnyMap startActionAny = AccessAny.FACTORY.createAnyMap(); | |
startActionAny.put("worker", "triggeredWorker"); | |
final AnyMap inputAny = AccessAny.FACTORY.createAnyMap(); | |
inputAny.put("input", "finalBucket"); | |
startActionAny.put("input", inputAny); | |
final AnyMap outputAny = AccessAny.FACTORY.createAnyMap(); | |
outputAny.put("output", "triggeredBucket"); | |
startActionAny.put("output", outputAny); | |
workflowAny.put("startAction", startActionAny); | |
final WorkflowDefinition workflow = new WorkflowDefinition(workflowAny); | |
_defPersistence.addWorkflow(workflow); | |
final int noOfWorkflowsNew = _defPersistence.getWorkflows().size(); | |
assertTrue(noOfWorkflowsNew == noOfWorkflows + 1); | |
final WorkflowDefinition workflowCheck = _defPersistence.getWorkflow(workflow.getName()); | |
assertNotNull(workflowCheck); | |
assertEquals(workflow.getName(), workflowCheck.getName()); | |
assertFalse(workflowCheck.isReadOnly()); | |
} | |
/** | |
* Tests if jobmanager does not remove predefined buckets. | |
* | |
* @throws Exception | |
* Exception while trying to access jobmanager or creating objects. | |
*/ | |
public void testDontRemovePredefinedBuckets() throws Exception { | |
final Collection<String> names = _defPersistence.getBuckets(); | |
assertNotNull(names); | |
assertFalse(names.isEmpty()); | |
for (final String name : names) { | |
try { | |
_defPersistence.removeBucket(name); | |
fail("should not work"); | |
} catch (final Exception ex) { | |
assertTrue("Wrong exception caught: " + ex, ex instanceof IllegalArgumentException); | |
} | |
} | |
} | |
/** | |
* Tests if jobmanager does not accept updating predefined buckets. | |
*/ | |
public void testDontAddPredefinedBucket() throws Exception { | |
final Collection<String> names = _defPersistence.getBuckets(); | |
assertNotNull(names); | |
assertFalse(names.isEmpty()); | |
final AnyMap bucketAny = AccessAny.FACTORY.createAnyMap(); | |
bucketAny.put("type", "recordBulks"); | |
bucketAny.put("name", names.iterator().next()); | |
try { | |
_defPersistence.addBucket(new BucketDefinition(bucketAny)); | |
fail("should not work"); | |
} catch (final Exception ex) { | |
assertTrue("Wrong exception caught: " + ex, ex instanceof InvalidConfigException); | |
} | |
final Collection<String> namesAfter = _defPersistence.getBuckets(); | |
assertEquals(names.size(), namesAfter.size()); | |
} | |
/** | |
* Tests if jobmanager does not accept incomplete bucket definitions. | |
* | |
* @throws Exception | |
* Exception while trying to access jobmanager or creating objects. | |
*/ | |
public void testDontAddIncompleteBucket() throws Exception { | |
final Collection<String> names = _defPersistence.getBuckets(); | |
assertNotNull(names); | |
assertFalse(names.isEmpty()); | |
final Collection<BucketDefinition> newBuckets = new ArrayList<BucketDefinition>(); | |
final AnyMap bucketAny = AccessAny.FACTORY.createAnyMap(); | |
bucketAny.put("name", "testDontAddIncompleteBucket-1"); | |
try { | |
newBuckets.add(new BucketDefinition(bucketAny)); | |
fail("should not work"); | |
} catch (final Exception ex) { | |
assertTrue("Wrong exception caught: " + ex, ex instanceof InvalidConfigException); | |
} | |
final Collection<String> namesAfter = _defPersistence.getBuckets(); | |
assertEquals(names.size(), namesAfter.size()); | |
} | |
/**
 * Tests if jobmanager does not delete workflows that are referenced by a job.
 *
 * Scenario: two identical workflows are added; a job is created on the first one. Removing the
 * referenced workflow must fail with {@link InvalidConfigException}, removing the unreferenced one
 * must succeed, and after the job is removed the first workflow can be removed as well.
 *
 * @throws Exception
 *           Exception while trying to access jobmanager or creating objects.
 */
public void testDontDeleteWorkflowsThatAreStillReferencedByAJob() throws Exception {
  final Collection<String> names = _defPersistence.getWorkflows();
  assertNotNull(names);
  // workflow parameters so that a job without its own parameters validates against the workflow
  final Map<String, String> workflowParameters = new LinkedHashMap<String, String>();
  workflowParameters.put("indexName", "indexName");
  workflowParameters.put("index", "index");
  workflowParameters.put("tempStore", TEMPSTORE);
  workflowParameters.put(STORE, TEMPSTORE);
  final AnyMap workflowAny = AccessAny.FACTORY.createAnyMap();
  workflowAny.put("name", "testDeleteWorkflow-1");
  workflowAny.put("parameters", AnyUtil.objectToAny(workflowParameters));
  // triggered start action: reads finalBucket, writes triggeredBucket
  final AnyMap startActionAny = AccessAny.FACTORY.createAnyMap();
  startActionAny.put("worker", "triggeredWorker");
  final AnyMap inputSlots = AccessAny.FACTORY.createAnyMap();
  inputSlots.put("input", "finalBucket");
  startActionAny.put("input", inputSlots);
  final AnyMap outputSlots = AccessAny.FACTORY.createAnyMap();
  outputSlots.put("output", "triggeredBucket");
  startActionAny.put("output", outputSlots);
  workflowAny.put("startAction", startActionAny);
  _defPersistence.addWorkflow(new WorkflowDefinition(workflowAny));
  // second workflow is identical except for the name and stays unreferenced
  workflowAny.put("name", "testDeleteWorkflow-2");
  _defPersistence.addWorkflow(new WorkflowDefinition(workflowAny));
  // the job references only testDeleteWorkflow-1
  final AnyMap jobAny = AccessAny.FACTORY.createAnyMap();
  jobAny.put("name", "testDeleteWorkflowJob-1");
  jobAny.put("workflow", "testDeleteWorkflow-1");
  _defPersistence.addJob(new JobDefinition(jobAny));
  // so now testDeleteWorkflow-2 can be deleted, while testDeleteWorkflow-1 is referenced by a job
  try {
    _defPersistence.removeWorkflow("testDeleteWorkflow-1");
    fail("should not work");
  } catch (final Exception ex) {
    assertTrue("Wrong exception caught: " + ex, ex instanceof InvalidConfigException);
  }
  _defPersistence.removeWorkflow("testDeleteWorkflow-2");
  // only one of the two added workflows remains
  final Collection<String> namesAfter = _defPersistence.getWorkflows();
  assertEquals(names.size() + 1, namesAfter.size());
  // now delete the job
  _defPersistence.removeJob(jobAny.getStringValue("name"));
  // now also workflow1 can be removed:
  _defPersistence.removeWorkflow("testDeleteWorkflow-1");
}
/**
 * Tests if jobmanager does not delete buckets that are still referenced by a workflow.
 *
 * Scenario: three buckets are added; a workflow references two of them (input and output slot of
 * its start action). Removing the referenced buckets must fail with
 * {@link InvalidConfigException}, removing the unreferenced one succeeds, and once the workflow is
 * removed the remaining two can be removed as well.
 *
 * @throws Exception
 *           Exception while trying to access jobmanager or creating objects.
 */
public void testDontDeleteBucketsThatAreStillReferencedByAWorkflow() throws Exception {
  final Collection<String> names = _defPersistence.getBuckets();
  final AnyMap bucket1Any = createTestBucket("docs-bucket-1", "recordBulks", "docs1");
  _defPersistence.addBucket(new BucketDefinition(bucket1Any));
  final AnyMap bucket2Any = createTestBucket("docs-bucket-2", "recordBulks", "docs2");
  _defPersistence.addBucket(new BucketDefinition(bucket2Any));
  final AnyMap bucket3Any = createTestBucket("docs-bucket-3", "recordBulks", "docs3");
  _defPersistence.addBucket(new BucketDefinition(bucket3Any));
  // workflow uses docs-bucket-1 and docs-bucket-2
  final AnyMap workflowAny = AccessAny.FACTORY.createAnyMap();
  workflowAny.put("name", "testDeleteWorkflow-1");
  final AnyMap startActionAny = AccessAny.FACTORY.createAnyMap();
  startActionAny.put("worker", "triggeredWorker");
  final AnyMap startActionInputAny = AccessAny.FACTORY.createAnyMap();
  startActionInputAny.put("input", "docs-bucket-1");
  startActionAny.put("input", startActionInputAny);
  final AnyMap startActionOutputAny = AccessAny.FACTORY.createAnyMap();
  startActionOutputAny.put("output", "docs-bucket-2");
  startActionAny.put("output", startActionOutputAny);
  workflowAny.put("startAction", startActionAny);
  _defPersistence.addWorkflow(new WorkflowDefinition(workflowAny));
  // so now docs-bucket-3 can be deleted, the others not
  try {
    _defPersistence.removeBucket("docs-bucket-1");
    fail("should not work");
  } catch (final Exception ex) {
    assertTrue("Wrong exception caught: " + ex, ex instanceof InvalidConfigException);
  }
  try {
    _defPersistence.removeBucket("docs-bucket-2");
    fail("should not work");
  } catch (final Exception ex) {
    assertTrue("Wrong exception caught: " + ex, ex instanceof InvalidConfigException);
  }
  _defPersistence.removeBucket("docs-bucket-3");
  // still two more left...
  final Collection<String> namesAfter = _defPersistence.getBuckets();
  assertEquals(names.size() + 2, namesAfter.size());
  // now delete the workflow
  _defPersistence.removeWorkflow(workflowAny.getStringValue("name"));
  // now all buckets can be removed
  _defPersistence.removeBucket(bucket1Any.getStringValue("name"));
  _defPersistence.removeBucket(bucket2Any.getStringValue("name"));
  // all new buckets are gone
  final Collection<String> namesFinal = _defPersistence.getBuckets();
  assertEquals(names, namesFinal);
}
/** | |
* Tests if jobmanager does not accept workflows with non-existing workers. | |
* | |
* @throws Exception | |
* Exception while trying to access jobmanager or creating objects. | |
*/ | |
public void testDontAddWorkflowWithNonexistingWorker() throws Exception { | |
final Collection<String> names = _defPersistence.getBuckets(); | |
assertNotNull(names); | |
final AnyMap workflowAny = AccessAny.FACTORY.createAnyMap(); | |
workflowAny.put("name", "testDontAddWorkflowWithNonexistingWorker"); | |
final AnyMap startActionAny = AccessAny.FACTORY.createAnyMap(); | |
startActionAny.put("worker", "nonexistingWorker"); | |
final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap(); | |
parametersAny.put("index", "index"); | |
startActionAny.put("parameters", parametersAny); | |
final AnyMap outputAny = AccessAny.FACTORY.createAnyMap(); | |
outputAny.put("output", "insertBucket"); | |
startActionAny.put("output", outputAny); | |
workflowAny.put("startAction", startActionAny); | |
// one bucket does not exist: error... | |
try { | |
_defPersistence.addWorkflow(new WorkflowDefinition(workflowAny)); | |
fail("should not work"); | |
} catch (final Exception ex) { | |
assertTrue("Wrong exception caught: " + ex, ex instanceof InvalidConfigException); | |
} | |
} | |
/** | |
* Tests if jobmanager does not accept transient bucket as workflow start action input bucket. | |
* | |
* @throws Exception | |
* error | |
*/ | |
public void testDontAddWorkflowWithTransientStartBucket() throws Exception { | |
final AnyMap workflowAny = AccessAny.FACTORY.createAnyMap(); | |
workflowAny.put("name", "testDontAddWorkflowWithTransientStartBucket"); | |
final AnyMap startActionAny = AccessAny.FACTORY.createAnyMap(); | |
startActionAny.put("worker", "triggeredWorker"); | |
final AnyMap inputAny = AccessAny.FACTORY.createAnyMap(); | |
inputAny.put("input", "docsBucket"); | |
startActionAny.put("input", inputAny); | |
final AnyMap outputAny = AccessAny.FACTORY.createAnyMap(); | |
outputAny.put("output", "triggeredBucket"); | |
startActionAny.put("output", outputAny); | |
workflowAny.put("startAction", startActionAny); | |
try { | |
_defPersistence.addWorkflow(new WorkflowDefinition(workflowAny)); | |
fail("should not work, start action worker has transient input bucket"); | |
} catch (final Exception ex) { | |
assertTrue("Wrong exception caught: " + ex, ex instanceof InvalidConfigException); | |
} | |
} | |
/** | |
* Tests if jobmanager does not accept workflow bucket with the wrong type for a worker slot. | |
* | |
* @throws Exception | |
* Exception while trying to access jobmanager or creating objects. | |
*/ | |
public void testDontAddWorkflowWithIncorrectBucketTypeForWorkerSlot() throws Exception { | |
final Collection<String> names = _defPersistence.getBuckets(); | |
assertNotNull(names); | |
final AnyMap workflowAny = AccessAny.FACTORY.createAnyMap(); | |
workflowAny.put("name", "testDontAddWorkflowWithIncorrectBucketTypeForWorkerSlot"); | |
final AnyMap startActionAny = AccessAny.FACTORY.createAnyMap(); | |
startActionAny.put("worker", WORKER_1); | |
final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap(); | |
parametersAny.put("index", "index"); | |
final AnyMap outputAny = AccessAny.FACTORY.createAnyMap(); | |
outputAny.put("output", "wrongBucket"); | |
startActionAny.put("output", outputAny); | |
workflowAny.put("startAction", startActionAny); | |
// one bucket type does not fit: error... | |
try { | |
_defPersistence.addWorkflow(new WorkflowDefinition(workflowAny)); | |
fail("should not work"); | |
} catch (final Exception ex) { | |
assertTrue("Wrong exception caught: " + ex, ex instanceof InvalidConfigException); | |
} | |
// but check that the correct one won't fail: | |
outputAny.put("output", "docsBucket"); | |
startActionAny.put("output", outputAny); | |
workflowAny.put("startAction", startActionAny); | |
_defPersistence.addWorkflow(new WorkflowDefinition(workflowAny)); | |
_defPersistence.removeWorkflow(workflowAny.getStringValue("name")); | |
} | |
/** | |
* Tests if jobmanager does not accept updating predefined workflows. | |
*/ | |
public void testDontAddPredefinedWorkflows() throws Exception { | |
final Collection<String> names = _defPersistence.getWorkflows(); | |
assertNotNull(names); | |
assertFalse(names.isEmpty()); | |
final WorkflowDefinition workflow = _defPersistence.getWorkflow(names.iterator().next()); | |
assertNotNull(workflow); | |
try { | |
_defPersistence.addWorkflow(workflow); | |
fail("should not work"); | |
} catch (final Exception ex) { | |
assertTrue("Wrong exception caught: " + ex, ex instanceof InvalidConfigException); | |
} | |
final Collection<String> namesAfter = _defPersistence.getWorkflows(); | |
assertEquals(names.size(), namesAfter.size()); | |
} | |
/** | |
* Tests if jobmanager does not remove predefined workflows. | |
* | |
* @throws Exception | |
* Exception while trying to access jobmanager or creating objects. | |
*/ | |
public void testDontRemovePredefinedWorkflows() throws Exception { | |
final Collection<String> names = _defPersistence.getWorkflows(); | |
assertNotNull(names); | |
assertFalse(names.isEmpty()); | |
for (final String name : names) { | |
try { | |
_defPersistence.removeWorkflow(name); | |
fail("should not work"); | |
} catch (final Exception ex) { | |
assertTrue("Wrong exception caught: " + ex, ex instanceof IllegalArgumentException); | |
} | |
} | |
} | |
/**
 * Tests that jobmanager does accept overwriting of buckets and re-validates workflows/jobs based on it.
 *
 * Scenario: a bucket, a workflow using it and a job on that workflow are created. Overwriting the
 * bucket with an incompatible data type or with an unresolvable parameter must fail with
 * {@link InvalidConfigException}; overwriting with a valid definition must succeed and be visible
 * in the stored bucket.
 */
public void testAddExistingBucket() throws Exception {
  // create a (valid) job based on test workflow and bucket
  final BucketDefinition originalBucket = new BucketDefinition(createTestBucket("testBucket", "recordBulks", ""));
  _defPersistence.addBucket(originalBucket);
  final WorkflowDefinition testWorkflow =
      new WorkflowDefinition(createTestWorkflow("testAddExistingBucketWorkflow", originalBucket.getName(), null));
  _defPersistence.addWorkflow(testWorkflow);
  final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap();
  parametersAny.put("tempStore", TEMPSTORE);
  parametersAny.put(STORE, STORE);
  parametersAny.put("index", "index");
  final AnyMap jobDef = createTestJob("testAddExistingBucketJob", testWorkflow.getName(), parametersAny);
  _defPersistence.addJob(new JobDefinition(jobDef));
  // get a bucket of the workflow, change the data type and try to add it -> should fail
  final AnyMap bucketAny = originalBucket.toAny();
  final String originalDataType = originalBucket.getDataObjectType();
  bucketAny.put(BucketDefinition.KEY_TYPE, "wrongType");
  BucketDefinition updatedBucket = new BucketDefinition(bucketAny);
  try {
    _defPersistence.addBucket(updatedBucket);
    fail("should not work - wrong data type for existing workflow");
  } catch (final Exception ex) {
    assertTrue("Wrong exception caught: " + ex, ex instanceof InvalidConfigException);
  }
  bucketAny.put(BucketDefinition.KEY_TYPE, originalDataType); // reset original data type
  // add an unresolved parameter to the bucket and try to add it -> should fail again
  final AnyMap bucketParameters = AccessAny.FACTORY.createAnyMap();
  bucketParameters.put("tempStore", "${unresolvedVariable}");
  bucketAny.put(BucketDefinition.KEY_PARAMETERS, bucketParameters);
  updatedBucket = new BucketDefinition(bucketAny);
  try {
    _defPersistence.addBucket(updatedBucket);
    fail("should not work - unresolved variable in bucket parameters");
  } catch (final Exception ex) {
    assertTrue("Wrong exception caught: " + ex, ex instanceof InvalidConfigException);
  }
  // resolve the unresolved parameter and try to add the bucket -> should succeed
  updatedBucket.getParameters().put("tempStore", new ValueExpression("newtempstore"));
  _defPersistence.addBucket(updatedBucket);
  final BucketDefinition newBucket = _defPersistence.getBucket(originalBucket.getName());
  assertEquals("newtempstore", newBucket.getParameters().get("tempStore").getExpression());
  assertFalse(newBucket.isReadOnly());
}
/**
 * Tests that jobmanager does accept overwriting of workflows and re-validates jobs based on it.
 *
 * Scenario: a workflow and a job on it are created. Overwriting the workflow with an unresolvable
 * parameter must fail with {@link InvalidConfigException}; after the parameter is given a concrete
 * value, overwriting must succeed and the stored workflow carries the new parameter.
 */
public void testAddExistingWorkflow() throws Exception {
  // create a (valid) job based on test workflow
  final WorkflowDefinition originalWorkflow =
      new WorkflowDefinition(createTestWorkflow("testAddExistingWorkflow", null, null));
  _defPersistence.addWorkflow(originalWorkflow);
  assertNotNull(originalWorkflow);
  final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap();
  parametersAny.put("tempStore", TEMPSTORE);
  parametersAny.put(STORE, TEMPSTORE);
  parametersAny.put("index", "index");
  final AnyMap jobDef = createTestJob("testJob", originalWorkflow.getName(), parametersAny);
  _defPersistence.addJob(new JobDefinition(jobDef));
  // add an unresolved parameter to the workflow -> add workflow should fail
  final AnyMap workflowAny = originalWorkflow.toAny();
  final AnyMap workflowParameter = AccessAny.FACTORY.createAnyMap();
  workflowParameter.put("myTestParam", "${unresolvedVariable}");
  workflowAny.put(WorkflowDefinition.KEY_PARAMETERS, workflowParameter);
  final WorkflowDefinition workflow = new WorkflowDefinition(workflowAny);
  try {
    _defPersistence.addWorkflow(workflow);
    fail("should not work - unresolved parameter in job definition based on workflow");
  } catch (final Exception ex) {
    assertTrue("Wrong exception caught: " + ex, ex instanceof InvalidConfigException);
  }
  // remove unresolved parameter from workflow definition -> add workflow should succeed
  workflow.getParameters().put("myTestParam", new ValueExpression("resolvedParameter"));
  _defPersistence.addWorkflow(workflow);
  final WorkflowDefinition newWorkflow = _defPersistence.getWorkflow(workflow.getName());
  assertTrue(newWorkflow.getParameters().containsKey("myTestParam"));
  assertFalse(newWorkflow.isReadOnly());
}
/** | |
* Tests that jobmanager does accept overwriting of job definitions and validates them. | |
*/ | |
public void testAddExistingJob() throws Exception { | |
final String jobName = "testAddExistingJob"; | |
final String workflowName = "testWorkflow"; | |
// add valid job | |
final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap(); | |
parametersAny.put("workerParameter", "value"); | |
parametersAny.put("store", STORE); | |
parametersAny.put("tempStore", TEMPSTORE); | |
AnyMap jobDef = createTestJob(jobName, workflowName, parametersAny); | |
_defPersistence.addJob(new JobDefinition(jobDef)); | |
// try to add job again - with unresolved parameter -> should fail | |
try { | |
parametersAny.remove("store"); | |
jobDef = createTestJob(jobName, workflowName, parametersAny); | |
_defPersistence.addJob(new JobDefinition(jobDef)); | |
System.out.println(_defPersistence.getJobs()); | |
fail("should not work - unresolved parameter in job definition based on workflow"); | |
} catch (final Exception ex) { | |
assertTrue("Wrong exception caught: " + ex, ex instanceof InvalidConfigException); | |
} | |
// try to add job again - with resolved parameter -> should succeed | |
parametersAny.put("store", "newstore"); | |
_objectStoreService.ensureStore("newstore"); | |
jobDef = createTestJob(jobName, workflowName, parametersAny); | |
_defPersistence.addJob(new JobDefinition(jobDef)); | |
System.out.println(_defPersistence.getJobs()); | |
assertEquals("newstore", _defPersistence.getJob(jobName).getParameters().get("store").toString()); | |
_objectStoreService.removeStore("newstore"); | |
} | |
/** | |
* Tests if jobmanager does accept workflow without a bucket for an optional worker slot. | |
* | |
* @throws Exception | |
* Exception while trying to access jobmanager or creating objects. | |
*/ | |
public void testAddWorkflowWithoutBucketForOptionalWorkerSlot() throws Exception { | |
final Collection<String> names = _defPersistence.getBuckets(); | |
assertNotNull(names); | |
final AnyMap workflowAny = AccessAny.FACTORY.createAnyMap(); | |
workflowAny.put("name", "testAddWorkflowWithoutBucketForOptionalWorkerSlot"); | |
final AnyMap startActionAny = AccessAny.FACTORY.createAnyMap(); | |
startActionAny.put("worker", WORKER_1); | |
final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap(); | |
parametersAny.put("index", "index"); | |
workflowAny.put("startAction", startActionAny); | |
// the bucket slot is optional and not defined: that's ok | |
_defPersistence.addWorkflow(new WorkflowDefinition(workflowAny)); | |
_defPersistence.removeWorkflow(workflowAny.getStringValue("name")); | |
// check without the complete output section, should also work, because both slots are optional | |
startActionAny.remove("output"); | |
workflowAny.put("startAction", startActionAny); | |
_defPersistence.addWorkflow(new WorkflowDefinition(workflowAny)); | |
_defPersistence.removeWorkflow(workflowAny.getStringValue("name")); | |
} | |
/** | |
* Tests if jobmanager does not accept workflow bucket without a bucket name for a mandatory worker output slot. | |
* | |
* @throws Exception | |
* Exception while trying to access jobmanager or creating objects. | |
*/ | |
public void testAddWorkflowWithoutBucketForMandatoryWorkerSlot() throws Exception { | |
final Collection<String> names = _defPersistence.getBuckets(); | |
assertNotNull(names); | |
final AnyMap workflowAny = AccessAny.FACTORY.createAnyMap(); | |
workflowAny.put("name", "testAddWorkflowWithoutBucketForMandatoryWorkerSlot"); | |
final AnyMap startActionAny = AccessAny.FACTORY.createAnyMap(); | |
startActionAny.put("worker", "triggeredWorker"); | |
final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap(); | |
parametersAny.put("index", "index"); | |
final AnyMap inputAny = AccessAny.FACTORY.createAnyMap(); | |
inputAny.put("input", "finalBucket"); | |
startActionAny.put("input", inputAny); | |
workflowAny.put("startAction", startActionAny); | |
// the bucket slot is not optional and not defined: that's not ok | |
try { | |
_defPersistence.addWorkflow(new WorkflowDefinition(workflowAny)); | |
fail("should not work"); | |
} catch (final Exception ex) { | |
assertTrue("Wrong exception caught: " + ex, ex instanceof InvalidConfigException); | |
} | |
// check with the correct output section, should work | |
final AnyMap outputAny = AccessAny.FACTORY.createAnyMap(); | |
outputAny.put("output", "triggeredBucket"); | |
startActionAny.put("output", outputAny); | |
workflowAny.put("startAction", startActionAny); | |
_defPersistence.addWorkflow(new WorkflowDefinition(workflowAny)); | |
_defPersistence.removeWorkflow(workflowAny.getStringValue("name")); | |
} | |
/** | |
* Tests if the jobmanager throws an exception when trying to add a bucket with a parameter that would result in an | |
* invalid store name. | |
* | |
* @throws Exception | |
* Exception while trying to access jobmanager or creating objects. | |
*/ | |
public void testInvalidBucketStoreParameter() throws Exception { | |
final Collection<String> names = _defPersistence.getBuckets(); | |
assertNotNull(names); | |
final AnyMap bucketAny = AccessAny.FACTORY.createAnyMap(); | |
bucketAny.put("name", "testInvalidBucketStoreParameter"); | |
bucketAny.put("type", "recordBulks"); | |
final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap(); | |
parametersAny.put("store", "invalid~Store"); | |
bucketAny.put("parameters", parametersAny); | |
try { | |
_defPersistence.addBucket(new BucketDefinition(bucketAny)); | |
fail("Invalid store name: should not work."); | |
} catch (final Exception ex) { | |
assertTrue("wrong exception class " + ex.getClass(), ex instanceof JobManagerException); | |
} | |
// but must work with correct parameter: | |
parametersAny.put("store", "validstore"); | |
_defPersistence.addBucket(new BucketDefinition(bucketAny)); | |
final Collection<String> namesAfter = _defPersistence.getBuckets(); | |
assertEquals(names.size() + 1, namesAfter.size()); | |
} | |
/** | |
* Tests if the jobmanager ignores action parameters that would affect buckets. | |
* | |
* @throws Exception | |
* Exception while trying to access jobmanager or creating objects. | |
*/ | |
public void testInvalidWorkflowActionStoreParameter() throws Exception { | |
final Collection<String> names = _defPersistence.getWorkflows(); | |
assertNotNull(names); | |
final AnyMap bucketAny = AccessAny.FACTORY.createAnyMap(); | |
bucketAny.put("name", "persistentTestInvalidWorkflowActionStoreParameterBucket"); | |
bucketAny.put("type", "recordBulks"); | |
_defPersistence.addBucket(new BucketDefinition(bucketAny)); | |
final AnyMap workflowAny = AccessAny.FACTORY.createAnyMap(); | |
workflowAny.put("name", "testInvalidWorkflowStartActionStoreParameter"); | |
final AnyMap startActionAny = AccessAny.FACTORY.createAnyMap(); | |
startActionAny.put("worker", WORKER_1); | |
final AnyMap parametersStartActionAny = AccessAny.FACTORY.createAnyMap(); | |
parametersStartActionAny.put(STORE, "in~Valid*+%&$"); | |
startActionAny.put("parameters", parametersStartActionAny); | |
final AnyMap outputStartActionAny = AccessAny.FACTORY.createAnyMap(); | |
outputStartActionAny.put("output", "persistentTestInvalidWorkflowActionStoreParameterBucket"); | |
startActionAny.put("output", outputStartActionAny); | |
workflowAny.put("startAction", startActionAny); | |
final AnyMap actionAny = AccessAny.FACTORY.createAnyMap(); | |
actionAny.put("worker", "intermediateWorker"); | |
final AnyMap parametersActionAny = AccessAny.FACTORY.createAnyMap(); | |
parametersActionAny.put("tempStore", "in~Validäöü\\?"); | |
actionAny.put("parameters", parametersActionAny); | |
final AnyMap inputActionAny = AccessAny.FACTORY.createAnyMap(); | |
inputActionAny.put("input", "persistentTestInvalidWorkflowActionStoreParameterBucket"); | |
actionAny.put("input", inputActionAny); | |
final AnyMap outputActionAny = AccessAny.FACTORY.createAnyMap(); | |
outputActionAny.put("output", "processedBucket"); | |
actionAny.put("output", outputActionAny); | |
final AnySeq actionListAny = AccessAny.FACTORY.createAnySeq(); | |
actionListAny.add(actionAny); | |
workflowAny.put("actions", actionListAny); | |
// both wrong: | |
_defPersistence.addWorkflow(new WorkflowDefinition(workflowAny)); | |
// would normally result in invalid store name, but since this is not checked, we must have succeeded. | |
final Collection<String> namesAfter = _defPersistence.getWorkflows(); | |
assertEquals(names.size() + 1, namesAfter.size()); | |
} | |
/** | |
* Tests if the jobmanager throws an exception when trying to add a workflow with a parameter that would result in an | |
* invalid store name. | |
* | |
* @throws Exception | |
* Exception while trying to access jobmanager or creating objects. | |
*/ | |
public void testInvalidWorkflowParameter() throws Exception { | |
final Collection<String> names = _defPersistence.getWorkflows(); | |
assertNotNull(names); | |
final AnyMap bucketAny = AccessAny.FACTORY.createAnyMap(); | |
bucketAny.put("name", "persistentTestInvalidWorkflowParameterBucket"); | |
bucketAny.put("type", "recordBulks"); | |
_defPersistence.addBucket(new BucketDefinition(bucketAny)); | |
final AnyMap workflowAny = AccessAny.FACTORY.createAnyMap(); | |
workflowAny.put("name", "testInvalidWorkflowParameter"); | |
final AnyMap startActionAny = AccessAny.FACTORY.createAnyMap(); | |
startActionAny.put("worker", WORKER_1); | |
final AnyMap outputStartActionAny = AccessAny.FACTORY.createAnyMap(); | |
outputStartActionAny.put("output", "persistentTestInvalidWorkflowParameterBucket"); | |
startActionAny.put("output", outputStartActionAny); | |
workflowAny.put("startAction", startActionAny); | |
final AnyMap actionAny = AccessAny.FACTORY.createAnyMap(); | |
actionAny.put("worker", WORKER_1); | |
final AnyMap outputActionAny = AccessAny.FACTORY.createAnyMap(); | |
outputActionAny.put("output", "transientBucket"); | |
actionAny.put("output", outputActionAny); | |
final AnySeq actionListAny = AccessAny.FACTORY.createAnySeq(); | |
actionListAny.add(actionAny); | |
workflowAny.put("actions", actionListAny); | |
final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap(); | |
parametersAny.put("store", "in~+*Valid"); | |
parametersAny.put("tempStore", "in+~*#Valid"); | |
workflowAny.put("parameters", parametersAny); | |
// both wrong: | |
try { | |
_defPersistence.addWorkflow(new WorkflowDefinition(workflowAny)); | |
fail("Invalid store name: should not work."); | |
} catch (final Exception ex) { | |
assertTrue("wrong exception class " + ex.getClass(), ex instanceof JobManagerException); | |
} | |
// store wrong | |
parametersAny.put("store", "In+*~Valid"); | |
parametersAny.put("tempStore", "valid"); | |
try { | |
_defPersistence.addWorkflow(new WorkflowDefinition(workflowAny)); | |
fail("Invalid store name: should not work."); | |
} catch (final Exception ex) { | |
assertTrue("wrong exception class " + ex.getClass(), ex instanceof JobManagerException); | |
} | |
// tempStore wrong | |
parametersAny.put("store", "valid"); | |
parametersAny.put("tempStore", "in~*Valid"); | |
try { | |
_defPersistence.addWorkflow(new WorkflowDefinition(workflowAny)); | |
fail("Invalid store name: should not work."); | |
} catch (final Exception ex) { | |
assertTrue("wrong exception class " + ex.getClass(), ex instanceof JobManagerException); | |
} | |
// but must work with correct parameter: | |
parametersAny.put(STORE, "valid"); | |
parametersAny.put("tempStore", "valid"); | |
_defPersistence.addWorkflow(new WorkflowDefinition(workflowAny)); | |
final Collection<String> namesAfter = _defPersistence.getWorkflows(); | |
assertEquals(names.size() + 1, namesAfter.size()); | |
} | |
/** | |
* Tests if the jobmanager throws an exception when trying to add a job with a parameter that would result in an | |
* invalid store name. | |
* | |
* @throws Exception | |
* Exception while trying to access jobmanager or creating objects. | |
*/ | |
public void testInvalidJobParameter() throws Exception { | |
final Collection<String> names = _defPersistence.getJobs(); | |
assertNotNull(names); | |
final AnyMap jobAny = AccessAny.FACTORY.createAnyMap(); | |
jobAny.put("name", "testInvalidJobParameter"); | |
jobAny.put("workflow", "testWorkflow"); | |
final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap(); | |
parametersAny.put("tempStore", "in+~*Valid"); | |
parametersAny.put("store", "in+~*Valid"); | |
parametersAny.put("workerParameter", "value"); | |
jobAny.put("parameters", parametersAny); | |
// both wrong: | |
try { | |
_defPersistence.addJob(new JobDefinition(jobAny)); | |
fail("Invalid store name: should not work."); | |
} catch (final Exception ex) { | |
assertTrue("wrong exception class " + ex.getClass(), ex instanceof JobManagerException); | |
} | |
// correct store name | |
parametersAny.put("tempStore", "valid"); | |
parametersAny.put("store", "valid"); | |
_defPersistence.addJob(new JobDefinition(jobAny)); | |
final Collection<String> namesAfter = _defPersistence.getJobs(); | |
assertEquals(names.size() + 1, namesAfter.size()); | |
} | |
/** | |
* Tests adding a job without an optional parameter. | |
* | |
* @throws Exception | |
* Exception while trying to access jobmanager or creating objects. | |
*/ | |
public void testMissingOptionalJobParameter() throws Exception { | |
final Collection<String> names = _defPersistence.getJobs(); | |
assertNotNull(names); | |
final AnyMap jobAny = AccessAny.FACTORY.createAnyMap(); | |
jobAny.put("name", "testInvalidJobParameter"); | |
jobAny.put("workflow", "optionalParamWorkflow"); | |
final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap(); | |
parametersAny.put("tempStore", "valid"); | |
parametersAny.put("store", "valid"); | |
// not now... parametersAny.put("workerParameter", "value"); | |
jobAny.put("parameters", parametersAny); | |
// missing mandatory param | |
try { | |
_defPersistence.addJob(new JobDefinition(jobAny)); | |
fail("Missing mandatory parameter: should not work."); | |
} catch (final Exception ex) { | |
assertTrue("wrong exception class " + ex.getClass(), ex instanceof JobManagerException); | |
} | |
// mandatory param set: | |
parametersAny.put("workerParameter", "value"); | |
_defPersistence.addJob(new JobDefinition(jobAny)); | |
final Collection<String> namesAfter = _defPersistence.getJobs(); | |
assertEquals(names.size() + 1, namesAfter.size()); | |
} | |
/** | |
* Tests adding a job with an optional parameter. | |
* | |
* @throws Exception | |
* Exception while trying to access jobmanager or creating objects. | |
*/ | |
public void testOptionalJobParameter() throws Exception { | |
final Collection<String> names = _defPersistence.getJobs(); | |
assertNotNull(names); | |
final String jobName = "testOptionalJobParameter"; | |
final AnyMap jobAny = AccessAny.FACTORY.createAnyMap(); | |
jobAny.put("name", jobName); | |
jobAny.put("workflow", "optionalParamWorkflow"); | |
final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap(); | |
parametersAny.put("tempStore", "valid"); | |
parametersAny.put("store", "valid"); | |
// not now... parametersAny.put("workerParameter", "value"); | |
parametersAny.put("optionalParameter", "optionalValue"); | |
jobAny.put("parameters", parametersAny); | |
// missing mandatory param | |
try { | |
_defPersistence.addJob(new JobDefinition(jobAny)); | |
fail("Missing mandatory parameter: should not work."); | |
} catch (final Exception ex) { | |
assertTrue("wrong exception class " + ex.getClass(), ex instanceof JobManagerException); | |
} | |
// mandatory param set: | |
parametersAny.put("workerParameter", "value"); | |
_defPersistence.addJob(new JobDefinition(jobAny)); | |
assertFalse(_defPersistence.getJob(jobName).isReadOnly()); | |
final Collection<String> namesAfter = _defPersistence.getJobs(); | |
assertEquals(names.size() + 1, namesAfter.size()); | |
} | |
/** | |
* Tests if the jobmanager throws an exception when trying to create initial tasks for workers that are not marked as | |
* bulkSource workers. | |
* | |
* @throws Exception | |
* Exception while trying to access jobmanager or creating objects. | |
*/ | |
public void testInitialTasksOnlyForBulkSourceWorkers() throws Exception { | |
final AnyMap workflowAny = AccessAny.FACTORY.createAnyMap(); | |
workflowAny.put("name", "testInitialTasksOnlyForBulkSourceWorkers"); | |
final AnyMap bulkBuilderActionAny = AccessAny.FACTORY.createAnyMap(); | |
bulkBuilderActionAny.put("worker", WORKER_1); | |
final AnyMap bulkBuilderActionOutputAny = AccessAny.FACTORY.createAnyMap(); | |
bulkBuilderActionOutputAny.put("output", "docsBucket"); | |
bulkBuilderActionAny.put("output", bulkBuilderActionOutputAny); | |
workflowAny.put("startAction", bulkBuilderActionAny); | |
final AnySeq actionsAny = AccessAny.FACTORY.createAnySeq(); | |
final AnyMap tmeRecoredAnalyzerActionAny = AccessAny.FACTORY.createAnyMap(); | |
tmeRecoredAnalyzerActionAny.put("worker", "intermediateWorker"); | |
final AnyMap tmeRecordAnalyzerActionOutputAny = AccessAny.FACTORY.createAnyMap(); | |
tmeRecordAnalyzerActionOutputAny.put("output", "processedBucket"); | |
final AnyMap tmeRecordAnalyzerActionInputAny = AccessAny.FACTORY.createAnyMap(); | |
tmeRecordAnalyzerActionInputAny.put("input", "docsBucket"); | |
tmeRecoredAnalyzerActionAny.put("input", tmeRecordAnalyzerActionInputAny); | |
tmeRecoredAnalyzerActionAny.put("output", tmeRecordAnalyzerActionOutputAny); | |
actionsAny.add(tmeRecoredAnalyzerActionAny); | |
workflowAny.put("actions", actionsAny); | |
// add workflow: | |
final JobManager service = getService(JobManager.class); | |
assertNotNull(service); | |
_defPersistence.addWorkflow(new WorkflowDefinition(workflowAny)); | |
// add Job | |
final String jobName = "testInitialTasksOnlyForBulkSourceWorkers"; | |
final AnyMap jobAny = AccessAny.FACTORY.createAnyMap(); | |
jobAny.put("name", jobName); | |
jobAny.put("workflow", "testInitialTasksOnlyForBulkSourceWorkers"); | |
final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap(); | |
parametersAny.put("tempStore", TEMPSTORE); | |
parametersAny.put("store", STORE); | |
parametersAny.put("workerParameter", "value"); | |
jobAny.put("parameters", parametersAny); | |
_defPersistence.addJob(new JobDefinition(jobAny)); | |
// prepare store | |
final ObjectStoreService store = getService(ObjectStoreService.class); | |
store.ensureStore("docstore"); | |
// start job | |
final String jobId = service.startJob(jobName); | |
// try to start intermediate worker: | |
try { | |
service.getInitialTask(WORKER_2, jobName); | |
fail("must not get initial task for non bulkSource worker"); | |
} catch (final Exception ex) { | |
assertTrue("wrong exception", ex instanceof JobManagerException); | |
} | |
// try to start something that is not part of the workflow: | |
try { | |
service.getInitialTask("theWorkerThatMustNotBeNamed", jobName); | |
fail("must not get initial task for non bulkSource worker"); | |
} catch (final Exception ex) { | |
assertTrue("wrong exception", ex instanceof JobManagerException); | |
} | |
// start input worker: | |
final Task task = service.getInitialTask(WORKER_1, jobName); | |
assertNotNull(task); | |
assertEquals("wrong task worker name", WORKER_1, task.getWorkerName()); | |
finishFinalTask(task, successResult()); | |
service.finishJob(jobName, jobId); | |
} | |
/** | |
* Tests working workflow with groups. | |
* | |
* @throws Exception | |
* Exception while trying to access jobmanager or creating objects. | |
*/ | |
public void testWorkingDefinitionWithGroups() throws Exception { | |
final AnyMap workflowAny = AccessAny.FACTORY.createAnyMap(); | |
final String workflowName = "testWorkingDefinition"; | |
workflowAny.put("name", workflowName); | |
final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap(); | |
parametersAny.put("indexName", "index"); | |
workflowAny.put("parameters", parametersAny); | |
final AnyMap startActionAny = AccessAny.FACTORY.createAnyMap(); | |
startActionAny.put("worker", WORKER_1); | |
final AnyMap inputParametersAny = AccessAny.FACTORY.createAnyMap(); | |
inputParametersAny.put("index", "${indexName}"); | |
startActionAny.put("parameters", inputParametersAny); | |
final AnyMap outputAny = AccessAny.FACTORY.createAnyMap(); | |
outputAny.put("output", "docsBucket"); | |
startActionAny.put("output", outputAny); | |
workflowAny.put("startAction", startActionAny); | |
final AnySeq actionsAny = AccessAny.FACTORY.createAnySeq(); | |
final AnyMap action1Any = AccessAny.FACTORY.createAnyMap(); | |
action1Any.put("worker", "testSlotGroups"); | |
final AnyMap inputAction1Any = AccessAny.FACTORY.createAnyMap(); | |
inputAction1Any.put("noGroup", "docsBucket"); | |
action1Any.put("input", inputAction1Any); | |
final AnyMap outputAction1Any = AccessAny.FACTORY.createAnyMap(); | |
outputAction1Any.put("noGroup", "docsBucket"); | |
outputAction1Any.put("group2-1", "docsBucket"); | |
outputAction1Any.put("group2-2", "docsBucket"); | |
action1Any.put("output", outputAction1Any); | |
actionsAny.add(action1Any); | |
workflowAny.put("actions", actionsAny); | |
final WorkflowDefinition workflowDefinition = new WorkflowDefinition(workflowAny); | |
_defPersistence.addWorkflow(workflowDefinition); | |
final AnyMap reconvertedAny = _defPersistence.getWorkflow(workflowName).toAny(); | |
assertEquals(workflowAny, reconvertedAny); | |
} | |
/** | |
* Tests workflow with conflicting output groups. | |
* | |
* @throws Exception | |
* Exception while trying to access jobmanager or creating objects. | |
*/ | |
public void testDefinitionWithConflictingOutputGroups() throws Exception { | |
final AnyMap workflowAny = AccessAny.FACTORY.createAnyMap(); | |
workflowAny.put("name", "testDefinitionWithConflictingOutputGroups"); | |
final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap(); | |
parametersAny.put("indexName", "index"); | |
workflowAny.put("parameters", parametersAny); | |
final AnyMap startActionAny = AccessAny.FACTORY.createAnyMap(); | |
startActionAny.put("worker", WORKER_1); | |
final AnyMap inputParametersAny = AccessAny.FACTORY.createAnyMap(); | |
inputParametersAny.put("index", "${indexName}"); | |
startActionAny.put("parameters", inputParametersAny); | |
final AnyMap outputAny = AccessAny.FACTORY.createAnyMap(); | |
outputAny.put("output", "docsBucket"); | |
startActionAny.put("output", outputAny); | |
workflowAny.put("startAction", startActionAny); | |
final AnySeq actionsAny = AccessAny.FACTORY.createAnySeq(); | |
final AnyMap action1Any = AccessAny.FACTORY.createAnyMap(); | |
action1Any.put("worker", "testSlotGroups"); | |
final AnyMap inputAction1Any = AccessAny.FACTORY.createAnyMap(); | |
inputAction1Any.put("noGroup", "docsBucket"); | |
action1Any.put("input", inputAction1Any); | |
final AnyMap outputAction1Any = AccessAny.FACTORY.createAnyMap(); | |
outputAction1Any.put("noGroup", "docsBucket"); | |
outputAction1Any.put("group1-1", "docsBucket"); | |
outputAction1Any.put("group2-2", "docsBucket"); | |
action1Any.put("output", outputAction1Any); | |
actionsAny.add(action1Any); | |
workflowAny.put("actions", actionsAny); | |
WorkflowDefinition workflowDefinition = new WorkflowDefinition(workflowAny); | |
try { | |
_defPersistence.addWorkflow(workflowDefinition); | |
fail("InvalidConfigException expected."); | |
} catch (final InvalidConfigException e) { | |
; // ok, let's amend the config and do it again | |
outputAction1Any.clear(); | |
outputAction1Any.put("noGroup", "docsBucket"); | |
outputAction1Any.put("group2-1", "docsBucket"); | |
outputAction1Any.put("group2-2", "docsBucket"); | |
action1Any.put("output", outputAction1Any); | |
workflowDefinition = new WorkflowDefinition(workflowAny); | |
_defPersistence.addWorkflow(workflowDefinition); | |
} | |
final AnyMap reconvertedAny = _defPersistence.getWorkflow(workflowAny.getStringValue("name")).toAny(); | |
assertEquals(workflowAny, reconvertedAny); | |
} | |
/** | |
* check usage of inputWorker with persistent output only. | |
*/ | |
public void testBulkBuilderWithPersistentBuckets() throws Exception { | |
// create a persistent recordBulk bucket | |
final AnyMap bucketDef = AccessAny.FACTORY.createAnyMap(); | |
bucketDef.put("name", "persistentDocs"); | |
bucketDef.put("type", "recordBulks"); | |
_defPersistence.addBucket(new BucketDefinition(bucketDef)); | |
// create a simple bulkbuilder workflow using this bucket | |
final AnyMap workflowDef = AccessAny.FACTORY.createAnyMap(); | |
workflowDef.put("name", "testSlotParamaterValidationWorkflow"); | |
final AnyMap startAction = AccessAny.FACTORY.createAnyMap(); | |
startAction.put("worker", WORKER_1); | |
final AnyMap outputs = AccessAny.FACTORY.createAnyMap(); | |
outputs.put("output", "persistentDocs"); | |
startAction.put("output", outputs); | |
workflowDef.put("startAction", startAction); | |
_defPersistence.addWorkflow(new WorkflowDefinition(workflowDef)); | |
// create a job with this workflow | |
final AnyMap jobDef = AccessAny.FACTORY.createAnyMap(); | |
jobDef.put("name", "testSlotParamaterValidationJob"); | |
jobDef.put("workflow", "testSlotParamaterValidationWorkflow"); | |
final AnyMap parametersDef = AccessAny.FACTORY.createAnyMap(); | |
parametersDef.put(STORE, "persstore"); | |
_objectStoreService.ensureStore("persstore"); | |
jobDef.put("parameters", parametersDef); | |
// this should work without having to set the "tempStore" parameter. | |
_defPersistence.addJob(new JobDefinition(jobDef)); | |
} | |
/** | |
* check mandatory grouped slots. | |
*/ | |
public void testMandatoryGroupedSlots() throws Exception { | |
final AnyMap workflowAny = AccessAny.FACTORY.createAnyMap(); | |
final String workflowName = "testWorkingDefinition"; | |
workflowAny.put("name", workflowName); | |
final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap(); | |
parametersAny.put("indexName", "index"); | |
workflowAny.put("parameters", parametersAny); | |
final AnyMap startActionAny = AccessAny.FACTORY.createAnyMap(); | |
startActionAny.put("worker", WORKER_1); | |
final AnyMap inputParametersAny = AccessAny.FACTORY.createAnyMap(); | |
inputParametersAny.put("index", "${indexName}"); | |
startActionAny.put("parameters", inputParametersAny); | |
final AnyMap outputAny = AccessAny.FACTORY.createAnyMap(); | |
outputAny.put("output", "docsBucket"); | |
startActionAny.put("output", outputAny); | |
workflowAny.put("startAction", startActionAny); | |
final AnySeq actionsAny = AccessAny.FACTORY.createAnySeq(); | |
final AnyMap action1Any = AccessAny.FACTORY.createAnyMap(); | |
action1Any.put("worker", "testSlotGroups"); | |
final AnyMap inputAction1Any = AccessAny.FACTORY.createAnyMap(); | |
inputAction1Any.put("noGroup", "docsBucket"); | |
action1Any.put("input", inputAction1Any); | |
final AnyMap outputAction1Any = AccessAny.FACTORY.createAnyMap(); | |
action1Any.put("output", outputAction1Any); | |
actionsAny.add(action1Any); | |
workflowAny.put("actions", actionsAny); | |
// check: mandatory groups, but no group used | |
outputAction1Any.put("noGroup", "docsBucket"); | |
try { | |
final WorkflowDefinition workflowDefinition = new WorkflowDefinition(workflowAny); | |
_defPersistence.addWorkflow(workflowDefinition); | |
fail("should not work"); | |
} catch (final InvalidConfigException ex) { | |
; // | |
} | |
// check: mandatory grouped slot not set | |
outputAction1Any.put("group2-2", "docsBucket"); | |
try { | |
final WorkflowDefinition workflowDefinition = new WorkflowDefinition(workflowAny); | |
_defPersistence.addWorkflow(workflowDefinition); | |
fail("should not work"); | |
} catch (final InvalidConfigException ex) { | |
; // | |
} | |
// check: mandatory ungrouped slot not set | |
outputAction1Any.clear(); | |
outputAction1Any.put("group2-1", "docsBucket"); | |
try { | |
final WorkflowDefinition workflowDefinition = new WorkflowDefinition(workflowAny); | |
_defPersistence.addWorkflow(workflowDefinition); | |
fail("should not work"); | |
} catch (final InvalidConfigException ex) { | |
; // | |
} | |
} | |
} |