blob: 87801cd633accee018505a2ee8e3b20fbadc73fd [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2010-2017 BSI Business Systems Integration AG.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* BSI Business Systems Integration AG - initial API and implementation
******************************************************************************/
package org.eclipse.scout.rt.mom.jms;
import static org.eclipse.scout.rt.mom.jms.IJmsMomProperties.JMS_PROP_REPLY_ID;
import static org.eclipse.scout.rt.platform.util.Assertions.assertEqual;
import static org.eclipse.scout.rt.platform.util.Assertions.assertFalse;
import static org.eclipse.scout.rt.platform.util.Assertions.assertNotNull;
import static org.eclipse.scout.rt.platform.util.Assertions.assertTrue;
import java.util.Hashtable;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.Topic;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.eclipse.scout.rt.mom.api.DestinationConfigPropertyParser;
import org.eclipse.scout.rt.mom.api.IBiDestination;
import org.eclipse.scout.rt.mom.api.IDestination;
import org.eclipse.scout.rt.mom.api.IDestination.DestinationType;
import org.eclipse.scout.rt.mom.api.IDestination.ResolveMethod;
import org.eclipse.scout.rt.mom.api.IMessageListener;
import org.eclipse.scout.rt.mom.api.IMom;
import org.eclipse.scout.rt.mom.api.IMomImplementor;
import org.eclipse.scout.rt.mom.api.IRequestListener;
import org.eclipse.scout.rt.mom.api.ISubscription;
import org.eclipse.scout.rt.mom.api.MOM;
import org.eclipse.scout.rt.mom.api.PublishInput;
import org.eclipse.scout.rt.mom.api.SubscribeInput;
import org.eclipse.scout.rt.mom.api.marshaller.IMarshaller;
import org.eclipse.scout.rt.mom.api.marshaller.TextMarshaller;
import org.eclipse.scout.rt.platform.BEANS;
import org.eclipse.scout.rt.platform.Platform;
import org.eclipse.scout.rt.platform.config.CONFIG;
import org.eclipse.scout.rt.platform.config.PlatformConfigProperties.ApplicationNameProperty;
import org.eclipse.scout.rt.platform.context.CorrelationId;
import org.eclipse.scout.rt.platform.context.NodeIdentifier;
import org.eclipse.scout.rt.platform.context.RunContexts;
import org.eclipse.scout.rt.platform.exception.DefaultRuntimeExceptionTranslator;
import org.eclipse.scout.rt.platform.exception.ExceptionHandler;
import org.eclipse.scout.rt.platform.exception.PlatformException;
import org.eclipse.scout.rt.platform.exception.ProcessingException;
import org.eclipse.scout.rt.platform.job.IBlockingCondition;
import org.eclipse.scout.rt.platform.job.IFuture;
import org.eclipse.scout.rt.platform.job.JobInput;
import org.eclipse.scout.rt.platform.job.JobState;
import org.eclipse.scout.rt.platform.job.Jobs;
import org.eclipse.scout.rt.platform.transaction.ITransaction;
import org.eclipse.scout.rt.platform.util.Assertions;
import org.eclipse.scout.rt.platform.util.Assertions.AssertionException;
import org.eclipse.scout.rt.platform.util.BooleanUtility;
import org.eclipse.scout.rt.platform.util.IRegistrationHandle;
import org.eclipse.scout.rt.platform.util.ObjectUtility;
import org.eclipse.scout.rt.platform.util.StringUtility;
import org.eclipse.scout.rt.platform.util.TypeCastUtility;
import org.eclipse.scout.rt.platform.util.concurrent.IRunnable;
import org.eclipse.scout.rt.platform.util.concurrent.ThreadInterruptedError;
import org.eclipse.scout.rt.platform.util.concurrent.TimedOutError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implementation of 'instance-scoped' {@link IMom} based on JMS (Java Messaging Standard).
* <p>
* This class expects a JMS implementor to be available at runtime.
*
* @since 6.1
*/
public class JmsMomImplementor implements IMomImplementor {
private static final Logger LOG = LoggerFactory.getLogger(JmsMomImplementor.class);
/**
* Key to explicitly set the JMS client ID. If omitted, the client ID is computed automatically.
*/
public static final String JMS_CLIENT_ID = "scout.mom.jms.clientId";
// Random id of this MOM instance; used as execution hint for jobs created via newJobInput() and as
// transaction member id for transactional publishing.
protected final String m_momUid = UUID.randomUUID().toString();
// init -> thread-safety: only set in init method
// Name used in log messages and, unless a client id is configured, as part of the computed JMS client id.
protected String m_symbolicName;
// Environment handed to the JNDI InitialContext; MOM-specific properties are filtered out beforehand.
@SuppressWarnings("squid:S1149")
protected Hashtable<Object, Object> m_contextEnvironment;
protected ConnectionFactory m_connectionFactory;
protected String m_clientId;
protected boolean m_requestReplyEnabled;
// Topic on which cancellation requests for 'request-reply' communication are published and consumed.
protected IDestination<?> m_requestReplyCancellationTopic;
// Marshaller used for destinations that have no explicitly registered marshaller.
protected IMarshaller m_defaultMarshaller;
// end init
// Shared connection; reset to null by destroyConnection() and lazily re-created by ensureConnection().
protected volatile Connection m_connection;
// Created lazily by ensureRequestCancellationSubscription() and disposed in destroy().
protected ISubscription m_requestCancellationSubscription;
// Cache of already resolved JMS destinations, keyed by their MOM destination.
protected final Map<IDestination, Destination> m_jmsDestinations = new ConcurrentHashMap<>();
// Marshallers explicitly registered per destination via registerMarshaller().
protected final Map<IDestination, IMarshaller> m_marshallers = new ConcurrentHashMap<>();
/**
 * Initializes this MOM from the given properties: symbolic name, JNDI environment, connection factory, client id,
 * default marshaller, request-reply settings and finally the shared connection.
 * <p>
 * If any step fails, {@link #destroy()} is invoked before the original exception is rethrown.
 *
 * @param properties
 *          configuration of this MOM
 * @throws Exception
 *           the failure that aborted initialization
 */
@Override
public void init(final Map<Object, Object> properties) throws Exception {
  final Object configuredName = properties.get(SYMBOLIC_NAME);
  final String fallbackName = StringUtility.join(" ", CONFIG.getPropertyValue(ApplicationNameProperty.class), "MOM");
  m_symbolicName = Objects.toString(configuredName, fallbackName);

  final boolean developmentMode = Platform.get().inDevelopmentMode();
  if (developmentMode) {
    LOG.info("{} configuration: {}", m_symbolicName, properties);
  }

  try {
    m_contextEnvironment = createContextEnvironment(properties);
    m_connectionFactory = createConnectionFactory(properties);
    m_clientId = computeClientId(properties);
    m_defaultMarshaller = createDefaultMarshaller(properties);
    initRequestReply(properties);
    m_connection = createConnection();
    LOG.info("{} initialized: {}", m_symbolicName, m_connection);
  }
  catch (Exception initFailure) {
    // Release whatever has been set up so far; a cleanup failure must not hide the root cause.
    try {
      destroy();
    }
    catch (RuntimeException cleanupFailure) {
      initFailure.addSuppressed(cleanupFailure);
    }
    throw initFailure;
  }
}
/**
 * Reads the 'request-reply' configuration. When enabled, determines the cancellation topic from, in this order: an
 * {@link IDestination} instance in the properties, a topic definition string in the properties, or the config
 * property {@link RequestReplyCancellationTopicProperty}.
 *
 * @param properties
 *          configuration of this MOM
 */
protected void initRequestReply(final Map<Object, Object> properties) {
  final Boolean explicitlyEnabled = TypeCastUtility.castValue(properties.get(REQUEST_REPLY_ENABLED), Boolean.class);
  m_requestReplyEnabled = BooleanUtility.nvl(explicitlyEnabled, CONFIG.getPropertyValue(RequestReplyEnabledProperty.class));
  if (!m_requestReplyEnabled) {
    LOG.info("{}: 'request-reply' messaging is disabled", m_symbolicName);
    return;
  }

  // Determine the topic used to handle cancellation requests for 'request-reply' communication.
  final Object topicProperty = properties.get(IMomImplementor.REQUEST_REPLY_CANCELLATION_TOPIC);
  if (topicProperty instanceof IDestination) {
    m_requestReplyCancellationTopic = (IDestination<?>) topicProperty;
    return;
  }

  final String topicDefinition = ObjectUtility.toString(topicProperty);
  if (topicDefinition == null) {
    m_requestReplyCancellationTopic = CONFIG.getPropertyValue(RequestReplyCancellationTopicProperty.class);
    return;
  }

  final DestinationConfigPropertyParser parser = BEANS.get(DestinationConfigPropertyParser.class).parse(topicDefinition);
  m_requestReplyCancellationTopic = MOM.newDestination(parser.getDestinationName(), DestinationType.TOPIC, parser.getResolveMethod(), parser.getParameters());
}
/**
 * Creates the marshaller to be used for destinations without an explicitly registered marshaller.
 * <p>
 * The {@code MARSHALLER} property may hold an {@link IMarshaller} instance or a fully qualified class name. If it is
 * absent, the class configured by {@link DefaultMarshallerProperty} is used.
 *
 * @param properties
 *          configuration of this MOM
 * @return the default marshaller
 * @throws PlatformException
 *           if the configured class cannot be found or is not an {@link IMarshaller}
 */
protected IMarshaller createDefaultMarshaller(final Map<Object, Object> properties) {
  final Object prop = properties.get(MARSHALLER);
  if (prop instanceof IMarshaller) {
    return (IMarshaller) prop;
  }

  final String marshallerClassName = ObjectUtility.toString(prop);
  if (marshallerClassName == null) {
    return BEANS.get(CONFIG.getPropertyValue(DefaultMarshallerProperty.class));
  }

  final Class<? extends IMarshaller> marshallerClass;
  try {
    // asSubclass performs a real runtime check. A plain generic cast is erased and would never raise a
    // ClassCastException, letting a class of the wrong type pass unnoticed.
    marshallerClass = Class.forName(marshallerClassName).asSubclass(IMarshaller.class);
  }
  catch (final ClassNotFoundException | ClassCastException e) {
    throw new PlatformException("Failed to load class specified by environment property '{}' [value={}]", MARSHALLER, marshallerClassName, e);
  }
  return BEANS.get(marshallerClass);
}
/**
 * @return a new job input that uses the {@link MomExceptionHandler} and carries the unique id of this MOM as
 *         execution hint
 */
public JobInput newJobInput() {
  JobInput jobInput = Jobs.newInput();
  jobInput = jobInput.withExceptionHandling(BEANS.get(MomExceptionHandler.class), true);
  return jobInput.withExecutionHint(m_momUid);
}
/**
 * Creates a non-transacted session provider that is not bound to a destination.
 *
 * @throws JMSException
 *           if the session cannot be created
 */
public IJmsSessionProvider createSessionProvider() throws JMSException {
  final IDestination<?> noDestination = null;
  final boolean transacted = false;
  return createSessionProvider(noDestination, transacted);
}
/**
 * Creates a session on the shared connection and wraps it, together with the resolved JMS destination, into a
 * session provider.
 *
 * @param destination
 *          the destination to resolve; may be <code>null</code>
 * @param transacted
 *          <code>true</code> for a transacted session, <code>false</code> for an auto-acknowledging session
 * @throws JMSException
 *           if the connection or session cannot be created
 */
public IJmsSessionProvider createSessionProvider(IDestination<?> destination, boolean transacted) throws JMSException {
  final Connection connection = ensureConnection();
  final Session session;
  if (transacted) {
    session = connection.createSession(true, Session.SESSION_TRANSACTED);
  }
  else {
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  }
  return new JmsSessionProvider(session, resolveJmsDestination(destination, session));
}
/**
 * Publishes the transfer object to the destination, either within the current transaction or immediately, as
 * requested by the input. A {@link JMSException} is translated into a runtime exception.
 */
@Override
public <DTO> void publish(final IDestination<DTO> destination, final DTO transferObject, final PublishInput input) {
  assertNotNull(destination, "destination not specified");
  assertNotNull(input, "publishInput not specified");

  try {
    if (!input.isTransactional()) {
      publishNonTransactional(destination, transferObject, input);
    }
    else {
      publishTransactional(destination, transferObject, input);
    }
  }
  catch (JMSException jmsFailure) {
    throw BEANS.get(DefaultRuntimeExceptionTranslator.class).translate(jmsFailure);
  }
}
/**
 * Sends the transfer object using a dedicated, non-transacted session which is closed afterwards, regardless of the
 * outcome.
 *
 * @throws JMSException
 *           if creating the session, sending the message or closing the session fails
 */
protected <DTO> void publishNonTransactional(final IDestination<DTO> destination, final DTO transferObject, final PublishInput input) throws JMSException {
  final IJmsSessionProvider oneShotSession = createSessionProvider(destination, false);
  try {
    send(oneShotSession, destination, transferObject, input);
  }
  finally {
    // the session serves this single message only
    oneShotSession.close();
  }
}
/**
 * Sends the transfer object through the transacted session that is registered as member of the current
 * transaction. The member is registered on first use, keyed by the unique id of this MOM.
 *
 * @throws JMSException
 *           if sending the message fails
 */
protected <DTO> void publishTransactional(final IDestination<DTO> destination, final DTO transferObject, final PublishInput input) throws JMSException {
  final ITransaction transaction = assertNotNull(ITransaction.CURRENT.get(), "Transaction required for transactional messaging");

  // Look up the transaction member of this MOM, or register it if this is the first transacted publish.
  final JmsTransactionMember member = transaction.registerMemberIfAbsent(m_momUid, id -> {
    try {
      return BEANS.get(JmsTransactionMember.class)
          .withMemberId(id)
          .withSessionProvider(createSessionProvider(destination, true))
          .withAutoClose(true); // close upon transaction end
    }
    catch (final JMSException jmsFailure) {
      throw BEANS.get(DefaultRuntimeExceptionTranslator.class).translate(jmsFailure);
    }
  });

  final IJmsSessionProvider transactedSession = member.getSessionProvider();
  send(transactedSession, destination, transferObject, input);
}
/**
 * Subscribes the listener to the destination. A {@link JMSException} is translated into a runtime exception.
 *
 * @return the subscription handle
 */
@Override
public <DTO> ISubscription subscribe(final IDestination<DTO> destination, final IMessageListener<DTO> listener, final SubscribeInput input) {
  assertNotNull(destination, "destination not specified");
  assertNotNull(listener, "messageListener not specified");
  assertNotNull(input, "input not specified");

  final ISubscription subscription;
  try {
    subscription = subscribeImpl(destination, listener, input);
  }
  catch (final JMSException jmsFailure) {
    throw BEANS.get(DefaultRuntimeExceptionTranslator.class).translate(jmsFailure);
  }
  return subscription;
}
/**
 * Creates a session for the destination and schedules the job that consumes its messages. The session is transacted
 * if the input requests transacted acknowledgement. If scheduling fails, the session is closed again.
 *
 * @throws JMSException
 *           if the session or the consumer job cannot be created
 */
protected <DTO> ISubscription subscribeImpl(IDestination<DTO> destination, IMessageListener<DTO> listener, SubscribeInput input) throws JMSException {
  final boolean transacted = SubscribeInput.ACKNOWLEDGE_TRANSACTED == input.getAcknowledgementMode();
  final IJmsSessionProvider consumerSession = createSessionProvider(destination, transacted);
  try {
    final IRunnable consumerJob = createMessageConsumerJob(consumerSession, destination, listener, input);
    final IFuture<Void> jobMonitor = Jobs.schedule(consumerJob, newJobInput().withName("JMS subscriber"));
    return new JmsSubscription(destination, input, consumerSession, jobMonitor);
  }
  catch (JMSException | RuntimeException failure) {
    // nobody else holds the session yet, so release it here
    consumerSession.close();
    throw failure;
  }
}
/**
 * @return the runnable that consumes messages from the destination and hands them to the listener
 */
protected <DTO> IRunnable createMessageConsumerJob(IJmsSessionProvider sessionProvider, IDestination<DTO> destination, IMessageListener<DTO> listener, SubscribeInput input) throws JMSException {
  final MessageConsumerJob<DTO> consumerJob = new MessageConsumerJob<>(this, sessionProvider, destination, listener, input);
  return consumerJob;
}
/**
* Sends a request to the destination and blocks until the reply is available or the configured timeout elapses.
* <p>
* The request itself is sent and awaited in a separate job. If the calling thread is interrupted or the timeout
* elapses, a cancellation message is published for the request, the request job is cancelled, and the error is
* rethrown.
*
* @return the reply transfer object
* @throws ThreadInterruptedError
*           if interrupted while waiting for the reply
* @throws TimedOutError
*           if the reply did not arrive within the timeout of the input
*/
@Override
public <REQUEST, REPLY> REPLY request(final IBiDestination<REQUEST, REPLY> destination, final REQUEST requestObject, final PublishInput input) {
assertTrue(m_requestReplyEnabled, "'request-reply' messaging is not enabled for this MOM");
assertNotNull(destination, "destination not specified");
assertNotNull(input, "publishInput not specified");
assertFalse(input.isTransactional(), "transactional mode not supported for 'request-reply' communication");
final String replyId = String.format("scout.mom.requestreply.uid-%s", UUID.randomUUID()); // JMS message ID not applicable because unknown until sent
// Starts in blocking state; released by the request job once requestImpl has finished (successfully or not).
final IBlockingCondition condition = Jobs.newBlockingCondition(true);
IFuture<Message> requestFuture = Jobs.schedule(() -> {
try {
return requestImpl(destination, requestObject, input, replyId);
}
finally {
condition.setBlocking(false);
}
}, newJobInput()
.withName("request on {}", destination.getName())
.withExceptionHandling(BEANS.get(MomExceptionHandler.class), false)
.withRunContext(RunContexts.copyCurrent(true)
.withDiagnostics(BEANS.all(IJmsRunContextDiagnostics.class))));
try {
// Wait on the condition (not on the future) so that the timeout of the input can be applied.
long timeout = input.getRequestReplyTimeout();
if (timeout == PublishInput.INFINITELY) {
condition.waitFor();
}
else {
condition.waitFor(timeout, TimeUnit.MILLISECONDS);
}
Message responseMessage = requestFuture.awaitDoneAndGet();
return transform(responseMessage, replyId, resolveMarshaller(destination));
}
catch (JMSException e) {
throw BEANS.get(DefaultRuntimeExceptionTranslator.class).translate(e);
}
catch (ThreadInterruptedError | TimedOutError e) {
// send cancel to replier
cancelRequest(replyId);
// cancel request job
if (requestFuture.cancel(true)) {
requestFuture.awaitDone();
}
throw e;
}
}
/**
 * Sends the request and blocks until the reply arrives on a temporary queue.
 * <p>
 * The session is always closed. The temporary queue is deleted afterwards on every exit path: on success a deletion
 * failure is propagated, on abnormal exit (failure or cancellation) deletion is best-effort only, so that it never
 * hides the original outcome. Without this, a cancelled or failed request would leave its temporary queue behind for
 * as long as the connection is alive.
 *
 * @param replyId
 *          id written to the request message to correlate the reply
 * @return the reply message, or <code>null</code> if a {@link JMSException} occurred while the current job was
 *         cancelled
 * @throws JMSException
 *           if sending or receiving fails and the current job is not cancelled
 */
protected <REQUEST, REPLY> Message requestImpl(final IBiDestination<REQUEST, REPLY> destination, final REQUEST requestObject, final PublishInput input, String replyId) throws JMSException {
  Message responseMessage = null;
  final IJmsSessionProvider sessionProvider = createSessionProvider(destination, false);
  boolean completedNormally = false;
  try {
    try {
      TemporaryQueue temporaryQueue = sessionProvider.getTemporaryQueue();
      MessageConsumer responseQueueConsumer = sessionProvider.getSession().createConsumer(temporaryQueue);
      // send request message
      JmsMessageWriter messageWriter = JmsMessageWriter.newInstance(sessionProvider.getSession(), resolveMarshaller(destination))
          .writeReplyTo(temporaryQueue)
          .writeReplyId(replyId)
          .writeProperties(input.getProperties())
          .writeTransferObject(requestObject);
      send(sessionProvider, destination, messageWriter, input);
      // receive response message
      responseMessage = responseQueueConsumer.receive();
    }
    catch (JMSException e) {
      if (IFuture.CURRENT.get().isCancelled()) {
        // if job was canceled, we ignore JMSException as these are exceptions because of interruption
        LOG.info("Request job canceled; {}", e.getMessage());
        return null;
      }
      throw e;
    }
    finally {
      sessionProvider.close();
    }
    completedNormally = true;
  }
  finally {
    if (!completedNormally) {
      // Abnormal exit: still try to get rid of the temporary queue, but never mask the original outcome.
      // NOTE(review): assumes deleteTemporaryQueue() tolerates a queue that was never created - confirm.
      try {
        sessionProvider.deleteTemporaryQueue();
      }
      catch (Exception e) {
        LOG.debug("Error while deleting temporary queue, will continue anyway", e);
      }
    }
  }
  // After closing session, delete not required temporary queue. The queue is associated with the connection and exists as long as the connection is alive.
  sessionProvider.deleteTemporaryQueue();
  return responseMessage;
}
/**
 * Extracts the reply transfer object from the reply message.
 *
 * @param message
 *          the received reply message; must not be <code>null</code>
 * @param replyId
 *          the id the reply message is expected to carry
 * @param marshaller
 *          the marshaller to read the transfer object
 * @return the reply transfer object if the message is flagged as successful
 * @throws RuntimeException
 *           if no message was received, if the reply id does not match, or the translated failure if the replier
 *           reported an error
 */
@SuppressWarnings("unchecked")
protected <REPLY> REPLY transform(Message message, String replyId, IMarshaller marshaller) throws JMSException {
  // A consumer that gets closed while waiting yields null; fail with a meaningful message instead of a bare NPE.
  assertNotNull(message, "expected reply message with id {} but no message was received", replyId);
  assertEqual(replyId, message.getStringProperty(JMS_PROP_REPLY_ID), "expected reply message with id {} but got {}", replyId, message);
  JmsMessageReader reader = JmsMessageReader.newInstance(message, marshaller);
  Object reply = reader.readMessage().getTransferObject();
  if (reader.readRequestReplySuccess()) {
    return (REPLY) reply;
  }
  // The replier failed: the transfer object is the failure itself, if it is a Throwable.
  Throwable cause = reply instanceof Throwable ? (Throwable) reply : new ProcessingException("Request-Reply failed");
  throw BEANS.get(DefaultRuntimeExceptionTranslator.class).translate(cause);
}
/**
 * Schedules a job that publishes a cancellation message for the given request to the cancellation topic.
 *
 * @param replyId
 *          id of the request to be cancelled
 */
protected void cancelRequest(final String replyId) {
  final IRunnable publishCancellation = () -> {
    assertNotNull(m_requestReplyCancellationTopic);
    final IJmsSessionProvider cancelSession = createSessionProvider();
    try {
      final JmsMessageWriter cancelMessage = JmsMessageWriter.newInstance(cancelSession.getSession(), BEANS.get(TextMarshaller.class))
          .writeReplyId(replyId);
      send(
          cancelSession.getProducer(),
          resolveJmsDestination(m_requestReplyCancellationTopic, cancelSession.getSession()),
          cancelMessage,
          Message.DEFAULT_DELIVERY_MODE,
          Message.DEFAULT_PRIORITY,
          Message.DEFAULT_TIME_TO_LIVE);
    }
    finally {
      cancelSession.close();
    }
  };
  Jobs.schedule(publishCancellation, newJobInput().withName("JMS publish cancel request for {}", replyId));
}
/**
 * Registers the listener to answer requests sent to the destination. Before that, the subscription for request
 * cancellations is established if not done yet. A {@link JMSException} is translated into a runtime exception.
 *
 * @return the subscription handle
 */
@Override
public <REQUEST, REPLY> ISubscription reply(final IBiDestination<REQUEST, REPLY> destination, final IRequestListener<REQUEST, REPLY> listener, final SubscribeInput input) {
  assertTrue(m_requestReplyEnabled, "'request-reply' messaging is not enabled for this MOM");
  assertNotNull(destination, "destination not specified");
  assertNotNull(listener, "messageListener not specified");
  assertNotNull(input, "input not specified");

  final ISubscription subscription;
  try {
    ensureRequestCancellationSubscription();
    subscription = replyImpl(destination, listener, input);
  }
  catch (final JMSException jmsFailure) {
    throw BEANS.get(DefaultRuntimeExceptionTranslator.class).translate(jmsFailure);
  }
  return subscription;
}
/**
 * Creates a non-transacted session for the destination and schedules the job that consumes requests and replies to
 * them. If scheduling fails, the session is closed again.
 *
 * @throws JMSException
 *           if the session or the consumer job cannot be created
 */
protected <REQUEST, REPLY> ISubscription replyImpl(final IBiDestination<REQUEST, REPLY> destination, final IRequestListener<REQUEST, REPLY> listener, final SubscribeInput input) throws JMSException {
  final IJmsSessionProvider replySession = createSessionProvider(destination, false);
  try {
    final IRunnable replyJob = createReplyMessageConsumerJob(replySession, destination, listener, input);
    final IFuture<Void> jobMonitor = Jobs.schedule(replyJob, newJobInput().withName("JMS subscriber"));
    return new JmsSubscription(destination, input, replySession, jobMonitor);
  }
  catch (JMSException | RuntimeException failure) {
    // nobody else holds the session yet, so release it here
    replySession.close();
    throw failure;
  }
}
/**
 * @return the runnable that consumes requests from the destination and hands them to the listener
 */
protected <REQUEST, REPLY> IRunnable createReplyMessageConsumerJob(IJmsSessionProvider sessionProvider, IBiDestination<REQUEST, REPLY> destination, IRequestListener<REQUEST, REPLY> listener, SubscribeInput input) throws JMSException {
  final ReplyMessageConsumerJob<REQUEST, REPLY> replyJob = new ReplyMessageConsumerJob<>(this, sessionProvider, destination, listener, input);
  return replyJob;
}
/**
 * Subscribes to the request cancellation topic, unless a subscription has already been created.
 *
 * @throws JMSException
 *           if the subscription cannot be established
 */
protected synchronized void ensureRequestCancellationSubscription() throws JMSException {
  if (m_requestCancellationSubscription != null) {
    return; // already subscribed
  }
  m_requestCancellationSubscription = subscribeRequestCancellation(m_requestReplyCancellationTopic);
}
/**
 * Subscribes to the given cancellation topic. For every received message, all jobs whose execution hint matches the
 * reply id of the message are cancelled. If scheduling the listener job fails, the session is closed again.
 *
 * @throws JMSException
 *           if the session or the listener job cannot be created
 */
protected <DTO> ISubscription subscribeRequestCancellation(IDestination<DTO> cancellationTopic) throws JMSException {
  final IJmsSessionProvider cancellationSession = createSessionProvider(cancellationTopic, false);
  try {
    final SubscribeInput subscribeInput = MOM.newSubscribeInput();
    final AbstractMessageConsumerJob<DTO> cancellationListener = new AbstractMessageConsumerJob<DTO>(this, cancellationSession, cancellationTopic, subscribeInput) {
      @Override
      protected void onJmsMessage(Message jmsMessage) throws JMSException {
        // interrupt every job that was tagged with the reply id of the cancelled request
        Jobs.getJobManager().cancel(
            Jobs.newFutureFilterBuilder()
                .andMatchExecutionHint(jmsMessage.getStringProperty(JMS_PROP_REPLY_ID))
                .toFilter(),
            true);
      }
    };
    final IFuture<Void> jobMonitor = Jobs.schedule(cancellationListener, newJobInput().withName("JMS reply cancel message listener"));
    return new JmsSubscription(cancellationTopic, subscribeInput, cancellationSession, jobMonitor);
  }
  catch (JMSException | RuntimeException failure) {
    cancellationSession.close();
    throw failure;
  }
}
/**
 * Unsubscribes the durable subscription with the given name, using a short-lived session. A {@link JMSException} is
 * translated into a runtime exception.
 */
@Override
public void cancelDurableSubscription(final String durableSubscriptionName) {
  try {
    final IJmsSessionProvider unsubscribeSession = createSessionProvider();
    try {
      final Session session = unsubscribeSession.getSession();
      session.unsubscribe(durableSubscriptionName);
    }
    finally {
      unsubscribeSession.close();
    }
  }
  catch (final JMSException jmsFailure) {
    throw BEANS.get(DefaultRuntimeExceptionTranslator.class).translate(jmsFailure);
  }
}
/**
 * Shuts down this MOM: disposes the request cancellation subscription, cancels all jobs tagged with the id of this
 * MOM, closes the shared connection and waits up to 10 seconds for the jobs to finish.
 * <p>
 * This method never throws; failures are logged. A failure to dispose the cancellation subscription does not
 * prevent the remaining cleanup, so the connection is closed in any case.
 */
@Override
@SuppressWarnings("squid:S1141")
public synchronized void destroy() {
  try {
    if (m_requestCancellationSubscription != null) {
      try {
        m_requestCancellationSubscription.dispose();
      }
      catch (Exception e) {
        // must not abort the shutdown, otherwise jobs keep running and the connection stays open
        LOG.warn("Failed to dispose request cancellation subscription, will continue anyway", e);
      }
    }
    // cancel any still running mom jobs
    Predicate<IFuture<?>> momJobsFilter = Jobs.newFutureFilterBuilder().andMatchExecutionHint(m_momUid).toFilter();
    Set<IFuture<?>> futures = Jobs.getJobManager().getFutures(momJobsFilter);
    if (!futures.isEmpty()) {
      Jobs.getJobManager().cancel(Jobs.newFutureFilterBuilder()
          .andMatchFuture(futures)
          .andMatchNotState(JobState.DONE)
          .toFilter(), false);
    }
    // close connection
    destroyConnection();
    // wait for jobs to finish
    if (!futures.isEmpty()) {
      try {
        Jobs.getJobManager().awaitDone(momJobsFilter, 10, TimeUnit.SECONDS);
        LOG.debug("All mom jobs have finished.");
      }
      catch (ThreadInterruptedError | TimedOutError e) {
        LOG.warn("Unable to cancel all mom jobs: {}", futures, e);
      }
    }
  }
  catch (Exception e) {
    LOG.error("Failed to destroy MOM", e);
  }
}
/**
 * Registers the marshaller to be used for the given destination.
 *
 * @return a handle that removes the marshaller registered for the destination
 */
@Override
public IRegistrationHandle registerMarshaller(final IDestination<?> destination, final IMarshaller marshaller) {
  m_marshallers.put(destination, marshaller);
  return () -> {
    m_marshallers.remove(destination);
  };
}
/**
 * Looks up the {@link IMarshaller} registered for the given destination and falls back to the default marshaller of
 * this MOM if there is none.
 */
public IMarshaller resolveMarshaller(final IDestination<?> destination) {
  final IMarshaller registered = m_marshallers.get(destination);
  if (registered == null) {
    return m_defaultMarshaller;
  }
  return registered;
}
/**
 * @return the shared {@link Connection} to the JMS broker as currently held by this MOM; <code>null</code> while no
 *         connection is established
 */
public Connection getConnection() {
  final Connection sharedConnection = m_connection;
  return sharedConnection;
}
/**
 * Returns the shared connection and re-creates it first if it has been destroyed in the meantime.
 *
 * @throws JMSException
 *           if the connection cannot be created
 */
protected Connection ensureConnection() throws JMSException {
  if (m_connection != null) {
    return m_connection;
  }
  synchronized (this) {
    // check again, another thread may have re-created the connection while we were waiting for the lock
    if (m_connection == null) {
      LOG.info("Recreate JMS connection");
      m_connection = createConnection();
    }
  }
  return m_connection;
}
/**
 * Closes the shared connection, if there is one, and resets the reference. Errors while closing are logged only.
 */
protected void destroyConnection() {
  if (m_connection == null) {
    return;
  }
  synchronized (this) {
    if (m_connection == null) {
      return; // closed by another thread in the meantime
    }
    try {
      m_connection.close();
    }
    catch (Exception closeFailure) {
      LOG.debug("Error while closing connection, will continue anyway", closeFailure);
    }
    finally {
      m_connection = null;
    }
  }
}
/**
 * Builds a message from the transfer object, the reply-to destination and the properties of the input, and sends it
 * to the destination.
 *
 * @throws JMSException
 *           if building or sending the message fails
 */
public <DTO> void send(IJmsSessionProvider sessionProvider, IDestination<DTO> destination, DTO transferObject, PublishInput input) throws JMSException {
  final Session jmsSession = sessionProvider.getSession();
  final IMarshaller marshaller = resolveMarshaller(destination);
  JmsMessageWriter writer = JmsMessageWriter.newInstance(jmsSession, marshaller);
  writer = writer.writeTransferObject(transferObject);
  writer = writer.writeReplyTo(resolveJmsDestination(input.getReplyTo(), jmsSession));
  writer = writer.writeProperties(input.getProperties());
  send(sessionProvider, destination, writer, input);
}
/**
 * Sends the message prepared by the writer to the given destination, using the producer of the session provider.
 *
 * @throws JMSException
 *           if sending the message fails
 */
public <DTO> void send(IJmsSessionProvider sessionProvider, IDestination<DTO> destination, JmsMessageWriter messageWriter, PublishInput input) throws JMSException {
  assertNotNull(destination);
  final Session jmsSession = sessionProvider.getSession();
  final MessageProducer producer = sessionProvider.getProducer();
  final Destination jmsDestination = resolveJmsDestination(destination, jmsSession);
  send(producer, jmsDestination, messageWriter, input);
}
/**
 * Sends the message prepared by the writer, taking delivery mode, priority and time-to-live from the input.
 *
 * @throws JMSException
 *           if sending the message fails
 */
public <DTO> void send(MessageProducer producer, Destination destination, JmsMessageWriter messageWriter, PublishInput input) throws JMSException {
  final int deliveryMode = toJmsDeliveryMode(input);
  final int priority = toJmsPriority(input);
  final long timeToLive = toJmsTimeToLive(input);
  send(producer, destination, messageWriter, deliveryMode, priority, timeToLive);
}
/**
 * Adds the current correlation id to the message, builds it and sends it with the given JMS delivery settings.
 *
 * @throws JMSException
 *           if building or sending the message fails
 */
public void send(MessageProducer producer, Destination destination, JmsMessageWriter messageWriter, int deliveryMode, int priority, long timeToLive) throws JMSException {
  final Message jmsMessage = messageWriter.writeCorrelationId(CorrelationId.CURRENT.get()).build();
  LOG.debug("Sending JMS message [destination={}, message={}]", destination, jmsMessage);
  producer.send(destination, jmsMessage, deliveryMode, priority, timeToLive);
}
/**
 * @return a new JNDI context based on the context environment of this MOM
 * @throws NamingException
 *           if the context cannot be created
 */
protected Context createContext() throws NamingException {
  // calling this constructor on InitialContext will not modify m_contextEnvironment
  final Hashtable<Object, Object> environment = m_contextEnvironment;
  return new InitialContext(environment);
}
/**
 * Builds the JNDI context environment from the given properties. Entries with a <code>null</code> key or value and
 * MOM-specific properties are left out.
 *
 * @param properties
 *          configuration of this MOM; may be <code>null</code>
 * @return the environment, never <code>null</code>
 */
@SuppressWarnings("squid:S1149")
protected Hashtable<Object, Object> createContextEnvironment(final Map<Object, Object> properties) {
  final Hashtable<Object, Object> environment = new Hashtable<>();
  if (properties == null) {
    return environment;
  }

  for (final Entry<Object, Object> property : properties.entrySet()) {
    if (property.getKey() == null || property.getValue() == null) {
      LOG.info("ignoring property having null key or value [key={}, value={}]", property.getKey(), property.getValue());
      continue;
    }
    if (ObjectUtility.isOneOf(property.getKey(), CONNECTION_FACTORY, SYMBOLIC_NAME, MARSHALLER, REQUEST_REPLY_ENABLED, REQUEST_REPLY_CANCELLATION_TOPIC, JMS_CLIENT_ID)) {
      // Don't pass MOM-specific properties to the initial context to prevent problems with non-standard values.
      // For example, some containers throw an error if it finds an unserializable value.
      continue;
    }
    environment.put(property.getKey(), property.getValue());
  }
  return environment;
}
/**
 * Looks up the connection factory via JNDI, using the name given by the {@code CONNECTION_FACTORY} property. The
 * JNDI context is closed again after the lookup.
 *
 * @param properties
 *          configuration of this MOM
 * @return the looked up connection factory
 * @throws NamingException
 *           if the context cannot be created or the lookup fails
 */
protected ConnectionFactory createConnectionFactory(Map<Object, Object> properties) throws NamingException {
  String connectionFactoryName = (String) assertNotNull(properties.get(CONNECTION_FACTORY), "Property {} not specified to lookup connection factory", CONNECTION_FACTORY);
  final Context context = createContext();
  try {
    return (ConnectionFactory) context.lookup(connectionFactoryName);
  }
  finally {
    // The context is only needed for the lookup; release it, but never let a close failure hide the lookup result.
    try {
      context.close();
    }
    catch (NamingException | RuntimeException e) {
      LOG.debug("Error while closing JNDI context, will continue anyway", e);
    }
  }
}
/**
 * Creates and prepares a new JMS connection. If both security principal and credentials are present in the context
 * environment, they are used to authenticate the connection.
 * <p>
 * If preparing the connection fails, the connection is closed before the failure is rethrown, so that no
 * half-initialized connection is left open.
 *
 * @return the prepared connection
 * @throws JMSException
 *           if the connection cannot be created or prepared
 */
protected Connection createConnection() throws JMSException {
  Object securityPrincipal = m_contextEnvironment.get(Context.SECURITY_PRINCIPAL);
  Object securityCredentials = m_contextEnvironment.get(Context.SECURITY_CREDENTIALS);
  final Connection connection;
  if (securityPrincipal != null && securityCredentials != null) {
    connection = m_connectionFactory.createConnection(securityPrincipal.toString(), securityCredentials.toString());
  }
  else {
    connection = m_connectionFactory.createConnection();
  }
  try {
    postCreateConnection(connection);
  }
  catch (JMSException | RuntimeException e) {
    // The caller never gets hold of this connection, hence nobody else could close it.
    try {
      connection.close();
    }
    catch (JMSException | RuntimeException closeFailure) {
      e.addSuppressed(closeFailure);
    }
    throw e;
  }
  return connection;
}
/**
 * Prepares a freshly created connection: assigns the client id, installs the error handler and starts it.
 *
 * @throws JMSException
 *           if the connection rejects one of these steps
 */
protected void postCreateConnection(Connection connection) throws JMSException {
  connection.setClientID(m_clientId);
  connection.setExceptionListener(connectionError -> handleConnectionError(connectionError));
  // we directly start the shared connection
  connection.start();
}
/**
 * Invoked when the connection reports an error: logs it and destroys the shared connection.
 */
protected void handleConnectionError(Exception e) {
  LOG.warn("JMS connection encountered an unexpected error. Connection will be destroyed!", e);
  // the next call of ensureConnection() creates a new connection
  destroyConnection();
}
/**
 * Resolves the JMS destination for the given MOM destination. Resolved destinations are cached.
 *
 * @param destination
 *          the destination to resolve; may be <code>null</code>
 * @param session
 *          the session used to define the destination if required
 * @return the JMS destination, or <code>null</code> if the given destination is <code>null</code>
 */
public Destination resolveJmsDestination(final IDestination<?> destination, final Session session) {
  if (destination == null) {
    return null;
  }

  final Destination cached = m_jmsDestinations.get(destination);
  if (cached != null) {
    return cached;
  }

  synchronized (m_jmsDestinations) {
    // check again, another thread may have resolved the destination while we were waiting for the lock
    Destination resolved = m_jmsDestinations.get(destination);
    if (resolved == null) {
      resolved = defineOrLookupJmsDestination(destination, session);
      m_jmsDestinations.put(destination, resolved);
    }
    return resolved;
  }
}
/**
 * Obtains the JMS destination according to the resolve method of the given destination: JNDI lookup or ad-hoc
 * definition. A {@link JMSException} or {@link NamingException} is translated into a runtime exception.
 *
 * @throws AssertionException
 *           if the resolve method is not supported
 */
protected Destination defineOrLookupJmsDestination(final IDestination<?> destination, final Session session) {
  final ResolveMethod resolveMethod = destination.getResolveMethod();
  try {
    if (resolveMethod == ResolveMethod.JNDI) {
      return lookupJmsDestination(destination);
    }
    if (resolveMethod == ResolveMethod.DEFINE) {
      return defineJmsDestination(destination, session);
    }
  }
  catch (final JMSException | NamingException failure) {
    throw BEANS.get(DefaultRuntimeExceptionTranslator.class).translate(failure);
  }
  throw new AssertionException("Unsupported resolve method [{}]", destination);
}
/**
 * Looks up a destination via JNDI and verifies that it is a queue or topic, as declared by the given destination.
 * The JNDI context is closed again after the lookup.
 *
 * @throws NamingException
 *           if the context cannot be created or the lookup fails
 */
protected Destination lookupJmsDestination(final IDestination<?> destination) throws NamingException {
  final Context context = createContext();
  final Object object;
  try {
    object = Assertions.assertNotNull(context.lookup(destination.getName()));
  }
  finally {
    // The context is only needed for the lookup; release it, but never let a close failure hide the lookup result.
    try {
      context.close();
    }
    catch (NamingException | RuntimeException e) {
      LOG.debug("Error while closing JNDI context, will continue anyway", e);
    }
  }
  final Class<?> expectedType = (destination.getType() == DestinationType.QUEUE ? Queue.class : Topic.class);
  Assertions.assertInstance(object, expectedType, "The looked up destination is of type '{}', but expected type '{}' [{}]", object.getClass().getName(), expectedType.getName(), destination);
  return (Destination) object;
}
/**
 * Creates a queue or topic ad-hoc by means of the given session, depending on the type of the destination.
 *
 * @throws AssertionException
 *           if the destination type is neither queue nor topic
 */
protected Destination defineJmsDestination(final IDestination<?> destination, final Session session) throws JMSException {
  final DestinationType type = destination.getType();
  if (type == DestinationType.QUEUE) {
    return session.createQueue(destination.getName());
  }
  if (type == DestinationType.TOPIC) {
    return session.createTopic(destination.getName());
  }
  throw new AssertionException("Unsupported destination type [{}]", destination);
}
/**
 * @return the JMS priority, computed as the priority of the input added to the JMS default priority
 */
public int toJmsPriority(final PublishInput publishInput) {
  final int priorityOffset = publishInput.getPriority();
  return priorityOffset + Message.DEFAULT_PRIORITY;
}
/**
 * @return the time-to-live of the input, or the JMS default time-to-live if the input specifies an infinite one
 */
public long toJmsTimeToLive(final PublishInput publishInput) {
  if (publishInput.getTimeToLive() == PublishInput.INFINITELY) {
    return Message.DEFAULT_TIME_TO_LIVE;
  }
  return publishInput.getTimeToLive();
}
/**
 * @return {@link DeliveryMode#PERSISTENT} if the input requests persistent delivery, otherwise
 *         {@link DeliveryMode#NON_PERSISTENT}
 */
public int toJmsDeliveryMode(final PublishInput publishInput) {
  final boolean persistent = publishInput.getDeliveryMode() == PublishInput.DELIVERY_MODE_PERSISTENT;
  if (persistent) {
    return DeliveryMode.PERSISTENT;
  }
  return DeliveryMode.NON_PERSISTENT;
}
/**
 * Determines the identifier to name the {@link Connection}: the value of the {@link #JMS_CLIENT_ID} property if
 * set, otherwise the symbolic name of this MOM followed by the node id in parentheses.
 */
protected String computeClientId(final Map<Object, Object> properties) {
  final String configuredClientId = ObjectUtility.toString(properties.get(JMS_CLIENT_ID));
  if (configuredClientId == null) {
    final String nodeId = BEANS.get(NodeIdentifier.class).get();
    final String boxedNodeId = StringUtility.box("(", nodeId, ")");
    return StringUtility.join(" ", m_symbolicName, boxedNodeId);
  }
  return configuredClientId;
}
/**
* Exception handler used in MOM.
* <p>
* Installed on every job input created by {@link JmsMomImplementor#newJobInput()}. This class adds no behavior of
* its own to {@link ExceptionHandler}.
*/
public static class MomExceptionHandler extends ExceptionHandler {
}
}