JmsMomImplementor: automatically recreate broken connections
242253
Change-Id: Id51c940d7306e396e727ac83ae01baddc95c8513
diff --git a/org.eclipse.scout.rt.mom.jms/src/main/java/org/eclipse/scout/rt/mom/jms/JmsMomImplementor.java b/org.eclipse.scout.rt.mom.jms/src/main/java/org/eclipse/scout/rt/mom/jms/JmsMomImplementor.java
index cacdd14..87801cd 100644
--- a/org.eclipse.scout.rt.mom.jms/src/main/java/org/eclipse/scout/rt/mom/jms/JmsMomImplementor.java
+++ b/org.eclipse.scout.rt.mom.jms/src/main/java/org/eclipse/scout/rt/mom/jms/JmsMomImplementor.java
@@ -111,11 +111,11 @@
protected Hashtable<Object, Object> m_contextEnvironment;
protected ConnectionFactory m_connectionFactory;
protected String m_clientId;
- protected Connection m_connection;
protected boolean m_requestReplyEnabled;
protected IDestination<?> m_requestReplyCancellationTopic;
protected IMarshaller m_defaultMarshaller;
// end init
+ protected volatile Connection m_connection;
protected ISubscription m_requestCancellationSubscription;
@@ -132,12 +132,12 @@
m_contextEnvironment = createContextEnvironment(properties);
m_connectionFactory = createConnectionFactory(properties);
m_clientId = computeClientId(properties);
- m_connection = createConnection();
-
m_defaultMarshaller = createDefaultMarshaller(properties);
initRequestReply(properties);
+ m_connection = createConnection();
+
LOG.info("{} initialized: {}", m_symbolicName, m_connection);
}
catch (Exception e) {
@@ -151,7 +151,7 @@
}
}
- protected void initRequestReply(final Map<Object, Object> properties) throws JMSException {
+ protected void initRequestReply(final Map<Object, Object> properties) {
m_requestReplyEnabled = BooleanUtility.nvl(
TypeCastUtility.castValue(properties.get(REQUEST_REPLY_ENABLED), Boolean.class),
CONFIG.getPropertyValue(RequestReplyEnabledProperty.class));
@@ -212,7 +212,7 @@
}
public IJmsSessionProvider createSessionProvider(IDestination<?> destination, boolean transacted) throws JMSException {
- Session session = transacted ? m_connection.createSession(true, Session.SESSION_TRANSACTED) : m_connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session session = transacted ? ensureConnection().createSession(true, Session.SESSION_TRANSACTED) : ensureConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
return new JmsSessionProvider(session, resolveJmsDestination(destination, session));
}
@@ -488,6 +488,7 @@
}
@Override
+ @SuppressWarnings("squid:S1141")
public synchronized void destroy() {
try {
if (m_requestCancellationSubscription != null) {
@@ -505,9 +506,7 @@
}
// close connection
- if (m_connection != null) {
- m_connection.close();
- }
+ destroyConnection();
// wait for jobs to finish
if (!futures.isEmpty()) {
@@ -520,7 +519,7 @@
}
}
}
- catch (JMSException e) {
+ catch (Exception e) {
LOG.error("Failed to destroy MOM", e);
}
}
@@ -546,6 +545,36 @@
return m_connection;
}
+ protected Connection ensureConnection() throws JMSException {
+ if (m_connection == null) {
+ synchronized (this) {
+ if (m_connection == null) {
+ LOG.info("Recreate JMS connection");
+ m_connection = createConnection();
+ }
+ }
+ }
+ return m_connection;
+ }
+
+ protected void destroyConnection() {
+ if (m_connection != null) {
+ synchronized (this) {
+ if (m_connection != null) {
+ try {
+ m_connection.close();
+ }
+ catch (Exception e) {
+ LOG.debug("Error while closing connection, will continue anyway", e);
+ }
+ finally {
+ m_connection = null;
+ }
+ }
+ }
+ }
+ }
+
public <DTO> void send(IJmsSessionProvider sessionProvider, IDestination<DTO> destination, DTO transferObject, PublishInput input) throws JMSException {
Session session = sessionProvider.getSession();
JmsMessageWriter messageWriter = JmsMessageWriter.newInstance(session, resolveMarshaller(destination))
@@ -579,7 +608,7 @@
}
@SuppressWarnings("squid:S1149")
- protected Hashtable<Object, Object> createContextEnvironment(final Map<Object, Object> properties) throws NamingException {
+ protected Hashtable<Object, Object> createContextEnvironment(final Map<Object, Object> properties) {
Hashtable<Object, Object> env = new Hashtable<>();
if (properties != null) {
for (Entry<Object, Object> entry : properties.entrySet()) {
@@ -620,11 +649,16 @@
protected void postCreateConnection(Connection connection) throws JMSException {
connection.setClientID(m_clientId);
- connection.setExceptionListener(ex -> BEANS.get(MomExceptionHandler.class).handle(ex));
+ connection.setExceptionListener(this::handleConnectionError);
// we directly start the shared connection
connection.start();
}
+ protected void handleConnectionError(Exception e) {
+ LOG.warn("JMS connection encountered an unexpected error. Connection will be destroyed!", e);
+ destroyConnection(); // will be recreated by ensureConnection()
+ }
+
public Destination resolveJmsDestination(final IDestination<?> destination, final Session session) {
if (destination == null) {
return null;