JmsMomImplementor: automatically recreate broken connections

242253

Change-Id: Id51c940d7306e396e727ac83ae01baddc95c8513
diff --git a/org.eclipse.scout.rt.mom.jms/src/main/java/org/eclipse/scout/rt/mom/jms/JmsMomImplementor.java b/org.eclipse.scout.rt.mom.jms/src/main/java/org/eclipse/scout/rt/mom/jms/JmsMomImplementor.java
index cacdd14..87801cd 100644
--- a/org.eclipse.scout.rt.mom.jms/src/main/java/org/eclipse/scout/rt/mom/jms/JmsMomImplementor.java
+++ b/org.eclipse.scout.rt.mom.jms/src/main/java/org/eclipse/scout/rt/mom/jms/JmsMomImplementor.java
@@ -111,11 +111,11 @@
   protected Hashtable<Object, Object> m_contextEnvironment;
   protected ConnectionFactory m_connectionFactory;
   protected String m_clientId;
-  protected Connection m_connection;
   protected boolean m_requestReplyEnabled;
   protected IDestination<?> m_requestReplyCancellationTopic;
   protected IMarshaller m_defaultMarshaller;
   // end init
+  protected volatile Connection m_connection; // volatile: ensureConnection() and destroyConnection() check it for null before taking the lock
 
   protected ISubscription m_requestCancellationSubscription;
 
@@ -132,12 +132,12 @@
       m_contextEnvironment = createContextEnvironment(properties);
       m_connectionFactory = createConnectionFactory(properties);
       m_clientId = computeClientId(properties);
-      m_connection = createConnection();
-
       m_defaultMarshaller = createDefaultMarshaller(properties);
 
       initRequestReply(properties);
 
+      m_connection = createConnection();
+
       LOG.info("{} initialized: {}", m_symbolicName, m_connection);
     }
     catch (Exception e) {
@@ -151,7 +151,7 @@
     }
   }
 
-  protected void initRequestReply(final Map<Object, Object> properties) throws JMSException {
+  protected void initRequestReply(final Map<Object, Object> properties) {
     m_requestReplyEnabled = BooleanUtility.nvl(
         TypeCastUtility.castValue(properties.get(REQUEST_REPLY_ENABLED), Boolean.class),
         CONFIG.getPropertyValue(RequestReplyEnabledProperty.class));
@@ -212,7 +212,7 @@
   }
 
   public IJmsSessionProvider createSessionProvider(IDestination<?> destination, boolean transacted) throws JMSException {
-    Session session = transacted ? m_connection.createSession(true, Session.SESSION_TRANSACTED) : m_connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    Session session = ensureConnection().createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE); // transacted sessions use SESSION_TRANSACTED, all others AUTO_ACKNOWLEDGE
     return new JmsSessionProvider(session, resolveJmsDestination(destination, session));
   }
 
@@ -488,6 +488,7 @@
   }
 
   @Override
+  @SuppressWarnings("squid:S1141")
   public synchronized void destroy() {
     try {
       if (m_requestCancellationSubscription != null) {
@@ -505,9 +506,7 @@
       }
 
       // close connection
-      if (m_connection != null) {
-        m_connection.close();
-      }
+      destroyConnection();
 
       // wait for jobs to finish
       if (!futures.isEmpty()) {
@@ -520,7 +519,7 @@
         }
       }
     }
-    catch (JMSException e) {
+    catch (Exception e) {
       LOG.error("Failed to destroy MOM", e);
     }
   }
@@ -546,6 +545,52 @@
     return m_connection;
   }
 
+  /**
+   * Returns the JMS connection held in {@code m_connection}. If the field is {@code null}, a new connection is
+   * created via {@link #createConnection()} and stored in the field first.
+   *
+   * @return the connection read from the field, or the newly created one
+   * @throws JMSException if creating the new connection fails
+   */
+  protected Connection ensureConnection() throws JMSException {
+    // Work on a local copy: destroyConnection() can reset the volatile field to null at any time, so re-reading the
+    // field for the return value could yield null although the null check saw a connection.
+    Connection connection = m_connection;
+    if (connection == null) {
+      synchronized (this) {
+        connection = m_connection; // check again while holding the lock
+        if (connection == null) {
+          LOG.info("Recreate JMS connection");
+          connection = createConnection();
+          m_connection = connection;
+        }
+      }
+    }
+    return connection;
+  }
+
+  /**
+   * Closes the connection held in {@code m_connection}, if any, and resets the field to {@code null}. Errors raised
+   * while closing are logged on debug level and otherwise ignored; the field is reset in either case.
+   */
+  protected void destroyConnection() {
+    if (m_connection != null) {
+      synchronized (this) {
+        if (m_connection != null) {
+          try {
+            m_connection.close();
+          }
+          catch (Exception e) {
+            LOG.debug("Error while closing connection, will continue anyway", e);
+          }
+          finally {
+            m_connection = null;
+          }
+        }
+      }
+    }
+  }
+
   public <DTO> void send(IJmsSessionProvider sessionProvider, IDestination<DTO> destination, DTO transferObject, PublishInput input) throws JMSException {
     Session session = sessionProvider.getSession();
     JmsMessageWriter messageWriter = JmsMessageWriter.newInstance(session, resolveMarshaller(destination))
@@ -579,7 +608,7 @@
   }
 
   @SuppressWarnings("squid:S1149")
-  protected Hashtable<Object, Object> createContextEnvironment(final Map<Object, Object> properties) throws NamingException {
+  protected Hashtable<Object, Object> createContextEnvironment(final Map<Object, Object> properties) {
     Hashtable<Object, Object> env = new Hashtable<>();
     if (properties != null) {
       for (Entry<Object, Object> entry : properties.entrySet()) {
@@ -620,11 +649,16 @@
 
   protected void postCreateConnection(Connection connection) throws JMSException {
     connection.setClientID(m_clientId);
-    connection.setExceptionListener(ex -> BEANS.get(MomExceptionHandler.class).handle(ex));
+    connection.setExceptionListener(this::handleConnectionError); // a reported error destroys the connection; ensureConnection() creates a new one when it is called next
     // we directly start the shared connection
     connection.start();
   }
 
+  protected void handleConnectionError(Exception e) { // registered as the connection's exception listener in postCreateConnection()
+    LOG.warn("JMS connection encountered an unexpected error. Connection will be destroyed!", e);
+    destroyConnection(); // closes and clears m_connection; will be recreated by ensureConnection(). NOTE(review): this destroys whichever connection is current, not necessarily the one that reported the error - confirm
+  }
+
   public Destination resolveJmsDestination(final IDestination<?> destination, final Session session) {
     if (destination == null) {
       return null;