| /********************************************************************************************************************* |
| * Copyright (c) 2014 Empolis Information Management GmbH and brox IT Solutions GmbH. All rights reserved. This program |
| * and the accompanying materials are made available under the terms of the Eclipse Public License v1.0 which |
| * accompanies this distribution, and is available at http://www.eclipse.org/legal/epl-v10.html |
| *********************************************************************************************************************/ |
| package org.eclipse.smila.jdbc.test; |
| |
| import org.eclipse.smila.blackboard.Blackboard; |
| import org.eclipse.smila.blackboard.Blackboard.Get; |
| import org.eclipse.smila.blackboard.BlackboardFactory; |
| import org.eclipse.smila.datamodel.Any; |
| import org.eclipse.smila.datamodel.AnyMap; |
| import org.eclipse.smila.datamodel.DataFactory; |
| import org.eclipse.smila.jdbc.JdbcLoggingPipelet; |
| import org.eclipse.smila.jdbc.JdbcProvider; |
| import org.eclipse.smila.jdbc.JdbcSelectPipelet; |
| import org.eclipse.smila.jdbc.JdbcWriterService; |
| import org.eclipse.smila.processing.Pipelet; |
| import org.eclipse.smila.processing.PipeletTracker; |
| import org.eclipse.smila.processing.parameters.ParameterAccessor; |
| import org.eclipse.smila.test.DeclarativeServiceTestCase; |
| |
| /** Test for {@link JdbcSelectPipelet} class. */ |
| public class TestJdbcSelectPipelet extends DeclarativeServiceTestCase { |
| |
| private static final String DB_URL = "jdbc:derby:memory:testpipelet"; |
| |
| private Blackboard _bb; |
| |
| private final AnyMap _dbProps = DataFactory.DEFAULT.createAnyMap(); |
| |
| private JdbcLoggingPipelet _logPipelet; |
| |
| private JdbcSelectPipelet _selectPipelet; |
| |
| @Override |
| protected void setUp() throws Exception { |
| super.setUp(); |
| _bb = getService(BlackboardFactory.class).createTransientBlackboard(); |
| _dbProps.put(JdbcWriterService.DB_PROPERTY_USER_NAME, "user"); |
| _dbProps.put(JdbcWriterService.DB_PROPERTY_USER_PASSWORD, "password"); |
| final JdbcProvider provider = getService(JdbcProvider.class); |
| TestJdbcWriterService.prepareConnection(provider, DB_URL); |
| |
| final PipeletTracker tracker = getService(PipeletTracker.class); |
| Class<? extends Pipelet> pipeletClass = tracker.getRegisteredPipelets().get(JdbcLoggingPipelet.class.getName()); |
| _logPipelet = (JdbcLoggingPipelet) pipeletClass.newInstance(); |
| pipeletClass = tracker.getRegisteredPipelets().get(JdbcSelectPipelet.class.getName()); |
| _selectPipelet = (JdbcSelectPipelet) pipeletClass.newInstance(); |
| |
| final AnyMap config = DataFactory.DEFAULT.createAnyMap(); |
| config.put(JdbcLoggingPipelet.PARAM_DB_URL, DB_URL); |
| config.put(JdbcLoggingPipelet.PARAM_DB_PROPS, _dbProps); |
| _logPipelet.configure(config); |
| _selectPipelet.configure(config); |
| } |
| |
| @Override |
| protected void tearDown() throws Exception { |
| super.tearDown(); |
| } |
| |
| /** tests selecting a single row. */ |
| public void testSingleRow() throws Exception { |
| final AnyMap logRecord = _bb.getRecord("logRecord", Get.NEW).getMetadata(); |
| final AnyMap logParameters = logRecord.getMap(ParameterAccessor.DEFAULT_PARAMETERS_ATTRIBUTE, true); |
| logParameters.put(JdbcLoggingPipelet.PARAM_STMT, "INSERT INTO entries (string1, string2) VALUES (?, ?)"); |
| logParameters.getSeq(JdbcLoggingPipelet.PARAM_VALUE_PATHS, true).add("Sub/Att_String1"); |
| logParameters.getSeq(JdbcLoggingPipelet.PARAM_VALUE_PATHS, true).add("Sub/Att_String2"); |
| logRecord.getMap("Sub", true).put("Att_String1", "hello"); |
| logRecord.getMap("Sub", true).put("Att_String2", "pipelet"); |
| _logPipelet.process(_bb, new String[] { "logRecord" }); |
| |
| Thread.sleep(1000); // async. logging -> wait some time |
| |
| final AnyMap inputRecord = _bb.getRecord("inputRecord", Get.NEW).getMetadata(); |
| final AnyMap params = inputRecord.getMap(ParameterAccessor.DEFAULT_PARAMETERS_ATTRIBUTE, true); |
| params.put(JdbcSelectPipelet.PARAM_STMT, "SELECT * FROM entries WHERE string1=?"); |
| params.getSeq(JdbcSelectPipelet.PARAM_VALUE_PATHS, true).add("Sub/Att_String1"); |
| inputRecord.getMap("Sub", true).put("Att_String1", "hello"); |
| |
| // test result 'single' - return input record with new sections 'records' |
| { |
| params.put(JdbcSelectPipelet.RESULT_PARAM, "single"); |
| final String[] resultIds = _selectPipelet.process(_bb, new String[] { "inputRecord" }); |
| assertNotNull(resultIds); |
| assertEquals(1, resultIds.length); |
| assertEquals("inputRecord", resultIds[0]); |
| final AnyMap metadata = _bb.getRecord("inputRecord", Get.EXISTING).getMetadata(); |
| assertNotNull(metadata.getSeq(JdbcSelectPipelet.RESULT_KEY)); |
| assertEquals(1, metadata.getSeq(JdbcSelectPipelet.RESULT_KEY).size()); |
| final AnyMap dbEntry = metadata.getSeq(JdbcSelectPipelet.RESULT_KEY).getMap(0); |
| assertEquals("hello", dbEntry.getStringValue("STRING1")); |
| assertEquals("pipelet", dbEntry.getStringValue("STRING2")); |
| } |
| // test result 'multi' - return database rows as result records |
| { |
| params.put(JdbcSelectPipelet.RESULT_PARAM, "multi"); |
| final String[] resultIds = _selectPipelet.process(_bb, new String[] { "inputRecord" }); |
| assertNotNull(resultIds); |
| assertEquals(1, resultIds.length); |
| assertFalse("inputRecord".equals(resultIds[0])); |
| final AnyMap metadata = _bb.getRecord(resultIds[0], Get.EXISTING).getMetadata(); |
| assertNull(metadata.getSeq(JdbcSelectPipelet.RESULT_KEY)); |
| assertEquals("hello", metadata.getStringValue("STRING1")); |
| assertEquals("pipelet", metadata.getStringValue("STRING2")); |
| } |
| } |
| |
| /** tests selecting multiple rows. */ |
| public void testMultipleRows() throws Exception { |
| final int noOfRecords = 3; |
| final AnyMap logRecord = _bb.getRecord("logRecord", Get.NEW).getMetadata(); |
| final AnyMap logParameters = logRecord.getMap(ParameterAccessor.DEFAULT_PARAMETERS_ATTRIBUTE, true); |
| logParameters.put(JdbcLoggingPipelet.PARAM_STMT, |
| "INSERT INTO entries (string1, string2) VALUES ('Test1', 'Test2')"); |
| for (int i = 0; i < noOfRecords; i++) { |
| _logPipelet.process(_bb, new String[] { "logRecord" }); |
| } |
| Thread.sleep(1000); // async. logging -> wait some time |
| |
| final AnyMap inputRecord = _bb.getRecord("inputRecord", Get.NEW).getMetadata(); |
| final AnyMap params = inputRecord.getMap(ParameterAccessor.DEFAULT_PARAMETERS_ATTRIBUTE, true); |
| params.put(JdbcSelectPipelet.PARAM_STMT, "SELECT * FROM entries WHERE string1='Test1'"); |
| |
| // test result 'single' - return input record with new sections 'records' |
| { |
| params.put(JdbcSelectPipelet.RESULT_PARAM, "single"); |
| final String[] resultIds = _selectPipelet.process(_bb, new String[] { "inputRecord" }); |
| assertNotNull(resultIds); |
| assertEquals(1, resultIds.length); |
| assertEquals("inputRecord", resultIds[0]); |
| final AnyMap metadata = _bb.getRecord("inputRecord", Get.EXISTING).getMetadata(); |
| assertNotNull(metadata.getSeq(JdbcSelectPipelet.RESULT_KEY)); |
| assertEquals(noOfRecords, metadata.getSeq(JdbcSelectPipelet.RESULT_KEY).size()); |
| for (final Any dbAny : metadata.getSeq(JdbcSelectPipelet.RESULT_KEY)) { |
| final AnyMap dbEntry = dbAny.asMap(); |
| assertEquals("Test1", dbEntry.getStringValue("STRING1")); |
| assertEquals("Test2", dbEntry.getStringValue("STRING2")); |
| } |
| } |
| // test result 'multi' - return database rows as result records |
| { |
| params.put(JdbcSelectPipelet.RESULT_PARAM, "multi"); |
| final String[] resultIds = _selectPipelet.process(_bb, new String[] { "inputRecord" }); |
| assertNotNull(resultIds); |
| assertEquals(noOfRecords, resultIds.length); |
| for (int i = 0; i < noOfRecords; i++) { |
| final String resultId = resultIds[i]; |
| assertFalse("inputRecord".equals(resultId)); |
| final AnyMap metadata = _bb.getRecord(resultId, Get.EXISTING).getMetadata(); |
| assertNull(metadata.getSeq(JdbcSelectPipelet.RESULT_KEY)); |
| assertEquals("Test1", metadata.getStringValue("STRING1")); |
| assertEquals("Test2", metadata.getStringValue("STRING2")); |
| } |
| } |
| } |
| |
| /** tests selecting with multiple input records. */ |
| public void testMultipleRecords() throws Exception { |
| final AnyMap logRecord = _bb.getRecord("logRecord", Get.NEW).getMetadata(); |
| final AnyMap logParameters = logRecord.getMap(ParameterAccessor.DEFAULT_PARAMETERS_ATTRIBUTE, true); |
| |
| logParameters.put(JdbcLoggingPipelet.PARAM_STMT, |
| "INSERT INTO entries (string1, string2) VALUES ('Test1', 'Test2')"); |
| _logPipelet.process(_bb, new String[] { "logRecord" }); |
| |
| logParameters.put(JdbcLoggingPipelet.PARAM_STMT, |
| "INSERT INTO entries (string1, string2) VALUES ('Test3', 'Test4')"); |
| _logPipelet.process(_bb, new String[] { "logRecord" }); |
| |
| Thread.sleep(1000); // async. logging -> wait some time |
| |
| final AnyMap inputRecord = _bb.getRecord("inputRecord", Get.NEW).getMetadata(); |
| final AnyMap params = inputRecord.getMap(ParameterAccessor.DEFAULT_PARAMETERS_ATTRIBUTE, true); |
| params.put(JdbcSelectPipelet.PARAM_STMT, "SELECT * FROM entries WHERE string1='Test1'"); |
| |
| final AnyMap inputRecord2 = _bb.getRecord("inputRecord2", Get.NEW).getMetadata(); |
| final AnyMap params2 = inputRecord2.getMap(ParameterAccessor.DEFAULT_PARAMETERS_ATTRIBUTE, true); |
| params2.put(JdbcSelectPipelet.PARAM_STMT, "SELECT * FROM entries WHERE string1='Test3'"); |
| |
| // test result 'single' - all returned input records have new sections 'records' |
| { |
| params.put(JdbcSelectPipelet.RESULT_PARAM, "single"); |
| params2.put(JdbcSelectPipelet.RESULT_PARAM, "single"); |
| |
| final String[] resultIds = _selectPipelet.process(_bb, new String[] { "inputRecord", "inputRecord2" }); |
| assertNotNull(resultIds); |
| assertEquals(2, resultIds.length); |
| assertEquals("inputRecord", resultIds[0]); |
| assertEquals("inputRecord2", resultIds[1]); |
| |
| final AnyMap metadata1 = _bb.getRecord("inputRecord", Get.EXISTING).getMetadata(); |
| final AnyMap dbEntry = metadata1.getSeq(JdbcSelectPipelet.RESULT_KEY).getMap(0); |
| assertEquals("Test1", dbEntry.getStringValue("STRING1")); |
| assertEquals("Test2", dbEntry.getStringValue("STRING2")); |
| |
| final AnyMap metadata2 = _bb.getRecord("inputRecord2", Get.EXISTING).getMetadata(); |
| final AnyMap dbEntry2 = metadata2.getSeq(JdbcSelectPipelet.RESULT_KEY).getMap(0); |
| assertEquals("Test3", dbEntry2.getStringValue("STRING1")); |
| assertEquals("Test4", dbEntry2.getStringValue("STRING2")); |
| |
| } |
| // test result 'multi' - return database rows as result records |
| { |
| params.put(JdbcSelectPipelet.RESULT_PARAM, "multi"); |
| params2.put(JdbcSelectPipelet.RESULT_PARAM, "multi"); |
| |
| final String[] resultIds = _selectPipelet.process(_bb, new String[] { "inputRecord", "inputRecord2" }); |
| assertNotNull(resultIds); |
| assertEquals(2, resultIds.length); |
| |
| final AnyMap metadata1 = _bb.getRecord(resultIds[0], Get.EXISTING).getMetadata(); |
| assertNull(metadata1.getSeq(JdbcSelectPipelet.RESULT_KEY)); |
| assertEquals("Test1", metadata1.getStringValue("STRING1")); |
| assertEquals("Test2", metadata1.getStringValue("STRING2")); |
| |
| final AnyMap metadata2 = _bb.getRecord(resultIds[1], Get.EXISTING).getMetadata(); |
| assertNull(metadata2.getSeq(JdbcSelectPipelet.RESULT_KEY)); |
| assertEquals("Test3", metadata2.getStringValue("STRING1")); |
| assertEquals("Test4", metadata2.getStringValue("STRING2")); |
| } |
| } |
| } |