blob: 2c58690379206a0f8e43c605a040c59cfe47c2d2 [file] [log] [blame]
/***********************************************************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Daniel Stucky (empolis GmbH) - initial API and implementation Andreas Weber (Attensity Europe GmbH) -
* data model simplification
**********************************************************************************************************************/
package org.eclipse.smila.connectivity.bulkbuilder;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.smila.bulkbuilder.BulkbuilderException;
import org.eclipse.smila.bulkbuilder.BulkbuilderService;
import org.eclipse.smila.bulkbuilder.WorkflowRunInfo;
import org.eclipse.smila.connectivity.ConnectivityException;
import org.eclipse.smila.connectivity.ConnectivityManager;
import org.eclipse.smila.datamodel.Record;
import org.eclipse.smila.jobmanager.JobManager;
import org.eclipse.smila.jobmanager.JobRunInfo;
import org.eclipse.smila.jobmanager.JobState;
import org.eclipse.smila.utils.MaybeRecoverableException;
/**
* JobManager/Bulkbuilder based implementation of ConnectivityManager interface.
*/
public class ConnectivityManagerImpl implements ConnectivityManager {

  /** Logger used by this instance. */
  private final Log _log = LogFactory.getLog(ConnectivityManagerImpl.class);

  /** Reference to the BulkBuilder, bound via {@link #setBulkbuilderService(BulkbuilderService)}. */
  private BulkbuilderService _bulkbuilder;

  /** Reference to the JobManager, bound via {@link #setJobManager(JobManager)}. */
  private JobManager _jobManager;

  /**
   * Default Constructor.
   */
  public ConnectivityManagerImpl() {
    if (_log.isTraceEnabled()) {
      _log.trace("Creating ConnectivityManagerImpl");
    }
  }

  /**
   * Verifies that the given job exists and is currently in state {@link JobState#RUNNING}.
   *
   * @param jobName
   *          name of the job to check
   * @throws ConnectivityException
   *           if the job state cannot be determined, or the job is unknown or not running
   */
  @Override
  public void checkJobIsActive(final String jobName) throws ConnectivityException {
    final JobRunInfo info;
    try {
      info = _jobManager.getJobRunInfo(jobName);
    } catch (final Exception ex) {
      throw new ConnectivityException("Cannot check state of job '" + jobName + "'", ex);
    }
    if (info == null || info.getState() != JobState.RUNNING) {
      throw new ConnectivityException("Job '" + jobName
        + "' is not defined or not active, create and start it before trying to submit records.", false);
    }
  }

  /**
   * Adds all given records to the bulkbuilder as a single micro bulk and finishes that micro bulk once all records
   * have been submitted.
   *
   * Records that fail with a recoverable {@link MaybeRecoverableException} are logged and skipped; any other failure
   * aborts the call.
   *
   * @param records
   *          the records to add
   * @param jobName
   *          name of the job to submit the records to
   * @return number of records that were added without a recoverable error
   * @throws ConnectivityException
   *           if a record fails with a non-recoverable or unexpected error, or if finishing the micro bulk fails
   */
  @Override
  public int add(final Record[] records, final String jobName) throws ConnectivityException {
    if (records.length == 0) {
      // nothing was submitted, so there is no micro bulk to create or finish.
      return 0;
    }
    final Map<String, Exception> exceptionMap = new HashMap<String, Exception>(); // key: record id
    // add records as micro bulk
    final String microBulkId = UUID.randomUUID().toString();
    for (final Record record : records) {
      try {
        _bulkbuilder.addToMicroBulk(jobName, record, microBulkId);
      } catch (final MaybeRecoverableException e) {
        final String msg = "Error while adding record with id '" + record.getId() + "' to bulkbuilder";
        if (e.isRecoverable()) {
          _log.warn(msg, e);
          exceptionMap.put(record.getId(), e);
        } else {
          // NOTE(review): the micro bulk is left unfinished here; confirm whether the bulkbuilder needs an explicit
          // cleanup call for abandoned micro bulks.
          throw new ConnectivityException(msg, e, false);
        }
      } catch (final Exception e) {
        throw new ConnectivityException(e, false);
      }
    }
    // finish the micro bulk exactly once, after all records have been submitted. Finishing it inside the loop would
    // close the micro bulk after each single record.
    try {
      _bulkbuilder.finishMicroBulk(jobName, microBulkId);
    } catch (final BulkbuilderException e) {
      final String message = "Error while adding " + records.length + " records to bulkbuilder";
      _log.error(message, e);
      throw new ConnectivityException(message, e);
    }
    if (!exceptionMap.isEmpty()) {
      _log.warn("Added " + (records.length - exceptionMap.size()) + " of " + records.length
        + " records. Exceptions occured: " + exceptionMap);
    }
    return records.length - exceptionMap.size();
  }

