blob: d58a077499e4da30e0387467662250cf33ebd5fb [file] [log] [blame]
/*********************************************************************************************************************
* Copyright (c) 2014 Empolis Information Management GmbH and brox IT Solutions GmbH. All rights reserved. This program
* and the accompanying materials are made available under the terms of the Eclipse Public License v1.0 which
* accompanies this distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*********************************************************************************************************************/
package org.eclipse.smila.jdbc.test;
import org.eclipse.smila.blackboard.Blackboard;
import org.eclipse.smila.blackboard.Blackboard.Get;
import org.eclipse.smila.blackboard.BlackboardFactory;
import org.eclipse.smila.datamodel.Any;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.datamodel.DataFactory;
import org.eclipse.smila.jdbc.JdbcLoggingPipelet;
import org.eclipse.smila.jdbc.JdbcProvider;
import org.eclipse.smila.jdbc.JdbcSelectPipelet;
import org.eclipse.smila.jdbc.JdbcWriterService;
import org.eclipse.smila.processing.Pipelet;
import org.eclipse.smila.processing.PipeletTracker;
import org.eclipse.smila.processing.parameters.ParameterAccessor;
import org.eclipse.smila.test.DeclarativeServiceTestCase;
/** Test for {@link JdbcSelectPipelet} class. */
public class TestJdbcSelectPipelet extends DeclarativeServiceTestCase {
private static final String DB_URL = "jdbc:derby:memory:testpipelet";
private Blackboard _bb;
private final AnyMap _dbProps = DataFactory.DEFAULT.createAnyMap();
private JdbcLoggingPipelet _logPipelet;
private JdbcSelectPipelet _selectPipelet;
@Override
protected void setUp() throws Exception {
super.setUp();
_bb = getService(BlackboardFactory.class).createTransientBlackboard();
_dbProps.put(JdbcWriterService.DB_PROPERTY_USER_NAME, "user");
_dbProps.put(JdbcWriterService.DB_PROPERTY_USER_PASSWORD, "password");
final JdbcProvider provider = getService(JdbcProvider.class);
TestJdbcWriterService.prepareConnection(provider, DB_URL);
final PipeletTracker tracker = getService(PipeletTracker.class);
Class<? extends Pipelet> pipeletClass = tracker.getRegisteredPipelets().get(JdbcLoggingPipelet.class.getName());
_logPipelet = (JdbcLoggingPipelet) pipeletClass.newInstance();
pipeletClass = tracker.getRegisteredPipelets().get(JdbcSelectPipelet.class.getName());
_selectPipelet = (JdbcSelectPipelet) pipeletClass.newInstance();
final AnyMap config = DataFactory.DEFAULT.createAnyMap();
config.put(JdbcLoggingPipelet.PARAM_DB_URL, DB_URL);
config.put(JdbcLoggingPipelet.PARAM_DB_PROPS, _dbProps);
_logPipelet.configure(config);
_selectPipelet.configure(config);
}
@Override
protected void tearDown() throws Exception {
super.tearDown();
}
/** tests selecting a single row. */
public void testSingleRow() throws Exception {
final AnyMap logRecord = _bb.getRecord("logRecord", Get.NEW).getMetadata();
final AnyMap logParameters = logRecord.getMap(ParameterAccessor.DEFAULT_PARAMETERS_ATTRIBUTE, true);
logParameters.put(JdbcLoggingPipelet.PARAM_STMT, "INSERT INTO entries (string1, string2) VALUES (?, ?)");
logParameters.getSeq(JdbcLoggingPipelet.PARAM_VALUE_PATHS, true).add("Sub/Att_String1");
logParameters.getSeq(JdbcLoggingPipelet.PARAM_VALUE_PATHS, true).add("Sub/Att_String2");
logRecord.getMap("Sub", true).put("Att_String1", "hello");
logRecord.getMap("Sub", true).put("Att_String2", "pipelet");
_logPipelet.process(_bb, new String[] { "logRecord" });
Thread.sleep(1000); // async. logging -> wait some time
final AnyMap inputRecord = _bb.getRecord("inputRecord", Get.NEW).getMetadata();
final AnyMap params = inputRecord.getMap(ParameterAccessor.DEFAULT_PARAMETERS_ATTRIBUTE, true);
params.put(JdbcSelectPipelet.PARAM_STMT, "SELECT * FROM entries WHERE string1=?");
params.getSeq(JdbcSelectPipelet.PARAM_VALUE_PATHS, true).add("Sub/Att_String1");
inputRecord.getMap("Sub", true).put("Att_String1", "hello");
// test result 'single' - return input record with new sections 'records'
{
params.put(JdbcSelectPipelet.RESULT_PARAM, "single");
final String[] resultIds = _selectPipelet.process(_bb, new String[] { "inputRecord" });
assertNotNull(resultIds);
assertEquals(1, resultIds.length);
assertEquals("inputRecord", resultIds[0]);
final AnyMap metadata = _bb.getRecord("inputRecord", Get.EXISTING).getMetadata();
assertNotNull(metadata.getSeq(JdbcSelectPipelet.RESULT_KEY));
assertEquals(1, metadata.getSeq(JdbcSelectPipelet.RESULT_KEY).size());
final AnyMap dbEntry = metadata.getSeq(JdbcSelectPipelet.RESULT_KEY).getMap(0);
assertEquals("hello", dbEntry.getStringValue("STRING1"));
assertEquals("pipelet", dbEntry.getStringValue("STRING2"));
}
// test result 'multi' - return database rows as result records
{
params.put(JdbcSelectPipelet.RESULT_PARAM, "multi");
final String[] resultIds = _selectPipelet.process(_bb, new String[] { "inputRecord" });
assertNotNull(resultIds);
assertEquals(1, resultIds.length);
assertFalse("inputRecord".equals(resultIds[0]));
final AnyMap metadata = _bb.getRecord(resultIds[0], Get.EXISTING).getMetadata();
assertNull(metadata.getSeq(JdbcSelectPipelet.RESULT_KEY));
assertEquals("hello", metadata.getStringValue("STRING1"));
assertEquals("pipelet", metadata.getStringValue("STRING2"));
}
}
/** tests selecting multiple rows. */
public void testMultipleRows() throws Exception {
final int noOfRecords = 3;
final AnyMap logRecord = _bb.getRecord("logRecord", Get.NEW).getMetadata();
final AnyMap logParameters = logRecord.getMap(ParameterAccessor.DEFAULT_PARAMETERS_ATTRIBUTE, true);
logParameters.put(JdbcLoggingPipelet.PARAM_STMT,
"INSERT INTO entries (string1, string2) VALUES ('Test1', 'Test2')");
for (int i = 0; i < noOfRecords; i++) {
_logPipelet.process(_bb, new String[] { "logRecord" });
}
Thread.sleep(1000); // async. logging -> wait some time
final AnyMap inputRecord = _bb.getRecord("inputRecord", Get.NEW).getMetadata();
final AnyMap params = inputRecord.getMap(ParameterAccessor.DEFAULT_PARAMETERS_ATTRIBUTE, true);
params.put(JdbcSelectPipelet.PARAM_STMT, "SELECT * FROM entries WHERE string1='Test1'");
// test result 'single' - return input record with new sections 'records'
{
params.put(JdbcSelectPipelet.RESULT_PARAM, "single");
final String[] resultIds = _selectPipelet.process(_bb, new String[] { "inputRecord" });
assertNotNull(resultIds);
assertEquals(1, resultIds.length);
assertEquals("inputRecord", resultIds[0]);
final AnyMap metadata = _bb.getRecord("inputRecord", Get.EXISTING).getMetadata();
assertNotNull(metadata.getSeq(JdbcSelectPipelet.RESULT_KEY));
assertEquals(noOfRecords, metadata.getSeq(JdbcSelectPipelet.RESULT_KEY).size());
for (final Any dbAny : metadata.getSeq(JdbcSelectPipelet.RESULT_KEY)) {
final AnyMap dbEntry = dbAny.asMap();
assertEquals("Test1", dbEntry.getStringValue("STRING1"));
assertEquals("Test2", dbEntry.getStringValue("STRING2"));
}
}
// test result 'multi' - return database rows as result records
{
params.put(JdbcSelectPipelet.RESULT_PARAM, "multi");
final String[] resultIds = _selectPipelet.process(_bb, new String[] { "inputRecord" });
assertNotNull(resultIds);
assertEquals(noOfRecords, resultIds.length);
for (int i = 0; i < noOfRecords; i++) {
final String resultId = resultIds[i];
assertFalse("inputRecord".equals(resultId));
final AnyMap metadata = _bb.getRecord(resultId, Get.EXISTING).getMetadata();
assertNull(metadata.getSeq(JdbcSelectPipelet.RESULT_KEY));
assertEquals("Test1", metadata.getStringValue("STRING1"));
assertEquals("Test2", metadata.getStringValue("STRING2"));
}
}
}
/** tests selecting with multiple input records. */
public void testMultipleRecords() throws Exception {
final AnyMap logRecord = _bb.getRecord("logRecord", Get.NEW).getMetadata();
final AnyMap logParameters = logRecord.getMap(ParameterAccessor.DEFAULT_PARAMETERS_ATTRIBUTE, true);
logParameters.put(JdbcLoggingPipelet.PARAM_STMT,
"INSERT INTO entries (string1, string2) VALUES ('Test1', 'Test2')");
_logPipelet.process(_bb, new String[] { "logRecord" });
logParameters.put(JdbcLoggingPipelet.PARAM_STMT,
"INSERT INTO entries (string1, string2) VALUES ('Test3', 'Test4')");
_logPipelet.process(_bb, new String[] { "logRecord" });
Thread.sleep(1000); // async. logging -> wait some time
final AnyMap inputRecord = _bb.getRecord("inputRecord", Get.NEW).getMetadata();
final AnyMap params = inputRecord.getMap(ParameterAccessor.DEFAULT_PARAMETERS_ATTRIBUTE, true);
params.put(JdbcSelectPipelet.PARAM_STMT, "SELECT * FROM entries WHERE string1='Test1'");
final AnyMap inputRecord2 = _bb.getRecord("inputRecord2", Get.NEW).getMetadata();
final AnyMap params2 = inputRecord2.getMap(ParameterAccessor.DEFAULT_PARAMETERS_ATTRIBUTE, true);
params2.put(JdbcSelectPipelet.PARAM_STMT, "SELECT * FROM entries WHERE string1='Test3'");
// test result 'single' - all returned input records have new sections 'records'
{
params.put(JdbcSelectPipelet.RESULT_PARAM, "single");
params2.put(JdbcSelectPipelet.RESULT_PARAM, "single");
final String[] resultIds = _selectPipelet.process(_bb, new String[] { "inputRecord", "inputRecord2" });
assertNotNull(resultIds);
assertEquals(2, resultIds.length);
assertEquals("inputRecord", resultIds[0]);
assertEquals("inputRecord2", resultIds[1]);
final AnyMap metadata1 = _bb.getRecord("inputRecord", Get.EXISTING).getMetadata();
final AnyMap dbEntry = metadata1.getSeq(JdbcSelectPipelet.RESULT_KEY).getMap(0);
assertEquals("Test1", dbEntry.getStringValue("STRING1"));
assertEquals("Test2", dbEntry.getStringValue("STRING2"));
final AnyMap metadata2 = _bb.getRecord("inputRecord2", Get.EXISTING).getMetadata();
final AnyMap dbEntry2 = metadata2.getSeq(JdbcSelectPipelet.RESULT_KEY).getMap(0);
assertEquals("Test3", dbEntry2.getStringValue("STRING1"));
assertEquals("Test4", dbEntry2.getStringValue("STRING2"));
}
// test result 'multi' - return database rows as result records
{
params.put(JdbcSelectPipelet.RESULT_PARAM, "multi");
params2.put(JdbcSelectPipelet.RESULT_PARAM, "multi");
final String[] resultIds = _selectPipelet.process(_bb, new String[] { "inputRecord", "inputRecord2" });
assertNotNull(resultIds);
assertEquals(2, resultIds.length);
final AnyMap metadata1 = _bb.getRecord(resultIds[0], Get.EXISTING).getMetadata();
assertNull(metadata1.getSeq(JdbcSelectPipelet.RESULT_KEY));
assertEquals("Test1", metadata1.getStringValue("STRING1"));
assertEquals("Test2", metadata1.getStringValue("STRING2"));
final AnyMap metadata2 = _bb.getRecord(resultIds[1], Get.EXISTING).getMetadata();
assertNull(metadata2.getSeq(JdbcSelectPipelet.RESULT_KEY));
assertEquals("Test3", metadata2.getStringValue("STRING1"));
assertEquals("Test4", metadata2.getStringValue("STRING2"));
}
}
}