| package org.eclipse.smila.jdbc.internal; |
| |
| import java.sql.Connection; |
| import java.sql.ParameterMetaData; |
| import java.sql.PreparedStatement; |
| import java.sql.SQLException; |
| import java.sql.SQLSyntaxErrorException; |
| import java.sql.Types; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.ArrayBlockingQueue; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.eclipse.smila.datamodel.Any; |
| import org.eclipse.smila.datamodel.Value; |
| |
| public class JdbcWriter implements Runnable { |
| |
| /** Maximum time to wait for a queue element to become available. in ms. */ |
| public static final long MAX_WAIT_TIME = 3000; |
| |
| private final Log _log = LogFactory.getLog(getClass()); |
| |
| final String _name; |
| |
| final ArrayBlockingQueue<Map<String, Object>> _queue; |
| |
| final JdbcWriterServiceImpl _jdbcWriterService; |
| |
| /** |
| * Flag if this thread should be stopped. |
| */ |
| private boolean _stopped; |
| |
| public JdbcWriter(final String name, final ArrayBlockingQueue<Map<String, Object>> queue, |
| final JdbcWriterServiceImpl jdbcWriterService) { |
| _name = name; |
| _queue = queue; |
| _jdbcWriterService = jdbcWriterService; |
| } |
| |
| /** |
| * Stops the thread. |
| */ |
| public void stop() { |
| synchronized (this) { |
| _stopped = true; |
| if (_log.isDebugEnabled()) { |
| _log.debug("Stopping JdbcWriter"); |
| } |
| } |
| } |
| |
| @Override |
| public void run() { |
| if (_log.isDebugEnabled()) { |
| _log.debug("Started JdbcWriter " + _name); |
| } |
| while (!_stopped) { |
| try { |
| final Map<String, Object> writeParams = _queue.poll(MAX_WAIT_TIME, TimeUnit.MILLISECONDS); |
| if (writeParams != null) { |
| writeToDB(writeParams); |
| } |
| } catch (final Exception ex) { |
| _log.error("Error polling queue " + _name, ex); |
| } |
| } // while |
| |
| if (_log.isDebugEnabled()) { |
| _log.debug("Stopped JdbcWriter " + _name); |
| } |
| } |
| |
| private void writeToDB(final Map<String, Object> writeParams) { |
| try { |
| final Connection con = _jdbcWriterService.getConnection(writeParams); |
| final String sql = (String) writeParams.get(JdbcWriterServiceImpl.PARAM_STATEMENT); |
| final List<Value> values = (List<Value>) writeParams.get(JdbcWriterServiceImpl.PARAM_VALUES); |
| try (final PreparedStatement stmt = con.prepareStatement(sql)) { |
| final ParameterMetaData paramTypes = stmt.getParameterMetaData(); |
| // PreparedStatement uses list of parameters |
| for (int j = 0; j < values.size(); j++) { |
| final Any param = values.get(j); |
| if (param == null) { |
| stmt.setNull(j + 1, getNullType(paramTypes, j + 1)); |
| } else { |
| stmt.setObject(j + 1, param.asValue().getObject()); // PreparedStatement params start with '1'! |
| } |
| |
| } |
| stmt.execute(); |
| } |
| } catch (final SQLSyntaxErrorException e) { |
| _log.error("Error writing entry for queue " + _name, e); |
| } catch (final SQLException e) { |
| _log.error("Error writing entry for queue " + _name, e); |
| _jdbcWriterService.dropConnection(writeParams); |
| } |
| } |
| |
| private int getNullType(final ParameterMetaData paramTypes, final int index) throws SQLException { |
| try { |
| return paramTypes.getParameterType(index); |
| } catch (final SQLException ex) { |
| // not supported by every driver. Try default. |
| return Types.NULL; |
| } |
| } |
| } |