blob: 85d53eb555aa54129e02788837ec957e987dee4c [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2008, 2012 Empolis Information Management GmbH and brox IT Solutions GmbH. All rights reserved. This
* program and the accompanying materials are made available under the terms of the Eclipse Public License v1.0 which
* accompanies this distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Daniel Stucky (Empolis Information Management GmbH) - initial API and implementation
*******************************************************************************/
package org.eclipse.smila.jdbc.internal;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.datamodel.Value;
import org.eclipse.smila.datamodel.util.AnyUtil;
import org.eclipse.smila.jdbc.JdbcProvider;
import org.eclipse.smila.jdbc.JdbcWriterService;
import org.eclipse.smila.jdbc.JdbcWriterServiceException;
import org.eclipse.smila.utils.config.ConfigUtils;
import org.eclipse.smila.utils.digest.DigestHelper;
import org.osgi.service.component.ComponentContext;
/**
 * {@link JdbcWriterService} implementation that decouples callers from the database: each call to
 * {@link #write(String, AnyMap, String, List)} only puts the request into a bounded in-memory queue. There is one
 * queue per database URL, and each queue is drained by its own {@link JdbcWriter} running in a dedicated thread.
 *
 * <p>
 * Connections handed out by {@link #getConnection(Map)} are cached per (database URL, user name) until they are
 * dropped via {@link #dropConnection(Map)} or the component is deactivated.
 * </p>
 *
 * <p>
 * Thread-safety: queue/writer bookkeeping is guarded by the monitor of {@code _queues}, the connection cache by the
 * monitor of {@code _connectionPool}.
 * </p>
 */
public class JdbcWriterServiceImpl implements JdbcWriterService {

  /** id of the bundle whose configuration area contains the properties file. */
  public static final String BUNDLE_ID = "org.eclipse.smila.jdbc";

  /** name of the (optional) configuration file. */
  public static final String FILENAME = "jdbcwriterservice.properties";

  /** name of the configuration property that sets the capacity of each queue. */
  public static final String PROPERTY_CAPACITY = "capacity";

  /** key of the database URL in a queued write request. */
  public static final String PARAM_DB_URL = "dbUrl";

  /** key of the database properties ({@link AnyMap}) in a queued write request. */
  public static final String PARAM_DB_PROPS = "dbProps";

  /** key of the prepared statement string in a queued write request. */
  public static final String PARAM_STATEMENT = "statement";

  /** key of the statement values in a queued write request. */
  public static final String PARAM_VALUES = "values";

  /**
   * provider used to open new connections. Volatile because it is set/unset by the DS bind methods while
   * {@link #getConnection(Map)} may be called from other threads (presumably the writer threads - the class hands
   * {@code this} to each {@link JdbcWriter}).
   */
  private volatile JdbcProvider _jdbcProvider;

  /** capacity of each database queue; the default may be overridden by the configuration file. */
  private int _capacity = 100;

  /** write queues, keyed by database URL. Also serves as lock for the writer and thread lists. */
  private final Map<String, ArrayBlockingQueue<Map<String, Object>>> _queues =
    new HashMap<String, ArrayBlockingQueue<Map<String, Object>>>();

  /** writers created so far, one per queue. */
  private final List<JdbcWriter> _jdbcWriters = new ArrayList<JdbcWriter>();

  /** threads running the writers, one per queue. */
  private final List<Thread> _jdbcWriterThreads = new ArrayList<Thread>();

  /** cached connections, keyed by the digest of database URL + user name. */
  private final Map<String, Connection> _connectionPool = new HashMap<String, Connection>();

  /** local logger. */
  private final Log _log = LogFactory.getLog(getClass());

  /**
   * Queues a write request for asynchronous execution. The request is never executed in the calling thread.
   *
   * @param dbUrl
   *          database URL; also selects the queue the request is added to.
   * @param dbProps
   *          database properties, stored in the request under {@link #PARAM_DB_PROPS}.
   * @param preparedStatement
   *          the statement, stored in the request under {@link #PARAM_STATEMENT}.
   * @param values
   *          statement values; {@code null} is replaced by {@code NO_VALUES}.
   * @throws JdbcWriterServiceException
   *           if the request could not be queued, in particular if the queue for {@code dbUrl} is full.
   */
  @Override
  public void write(final String dbUrl, final AnyMap dbProps, final String preparedStatement,
    final List<Value> values) throws JdbcWriterServiceException {
    try {
      final Map<String, Object> writeParams = new HashMap<String, Object>();
      writeParams.put(PARAM_DB_URL, dbUrl);
      writeParams.put(PARAM_DB_PROPS, dbProps);
      writeParams.put(PARAM_STATEMENT, preparedStatement);
      if (values == null) {
        writeParams.put(PARAM_VALUES, NO_VALUES);
      } else {
        writeParams.put(PARAM_VALUES, values);
      }
      final ArrayBlockingQueue<Map<String, Object>> queue = getWriteQueue(dbUrl);
      if (_log.isDebugEnabled()) {
        _log.debug("Queue length for " + dbUrl + ": " + queue.size());
      }
      // add() instead of put(): do not block the caller, fail with IllegalStateException if the queue is full
      queue.add(writeParams);
    } catch (final Exception e) {
      throw new JdbcWriterServiceException("Error queueing data for queue " + dbUrl, e);
    }
  }

  /**
   * DS activate method: reads the optional configuration file.
   *
   * @param context
   *          component context, not used.
   */
  protected void activate(final ComponentContext context) {
    readConfiguration();
  }

  /**
   * DS deactivate method: stops all writers, waits a bounded time for each writer thread to finish, then closes and
   * forgets all cached connections. If the calling thread is interrupted while waiting, the remaining cleanup is still
   * performed and the interrupt status is restored before returning.
   *
   * @param context
   *          component context, not used.
   */
  protected void deactivate(final ComponentContext context) {
    boolean interrupted = false;
    synchronized (_queues) {
      for (final JdbcWriter writer : _jdbcWriters) {
        writer.stop();
      }
      for (final Thread thread : _jdbcWriterThreads) {
        try {
          thread.join(JdbcWriter.MAX_WAIT_TIME * 2);
        } catch (final InterruptedException e) {
          // remember the interrupt instead of swallowing it; it is restored once cleanup is complete
          interrupted = true;
          _log.error("Error joining writer threads in deactivate", e);
        }
      }
      _jdbcWriterThreads.clear();
      _jdbcWriters.clear();
      _queues.clear();
    }
    synchronized (_connectionPool) {
      for (final Connection con : _connectionPool.values()) {
        try {
          con.close();
        } catch (final SQLException e) {
          _log.error("Error closing JDBC connection in deactivate", e);
        }
      }
      _connectionPool.clear();
    }
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }

  /** DS service reference bind method. */
  public void setJdbcProvider(final JdbcProvider jdbcProvider) {
    _jdbcProvider = jdbcProvider;
  }

  /** DS service reference unbind method. Only clears the reference if it is the currently bound provider. */
  public void unsetJdbcProvider(final JdbcProvider jdbcProvider) {
    if (_jdbcProvider == jdbcProvider) {
      _jdbcProvider = null;
    }
  }

  /**
   * Returns the cached connection for the database URL and user of the given write request, opening and caching a new
   * auto-commit connection if none is cached yet.
   *
   * @param writeParams
   *          a write request as created by {@link #write(String, AnyMap, String, List)}; the value stored under
   *          {@link #PARAM_DB_PROPS} must not be {@code null}.
   * @return the cached or newly opened connection.
   * @throws SQLException
   *           if no {@link JdbcProvider} is bound, or if opening or configuring the connection fails.
   */
  protected Connection getConnection(final Map<String, Object> writeParams) throws SQLException {
    final String dbUrl = (String) writeParams.get(PARAM_DB_URL);
    final AnyMap dbProps = (AnyMap) writeParams.get(PARAM_DB_PROPS);
    final String user = dbProps.getStringValue(DB_PROPERTY_USER_NAME);
    final String connectionKey = DigestHelper.calculateDigest(dbUrl + user);
    if (_log.isDebugEnabled()) {
      _log.debug("Getting connection for " + user + "@" + dbUrl);
    }
    synchronized (_connectionPool) {
      Connection connection = _connectionPool.get(connectionKey);
      if (connection == null) {
        // read the field once: it may be unset concurrently by the DS unbind method
        final JdbcProvider provider = _jdbcProvider;
        if (provider == null) {
          throw new SQLException("No JdbcProvider available to create connection for " + user + "@" + dbUrl);
        }
        connection = provider.getConnection(dbUrl, AnyUtil.anyToProperties(dbProps));
        boolean pooled = false;
        try {
          connection.setAutoCommit(true);
          _connectionPool.put(connectionKey, connection);
          pooled = true;
        } finally {
          if (!pooled) {
            // configuring failed: the connection is not referenced anywhere else, so close it to avoid a leak
            closeConnection(connection, dbUrl, user);
          }
        }
      }
      return connection;
    }
  }

  /**
   * Removes the connection for the database URL and user of the given write request from the cache and closes it.
   * Errors on close are logged, not thrown. Does nothing if no such connection is cached.
   *
   * @param writeParams
   *          a write request as created by {@link #write(String, AnyMap, String, List)}; the value stored under
   *          {@link #PARAM_DB_PROPS} must not be {@code null}.
   */
  protected void dropConnection(final Map<String, Object> writeParams) {
    final String dbUrl = (String) writeParams.get(PARAM_DB_URL);
    final AnyMap dbProps = (AnyMap) writeParams.get(PARAM_DB_PROPS);
    final String user = dbProps.getStringValue(DB_PROPERTY_USER_NAME);
    final String connectionKey = DigestHelper.calculateDigest(dbUrl + user);
    if (_log.isDebugEnabled()) {
      _log.debug("Dropping connection for " + user + "@" + dbUrl);
    }
    synchronized (_connectionPool) {
      // remove first, so the cache never keeps a connection whose close() failed
      closeConnection(_connectionPool.remove(connectionKey), dbUrl, user);
    }
  }

  /**
   * Closes a connection, logging (not throwing) an {@link SQLException} raised by close.
   *
   * @param connection
   *          the connection to close, may be {@code null}.
   * @param dbUrl
   *          database URL, for the log message only.
   * @param user
   *          user name, for the log message only.
   */
  private void closeConnection(final Connection connection, final String dbUrl, final String user) {
    if (connection == null) {
      return;
    }
    try {
      connection.close();
    } catch (final SQLException e) {
      _log.error("Error dropping connection to " + dbUrl + " for user " + user, e);
    }
  }

  /**
   * Returns the write queue with the given name. On first use of a name, the queue is created with the configured
   * capacity together with a {@link JdbcWriter} and a started thread named {@code "JdbcWriter " + name}.
   *
   * @param name
   *          queue name; {@link #write(String, AnyMap, String, List)} passes the database URL.
   * @return the existing or newly created queue.
   */
  private ArrayBlockingQueue<Map<String, Object>> getWriteQueue(final String name) {
    synchronized (_queues) {
      ArrayBlockingQueue<Map<String, Object>> queue = _queues.get(name);
      if (queue == null) {
        queue = new ArrayBlockingQueue<Map<String, Object>>(_capacity);
        _queues.put(name, queue);
        final JdbcWriter jdbcWriter = new JdbcWriter(name, queue, this);
        _jdbcWriters.add(jdbcWriter);
        final Thread thread = new Thread(jdbcWriter, "JdbcWriter " + name);
        _jdbcWriterThreads.add(thread);
        thread.start();
      }
      return queue;
    }
  }

  /**
   * Reads the configuration file, if available, and applies the queue capacity from it. Never throws: all problems
   * are logged and the current capacity is kept.
   */
  private void readConfiguration() {
    InputStream propStream = null;
    try {
      propStream = ConfigUtils.getConfigStream(BUNDLE_ID, FILENAME);
      if (propStream != null) {
        final Properties properties = new Properties();
        properties.load(propStream);
        applyCapacity(properties.getProperty(PROPERTY_CAPACITY));
      }
    } catch (final Exception e) {
      _log.error("Could not find " + FILENAME + " in configuration area.", e);
    } finally {
      IOUtils.closeQuietly(propStream);
    }
  }

  /**
   * Sets the queue capacity from a configured value. The value must be a positive integer because
   * {@link ArrayBlockingQueue} rejects capacities below 1; otherwise the current capacity is kept.
   *
   * @param configured
   *          raw property value, {@code null} if the property is not set.
   */
  private void applyCapacity(final String configured) {
    if (configured == null) {
      _log.info("Property " + PROPERTY_CAPACITY + " not set in " + FILENAME + ", using default " + _capacity);
      return;
    }
    try {
      // Properties.load() keeps trailing whitespace of values, which Integer.parseInt() would reject
      final int capacity = Integer.parseInt(configured.trim());
      if (capacity > 0) {
        _capacity = capacity;
      } else {
        _log.error("Invalid value " + capacity + " for property " + PROPERTY_CAPACITY
          + " (must be greater than 0), using default " + _capacity);
      }
    } catch (final NumberFormatException e) {
      _log.error("Could not read property " + PROPERTY_CAPACITY + ", using default", e);
    }
  }
}