blob: dd1a50b3f531dd9ffe24c832eabb918c71a0d02c [file] [log] [blame]
/*
*
* Copyright (c) 2011, 2016 - Loetz GmbH&Co.KG (69115 Heidelberg, Germany)
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Florian Pirchner - Initial implementation
* Loetz GmbH&Co.KG
*
*/
package org.eclipse.osbp.webserver.messagequeue;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.apache.log4j.Logger;
/**
* Dieses gesamte Package muss zum jetzigen Zeitpunkt <b>manuell</b> dupliziert und synchronisiert werden<ul>
* <li><a href="http://10.1.2.27:8100/cc360/browser/OSGI-Plugins/org.eclipse.osbp.webserver.messagequeue">cc360/OSGI-Plugins/org.eclipse.osbp.webserver.messagequeue</a></li>
* <li><a href="http://10.1.2.27:8100/kapstadt/browser/development2/3.50/src/cxj/diamond7/src/webclt-webserver/org/eclipse/osbp/webserver/messagequeue">cc3.50/development2/3.50/src/cxj/diamond7/src/webclt-webserver/org/eclipse/osbp/webserver/messagequeue</a></li>
* </ul>
* <i>Anchor</i> class for communication of WebClient CCNG Hybrid via ActiveMQ
*/
public class CXMqConsumer implements MessageListener, ExceptionListener {
/** The logger. */
private final Logger LOGGER;
/** The connection. */
private CXMqConnection fConnection;
/** The producer. */
private CXMqProducer fProducer;
/** The consumer queue name. */
private final String fConsumerQueueName;
/** The consumer destination. */
private Destination fConsumerDestination;
/** The portal id. */
private int fPortalId;
/** The user name. */
private String fUserName;
/** The password. */
private String fPassword;
/** The legacy host. */
private String fLegacyHost;
/** The legacy prod env. */
private String fLegacyProdEnv;
/** The legacy mandator. */
private int fLegacyMandator;
/** The producer queue name. */
private String fProducerQueueName;
/** The mq side. */
private final String fMqSide;
/** The listener. */
private final ICXMqMessageListener fListener;
/** The Constant PIPE. */
private static final String PIPE = "|";
/** Used at the CCNG application side!.
*
* @param brokerHost
* host where the ActiveMQ Broker resides
* @param brokerPort
* port of the ActiveMQ Broker, can be 0 as default
* @param remoteQueueName
* name of the remote queue
* @param localSession
* local session id
* @param listener
* additional message listener
* @param logger
* log4j logger if any
*/
public CXMqConsumer(
String brokerHost, int brokerPort,
String remoteQueueName,
String localSession,
ICXMqMessageListener listener,
Logger logger) {
LOGGER = logger;
fMqSide = "CCNG";
fListener = listener;
// --- generate the brokerservice ---
fConnection = CXMqConnection.startInstance(brokerHost, brokerPort, logger);
String localQueueName = null;
try {
// --- generate the queuename
localQueueName =
fConnection.getBrokerUrl()+PIPE+
CXMqOneWayCoder.encode(
localSession
)+
UUID.randomUUID();
// --- setup the connection ---
fConsumerDestination = fConnection.createConsumerDestination(localQueueName, this, this);
}
catch (Exception e) {
if (LOGGER != null) {
LOGGER.error(e);
}
else {
System.err.println(e);
}
localQueueName = null;
}
fConsumerQueueName = localQueueName;
try {
// --- setup the connection ---
initialRequestAuthenticate(remoteQueueName);
}
catch (Exception e) {
if (LOGGER != null) {
LOGGER.error(e);
}
else {
System.err.println(e);
}
}
}
/**
* Used at the WebClient side!.
*
* @param brokerHost
* host where the ActiveMQ Broker resides
* @param brokerPort
* port of the ActiveMQ Broker, can be 0 as default
* @param remoteQueueName
* name of the remote queue
* @param portalId
* where the user is authenticated
* @param userName
* where the user is authenticated
* @param password
* the credential
* @param legacyHost
* the legacy production system
* @param legacyProdEnv
* the legacy production system
* @param legacyMandator
* the legacy production system
* @param listener
* additional message listener
* @param logger
* log4j logger if any
*/
public CXMqConsumer(
String brokerHost, int brokerPort,
String remoteQueueName,
int portalId, String userName, String password,
String legacyHost, String legacyProdEnv, int legacyMandator,
ICXMqMessageListener listener,
Logger logger) {
LOGGER = logger;
fMqSide = "WebClient";
fListener = listener;
if (LOGGER != null) {
LOGGER.info(fMqSide+"broker="+brokerHost+":"+brokerPort);
LOGGER.info(fMqSide+"producer="+remoteQueueName);
LOGGER.info(fMqSide+"login="+portalId+"/"+userName+"@"+legacyHost+"/"+legacyProdEnv+"/"+legacyMandator);
LOGGER.info(fMqSide+"listener="+listener.toString());
}
else {
}
// --- generate the brokerservice ---
fConnection = CXMqConnection.startInstance(brokerHost, brokerPort, logger);
// --- remember relevant informations ---
fPortalId = portalId;
fUserName = userName;
fPassword = password;
fLegacyHost = legacyHost;
fLegacyProdEnv = legacyProdEnv;
fLegacyMandator = legacyMandator;
String localQueueName = null;
try {
// --- generate the queuename
localQueueName =
fConnection.getBrokerUrl()+PIPE+
CXMqOneWayCoder.encode(
portalId+PIPE+userName+PIPE+
legacyHost+PIPE+legacyProdEnv+PIPE+legacyMandator
)+
UUID.randomUUID();
if (LOGGER != null) {
LOGGER.info(fMqSide+"consumer="+localQueueName);
}
else {
}
// --- setup the connection ---
fConsumerDestination = fConnection.createConsumerDestination(localQueueName, this, this);
}
catch (Exception e) {
if (LOGGER != null) {
LOGGER.error(e);
}
else {
System.err.println(e);
}
localQueueName = null;
}
fConsumerQueueName = localQueueName;
try {
// --- setup the connection ---
initialAuthenticate(remoteQueueName);
}
catch (Exception e) {
if (LOGGER != null) {
LOGGER.error(e);
}
else {
System.err.println(e);
}
localQueueName = null;
}
if (LOGGER != null) {
LOGGER.debug(fMqSide+": consumer="+fConsumerQueueName);
}
else {
System.err.println(fMqSide+": consumer="+fConsumerQueueName);
}
}
/**
* Gets the local queue name.
*
* @return the name of the local/consuming queue
*/
public String getLocalQueueName() {
return fConsumerQueueName;
}
/**
* the connection via ActiveMQ will be disposed.
*/
public void dispose() {
if (fConnection != null) {
// --- send the dispose to the communication partner ---
sendMessage(ECXMqMessageEvent.DISPOSE);
fConnection.shutDown();
}
fConnection = null;
}
/**
* just to log any exceptions.
*
* @param exception
* the exception
*/
@Override
public void onException(JMSException exception) {
if (LOGGER != null) {
LOGGER.error("JMS Exception occured.\n"+exception.getLocalizedMessage()+exception);
}
else {
System.err.println("JMS Exception occured.\n"+exception.getLocalizedMessage()+exception);
}
}
/**
* send a message without any body.
*
* @param event
* the event
* @see {@link #onMessage(Message)}
* @see {@link #sendMessage(ECXMqMessageEvent, Object...)}
* @see {@link #sendMessage(ECXMqMessageEvent, Map)}
*/
public void sendMessage(ECXMqMessageEvent event) {
sendMessage(event, (Map<ECXMqMessageAttribute,Object>)null);
}
/**
* send a message with body.
*
* @param event
* the event
* @param args
* contains pairs of {@link ECXMqMessageAttribute} and values
* @see {@link #onMessage(Message)}
* @see {@link #sendMessage(ECXMqMessageEvent)}
* @see {@link #sendMessage(ECXMqMessageEvent, Map)}
*/
public void sendMessage(ECXMqMessageEvent event, Object... args) {
Map<ECXMqMessageAttribute,Object> body = new HashMap<ECXMqMessageAttribute,Object>();
if (args.length % 2 == 0) {
for (int i = 0; i < args.length; i +=2) {
if (args[i] instanceof ECXMqMessageAttribute) {
body.put((ECXMqMessageAttribute) args[i], args[i+1]);
}
}
}
sendMessage(event, body);
}
/**
* send a message with body.
*
* @param event
* the event
* @param body
* contains pairs of {@link ECXMqMessageAttribute} and values
* @see {@link #onMessage(Message)}
* @see {@link #sendMessage(ECXMqMessageEvent)}
* @see {@link #sendMessage(ECXMqMessageEvent, Object...)}
*/
public void sendMessage(ECXMqMessageEvent event, Map<ECXMqMessageAttribute,Object> body) {
if (fProducer != null) {
try {
if (LOGGER != null) {
LOGGER.debug(fMqSide+": producer="+fProducerQueueName);
LOGGER.debug(" event: "+event.toString());
}
else {
System.err.println(fMqSide+": producer="+fProducerQueueName);
System.err.println(" event: "+event.toString());
}
if (body != null) {
for (ECXMqMessageAttribute attr : body.keySet()) {
Object value = body.get(attr);
if (LOGGER != null) {
LOGGER.debug(" attr: "+attr.toString()+"="+(value == null ? "<null>" : value.toString()));
}
else {
System.err.println(" attr: "+attr.toString()+"="+(value == null ? "<null>" : value.toString()));
}
}
}
fProducer.sendMessage(event, body);
}
catch (JMSException e) {
if (LOGGER != null) {
LOGGER.error(e);
}
else {
System.err.println(e);
}
}
}
}
/**
* The CCNG side wants to call a legacy program via WebClient!.
*
* @param legacyProgramCall
* legacy program call, see legacy code for WebClient
*/
public void callLegacyProgram(String legacyProgramCall) {
sendMessage(ECXMqMessageEvent.CALL_LEGACY_PROGRAM,
ECXMqMessageAttribute.LEGACY_PROGRAMCALL, legacyProgramCall
);
}
/**
* The CCNG side is started up and wants to get authenticated via
* WebClient!.
*
* @param remoteQueueName
* name of the corresponding WebClient queuename
*/
private void initialRequestAuthenticate(String remoteQueueName) {
fProducerQueueName = remoteQueueName;
if (LOGGER != null) {
LOGGER.info(fMqSide+"producer="+fProducerQueueName);
}
else {
}
fProducer = fConnection.createProducer(fProducerQueueName);
sendMessage(ECXMqMessageEvent.REQUEST_AUTHENTICATE,
ECXMqMessageAttribute.CCNG_QUEUENAME, fConsumerQueueName
);
}
/**
* The WebClient side is started up and wants to get authenticated CCNG!.
*
* @param remoteQueueName
* name of the corresponding CCNG queuename
*/
private void initialAuthenticate(String remoteQueueName) {
if (remoteQueueName != null) {
fProducerQueueName = remoteQueueName;
if (LOGGER != null) {
LOGGER.info(fMqSide+"producer="+fProducerQueueName);
}
else {
}
fProducer = fConnection.createProducer(remoteQueueName);
sendMessage(ECXMqMessageEvent.TRY_AUTHENTICATE,
ECXMqMessageAttribute.PORTAL_ID, fPortalId,
ECXMqMessageAttribute.USERNAME, fUserName,
ECXMqMessageAttribute.PASSWORD, fPassword
);
}
}
/**
* Handles all messages received!<br>
* Some messages will be handled internally! Other will be redirected to the
* {@link #fListener} given!.
*
* @param anyMessage
* the any message
* @see {@link #sendMessage(ECXMqMessageEvent)}
* @see {@link #sendMessage(ECXMqMessageEvent, Object...)}
* @see {@link #sendMessage(ECXMqMessageEvent, Map)}
*/
@Override
public void onMessage(Message anyMessage) {
if (anyMessage instanceof MapMessage) {
MapMessage message = (MapMessage) anyMessage;
if (LOGGER != null) {
LOGGER.info(fMqSide+": event="+ECXMqMessageEvent.getEvent(message).toString());
}
switch (ECXMqMessageEvent.getEvent(message)) {
case REQUEST_AUTHENTICATE:
// --- try to authenticate the communication partner ---
initialAuthenticate((String)ECXMqMessageAttribute.CCNG_QUEUENAME.getValue(message));
break;
case TRY_AUTHENTICATE:
// --- remember relevant informations ---
fPortalId = (Integer)ECXMqMessageAttribute.PORTAL_ID.getValue(message);
fUserName = (String) ECXMqMessageAttribute.USERNAME.getValue(message);
fPassword = (String) ECXMqMessageAttribute.PASSWORD.getValue(message);
default:
// --- try to get all attributes from the message ---
try {
Map<ECXMqMessageAttribute,Object> body = new HashMap<ECXMqMessageAttribute,Object>();
Enumeration<String> keyNames = message.getPropertyNames();
while (keyNames.hasMoreElements()) {
String key = keyNames.nextElement();
if (!key.equals(ECXMqMessageEvent.EVENT.toString())) {
ECXMqMessageAttribute attr = ECXMqMessageAttribute.UNKNOWN;
Object value = null;
try {
attr = ECXMqMessageAttribute.valueOf(key);
if (attr != null) {
value = message.getObjectProperty(key);
body.put(attr, value);
}
}
catch (IllegalArgumentException iae) {
if (LOGGER != null) {
LOGGER.error(iae);
}
else {
System.err.println(iae);
}
}
if (LOGGER != null) {
LOGGER.info(" attr: "+(attr == null ? "<null>" : attr.toString())+"="+(value == null ? "<null>" : value.toString()));
}
}
}
// --- if the message was extracted, redirect to the listener ---
fListener.onMessage(ECXMqMessageEvent.getEvent(message), body);
}
catch (Exception e) {
if (LOGGER != null) {
LOGGER.error(e);
}
else {
System.err.println(e);
}
}
break;
}
}
else {
if (LOGGER != null) {
LOGGER.error("UNHANDLED "+anyMessage.getClass().getSimpleName()+": "+anyMessage);
}
else {
System.err.println("UNHANDLED "+anyMessage.getClass().getSimpleName()+": "+anyMessage);
}
}
}
}