blob: 527941bb91f2e634a342218c03235674bbd615c2 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2008, 2009 empolis GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Juergen Schumacher (empolis GmbH) - initial API and implementation
*******************************************************************************/
package org.eclipse.smila.ode;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.sql.DataSource;
import javax.transaction.Status;
import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
import javax.xml.namespace.QName;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
import org.apache.ode.bpel.engine.BpelServerImpl;
import org.apache.ode.bpel.iapi.BpelEngineException;
import org.apache.ode.bpel.iapi.BpelEventListener;
import org.apache.ode.bpel.iapi.BpelServer;
import org.apache.ode.bpel.iapi.InvocationStyle;
import org.apache.ode.bpel.iapi.Message;
import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
import org.apache.ode.bpel.iapi.ProcessConf;
import org.apache.ode.bpel.iapi.Scheduler;
import org.apache.ode.bpel.rtrep.common.extension.AbstractExtensionBundle;
import org.apache.ode.il.EmbeddedGeronimoFactory;
import org.apache.ode.il.config.OdeConfigProperties;
import org.apache.ode.il.dbutil.Database;
import org.apache.ode.il.dbutil.DatabaseConfigException;
import org.apache.ode.scheduler.simple.JdbcDelegate;
import org.apache.ode.scheduler.simple.SimpleScheduler;
import org.apache.ode.store.ProcessStoreImpl;
import org.apache.ode.utils.GUID;
import org.w3c.dom.Element;
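/**
 * Very simple integration of the Apache ODE BPEL engine: creates and starts an embedded BPEL server and offers
 * deployment, undeployment and invocation of BPEL processes.
 *
 * @author jschumacher
 *
 */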
public class ODEServer {
/** config property for the pipeline execution timeout = "pipeline.timeout". The value is parsed as seconds. */
public static final String PROP_PIPELINE_TIMEOUT = "pipeline.timeout";
/** default pipeline timeout in seconds = 5 minutes, used if {@link #PROP_PIPELINE_TIMEOUT} is not configured. */
public static final String DEFAULT_PIPELINE_TIMEOUT = "300";
/** for conversion of timeout seconds to millis ... prevent a magic number. */
public static final int MILLIS_PER_SECOND = 1000;
/** path to SQL script that prepares the in-memory HSQLDB instance for the scheduler. */
private static final String RESOURCE_SCHEDULER_HSQLDB_SQL = "/sql/scheduler-hsqldb.sql";
/** path to SQL script that prepares the in-memory Derby instance for the scheduler. */
private static final String RESOURCE_SCHEDULER_DERBY_SQL = "/sql/scheduler-derby.sql";
/** logger for this class. */
private final Log _log = LogFactory.getLog(getClass());
/** configuration of ODE. */
private ODEConfigProperties _odeConfig;
/** the BPEL server. */
private BpelServerImpl _server;
/** store for deployed processes. */
private ProcessStoreImpl _store;
/** transaction manager. */
private TransactionManager _txManager;
/** factory for database related objects (datasources, DAO connection factories, etc.). */
private Database _database;
/** data source to use by BPEL engine. */
private DataSource _dataSource;
/** BPEL job scheduler. */
private Scheduler _scheduler;
/** DAO connection factory for BPEL objects. */
private BpelDAOConnectionFactory _daoCF;
/** timeout for BPEL pipelines in milliseconds (configured in seconds, converted during server initialization). */
private int _txTimeoutMillis;
/** mapping of process names to BPEL files. */
private final Map<QName, File> _bpelFiles = new HashMap<QName, File>();
/**
 * mapping of (deployed) process ids to process name, needed to unregister processes after undeploying them from
 * store.
 */
private final Map<QName, QName> _pidsToProcessNames = new HashMap<QName, QName>();
/** mapping of process names to their current deployment unit. */
private final Map<QName, File> _processNamesToUnits = new HashMap<QName, File>();
/**
 * maintain a lock for each deployment unit so that we can synchronize updates and invocations on pipelines in these
 * units.
 */
private final Map<File, ReentrantReadWriteLock> _deployUnitLocks = new HashMap<File, ReentrantReadWriteLock>();
/**
 * lock to prevent any pipeline invocation during calls to {@link BpelServerImpl#register(ProcessConf)} or
 * {@link BpelServerImpl#unregister(ProcessConf)}, because parallel calls can lead to deadlocks. Created as a
 * non-fair lock.
 */
private final ReentrantReadWriteLock _serverLock = new ReentrantReadWriteLock(false);
/**
 * Create and start the ODE BPEL engine with the settings in the specified config properties.
 *
 * The parts are created in a fixed order because later steps check for the results of earlier ones: transaction
 * manager, data source, scheduler, process store, BPEL server configuration. Finally the server is started. The
 * thread context class loader is replaced during initialization and restored afterwards.
 *
 * @param odeConfig
 *          config properties for engine.
 * @param contextFactory
 *          context factory to create the needed context objects.
 * @throws ODEServerException
 *           error in initialization, wraps the original exception as cause.
 */
public ODEServer(final ODEConfigProperties odeConfig, final ODEServerContextFactory contextFactory)
  throws ODEServerException {
  final ClassLoader tcclBackup = prepareTccl();
  try {
    _odeConfig = odeConfig;
    _server = new BpelServerImpl();
    createTransactionManager();
    createDataSource();
    createScheduler();
    createProcessStore(contextFactory);
    initBPELServer(contextFactory);
    _server.start();
  } catch (final Exception ex) {
    // separate our own text from the cause's message, they were glued together before.
    throw new ODEServerException("error in ODE initialization: " + ex.getMessage(), ex);
  } finally {
    restoreTccl(tcclBackup);
  }
}
/**
 * Deploy the BPEL processes found in the given deployment unit directory to the process store and register them in
 * the BPEL server. The write lock of the deployment unit is held for the duration of the operation.
 *
 * @param deploymentUnitDirectory
 *          directory to search for BPEL processes.
 * @return names of deployed processes
 */
public synchronized Collection<QName> deploy(final File deploymentUnitDirectory) {
  final ClassLoader previousTccl = prepareDeployOperation(deploymentUnitDirectory);
  try {
    final Collection<QName> deployedPids = _store.deploy(deploymentUnitDirectory);
    final Collection<QName> deployedNames = registerProcesses(deployedPids, deploymentUnitDirectory);
    // remember the unit of each process so that invocations can find the unit lock.
    setProcessNamesToUnit(deployedNames, deploymentUnitDirectory);
    return deployedNames;
  } finally {
    finishDeployOperation(deploymentUnitDirectory, previousTccl);
  }
}
/**
 * Update an already deployed BPEL process: deploy the new version, then undeploy the old version.
 *
 * The write lock of the old deployment unit is held during the complete operation. Afterwards the same lock object
 * is registered for the new unit, so the processes keep their lock across the update.
 *
 * @param deploymentUnitDirectory
 *          directory with new BPEL processes to deploy.
 * @param undeploymentUnitDirectory
 *          directory with old BPEL processes to undeploy.
 * @return names of new deployed processes
 */
public synchronized Collection<QName> redeploy(final File deploymentUnitDirectory,
  final File undeploymentUnitDirectory) {
  final ReentrantReadWriteLock unitLock = getDeploymentUnitLock(undeploymentUnitDirectory);
  // acquire the lock BEFORE entering try: the finally block must only unlock a lock that is really held.
  unitLock.writeLock().lock();
  final ClassLoader tcclBackup = prepareTccl();
  try {
    final Collection<QName> newPids = _store.deploy(deploymentUnitDirectory);
    final Collection<QName> newProcessNames = registerProcesses(newPids, deploymentUnitDirectory);
    final Collection<QName> oldPids = _store.undeploy(undeploymentUnitDirectory);
    unregisterProcesses(oldPids, newProcessNames);
    // move unit lock to update unit. The new entry is added and the process names are re-mapped before the old
    // entry is removed, so a lookup via process name always finds a lock.
    _deployUnitLocks.put(deploymentUnitDirectory, unitLock);
    setProcessNamesToUnit(newProcessNames, deploymentUnitDirectory);
    _deployUnitLocks.remove(undeploymentUnitDirectory);
    return newProcessNames;
  } finally {
    restoreTccl(tcclBackup);
    unitLock.writeLock().unlock();
  }
}
/**
 * Undeploy the BPEL processes of the given deployment unit directory from the process store and unregister them
 * from the BPEL server.
 *
 * The unit's write lock is held via a local reference and released in any case. On success the lock is removed
 * from the lock map, so a lookup in the map afterwards cannot find it anymore. Threads that fetched the lock
 * before the removal and are waiting on it would block forever if it were not released here.
 *
 * @param deploymentUnitDirectory
 *          directory to search for BPEL processes.
 * @return names of undeployed processes
 */
public synchronized Collection<QName> undeploy(final File deploymentUnitDirectory) {
  final ReentrantReadWriteLock unitLock = getDeploymentUnitLock(deploymentUnitDirectory);
  unitLock.writeLock().lock();
  final ClassLoader tcclBackup = prepareTccl();
  try {
    final Collection<QName> pids = _store.undeploy(deploymentUnitDirectory);
    final Collection<QName> processNames = unregisterProcesses(pids, null);
    _deployUnitLocks.remove(deploymentUnitDirectory);
    return processNames;
  } finally {
    restoreTccl(tcclBackup);
    // unlock the object we locked, not whatever the map contains now (the entry may have been removed above).
    unitLock.writeLock().unlock();
  }
}
/**
 * Read the BPEL file that was registered for a process during deployment.
 *
 * @param processId
 *          key under which the BPEL file was stored when the process was registered.
 * @return the content of the BPEL document defining this process, or null if no file is known for the key.
 * @throws IOException
 *           error reading file.
 */
public String getBpelDocument(final QName processId) throws IOException {
  final File document = _bpelFiles.get(processId);
  return document == null ? null : FileUtils.readFileToString(document);
}
/**
 * Get the configuration of the named process by asking the process store.
 *
 * @param processId
 *          qname of process, passed unchanged to the process store.
 * @return whatever the process store returns for this id.
 */
public ProcessConf getProcessConfiguration(final QName processId) {
return _store.getProcessConfiguration(processId);
}
/**
 * Register an extension bundle with the BPEL server and hand the bundle's extension validators to the process
 * store.
 *
 * @param bundle
 *          an extension bundle.
 */
public void registerExtensionBundle(final AbstractExtensionBundle bundle) {
_server.registerExtensionBundle(bundle);
// note: this SETS the validators of the store to those of this bundle.
_store.setExtensionValidators(bundle.getExtensionValidators());
}
/**
 * Unregister an extension bundle from the BPEL server. The bundle is identified by its namespace URI. The
 * extension validators set in the process store are not touched here.
 *
 * @param bundle
 *          an extension bundle.
 */
public void unregisterExtensionBundle(final AbstractExtensionBundle bundle) {
_server.unregisterExtensionBundle(bundle.getNamespaceURI());
}
/**
 * Register a listener for {@link org.apache.ode.bpel.evt.BpelEvent}s issued by the ODE engine during execution of
 * processes.
 *
 * @param listener
 *          BPEL event listener
 */
public void registerEventListener(final BpelEventListener listener) {
_server.registerBpelEventListener(listener);
}
/**
 * Unregister a listener for {@link org.apache.ode.bpel.evt.BpelEvent}s issued by the ODE engine during execution
 * of processes.
 *
 * @param listener
 *          BPEL event listener
 */
public void unregisterEventListener(final BpelEventListener listener) {
_server.unregisterBpelEventListener(listener);
}
/**
 * Invoke a BPEL process and return the content of its response message.
 *
 * The message exchange returned by the invocation is completed and released in any case, also if processing the
 * response fails.
 *
 * @param processName
 *          name of BPEL process as returned by deploy()
 * @param opName
 *          name of the operation to execute
 * @param message
 *          message to send to process
 * @return result message
 * @throws ODEServerException
 *           error in invocation. Runtime exceptions are logged and wrapped.
 */
public Element invoke(final QName processName, final String opName, final Element message)
  throws ODEServerException {
  MyRoleMessageExchange mex = null;
  try {
    mex = invokeProcess(processName, opName, message);
    return processResponse(mex);
  } catch (final ODEServerException ex) {
    throw ex;
  } catch (final RuntimeException ex) {
    // use the class logger instead of printing the stack trace to stderr.
    _log.error("Runtime exception when invoking BPEL process " + processName, ex);
    throw new ODEServerException("Runtime exception when invoking BPEL process", ex);
  } finally {
    if (mex != null) {
      mex.complete();
      mex.release();
    }
  }
}
/**
 * Create a message exchange for the given process operation, send the request and wait for the invocation to
 * finish. The read locks of the deployment unit and the server are held during the invocation.
 *
 * NOTE(review): if this method throws after the exchange has been created (operation not found, timeout, BPEL
 * error), the exchange is not returned to the caller and so is not completed/released by {@code invoke()} -
 * confirm whether ODE needs a cleanup in these cases.
 *
 * @param processName
 *          process name
 * @param opName
 *          operation name
 * @param message
 *          message content
 * @return invocation message exchange
 * @throws ODEServerException
 *           process not deployed, operation not found, timeout or BPEL engine error.
 */
private MyRoleMessageExchange invokeProcess(final QName processName, final String opName, final Element message)
  throws ODEServerException {
  // throws before any lock is taken if the process is unknown, so this call stays outside of try/finally.
  final ClassLoader previousTccl = prepareInvokeOperation(processName);
  try {
    final String exchangeId = new GUID().toString();
    if (_log.isDebugEnabled()) {
      _log.debug("request messageID = " + exchangeId);
    }
    final MyRoleMessageExchange exchange =
      _server.createMessageExchange(InvocationStyle.UNRELIABLE, processName, opName, exchangeId);
    if (exchange.getOperation() == null) {
      throw new ODEServerException("Did not find operation " + opName + " on service " + processName);
    }
    final QName inputMessageType = exchange.getOperation().getInput().getMessage().getQName();
    final Message input = exchange.createMessage(inputMessageType);
    input.setMessage(message);
    exchange.setRequest(input);
    exchange.setTimeout(_txTimeoutMillis);
    exchange.invokeBlocking();
    return exchange;
  } catch (final TimeoutException ex) {
    throw new ODEServerException("Timeout in execution of pipeline " + processName, ex);
  } catch (final BpelEngineException ex) {
    throw new ODEServerException("BPEL error in execution of pipeline " + processName, ex);
  } finally {
    finishInvokeOperation(processName, previousTccl);
  }
}
/**
 * Evaluate the outcome of an invocation: fetch the message exchange with the same id from the server and return
 * the content of its response. A failure or fault acknowledgement is converted to an exception. The fetched
 * exchange is completed and released in any case.
 *
 * @param mex
 *          invocation mex
 * @return result content.
 * @throws ODEServerException
 *           the process completed with a failure or a fault.
 */
private Element processResponse(final MyRoleMessageExchange mex) throws ODEServerException {
  MyRoleMessageExchange answer = null;
  try {
    final QName service = mex.getServiceName();
    final String exchangeId = mex.getMessageExchangeId();
    if (_log.isDebugEnabled()) {
      _log.debug("response messageID = " + exchangeId);
    }
    answer = (MyRoleMessageExchange) _server.getMessageExchange(exchangeId);
    switch (answer.getAckType()) {
      case FAILURE:
        final String failureText =
          "BPEL process " + service.getLocalPart() + " completed with failure type " + answer.getFailureType()
            + ", explanation: " + answer.getFaultExplanation();
        throw new ODEServerException(failureText);
      case FAULT:
        final String faultText =
          "BPEL process " + service.getLocalPart() + " completed with fault " + answer.getFault()
            + ", explanation: " + answer.getFaultExplanation();
        throw new ODEServerException(faultText);
      case RESPONSE:
      default:
        // every other acknowledgement type is handled like a regular response.
        return answer.getResponse().getMessage();
    }
  } finally {
    if (answer != null) {
      answer.complete();
      answer.release();
    }
  }
}
/**
 * Shutdown the BPEL engine and all used resources: server, scheduler, DAO connection factory and database, in
 * this order.
 *
 * Shutdown is best effort: an error in one step is logged and does not prevent the following steps. Parts that
 * have never been created are skipped. As before, the reference to a part is only dropped if stopping it failed;
 * data source and transaction manager references are always dropped.
 */
public void shutdown() {
  if (_server != null) {
    try {
      _server.stop();
    } catch (final Exception ex) {
      _log.warn("error stopping BPEL server", ex);
      _server = null;
    }
  }
  if (_scheduler != null) {
    try {
      _scheduler.stop();
      _scheduler.shutdown();
    } catch (final Exception ex) {
      _log.warn("error stopping BPEL scheduler", ex);
      _scheduler = null;
    }
  }
  if (_daoCF != null) {
    try {
      _daoCF.shutdown();
    } catch (final Exception ex) {
      _log.warn("error shutting down BPEL DAO connection factory", ex);
      _daoCF = null;
    }
  }
  _dataSource = null;
  if (_database != null) {
    try {
      _database.shutdown();
    } catch (final Exception ex) {
      _log.warn("error shutting down BPEL database", ex);
      _database = null;
    }
  }
  _txManager = null;
}
/**
 * Access to the embedded server for subclasses and classes in the same package.
 *
 * @return integrated BPEL engine, exposed via the {@link BpelServer} interface.
 */
protected BpelServer getBpelServer() {
return _server;
}
/**
 * Register processes in BPEL server, use server write lock to prevent any pipeline invocation at the same time.
 * The lock is taken and released for each single process. For each process the BPEL file and the mapping from
 * process id to process name are remembered.
 *
 * @param pids
 *          ids of the processes to register, as returned by the process store.
 * @param deploymentUnitDirectory
 *          directory of the deployment unit, used to locate the BPEL files.
 * @return names of the registered processes
 */
private Collection<QName> registerProcesses(final Collection<QName> pids, final File deploymentUnitDirectory) {
  final ReentrantReadWriteLock.WriteLock serverWriteLock = _serverLock.writeLock();
  final Collection<QName> registeredNames = new ArrayList<QName>(pids.size());
  for (final QName pid : pids) {
    final ProcessConf conf = _store.getProcessConfiguration(pid);
    serverWriteLock.lock();
    try {
      _server.register(conf);
    } finally {
      serverWriteLock.unlock();
    }
    // process type = BPEL definition name
    final QName name = conf.getType();
    final File bpelFile = new File(deploymentUnitDirectory, conf.getBpelDocument());
    _bpelFiles.put(name, bpelFile);
    _pidsToProcessNames.put(pid, name);
    registeredNames.add(name);
  }
  _log.info("Deployed new BPEL processes: " + pids);
  return registeredNames;
}
/**
 * Unregister processes in BPEL server, use server write lock to prevent any pipeline invocation at the same time.
 * The lock is taken and released for each single process.
 *
 * @param pids
 *          ids of the processes to unregister.
 * @param redeployedProcessNames
 *          names of processes currently updated, may be null. The BPEL file and deployment unit entries of a
 *          process contained in this collection are kept, because they already describe the new version.
 * @return names of the unregistered processes for which a name was known.
 */
private Collection<QName> unregisterProcesses(final Collection<QName> pids,
  final Collection<QName> redeployedProcessNames) {
  final ReentrantReadWriteLock.WriteLock serverWriteLock = _serverLock.writeLock();
  final Collection<QName> removedNames = new ArrayList<QName>(pids.size());
  for (final QName pid : pids) {
    serverWriteLock.lock();
    try {
      _server.unregister(pid);
    } finally {
      serverWriteLock.unlock();
    }
    final QName name = _pidsToProcessNames.remove(pid);
    if (name == null) {
      continue;
    }
    removedNames.add(name);
    final boolean replacedByNewVersion = redeployedProcessNames != null && redeployedProcessNames.contains(name);
    if (!replacedByNewVersion) {
      _bpelFiles.remove(name);
      _processNamesToUnits.remove(name);
    }
  }
  _log.info("Undeployed old BPEL processes: " + pids);
  return removedNames;
}
/**
 * Remember the deployment unit for each of the given process names. An existing entry for a name is replaced.
 */
private void setProcessNamesToUnit(final Collection<QName> processNames, final File deploymentUnitDirectory) {
  final Iterator<QName> names = processNames.iterator();
  while (names.hasNext()) {
    _processNamesToUnits.put(names.next(), deploymentUnitDirectory);
  }
}
/**
 * Get the lock of a deployment unit. If the unit has no lock yet, a new fair lock is created and stored for it.
 */
private ReentrantReadWriteLock getDeploymentUnitLock(final File deploymentUnit) {
  final ReentrantReadWriteLock known = _deployUnitLocks.get(deploymentUnit);
  if (known != null) {
    return known;
  }
  final ReentrantReadWriteLock created = new ReentrantReadWriteLock(true);
  _deployUnitLocks.put(deploymentUnit, created);
  return created;
}
/**
 * Acquire the WRITE lock of the deployment unit (the lock is created if necessary) and replace TCCL afterwards.
 *
 * @return replaced classloader for later restoring.
 */
private ClassLoader prepareDeployOperation(final File deploymentUnit) {
  getDeploymentUnitLock(deploymentUnit).writeLock().lock();
  return prepareTccl();
}
/**
 * Restore TCCL and release the deploy WRITE lock of the unit. The lock is looked up in the lock map: if the unit
 * has no entry there (anymore), or the current thread does not hold the write lock, nothing is unlocked.
 */
private void finishDeployOperation(final File deploymentUnit, final ClassLoader tcclBackup) {
  restoreTccl(tcclBackup);
  final ReentrantReadWriteLock unitLock = _deployUnitLocks.get(deploymentUnit);
  if (unitLock == null) {
    return;
  }
  if (unitLock.isWriteLockedByCurrentThread()) {
    unitLock.writeLock().unlock();
  }
}
/**
 * Acquire the READ locks of the process' deployment unit and of the server (in this order) and replace TCCL. If no
 * deployment unit or no unit lock is known for the process, no lock is taken and an exception is thrown.
 *
 * @return replaced classloader for later restoring.
 * @throws ODEServerException
 *           process not deployed.
 */
private ClassLoader prepareInvokeOperation(final QName processName) throws ODEServerException {
  final File unit = _processNamesToUnits.get(processName);
  final ReentrantReadWriteLock unitLock = (unit == null) ? null : _deployUnitLocks.get(unit);
  if (unitLock == null) {
    throw new ODEServerException("Process " + processName + " is not deployed currently.");
  }
  unitLock.readLock().lock();
  _serverLock.readLock().lock();
  return prepareTccl();
}
/**
 * Restore TCCL, then release the READ lock of the process' deployment unit and finally the server's READ lock. The
 * unit lock is looked up again via the process name: if unit or lock cannot be found, only the server lock is
 * released.
 */
private void finishInvokeOperation(final QName processName, final ClassLoader tcclBackup) {
  restoreTccl(tcclBackup);
  final File unit = _processNamesToUnits.get(processName);
  final ReentrantReadWriteLock unitLock = (unit == null) ? null : _deployUnitLocks.get(unit);
  if (unitLock != null) {
    unitLock.readLock().unlock();
  }
  _serverLock.readLock().unlock();
}
/**
 * Set the class loader of this class as context class loader of the current thread.
 *
 * @return replaced classloader for later restoring.
 */
private ClassLoader prepareTccl() {
  final Thread current = Thread.currentThread();
  final ClassLoader replaced = current.getContextClassLoader();
  current.setContextClassLoader(ODEServer.class.getClassLoader());
  return replaced;
}
/** Set the given class loader as context class loader of the current thread again. */
private void restoreTccl(final ClassLoader tcclBackup) {
  final Thread current = Thread.currentThread();
  current.setContextClassLoader(tcclBackup);
}
/**
 * Initialize the BPEL server: connect it with DAO connection factory, scheduler, transaction manager and the
 * context objects, call its init method and finally read the pipeline timeout from the configuration. The timeout
 * is configured in seconds and stored in milliseconds.
 *
 * @param contextFactory
 *          context factory creating necessary context objects.
 */
private void initBPELServer(final ODEServerContextFactory contextFactory) {
  // scheduler and DAO factory must have been created before.
  if (_scheduler == null) {
    throw new RuntimeException("No scheduler");
  }
  if (_daoCF == null) {
    throw new RuntimeException("No DAO");
  }
  _server.setDaoConnectionFactory(_daoCF);
  _server.setScheduler(_scheduler);
  _server.setTransactionManager(_txManager);
  _server.setMessageExchangeContext(contextFactory.createMessageExchangeContext());
  _server.setBindingContext(contextFactory.createBindingContext(this));
  _server.setEndpointReferenceContext(contextFactory.createEPRContext());
  _server.setConfigProperties(_odeConfig);
  _server.init();
  final int timeoutSeconds =
    Integer.parseInt(_odeConfig.getProperties().getProperty(PROP_PIPELINE_TIMEOUT, DEFAULT_PIPELINE_TIMEOUT));
  _log.info("BPEL process execution timeout: " + timeoutSeconds + " seconds.");
  _txTimeoutMillis = timeoutSeconds * MILLIS_PER_SECOND;
}
/**
 * Create the store for BPEL process management. It uses the data source of this server, so the data source must
 * have been created before.
 *
 * @param contextFactory
 *          context factory to create the needed context objects.
 */
private void createProcessStore(final ODEServerContextFactory contextFactory) {
_store =
new ProcessStoreImpl(contextFactory.createEPRContext(), _dataSource, _odeConfig.getDAOConnectionFactory(),
_odeConfig, true);
}
/**
 * Create the TransactionManager for the BPEL engine and remember it in this server. Currently hardcoded to use
 * the Geronimo implementation of transactions.
 *
 * @return a new transaction manager
 * @throws SystemException
 *           error in initialisation.
 */
private TransactionManager createTransactionManager() throws SystemException {
  _txManager = new EmbeddedGeronimoFactory().getTransactionManager();
  return _txManager;
}
/**
 * Create and start the database for persistence operations of the BPEL engine and fetch data source and DAO
 * connection factory from it. Requires that the transaction manager has been created before.
 *
 * Finally a dummy DAO connection is requested inside a transaction to set up the DB schema. Errors in this step
 * are logged only. A transaction that has been begun but not finished is rolled back, so it does not stay
 * associated with the calling thread.
 *
 * @throws DatabaseConfigException
 *           invalid database configuration.
 */
private void createDataSource() throws DatabaseConfigException {
  if (_txManager == null) {
    throw new RuntimeException("No transaction manager");
  }
  _database = new Database(_odeConfig);
  _database.setTransactionManager(_txManager);
  if (_odeConfig.getDbMode() == OdeConfigProperties.DatabaseMode.EMBEDDED) {
    _database.setWorkRoot(new File(_odeConfig.getWorkingDir()));
  }
  _database.start();
  _dataSource = _database.getDataSource();
  _daoCF = _database.createDaoCF();
  // create dummy connection to setup DB schema.
  // this way errors during schema creation (because tables exist already, for example) do not
  // disturb later operation (they seem to do with EclipseLink, else).
  boolean txBegun = false;
  try {
    _txManager.begin();
    txBegun = true;
    _daoCF.getConnection();
    _txManager.commit();
  } catch (final Exception e) {
    _log.error("error creating initial BPEL DAO connection", e);
    if (txBegun) {
      rollbackInitialTransaction();
    }
  }
}

/** roll back the transaction of the current thread, if there still is one. Errors are logged only. */
private void rollbackInitialTransaction() {
  try {
    // a failed commit may have finished the transaction already, then there is nothing to roll back.
    if (_txManager.getStatus() != Status.STATUS_NO_TRANSACTION) {
      _txManager.rollback();
    }
  } catch (final Exception rollbackEx) {
    _log.warn("error rolling back transaction of initial BPEL DAO connection", rollbackEx);
  }
}
/**
 * Create the job scheduler for the BPEL engine and remember it in this server. BPEL server, transaction manager
 * and data source must have been created before. The scheduler database is prepared first, then a JDBC based
 * {@link SimpleScheduler} is created that uses the BPEL server as job processor.
 *
 * @return a new scheduler
 * @throws ODEServerException
 *           error initializing the scheduler database.
 */
private Scheduler createScheduler() throws ODEServerException {
  if (_server == null) {
    throw new RuntimeException("No BPEL server");
  }
  if (_txManager == null) {
    throw new RuntimeException("No transaction manager");
  }
  if (_dataSource == null) {
    throw new RuntimeException("No data source");
  }
  prepareSchedulerDb();
  final String schedulerId = new GUID().toString();
  final JdbcDelegate jdbcDelegate = new JdbcDelegate(_dataSource);
  final SimpleScheduler jdbcScheduler = new SimpleScheduler(schedulerId, jdbcDelegate, _odeConfig.getProperties());
  jdbcScheduler.setTransactionManager(_txManager);
  jdbcScheduler.setJobProcessor(_server);
  _scheduler = jdbcScheduler;
  return jdbcScheduler;
}
/**
 * Create the scheduler tables in the internal database by executing an SQL script from the classpath. A script is
 * only available for the embedded Derby driver and the HSQLDB driver. For any other configured driver class
 * nothing is done.
 *
 * The script is read line by line. Lines are trimmed and collected until a line ends with ";", then the collected
 * statement is executed without the ";". SQL errors are only logged, because they usually mean that the tables
 * exist already. The first SQL error ends the execution of the script.
 *
 * @throws ODEServerException
 *           error reading the SQL script.
 */
private void prepareSchedulerDb() throws ODEServerException {
  final String driverClass = _odeConfig.getDbInternalJdbcDriverClass();
  String sqlScriptName = null;
  if ("org.apache.derby.jdbc.EmbeddedDriver".equals(driverClass)) {
    sqlScriptName = RESOURCE_SCHEDULER_DERBY_SQL;
  } else if ("org.hsqldb.jdbcDriver".equals(driverClass)) {
    sqlScriptName = RESOURCE_SCHEDULER_HSQLDB_SQL;
  }
  if (sqlScriptName == null) {
    return;
  }
  // init some tables in DB
  Connection c = null;
  BufferedReader reader = null;
  try {
    c = _dataSource.getConnection();
    _log.info("Reading SQL commands from " + sqlScriptName + " to prepare DB for scheduler.");
    final InputStream sqlStream = getClass().getResourceAsStream(sqlScriptName);
    if (sqlStream == null) {
      _log.error("Error reading SQL script " + sqlScriptName);
      throw new ODEServerException("Error reading SQL script " + sqlScriptName);
    }
    reader = new BufferedReader(new InputStreamReader(sqlStream));
    final StringBuilder sql = new StringBuilder();
    String line = null;
    while ((line = reader.readLine()) != null) {
      final String trimmed = line.trim();
      if (trimmed.length() == 0) {
        continue;
      }
      if (sql.length() > 0) {
        // keep the tokens of consecutive lines apart, line ends are whitespace in SQL.
        sql.append(' ');
      }
      sql.append(trimmed);
      if (sql.charAt(sql.length() - 1) == ';') {
        // cut off ";", Derby doesn't like it.
        sql.setLength(sql.length() - 1);
        executeSchedulerSql(c, sql.toString().trim());
        sql.setLength(0);
      }
    }
  } catch (final IOException ex) {
    throw new ODEServerException("Error reading SQL script: " + ex.getMessage(), ex);
  } catch (final SQLException ex) {
    _log.info("Error creating tables in scheduler DB: " + ex.toString());
    _log.info("Usually this means that the DB has been initialized earlier already, "
      + "in this case everything should be fine.");
  } finally {
    // close script and connection in any case, errors while closing are ignorable.
    if (reader != null) {
      try {
        reader.close();
      } catch (final IOException ignored) {
        _log.debug("error closing SQL script " + sqlScriptName, ignored);
      }
    }
    if (c != null) {
      try {
        c.close();
      } catch (final SQLException ignored) {
        _log.debug("error closing scheduler DB connection", ignored);
      }
    }
  }
}

/** execute a single SQL statement on the given connection and close the used statement object afterwards. */
private void executeSchedulerSql(final Connection connection, final String sql) throws SQLException {
  final Statement statement = connection.createStatement();
  try {
    statement.execute(sql);
  } finally {
    statement.close();
  }
}
}