blob: 3d815278485acd5e48943e1ff18df6a8a3ad3bd6 [file] [log] [blame]
/**********************************************************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Juergen Schumacher, Andreas Weber, Drazen Cindric, Andreas Schank (all Attensity Europe GmbH) - initial
* implementation
**********************************************************************************************************************/
package org.eclipse.smila.taskworker.test;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.io.IOUtils;
import org.eclipse.smila.datamodel.DataFactory;
import org.eclipse.smila.datamodel.Record;
import org.eclipse.smila.datamodel.ipc.IpcSerializationUtils;
import org.eclipse.smila.objectstore.ObjectStoreService;
import org.eclipse.smila.taskmanager.BulkInfo;
import org.eclipse.smila.taskmanager.Task;
import org.eclipse.smila.taskworker.DefaultTaskLogFactory;
import org.eclipse.smila.taskworker.TaskContext;
import org.eclipse.smila.taskworker.input.RecordInput;
import org.eclipse.smila.taskworker.internal.TaskContextImpl;
import org.eclipse.smila.taskworker.output.AppendableOutput;
import org.eclipse.smila.taskworker.output.RecordOutput;
import org.eclipse.smila.test.DeclarativeServiceTestCase;
import org.eclipse.smila.utils.collections.MultiValueMap;
/** unit tests for task context, mostly counter aggrgation tests. */
public class TestTaskContext extends DeclarativeServiceTestCase {
/** Helper for transforming streams and byte arrays. */
protected static final IpcSerializationUtils IPCUTILS = new IpcSerializationUtils();
protected static final String STORE_NAME = "data";
private ObjectStoreService _service;
private TaskContext _taskContext;
@Override
protected void setUp() throws Exception {
super.setUp();
_service = getService(ObjectStoreService.class);
_service.removeStore(STORE_NAME);
_service.createStore(STORE_NAME, null);
createTaskContext();
}
public void testCountersNothingDone() throws Exception {
assertNotNull(_taskContext);
_taskContext.getInputs().closeAll();
_taskContext.getOutputs().commitAll();
final Map<String, Number> counters = _taskContext.getFinalCounters();
assertEquals(0, counters.get("warnCount").longValue());
assertFalse(counters.containsKey("duration.iodata"));
assertFalse(counters.containsKey("duration.iodata.close"));
assertFalse(counters.containsKey("duration.iodata.open"));
assertFalse(counters.containsKey("duration.perform.input"));
assertFalse(counters.containsKey("duration.perform.output"));
}
public void testCounterWarnCount() throws Exception {
assertNotNull(_taskContext);
_taskContext.getLog().warn("w1");
_taskContext.getLog().warn("w2");
_taskContext.getLog().warn("w3");
_taskContext.getLog().info("i1"); // not counted
_taskContext.getLog().error("e1"); // not counted
_taskContext.getInputs().closeAll();
_taskContext.getOutputs().commitAll();
final Map<String, Number> counters = _taskContext.getFinalCounters();
assertEquals(3, counters.get("warnCount").longValue());
assertFalse(counters.containsKey("duration.iodata"));
assertFalse(counters.containsKey("duration.iodata.close"));
assertFalse(counters.containsKey("duration.iodata.open"));
assertFalse(counters.containsKey("duration.perform.input"));
assertFalse(counters.containsKey("duration.perform.output"));
}
public void testCountersReadWriteStream() throws Exception {
assertNotNull(_taskContext);
_service.putObject(STORE_NAME, "input", "test".getBytes());
final InputStream input = _taskContext.getInputs().getAsStreamInput("input").getStream();
final OutputStream output = _taskContext.getOutputs().getAsStreamOutput("output").getStream();
final byte[] value = IOUtils.toByteArray(input);
output.write(value);
output.write(value);
_taskContext.getInputs().closeAll();
_taskContext.getOutputs().commitAll();
final Map<String, Number> counters = _taskContext.getFinalCounters();
assertEquals(0, counters.get("warnCount").longValue());
assertTrue(counters.containsKey("duration.iodata"));
assertTrue(counters.containsKey("duration.iodata.close"));
assertTrue(counters.containsKey("duration.iodata.open"));
assertFalse(counters.containsKey("duration.perform.input"));
assertFalse(counters.containsKey("duration.perform.input.input"));
assertFalse(counters.containsKey("duration.perform.output"));
assertFalse(counters.containsKey("duration.perform.output.output"));
assertEquals(1, counters.get("input.input.dataObjectCount").longValue());
assertEquals(4, counters.get("input.input.size").longValue());
assertEquals(1, counters.get("output.output.dataObjectCount").longValue());
assertEquals(8, counters.get("output.output.size").longValue());
}
public void testCountersReadWriteRecords() throws Exception {
assertNotNull(_taskContext);
_service.putObject(STORE_NAME, "input", createRecordBon("testCountersReadWriteRecords"));
final RecordInput input = _taskContext.getInputs().getAsRecordInput("input");
try {
final RecordOutput output = _taskContext.getOutputs().getAsRecordOutput("output");
final Record r = input.getRecord();
output.writeRecord(r);
output.writeRecord(r);
_taskContext.getInputs().closeAll();
_taskContext.getOutputs().commitAll();
final Map<String, Number> counters = _taskContext.getFinalCounters();
assertEquals(0, counters.get("warnCount").longValue());
assertTrue(counters.containsKey("duration.iodata"));
assertTrue(counters.containsKey("duration.iodata.close"));
assertTrue(counters.containsKey("duration.iodata.open"));
assertTrue(counters.get("duration.perform.input").doubleValue() > 0);
assertTrue(counters.get("duration.perform.input.input").doubleValue() > 0);
assertTrue(counters.get("duration.perform.output").doubleValue() > 0);
assertTrue(counters.get("duration.perform.output.output").doubleValue() > 0);
assertEquals(1, counters.get("input.input.dataObjectCount").longValue());
assertEquals(1, counters.get("input.input.recordCount").longValue());
assertTrue(counters.get("input.input.size").longValue() > 0);
assertEquals(1, counters.get("output.output.dataObjectCount").longValue());
assertEquals(2, counters.get("output.output.recordCount").longValue());
assertTrue(counters.get("output.output.size").longValue() > 0);
} finally {
input.close();
}
}
public void testCountersMultipleReadWriteRecords() throws Exception {
assertNotNull(_taskContext);
_service.putObject(STORE_NAME, "input", createRecordBon("testCountersReadWriteRecords"));
_service.putObject(STORE_NAME, "input.ext", createRecordBon("testCountersReadWriteRecords"));
final RecordInput input = _taskContext.getInputs().getAsRecordInput("input");
final RecordInput inputExt = _taskContext.getInputs().getAsRecordInput("input", 1);
try {
final RecordOutput output = _taskContext.getOutputs().getAsRecordOutput("output");
final RecordOutput outputExt = _taskContext.getOutputs().getAsRecordOutput("output", 1);
final Record r = input.getRecord();
output.writeRecord(r);
output.writeRecord(r);
final Record rExt = inputExt.getRecord();
outputExt.writeRecord(rExt);
outputExt.writeRecord(rExt);
_taskContext.getInputs().closeAll();
_taskContext.getOutputs().commitAll();
final Map<String, Number> counters = _taskContext.getFinalCounters();
assertEquals(0, counters.get("warnCount").longValue());
assertTrue(counters.containsKey("duration.iodata"));
assertTrue(counters.containsKey("duration.iodata.close"));
assertTrue(counters.containsKey("duration.iodata.open"));
assertTrue(counters.get("duration.perform.input").doubleValue() > 0);
assertTrue(counters.get("duration.perform.input.input").doubleValue() > 0);
assertTrue(counters.get("duration.perform.output").doubleValue() > 0);
assertTrue(counters.get("duration.perform.output.output").doubleValue() > 0);
assertEquals(2, counters.get("input.input.dataObjectCount").longValue());
assertEquals(2, counters.get("input.input.recordCount").longValue());
assertTrue(counters.get("input.input.size").longValue() > 0);
assertEquals(2, counters.get("output.output.dataObjectCount").longValue());
assertEquals(4, counters.get("output.output.recordCount").longValue());
assertTrue(counters.get("output.output.size").longValue() > 0);
} finally {
input.close();
inputExt.close();
}
}
public void testCountersWriteAppendable() throws Exception {
assertNotNull(_taskContext);
final AppendableOutput output = _taskContext.getOutputs().getAsOutput("outputApp", AppendableOutput.class);
output.append("value1".getBytes());
output.append("value2".getBytes());
output.append("value3".getBytes());
_taskContext.getInputs().closeAll();
_taskContext.getOutputs().commitAll();
final Map<String, Number> counters = _taskContext.getFinalCounters();
assertEquals(0, counters.get("warnCount").longValue());
// assertTrue(counters.containsKey("duration.iodata")); // TODO fails sometimes under Windows
// assertTrue(counters.containsKey("duration.iodata.close"));//TODO fails sometimes under Windows
// assertTrue(counters.containsKey("duration.iodata.open")); //TODO fails sometimes under Windows
assertNull(counters.get("duration.perform.input"));
assertTrue(counters.get("duration.perform.output").doubleValue() > 0);
assertTrue(counters.get("duration.perform.output.outputApp").doubleValue() > 0);
assertEquals(1, counters.get("output.outputApp.dataObjectCount").longValue());
assertNull(counters.get("output.outputApp.recordCount"));
assertEquals(18, counters.get("output.outputApp.size").longValue());
}
public void testCountersWriteAppendableRecord() throws Exception {
assertNotNull(_taskContext);
final AppendableOutput output = _taskContext.getOutputs().getAsOutput("outputApp", AppendableOutput.class);
output.appendRecord(createRecord("test1"));
output.appendRecord(createRecord("test2"));
output.appendRecord(createRecord("test3"));
_taskContext.getInputs().closeAll();
_taskContext.getOutputs().commitAll();
final Map<String, Number> counters = _taskContext.getFinalCounters();
assertEquals(0, counters.get("warnCount").longValue());
// assertTrue(counters.containsKey("duration.iodata")); // TODO fails sometimes under Windows
// assertTrue(counters.containsKey("duration.iodata.close"));//TODO fails sometimes under Windows
// assertTrue(counters.containsKey("duration.iodata.open"));// TODO fails sometimes under Windows
assertNull(counters.get("duration.perform.input"));
assertTrue(counters.get("duration.perform.output").doubleValue() > 0);
assertTrue(counters.get("duration.perform.output.outputApp").doubleValue() > 0);
assertEquals(1, counters.get("output.outputApp.dataObjectCount").longValue());
assertEquals(3, counters.get("output.outputApp.recordCount").longValue());
assertTrue(counters.get("output.outputApp.size").longValue() > 0);
}
public void testWorkerCounters() throws Exception {
assertNotNull(_taskContext);
_taskContext.addToCounter("long", 13);
_taskContext.addToCounter("long", 29);
_taskContext.addToCounter("double", Math.PI);
_taskContext.addToCounter("double", Math.E);
_taskContext.getInputs().closeAll();
_taskContext.getOutputs().commitAll();
final Map<String, Number> counters = _taskContext.getFinalCounters();
assertTrue(counters.get("function.long") instanceof Long);
assertEquals(42, counters.get("function.long").longValue());
assertTrue(counters.get("function.double") instanceof Double);
assertEquals(Math.PI + Math.E, counters.get("function.double").doubleValue());
}
public void testWorkerDurations() throws Exception {
assertNotNull(_taskContext);
final long startTime = _taskContext.getTimestamp();
_taskContext.addDuration("long", (long) 13e9);
_taskContext.addDuration("long", (long) 29e9);
_taskContext.addDuration("double", Math.PI);
_taskContext.addDuration("double", Math.E);
Thread.sleep(100);
_taskContext.measureTime("measured", startTime);
_taskContext.getInputs().closeAll();
_taskContext.getOutputs().commitAll();
final Map<String, Number> counters = _taskContext.getFinalCounters();
assertTrue(counters.get("duration.perform.function.long") instanceof Double);
assertEquals(42.0, counters.get("duration.perform.function.long").doubleValue());
assertTrue(counters.get("duration.perform.function.double") instanceof Double);
assertEquals(Math.PI + Math.E, counters.get("duration.perform.function.double").doubleValue());
final Number measured = counters.get("duration.perform.function.measured");
assertTrue(measured instanceof Double);
assertTrue("measured duration should be bigger, was " + measured, measured.doubleValue() > 0.08);
}
private void createTaskContext() {
final String jobName = "testTaskContext";
final String jobRunId = UUID.randomUUID().toString();
final String taskId = UUID.randomUUID().toString();
final Task task = new Task(taskId, "testTaskContextWorker");
task.getProperties().put(Task.PROPERTY_JOB_NAME, jobName);
task.getProperties().put(Task.PROPERTY_JOB_RUN_ID, jobRunId);
final MultiValueMap<String, BulkInfo> inputObjects = new MultiValueMap<String, BulkInfo>();
inputObjects.add("input", new BulkInfo("x", "data", "input"));
inputObjects.add("input", new BulkInfo("x", "data", "input.ext"));
inputObjects.add("inputKvo", new BulkInfo("x", "data", "input.kvo"));
task.getInputBulks().putAll(inputObjects);
final MultiValueMap<String, BulkInfo> outputObjects = new MultiValueMap<String, BulkInfo>();
outputObjects.add("output", new BulkInfo("x", "data", "output"));
outputObjects.add("output", new BulkInfo("x", "data", "output.ext"));
outputObjects.add("outputKvo", new BulkInfo("x", "data", "output.kvo"));
outputObjects.add("outputApp", new BulkInfo("x", "data", "output.app"));
task.getOutputBulks().putAll(outputObjects);
_taskContext = new TaskContextImpl(task, new DefaultTaskLogFactory().getTaskLog(task), _service);
}
/**
* @param title
* The title
* @return a record with the given title
* @throws IOException
*/
private Record createRecord(final String title) throws IOException {
final Record r = DataFactory.DEFAULT.createRecord(title);
r.getMetadata().put("title", title);
return r;
}
/**
* @param title
* The title
* @return a record with the given title as byte array
* @throws IOException
*/
private byte[] createRecordBon(final String title) throws IOException {
return IPCUTILS.record2BinaryObject(createRecord(title));
}
}