package org.eclipse.smila.integration.worker.test;
import java.util.Iterator;
import org.eclipse.smila.bulkbuilder.BulkbuilderException;
import org.eclipse.smila.bulkbuilder.BulkbuilderService;
import org.eclipse.smila.datamodel.Any;
import org.eclipse.smila.datamodel.Any.ValueType;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.datamodel.DataFactory;
import org.eclipse.smila.datamodel.Record;
import org.eclipse.smila.datamodel.Value;
import org.eclipse.smila.datamodel.ipc.IpcRecordReader;
import org.eclipse.smila.datamodel.validation.InvalidRecordException;
import org.eclipse.smila.jobmanager.JobRunDataProvider;
import org.eclipse.smila.jobmanager.JobRunEngine;
import org.eclipse.smila.jobmanager.JobState;
import org.eclipse.smila.jobmanager.definitions.DefinitionPersistence;
import org.eclipse.smila.jobmanager.definitions.JobDefinition;
import org.eclipse.smila.jobmanager.definitions.JobManagerConstants;
import org.eclipse.smila.objectstore.ObjectStoreException;
import org.eclipse.smila.objectstore.ObjectStoreService;
import org.eclipse.smila.objectstore.StoreObject;
import org.eclipse.smila.test.DeclarativeServiceTestCase;
/**
* Base class for the worker tests.
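* <p>
* A typical test built on this base class looks roughly like the following sketch (the job and workflow names are
* placeholders; the workflow is assumed to be already deployed):
*
* <pre>
* final String jobRunId = startJob("myJob", "myWorkflow", STORE_NAME);
* addRecord("myJob", createRecord(1));
* commitJobBulk("myJob");
* finishJob("myJob", jobRunId);
* waitForJobRunFinished("myJob", jobRunId, MAX_WAIT_TIME);
* </pre>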
*/
public class WorkerTestbase extends DeclarativeServiceTestCase {
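/** name of the object store used for the test bulks. */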
protected static final String STORE_NAME = "tempstore";
/** 1 second in milliseconds. */
protected static final int ONE_SECOND = 1000;
/** Maximum wait time for a job run to finish, in milliseconds. */
protected static final int MAX_WAIT_TIME = 120000;
/** 1200 characters for the content attribute. */
protected static final String CONTENT =
"Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut "
+ "laoreet dolore magna aliquam erat volutpat. Ut wisi enim ad minim veniam, quis nostrud exerci tation "
+ "ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat. Duis autem vel eum iriure dolor "
+ "in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at "
+ "vero et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te "
+ "feugait nulla facilisi. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh "
+ "euismod tincidunt ut laoreet dolore magna aliquam erat volutpat. Ut wisi enim ad minim veniam, quis nostrud "
+ "exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat. Duis autem vel eum "
+ "iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla "
+ "facilisis at vero et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue "
+ "duis dolore te feugait nulla facilisi. Nam liber tempor cum soluta nobis eleifend option congue nihil "
+ "imperdiet doming id quod mazim placerat facer possim assum.";
/** name of test worker that consumes the bulks of added records. */
protected static final String WORKER_BULKCONSUMER = "testBulkConsumer";
/** name of test worker that consumes the bulks of deleted records. */
protected static final String WORKER_DELETESCONSUMER = "testDeletesConsumer";
/** local IAS REST API host name and port. */
protected static final String HOST_AND_PORT = "localhost:8050";
/** Helper for reading records from IPC streams and byte arrays. */
protected static final IpcRecordReader RECORDREADER = new IpcRecordReader();
/** job run engine used for tests. */
protected JobRunEngine _jobRunEngine;
/** job run data provider used for tests. */
protected JobRunDataProvider _jobRunDataProvider;
/** objectstore service for tests. */
protected ObjectStoreService _objectStore;
/** definition persistence reference. */
protected DefinitionPersistence _defPersistence;
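/** bulkbuilder service used to add and commit test records. */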
protected BulkbuilderService _bulkbuilder;
@Override
protected void setUp() throws Exception {
super.setUp();
_objectStore = getService(ObjectStoreService.class);
assertNotNull(_objectStore);
_objectStore.removeStore(STORE_NAME);
_objectStore.ensureStore(STORE_NAME);
_jobRunEngine = getService(JobRunEngine.class);
assertNotNull(_jobRunEngine);
_jobRunDataProvider = getService(JobRunDataProvider.class);
assertNotNull(_jobRunDataProvider);
_defPersistence = getService(DefinitionPersistence.class);
assertNotNull(_defPersistence);
_bulkbuilder = getService(BulkbuilderService.class);
assertNotNull(_bulkbuilder);
}
/**
* create and start a job.
*
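* A call like {@code startJob("myJob", "myWorkflow", STORE_NAME)} (job and workflow names are placeholders) creates
* the job definition and returns the id of the started job run.
*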
* @param jobName
* name of job
* @param workflowName
* name of job workflow
* @param store
* tempStore to use for bulks.
* @return job run id
* @throws Exception
* job creation or start failed.
*/
public String startJob(final String jobName, final String workflowName, final String store) throws Exception {
createJob(jobName, workflowName, store);
return _jobRunEngine.startJob(jobName);
}
/**
* Finishes the given job run.
*
* @param jobName
name of job to finish
* @param jobRunId
id of job run to finish
* @throws Exception
finishing the job run failed.
*/
public void finishJob(final String jobName, final String jobRunId) throws Exception {
_jobRunEngine.finishJob(jobName, jobRunId);
}
/**
* Waits for a job to finish its workflow runs.
*
* @param jobName
* the name of the job
* @param jobId
* the id of the job run
* @param maxWaitTime
max wait time in milliseconds; if waiting exceeds this time, an assertion fails.
* @throws Exception
* an exception occurred while trying to get job run data
*/
protected void waitForJobRunFinished(final String jobName, final String jobId, final long maxWaitTime)
throws Exception {
final long sleepTime = 500L;
final long millisStarted = System.currentTimeMillis();
while (true) {
try {
final AnyMap jobRunData = _jobRunDataProvider.getJobRunData(jobName, jobId);
if (jobRunData != null) {
final JobState jobRunState =
JobState.valueOf(jobRunData.getStringValue(JobManagerConstants.DATA_JOB_STATE));
if (jobRunState == JobState.SUCCEEDED || jobRunState == JobState.FAILED
|| jobRunState == JobState.CANCELED) {
final AnyMap workflowRunData = jobRunData.getMap(JobManagerConstants.WORKFLOW_RUN_COUNTER);
final Any activeRunsAny = workflowRunData.get(JobManagerConstants.DATA_JOB_NO_OF_ACTIVE_WORKFLOW_RUNS);
if (activeRunsAny != null) {
assertEquals("wrong type of " + JobManagerConstants.DATA_JOB_NO_OF_ACTIVE_WORKFLOW_RUNS,
ValueType.LONG, activeRunsAny.getValueType());
final int activeRuns = ((Value) activeRunsAny).asLong().intValue();
assertEquals(0, activeRuns);
}
return;
}
}
assertTrue("Waited too long for job to finish. Latest job run data: " + jobRunData,
System.currentTimeMillis() - millisStarted <= maxWaitTime);
Thread.sleep(sleepTime);
} catch (final Exception e) {
e.printStackTrace();
throw e;
}
}
}
/**
* Asserts that the created-tasks counter equals the sum of the succeeded, retried, failed, cancelled, and obsolete
* task counters in the given job run data.
*
* @param jobRunData
job run data to check
*/
protected void assertTaskCounters(final AnyMap jobRunData) {
final AnyMap taskData = jobRunData.getMap(JobManagerConstants.TASK_COUNTER);
final int createdTasks = taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_CREATED_TASKS).intValue();
final int succeededTasks =
taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_SUCCESSFUL_TASKS).intValue();
final int retriedByWorker =
taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_RETRIED_TASKS_WORKER).intValue();
final int retriedByTtl = taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_RETRIED_TASKS_TTL).intValue();
final int failedAfterRetry =
taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_FAILED_TASKS_RETRIED).intValue();
final int failedByWorker =
taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_FAILED_TASKS_NOT_RETRIED).intValue();
final int cancelledTasks = taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_CANCELLED_TASKS).intValue();
final int obsoleteTasks = taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_OBSOLETE_TASKS).intValue();
assertEquals(createdTasks, succeededTasks + retriedByWorker + retriedByTtl + failedByWorker + failedAfterRetry
+ cancelledTasks + obsoleteTasks);
}
/**
* create job.
*
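* Builds and registers a job definition of roughly this shape (a sketch of the structure assembled below):
*
* <pre>
* {
*   "name": "&lt;jobName&gt;",
*   "workflow": "&lt;workflowName&gt;",
*   "parameters": {
*     "tempStore": "&lt;store&gt;",
*     "store": "&lt;store&gt;"
*   }
* }
* </pre>
*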
* @param jobName
* name of job
* @param workflowName
* name of job workflow
* @param store
* tempStore to use for bulks.
* @throws Exception
* job creation failed.
*/
protected void createJob(final String jobName, final String workflowName, final String store) throws Exception {
final AnyMap jobDef = DataFactory.DEFAULT.createAnyMap();
jobDef.put("name", DataFactory.DEFAULT.createStringValue(jobName));
jobDef.put("workflow", DataFactory.DEFAULT.createStringValue(workflowName));
final AnyMap parametersAny = DataFactory.DEFAULT.createAnyMap();
parametersAny.put("tempStore", DataFactory.DEFAULT.createStringValue(store));
parametersAny.put("store", DataFactory.DEFAULT.createStringValue(store));
jobDef.put("parameters", parametersAny);
_defPersistence.removeJob(jobName);
_defPersistence.addJob(new JobDefinition(jobDef));
}
/**
* @param id
* record id
* @return a record for testing.
*/
protected static Record createRecord(final int id) {
final Record record = DataFactory.DEFAULT.createRecord(Integer.toString(id));
final Any content = DataFactory.DEFAULT.createStringValue(CONTENT);
record.getMetadata().put("content", content);
return record;
}
/**
* @param id
* record id
* @return a record for testing.
*/
protected static Record createRecord(final String id) {
final Record record = DataFactory.DEFAULT.createRecord(id);
final Any content = DataFactory.DEFAULT.createStringValue(CONTENT);
record.getMetadata().put("content", content);
return record;
}
/**
* Gets the number of DOS log entries for the specified level and job run.
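* For example, {@code getLogCount("errors", jobName, jobRunId)} counts the error log objects written for the given
* job run.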
*
* @param level
* the level (infos, warnings, errors)
* @param jobName
* the jobName
* @param jobRunId
* the job's run id
* @return the number of objects in the DOS log
* @throws ObjectStoreException
reading the store object infos failed.
*/
protected int getLogCount(final String level, final String jobName, final String jobRunId)
throws ObjectStoreException {
final String jobIdPath = jobRunId.replace("-", "/");
final String path = level + "/" + jobName + "/" + jobIdPath;
int count = 0;
final Iterator<StoreObject> iter = _objectStore.getStoreObjectInfos("jobmanager", path).iterator();
while (iter.hasNext()) {
++count;
iter.next();
}
return count;
}
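/**
* Adds a record to the given job via the bulkbuilder.
*
* @param jobName
name of job
* @param record
record to add
* @throws BulkbuilderException
adding the record failed.
* @throws InvalidRecordException
the record is invalid.
*/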
protected void addRecord(final String jobName, final Record record) throws BulkbuilderException,
InvalidRecordException {
_bulkbuilder.addRecord(jobName, record);
}
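/**
* Commits the pending bulk of the given job via the bulkbuilder.
*
* @param jobName
name of job
* @throws BulkbuilderException
committing the bulk failed.
*/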
protected void commitJobBulk(final String jobName) throws BulkbuilderException {
_bulkbuilder.commitJob(jobName);
}
}