  /**
   * Submits a delete request to the bulkbuilder for each of the given records.
   *
   * Recoverable {@link BulkbuilderException}s and all other unexpected exceptions are logged and counted as failed
   * records; a non-recoverable {@link BulkbuilderException} aborts the call.
   *
   * @param records
   *          the records to delete
   * @param jobName
   *          name of the job to submit the delete requests to
   * @return number of records for which the delete request was submitted without error
   * @throws ConnectivityException
   *           if a record fails with a non-recoverable bulkbuilder error
   */
  @Override
  public int delete(final Record[] records, final String jobName) throws ConnectivityException {
    final Map<String, Exception> exceptionMap = new HashMap<String, Exception>(); // key: record id
    WorkflowRunInfo info = null;
    for (final Record record : records) {
      try {
        info = _bulkbuilder.deleteRecord(jobName, record);
      } catch (final BulkbuilderException e) {
        final String msg = "Error while deleting record with id '" + record.getId() + "' to bulkbuilder";
        if (e.isRecoverable()) {
          _log.warn(msg, e);
          exceptionMap.put(record.getId(), e);
        } else {
          throw new ConnectivityException(msg, e, false);
        }
      } catch (final Exception e) {
        _log.warn("Error while deleting record with id '" + record.getId() + "'", e);
        exceptionMap.put(record.getId(), e);
      }
    }
    if (!exceptionMap.isEmpty()) {
      _log.warn("Deleted " + (records.length - exceptionMap.size()) + " of " + records.length
        + " records. Exceptions occured: " + exceptionMap);
    } else if (info != null && _log.isDebugEnabled()) {
      // info stays null when no record was processed (empty input) or no run info was returned; skip the debug line
      // then instead of failing with a NullPointerException.
      _log.debug("Deleted " + records.length + " records for job run " + info.getJobId());
    }
    return records.length - exceptionMap.size();
  }

  /**
   * Sets the bulkbuilder.
   *
   * @param service
   *          the new bulkbuilder
   */
  public void setBulkbuilderService(final BulkbuilderService service) {
    _bulkbuilder = service;
  }

  /**
   * Unset the bulkbuilder. The reference is only cleared if it is the currently bound instance.
   *
   * @param service
   *          the bulkbuilder to unset
   */
  public void unsetBulkbuilderService(final BulkbuilderService service) {
    if (_bulkbuilder == service) {
      _bulkbuilder = null;
    }
  }

  /**
   * Set jobmanager reference. Used by OSGi DS.
   *
   * @param jobManager
   *          the new jobmanager
   */
  public void setJobManager(final JobManager jobManager) {
    _jobManager = jobManager;
  }

  /**
   * Unset jobmanager reference. Used by OSGi DS. The reference is only cleared if it is the currently bound instance.
   *
   * @param jobManager
   *          the jobmanager to unset
   */
  public void unsetJobManager(final JobManager jobManager) {
    if (_jobManager == jobManager) {
      _jobManager = null;
    }
  }
}