| package org.eclipse.smila.integration.worker.test; |
| |
| import java.util.Iterator; |
| |
| import org.eclipse.smila.bulkbuilder.BulkbuilderException; |
| import org.eclipse.smila.bulkbuilder.BulkbuilderService; |
| import org.eclipse.smila.datamodel.Any; |
| import org.eclipse.smila.datamodel.Any.ValueType; |
| import org.eclipse.smila.datamodel.AnyMap; |
| import org.eclipse.smila.datamodel.DataFactory; |
| import org.eclipse.smila.datamodel.Record; |
| import org.eclipse.smila.datamodel.Value; |
| import org.eclipse.smila.datamodel.ipc.IpcRecordReader; |
| import org.eclipse.smila.datamodel.validation.InvalidRecordException; |
| import org.eclipse.smila.jobmanager.JobRunDataProvider; |
| import org.eclipse.smila.jobmanager.JobRunEngine; |
| import org.eclipse.smila.jobmanager.JobState; |
| import org.eclipse.smila.jobmanager.definitions.DefinitionPersistence; |
| import org.eclipse.smila.jobmanager.definitions.JobDefinition; |
| import org.eclipse.smila.jobmanager.definitions.JobManagerConstants; |
| import org.eclipse.smila.objectstore.ObjectStoreException; |
| import org.eclipse.smila.objectstore.ObjectStoreService; |
| import org.eclipse.smila.objectstore.StoreObject; |
| import org.eclipse.smila.test.DeclarativeServiceTestCase; |
| |
| /** |
| * Base class for the worker tests. |
| */ |
| public class WorkerTestbase extends DeclarativeServiceTestCase { |
| |
| protected static final String STORE_NAME = "tempstore"; |
| |
| /** 1 second in milliseconds. */ |
| protected static final int ONE_SECOND = 1000; |
| |
| /** Max wait time for job. */ |
| protected static final int MAX_WAIT_TIME = 120000; |
| |
| /** 1200 characters for the content attribute. */ |
| protected static final String CONTENT = |
| "Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut " |
| + "laoreet dolore magna aliquam erat volutpat. Ut wisi enim ad minim veniam, quis nostrud exerci tation " |
| + "ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat. Duis autem vel eum iriure dolor " |
| + "in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at " |
| + "vero et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te " |
| + "feugait nulla facilisi. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh " |
| + "euismod tincidunt ut laoreet dolore magna aliquam erat volutpat. Ut wisi enim ad minim veniam, quis nostrud " |
| + "exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat. Duis autem vel eum " |
| + "iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla " |
| + "facilisis at vero et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue " |
| + "duis dolore te feugait nulla facilisi. Nam liber tempor cum soluta nobis eleifend option congue nihil " |
| + "imperdiet doming id quod mazim placerat facer possim assum."; |
| |
| /** name of test worker that consumes the bulks of added records. */ |
| protected static final String WORKER_BULKCONSUMER = "testBulkConsumer"; |
| |
| /** name of test worker that consumes the bulks of deleted records. */ |
| protected static final String WORKER_DELETESCONSUMER = "testDeletesConsumer"; |
| |
| /** local IAS REST API host name and port. */ |
| protected static final String HOST_AND_PORT = "localhost:8050"; |
| |
| /** Helper for transforming streams and byte arrays. */ |
| protected static final IpcRecordReader RECORDREADER = new IpcRecordReader(); |
| |
| /** jobmanager used for tests. */ |
| protected JobRunEngine _jobRunEngine; |
| |
| /** jobmanager used for tests. */ |
| protected JobRunDataProvider _jobRunDataProvider; |
| |
| /** objectstore service for tests. */ |
| protected ObjectStoreService _objectStore; |
| |
| /** definition persistence reference. */ |
| protected DefinitionPersistence _defPersistence; |
| |
| protected BulkbuilderService _bulkbuilder; |
| |
| @Override |
| protected void setUp() throws Exception { |
| super.setUp(); |
| _objectStore = getService(ObjectStoreService.class); |
| assertNotNull(_objectStore); |
| _objectStore.removeStore(STORE_NAME); |
| _objectStore.ensureStore(STORE_NAME); |
| _jobRunEngine = getService(JobRunEngine.class); |
| assertNotNull(_jobRunEngine); |
| _jobRunDataProvider = getService(JobRunDataProvider.class); |
| assertNotNull(_jobRunDataProvider); |
| _defPersistence = getService(DefinitionPersistence.class); |
| assertNotNull(_defPersistence); |
| _bulkbuilder = getService(BulkbuilderService.class); |
| assertNotNull(_bulkbuilder); |
| } |
| |
| /** |
| * create and start a job. |
| * |
| * @param jobName |
| * name of job |
| * @param workflowName |
| * name of job workflow |
| * @param store |
| * tempStore to use for bulks. |
| * @return job run id |
| * @throws Exception |
| * job creation or start failed. |
| */ |
| public String startJob(final String jobName, final String workflowName, final String store) throws Exception { |
| createJob(jobName, workflowName, store); |
| return _jobRunEngine.startJob(jobName); |
| } |
| |
| /** |
| * @param jobName |
| * name of job to finish |
| * @param jobRunId |
| * id of job run to finish |
| */ |
| public void finishJob(final String jobName, final String jobRunId) throws Exception { |
| _jobRunEngine.finishJob(jobName, jobRunId); |
| } |
| |
| /** |
| * Waits for a job to finish its workflow runs. |
| * |
| * @param jobName |
| * the name of the job |
| * @param jobId |
| * the id of the job run |
| * @param maxWaitTime |
| * max wait time in milliseconds, if the waiting exceeds this wait time an assertion will fail. |
| * @throws Exception |
| * an exception occurred while trying to get job run data |
| */ |
| protected void waitForJobRunFinished(final String jobName, final String jobId, final long maxWaitTime) |
| throws Exception { |
| final long sleepTime = 500L; |
| final long millisStarted = System.currentTimeMillis(); |
| while (true) { |
| try { |
| final AnyMap jobRunData = _jobRunDataProvider.getJobRunData(jobName, jobId); |
| if (jobRunData != null) { |
| final JobState jobRunState = |
| JobState.valueOf(jobRunData.getStringValue(JobManagerConstants.DATA_JOB_STATE)); |
| if (jobRunState == JobState.SUCCEEDED || jobRunState == JobState.FAILED |
| || jobRunState == JobState.CANCELED) { |
| final AnyMap workflowRunData = jobRunData.getMap(JobManagerConstants.WORKFLOW_RUN_COUNTER); |
| final Any activeRunsAny = workflowRunData.get(JobManagerConstants.DATA_JOB_NO_OF_ACTIVE_WORKFLOW_RUNS); |
| if (activeRunsAny != null) { |
| assertEquals("wrong type of " + JobManagerConstants.DATA_JOB_NO_OF_ACTIVE_WORKFLOW_RUNS, |
| ValueType.LONG, activeRunsAny.getValueType()); |
| final int activeRuns = ((Value) activeRunsAny).asLong().intValue(); |
| assertEquals(0, activeRuns); |
| } |
| return; |
| } |
| } |
| assertTrue("Waited too long for job to finish. Latest job run data: " + jobRunData, |
| System.currentTimeMillis() - millisStarted <= maxWaitTime); |
| Thread.sleep(sleepTime); |
| } catch (final Exception e) { |
| e.printStackTrace(); |
| throw e; |
| } |
| } |
| } |
| |
| /** |
| * assert tasks counter sum in job run data. |
| */ |
| protected void assertTaskCounters(final AnyMap jobRunData) { |
| final AnyMap taskData = jobRunData.getMap(JobManagerConstants.TASK_COUNTER); |
| final int createdTasks = taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_CREATED_TASKS).intValue(); |
| final int succeededTasks = |
| taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_SUCCESSFUL_TASKS).intValue(); |
| final int retriedByWorker = |
| taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_RETRIED_TASKS_WORKER).intValue(); |
| final int retriedByTtl = taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_RETRIED_TASKS_TTL).intValue(); |
| final int failedAfterRetry = |
| taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_FAILED_TASKS_RETRIED).intValue(); |
| final int failedByWorker = |
| taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_FAILED_TASKS_NOT_RETRIED).intValue(); |
| final int cancelledTasks = taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_CANCELLED_TASKS).intValue(); |
| final int obsoleteTasks = taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_OBSOLETE_TASKS).intValue(); |
| assertEquals(createdTasks, succeededTasks + retriedByWorker + retriedByTtl + failedByWorker + failedAfterRetry |
| + cancelledTasks + obsoleteTasks); |
| } |
| |
| /** |
| * create job. |
| * |
| * @param jobName |
| * name of job |
| * @param workflowName |
| * name of job workflow |
| * @param store |
| * tempStore to use for bulks. |
| * @throws Exception |
| * job creation failed. |
| */ |
| protected void createJob(final String jobName, final String workflowName, final String store) throws Exception { |
| final AnyMap jobDef = DataFactory.DEFAULT.createAnyMap(); |
| jobDef.put("name", DataFactory.DEFAULT.createStringValue(jobName)); |
| jobDef.put("workflow", DataFactory.DEFAULT.createStringValue(workflowName)); |
| final AnyMap parametersAny = DataFactory.DEFAULT.createAnyMap(); |
| parametersAny.put("tempStore", DataFactory.DEFAULT.createStringValue(store)); |
| parametersAny.put("store", DataFactory.DEFAULT.createStringValue(store)); |
| jobDef.put("parameters", parametersAny); |
| _defPersistence.removeJob(jobName); |
| _defPersistence.addJob(new JobDefinition(jobDef)); |
| } |
| |
| /** |
| * @param id |
| * record id |
| * @return a record for testing. |
| */ |
| protected static Record createRecord(final int id) { |
| final Record record = DataFactory.DEFAULT.createRecord(Integer.toString(id)); |
| final Any content = DataFactory.DEFAULT.createStringValue(CONTENT); |
| record.getMetadata().put("content", content); |
| return record; |
| } |
| |
| /** |
| * @param id |
| * record id |
| * @return a record for testing. |
| */ |
| protected static Record createRecord(final String id) { |
| final Record record = DataFactory.DEFAULT.createRecord(id); |
| final Any content = DataFactory.DEFAULT.createStringValue(CONTENT); |
| record.getMetadata().put("content", content); |
| return record; |
| } |
| |
| /** |
| * gets the number of DOS logs for the specified level and jobId. |
| * |
| * @param level |
| * the level (infos, warnings, errors) |
| * @param jobName |
| * the jobName |
| * @param jobRunId |
| * the job's run id |
| * @return the number of objects in the DOS log |
| */ |
| protected int getLogCount(final String level, final String jobName, final String jobRunId) |
| throws ObjectStoreException { |
| final String jobIdPath = jobRunId.replace("-", "/"); |
| final String path = level + "/" + jobName + "/" + jobIdPath; |
| int count = 0; |
| final Iterator<StoreObject> iter = _objectStore.getStoreObjectInfos("jobmanager", path).iterator(); |
| while (iter.hasNext()) { |
| ++count; |
| iter.next(); |
| } |
| return count; |
| } |
| |
| protected void addRecord(final String jobName, final Record record) throws BulkbuilderException, |
| InvalidRecordException { |
| _bulkbuilder.addRecord(jobName, record); |
| } |
| |
| protected void commitJobBulk(final String jobName) throws BulkbuilderException { |
| _bulkbuilder.commitJob(jobName); |
| } |
| |
| } |