| /* |
| * |
| * Copyright (c) 2011, 2016 - Loetz GmbH&Co.KG (69115 Heidelberg, Germany) |
| * |
| * All rights reserved. This program and the accompanying materials |
| * are made available under the terms of the Eclipse Public License v1.0 |
| * which accompanies this distribution, and is available at |
| * http://www.eclipse.org/legal/epl-v10.html |
| * |
| * Contributors: |
| * Florian Pirchner - Initial implementation |
| * Loetz GmbH&Co.KG |
| * |
| */ |
| |
| |
| package org.eclipse.osbp.webserver.messagequeue; |
| |
| import java.util.Enumeration; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.UUID; |
| |
| import javax.jms.Destination; |
| import javax.jms.ExceptionListener; |
| import javax.jms.JMSException; |
| import javax.jms.MapMessage; |
| import javax.jms.Message; |
| import javax.jms.MessageListener; |
| |
| import org.apache.log4j.Logger; |
| |
| /** |
| * Dieses gesamte Package muss zum jetzigen Zeitpunkt <b>manuell</b> dupliziert und synchronisiert werden<ul> |
| * <li><a href="http://10.1.2.27:8100/cc360/browser/OSGI-Plugins/org.eclipse.osbp.webserver.messagequeue">cc360/OSGI-Plugins/org.eclipse.osbp.webserver.messagequeue</a></li> |
| * <li><a href="http://10.1.2.27:8100/kapstadt/browser/development2/3.50/src/cxj/diamond7/src/webclt-webserver/org/eclipse/osbp/webserver/messagequeue">cc3.50/development2/3.50/src/cxj/diamond7/src/webclt-webserver/org/eclipse/osbp/webserver/messagequeue</a></li> |
| * </ul> |
| * <i>Anchor</i> class for communication of WebClient CCNG Hybrid via ActiveMQ |
| */ |
| public class CXMqConsumer implements MessageListener, ExceptionListener { |
| |
| /** The logger. */ |
| private final Logger LOGGER; |
| |
| /** The connection. */ |
| private CXMqConnection fConnection; |
| |
| /** The producer. */ |
| private CXMqProducer fProducer; |
| |
| /** The consumer queue name. */ |
| private final String fConsumerQueueName; |
| |
| /** The consumer destination. */ |
| private Destination fConsumerDestination; |
| |
| /** The portal id. */ |
| private int fPortalId; |
| |
| /** The user name. */ |
| private String fUserName; |
| |
| /** The password. */ |
| private String fPassword; |
| |
| /** The legacy host. */ |
| private String fLegacyHost; |
| |
| /** The legacy prod env. */ |
| private String fLegacyProdEnv; |
| |
| /** The legacy mandator. */ |
| private int fLegacyMandator; |
| |
| /** The producer queue name. */ |
| private String fProducerQueueName; |
| |
| /** The mq side. */ |
| private final String fMqSide; |
| |
| /** The listener. */ |
| private final ICXMqMessageListener fListener; |
| |
| /** The Constant PIPE. */ |
| private static final String PIPE = "|"; |
| |
| /** Used at the CCNG application side!. |
| * |
| * @param brokerHost |
| * host where the ActiveMQ Broker resides |
| * @param brokerPort |
| * port of the ActiveMQ Broker, can be 0 as default |
| * @param remoteQueueName |
| * name of the remote queue |
| * @param localSession |
| * local session id |
| * @param listener |
| * additional message listener |
| * @param logger |
| * log4j logger if any |
| */ |
| public CXMqConsumer( |
| String brokerHost, int brokerPort, |
| String remoteQueueName, |
| String localSession, |
| ICXMqMessageListener listener, |
| Logger logger) { |
| LOGGER = logger; |
| fMqSide = "CCNG"; |
| fListener = listener; |
| |
| // --- generate the brokerservice --- |
| fConnection = CXMqConnection.startInstance(brokerHost, brokerPort, logger); |
| |
| String localQueueName = null; |
| |
| try { |
| // --- generate the queuename |
| localQueueName = |
| fConnection.getBrokerUrl()+PIPE+ |
| CXMqOneWayCoder.encode( |
| localSession |
| )+ |
| UUID.randomUUID(); |
| |
| // --- setup the connection --- |
| fConsumerDestination = fConnection.createConsumerDestination(localQueueName, this, this); |
| } |
| catch (Exception e) { |
| if (LOGGER != null) { |
| LOGGER.error(e); |
| } |
| else { |
| System.err.println(e); |
| } |
| localQueueName = null; |
| } |
| |
| fConsumerQueueName = localQueueName; |
| |
| try { |
| // --- setup the connection --- |
| initialRequestAuthenticate(remoteQueueName); |
| } |
| catch (Exception e) { |
| if (LOGGER != null) { |
| LOGGER.error(e); |
| } |
| else { |
| System.err.println(e); |
| } |
| } |
| } |
| |
| /** |
| * Used at the WebClient side!. |
| * |
| * @param brokerHost |
| * host where the ActiveMQ Broker resides |
| * @param brokerPort |
| * port of the ActiveMQ Broker, can be 0 as default |
| * @param remoteQueueName |
| * name of the remote queue |
| * @param portalId |
| * where the user is authenticated |
| * @param userName |
| * where the user is authenticated |
| * @param password |
| * the credential |
| * @param legacyHost |
| * the legacy production system |
| * @param legacyProdEnv |
| * the legacy production system |
| * @param legacyMandator |
| * the legacy production system |
| * @param listener |
| * additional message listener |
| * @param logger |
| * log4j logger if any |
| */ |
| public CXMqConsumer( |
| String brokerHost, int brokerPort, |
| String remoteQueueName, |
| int portalId, String userName, String password, |
| String legacyHost, String legacyProdEnv, int legacyMandator, |
| ICXMqMessageListener listener, |
| Logger logger) { |
| LOGGER = logger; |
| fMqSide = "WebClient"; |
| fListener = listener; |
| |
| if (LOGGER != null) { |
| LOGGER.info(fMqSide+"broker="+brokerHost+":"+brokerPort); |
| LOGGER.info(fMqSide+"producer="+remoteQueueName); |
| LOGGER.info(fMqSide+"login="+portalId+"/"+userName+"@"+legacyHost+"/"+legacyProdEnv+"/"+legacyMandator); |
| LOGGER.info(fMqSide+"listener="+listener.toString()); |
| } |
| else { |
| } |
| |
| // --- generate the brokerservice --- |
| fConnection = CXMqConnection.startInstance(brokerHost, brokerPort, logger); |
| |
| // --- remember relevant informations --- |
| fPortalId = portalId; |
| fUserName = userName; |
| fPassword = password; |
| fLegacyHost = legacyHost; |
| fLegacyProdEnv = legacyProdEnv; |
| fLegacyMandator = legacyMandator; |
| |
| String localQueueName = null; |
| |
| try { |
| // --- generate the queuename |
| localQueueName = |
| fConnection.getBrokerUrl()+PIPE+ |
| CXMqOneWayCoder.encode( |
| portalId+PIPE+userName+PIPE+ |
| legacyHost+PIPE+legacyProdEnv+PIPE+legacyMandator |
| )+ |
| UUID.randomUUID(); |
| |
| if (LOGGER != null) { |
| LOGGER.info(fMqSide+"consumer="+localQueueName); |
| } |
| else { |
| } |
| |
| // --- setup the connection --- |
| fConsumerDestination = fConnection.createConsumerDestination(localQueueName, this, this); |
| } |
| catch (Exception e) { |
| if (LOGGER != null) { |
| LOGGER.error(e); |
| } |
| else { |
| System.err.println(e); |
| } |
| localQueueName = null; |
| } |
| |
| fConsumerQueueName = localQueueName; |
| |
| try { |
| // --- setup the connection --- |
| initialAuthenticate(remoteQueueName); |
| } |
| catch (Exception e) { |
| if (LOGGER != null) { |
| LOGGER.error(e); |
| } |
| else { |
| System.err.println(e); |
| } |
| localQueueName = null; |
| } |
| |
| if (LOGGER != null) { |
| LOGGER.debug(fMqSide+": consumer="+fConsumerQueueName); |
| } |
| else { |
| System.err.println(fMqSide+": consumer="+fConsumerQueueName); |
| } |
| } |
| |
| /** |
| * Gets the local queue name. |
| * |
| * @return the name of the local/consuming queue |
| */ |
| public String getLocalQueueName() { |
| return fConsumerQueueName; |
| } |
| |
| /** |
| * the connection via ActiveMQ will be disposed. |
| */ |
| public void dispose() { |
| if (fConnection != null) { |
| // --- send the dispose to the communication partner --- |
| sendMessage(ECXMqMessageEvent.DISPOSE); |
| fConnection.shutDown(); |
| } |
| fConnection = null; |
| } |
| |
| /** |
| * just to log any exceptions. |
| * |
| * @param exception |
| * the exception |
| */ |
| @Override |
| public void onException(JMSException exception) { |
| if (LOGGER != null) { |
| LOGGER.error("JMS Exception occured.\n"+exception.getLocalizedMessage()+exception); |
| } |
| else { |
| System.err.println("JMS Exception occured.\n"+exception.getLocalizedMessage()+exception); |
| } |
| } |
| |
| /** |
| * send a message without any body. |
| * |
| * @param event |
| * the event |
| * @see {@link #onMessage(Message)} |
| * @see {@link #sendMessage(ECXMqMessageEvent, Object...)} |
| * @see {@link #sendMessage(ECXMqMessageEvent, Map)} |
| */ |
| public void sendMessage(ECXMqMessageEvent event) { |
| sendMessage(event, (Map<ECXMqMessageAttribute,Object>)null); |
| } |
| |
| /** |
| * send a message with body. |
| * |
| * @param event |
| * the event |
| * @param args |
| * contains pairs of {@link ECXMqMessageAttribute} and values |
| * @see {@link #onMessage(Message)} |
| * @see {@link #sendMessage(ECXMqMessageEvent)} |
| * @see {@link #sendMessage(ECXMqMessageEvent, Map)} |
| */ |
| public void sendMessage(ECXMqMessageEvent event, Object... args) { |
| Map<ECXMqMessageAttribute,Object> body = new HashMap<ECXMqMessageAttribute,Object>(); |
| if (args.length % 2 == 0) { |
| for (int i = 0; i < args.length; i +=2) { |
| if (args[i] instanceof ECXMqMessageAttribute) { |
| body.put((ECXMqMessageAttribute) args[i], args[i+1]); |
| } |
| } |
| } |
| sendMessage(event, body); |
| } |
| |
| /** |
| * send a message with body. |
| * |
| * @param event |
| * the event |
| * @param body |
| * contains pairs of {@link ECXMqMessageAttribute} and values |
| * @see {@link #onMessage(Message)} |
| * @see {@link #sendMessage(ECXMqMessageEvent)} |
| * @see {@link #sendMessage(ECXMqMessageEvent, Object...)} |
| */ |
| public void sendMessage(ECXMqMessageEvent event, Map<ECXMqMessageAttribute,Object> body) { |
| if (fProducer != null) { |
| try { |
| if (LOGGER != null) { |
| LOGGER.debug(fMqSide+": producer="+fProducerQueueName); |
| LOGGER.debug(" event: "+event.toString()); |
| } |
| else { |
| System.err.println(fMqSide+": producer="+fProducerQueueName); |
| System.err.println(" event: "+event.toString()); |
| } |
| if (body != null) { |
| for (ECXMqMessageAttribute attr : body.keySet()) { |
| Object value = body.get(attr); |
| if (LOGGER != null) { |
| LOGGER.debug(" attr: "+attr.toString()+"="+(value == null ? "<null>" : value.toString())); |
| } |
| else { |
| System.err.println(" attr: "+attr.toString()+"="+(value == null ? "<null>" : value.toString())); |
| } |
| } |
| } |
| fProducer.sendMessage(event, body); |
| } |
| catch (JMSException e) { |
| if (LOGGER != null) { |
| LOGGER.error(e); |
| } |
| else { |
| System.err.println(e); |
| } |
| } |
| } |
| } |
| |
| /** |
| * The CCNG side wants to call a legacy program via WebClient!. |
| * |
| * @param legacyProgramCall |
| * legacy program call, see legacy code for WebClient |
| */ |
| public void callLegacyProgram(String legacyProgramCall) { |
| sendMessage(ECXMqMessageEvent.CALL_LEGACY_PROGRAM, |
| ECXMqMessageAttribute.LEGACY_PROGRAMCALL, legacyProgramCall |
| ); |
| } |
| |
| /** |
| * The CCNG side is started up and wants to get authenticated via |
| * WebClient!. |
| * |
| * @param remoteQueueName |
| * name of the corresponding WebClient queuename |
| */ |
| private void initialRequestAuthenticate(String remoteQueueName) { |
| fProducerQueueName = remoteQueueName; |
| if (LOGGER != null) { |
| LOGGER.info(fMqSide+"producer="+fProducerQueueName); |
| } |
| else { |
| } |
| fProducer = fConnection.createProducer(fProducerQueueName); |
| sendMessage(ECXMqMessageEvent.REQUEST_AUTHENTICATE, |
| ECXMqMessageAttribute.CCNG_QUEUENAME, fConsumerQueueName |
| ); |
| } |
| |
| /** |
| * The WebClient side is started up and wants to get authenticated CCNG!. |
| * |
| * @param remoteQueueName |
| * name of the corresponding CCNG queuename |
| */ |
| private void initialAuthenticate(String remoteQueueName) { |
| if (remoteQueueName != null) { |
| fProducerQueueName = remoteQueueName; |
| if (LOGGER != null) { |
| LOGGER.info(fMqSide+"producer="+fProducerQueueName); |
| } |
| else { |
| } |
| fProducer = fConnection.createProducer(remoteQueueName); |
| sendMessage(ECXMqMessageEvent.TRY_AUTHENTICATE, |
| ECXMqMessageAttribute.PORTAL_ID, fPortalId, |
| ECXMqMessageAttribute.USERNAME, fUserName, |
| ECXMqMessageAttribute.PASSWORD, fPassword |
| ); |
| } |
| } |
| |
| /** |
| * Handles all messages received!<br> |
| * Some messages will be handled internally! Other will be redirected to the |
| * {@link #fListener} given!. |
| * |
| * @param anyMessage |
| * the any message |
| * @see {@link #sendMessage(ECXMqMessageEvent)} |
| * @see {@link #sendMessage(ECXMqMessageEvent, Object...)} |
| * @see {@link #sendMessage(ECXMqMessageEvent, Map)} |
| */ |
| @Override |
| public void onMessage(Message anyMessage) { |
| if (anyMessage instanceof MapMessage) { |
| MapMessage message = (MapMessage) anyMessage; |
| if (LOGGER != null) { |
| LOGGER.info(fMqSide+": event="+ECXMqMessageEvent.getEvent(message).toString()); |
| } |
| switch (ECXMqMessageEvent.getEvent(message)) { |
| case REQUEST_AUTHENTICATE: |
| // --- try to authenticate the communication partner --- |
| initialAuthenticate((String)ECXMqMessageAttribute.CCNG_QUEUENAME.getValue(message)); |
| break; |
| case TRY_AUTHENTICATE: |
| // --- remember relevant informations --- |
| fPortalId = (Integer)ECXMqMessageAttribute.PORTAL_ID.getValue(message); |
| fUserName = (String) ECXMqMessageAttribute.USERNAME.getValue(message); |
| fPassword = (String) ECXMqMessageAttribute.PASSWORD.getValue(message); |
| default: |
| // --- try to get all attributes from the message --- |
| try { |
| Map<ECXMqMessageAttribute,Object> body = new HashMap<ECXMqMessageAttribute,Object>(); |
| Enumeration<String> keyNames = message.getPropertyNames(); |
| while (keyNames.hasMoreElements()) { |
| String key = keyNames.nextElement(); |
| if (!key.equals(ECXMqMessageEvent.EVENT.toString())) { |
| ECXMqMessageAttribute attr = ECXMqMessageAttribute.UNKNOWN; |
| Object value = null; |
| try { |
| attr = ECXMqMessageAttribute.valueOf(key); |
| if (attr != null) { |
| value = message.getObjectProperty(key); |
| body.put(attr, value); |
| } |
| } |
| catch (IllegalArgumentException iae) { |
| if (LOGGER != null) { |
| LOGGER.error(iae); |
| } |
| else { |
| System.err.println(iae); |
| } |
| } |
| if (LOGGER != null) { |
| LOGGER.info(" attr: "+(attr == null ? "<null>" : attr.toString())+"="+(value == null ? "<null>" : value.toString())); |
| } |
| } |
| } |
| // --- if the message was extracted, redirect to the listener --- |
| fListener.onMessage(ECXMqMessageEvent.getEvent(message), body); |
| } |
| catch (Exception e) { |
| if (LOGGER != null) { |
| LOGGER.error(e); |
| } |
| else { |
| System.err.println(e); |
| } |
| } |
| break; |
| } |
| } |
| else { |
| if (LOGGER != null) { |
| LOGGER.error("UNHANDLED "+anyMessage.getClass().getSimpleName()+": "+anyMessage); |
| } |
| else { |
| System.err.println("UNHANDLED "+anyMessage.getClass().getSimpleName()+": "+anyMessage); |
| } |
| } |
| } |
| } |