blob: 4e984e95b77728a5ac200068fc51e7d364a13897 [file] [log] [blame]
/*
*
* Copyright (c) 2011, 2016 - Loetz GmbH&Co.KG (69115 Heidelberg, Germany)
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License 2.0
* which accompanies this distribution, and is available at
* https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Florian Pirchner - Initial implementation
* Loetz GmbH&Co.KG
*
*/
package org.eclipse.osbp.webserver.messagequeue;
import java.util.StringTokenizer;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;
/**
 * Internal connection instance: wraps one JMS connection and one session to an
 * ActiveMQ broker and creates producers and consumers on them.
 * <p>
 * At present this whole package has to be duplicated and kept in sync <b>manually</b> in<ul>
 * <li><a href="http://10.1.2.27:8100/cc360/browser/OSGI-Plugins/org.eclipse.osbp.webserver.messagequeue">cc360/OSGI-Plugins/org.eclipse.osbp.webserver.messagequeue</a></li>
 * <li><a href="http://10.1.2.27:8100/kapstadt/browser/development2/3.50/src/cxj/diamond7/src/webclt-webserver/org/eclipse/osbp/webserver/messagequeue">cc3.50/development2/3.50/src/cxj/diamond7/src/webclt-webserver/org/eclipse/osbp/webserver/messagequeue</a></li>
 * </ul>
 * <p>
 * Failures are never thrown to the caller; they are reported to the log4j logger given at
 * creation time or, when that logger is {@code null}, to {@code System.err}.
 */
public class CXMqConnection {

    /** Host used when the ActiveMQ default bind URL cannot be parsed. */
    private static final String FALLBACK_BROKER_HOST = "localhost";

    /** The logger; may be {@code null}, in which case output goes to {@code System.err}. */
    private final Logger LOGGER;

    /** {@code true} once connection and session have been created successfully. */
    private boolean fStarted = false;

    /** The broker host. */
    private String fBrokerHost;

    /** The broker port. */
    private int fBrokerPort = 61616; // default

    /** The factory. */
    private ActiveMQConnectionFactory fFactory;

    /** The connection. */
    private Connection fConnection;

    /** The session. */
    private Session fSession;

    /**
     * Tries to check whether a valid broker is running on the given address by opening a
     * connection to it and shutting it down again right away.
     *
     * @param brokerHost
     *            the broker host
     * @param brokerPort
     *            the broker port
     * @return {@code true} if the connection could be started, {@code false} otherwise
     */
    public static boolean checkValidBroker(String brokerHost, int brokerPort) {
        CXMqConnection instance = startInstance(brokerHost, brokerPort, null);
        boolean retcode = instance.fStarted;
        instance.shutDown();
        return retcode;
    }

    /**
     * Start instance on the default port, without a logger.
     *
     * @param brokerHost
     *            of the ActiveMQ Broker Service
     * @return the initialized/started instance for the broker
     */
    public static CXMqConnection startInstance(String brokerHost) {
        return startInstance(brokerHost, 0, null);
    }

    /**
     * Start instance on the default port.
     *
     * @param brokerHost
     *            of the ActiveMQ Broker Service
     * @param logger
     *            log4j logger, may be {@code null}
     * @return the initialized/started instance for the broker
     */
    public static CXMqConnection startInstance(String brokerHost, Logger logger) {
        return startInstance(brokerHost, 0, logger);
    }

    /**
     * Start instance.
     * <p>
     * The instance is returned even when the connection could not be established; in that
     * case the failure has been logged and the instance is simply not started.
     *
     * @param brokerHost
     *            of the ActiveMQ Broker Service, may be {@code null} for default
     * @param brokerPort
     *            of the ActiveMQ Broker Service, may be 0 for default
     * @param logger
     *            log4j logger, may be {@code null}
     * @return the initialized/started instance for the broker
     */
    public static CXMqConnection startInstance(String brokerHost, int brokerPort, Logger logger) {
        CXMqConnection instance = new CXMqConnection(logger);
        instance.startUp(brokerHost, brokerPort);
        return instance;
    }

    /**
     * Gets the broker url.
     *
     * @return the ActiveMQ Broker Service URL in the form {@code tcp://host:port}
     */
    protected final String getBrokerUrl() {
        return "tcp://" + fBrokerHost + ":" + fBrokerPort;
    }

    /**
     * Creates the producer.
     * <p>
     * The current session is handed over as is; it is {@code null} while this connection is
     * not started.
     *
     * @param remoteQueueName
     *            the remote queue name
     * @return the new producer instance for the <code>remoteQueueName</code>
     */
    protected CXMqProducer createProducer(String remoteQueueName) {
        return new CXMqProducer(fSession, remoteQueueName, LOGGER);
    }

    /**
     * Creates the consumer destination.
     * <p>
     * Note that the exception listener is registered on the connection itself, so it
     * replaces the listener set by any earlier call on this instance.
     *
     * @param localQueueName
     *            the local queue name
     * @param exceptionListener
     *            listener for any exceptions which may occur
     * @param messageListener
     *            listener for any messages received on the queue name
     * @return the created consumer destination for the parameters given, or {@code null} if
     *         this connection is not started or a JMS error occurred (which is logged)
     */
    protected Destination createConsumerDestination(String localQueueName, ExceptionListener exceptionListener, MessageListener messageListener) {
        if (!fStarted) {
            return null;
        }
        MessageConsumer consumer = null;
        try {
            Destination destination = fSession.createQueue(localQueueName);
            consumer = fSession.createConsumer(destination);
            fConnection.setExceptionListener(exceptionListener);
            consumer.setMessageListener(messageListener);
            return destination;
        }
        catch (JMSException e) {
            logError(e.toString(), e);
            // do not leave a half-configured consumer open on the session
            closeQuietly(consumer);
            return null;
        }
    }

