/***********************************************************************************************************************
 * Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
 * accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
 * distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors: Daniel Stucky (empolis GmbH) - initial API and implementation Andreas Weber (Attensity Europe GmbH) -
 * data model simplification
 **********************************************************************************************************************/
| package org.eclipse.smila.connectivity.bulkbuilder; |
| |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.UUID; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.eclipse.smila.bulkbuilder.BulkbuilderException; |
| import org.eclipse.smila.bulkbuilder.BulkbuilderService; |
| import org.eclipse.smila.bulkbuilder.WorkflowRunInfo; |
| import org.eclipse.smila.connectivity.ConnectivityException; |
| import org.eclipse.smila.connectivity.ConnectivityManager; |
| import org.eclipse.smila.datamodel.Record; |
| import org.eclipse.smila.jobmanager.JobManager; |
| import org.eclipse.smila.jobmanager.JobRunInfo; |
| import org.eclipse.smila.jobmanager.JobState; |
| import org.eclipse.smila.utils.MaybeRecoverableException; |
| |
| /** |
| * JobManager/Bulkbuilder based implementation of ConnectivityManager interface. |
| */ |
| public class ConnectivityManagerImpl implements ConnectivityManager { |
| |
| /** The LOG. */ |
| private final Log _log = LogFactory.getLog(ConnectivityManagerImpl.class); |
| |
| /** Reference to the BulkBuilder. */ |
| private BulkbuilderService _bulkbuilder; |
| |
| /** Reference to the JobMnager. */ |
| private JobManager _jobManager; |
| |
| /** |
| * Default Constructor. |
| */ |
| public ConnectivityManagerImpl() { |
| if (_log.isTraceEnabled()) { |
| _log.trace("Creating ConnectivityManagerImpl"); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void checkJobIsActive(final String jobName) throws ConnectivityException { |
| final JobRunInfo info; |
| try { |
| info = _jobManager.getJobRunInfo(jobName); |
| } catch (final Exception ex) { |
| throw new ConnectivityException("Cannot check state of job '" + jobName + "'", ex); |
| } |
| if (info == null || info.getState() != JobState.RUNNING) { |
| throw new ConnectivityException("Job '" + jobName |
| + "' is not defined or not active, create and start it before trying to submit records.", false); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public int add(final Record[] records, final String jobName) throws ConnectivityException { |
| final Map<String, Exception> exceptionMap = new HashMap<String, Exception>(); // key: record id |
| // add records as micro bulk |
| final String microBulkId = UUID.randomUUID().toString(); |
| for (final Record record : records) { |
| try { |
| _bulkbuilder.addToMicroBulk(jobName, record, microBulkId); |
| } catch (final MaybeRecoverableException e) { |
| final String msg = "Error while adding record with id '" + record.getId() + "' to bulkbuilder"; |
| if (e.isRecoverable()) { |
| _log.warn(msg, e); |
| exceptionMap.put(record.getId(), e); |
| } else { |
| throw new ConnectivityException(msg, e, false); |
| } |
| } catch (final Exception e) { |
| throw new ConnectivityException(e, false); |
| } |
| try { |
| _bulkbuilder.finishMicroBulk(jobName, microBulkId); |
| } catch (final BulkbuilderException e) { |
| final String message = "Error while adding " + records.length + " records to bulkbuilder"; |
| _log.error(message, e); |
| throw new ConnectivityException(message, e); |
| } |
| } |
| if (!exceptionMap.isEmpty()) { |
| _log.warn("Added " + (records.length - exceptionMap.size()) + " of " + records.length |
| + " records. Exceptions occured: " + exceptionMap); |
| } |
| return records.length - exceptionMap.size(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public int delete(final Record[] records, final String jobName) throws ConnectivityException { |
| final Map<String, Exception> exceptionMap = new HashMap<String, Exception>(); // key: record id |
| WorkflowRunInfo info = null; |
| for (final Record record : records) { |
| try { |
| info = _bulkbuilder.deleteRecord(jobName, record); |
| } catch (final BulkbuilderException e) { |
| final String msg = "Error while deleting record with id '" + record.getId() + "' to bulkbuilder"; |
| if (e.isRecoverable()) { |
| _log.warn(msg, e); |
| exceptionMap.put(record.getId(), e); |
| } else { |
| throw new ConnectivityException(msg, e, false); |
| } |
| } catch (final Exception e) { |
| _log.warn("Error while deleting record with id '" + record.getId() + "'", e); |
| exceptionMap.put(record.getId(), e); |
| } |
| } |
| if (!exceptionMap.isEmpty()) { |
| _log.warn("Deleted " + (records.length - exceptionMap.size()) + " of " + records.length |
| + " records. Exceptions occured: " + exceptionMap); |
| } else { |
| if (_log.isDebugEnabled()) { |
| _log.debug("Deleted " + records.length + " records for job run " + info.getJobId()); |
| } |
| } |
| return records.length - exceptionMap.size(); |
| } |
| |
| /** |
| * Sets the bulkbuilder. |
| * |
| * @param service |
| * the new bulkbuilder |
| */ |
| public void setBulkbuilderService(final BulkbuilderService service) { |
| _bulkbuilder = service; |
| } |
| |
| /** |
| * Unset the bulkbuilder. |
| * |
| * @param service |
| * the bulkbuilder to unset |
| */ |
| public void unsetBulkbuilderService(final BulkbuilderService service) { |
| if (_bulkbuilder == service) { |
| _bulkbuilder = null; |
| } |
| } |
| |
| /** set jobmanager reference. Used by OSGi DS. */ |
| public void setJobManager(final JobManager jobManager) { |
| _jobManager = jobManager; |
| } |
| |
| /** unset jobmanager reference. Used by OSGi DS. */ |
| public void unsetJobManager(final JobManager jobManager) { |
| if (_jobManager == jobManager) { |
| _jobManager = null; |
| } |
| } |
| } |