| /******************************************************************************* |
| * Copyright (c) 2008, 2012 Empolis Information Management GmbH and brox IT Solutions GmbH. All rights reserved. This |
| * program and the accompanying materials are made available under the terms of the Eclipse Public License v1.0 which |
| * accompanies this distribution, and is available at http://www.eclipse.org/legal/epl-v10.html |
| * |
| * Contributors: Daniel Stucky (Empolis Information Management GmbH) - initial API and implementation |
| *******************************************************************************/ |
| package org.eclipse.smila.jdbc.internal; |
| |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.sql.Connection; |
| import java.sql.SQLException; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.concurrent.ArrayBlockingQueue; |
| |
| import org.apache.commons.io.IOUtils; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.eclipse.smila.datamodel.AnyMap; |
| import org.eclipse.smila.datamodel.Value; |
| import org.eclipse.smila.datamodel.util.AnyUtil; |
| import org.eclipse.smila.jdbc.JdbcProvider; |
| import org.eclipse.smila.jdbc.JdbcWriterService; |
| import org.eclipse.smila.jdbc.JdbcWriterServiceException; |
| import org.eclipse.smila.utils.config.ConfigUtils; |
| import org.eclipse.smila.utils.digest.DigestHelper; |
| import org.osgi.service.component.ComponentContext; |
| |
| public class JdbcWriterServiceImpl implements JdbcWriterService { |
| |
| public static final String BUNDLE_ID = "org.eclipse.smila.jdbc"; |
| |
| public static final String FILENAME = "jdbcwriterservice.properties"; |
| |
| public static final String PROPERTY_CAPACITY = "capacity"; |
| |
| public static final String PARAM_DB_URL = "dbUrl"; |
| |
| public static final String PARAM_DB_PROPS = "dbProps"; |
| |
| public static final String PARAM_STATEMENT = "statement"; |
| |
| public static final String PARAM_VALUES = "values"; |
| |
| private JdbcProvider _jdbcProvider; |
| |
| /** default capacity of each database queue */ |
| private int _capacity = 100; |
| |
| private final Map<String, ArrayBlockingQueue<Map<String, Object>>> _queues = |
| new HashMap<String, ArrayBlockingQueue<Map<String, Object>>>(); |
| |
| private final List<JdbcWriter> _jdbcWriters = new ArrayList<JdbcWriter>(); |
| |
| private final List<Thread> _jdbcWriterThreads = new ArrayList<Thread>(); |
| |
| private final Map<String, Connection> _connectionPool = new HashMap<String, Connection>(); |
| |
| private final Log _log = LogFactory.getLog(getClass()); |
| |
| @Override |
| public void write(final String dbUrl, final AnyMap dbProps, final String preparedStatement, |
| final List<Value> values) throws JdbcWriterServiceException { |
| try { |
| final Map<String, Object> writeParams = new HashMap<String, Object>(); |
| writeParams.put(PARAM_DB_URL, dbUrl); |
| writeParams.put(PARAM_DB_PROPS, dbProps); |
| writeParams.put(PARAM_STATEMENT, preparedStatement); |
| if (values == null) { |
| writeParams.put(PARAM_VALUES, NO_VALUES); |
| } else { |
| writeParams.put(PARAM_VALUES, values); |
| } |
| |
| final ArrayBlockingQueue<Map<String, Object>> queue = getWriteQueue(dbUrl); |
| if (_log.isDebugEnabled()) { |
| _log.debug("Queue length for " + dbUrl + ": " + queue.size()); |
| } |
| queue.add(writeParams); // throws IllegalStateException if this queue is full |
| } catch (final Exception e) { |
| throw new JdbcWriterServiceException("Error queueing data for queue " + dbUrl, e); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| protected void activate(final ComponentContext context) { |
| readConfiguration(); |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| protected void deactivate(final ComponentContext context) { |
| synchronized (_queues) { |
| for (final JdbcWriter writer : _jdbcWriters) { |
| writer.stop(); |
| } |
| for (final Thread thread : _jdbcWriterThreads) { |
| try { |
| thread.join(JdbcWriter.MAX_WAIT_TIME * 2); |
| } catch (final InterruptedException e) { |
| _log.error("Error joining writer threads in deactivate", e); |
| } |
| } |
| _jdbcWriterThreads.clear(); |
| _jdbcWriters.clear(); |
| _queues.clear(); |
| } |
| synchronized (_connectionPool) { |
| for (final Connection con : _connectionPool.values()) { |
| try { |
| con.close(); |
| } catch (final SQLException e) { |
| _log.error("Error closing JDBC connection in deactivate", e); |
| } |
| } |
| _connectionPool.clear(); |
| } |
| } |
| |
| /** DS service reference bind method. */ |
| public void setJdbcProvider(final JdbcProvider jdbcProvider) { |
| _jdbcProvider = jdbcProvider; |
| } |
| |
| /** DS service reference unbind method. */ |
| public void unsetJdbcProvider(final JdbcProvider jdbcProvider) { |
| if (_jdbcProvider == jdbcProvider) { |
| _jdbcProvider = null; |
| } |
| } |
| |
| protected Connection getConnection(final Map<String, Object> writeParams) throws SQLException { |
| final String dbUrl = (String) writeParams.get(PARAM_DB_URL); |
| final AnyMap dbProps = (AnyMap) writeParams.get(PARAM_DB_PROPS); |
| final String user = dbProps.getStringValue(DB_PROPERTY_USER_NAME); |
| final String connectionKey = DigestHelper.calculateDigest(dbUrl + user); |
| if (_log.isDebugEnabled()) { |
| _log.debug("Getting connection for " + user + "@" + dbUrl); |
| } |
| synchronized (_connectionPool) { |
| Connection connection = _connectionPool.get(connectionKey); |
| if (connection == null) { |
| connection = _jdbcProvider.getConnection(dbUrl, AnyUtil.anyToProperties(dbProps)); |
| connection.setAutoCommit(true); |
| _connectionPool.put(connectionKey, connection); |
| } |
| return connection; |
| } |
| } |
| |
| protected void dropConnection(final Map<String, Object> writeParams) { |
| final String dbUrl = (String) writeParams.get(PARAM_DB_URL); |
| final AnyMap dbProps = (AnyMap) writeParams.get(PARAM_DB_PROPS); |
| final String user = dbProps.getStringValue(DB_PROPERTY_USER_NAME); |
| final String connectionKey = DigestHelper.calculateDigest(dbUrl + user); |
| if (_log.isDebugEnabled()) { |
| _log.debug("Dropping connection for " + user + "@" + dbUrl); |
| } |
| synchronized (_connectionPool) { |
| try { |
| final Connection connection = _connectionPool.get(connectionKey); |
| if (connection != null) { |
| connection.close(); |
| } |
| } catch (final SQLException e) { |
| _log.error("Error dropping connection to " + dbUrl + " for user " + user, e); |
| } |
| _connectionPool.remove(connectionKey); |
| } |
| } |
| |
| private ArrayBlockingQueue<Map<String, Object>> getWriteQueue(final String name) { |
| synchronized (_queues) { |
| ArrayBlockingQueue<Map<String, Object>> queue = _queues.get(name); |
| if (queue == null) { |
| queue = new ArrayBlockingQueue<Map<String, Object>>(_capacity); |
| _queues.put(name, queue); |
| final JdbcWriter jdbcWriter = new JdbcWriter(name, queue, this); |
| _jdbcWriters.add(jdbcWriter); |
| final Thread thread = new Thread(jdbcWriter, "JdbcWriter " + name); |
| _jdbcWriterThreads.add(thread); |
| thread.start(); |
| } |
| return queue; |
| } |
| } |
| |
| /** |
| * read configuration from config file. |
| * |
| * @throws IOException |
| * error reading the file or missing required properties. |
| */ |
| private void readConfiguration() { |
| InputStream propStream = null; |
| try { |
| propStream = ConfigUtils.getConfigStream(BUNDLE_ID, FILENAME); |
| if (propStream != null) { |
| final Properties properties = new Properties(); |
| properties.load(propStream); |
| try { |
| final String capacity = properties.getProperty(PROPERTY_CAPACITY); |
| _capacity = Integer.parseInt(capacity); |
| } catch (final Exception e) { |
| _log.error("Could not read property " + PROPERTY_CAPACITY + ", using default", e); |
| } |
| } |
| } catch (final Exception e) { |
| _log.error("Could not find " + FILENAME + " in configuration area.", e); |
| } finally { |
| IOUtils.closeQuietly(propStream); |
| } |
| } |
| } |