    /**
     * Internal startup of all ActiveMQ instances necessary.
     * <p>
     * Any failure is logged and leaves this instance in the not-started state.
     *
     * @param brokerHost
     *            the broker host, {@code null} keeps the default host
     * @param brokerPort
     *            the broker port, values &lt;= 0 keep the default port
     */
    private void startUp(String brokerHost, int brokerPort) {
        if (fStarted) {
            logWarn("Apache-MQ Broker-Service already started");
            return;
        }
        try {
            if (brokerHost != null) {
                fBrokerHost = brokerHost;
            }
            if (brokerPort > 0) {
                fBrokerPort = brokerPort;
            }
            fFactory = new ActiveMQConnectionFactory(getBrokerUrl());
            fConnection = fFactory.createConnection();
            fConnection.start();
            fSession = fConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // only report "started" once the session exists as well
            fStarted = true;
            logDebug("Apache-MQ Broker-Service started");
        }
        catch (LinkageError | Exception e) {
            // LinkageError is caught too, so missing ActiveMQ/JMS classes do not propagate
            shutDown();
            logError("Apache-MQ Broker-Service NOT started\n" + e.getLocalizedMessage(), e);
        }
    }

    /**
     * Internal constructor, getting the default ActiveMQ Broker Service!
     *
     * @param logger
     *            the logger, may be {@code null}
     */
    private CXMqConnection(Logger logger) {
        super();
        LOGGER = logger;
        fBrokerHost = FALLBACK_BROKER_HOST;
        applyDefaultBrokerAddress(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
    }

    /**
     * Takes host and port from the last two {@code /}- or {@code :}-separated tokens of the
     * given URL. If the URL does not have that shape, the fallback host and the default port
     * stay in effect and a warning is logged.
     *
     * @param bindUrl
     *            broker URL, expected in the form {@code scheme://host:port}
     */
    private void applyDefaultBrokerAddress(String bindUrl) {
        if (bindUrl == null) {
            logWarn("Apache-MQ default broker URL not available, using " + getBrokerUrl());
            return;
        }
        StringTokenizer tokens = new StringTokenizer(bindUrl, "/:");
        // countTokens() shrinks with every nextToken(), so it must be read once up front
        int tokensToSkip = tokens.countTokens() - 2;
        if (tokensToSkip < 0) {
            logWarn("Apache-MQ default broker URL '" + bindUrl + "' not understood, using " + getBrokerUrl());
            return;
        }
        for (int i = 0; i < tokensToSkip; i++) {
            tokens.nextToken();
        }
        String host = tokens.nextToken();
        String port = tokens.nextToken();
        try {
            // parse first: host and port are only taken over together
            fBrokerPort = Integer.parseInt(port);
            fBrokerHost = host;
        }
        catch (NumberFormatException e) {
            logWarn("Apache-MQ default broker URL '" + bindUrl + "' not understood, using " + getBrokerUrl());
        }
    }

    /**
     * Internal shutdown of all ActiveMQ instances.
     * <p>
     * Safe to call repeatedly and on an instance that never started. The session is not
     * closed separately; closing the connection is relied upon to release it.
     */
    protected void shutDown() {
        Connection connection = fConnection;
        // drop all references first, also when only the factory had been created
        fFactory = null;
        fConnection = null;
        fSession = null;
        fStarted = false;
        if (connection != null) {
            try {
                connection.close();
            }
            catch (LinkageError | Exception e) {
                logError(e.toString(), e);
            }
        }
    }

    /**
     * Closes the given consumer, logging instead of throwing on failure.
     *
     * @param consumer
     *            the consumer to close, may be {@code null}
     */
    private void closeQuietly(MessageConsumer consumer) {
        if (consumer == null) {
            return;
        }
        try {
            consumer.close();
        }
        catch (JMSException e) {
            logError(e.toString(), e);
        }
    }

    /**
     * Writes a debug message to the logger, or to {@code System.err} without a logger.
     *
     * @param message
     *            the message
     */
    private void logDebug(String message) {
        if (LOGGER != null) {
            LOGGER.debug(message);
        }
        else {
            System.err.println(message);
        }
    }

    /**
     * Writes a warning to the logger, or to {@code System.err} without a logger.
     *
     * @param message
     *            the message
     */
    private void logWarn(String message) {
        if (LOGGER != null) {
            LOGGER.warn(message);
        }
        else {
            System.err.println(message);
        }
    }

    /**
     * Writes an error including its cause to the logger, or only the message to
     * {@code System.err} without a logger.
     *
     * @param message
     *            the message
     * @param cause
     *            the throwable that caused the error
     */
    private void logError(String message, Throwable cause) {
        if (LOGGER != null) {
            LOGGER.error(message, cause);
        }
        else {
            System.err.println(message);
        }
    }
}