blob: bad01430d38d02ad52aa6bd90a9ea22c3a78a6e5 [file] [log] [blame]
/***********************************************************************************************************************
* Copyright (c) 2008, 2012 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: - Juergen Schumacher (Attensity Europe GmbH) - initial API and implementation - Andreas Weber
* (Attensity Europe GmbH) - removed processing services as BPEL pipeline extensions
**********************************************************************************************************************/
package org.eclipse.smila.processing.bpel;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.utils.DOMUtils;
import org.eclipse.smila.blackboard.Blackboard;
import org.eclipse.smila.common.definitions.NameValidator;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.datamodel.Value;
import org.eclipse.smila.ode.ODEServerException;
import org.eclipse.smila.processing.ProcessingException;
import org.eclipse.smila.processing.WorkflowProcessor;
import org.eclipse.smila.processing.bpel.counter.PipelinePerformanceCounter;
import org.eclipse.smila.processing.bpel.util.ConfigurationHelper;
import org.eclipse.smila.processing.bpel.util.MessageHelper;
import org.eclipse.smila.utils.config.ConfigurationUpdateWatcher;
import org.eclipse.smila.utils.config.ConfigurationUpdateWatcher.UpdateableService;
import org.w3c.dom.Element;
/**
* SMILA Workflow Processor that uses the Apache ODE BPEL engine to orchestrate SMILA pipelets in BPEL processes.
*/
public class BpelWorkflowProcessor implements WorkflowProcessor, UpdateableService {
/** error message used to notify about (potential) inconsistent cluster state after a workflow (un)deploy failure. */
private static final String ERROR_CLUSTER_INCONSISTENT =
" There may be inconsistencies in cluster deploy of this workflow. Please repeat the operation.";
/** local logger. */
private final Log _log = LogFactory.getLog(getClass());
/** reference to a BPEL engine. */
private BpelEngine _bpelEngine;
/** to store the added/updated workflow definitions. */
private WorkflowStorage _workflowStorage;
/** coordination of workflow updates in a SMILA cluster. */
private ConfigurationUpdateWatcher _updateWatcher;
/** management of request-dependent objects. */
private RequestTable _requests;
/** performance counters for measurement of pipeline performance. */
private final Map<String, PipelinePerformanceCounter> _pipelinePerformanceCounter =
new HashMap<String, PipelinePerformanceCounter>();
/** names of pipelines callable from outside. */
private final Collection<String> _externalWorkflowNames = new TreeSet<String>();
/*** process methods use read lock, deactivate needs write lock. */
private final ReadWriteLock _lock = new ReentrantReadWriteLock(true);
/**
* @see org.eclipse.smila.processing.WorkflowProcessor#process(java.lang.String, org.eclipse.smila.datamodel.id.Id[])
*/
@Override
public String[] process(final String workflowName, final Blackboard blackboard, final String[] recordIds)
throws ProcessingException {
final long startTime = System.nanoTime();
_lock.readLock().lock();
final String requestId = _requests.initRequest(blackboard);
String[] resultIds = null;
boolean success = false;
final PipelinePerformanceCounter counter = _pipelinePerformanceCounter.get(workflowName);
try {
if (_bpelEngine == null) {
throw new ProcessingException("Cannot process request, because BPEL engine is not yet initialised");
}
final MessageHelper messageHelper = _requests.getMessageHelper();
final Element message = messageHelper.createMessage(blackboard, recordIds, requestId);
if (_log.isTraceEnabled()) {
_log.trace("Request: " + DOMUtils.domToString(message));
}
try {
final Element result = _bpelEngine.invoke(workflowName, message);
if (result != null) {
if (_log.isTraceEnabled()) {
_log.trace("Final Result: " + DOMUtils.domToString(result));
}
resultIds = messageHelper.parseMessage(blackboard, result);
success = true;
return resultIds;
}
return null;
} catch (final ODEServerException ex) {
final Exception pipeletEx = _requests.getPipeletException(requestId);
final String details = pipeletEx == null ? ex.getMessage() : pipeletEx.getMessage();
final Exception cause = pipeletEx == null ? ex : pipeletEx;
throw new ProcessingException("Error processing BPEL workflow " + workflowName + ": " + details, cause);
}
} catch (final ProcessingException ex) {
countError(ex, false, counter);
throw ex;
} catch (final Throwable ex) {
countError(ex, true, counter);
throw new ProcessingException(ex);
} finally {
_requests.cleanupRequest(requestId);
countInvocation(recordIds, resultIds, startTime, success, counter);
_lock.readLock().unlock();
}
}
/**
* get the pipeline names of the active BPEL processes. The pipeline name is the local part of the EPR service name.
*
* @return pipeline names of the active BPEL processes.
*/
@Override
public List<String> getWorkflowNames() {
return new ArrayList<String>(_externalWorkflowNames);
}
/**
* {@inheritDoc}
*/
@Override
public AnyMap getWorkflowDefinition(final String workflowName) throws ProcessingException {
try {
if (_bpelEngine.isPredefinedWorkflow(workflowName)) {
return _bpelEngine.getWorkflow(workflowName);
} else if (_bpelEngine.isCustomWorkflow(workflowName)) {
return _workflowStorage.getWorkflow(workflowName);
}
} catch (final Exception ex) {
throw new ProcessingException("Error reading BPEL definition for workflow '" + workflowName + "'", ex);
}
return null;
}
@Override
public void setWorkflowDefinition(final String workflowName, final AnyMap workflowDefinition)
throws ProcessingException {
checkWorkflowName(workflowName);
workflowDefinition.remove(WorkflowProcessor.WORKFLOW_READONLY);
final String timestamp = setTimestamp(workflowDefinition);
final File deployDir = _bpelEngine.validateWorkflow(workflowName, workflowDefinition);
final AnyMap backupWorkflowDefinition = _workflowStorage.getWorkflow(workflowName); // for rollback
try {
deployWorkflowDefinition(workflowName, workflowDefinition, deployDir, timestamp);
} catch (final ProcessingException ex) {
if (backupWorkflowDefinition != null) {
final String backupTimestamp =
backupWorkflowDefinition.getStringValue(WorkflowProcessor.WORKFLOW_TIMESTAMP);
try {
final File backupDeployDir = _bpelEngine.validateWorkflow(workflowName, backupWorkflowDefinition);
deployWorkflowDefinition(workflowName, backupWorkflowDefinition, backupDeployDir, backupTimestamp);
} catch (final Exception ex2) {
_log.warn("Failed to rollback workflow definition for workflow '" + workflowName + "'", ex2);
}
}
throw ex;
}
}
private void deployWorkflowDefinition(final String workflowName, final AnyMap workflowDefinition,
final File deployDir, final String timestamp) throws ProcessingException {
try {
_bpelEngine.deployWorkflowDir(workflowName, deployDir);
registerPipeline(workflowName);
} catch (final Exception e) {
final String errorMessage = "Could not deploy update for workflow '" + workflowName + "': " + e.getMessage();
throw new ProcessingException(errorMessage, e, false);
}
try {
_workflowStorage.setWorkflow(workflowName, workflowDefinition);
} catch (final Exception e) {
final String errorMessage =
"Error while storing workflow definition for '" + workflowName + "'." + ERROR_CLUSTER_INCONSISTENT;
throw new ProcessingException(errorMessage, e, true); // recoverable error, client should repeat
}
try {
_updateWatcher.configUpdated(workflowName, timestamp);
} catch (final Exception e) {
final String errorMessage =
"Eror while notifying cluster about update of workflow '" + workflowName + "'."
+ ERROR_CLUSTER_INCONSISTENT;
throw new ProcessingException(errorMessage, e, true); // recoverable error, client should repeat
}
}
@Override
public void deleteWorkflowDefinition(final String workflowName) throws ProcessingException {
checkWorkflowName(workflowName);
final AnyMap oldWorkflowAny = _workflowStorage.getWorkflow(workflowName); // needed for possible rollback
// if objectstore delete fails: no problem, local+cluster continue with old version.
_workflowStorage.deleteWorkflow(workflowName);
try {
_updateWatcher.configDeleted(workflowName);
} catch (final Exception e) {
String errorMessage = "Error deleting workflow '" + workflowName + "'.";
_log.warn(errorMessage + " Error while notifying update watcher.", e);
// watcher update failed: local+cluster still working with old workflow version,
// but workflow is deleted in objectstore, try to rollback:
try {
if (oldWorkflowAny != null) {
_workflowStorage.setWorkflow(workflowName, oldWorkflowAny);
}
} catch (final ProcessingException e2) {
errorMessage += ERROR_CLUSTER_INCONSISTENT;
_log.error(errorMessage + " Error rolling back deletion of workflow.", e2);
}
throw new ProcessingException(errorMessage, e, true); // recoverable
}
try {
_bpelEngine.undeployWorkflow(workflowName);
unregisterPipeline(workflowName);
} catch (final ProcessingException e) {
// ODE undeploy failed: local+cluster (potentially) would have different states, but that should not happen
final String errorMessage = "Error deleting workflow '" + workflowName + "'." + ERROR_CLUSTER_INCONSISTENT;
_log.error(errorMessage + " Error undeploying workflow.", e);
throw new ProcessingException(errorMessage, e, true); // recoverable (?)
}
}
@Override
public void synchronizeConfiguration(final String workflowName, final boolean isDeleted) {
try {
checkWorkflowName(workflowName);
final AnyMap workflowDefinition = _workflowStorage.getWorkflow(workflowName);
if (isDeleted) {
_bpelEngine.undeployWorkflow(workflowName);
unregisterPipeline(workflowName);
} else {
if (workflowDefinition == null) {
throw new ProcessingException("Definition for workflow '" + workflowName + "' not found for reload.");
}
_bpelEngine.deployWorkflow(workflowName, workflowDefinition);
registerPipeline(workflowName);
}
} catch (final ProcessingException ex) {
throw new RuntimeException(ex);
}
}
/** OSGi Declarative Services service activation method. Initializes BPEL engine. */
protected void activate() {
_lock.writeLock().lock();
try {
_updateWatcher.registerService(this);
initializeBpel();
_updateWatcher.startPolling();
_updateWatcher.startWatching();
} catch (final Throwable ex) {
// necessary to prevent automatic restarts of service before problem can be fixed.
_log.error("Start of BPEL workflow service failed: Unknown fatal error. "
+ "Service is non-functional, please fix problem and restart bundle", ex);
} finally {
_lock.writeLock().unlock();
}
}
/** OSGi Declarative Services service deactivation method. Shuts down BPEL engine. */
protected void deactivate() {
_lock.writeLock().lock();
try {
if (_updateWatcher != null) {
_updateWatcher.stopPolling();
_updateWatcher.stopWatching();
}
} finally {
_lock.writeLock().unlock();
}
}
/** method for DS to set a service reference. */
public void setWorkflowStorage(final WorkflowStorage workflowStorage) {
_workflowStorage = workflowStorage;
}
/** method for DS to unset a service reference. */
public void unsetWorkflowStorage(final WorkflowStorage workflowStorage) {
if (_workflowStorage == workflowStorage) {
_workflowStorage = null;
}
}
/** method for DS to set a service reference. */
public void setUpdateWatcher(final ConfigurationUpdateWatcher updateWatcher) {
_updateWatcher = updateWatcher;
}
/** method for DS to unset a service reference. */
public void unsetUpdateWatcher(final ConfigurationUpdateWatcher updateWatcher) {
if (_updateWatcher == updateWatcher) {
_updateWatcher = null;
}
}
/** method for DS to set a service reference. */
public void setBpelEngine(final BpelEngine bpelEngine) {
_bpelEngine = bpelEngine;
}
/** method for DS to unset a service reference. */
public void unsetBpelEngine(final BpelEngine bpelEngine) {
if (_bpelEngine == bpelEngine) {
_bpelEngine = null;
}
}
/** method for DS to set a service reference. */
public void setRequestTable(final RequestTable requests) {
_requests = requests;
}
/** method for DS to unset a service reference. */
public void unsetRequestTable(final RequestTable requests) {
if (_requests == requests) {
_requests = null;
}
}
/**
* initialize BPEL engine, initial deploy of existing BPEL pipelines. *
*
* @throws IOException
* error reading the configuration
* @throws ODEServerException
* error initializing the ODE server
*/
private void initializeBpel() throws IOException, ODEServerException {
_log.debug("Initialize BPEL engine");
try {
deployPredefinedPipelines();
} catch (final ProcessingException ex) {
_log.error("Deployment of predefined pipelines failed. Installing custom pipelines should still work.", ex);
}
try {
deployCustomPipelines();
} catch (final ProcessingException ex) {
_log.error("Deployment of existing custom pipelines failed. "
+ "Installing additional custom pipelines should still work.", ex);
}
_log.debug("Initialization of BPEL engine successful");
}
/**
* deploy predefined BPEL pipelines.
*
* @throws ProcessingException
* error initializing pipelines.
* @throws IOException
* error creating deployment directory.
*/
private void deployPredefinedPipelines() throws ProcessingException, IOException {
final String pipelineDir = ConfigurationHelper.getPipelineDirectory();
final Collection<String> workflowNames = _bpelEngine.deployPredefinedWorkflows(pipelineDir);
for (final String workflowName : workflowNames) {
_log.info("Registering predefined pipeline " + workflowName);
registerPipeline(workflowName);
}
}
/**
* deploy BPEL pipelines stored in objectstore.
*
* @throws ProcessingException
* error initializing pipelines.
* @throws IOExceptionfinal
* error creating deployment directory.
*/
private void deployCustomPipelines() throws ProcessingException, IOException {
final Collection<String> workflowNames = _workflowStorage.getWorkflowNames();
// JAXB used for parsing pipelet configs from BPEL needs this to be set.
final ClassLoader oldCL = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
for (final String workflowName : workflowNames) {
try {
final AnyMap workflowDefinition = _workflowStorage.getWorkflow(workflowName);
final String timestamp = workflowDefinition.getStringValue(WORKFLOW_TIMESTAMP);
_bpelEngine.deployWorkflow(workflowName, workflowDefinition);
_updateWatcher.configLoadedOnStart(workflowName, timestamp);
_log.info("Registering custom pipeline " + workflowName);
registerPipeline(workflowName);
} catch (final ProcessingException ex) {
_log.warn("Custom pipeline " + workflowName + " could not be deployed.", ex);
}
}
Thread.currentThread().setContextClassLoader(oldCL);
}
/** register pipeline process name. */
private void registerPipeline(final String pipelineName) {
_pipelinePerformanceCounter.put(pipelineName, new PipelinePerformanceCounter(pipelineName));
try {
NameValidator.checkName(pipelineName);
_externalWorkflowNames.add(pipelineName);
} catch (final Exception ex) {
_log.warn("Pipeline name '" + pipelineName + "' is not valid and will not be accessible for external calls: "
+ ex.getMessage());
}
}
/** unregister pipeline process name. */
private void unregisterPipeline(final String pipelineName) {
_pipelinePerformanceCounter.remove(pipelineName);
_externalWorkflowNames.remove(pipelineName);
}
/**
* @throws ProcessingException
* if workflowName is null or refers to a predefined workflow.
*/
private void checkWorkflowName(final String workflowName) throws ProcessingException {
try {
NameValidator.checkName(workflowName);
} catch (final Exception ex) {
throw new ProcessingException("Workflow name '" + workflowName + "' is not valid: " + ex.getMessage());
}
if (_bpelEngine.isPredefinedWorkflow(workflowName)) {
throw new ProcessingException("Workflow '" + workflowName + "' can not be changed, cause it's predefined");
}
}
/** check if workflow definition has a timestamp and create one, if not. */
private String setTimestamp(final AnyMap workflowDefinition) {
final Value value = workflowDefinition.getFactory().createDateTimeValue(new Date());
workflowDefinition.put(WorkflowProcessor.WORKFLOW_TIMESTAMP, value);
return value.asString();
}
/** add statistics about a completed invocation to the pipeline counter. */
private void countInvocation(final String[] incomingIds, final String[] outgoingIds, final long startTime,
final boolean success, final PipelinePerformanceCounter counter) {
if (counter != null) {
counter.countInvocationNanos(System.nanoTime() - startTime, success, ArrayUtils.getLength(incomingIds),
ArrayUtils.getLength(outgoingIds));
}
}
/** add an error to the pipeline counter. */
private void countError(final Throwable ex, final boolean isCritical, final PipelinePerformanceCounter counter) {
if (counter != null) {
counter.addError(ex, isCritical);
}
}
}