/*********************************************************************************************************************** | |
* Copyright (c) 2008, 2012 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the | |
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this | |
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html | |
* | |
* Contributors: - Juergen Schumacher (Attensity Europe GmbH) - initial API and implementation - Andreas Weber | |
* (Attensity Europe GmbH) - removed processing services as BPEL pipeline extensions | |
**********************************************************************************************************************/ | |
package org.eclipse.smila.processing.bpel; | |
import java.io.File; | |
import java.io.IOException; | |
import java.util.ArrayList; | |
import java.util.Collection; | |
import java.util.Date; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.TreeSet; | |
import java.util.concurrent.locks.ReadWriteLock; | |
import java.util.concurrent.locks.ReentrantReadWriteLock; | |
import org.apache.commons.lang.ArrayUtils; | |
import org.apache.commons.logging.Log; | |
import org.apache.commons.logging.LogFactory; | |
import org.apache.ode.utils.DOMUtils; | |
import org.eclipse.smila.blackboard.Blackboard; | |
import org.eclipse.smila.common.definitions.NameValidator; | |
import org.eclipse.smila.datamodel.AnyMap; | |
import org.eclipse.smila.datamodel.Value; | |
import org.eclipse.smila.ode.ODEServerException; | |
import org.eclipse.smila.processing.ProcessingException; | |
import org.eclipse.smila.processing.WorkflowProcessor; | |
import org.eclipse.smila.processing.bpel.counter.PipelinePerformanceCounter; | |
import org.eclipse.smila.processing.bpel.util.ConfigurationHelper; | |
import org.eclipse.smila.processing.bpel.util.MessageHelper; | |
import org.eclipse.smila.utils.config.ConfigurationUpdateWatcher; | |
import org.eclipse.smila.utils.config.ConfigurationUpdateWatcher.UpdateableService; | |
import org.w3c.dom.Element; | |
/** | |
* SMILA Workflow Processor that uses the Apache ODE BPEL engine to orchestrate SMILA pipelets in BPEL processes. | |
*/ | |
public class BpelWorkflowProcessor implements WorkflowProcessor, UpdateableService { | |
/** error message used to notify about (potential) inconsistent cluster state after a workflow (un)deploy failure. */ | |
private static final String ERROR_CLUSTER_INCONSISTENT = | |
" There may be inconsistencies in cluster deploy of this workflow. Please repeat the operation."; | |
/** local logger. */ | |
private final Log _log = LogFactory.getLog(getClass()); | |
/** reference to a BPEL engine. */ | |
private BpelEngine _bpelEngine; | |
/** to store the added/updated workflow definitions. */ | |
private WorkflowStorage _workflowStorage; | |
/** coordination of workflow updates in a SMILA cluster. */ | |
private ConfigurationUpdateWatcher _updateWatcher; | |
/** management of request-dependent objects. */ | |
private RequestTable _requests; | |
/** performance counters for measurement of pipeline performance. */ | |
private final Map<String, PipelinePerformanceCounter> _pipelinePerformanceCounter = | |
new HashMap<String, PipelinePerformanceCounter>(); | |
/** names of pipelines callable from outside. */ | |
private final Collection<String> _externalWorkflowNames = new TreeSet<String>(); | |
/*** process methods use read lock, deactivate needs write lock. */ | |
private final ReadWriteLock _lock = new ReentrantReadWriteLock(true); | |
/** | |
* @see org.eclipse.smila.processing.WorkflowProcessor#process(java.lang.String, org.eclipse.smila.datamodel.id.Id[]) | |
*/ | |
@Override | |
public String[] process(final String workflowName, final Blackboard blackboard, final String[] recordIds) | |
throws ProcessingException { | |
final long startTime = System.nanoTime(); | |
_lock.readLock().lock(); | |
final String requestId = _requests.initRequest(blackboard); | |
String[] resultIds = null; | |
boolean success = false; | |
final PipelinePerformanceCounter counter = _pipelinePerformanceCounter.get(workflowName); | |
try { | |
if (_bpelEngine == null) { | |
throw new ProcessingException("Cannot process request, because BPEL engine is not yet initialised"); | |
} | |
final MessageHelper messageHelper = _requests.getMessageHelper(); | |
final Element message = messageHelper.createMessage(blackboard, recordIds, requestId); | |
if (_log.isTraceEnabled()) { | |
_log.trace("Request: " + DOMUtils.domToString(message)); | |
} | |
try { | |
final Element result = _bpelEngine.invoke(workflowName, message); | |
if (result != null) { | |
if (_log.isTraceEnabled()) { | |
_log.trace("Final Result: " + DOMUtils.domToString(result)); | |
} | |
resultIds = messageHelper.parseMessage(blackboard, result); | |
success = true; | |
return resultIds; | |
} | |
return null; | |
} catch (final ODEServerException ex) { | |
final Exception pipeletEx = _requests.getPipeletException(requestId); | |
final String details = pipeletEx == null ? ex.getMessage() : pipeletEx.getMessage(); | |
final Exception cause = pipeletEx == null ? ex : pipeletEx; | |
throw new ProcessingException("Error processing BPEL workflow " + workflowName + ": " + details, cause); | |
} | |
} catch (final ProcessingException ex) { | |
countError(ex, false, counter); | |
throw ex; | |
} catch (final Throwable ex) { | |
countError(ex, true, counter); | |
throw new ProcessingException(ex); | |
} finally { | |
_requests.cleanupRequest(requestId); | |
countInvocation(recordIds, resultIds, startTime, success, counter); | |
_lock.readLock().unlock(); | |
} | |
} | |
/** | |
* get the pipeline names of the active BPEL processes. The pipeline name is the local part of the EPR service name. | |
* | |
* @return pipeline names of the active BPEL processes. | |
*/ | |
@Override | |
public List<String> getWorkflowNames() { | |
return new ArrayList<String>(_externalWorkflowNames); | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
public AnyMap getWorkflowDefinition(final String workflowName) throws ProcessingException { | |
try { | |
if (_bpelEngine.isPredefinedWorkflow(workflowName)) { | |
return _bpelEngine.getWorkflow(workflowName); | |
} else if (_bpelEngine.isCustomWorkflow(workflowName)) { | |
return _workflowStorage.getWorkflow(workflowName); | |
} | |
} catch (final Exception ex) { | |
throw new ProcessingException("Error reading BPEL definition for workflow '" + workflowName + "'", ex); | |
} | |
return null; | |
} | |
@Override | |
public void setWorkflowDefinition(final String workflowName, final AnyMap workflowDefinition) | |
throws ProcessingException { | |
checkWorkflowName(workflowName); | |
workflowDefinition.remove(WorkflowProcessor.WORKFLOW_READONLY); | |
final String timestamp = setTimestamp(workflowDefinition); | |
final File deployDir = _bpelEngine.validateWorkflow(workflowName, workflowDefinition); | |
final AnyMap backupWorkflowDefinition = _workflowStorage.getWorkflow(workflowName); // for rollback | |
try { | |
deployWorkflowDefinition(workflowName, workflowDefinition, deployDir, timestamp); | |
} catch (final ProcessingException ex) { | |
if (backupWorkflowDefinition != null) { | |
final String backupTimestamp = | |
backupWorkflowDefinition.getStringValue(WorkflowProcessor.WORKFLOW_TIMESTAMP); | |
try { | |
final File backupDeployDir = _bpelEngine.validateWorkflow(workflowName, backupWorkflowDefinition); | |
deployWorkflowDefinition(workflowName, backupWorkflowDefinition, backupDeployDir, backupTimestamp); | |
} catch (final Exception ex2) { | |
_log.warn("Failed to rollback workflow definition for workflow '" + workflowName + "'", ex2); | |
} | |
} | |
throw ex; | |
} | |
} | |
private void deployWorkflowDefinition(final String workflowName, final AnyMap workflowDefinition, | |
final File deployDir, final String timestamp) throws ProcessingException { | |
try { | |
_bpelEngine.deployWorkflowDir(workflowName, deployDir); | |
registerPipeline(workflowName); | |
} catch (final Exception e) { | |
final String errorMessage = "Could not deploy update for workflow '" + workflowName + "': " + e.getMessage(); | |
throw new ProcessingException(errorMessage, e, false); | |
} | |
try { | |
_workflowStorage.setWorkflow(workflowName, workflowDefinition); | |
} catch (final Exception e) { | |
final String errorMessage = | |
"Error while storing workflow definition for '" + workflowName + "'." + ERROR_CLUSTER_INCONSISTENT; | |
throw new ProcessingException(errorMessage, e, true); // recoverable error, client should repeat | |
} | |
try { | |
_updateWatcher.configUpdated(workflowName, timestamp); | |
} catch (final Exception e) { | |
final String errorMessage = | |
"Eror while notifying cluster about update of workflow '" + workflowName + "'." | |
+ ERROR_CLUSTER_INCONSISTENT; | |
throw new ProcessingException(errorMessage, e, true); // recoverable error, client should repeat | |
} | |
} | |
@Override | |
public void deleteWorkflowDefinition(final String workflowName) throws ProcessingException { | |
checkWorkflowName(workflowName); | |
final AnyMap oldWorkflowAny = _workflowStorage.getWorkflow(workflowName); // needed for possible rollback | |
// if objectstore delete fails: no problem, local+cluster continue with old version. | |
_workflowStorage.deleteWorkflow(workflowName); | |
try { | |
_updateWatcher.configDeleted(workflowName); | |
} catch (final Exception e) { | |
String errorMessage = "Error deleting workflow '" + workflowName + "'."; | |
_log.warn(errorMessage + " Error while notifying update watcher.", e); | |
// watcher update failed: local+cluster still working with old workflow version, | |
// but workflow is deleted in objectstore, try to rollback: | |
try { | |
if (oldWorkflowAny != null) { | |
_workflowStorage.setWorkflow(workflowName, oldWorkflowAny); | |
} | |
} catch (final ProcessingException e2) { | |
errorMessage += ERROR_CLUSTER_INCONSISTENT; | |
_log.error(errorMessage + " Error rolling back deletion of workflow.", e2); | |
} | |
throw new ProcessingException(errorMessage, e, true); // recoverable | |
} | |
try { | |
_bpelEngine.undeployWorkflow(workflowName); | |
unregisterPipeline(workflowName); | |
} catch (final ProcessingException e) { | |
// ODE undeploy failed: local+cluster (potentially) would have different states, but that should not happen | |
final String errorMessage = "Error deleting workflow '" + workflowName + "'." + ERROR_CLUSTER_INCONSISTENT; | |
_log.error(errorMessage + " Error undeploying workflow.", e); | |
throw new ProcessingException(errorMessage, e, true); // recoverable (?) | |
} | |
} | |
@Override | |
public void synchronizeConfiguration(final String workflowName, final boolean isDeleted) { | |
try { | |
checkWorkflowName(workflowName); | |
final AnyMap workflowDefinition = _workflowStorage.getWorkflow(workflowName); | |
if (isDeleted) { | |
_bpelEngine.undeployWorkflow(workflowName); | |
unregisterPipeline(workflowName); | |
} else { | |
if (workflowDefinition == null) { | |
throw new ProcessingException("Definition for workflow '" + workflowName + "' not found for reload."); | |
} | |
_bpelEngine.deployWorkflow(workflowName, workflowDefinition); | |
registerPipeline(workflowName); | |
} | |
} catch (final ProcessingException ex) { | |
throw new RuntimeException(ex); | |
} | |
} | |
/** OSGi Declarative Services service activation method. Initializes BPEL engine. */ | |
protected void activate() { | |
_lock.writeLock().lock(); | |
try { | |
_updateWatcher.registerService(this); | |
initializeBpel(); | |
_updateWatcher.startPolling(); | |
_updateWatcher.startWatching(); | |
} catch (final Throwable ex) { | |
// necessary to prevent automatic restarts of service before problem can be fixed. | |
_log.error("Start of BPEL workflow service failed: Unknown fatal error. " | |
+ "Service is non-functional, please fix problem and restart bundle", ex); | |
} finally { | |
_lock.writeLock().unlock(); | |
} | |
} | |
/** OSGi Declarative Services service deactivation method. Shuts down BPEL engine. */ | |
protected void deactivate() { | |
_lock.writeLock().lock(); | |
try { | |
if (_updateWatcher != null) { | |
_updateWatcher.stopPolling(); | |
_updateWatcher.stopWatching(); | |
} | |
} finally { | |
_lock.writeLock().unlock(); | |
} | |
} | |
/** method for DS to set a service reference. */ | |
public void setWorkflowStorage(final WorkflowStorage workflowStorage) { | |
_workflowStorage = workflowStorage; | |
} | |
/** method for DS to unset a service reference. */ | |
public void unsetWorkflowStorage(final WorkflowStorage workflowStorage) { | |
if (_workflowStorage == workflowStorage) { | |
_workflowStorage = null; | |
} | |
} | |
/** method for DS to set a service reference. */ | |
public void setUpdateWatcher(final ConfigurationUpdateWatcher updateWatcher) { | |
_updateWatcher = updateWatcher; | |
} | |
/** method for DS to unset a service reference. */ | |
public void unsetUpdateWatcher(final ConfigurationUpdateWatcher updateWatcher) { | |
if (_updateWatcher == updateWatcher) { | |
_updateWatcher = null; | |
} | |
} | |
/** method for DS to set a service reference. */ | |
public void setBpelEngine(final BpelEngine bpelEngine) { | |
_bpelEngine = bpelEngine; | |
} | |
/** method for DS to unset a service reference. */ | |
public void unsetBpelEngine(final BpelEngine bpelEngine) { | |
if (_bpelEngine == bpelEngine) { | |
_bpelEngine = null; | |
} | |
} | |
/** method for DS to set a service reference. */ | |
public void setRequestTable(final RequestTable requests) { | |
_requests = requests; | |
} | |
/** method for DS to unset a service reference. */ | |
public void unsetRequestTable(final RequestTable requests) { | |
if (_requests == requests) { | |
_requests = null; | |
} | |
} | |
/** | |
* initialize BPEL engine, initial deploy of existing BPEL pipelines. * | |
* | |
* @throws IOException | |
* error reading the configuration | |
* @throws ODEServerException | |
* error initializing the ODE server | |
*/ | |
private void initializeBpel() throws IOException, ODEServerException { | |
_log.debug("Initialize BPEL engine"); | |
try { | |
deployPredefinedPipelines(); | |
} catch (final ProcessingException ex) { | |
_log.error("Deployment of predefined pipelines failed. Installing custom pipelines should still work.", ex); | |
} | |
try { | |
deployCustomPipelines(); | |
} catch (final ProcessingException ex) { | |
_log.error("Deployment of existing custom pipelines failed. " | |
+ "Installing additional custom pipelines should still work.", ex); | |
} | |
_log.debug("Initialization of BPEL engine successful"); | |
} | |
/** | |
* deploy predefined BPEL pipelines. | |
* | |
* @throws ProcessingException | |
* error initializing pipelines. | |
* @throws IOException | |
* error creating deployment directory. | |
*/ | |
private void deployPredefinedPipelines() throws ProcessingException, IOException { | |
final String pipelineDir = ConfigurationHelper.getPipelineDirectory(); | |
final Collection<String> workflowNames = _bpelEngine.deployPredefinedWorkflows(pipelineDir); | |
for (final String workflowName : workflowNames) { | |
_log.info("Registering predefined pipeline " + workflowName); | |
registerPipeline(workflowName); | |
} | |
} | |
/** | |
* deploy BPEL pipelines stored in objectstore. | |
* | |
* @throws ProcessingException | |
* error initializing pipelines. | |
* @throws IOExceptionfinal | |
* error creating deployment directory. | |
*/ | |
private void deployCustomPipelines() throws ProcessingException, IOException { | |
final Collection<String> workflowNames = _workflowStorage.getWorkflowNames(); | |
// JAXB used for parsing pipelet configs from BPEL needs this to be set. | |
final ClassLoader oldCL = Thread.currentThread().getContextClassLoader(); | |
Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); | |
for (final String workflowName : workflowNames) { | |
try { | |
final AnyMap workflowDefinition = _workflowStorage.getWorkflow(workflowName); | |
final String timestamp = workflowDefinition.getStringValue(WORKFLOW_TIMESTAMP); | |
_bpelEngine.deployWorkflow(workflowName, workflowDefinition); | |
_updateWatcher.configLoadedOnStart(workflowName, timestamp); | |
_log.info("Registering custom pipeline " + workflowName); | |
registerPipeline(workflowName); | |
} catch (final ProcessingException ex) { | |
_log.warn("Custom pipeline " + workflowName + " could not be deployed.", ex); | |
} | |
} | |
Thread.currentThread().setContextClassLoader(oldCL); | |
} | |
/** register pipeline process name. */ | |
private void registerPipeline(final String pipelineName) { | |
_pipelinePerformanceCounter.put(pipelineName, new PipelinePerformanceCounter(pipelineName)); | |
try { | |
NameValidator.checkName(pipelineName); | |
_externalWorkflowNames.add(pipelineName); | |
} catch (final Exception ex) { | |
_log.warn("Pipeline name '" + pipelineName + "' is not valid and will not be accessible for external calls: " | |
+ ex.getMessage()); | |
} | |
} | |
/** unregister pipeline process name. */ | |
private void unregisterPipeline(final String pipelineName) { | |
_pipelinePerformanceCounter.remove(pipelineName); | |
_externalWorkflowNames.remove(pipelineName); | |
} | |
/** | |
* @throws ProcessingException | |
* if workflowName is null or refers to a predefined workflow. | |
*/ | |
private void checkWorkflowName(final String workflowName) throws ProcessingException { | |
try { | |
NameValidator.checkName(workflowName); | |
} catch (final Exception ex) { | |
throw new ProcessingException("Workflow name '" + workflowName + "' is not valid: " + ex.getMessage()); | |
} | |
if (_bpelEngine.isPredefinedWorkflow(workflowName)) { | |
throw new ProcessingException("Workflow '" + workflowName + "' can not be changed, cause it's predefined"); | |
} | |
} | |
/** check if workflow definition has a timestamp and create one, if not. */ | |
private String setTimestamp(final AnyMap workflowDefinition) { | |
final Value value = workflowDefinition.getFactory().createDateTimeValue(new Date()); | |
workflowDefinition.put(WorkflowProcessor.WORKFLOW_TIMESTAMP, value); | |
return value.asString(); | |
} | |
/** add statistics about a completed invocation to the pipeline counter. */ | |
private void countInvocation(final String[] incomingIds, final String[] outgoingIds, final long startTime, | |
final boolean success, final PipelinePerformanceCounter counter) { | |
if (counter != null) { | |
counter.countInvocationNanos(System.nanoTime() - startTime, success, ArrayUtils.getLength(incomingIds), | |
ArrayUtils.getLength(outgoingIds)); | |
} | |
} | |
/** add an error to the pipeline counter. */ | |
private void countError(final Throwable ex, final boolean isCritical, final PipelinePerformanceCounter counter) { | |
if (counter != null) { | |
counter.addError(ex, isCritical); | |
} | |
} | |
} |