JMS: handle interrupt and harden connection management
Thread.interrupt while activemq is in an IO operation not only
interrupts that operation but also kills and disposes the complete jms
connection to the server. This behaviour is valid in theory but wrong in
practice. A Thread.interrupt should not terminate the connection and
therefore the sessions of other threads.
This fix handles auto-failover in the scout jms wrapper and ensures
seamless continuation of subscriber listeners as well as subsequent
publish operations.
Change-Id: I1aca4ed115e31168259ca899ac96f1fa9e02ceba
Signed-off-by: Ivan Motsch <ivan.motsch@bsiag.com>
Reviewed-on: https://git.eclipse.org/r/141790
Tested-by: CI Bot
Reviewed-by: Tobias Schlatter <tobias.schlatter@bsi-software.com>
diff --git a/org.eclipse.scout.rt.mom.jms.test/src/test/java/org/eclipse/scout/rt/mom/jms/JmsMomManualTest.java b/org.eclipse.scout.rt.mom.jms.test/src/test/java/org/eclipse/scout/rt/mom/jms/JmsMomManualTest.java
new file mode 100644
index 0000000..b9fc63a
--- /dev/null
+++ b/org.eclipse.scout.rt.mom.jms.test/src/test/java/org/eclipse/scout/rt/mom/jms/JmsMomManualTest.java
@@ -0,0 +1,253 @@
+/*******************************************************************************
+ * Copyright (c) 2019 BSI Business Systems Integration AG.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * BSI Business Systems Integration AG - initial API and implementation
+ ******************************************************************************/
+package org.eclipse.scout.rt.mom.jms;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import javax.naming.Context;
+
+import org.apache.activemq.broker.BrokerRegistry;
+import org.apache.activemq.broker.BrokerService;
+import org.eclipse.scout.rt.mom.api.AbstractMomTransport;
+import org.eclipse.scout.rt.mom.api.IDestination;
+import org.eclipse.scout.rt.mom.api.IDestination.DestinationType;
+import org.eclipse.scout.rt.mom.api.IDestination.ResolveMethod;
+import org.eclipse.scout.rt.mom.api.IMessage;
+import org.eclipse.scout.rt.mom.api.IMessageListener;
+import org.eclipse.scout.rt.mom.api.IMomImplementor;
+import org.eclipse.scout.rt.mom.api.MOM;
+import org.eclipse.scout.rt.mom.api.SubscribeInput;
+import org.eclipse.scout.rt.mom.api.marshaller.ObjectMarshaller;
+import org.eclipse.scout.rt.platform.BEANS;
+import org.eclipse.scout.rt.platform.BeanMetaData;
+import org.eclipse.scout.rt.platform.IBean;
+import org.eclipse.scout.rt.platform.IgnoreBean;
+import org.eclipse.scout.rt.platform.filter.IFilter;
+import org.eclipse.scout.rt.platform.job.IFuture;
+import org.eclipse.scout.rt.platform.job.JobState;
+import org.eclipse.scout.rt.platform.job.Jobs;
+import org.eclipse.scout.rt.platform.util.BeanUtility;
+import org.eclipse.scout.rt.platform.util.IDisposable;
+import org.eclipse.scout.rt.platform.util.SleepUtil;
+import org.eclipse.scout.rt.platform.util.StringUtility;
+import org.eclipse.scout.rt.platform.util.concurrent.IRunnable;
+import org.eclipse.scout.rt.platform.util.concurrent.ThreadInterruptedError;
+import org.eclipse.scout.rt.platform.util.concurrent.TimedOutError;
+import org.eclipse.scout.rt.testing.platform.runner.PlatformTestRunner;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(PlatformTestRunner.class)
+public class JmsMomManualTest {
+ private static final Logger LOG = LoggerFactory.getLogger(JmsMomManualTest.class);
+
+ private FixtureJmsMom m_mom;
+ private final List<IBean<?>> m_beans = new ArrayList<>();
+ private final List<IDisposable> m_disposables = new ArrayList<>();
+
+ private String m_testJobExecutionHint;
+
+ @Rule
+ public TestName m_testName = new TestName();
+ public long m_t0;
+
+ @Before
+ public void before() {
+ installTestMom(FixtureJmsMom.class);
+
+ LOG.info("---------------------------------------------------");
+ LOG.info("<{}>", m_testName.getMethodName());
+ m_t0 = System.nanoTime();
+ m_testJobExecutionHint = UUID.randomUUID().toString();
+ }
+
+ @After
+ public void after() throws Exception {
+ // Dispose resources
+ dispose(m_disposables);
+
+ // Cancel regular jobs
+ IFilter<IFuture<?>> testJobsFilter = Jobs.newFutureFilterBuilder()
+ .andMatchExecutionHint(m_testJobExecutionHint)
+ .toFilter();
+ Set<IFuture<?>> futures = Jobs.getJobManager().getFutures(testJobsFilter);
+ if (futures.size() > 0) {
+ LOG.info("Cancelling {} regular jobs: {}", futures.size(), futures);
+ Jobs.getJobManager().cancel(Jobs.newFutureFilterBuilder()
+ .andMatchFuture(futures)
+ .andMatchNotState(JobState.DONE)
+ .toFilter(), true);
+ long t0 = System.nanoTime();
+ try {
+ Jobs.getJobManager().awaitDone(testJobsFilter, 30, TimeUnit.SECONDS);
+ LOG.info("All regular jobs have finished after {} ms", StringUtility.formatNanos(System.nanoTime() - t0));
+ }
+ catch (TimedOutError e) {
+ LOG.warn("Some cancelled regular jobs are still running after {} ms! Please check their implementation.", StringUtility.formatNanos(System.nanoTime() - t0));
+ }
+ }
+ // Cancel jms subscriber jobs
+ IFilter<IFuture<?>> jmsJobsFilter = Jobs.newFutureFilterBuilder()
+ .andMatchExecutionHint(FixtureJmsJobInput.HINT)
+ .toFilter();
+ futures = Jobs.getJobManager().getFutures(jmsJobsFilter);
+ if (futures.size() > 0) {
+ LOG.info("Cancelling {} subscriber jobs: {}", futures.size(), futures);
+ Jobs.getJobManager().cancel(Jobs.newFutureFilterBuilder()
+ .andMatchFuture(futures)
+ .andMatchNotState(JobState.DONE)
+ .toFilter(), false);
+ long t0 = System.nanoTime();
+ try {
+ Jobs.getJobManager().awaitDone(testJobsFilter, 30, TimeUnit.SECONDS);
+ LOG.info("All subscriber jobs have finished after {} ms", StringUtility.formatNanos(System.nanoTime() - t0));
+ }
+ catch (TimedOutError e) {
+ LOG.warn("Some cancelled subscriber jobs are still running after {} ms! Please check their implementation.", StringUtility.formatNanos(System.nanoTime() - t0));
+ }
+ }
+
+ uninstallTestMom();
+
+ // ensure activeMQ is stopped
+ BrokerService brokerService = BrokerRegistry.getInstance().findFirst();
+ if (brokerService != null) {
+ brokerService.stop();
+ brokerService.waitUntilStopped();
+ }
+
+ LOG.info("Finished test in {} ms", StringUtility.formatNanos(System.nanoTime() - m_t0));
+ LOG.info("</{}>", m_testName.getMethodName());
+ }
+
+ protected void installTestMom(Class<? extends FixtureJmsMom> transportType) {
+ m_beans.add(BEANS.getBeanManager().registerBean(new BeanMetaData(FixtureJmsJobInput.class)));
+
+ FixtureJmsMom transport = BeanUtility.createInstance(transportType);
+ m_beans.add(BEANS.getBeanManager().registerBean(new BeanMetaData(transportType, transport)));
+ m_mom = BEANS.get(transportType);
+ }
+
+ protected void uninstallTestMom() {
+ m_mom.destroy();
+ m_mom = null;
+ for (IBean<?> bean : m_beans) {
+ BEANS.getBeanManager().unregisterBean(bean);
+ }
+ m_beans.clear();
+ }
+
+ private void dispose(Collection<IDisposable> disposables) {
+ if (!disposables.isEmpty()) {
+ LOG.info("Disposing {} objects: {}", disposables.size(), disposables);
+ for (IDisposable disposable : disposables) {
+ disposable.dispose();
+ }
+ disposables.clear();
+ }
+ }
+
+ @Ignore
+ @Test
+ public void test() throws InterruptedException {
+ //retryCount=3, retryInterval=1s, sesionRetryInterval=2s
+
+ final IDestination<String> queue = MOM.newDestination("test/mom/testSubscribeFailover", DestinationType.QUEUE, ResolveMethod.DEFINE, null);
+ m_disposables.add(MOM.registerMarshaller(FixtureJmsMom.class, queue, BEANS.get(ObjectMarshaller.class)));
+
+ // Register subscriber
+ m_disposables.add(MOM.subscribe(FixtureJmsMom.class, queue,
+ new IMessageListener<String>() {
+ @Override
+ public void onMessage(IMessage<String> message) {
+ System.out.println("RECEIVED " + message.getTransferObject());
+ }
+ },
+ MOM.newSubscribeInput().withAcknowledgementMode(SubscribeInput.ACKNOWLEDGE_AUTO)));
+
+ // Publish messages
+ IFuture<?> publishJob = Jobs.schedule(new IRunnable() {
+ @Override
+ public void run() throws Exception {
+ int i = 0;
+ while (true) {
+ i++;
+ try {
+ MOM.publish(FixtureJmsMom.class, queue, "message-" + i, MOM.newPublishInput());
+ System.out.println("SEND " + i + " OK");
+ }
+ catch (Exception | ThreadInterruptedError e) {
+ System.out.println("SEND " + i + " FAILED");
+ }
+ SleepUtil.sleepSafe(1, TimeUnit.SECONDS);
+ Thread.interrupted();
+ }
+ }
+ }, Jobs.newInput());
+
+ publishJob.awaitDone();
+ }
+
+ /**
+ * Encapsulates {@link JmsMomImplementor} for testing purpose.
+ */
+ @IgnoreBean
+ public static class FixtureJmsMom extends AbstractMomTransport {
+ @Override
+ protected Class<? extends IMomImplementor> getConfiguredImplementor() {
+ return JmsMomImplementor.class;
+ }
+
+ @Override
+ protected Map<String, String> getConfiguredEnvironment() {
+ // We do not need jmx for unit testing. Also we must disable watchTopicAdvisories else some concurrent issues with broker recreation will happen
+ final Map<String, String> activeMQEnvironment = new HashMap<>();
+ activeMQEnvironment.put(Context.INITIAL_CONTEXT_FACTORY, org.apache.activemq.jndi.ActiveMQInitialContextFactory.class.getName());
+ activeMQEnvironment.put("connectionFactoryNames", "JUnitConnectionFactory"); // Active MQ specific
+ activeMQEnvironment.put(IMomImplementor.CONNECTION_FACTORY, "JUnitConnectionFactory");
+ activeMQEnvironment.put(IMomImplementor.SYMBOLIC_NAME, "Scout JUnit MOM");
+
+ //Server embedded in vm
+ //activeMQEnvironment.put(Context.PROVIDER_URL, "vm://mom" + MOM_COUNTER.incrementAndGet() + "/junit" + activeMQUrlOptions);
+
+ //Server external with activemq failover
+ //activeMQEnvironment.put(Context.PROVIDER_URL, "failover:(tcp://localhost:61616?keepAlive=true)/junit?randomize=false&jms.prefetchPolicy.queuePrefetch=1");
+
+ //Server external without activemq failover
+ activeMQEnvironment.put(Context.PROVIDER_URL, "tcp://localhost:61616?jms.prefetchPolicy.all=100&jms.redeliveryPolicy.maximumRedeliveries=5");
+
+ return activeMQEnvironment;
+ }
+
+ @Override
+ protected Map<Object, Object> lookupEnvironment() {
+ Map<Object, Object> env = super.lookupEnvironment();
+ env.put(IMomImplementor.CONNECTION_RETRY_COUNT, 5);
+ env.put(IMomImplementor.CONNECTION_RETRY_INTERVAL_MILLIS, 1000);
+ env.put(IMomImplementor.SESSION_RETRY_INTERVAL_MILLIS, 2000);
+ return env;
+ }
+ }
+}
diff --git a/org.eclipse.scout.rt.mom.jms/src/main/java/org/eclipse/scout/rt/mom/jms/AbstractMessageConsumerJob.java b/org.eclipse.scout.rt.mom.jms/src/main/java/org/eclipse/scout/rt/mom/jms/AbstractMessageConsumerJob.java
index 6ec7651..135b779 100644
--- a/org.eclipse.scout.rt.mom.jms/src/main/java/org/eclipse/scout/rt/mom/jms/AbstractMessageConsumerJob.java
+++ b/org.eclipse.scout.rt.mom.jms/src/main/java/org/eclipse/scout/rt/mom/jms/AbstractMessageConsumerJob.java
@@ -29,6 +29,7 @@
import org.eclipse.scout.rt.platform.transaction.ITransaction;
import org.eclipse.scout.rt.platform.transaction.TransactionScope;
import org.eclipse.scout.rt.platform.util.concurrent.IRunnable;
+import org.eclipse.scout.rt.platform.util.concurrent.ThreadInterruptedError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -96,7 +97,9 @@
continue;
}
}
- catch (Exception e) {
+ catch (Exception | ThreadInterruptedError e) {
+ //not catching ThreadInterruptedError would exit the event loop in case of accidential thread interruption
+ Thread.interrupted();
if (IFuture.CURRENT.get().isCancelled() || m_sessionProvider.isClosing()) {
LOG.debug("JMS MessageConsumer for {} was closed", m_destination);
break;
@@ -109,7 +112,9 @@
LOG.debug("Receiving JMS message [message={}]", message);
onJmsMessage(message);
}
- catch (Exception e) {
+ catch (Exception | ThreadInterruptedError e) {
+ //not catching ThreadInterruptedError would exit the event loop in case of accidential thread interruption in the downstream call to handleIncoming
+ Thread.interrupted();
if (isRollbackNecessary(e)) {
try {
transactedSession.rollback();
@@ -130,7 +135,7 @@
* <p>
* Once the scout JMS-Transaction-Member is registered it will safely be rollbacked on errors.
*/
- protected boolean isRollbackNecessary(Exception e) {
+ protected boolean isRollbackNecessary(Throwable e) {
return isTransacted() && !(e instanceof PlatformException);
}
diff --git a/org.eclipse.scout.rt.mom.jms/src/main/java/org/eclipse/scout/rt/mom/jms/JmsMomImplementor.java b/org.eclipse.scout.rt.mom.jms/src/main/java/org/eclipse/scout/rt/mom/jms/JmsMomImplementor.java
index 4b6d84f..3aaac4c 100644
--- a/org.eclipse.scout.rt.mom.jms/src/main/java/org/eclipse/scout/rt/mom/jms/JmsMomImplementor.java
+++ b/org.eclipse.scout.rt.mom.jms/src/main/java/org/eclipse/scout/rt/mom/jms/JmsMomImplementor.java
@@ -683,16 +683,30 @@
protected Connection createConnection() throws JMSException {
Object securityPrincipal = m_contextEnvironment.get(Context.SECURITY_PRINCIPAL);
Object securityCredentials = m_contextEnvironment.get(Context.SECURITY_CREDENTIALS);
- Connection connection;
- if (securityPrincipal != null && securityCredentials != null) {
- connection = m_connectionFactory.createConnection(securityPrincipal.toString(), securityCredentials.toString());
+ Connection connection = null;
+ boolean connectionValid = false;
+ try {
+ if (securityPrincipal != null && securityCredentials != null) {
+ connection = m_connectionFactory.createConnection(securityPrincipal.toString(), securityCredentials.toString());
+ }
+ else {
+ connection = m_connectionFactory.createConnection();
+ }
+ postCreateConnection(connection);
+ connectionValid = true;
+ return connection;
}
- else {
- connection = m_connectionFactory.createConnection();
+ finally {
+ //detect failure
+ if (connection != null && !connectionValid) {
+ try {
+ connection.close();
+ }
+ catch (JMSException e2) {
+ LOG.info("Close invalid connection", e2);
+ }
+ }
}
-
- postCreateConnection(connection);
- return connection;
}
protected void postCreateConnection(Connection connection) throws JMSException {
diff --git a/org.eclipse.scout.rt.mom.jms/src/main/java/org/eclipse/scout/rt/mom/jms/internal/JmsConnectionWrapper.java b/org.eclipse.scout.rt.mom.jms/src/main/java/org/eclipse/scout/rt/mom/jms/internal/JmsConnectionWrapper.java
index 4287dd2..6013545 100644
--- a/org.eclipse.scout.rt.mom.jms/src/main/java/org/eclipse/scout/rt/mom/jms/internal/JmsConnectionWrapper.java
+++ b/org.eclipse.scout.rt.mom.jms/src/main/java/org/eclipse/scout/rt/mom/jms/internal/JmsConnectionWrapper.java
@@ -143,23 +143,27 @@
*/
public void invalidate(JMSException e) {
synchronized (m_connectionFunction) {
- //invalidate all sessions
- LOG.warn("invalidate connection and {} sessions due to '{}'", m_sessionWrappers.size(), e.getMessage());
- for (JmsSessionProviderWrapper s : m_sessionWrappers.keySet()) {
- s.invalidate();
- }
- //close real connection
- if (m_impl != null) {
+ try {
try {
- m_impl.close();
- }
- catch (JMSException e2) {
- BEANS.get(MomExceptionHandler.class).handle(e2);
+ //invalidate all sessions
+ LOG.warn("invalidate connection and {} sessions due to '{}'", m_sessionWrappers.size(), e.getMessage());
+ for (JmsSessionProviderWrapper s : m_sessionWrappers.keySet()) {
+ s.invalidate();
+ }
}
finally {
- m_impl = null;
+ //close real connection
+ if (m_impl != null) {
+ m_impl.close();
+ }
}
}
+ catch (JMSException e2) {
+ BEANS.get(MomExceptionHandler.class).handle(e2);
+ }
+ finally {
+ m_impl = null;
+ }
}
}
@@ -191,24 +195,39 @@
if (m_impl != null) {
return m_impl;
}
- final Connection c = m_connectionFunction.create();
- if (m_connectionRetryCount > 0) {
- c.setExceptionListener(new ExceptionListener() {
- @Override
- public void onException(JMSException e1) {
- if (m_connectionRetryCount > 0) {
- LOG.info("JMS connection dropped with '{}'; starting failover", e1.getMessage());
- invalidate(e1);
- }
- else {
+
+ Connection tmp = null;
+ try {
+ tmp = m_connectionFunction.create();
+ if (m_connectionRetryCount > 0) {
+ tmp.setExceptionListener(new ExceptionListener() {
+ @Override
+ public void onException(final JMSException e1) {
+ if (m_connectionRetryCount > 0) {
+ LOG.info("JMS connection dropped; starting failover", e1);
+ invalidate(e1);
+ return;
+ }
BEANS.get(MomExceptionHandler.class).handle(e1);
}
- }
- });
+ });
+ }
+ //success
+ m_impl = tmp;
+ LOG.info("JMS connection established: {}", m_impl);
+ return m_impl;
}
- m_impl = c;
- LOG.info("JMS connection established: {}", m_impl);
- return m_impl;
+ finally {
+ //detect failure
+ if (m_impl == null && tmp != null) {
+ try {
+ tmp.close();
+ }
+ catch (JMSException e2) {
+ LOG.info("Close invalid connection", e2);
+ }
+ }
+ }
}
/**
@@ -219,7 +238,7 @@
protected void waitForRetry(int retry, JMSException e) throws JMSException {
if (retry > m_connectionRetryCount) {
if (m_connectionRetryCount > 0) {
- LOG.info("JMS connection unavailable '{}'; fail after {} retry", e, retry);
+ LOG.warn("JMS connection unavailable '{}'; fail after {} retry", e, m_connectionRetryCount);
}
throw e;
}
@@ -228,6 +247,7 @@
Thread.sleep(m_connectionRetryIntervalMillis);
}
catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
throw new ThreadInterruptedError("Interrupted", ie);
}
}
diff --git a/org.eclipse.scout.rt.mom.jms/src/main/java/org/eclipse/scout/rt/mom/jms/internal/JmsSessionProviderWrapper.java b/org.eclipse.scout.rt.mom.jms/src/main/java/org/eclipse/scout/rt/mom/jms/internal/JmsSessionProviderWrapper.java
index 7dea5f3..a4d041f 100644
--- a/org.eclipse.scout.rt.mom.jms/src/main/java/org/eclipse/scout/rt/mom/jms/internal/JmsSessionProviderWrapper.java
+++ b/org.eclipse.scout.rt.mom.jms/src/main/java/org/eclipse/scout/rt/mom/jms/internal/JmsSessionProviderWrapper.java
@@ -78,6 +78,10 @@
/**
* Called by {@link JmsConnectionWrapper} upon failover
+ * <p>
+ *
+ * @throws no
+ * exceptions
*/
public void invalidate() {
synchronized (m_sessionProviderFunction) {
@@ -86,6 +90,9 @@
LOG.info("Invalidate sessionProvider {}", m_impl);
m_impl.close();
}
+ catch (Throwable e) {//NOSONAR
+ //nop
+ }
finally {
m_impl = null;
}