blob: 48f2e517c264e1a0bfb070762db52283212a2020 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Andreas Schank (Attensity Europe GmbH) - initial implementation
**********************************************************************************************************************/
package org.eclipse.smila.jobmanager.test;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.datamodel.AnySeq;
import org.eclipse.smila.datamodel.AnyUtil;
import org.eclipse.smila.jobmanager.BucketDefinition;
import org.eclipse.smila.jobmanager.DataObjectTypeDefinition;
import org.eclipse.smila.jobmanager.InvalidConfigException;
import org.eclipse.smila.jobmanager.JobDefinition;
import org.eclipse.smila.jobmanager.JobManager;
import org.eclipse.smila.jobmanager.JobManagerException;
import org.eclipse.smila.jobmanager.WorkerDefinition;
import org.eclipse.smila.jobmanager.WorkflowDefinition;
import org.eclipse.smila.jobmanager.internal.AccessAny;
import org.eclipse.smila.jobmanager.internal.JobManagerImpl;
import org.eclipse.smila.jobmanager.util.ValueExpression;
import org.eclipse.smila.objectstore.ObjectStoreService;
import org.eclipse.smila.taskmanager.Task;
/**
* Test class for JobMaangerImpl.
*
*/
public class TestJobManagerWithPersistence extends JobManagerTestBase {
private static final String STORE = "store";
private static final String TEMPSTORE = "tempstore";
/** {@inheritDoc} */
@Override
protected void setUp() throws Exception {
super.setUp();
_objectStoreService.ensureStore(TEMPSTORE);
_objectStoreService.ensureStore(STORE);
}
/** {@inheritDoc} */
@Override
protected void tearDown() throws Exception {
_objectStoreService.removeStore(STORE);
_objectStoreService.removeStore(TEMPSTORE);
super.tearDown();
}
/**
* Creates a bucket-Any representation with the given name, type and store.
*/
private AnyMap createTestBucket(final String name, final String type, final String store) throws Exception {
final AnyMap bucketAny = AccessAny.FACTORY.createAnyMap();
bucketAny.put(BucketDefinition.KEY_NAME, name);
bucketAny.put(BucketDefinition.KEY_TYPE, type);
return bucketAny;
}
/**
* @return a (correct) simple workflow definition containing only a bulkbuilder.
*/
private AnyMap createTestWorkflow(final String workflowName, final String insertBucketName,
final AnyMap parameters) {
final AnyMap workflowAny = AccessAny.FACTORY.createAnyMap();
workflowAny.put(WorkflowDefinition.KEY_NAME, workflowName);
final AnyMap startActionAny = AccessAny.FACTORY.createAnyMap();
startActionAny.put("worker", "inputWorker");
final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap();
parametersAny.put("index", "index");
final AnyMap outputAny = AccessAny.FACTORY.createAnyMap();
outputAny.put("output", (insertBucketName != null) ? insertBucketName : "insertBucket");
startActionAny.put("output", outputAny);
workflowAny.put("startAction", startActionAny);
if (parameters != null) {
workflowAny.put(WorkflowDefinition.KEY_PARAMETERS, parameters);
}
return workflowAny;
}
/**
* @return a job definition for given parameters.
*/
private AnyMap createTestJob(final String name, final String workflow, final AnyMap parameters) {
final AnyMap jobAny = AccessAny.FACTORY.createAnyMap();
jobAny.put("name", name);
jobAny.put("workflow", workflow);
jobAny.put("parameters", parameters);
return jobAny;
}
/**
* Test if JobManager service was successfully started and registered.
*
* @throws Exception
* no service found.
*/
public void testService() throws Exception {
final JobManager service = getService(JobManager.class);
assertNotNull(service);
assertTrue(service instanceof JobManagerImpl);
}
/**
* Tests if jobmanager can load configured workers.
*
* @throws Exception
* Exception while trying to access jobmanager.
*/
public void testAccessPredefinedWorkers() throws Exception {
final Collection<String> workerNames = _defPersistence.getWorkers();
assertNotNull(workerNames);
assertTrue(workerNames.size() > 1);
for (final String name : workerNames) {
final WorkerDefinition worker = _defPersistence.getWorker(name);
assertNotNull(worker);
assertEquals(name, worker.getName());
}
}
/**
* Tests if jobmanager can load configured data object types.
*
* @throws Exception
* Exception while trying to access jobmanager.
*/
public void testAccessPredefinedDataObjectTypes() throws Exception {
final Collection<String> dotNames = _defPersistence.getDataObjectTypes();
assertNotNull(dotNames);
assertTrue(dotNames.size() > 0);
for (final String name : dotNames) {
final DataObjectTypeDefinition dot = _defPersistence.getDataObjectType(name);
assertNotNull(dot);
assertEquals(name, dot.getName());
}
}
/**
* Tests if jobmanager can load configured buckets.
*
* @throws Exception
* Exception while trying to access jobmanager.
*/
public void testAccessPredefinedBuckets() throws Exception {
final Collection<String> bucketNames = _defPersistence.getBuckets();
assertNotNull(bucketNames);
assertTrue(bucketNames.size() > 0);
for (final String name : bucketNames) {
final BucketDefinition bucket = _defPersistence.getBucket(name);
assertNotNull(bucket);
assertEquals(name, bucket.getName());
}
}
/**
* Tests if jobmanager can load configured workflows.
*
* @throws Exception
* Exception while trying to access jobmanager.
*/
public void testAccessPredefinedWorkflows() throws Exception {
final Collection<String> workflowNames = _defPersistence.getWorkflows();
assertNotNull(workflowNames);
assertTrue(workflowNames.size() > 0);
for (final String name : workflowNames) {
final WorkflowDefinition workflow = _defPersistence.getWorkflow(name);
assertNotNull(workflow);
assertEquals(name, workflow.getName());
}
}
/**
* Tests if jobmanager correctly adds buckets.
*
* @throws Exception
* Exception while trying to access jobmanager or creating objects.
*/
public void testAddBuckets() throws Exception {
final Collection<String> names = _defPersistence.getBuckets();
assertNotNull(names);
assertFalse(names.isEmpty());
final AnyMap bucketAny = AccessAny.FACTORY.createAnyMap();
bucketAny.put("name", "testAddBuckets-1");
bucketAny.put("type", "recordBulks");
_defPersistence.addBucket(new BucketDefinition(bucketAny));
bucketAny.put("name", "testAddBuckets-2");
_defPersistence.addBucket(new BucketDefinition(bucketAny));
final Collection<String> namesAfter = _defPersistence.getBuckets();
assertEquals(names.size() + 2, namesAfter.size());
}
/**
* Tests if jobmanager correctly adds workflows.
*
* @throws Exception
* Exception while trying to access jobmanager or creating objects.
*/
public void testAddWorkflow() throws Exception {
final int noOfWorkflows = _defPersistence.getWorkflows().size();
final AnyMap workflowAny = AccessAny.FACTORY.createAnyMap();
workflowAny.put("name", "indexStat");
final AnyMap startActionAny = AccessAny.FACTORY.createAnyMap();
startActionAny.put("worker", "triggeredWorker");
final AnyMap inputAny = AccessAny.FACTORY.createAnyMap();
inputAny.put("input", "finalBucket");
startActionAny.put("input", inputAny);
final AnyMap outputAny = AccessAny.FACTORY.createAnyMap();
outputAny.put("output", "triggeredBucket");
startActionAny.put("output", outputAny);
workflowAny.put("startAction", startActionAny);
final WorkflowDefinition workflow = new WorkflowDefinition(workflowAny);
_defPersistence.addWorkflow(workflow);
final int noOfWorkflowsNew = _defPersistence.getWorkflows().size();
assertTrue(noOfWorkflowsNew == noOfWorkflows + 1);
final WorkflowDefinition workflowCheck = _defPersistence.getWorkflow(workflow.getName());
assertNotNull(workflowCheck);
assertEquals(workflow.getName(), workflowCheck.getName());
assertFalse(workflowCheck.isReadOnly());
}
/**
* Tests if jobmanager does not remove predefined buckets.
*
* @throws Exception
* Exception while trying to access jobmanager or creating objects.
*/
public void testDontRemovePredefinedBuckets() throws Exception {
final Collection<String> names = _defPersistence.getBuckets();
assertNotNull(names);
assertFalse(names.isEmpty());
for (final String name : names) {
try {
_defPersistence.removeBucket(name);
fail("should not work");
} catch (final Exception ex) {
assertTrue("Wrong exception caught: " + ex, ex instanceof IllegalArgumentException);
}
}
}
/**
* Tests if jobmanager does not accept updating predefined buckets.
*/
public void testDontAddPredefinedBucket() throws Exception {
final Collection<String> names = _defPersistence.getBuckets();
assertNotNull(names);
assertFalse(names.isEmpty());
final AnyMap bucketAny = AccessAny.FACTORY.createAnyMap();
bucketAny.put("type", "recordBulks");
bucketAny.put("name", names.iterator().next());
try {
_defPersistence.addBucket(new BucketDefinition(bucketAny));
fail("should not work");
} catch (final Exception ex) {
assertTrue("Wrong exception caught: " + ex, ex instanceof InvalidConfigException);
}
final Collection<String> namesAfter = _defPersistence.getBuckets();
assertEquals(names.size(), namesAfter.size());
}
/**
* Tests if jobmanager does not accept incomplete bucket definitions.
*
* @throws Exception
* Exception while trying to access jobmanager or creating objects.
*/
public void testDontAddIncompleteBucket() throws Exception {
final Collection<String> names = _defPersistence.getBuckets();
assertNotNull(names);
assertFalse(names.isEmpty());
final Collection<BucketDefinition> newBuckets = new ArrayList<BucketDefinition>();
final AnyMap bucketAny = AccessAny.FACTORY.createAnyMap();
bucketAny.put("name", "testDontAddIncompleteBucket-1");
try {
newBuckets.add(new BucketDefinition(bucketAny));
fail("should not work");
} catch (final Exception ex) {
assertTrue("Wrong exception caught: " + ex, ex instanceof InvalidConfigException);
}
final Collection<String> namesAfter = _defPersistence.getBuckets();
assertEquals(names.size(), namesAfter.size());
}
/**
* Tests if jobmanager does not delete workflows that are referenced by a job.
*
* @throws Exception
* Exception while trying to access jobmanager or creating objects.
*/
public void testDontDeleteWorkflowsThatAreStillReferencedByAJob() throws Exception {
final Collection<String> names = _defPersistence.getWorkflows();
assertNotNull(names);
final Map<String, String> workflowParameters = new LinkedHashMap<String, String>();
workflowParameters.put("indexName", "indexName");
workflowParameters.put("index", "index");
workflowParameters.put("tempStore", TEMPSTORE);
workflowParameters.put(STORE, TEMPSTORE);
final AnyMap workflowAny = AccessAny.FACTORY.createAnyMap();
workflowAny.put("name", "testDeleteWorkflow-1");
workflowAny.put("parameters", AnyUtil.objectToAny(workflowParameters));
final AnyMap startActionAny = AccessAny.FACTORY.createAnyMap();
startActionAny.put("worker", "triggeredWorker");
final AnyMap inputSlots = AccessAny.FACTORY.createAnyMap();
inputSlots.put("input", "finalBucket");
startActionAny.put("input", inputSlots);
final AnyMap outputSlots = AccessAny.FACTORY.createAnyMap();
outputSlots.put("output", "triggeredBucket");
startActionAny.put("output", outputSlots);
workflowAny.put("startAction", startActionAny);
_defPersistence.addWorkflow(new WorkflowDefinition(workflowAny));
workflowAny.put("name", "testDeleteWorkflow-2");
_defPersistence.addWorkflow(new WorkflowDefinition(workflowAny));
final AnyMap jobAny = AccessAny.FACTORY.createAnyMap();
jobAny.put("name", "testDeleteWorkflowJob-1");
jobAny.put("workflow", "testDeleteWorkflow-1");
_defPersistence.addJob(new JobDefinition(jobAny));
// so now testDeleteWorkflow-2 can be deleted, while testDeleteWorkflow-1 is referenced by a job
try {
_defPersistence.removeWorkflow("testDeleteWorkflow-1");
fail("should not work");
} catch (final Exception ex) {
assertTrue("Wrong exception caught: " + ex, ex instanceof InvalidConfigException);
}
_defPersistence.removeWorkflow("testDeleteWorkflow-2");
final Collection<String> namesAfter = _defPersistence.getWorkflows();
assertEquals(names.size() + 1, namesAfter.size());
// now delete the job
_defPersistence.removeJob(jobAny.getStringValue("name"));
// now also workflow1 can be removed:
_defPersistence.removeWorkflow("testDeleteWorkflow-1");
}
/**
* Tests if jobmanager does not delete buckets that are still referenced by a workflow.
*
* @throws Exception
* Exception while trying to access jobmanager or creating objects.
*/
public void testDontDeleteBucketsThatAreStillReferencedByAWorkflow() throws Exception {
final Collection<String> names = _defPersistence.getBuckets();
final AnyMap bucket1Any = createTestBucket("docs-bucket-1", "recordBulks", "docs1");
_defPersistence.addBucket(new BucketDefinition(bucket1Any));
final AnyMap bucket2Any = createTestBucket("docs-bucket-2", "recordBulks", "docs2");
_defPersistence.addBucket(new BucketDefinition(bucket2Any));
final AnyMap bucket3Any = createTestBucket("docs-bucket-3", "recordBulks", "docs3");
_defPersistence.addBucket(new BucketDefinition(bucket3Any));
// workflow uses docs-bucket-1 and docs-bucket-2
final AnyMap workflowAny = AccessAny.FACTORY.createAnyMap();
workflowAny.put("name", "testDeleteWorkflow-1");
final AnyMap startActionAny = AccessAny.FACTORY.createAnyMap();
startActionAny.put("worker", "triggeredWorker");
final AnyMap startActionInputAny = AccessAny.FACTORY.createAnyMap();
startActionInputAny.put("input", "docs-bucket-1");
startActionAny.put("input", startActionInputAny);
final AnyMap startActionOutputAny = AccessAny.FACTORY.createAnyMap();
startActionOutputAny.put("output", "docs-bucket-2");
startActionAny.put("output", startActionOutputAny);
workflowAny.put("startAction", startActionAny);
_defPersistence.addWorkflow(new WorkflowDefinition(workflowAny));
// so now docs-bucket-3 can be deleted, the others not
try {
_defPersistence.removeBucket("docs-bucket-1");
fail("should not work");
} catch (final Exception ex) {
assertTrue("Wrong exception caught: " + ex, ex instanceof InvalidConfigException);
}
try {
_defPersistence.removeBucket("docs-bucket-2");
fail("should not work");
} catch (final Exception ex) {
assertTrue("Wrong exception caught: " + ex, ex instanceof InvalidConfigException);
}
_defPersistence.removeBucket("docs-bucket-3");
// still two more left...
final Collection<String> namesAfter = _defPersistence.getBuckets();
assertEquals(names.size() + 2, namesAfter.size());
// now delete the workflow
_defPersistence.removeWorkflow(workflowAny.getStringValue("name"));
// now all buckets can be removed
_defPersistence.removeBucket(bucket1Any.getStringValue("name"));
_defPersistence.removeBucket(bucket2Any.getStringValue("name"));
// all new buckets are gone
final Collection<String> namesFinal = _defPersistence.getBuckets();
assertEquals(names, namesFinal);
}
/**
* Tests if jobmanager does not accept workflows with non-existing workers.
*
* @throws Exception
* Exception while trying to access jobmanager or creating objects.
*/
public void testDontAddWorkflowWithNonexistingWorker() throws Exception {
final Collection<String> names = _defPersistence.getBuckets();
assertNotNull(names);
final AnyMap workflowAny = AccessAny.FACTORY.createAnyMap();
workflowAny.put("name", "testDontAddWorkflowWithNonexistingWorker");
final AnyMap startActionAny = AccessAny.FACTORY.createAnyMap();
startActionAny.put("worker", "nonexistingWorker");
final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap();
parametersAny.put("index", "index");
startActionAny.put("parameters", parametersAny);
final AnyMap outputAny = AccessAny.FACTORY.createAnyMap();
outputAny.put("output", "insertBucket");
startActionAny.put("output", outputAny);
workflowAny.put("startAction", startActionAny);
// one bucket does not exist: error...
try {
_defPersistence.addWorkflow(new WorkflowDefinition(workflowAny));
fail("should not work");
} catch (final Exception ex) {
assertTrue("Wrong exception caught: " + ex, ex instanceof InvalidConfigException);
}
}
/**
* Tests if jobmanager does not accept transient bucket as workflow start action input bucket.
*
* @throws Exception
* error
*/
public void testDontAddWorkflowWithTransientStartBucket() throws Exception {
final AnyMap workflowAny = AccessAny.FACTORY.createAnyMap();
workflowAny.put("name", "testDontAddWorkflowWithTransientStartBucket");
final AnyMap startActionAny = AccessAny.FACTORY.createAnyMap();
startActionAny.put("worker", "triggeredWorker");
final AnyMap inputAny = AccessAny.FACTORY.createAnyMap();
inputAny.put("input", "docsBucket");
startActionAny.put("input", inputAny);
final AnyMap outputAny = AccessAny.FACTORY.createAnyMap();
outputAny.put("output", "triggeredBucket");
startActionAny.put("output", outputAny);
workflowAny.put("startAction", startActionAny);
try {
_defPersistence.addWorkflow(new WorkflowDefinition(workflowAny));
fail("should not work, start action worker has transient input bucket");
} catch (final Exception ex) {
assertTrue("Wrong exception caught: " + ex, ex instanceof InvalidConfigException);
}
}
/**
* Tests if jobmanager does not accept workflow bucket with the wrong type for a worker slot.
*
* @throws Exception
* Exception while trying to access jobmanager or creating objects.
*/
public void testDontAddWorkflowWithIncorrectBucketTypeForWorkerSlot() throws Exception {
final Collection<String> names = _defPersistence.getBuckets();
assertNotNull(names);
final AnyMap workflowAny = AccessAny.FACTORY.createAnyMap();
workflowAny.put("name", "testDontAddWorkflowWithIncorrectBucketTypeForWorkerSlot");
final AnyMap startActionAny = AccessAny.FACTORY.createAnyMap();
startActionAny.put("worker", WORKER_1);
final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap();
parametersAny.put("index", "index");
final AnyMap outputAny = AccessAny.FACTORY.createAnyMap();
outputAny.put("output", "wrongBucket");
startActionAny.put("output", outputAny);
workflowAny.put("startAction", startActionAny);
// one bucket type does not fit: error...
try {
_defPersistence.addWorkflow(new WorkflowDefinition(workflowAny));
fail("should not work");
} catch (final Exception ex) {
assertTrue("Wrong exception caught: " + ex, ex instanceof InvalidConfigException);
}
// but check that the correct one won't fail:
outputAny.put("output", "docsBucket");
startActionAny.put("output", outputAny);
workflowAny.put("startAction", startActionAny);
_defPersistence.addWorkflow(new WorkflowDefinition(workflowAny));
_defPersistence.removeWorkflow(workflowAny.getStringValue("name"));
}
/**
* Tests if jobmanager does not accept updating predefined workflows.
*/
public void testDontAddPredefinedWorkflows() throws Exception {
final Collection<String> names = _defPersistence.getWorkflows();
assertNotNull(names);
assertFalse(names.isEmpty());
final WorkflowDefinition workflow = _defPersistence.getWorkflow(names.iterator().next());
assertNotNull(workflow);
try {
_defPersistence.addWorkflow(workflow);
fail("should not work");
} catch (final Exception ex) {
assertTrue("Wrong exception caught: " + ex, ex instanceof InvalidConfigException);
}
final Collection<String> namesAfter = _defPersistence.getWorkflows();
assertEquals(names.size(), namesAfter.size());
}
/**
* Tests if jobmanager does not remove predefined workflows.
*
* @throws Exception
* Exception while trying to access jobmanager or creating objects.
*/
public void testDontRemovePredefinedWorkflows() throws Exception {
final Collection<String> names = _defPersistence.getWorkflows();
assertNotNull(names);
assertFalse(names.isEmpty());
for (final String name : names) {
try {
_defPersistence.removeWorkflow(name);
fail("should not work");
} catch (final Exception ex) {
assertTrue("Wrong exception caught: " + ex, ex instanceof IllegalArgumentException);
}
}
}
/**
* Tests that jobmanager does accept overwriting of buckets and re-validates workflows/jobs based on it.
*/
public void testAddExistingBucket() throws Exception {
// create a (valid) job based on test workflow and bucket
final BucketDefinition originalBucket = new BucketDefinition(createTestBucket("testBucket", "recordBulks", ""));
_defPersistence.addBucket(originalBucket);
final WorkflowDefinition testWorkflow =
new WorkflowDefinition(createTestWorkflow("testAddExistingBucketWorkflow", originalBucket.getName(), null));
_defPersistence.addWorkflow(testWorkflow);
final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap();
parametersAny.put("tempStore", TEMPSTORE);
parametersAny.put(STORE, STORE);
parametersAny.put("index", "index");
final AnyMap jobDef = createTestJob("testAddExistingBucketJob", testWorkflow.getName(), parametersAny);
_defPersistence.addJob(new JobDefinition(jobDef));
// get a bucket of the workflow, change the data type and try to add it -> should fail
final AnyMap bucketAny = originalBucket.toAny();
final String originalDataType = originalBucket.getDataObjectType();
bucketAny.put(BucketDefinition.KEY_TYPE, "wrongType");
BucketDefinition updatedBucket = new BucketDefinition(bucketAny);
try {
_defPersistence.addBucket(updatedBucket);
fail("should not work - wrong data type for existing workflow");
} catch (final Exception ex) {
assertTrue("Wrong exception caught: " + ex, ex instanceof InvalidConfigException);
}
bucketAny.put(BucketDefinition.KEY_TYPE, originalDataType); // reset original data type
// add an unresolved parameter to the bucket and try to add it -> should fail again
final AnyMap bucketParameters = AccessAny.FACTORY.createAnyMap();
bucketParameters.put("tempStore", "${unresolvedVariable}");
bucketAny.put(BucketDefinition.KEY_PARAMETERS, bucketParameters);
updatedBucket = new BucketDefinition(bucketAny);
try {
_defPersistence.addBucket(updatedBucket);
fail("should not work - unresolved variable in bucket parameters");
} catch (final Exception ex) {
assertTrue("Wrong exception caught: " + ex, ex instanceof InvalidConfigException);
}
// resolve the unresolved parameter and try to add the bucket -> should succeed
updatedBucket.getParameters().put("tempStore", new ValueExpression("newtempstore"));
_defPersistence.addBucket(updatedBucket);
final BucketDefinition newBucket = _defPersistence.getBucket(originalBucket.getName());
assertEquals("newtempstore", newBucket.getParameters().get("tempStore").getExpression());
assertFalse(newBucket.isReadOnly());
}
/**
* Tests that jobmanager does accept overwriting of workflows and re-validates jobs based on it.
*/
public void testAddExistingWorkflow() throws Exception {
// create a (valid) job based on test workflow
final WorkflowDefinition originalWorkflow =
new WorkflowDefinition(createTestWorkflow("testAddExistingWorkflow", null, null));
_defPersistence.addWorkflow(originalWorkflow);
assertNotNull(originalWorkflow);
final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap();
parametersAny.put("tempStore", TEMPSTORE);
parametersAny.put(STORE, TEMPSTORE);
parametersAny.put("index", "index");
final AnyMap jobDef = createTestJob("testJob", originalWorkflow.getName(), parametersAny);
_defPersistence.addJob(new JobDefinition(jobDef));
// add an unresolved parameter to the workflow -> add workflow should fail
final AnyMap workflowAny = originalWorkflow.toAny();
final AnyMap workflowParameter = AccessAny.FACTORY.createAnyMap();
workflowParameter.put("myTestParam", "${unresolvedVariable}");
workflowAny.put(WorkflowDefinition.KEY_PARAMETERS, workflowParameter);
final WorkflowDefinition workflow = new WorkflowDefinition(workflowAny);
try {
_defPersistence.addWorkflow(workflow);
fail("should not work - unresolved parameter in job definition based on workflow");
} catch (final Exception ex) {
assertTrue("Wrong exception caught: " + ex, ex instanceof InvalidConfigException);
}
// remove unresolved parameter from workflow definition -> add workflow should succeed
workflow.getParameters().put("myTestParam", new ValueExpression("resolvedParameter"));
_defPersistence.addWorkflow(workflow);
final WorkflowDefinition newWorkflow = _defPersistence.getWorkflow(workflow.getName());
assertTrue(newWorkflow.getParameters().containsKey("myTestParam"));
assertFalse(newWorkflow.isReadOnly());
}
/**
* Tests that jobmanager does accept overwriting of job definitions and validates them.
*/
public void testAddExistingJob() throws Exception {
final String jobName = "testAddExistingJob";
final String workflowName = "testWorkflow";
// add valid job
final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap();
parametersAny.put("workerParameter", "value");
parametersAny.put("store", STORE);
parametersAny.put("tempStore", TEMPSTORE);
AnyMap jobDef = createTestJob(jobName, workflowName, parametersAny);
_defPersistence.addJob(new JobDefinition(jobDef));
// try to add job again - with unresolved parameter -> should fail
try {
parametersAny.remove("store");
jobDef = createTestJob(jobName, workflowName, parametersAny);
_defPersistence.addJob(new JobDefinition(jobDef));
System.out.println(_defPersistence.getJobs());
fail("should not work - unresolved parameter in job definition based on workflow");
} catch (final Exception ex) {
assertTrue("Wrong exception caught: " + ex, ex instanceof InvalidConfigException);
}
// try to add job again - with resolved parameter -> should succeed
parametersAny.put("store", "newstore");
_objectStoreService.ensureStore("newstore");
jobDef = createTestJob(jobName, workflowName, parametersAny);
_defPersistence.addJob(new JobDefinition(jobDef));
System.out.println(_defPersistence.getJobs());
assertEquals("newstore", _defPersistence.getJob(jobName).getParameters().get("store").toString());
_objectStoreService.removeStore("newstore");
}
/**
* Tests if jobmanager does accept workflow without a bucket for an optional worker slot.
*
* @throws Exception
* Exception while trying to access jobmanager or creating objects.
*/
public void testAddWorkflowWithoutBucketForOptionalWorkerSlot() throws Exception {
final Collection<String> names = _defPersistence.getBuckets();
assertNotNull(names);
final AnyMap workflowAny = AccessAny.FACTORY.createAnyMap();
workflowAny.put("name", "testAddWorkflowWithoutBucketForOptionalWorkerSlot");
final AnyMap startActionAny = AccessAny.FACTORY.createAnyMap();
startActionAny.put("worker", WORKER_1);
final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap();
parametersAny.put("index", "index");
workflowAny.put("startAction", startActionAny);
// the bucket slot is optional and not defined: that's ok
_defPersistence.addWorkflow(new WorkflowDefinition(workflowAny));
_defPersistence.removeWorkflow(workflowAny.getStringValue("name"));
// check without the complete output section, should also work, because both slots are optional
startActionAny.remove("output");
workflowAny.put("startAction", startActionAny);
_defPersistence.addWorkflow(new WorkflowDefinition(workflowAny));
_defPersistence.removeWorkflow(workflowAny.getStringValue("name"));
}
/**
* Tests if jobmanager does not accept workflow bucket without a bucket name for a mandatory worker output slot.
*
* @throws Exception
* Exception while trying to access jobmanager or creating objects.
*/
public void testAddWorkflowWithoutBucketForMandatoryWorkerSlot() throws Exception {
final Collection<String> names = _defPersistence.getBuckets();
assertNotNull(names);
final AnyMap workflowAny = AccessAny.FACTORY.createAnyMap();
workflowAny.put("name", "testAddWorkflowWithoutBucketForMandatoryWorkerSlot");
final AnyMap startActionAny = AccessAny.FACTORY.createAnyMap();
startActionAny.put("worker", "triggeredWorker");
final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap();
parametersAny.put("index", "index");
final AnyMap inputAny = AccessAny.FACTORY.createAnyMap();
inputAny.put("input", "finalBucket");
startActionAny.put("input", inputAny);
workflowAny.put("startAction", startActionAny);
// the bucket slot is not optional and not defined: that's not ok
try {
_defPersistence.addWorkflow(new WorkflowDefinition(workflowAny));
fail("should not work");
} catch (final Exception ex) {
assertTrue("Wrong exception caught: " + ex, ex instanceof InvalidConfigException);
}
// check with the correct output section, should work
final AnyMap outputAny = AccessAny.FACTORY.createAnyMap();
outputAny.put("output", "triggeredBucket");
startActionAny.put("output", outputAny);
workflowAny.put("startAction", startActionAny);
_defPersistence.addWorkflow(new WorkflowDefinition(workflowAny));
_defPersistence.removeWorkflow(workflowAny.getStringValue("name"));
}
/**
* Tests if the jobmanager throws an exception when trying to add a bucket with a parameter that would result in an
* invalid store name.
*
* @throws Exception
* Exception while trying to access jobmanager or creating objects.
*/
public void testInvalidBucketStoreParameter() throws Exception {
final Collection<String> names = _defPersistence.getBuckets();
assertNotNull(names);
final AnyMap bucketAny = AccessAny.FACTORY.createAnyMap();
bucketAny.put("name", "testInvalidBucketStoreParameter");
bucketAny.put("type", "recordBulks");
final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap();
parametersAny.put("store", "invalid~Store");
bucketAny.put("parameters", parametersAny);
try {
_defPersistence.addBucket(new BucketDefinition(bucketAny));
fail("Invalid store name: should not work.");
} catch (final Exception ex) {
assertTrue("wrong exception class " + ex.getClass(), ex instanceof JobManagerException);
}
// but must work with correct parameter:
parametersAny.put("store", "validstore");
_defPersistence.addBucket(new BucketDefinition(bucketAny));
final Collection<String> namesAfter = _defPersistence.getBuckets();
assertEquals(names.size() + 1, namesAfter.size());
}
/**
* Tests if the jobmanager ignores action parameters that would affect buckets.
*
* @throws Exception
* Exception while trying to access jobmanager or creating objects.
*/
public void testInvalidWorkflowActionStoreParameter() throws Exception {
final Collection<String> names = _defPersistence.getWorkflows();
assertNotNull(names);
final AnyMap bucketAny = AccessAny.FACTORY.createAnyMap();
bucketAny.put("name", "persistentTestInvalidWorkflowActionStoreParameterBucket");
bucketAny.put("type", "recordBulks");
_defPersistence.addBucket(new BucketDefinition(bucketAny));
final AnyMap workflowAny = AccessAny.FACTORY.createAnyMap();
workflowAny.put("name", "testInvalidWorkflowStartActionStoreParameter");
final AnyMap startActionAny = AccessAny.FACTORY.createAnyMap();
startActionAny.put("worker", WORKER_1);
final AnyMap parametersStartActionAny = AccessAny.FACTORY.createAnyMap();
parametersStartActionAny.put(STORE, "in~Valid*+%&$");
startActionAny.put("parameters", parametersStartActionAny);
final AnyMap outputStartActionAny = AccessAny.FACTORY.createAnyMap();
outputStartActionAny.put("output", "persistentTestInvalidWorkflowActionStoreParameterBucket");
startActionAny.put("output", outputStartActionAny);
workflowAny.put("startAction", startActionAny);
final AnyMap actionAny = AccessAny.FACTORY.createAnyMap();
actionAny.put("worker", "intermediateWorker");
final AnyMap parametersActionAny = AccessAny.FACTORY.createAnyMap();
parametersActionAny.put("tempStore", "in~Validäöü\\?");
actionAny.put("parameters", parametersActionAny);
final AnyMap inputActionAny = AccessAny.FACTORY.createAnyMap();
inputActionAny.put("input", "persistentTestInvalidWorkflowActionStoreParameterBucket");
actionAny.put("input", inputActionAny);
final AnyMap outputActionAny = AccessAny.FACTORY.createAnyMap();
outputActionAny.put("output", "processedBucket");
actionAny.put("output", outputActionAny);
final AnySeq actionListAny = AccessAny.FACTORY.createAnySeq();
actionListAny.add(actionAny);
workflowAny.put("actions", actionListAny);
// both wrong:
_defPersistence.addWorkflow(new WorkflowDefinition(workflowAny));
// would normally result in invalid store name, but since this is not checked, we must have succeeded.
final Collection<String> namesAfter = _defPersistence.getWorkflows();
assertEquals(names.size() + 1, namesAfter.size());
}
/**
* Tests if the jobmanager throws an exception when trying to add a workflow with a parameter that would result in an
* invalid store name.
*
* @throws Exception
* Exception while trying to access jobmanager or creating objects.
*/
public void testInvalidWorkflowParameter() throws Exception {
final Collection<String> names = _defPersistence.getWorkflows();
assertNotNull(names);
final AnyMap bucketAny = AccessAny.FACTORY.createAnyMap();
bucketAny.put("name", "persistentTestInvalidWorkflowParameterBucket");
bucketAny.put("type", "recordBulks");
_defPersistence.addBucket(new BucketDefinition(bucketAny));
final AnyMap workflowAny = AccessAny.FACTORY.createAnyMap();
workflowAny.put("name", "testInvalidWorkflowParameter");
final AnyMap startActionAny = AccessAny.FACTORY.createAnyMap();
startActionAny.put("worker", WORKER_1);
final AnyMap outputStartActionAny = AccessAny.FACTORY.createAnyMap();
outputStartActionAny.put("output", "persistentTestInvalidWorkflowParameterBucket");
startActionAny.put("output", outputStartActionAny);
workflowAny.put("startAction", startActionAny);
final AnyMap actionAny = AccessAny.FACTORY.createAnyMap();
actionAny.put("worker", WORKER_1);
final AnyMap outputActionAny = AccessAny.FACTORY.createAnyMap();
outputActionAny.put("output", "transientBucket");
actionAny.put("output", outputActionAny);
final AnySeq actionListAny = AccessAny.FACTORY.createAnySeq();
actionListAny.add(actionAny);
workflowAny.put("actions", actionListAny);
final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap();
parametersAny.put("store", "in~+*Valid");
parametersAny.put("tempStore", "in+~*#Valid");
workflowAny.put("parameters", parametersAny);
// both wrong:
try {
_defPersistence.addWorkflow(new WorkflowDefinition(workflowAny));
fail("Invalid store name: should not work.");
} catch (final Exception ex) {
assertTrue("wrong exception class " + ex.getClass(), ex instanceof JobManagerException);
}
// store wrong
parametersAny.put("store", "In+*~Valid");
parametersAny.put("tempStore", "valid");
try {
_defPersistence.addWorkflow(new WorkflowDefinition(workflowAny));
fail("Invalid store name: should not work.");
} catch (final Exception ex) {
assertTrue("wrong exception class " + ex.getClass(), ex instanceof JobManagerException);
}
// tempStore wrong
parametersAny.put("store", "valid");
parametersAny.put("tempStore", "in~*Valid");
try {
_defPersistence.addWorkflow(new WorkflowDefinition(workflowAny));
fail("Invalid store name: should not work.");
} catch (final Exception ex) {
assertTrue("wrong exception class " + ex.getClass(), ex instanceof JobManagerException);
}
// but must work with correct parameter:
parametersAny.put(STORE, "valid");
parametersAny.put("tempStore", "valid");
_defPersistence.addWorkflow(new WorkflowDefinition(workflowAny));
final Collection<String> namesAfter = _defPersistence.getWorkflows();
assertEquals(names.size() + 1, namesAfter.size());
}
/**
* Tests if the jobmanager throws an exception when trying to add a job with a parameter that would result in an
* invalid store name.
*
* @throws Exception
* Exception while trying to access jobmanager or creating objects.
*/
public void testInvalidJobParameter() throws Exception {
final Collection<String> names = _defPersistence.getJobs();
assertNotNull(names);
final AnyMap jobAny = AccessAny.FACTORY.createAnyMap();
jobAny.put("name", "testInvalidJobParameter");
jobAny.put("workflow", "testWorkflow");
final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap();
parametersAny.put("tempStore", "in+~*Valid");
parametersAny.put("store", "in+~*Valid");
parametersAny.put("workerParameter", "value");
jobAny.put("parameters", parametersAny);
// both wrong:
try {
_defPersistence.addJob(new JobDefinition(jobAny));
fail("Invalid store name: should not work.");
} catch (final Exception ex) {
assertTrue("wrong exception class " + ex.getClass(), ex instanceof JobManagerException);
}
// correct store name
parametersAny.put("tempStore", "valid");
parametersAny.put("store", "valid");
_defPersistence.addJob(new JobDefinition(jobAny));
final Collection<String> namesAfter = _defPersistence.getJobs();
assertEquals(names.size() + 1, namesAfter.size());
}
/**
* Tests adding a job without an optional parameter.
*
* @throws Exception
* Exception while trying to access jobmanager or creating objects.
*/
public void testMissingOptionalJobParameter() throws Exception {
final Collection<String> names = _defPersistence.getJobs();
assertNotNull(names);
final AnyMap jobAny = AccessAny.FACTORY.createAnyMap();
jobAny.put("name", "testInvalidJobParameter");
jobAny.put("workflow", "optionalParamWorkflow");
final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap();
parametersAny.put("tempStore", "valid");
parametersAny.put("store", "valid");
// not now... parametersAny.put("workerParameter", "value");
jobAny.put("parameters", parametersAny);
// missing mandatory param
try {
_defPersistence.addJob(new JobDefinition(jobAny));
fail("Missing mandatory parameter: should not work.");
} catch (final Exception ex) {
assertTrue("wrong exception class " + ex.getClass(), ex instanceof JobManagerException);
}
// mandatory param set:
parametersAny.put("workerParameter", "value");
_defPersistence.addJob(new JobDefinition(jobAny));
final Collection<String> namesAfter = _defPersistence.getJobs();
assertEquals(names.size() + 1, namesAfter.size());
}
/**
* Tests adding a job with an optional parameter.
*
* @throws Exception
* Exception while trying to access jobmanager or creating objects.
*/
public void testOptionalJobParameter() throws Exception {
final Collection<String> names = _defPersistence.getJobs();
assertNotNull(names);
final String jobName = "testOptionalJobParameter";
final AnyMap jobAny = AccessAny.FACTORY.createAnyMap();
jobAny.put("name", jobName);
jobAny.put("workflow", "optionalParamWorkflow");
final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap();
parametersAny.put("tempStore", "valid");
parametersAny.put("store", "valid");
// not now... parametersAny.put("workerParameter", "value");
parametersAny.put("optionalParameter", "optionalValue");
jobAny.put("parameters", parametersAny);
// missing mandatory param
try {
_defPersistence.addJob(new JobDefinition(jobAny));
fail("Missing mandatory parameter: should not work.");
} catch (final Exception ex) {
assertTrue("wrong exception class " + ex.getClass(), ex instanceof JobManagerException);
}
// mandatory param set:
parametersAny.put("workerParameter", "value");
_defPersistence.addJob(new JobDefinition(jobAny));
assertFalse(_defPersistence.getJob(jobName).isReadOnly());
final Collection<String> namesAfter = _defPersistence.getJobs();
assertEquals(names.size() + 1, namesAfter.size());
}
/**
* Tests if the jobmanager throws an exception when trying to create initial tasks for workers that are not marked as
* bulkSource workers.
*
* @throws Exception
* Exception while trying to access jobmanager or creating objects.
*/
public void testInitialTasksOnlyForBulkSourceWorkers() throws Exception {
final AnyMap workflowAny = AccessAny.FACTORY.createAnyMap();
workflowAny.put("name", "testInitialTasksOnlyForBulkSourceWorkers");
final AnyMap bulkBuilderActionAny = AccessAny.FACTORY.createAnyMap();
bulkBuilderActionAny.put("worker", WORKER_1);
final AnyMap bulkBuilderActionOutputAny = AccessAny.FACTORY.createAnyMap();
bulkBuilderActionOutputAny.put("output", "docsBucket");
bulkBuilderActionAny.put("output", bulkBuilderActionOutputAny);
workflowAny.put("startAction", bulkBuilderActionAny);
final AnySeq actionsAny = AccessAny.FACTORY.createAnySeq();
final AnyMap tmeRecoredAnalyzerActionAny = AccessAny.FACTORY.createAnyMap();
tmeRecoredAnalyzerActionAny.put("worker", "intermediateWorker");
final AnyMap tmeRecordAnalyzerActionOutputAny = AccessAny.FACTORY.createAnyMap();
tmeRecordAnalyzerActionOutputAny.put("output", "processedBucket");
final AnyMap tmeRecordAnalyzerActionInputAny = AccessAny.FACTORY.createAnyMap();
tmeRecordAnalyzerActionInputAny.put("input", "docsBucket");
tmeRecoredAnalyzerActionAny.put("input", tmeRecordAnalyzerActionInputAny);
tmeRecoredAnalyzerActionAny.put("output", tmeRecordAnalyzerActionOutputAny);
actionsAny.add(tmeRecoredAnalyzerActionAny);
workflowAny.put("actions", actionsAny);
// add workflow:
final JobManager service = getService(JobManager.class);
assertNotNull(service);
_defPersistence.addWorkflow(new WorkflowDefinition(workflowAny));
// add Job
final String jobName = "testInitialTasksOnlyForBulkSourceWorkers";
final AnyMap jobAny = AccessAny.FACTORY.createAnyMap();
jobAny.put("name", jobName);
jobAny.put("workflow", "testInitialTasksOnlyForBulkSourceWorkers");
final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap();
parametersAny.put("tempStore", TEMPSTORE);
parametersAny.put("store", STORE);
parametersAny.put("workerParameter", "value");
jobAny.put("parameters", parametersAny);
_defPersistence.addJob(new JobDefinition(jobAny));
// prepare store
final ObjectStoreService store = getService(ObjectStoreService.class);
store.ensureStore("docstore");
// start job
final String jobId = service.startJob(jobName);
// try to start intermediate worker:
try {
service.getInitialTask(WORKER_2, jobName);
fail("must not get initial task for non bulkSource worker");
} catch (final Exception ex) {
assertTrue("wrong exception", ex instanceof JobManagerException);
}
// try to start something that is not part of the workflow:
try {
service.getInitialTask("theWorkerThatMustNotBeNamed", jobName);
fail("must not get initial task for non bulkSource worker");
} catch (final Exception ex) {
assertTrue("wrong exception", ex instanceof JobManagerException);
}
// start input worker:
final Task task = service.getInitialTask(WORKER_1, jobName);
assertNotNull(task);
assertEquals("wrong task worker name", WORKER_1, task.getWorkerName());
finishFinalTask(task, successResult());
service.finishJob(jobName, jobId);
}
/**
* Tests working workflow with groups.
*
* @throws Exception
* Exception while trying to access jobmanager or creating objects.
*/
public void testWorkingDefinitionWithGroups() throws Exception {
final AnyMap workflowAny = AccessAny.FACTORY.createAnyMap();
final String workflowName = "testWorkingDefinition";
workflowAny.put("name", workflowName);
final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap();
parametersAny.put("indexName", "index");
workflowAny.put("parameters", parametersAny);
final AnyMap startActionAny = AccessAny.FACTORY.createAnyMap();
startActionAny.put("worker", WORKER_1);
final AnyMap inputParametersAny = AccessAny.FACTORY.createAnyMap();
inputParametersAny.put("index", "${indexName}");
startActionAny.put("parameters", inputParametersAny);
final AnyMap outputAny = AccessAny.FACTORY.createAnyMap();
outputAny.put("output", "docsBucket");
startActionAny.put("output", outputAny);
workflowAny.put("startAction", startActionAny);
final AnySeq actionsAny = AccessAny.FACTORY.createAnySeq();
final AnyMap action1Any = AccessAny.FACTORY.createAnyMap();
action1Any.put("worker", "testSlotGroups");
final AnyMap inputAction1Any = AccessAny.FACTORY.createAnyMap();
inputAction1Any.put("noGroup", "docsBucket");
action1Any.put("input", inputAction1Any);
final AnyMap outputAction1Any = AccessAny.FACTORY.createAnyMap();
outputAction1Any.put("noGroup", "docsBucket");
outputAction1Any.put("group2-1", "docsBucket");
outputAction1Any.put("group2-2", "docsBucket");
action1Any.put("output", outputAction1Any);
actionsAny.add(action1Any);
workflowAny.put("actions", actionsAny);
final WorkflowDefinition workflowDefinition = new WorkflowDefinition(workflowAny);
_defPersistence.addWorkflow(workflowDefinition);
final AnyMap reconvertedAny = _defPersistence.getWorkflow(workflowName).toAny();
assertEquals(workflowAny, reconvertedAny);
}
/**
* Tests workflow with conflicting output groups.
*
* @throws Exception
* Exception while trying to access jobmanager or creating objects.
*/
public void testDefinitionWithConflictingOutputGroups() throws Exception {
final AnyMap workflowAny = AccessAny.FACTORY.createAnyMap();
workflowAny.put("name", "testDefinitionWithConflictingOutputGroups");
final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap();
parametersAny.put("indexName", "index");
workflowAny.put("parameters", parametersAny);
final AnyMap startActionAny = AccessAny.FACTORY.createAnyMap();
startActionAny.put("worker", WORKER_1);
final AnyMap inputParametersAny = AccessAny.FACTORY.createAnyMap();
inputParametersAny.put("index", "${indexName}");
startActionAny.put("parameters", inputParametersAny);
final AnyMap outputAny = AccessAny.FACTORY.createAnyMap();
outputAny.put("output", "docsBucket");
startActionAny.put("output", outputAny);
workflowAny.put("startAction", startActionAny);
final AnySeq actionsAny = AccessAny.FACTORY.createAnySeq();
final AnyMap action1Any = AccessAny.FACTORY.createAnyMap();
action1Any.put("worker", "testSlotGroups");
final AnyMap inputAction1Any = AccessAny.FACTORY.createAnyMap();
inputAction1Any.put("noGroup", "docsBucket");
action1Any.put("input", inputAction1Any);
final AnyMap outputAction1Any = AccessAny.FACTORY.createAnyMap();
outputAction1Any.put("noGroup", "docsBucket");
outputAction1Any.put("group1-1", "docsBucket");
outputAction1Any.put("group2-2", "docsBucket");
action1Any.put("output", outputAction1Any);
actionsAny.add(action1Any);
workflowAny.put("actions", actionsAny);
WorkflowDefinition workflowDefinition = new WorkflowDefinition(workflowAny);
try {
_defPersistence.addWorkflow(workflowDefinition);
fail("InvalidConfigException expected.");
} catch (final InvalidConfigException e) {
; // ok, let's amend the config and do it again
outputAction1Any.clear();
outputAction1Any.put("noGroup", "docsBucket");
outputAction1Any.put("group2-1", "docsBucket");
outputAction1Any.put("group2-2", "docsBucket");
action1Any.put("output", outputAction1Any);
workflowDefinition = new WorkflowDefinition(workflowAny);
_defPersistence.addWorkflow(workflowDefinition);
}
final AnyMap reconvertedAny = _defPersistence.getWorkflow(workflowAny.getStringValue("name")).toAny();
assertEquals(workflowAny, reconvertedAny);
}
/**
* check usage of inputWorker with persistent output only.
*/
public void testBulkBuilderWithPersistentBuckets() throws Exception {
// create a persistent recordBulk bucket
final AnyMap bucketDef = AccessAny.FACTORY.createAnyMap();
bucketDef.put("name", "persistentDocs");
bucketDef.put("type", "recordBulks");
_defPersistence.addBucket(new BucketDefinition(bucketDef));
// create a simple bulkbuilder workflow using this bucket
final AnyMap workflowDef = AccessAny.FACTORY.createAnyMap();
workflowDef.put("name", "testSlotParamaterValidationWorkflow");
final AnyMap startAction = AccessAny.FACTORY.createAnyMap();
startAction.put("worker", WORKER_1);
final AnyMap outputs = AccessAny.FACTORY.createAnyMap();
outputs.put("output", "persistentDocs");
startAction.put("output", outputs);
workflowDef.put("startAction", startAction);
_defPersistence.addWorkflow(new WorkflowDefinition(workflowDef));
// create a job with this workflow
final AnyMap jobDef = AccessAny.FACTORY.createAnyMap();
jobDef.put("name", "testSlotParamaterValidationJob");
jobDef.put("workflow", "testSlotParamaterValidationWorkflow");
final AnyMap parametersDef = AccessAny.FACTORY.createAnyMap();
parametersDef.put(STORE, "persstore");
_objectStoreService.ensureStore("persstore");
jobDef.put("parameters", parametersDef);
// this should work without having to set the "tempStore" parameter.
_defPersistence.addJob(new JobDefinition(jobDef));
}
/**
* check mandatory grouped slots.
*/
public void testMandatoryGroupedSlots() throws Exception {
final AnyMap workflowAny = AccessAny.FACTORY.createAnyMap();
final String workflowName = "testWorkingDefinition";
workflowAny.put("name", workflowName);
final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap();
parametersAny.put("indexName", "index");
workflowAny.put("parameters", parametersAny);
final AnyMap startActionAny = AccessAny.FACTORY.createAnyMap();
startActionAny.put("worker", WORKER_1);
final AnyMap inputParametersAny = AccessAny.FACTORY.createAnyMap();
inputParametersAny.put("index", "${indexName}");
startActionAny.put("parameters", inputParametersAny);
final AnyMap outputAny = AccessAny.FACTORY.createAnyMap();
outputAny.put("output", "docsBucket");
startActionAny.put("output", outputAny);
workflowAny.put("startAction", startActionAny);
final AnySeq actionsAny = AccessAny.FACTORY.createAnySeq();
final AnyMap action1Any = AccessAny.FACTORY.createAnyMap();
action1Any.put("worker", "testSlotGroups");
final AnyMap inputAction1Any = AccessAny.FACTORY.createAnyMap();
inputAction1Any.put("noGroup", "docsBucket");
action1Any.put("input", inputAction1Any);
final AnyMap outputAction1Any = AccessAny.FACTORY.createAnyMap();
action1Any.put("output", outputAction1Any);
actionsAny.add(action1Any);
workflowAny.put("actions", actionsAny);
// check: mandatory groups, but no group used
outputAction1Any.put("noGroup", "docsBucket");
try {
final WorkflowDefinition workflowDefinition = new WorkflowDefinition(workflowAny);
_defPersistence.addWorkflow(workflowDefinition);
fail("should not work");
} catch (final InvalidConfigException ex) {
; //
}
// check: mandatory grouped slot not set
outputAction1Any.put("group2-2", "docsBucket");
try {
final WorkflowDefinition workflowDefinition = new WorkflowDefinition(workflowAny);
_defPersistence.addWorkflow(workflowDefinition);
fail("should not work");
} catch (final InvalidConfigException ex) {
; //
}
// check: mandatory ungrouped slot not set
outputAction1Any.clear();
outputAction1Any.put("group2-1", "docsBucket");
try {
final WorkflowDefinition workflowDefinition = new WorkflowDefinition(workflowAny);
_defPersistence.addWorkflow(workflowDefinition);
fail("should not work");
} catch (final InvalidConfigException ex) {
; //
}
}
}