/**********************************************************************************************************************
 * Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
 * accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
 * distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors: Juergen Schumacher, Andreas Weber, Drazen Cindric, Andreas Schank (all Attensity Europe GmbH) - initial
 * implementation
 **********************************************************************************************************************/
package org.eclipse.smila.taskworker.test; | |
import java.io.IOException; | |
import java.io.InputStream; | |
import java.io.OutputStream; | |
import java.util.Map; | |
import java.util.UUID; | |
import org.apache.commons.io.IOUtils; | |
import org.eclipse.smila.datamodel.DataFactory; | |
import org.eclipse.smila.datamodel.Record; | |
import org.eclipse.smila.datamodel.ipc.IpcRecordWriter; | |
import org.eclipse.smila.objectstore.ObjectStoreService; | |
import org.eclipse.smila.taskmanager.BulkInfo; | |
import org.eclipse.smila.taskmanager.Task; | |
import org.eclipse.smila.taskworker.DefaultTaskLogFactory; | |
import org.eclipse.smila.taskworker.TaskContext; | |
import org.eclipse.smila.taskworker.input.RecordInput; | |
import org.eclipse.smila.taskworker.internal.TaskContextImpl; | |
import org.eclipse.smila.taskworker.output.AppendableOutput; | |
import org.eclipse.smila.taskworker.output.RecordOutput; | |
import org.eclipse.smila.test.DeclarativeServiceTestCase; | |
import org.eclipse.smila.utils.collections.MultiValueMap; | |
/** unit tests for task context, mostly counter aggrgation tests. */ | |
public class TestTaskContext extends DeclarativeServiceTestCase { | |
/** Helper for transforming record to streams and byte arrays. */ | |
protected static final IpcRecordWriter IPC_RECORD_WRITER = new IpcRecordWriter(); | |
protected static final String STORE_NAME = "data"; | |
private ObjectStoreService _service; | |
private TaskContext _taskContext; | |
@Override | |
protected void setUp() throws Exception { | |
super.setUp(); | |
_service = getService(ObjectStoreService.class); | |
_service.removeStore(STORE_NAME); | |
_service.createStore(STORE_NAME, null); | |
createTaskContext(); | |
} | |
public void testCountersNothingDone() throws Exception { | |
assertNotNull(_taskContext); | |
_taskContext.getInputs().closeAll(); | |
_taskContext.getOutputs().commitAll(); | |
final Map<String, Number> counters = _taskContext.getFinalCounters(); | |
assertEquals(0, counters.get("warnCount").longValue()); | |
assertFalse(counters.containsKey("duration.iodata")); | |
assertFalse(counters.containsKey("duration.iodata.close")); | |
assertFalse(counters.containsKey("duration.iodata.open")); | |
assertFalse(counters.containsKey("duration.perform.input")); | |
assertFalse(counters.containsKey("duration.perform.output")); | |
} | |
public void testCounterWarnCount() throws Exception { | |
assertNotNull(_taskContext); | |
_taskContext.getLog().warn("w1"); | |
_taskContext.getLog().warn("w2"); | |
_taskContext.getLog().warn("w3"); | |
_taskContext.getLog().info("i1"); // not counted | |
_taskContext.getLog().error("e1"); // not counted | |
_taskContext.getInputs().closeAll(); | |
_taskContext.getOutputs().commitAll(); | |
final Map<String, Number> counters = _taskContext.getFinalCounters(); | |
assertEquals(3, counters.get("warnCount").longValue()); | |
assertFalse(counters.containsKey("duration.iodata")); | |
assertFalse(counters.containsKey("duration.iodata.close")); | |
assertFalse(counters.containsKey("duration.iodata.open")); | |
assertFalse(counters.containsKey("duration.perform.input")); | |
assertFalse(counters.containsKey("duration.perform.output")); | |
} | |
public void testCountersReadWriteStream() throws Exception { | |
assertNotNull(_taskContext); | |
_service.putObject(STORE_NAME, "input", "test".getBytes()); | |
final InputStream input = _taskContext.getInputs().getAsStreamInput("input").getStream(); | |
final OutputStream output = _taskContext.getOutputs().getAsStreamOutput("output").getStream(); | |
final byte[] value = IOUtils.toByteArray(input); | |
output.write(value); | |
output.write(value); | |
_taskContext.getInputs().closeAll(); | |
_taskContext.getOutputs().commitAll(); | |
final Map<String, Number> counters = _taskContext.getFinalCounters(); | |
assertEquals(0, counters.get("warnCount").longValue()); | |
assertTrue(counters.containsKey("duration.iodata")); | |
assertTrue(counters.containsKey("duration.iodata.close")); | |
assertTrue(counters.containsKey("duration.iodata.open")); | |
assertFalse(counters.containsKey("duration.perform.input")); | |
assertFalse(counters.containsKey("duration.perform.input.input")); | |
assertFalse(counters.containsKey("duration.perform.output")); | |
assertFalse(counters.containsKey("duration.perform.output.output")); | |
assertEquals(1, counters.get("input.input.dataObjectCount").longValue()); | |
assertEquals(4, counters.get("input.input.size").longValue()); | |
assertEquals(1, counters.get("output.output.dataObjectCount").longValue()); | |
assertEquals(8, counters.get("output.output.size").longValue()); | |
} | |
public void testCountersReadWriteRecords() throws Exception { | |
assertNotNull(_taskContext); | |
_service.putObject(STORE_NAME, "input", createRecordBon("testCountersReadWriteRecords")); | |
final RecordInput input = _taskContext.getInputs().getAsRecordInput("input"); | |
try { | |
final RecordOutput output = _taskContext.getOutputs().getAsRecordOutput("output"); | |
final Record r = input.getRecord(); | |
output.writeRecord(r); | |
output.writeRecord(r); | |
_taskContext.getInputs().closeAll(); | |
_taskContext.getOutputs().commitAll(); | |
final Map<String, Number> counters = _taskContext.getFinalCounters(); | |
assertEquals(0, counters.get("warnCount").longValue()); | |
assertTrue(counters.containsKey("duration.iodata")); | |
assertTrue(counters.containsKey("duration.iodata.close")); | |
assertTrue(counters.containsKey("duration.iodata.open")); | |
assertTrue(counters.get("duration.perform.input").doubleValue() > 0); | |
assertTrue(counters.get("duration.perform.input.input").doubleValue() > 0); | |
assertTrue(counters.get("duration.perform.output").doubleValue() > 0); | |
assertTrue(counters.get("duration.perform.output.output").doubleValue() > 0); | |
assertEquals(1, counters.get("input.input.dataObjectCount").longValue()); | |
assertEquals(1, counters.get("input.input.recordCount").longValue()); | |
assertTrue(counters.get("input.input.size").longValue() > 0); | |
assertEquals(1, counters.get("output.output.dataObjectCount").longValue()); | |
assertEquals(2, counters.get("output.output.recordCount").longValue()); | |
assertTrue(counters.get("output.output.size").longValue() > 0); | |
} finally { | |
input.close(); | |
} | |
} | |
public void testCountersMultipleReadWriteRecords() throws Exception { | |
assertNotNull(_taskContext); | |
_service.putObject(STORE_NAME, "input", createRecordBon("testCountersReadWriteRecords")); | |
_service.putObject(STORE_NAME, "input.ext", createRecordBon("testCountersReadWriteRecords")); | |
final RecordInput input = _taskContext.getInputs().getAsRecordInput("input"); | |
final RecordInput inputExt = _taskContext.getInputs().getAsRecordInput("input", 1); | |
try { | |
final RecordOutput output = _taskContext.getOutputs().getAsRecordOutput("output"); | |
final RecordOutput outputExt = _taskContext.getOutputs().getAsRecordOutput("output", 1); | |
final Record r = input.getRecord(); | |
output.writeRecord(r); | |
output.writeRecord(r); | |
final Record rExt = inputExt.getRecord(); | |
outputExt.writeRecord(rExt); | |
outputExt.writeRecord(rExt); | |
_taskContext.getInputs().closeAll(); | |
_taskContext.getOutputs().commitAll(); | |
final Map<String, Number> counters = _taskContext.getFinalCounters(); | |
assertEquals(0, counters.get("warnCount").longValue()); | |
assertTrue(counters.containsKey("duration.iodata")); | |
assertTrue(counters.containsKey("duration.iodata.close")); | |
assertTrue(counters.containsKey("duration.iodata.open")); | |
assertTrue(counters.get("duration.perform.input").doubleValue() > 0); | |
assertTrue(counters.get("duration.perform.input.input").doubleValue() > 0); | |
assertTrue(counters.get("duration.perform.output").doubleValue() > 0); | |
assertTrue(counters.get("duration.perform.output.output").doubleValue() > 0); | |
assertEquals(2, counters.get("input.input.dataObjectCount").longValue()); | |
assertEquals(2, counters.get("input.input.recordCount").longValue()); | |
assertTrue(counters.get("input.input.size").longValue() > 0); | |
assertEquals(2, counters.get("output.output.dataObjectCount").longValue()); | |
assertEquals(4, counters.get("output.output.recordCount").longValue()); | |
assertTrue(counters.get("output.output.size").longValue() > 0); | |
} finally { | |
input.close(); | |
inputExt.close(); | |
} | |
} | |
public void testCountersWriteAppendable() throws Exception { | |
assertNotNull(_taskContext); | |
final AppendableOutput output = _taskContext.getOutputs().getAsOutput("outputApp", AppendableOutput.class); | |
output.append("value1".getBytes()); | |
output.append("value2".getBytes()); | |
output.append("value3".getBytes()); | |
_taskContext.getInputs().closeAll(); | |
_taskContext.getOutputs().commitAll(); | |
final Map<String, Number> counters = _taskContext.getFinalCounters(); | |
assertEquals(0, counters.get("warnCount").longValue()); | |
// assertTrue(counters.containsKey("duration.iodata")); // TODO fails sometimes under Windows | |
// assertTrue(counters.containsKey("duration.iodata.close"));//TODO fails sometimes under Windows | |
// assertTrue(counters.containsKey("duration.iodata.open")); //TODO fails sometimes under Windows | |
assertNull(counters.get("duration.perform.input")); | |
assertTrue(counters.get("duration.perform.output").doubleValue() > 0); | |
assertTrue(counters.get("duration.perform.output.outputApp").doubleValue() > 0); | |
assertEquals(1, counters.get("output.outputApp.dataObjectCount").longValue()); | |
assertNull(counters.get("output.outputApp.recordCount")); | |
assertEquals(18, counters.get("output.outputApp.size").longValue()); | |
} | |
public void testCountersWriteAppendableRecord() throws Exception { | |
assertNotNull(_taskContext); | |
final AppendableOutput output = _taskContext.getOutputs().getAsOutput("outputApp", AppendableOutput.class); | |
output.appendRecord(createRecord("test1")); | |
output.appendRecord(createRecord("test2")); | |
output.appendRecord(createRecord("test3")); | |
_taskContext.getInputs().closeAll(); | |
_taskContext.getOutputs().commitAll(); | |
final Map<String, Number> counters = _taskContext.getFinalCounters(); | |
assertEquals(0, counters.get("warnCount").longValue()); | |
// assertTrue(counters.containsKey("duration.iodata")); // TODO fails sometimes under Windows | |
// assertTrue(counters.containsKey("duration.iodata.close"));//TODO fails sometimes under Windows | |
// assertTrue(counters.containsKey("duration.iodata.open"));// TODO fails sometimes under Windows | |
assertNull(counters.get("duration.perform.input")); | |
assertTrue(counters.get("duration.perform.output").doubleValue() > 0); | |
assertTrue(counters.get("duration.perform.output.outputApp").doubleValue() > 0); | |
assertEquals(1, counters.get("output.outputApp.dataObjectCount").longValue()); | |
assertEquals(3, counters.get("output.outputApp.recordCount").longValue()); | |
assertTrue(counters.get("output.outputApp.size").longValue() > 0); | |
} | |
public void testWorkerCounters() throws Exception { | |
assertNotNull(_taskContext); | |
_taskContext.addToCounter("long", 13); | |
_taskContext.addToCounter("long", 29); | |
_taskContext.addToCounter("double", Math.PI); | |
_taskContext.addToCounter("double", Math.E); | |
_taskContext.getInputs().closeAll(); | |
_taskContext.getOutputs().commitAll(); | |
final Map<String, Number> counters = _taskContext.getFinalCounters(); | |
assertTrue(counters.get("function.long") instanceof Long); | |
assertEquals(42, counters.get("function.long").longValue()); | |
assertTrue(counters.get("function.double") instanceof Double); | |
assertEquals(Math.PI + Math.E, counters.get("function.double").doubleValue()); | |
} | |
public void testWorkerDurations() throws Exception { | |
assertNotNull(_taskContext); | |
final long startTime = _taskContext.getTimestamp(); | |
_taskContext.addDuration("long", (long) 13e9); | |
_taskContext.addDuration("long", (long) 29e9); | |
_taskContext.addDuration("double", Math.PI); | |
_taskContext.addDuration("double", Math.E); | |
Thread.sleep(100); | |
_taskContext.measureTime("measured", startTime); | |
_taskContext.getInputs().closeAll(); | |
_taskContext.getOutputs().commitAll(); | |
final Map<String, Number> counters = _taskContext.getFinalCounters(); | |
assertTrue(counters.get("duration.perform.function.long") instanceof Double); | |
assertEquals(42.0, counters.get("duration.perform.function.long").doubleValue()); | |
assertTrue(counters.get("duration.perform.function.double") instanceof Double); | |
assertEquals(Math.PI + Math.E, counters.get("duration.perform.function.double").doubleValue()); | |
final Number measured = counters.get("duration.perform.function.measured"); | |
assertTrue(measured instanceof Double); | |
assertTrue("measured duration should be bigger, was " + measured, measured.doubleValue() > 0.08); | |
} | |
private void createTaskContext() { | |
final String jobName = "testTaskContext"; | |
final String jobRunId = UUID.randomUUID().toString(); | |
final String taskId = UUID.randomUUID().toString(); | |
final Task task = new Task(taskId, "testTaskContextWorker"); | |
task.getProperties().put(Task.PROPERTY_JOB_NAME, jobName); | |
task.getProperties().put(Task.PROPERTY_JOB_RUN_ID, jobRunId); | |
final MultiValueMap<String, BulkInfo> inputObjects = new MultiValueMap<String, BulkInfo>(); | |
inputObjects.add("input", new BulkInfo("x", "data", "input")); | |
inputObjects.add("input", new BulkInfo("x", "data", "input.ext")); | |
inputObjects.add("inputKvo", new BulkInfo("x", "data", "input.kvo")); | |
task.getInputBulks().putAll(inputObjects); | |
final MultiValueMap<String, BulkInfo> outputObjects = new MultiValueMap<String, BulkInfo>(); | |
outputObjects.add("output", new BulkInfo("x", "data", "output")); | |
outputObjects.add("output", new BulkInfo("x", "data", "output.ext")); | |
outputObjects.add("outputKvo", new BulkInfo("x", "data", "output.kvo")); | |
outputObjects.add("outputApp", new BulkInfo("x", "data", "output.app")); | |
task.getOutputBulks().putAll(outputObjects); | |
_taskContext = new TaskContextImpl(task, new DefaultTaskLogFactory().getTaskLog(task), _service); | |
} | |
/** | |
* @param title | |
* The title | |
* @return a record with the given title | |
* @throws IOException | |
*/ | |
private Record createRecord(final String title) throws IOException { | |
final Record r = DataFactory.DEFAULT.createRecord(title); | |
r.getMetadata().put("title", title); | |
return r; | |
} | |
/** | |
* @param title | |
* The title | |
* @return a record with the given title as byte array | |
* @throws IOException | |
*/ | |
private byte[] createRecordBon(final String title) throws IOException { | |
return IPC_RECORD_WRITER.writeBinaryObject(createRecord(title)); | |
} | |
} |