blob: c3d79144f267e7a4abc5df72066519d5f61c5a7d [file] [log] [blame]
package org.eclipse.smila.jdbc.internal;
import java.sql.Connection;
import java.sql.ParameterMetaData;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.SQLSyntaxErrorException;
import java.sql.Types;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.smila.datamodel.Any;
import org.eclipse.smila.datamodel.Value;
/**
 * Worker that takes write requests from a bounded queue and executes each one as a prepared SQL statement.
 *
 * <p>
 * Every queue element is a parameter map. It is passed to {@link JdbcWriterServiceImpl#getConnection} to obtain the
 * connection, and it carries the SQL text under {@link JdbcWriterServiceImpl#PARAM_STATEMENT} and the positional
 * statement parameters under {@link JdbcWriterServiceImpl#PARAM_VALUES}.
 *
 * <p>
 * The loop in {@link #run()} ends when {@link #stop()} has been called or when the executing thread is interrupted.
 */
public class JdbcWriter implements Runnable {

  /** Maximum time to wait for a queue element to become available, in milliseconds. */
  public static final long MAX_WAIT_TIME = 3000;

  private final Log _log = LogFactory.getLog(getClass());

  /** Name of this writer, used in log messages only. */
  final String _name;

  /** Queue the write requests are polled from. */
  final ArrayBlockingQueue<Map<String, Object>> _queue;

  /** Service that hands out (and drops) the connections used for writing. */
  final JdbcWriterServiceImpl _jdbcWriterService;

  /**
   * Flag if this thread should be stopped. Volatile because it is set by the thread calling {@link #stop()} and read
   * by the thread executing {@link #run()}.
   */
  private volatile boolean _stopped;

  /**
   * Creates a writer; it does nothing until {@link #run()} is executed.
   *
   * @param name
   *          name used to identify this writer in log messages.
   * @param queue
   *          queue to poll write requests from.
   * @param jdbcWriterService
   *          service used to get and drop connections.
   */
  public JdbcWriter(final String name, final ArrayBlockingQueue<Map<String, Object>> queue,
    final JdbcWriterServiceImpl jdbcWriterService) {
    _name = name;
    _queue = queue;
    _jdbcWriterService = jdbcWriterService;
  }

  /**
   * Requests the writer to stop. The loop in {@link #run()} checks the flag before each poll, so it can take up to
   * {@link #MAX_WAIT_TIME} ms (plus the duration of a write in progress) until the loop actually ends.
   */
  public void stop() {
    _stopped = true;
    if (_log.isDebugEnabled()) {
      _log.debug("Stopping JdbcWriter");
    }
  }

  /**
   * Polls the queue and writes every received entry to the database until the writer is stopped or the thread is
   * interrupted. Other exceptions are logged and the loop continues with the next entry.
   */
  @Override
  public void run() {
    if (_log.isDebugEnabled()) {
      _log.debug("Started JdbcWriter " + _name);
    }
    while (!_stopped) {
      try {
        // poll with timeout instead of take() so that the stop flag is re-checked regularly.
        final Map<String, Object> writeParams = _queue.poll(MAX_WAIT_TIME, TimeUnit.MILLISECONDS);
        if (writeParams != null) {
          writeToDB(writeParams);
        }
      } catch (final InterruptedException ex) {
        // Restore the interrupt status for the owner of the thread and leave the loop: swallowing the
        // interrupt would make it impossible to terminate this worker by interrupting it.
        Thread.currentThread().interrupt();
        _log.warn("JdbcWriter " + _name + " was interrupted, stopping.");
        break;
      } catch (final Exception ex) {
        _log.error("Error polling queue " + _name, ex);
      }
    } // while
    if (_log.isDebugEnabled()) {
      _log.debug("Stopped JdbcWriter " + _name);
    }
  }

  /**
   * Executes one write request.
   *
   * <p>
   * A {@link SQLSyntaxErrorException} is only logged. Any other {@link SQLException} is logged and additionally the
   * connection for these parameters is dropped via {@link JdbcWriterServiceImpl#dropConnection}.
   *
   * @param writeParams
   *          parameter map taken from the queue.
   */
  private void writeToDB(final Map<String, Object> writeParams) {
    try {
      final Connection con = _jdbcWriterService.getConnection(writeParams);
      final String sql = (String) writeParams.get(JdbcWriterServiceImpl.PARAM_STATEMENT);
      // The map is untyped; the entry is expected to hold the list of statement parameters.
      @SuppressWarnings("unchecked")
      final List<Value> values = (List<Value>) writeParams.get(JdbcWriterServiceImpl.PARAM_VALUES);
      try (final PreparedStatement stmt = con.prepareStatement(sql)) {
        final ParameterMetaData paramTypes = stmt.getParameterMetaData();
        // PreparedStatement uses list of parameters
        for (int j = 0; j < values.size(); j++) {
          final int paramIndex = j + 1; // PreparedStatement params start with '1'!
          final Any param = values.get(j);
          if (param == null) {
            stmt.setNull(paramIndex, getNullType(paramTypes, paramIndex));
          } else {
            stmt.setObject(paramIndex, param.asValue().getObject());
          }
        }
        stmt.execute();
      }
    } catch (final SQLSyntaxErrorException e) {
      // must be caught before SQLException (it is a subclass); the connection is kept in this case.
      _log.error("Error writing entry for queue " + _name, e);
    } catch (final SQLException e) {
      _log.error("Error writing entry for queue " + _name, e);
      _jdbcWriterService.dropConnection(writeParams);
    }
  }

  /**
   * Determines the SQL type to use when setting a parameter to NULL.
   *
   * @param paramTypes
   *          parameter metadata of the prepared statement.
   * @param index
   *          1-based parameter index.
   * @return the type reported by the metadata, or {@link Types#NULL} if the lookup throws an {@link SQLException}.
   */
  private int getNullType(final ParameterMetaData paramTypes, final int index) {
    try {
      return paramTypes.getParameterType(index);
    } catch (final SQLException ex) {
      // not supported by every driver. Try default.
      return Types.NULL;
    }
  }
}