| /* |
| * |
| * Copyright (c) 2011, 2016 - Loetz GmbH&Co.KG (69115 Heidelberg, Germany) |
| * |
| * All rights reserved. This program and the accompanying materials |
| * are made available under the terms of the Eclipse Public License 2.0 |
| * which accompanies this distribution, and is available at |
| * https://www.eclipse.org/legal/epl-2.0/ |
| * |
| * SPDX-License-Identifier: EPL-2.0 |
| * |
| * Contributors: |
| * Florian Pirchner - Initial implementation |
| * Loetz GmbH&Co.KG |
| * |
| */ |
| |
| package org.eclipse.osbp.webserver.messagequeue; |
| |
| import java.util.StringTokenizer; |
| |
| import javax.jms.Connection; |
| import javax.jms.Destination; |
| import javax.jms.ExceptionListener; |
| import javax.jms.JMSException; |
| import javax.jms.MessageConsumer; |
| import javax.jms.MessageListener; |
| import javax.jms.Session; |
| |
| import org.apache.activemq.ActiveMQConnectionFactory; |
| import org.apache.log4j.Logger; |
| |
| /** |
| * Dieses gesamte Package muss zum jetzigen Zeitpunkt <b>manuell</b> dupliziert und synchronisiert werden<ul> |
| * <li><a href="http://10.1.2.27:8100/cc360/browser/OSGI-Plugins/org.eclipse.osbp.webserver.messagequeue">cc360/OSGI-Plugins/org.eclipse.osbp.webserver.messagequeue</a></li> |
| * <li><a href="http://10.1.2.27:8100/kapstadt/browser/development2/3.50/src/cxj/diamond7/src/webclt-webserver/org/eclipse/osbp/webserver/messagequeue">cc3.50/development2/3.50/src/cxj/diamond7/src/webclt-webserver/org/eclipse/osbp/webserver/messagequeue</a></li> |
| * </ul> |
| * Internal connection instance |
| */ |
| public class CXMqConnection { |
| |
| /** The logger. */ |
| private final Logger LOGGER; |
| |
| /** The started. */ |
| private boolean fStarted = false; |
| |
| /** The broker host. */ |
| private String fBrokerHost; |
| |
| /** The broker port. */ |
| private int fBrokerPort = 61616; // default |
| |
| /** The factory. */ |
| private ActiveMQConnectionFactory fFactory; |
| |
| /** The connection. */ |
| private Connection fConnection; |
| |
| /** The session. */ |
| private Session fSession; |
| |
| /** |
| * Try to check, if there is a valid broker running on the given adress. |
| * |
| * @param brokerHost |
| * the broker host |
| * @param brokerPort |
| * the broker port |
| * @return true/false |
| */ |
| public static boolean checkValidBroker(String brokerHost, int brokerPort) { |
| CXMqConnection instance = startInstance(brokerHost, brokerPort, null); |
| boolean retcode = instance.fStarted; |
| instance.shutDown(); |
| return retcode; |
| } |
| |
| /** |
| * Start instance. |
| * |
| * @param brokerHost |
| * of the ActiveMQ Broker Service |
| * @return the initialized/started instance for the broker |
| */ |
| public static CXMqConnection startInstance(String brokerHost) { |
| return startInstance(brokerHost, 0, null); |
| } |
| |
| /** Start instance. |
| * |
| * @param brokerHost |
| * of the ActiveMQ Broker Service |
| * @param logger |
| * log4j logger |
| * @return the initialized/started instance for the broker |
| */ |
| public static CXMqConnection startInstance(String brokerHost, Logger logger) { |
| return startInstance(brokerHost, 0, logger); |
| } |
| |
| /** Start instance. |
| * |
| * @param brokerHost |
| * of the ActiveMQ Broker Service |
| * @param brokerPort |
| * of the ActiveMQ Broker Service, may be 0 for default |
| * @param logger |
| * log4j logger |
| * @return the initialized/started instance for the broker |
| */ |
| public static CXMqConnection startInstance(String brokerHost, int brokerPort, Logger logger) { |
| CXMqConnection instance = new CXMqConnection(logger); |
| instance.startUp(brokerHost, brokerPort); |
| return instance; |
| } |
| |
| /** Gets the broker url. |
| * |
| * @return the ActiveMQ Broker Service URL |
| */ |
| protected final String getBrokerUrl() { |
| return "tcp://"+fBrokerHost+":"+fBrokerPort; |
| } |
| |
| /** Creates the producer. |
| * |
| * @param remoteQueueName |
| * the remote queue name |
| * @return the new producer instance for the <code>remoteQueueName</code> |
| */ |
| protected CXMqProducer createProducer(String remoteQueueName) { |
| return new CXMqProducer(fSession, remoteQueueName, LOGGER); |
| } |
| |
| /** Creates the consumer destination. |
| * |
| * @param localQueueName |
| * the local queue name |
| * @param exceptionListener |
| * listener for any exceptions which may occur |
| * @param messageListener |
| * listener for any messages received on the queue name |
| * @return the created consumer destination for the parameters given |
| */ |
| protected Destination createConsumerDestination(String localQueueName, ExceptionListener exceptionListener, MessageListener messageListener) { |
| if (fStarted) { |
| try { |
| Destination destination = fSession.createQueue(localQueueName); |
| MessageConsumer consumer = fSession.createConsumer(destination); |
| fConnection.setExceptionListener(exceptionListener); |
| consumer.setMessageListener(messageListener); |
| return destination; |
| } |
| catch (JMSException e) { |
| if (LOGGER != null) { |
| LOGGER.error(e); |
| } |
| else { |
| System.err.println(e); |
| } |
| } |
| } |
| return null; |
| } |
| |
| /** internal startup of all ActiveMQ instances necessary. |
| * |
| * @param brokerHost |
| * the broker host |
| * @param brokerPort |
| * the broker port |
| */ |
| private void startUp(String brokerHost, int brokerPort) { |
| if (!fStarted) { |
| try { |
| if (brokerHost != null) { |
| fBrokerHost = brokerHost; |
| } |
| if (brokerPort > 0) { |
| fBrokerPort = brokerPort; |
| } |
| fFactory = new ActiveMQConnectionFactory(getBrokerUrl()); |
| fConnection = fFactory.createConnection(); |
| fConnection.start(); |
| fStarted = true; |
| fSession = fConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| if (LOGGER != null) { |
| LOGGER.debug("Apache-MQ Broker-Service started"); |
| } |
| else { |
| System.err.println("Apache-MQ Broker-Service started"); |
| } |
| } |
| catch (LinkageError | Exception e) { |
| shutDown(); |
| if (LOGGER != null) { |
| LOGGER.error("Apache-MQ Broker-Service NOT started\n"+e.getLocalizedMessage()); |
| } |
| else { |
| System.err.println("Apache-MQ Broker-Service NOT started\n"+e.getLocalizedMessage()); |
| } |
| } |
| } |
| else { |
| if (LOGGER != null) { |
| LOGGER.warn("Apache-MQ Broker-Service already started"); |
| } |
| else { |
| System.err.println("Apache-MQ Broker-Service already started"); |
| } |
| } |
| } |
| |
| /** internal constructor, getting the default AciveMQ Broker Service!. |
| * |
| * @param logger |
| * the logger |
| */ |
| private CXMqConnection(Logger logger) { |
| super(); |
| LOGGER = logger; |
| StringTokenizer tokens = new StringTokenizer(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL, "/:"); |
| for (int i = 0; i < tokens.countTokens()-2; i++) { |
| tokens.nextToken(); |
| } |
| fBrokerHost = tokens.nextToken(); |
| fBrokerPort = Integer.valueOf(tokens.nextToken()); |
| } |
| |
| /** |
| * internal shutdown all ActiveMQ instances. |
| */ |
| protected void shutDown() { |
| if (fConnection != null) { |
| try { |
| fConnection.close(); |
| } |
| catch (LinkageError | Exception e) { |
| if (LOGGER != null) { |
| LOGGER.error(e); |
| } |
| else { |
| System.err.println(e); |
| } |
| } |
| fFactory = null; |
| fConnection = null; |
| fSession = null; |
| } |
| fStarted = false; |
| } |
| } |