// blob: cf1b2d7037a8ccb0f64ae2c324725719302b3031 [file] [log] [blame]
/*
* Copyright (c) 2007-2012, 2015 Eike Stepper (Berlin, Germany) and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Eike Stepper - initial API and implementation
*/
package org.eclipse.net4j.internal.jms;
import org.eclipse.net4j.Net4jUtil;
import org.eclipse.net4j.channel.IChannel;
import org.eclipse.net4j.connector.IConnector;
import org.eclipse.net4j.internal.jms.bundle.OM;
import org.eclipse.net4j.internal.jms.messages.Messages;
import org.eclipse.net4j.internal.jms.protocol.JMSClientProtocol;
import org.eclipse.net4j.internal.jms.protocol.JMSLogonRequest;
import org.eclipse.net4j.internal.jms.protocol.JMSOpenSessionRequest;
import org.eclipse.net4j.jms.JMSUtil;
import org.eclipse.net4j.util.container.Container;
import org.eclipse.net4j.util.container.IContainer;
import org.eclipse.net4j.util.container.IContainerDelta;
import org.eclipse.net4j.util.container.IContainerDelta.Kind;
import org.eclipse.net4j.util.container.IContainerEvent;
import org.eclipse.net4j.util.container.IManagedContainer;
import org.eclipse.net4j.util.container.LifecycleEventConverter;
import org.eclipse.net4j.util.container.SingleDeltaContainerEvent;
import org.eclipse.net4j.util.event.IListener;
import org.eclipse.net4j.util.lifecycle.ILifecycle;
import org.eclipse.net4j.util.lifecycle.LifecycleEventAdapter;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.Topic;
import java.util.ArrayList;
import java.util.List;
/**
 * Client-side {@link Connection} implementation. It opens a {@link JMSClientProtocol} on a Net4j connector, logs on
 * with the given credentials and keeps track of the {@link SessionImpl sessions} created through it; as a
 * {@link Container} it exposes those sessions as its elements.
 * <p>
 * The connection counts as open while {@link #getProtocol()} is non-<code>null</code>; {@link #close()} resets it.
 */
public class ConnectionImpl extends Container<Session> implements Connection
{
  private String connectorType;

  private String connectorDescription;

  private String userName;

  private String password;

  private ExceptionListener exceptionListener;

  private String clientID;

  private ConnectionMetaData metaData = new ConnectionMetaDataImpl(this);

  // NOTE(review): unit is not visible in this file - presumably milliseconds; confirm against the users of
  // getSendTimeout().
  private long sendTimeout = 2500;

  private IManagedContainer transportContainer;

  /** Non-<code>null</code> exactly while this connection is open, see {@link #ensureOpen()} and {@link #close()}. */
  private JMSClientProtocol protocol;

  /**
   * Session table indexed by session ID. Slots of removed sessions are set to <code>null</code> and reused by
   * {@link #findFreeSessionID()}. All access is guarded by synchronizing on the list itself.
   */
  private final List<SessionImpl> sessions = new ArrayList<SessionImpl>(0);

  /**
   * Converts lifecycle events of a session into container events of this connection and drops the session from the
   * session table when the converted event is a removal.
   * <p>
   * NOTE(review): nothing in this class registers this listener on a session; only {@link #removeSession(SessionImpl)}
   * unregisters it. Confirm whether that is intended before wiring it up - {@link #stop()} deactivates sessions, which
   * would presumably remove them from the table if the listener were registered.
   */
  private transient IListener sessionListener = new LifecycleEventConverter<Session>(this)
  {
    @Override
    protected IContainerEvent<Session> createContainerEvent(IContainer<Session> container, Session element, Kind kind)
    {
      if (kind == IContainerDelta.Kind.REMOVED)
      {
        removeSession((SessionImpl)element);
      }

      return new SingleDeltaContainerEvent<Session>(container, element, kind);
    }
  };

  /** Closes this connection as soon as the underlying channel is deactivated. */
  private transient IListener channelListener = new LifecycleEventAdapter()
  {
    @Override
    protected void onDeactivated(ILifecycle lifecycle)
    {
      close();
    }
  };

  /** Set by most API calls; once <code>true</code>, {@link #setClientID(String)} is rejected. */
  private boolean modified;

  /** <code>true</code> until {@link #start()} has activated the sessions, and again after {@link #stop()}. */
  private boolean stopped = true;

  /**
   * Opens the protocol on the described connector and logs on to the server.
   *
   * @param transportContainer
   *          the container used to look up the connector; if <code>null</code>,
   *          {@link JMSUtil#getTransportContainer()} is used instead
   * @param connectorType
   *          the type passed to {@link Net4jUtil#getConnector}
   * @param connectorDescription
   *          the description passed to {@link Net4jUtil#getConnector}
   * @param userName
   *          the user name sent with the logon request
   * @param password
   *          the password sent with the logon request
   * @throws JMSException
   *           if no transport container is available, if the logon request reports failure, or if sending the logon
   *           request fails with any other exception (which is attached as linked exception and cause)
   */
  public ConnectionImpl(IManagedContainer transportContainer, String connectorType, String connectorDescription,
      String userName, String password) throws JMSException
  {
    this.transportContainer = transportContainer == null ? JMSUtil.getTransportContainer() : transportContainer;

    // Check the resolved container, not the parameter: a null parameter is valid when the fallback provides one.
    if (this.transportContainer == null)
    {
      throw new JMSException(Messages.getString("ConnectionImpl_0")); //$NON-NLS-1$
    }

    this.connectorType = connectorType;
    this.connectorDescription = connectorDescription;
    this.userName = userName;
    this.password = password;

    IConnector connector = Net4jUtil.getConnector(this.transportContainer, connectorType, connectorDescription);
    JMSClientProtocol newProtocol = new JMSClientProtocol(this);
    IChannel channel = newProtocol.open(connector);
    channel.addListener(channelListener);

    boolean loggedOn = false;

    try
    {
      if (!new JMSLogonRequest(newProtocol, userName, password).send())
      {
        throw new JMSException(Messages.getString("ConnectionImpl_1")); //$NON-NLS-1$
      }

      loggedOn = true;
    }
    catch (JMSException ex)
    {
      throw ex;
    }
    catch (Exception ex)
    {
      throw toJMSException(ex);
    }
    finally
    {
      if (!loggedOn)
      {
        // Do not leak the opened protocol or keep a listener that points at a connection nobody will ever receive.
        channel.removeListener(channelListener);

        try
        {
          newProtocol.close();
        }
        catch (RuntimeException closeException)
        {
          // Log instead of rethrowing so that the original logon failure is the one that propagates.
          OM.LOG.error(closeException);
        }
      }
    }

    // Publish the protocol in the field (not a shadowing local), otherwise ensureOpen() would always fail.
    protocol = newProtocol;
  }

  /**
   * Not supported.
   *
   * @throws UnsupportedOperationException
   *           always
   */
  public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
      ServerSessionPool sessionPool, int maxMessages)
  {
    throw new UnsupportedOperationException();
  }

  /**
   * Not supported.
   *
   * @throws UnsupportedOperationException
   *           always
   */
  public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName,
      String messageSelector, ServerSessionPool sessionPool, int maxMessages)
  {
    throw new UnsupportedOperationException();
  }

  /**
   * Creates a session under the lowest free session ID, registers it in the session table and sends an open-session
   * request for that ID. If the request fails, the session is taken out of the table again.
   *
   * @return the new session
   * @throws JMSException
   *           if the open-session request reports failure or fails with another exception (attached as linked
   *           exception and cause)
   * @throws IllegalStateException
   *           if this connection is closed
   */
  public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException
  {
    ensureOpen();
    setModified();

    int sessionID;
    SessionImpl session;

    // Pick the ID and occupy its slot in one critical section so that concurrent callers cannot get the same ID.
    synchronized (sessions)
    {
      sessionID = findFreeSessionID();
      session = new SessionImpl(this, sessionID, transacted, acknowledgeMode);
      addSession(session);
    }

    boolean opened = false;

    try
    {
      if (!new JMSOpenSessionRequest(protocol, sessionID).send())
      {
        throw new JMSException(Messages.getString("ConnectionImpl_2")); //$NON-NLS-1$
      }

      opened = true;
    }
    catch (JMSException ex)
    {
      throw ex;
    }
    catch (Exception ex)
    {
      throw toJMSException(ex);
    }
    finally
    {
      if (!opened)
      {
        // Release the slot of a session that was never opened.
        removeSession(session);
      }
    }

    return session;
  }

  /**
   * @return the transport container resolved in the constructor
   */
  public IManagedContainer getTransportContainer()
  {
    return transportContainer;
  }

  /**
   * @return the connector type passed to the constructor
   */
  public String getConnectorType()
  {
    return connectorType;
  }

  /**
   * @return the connector description passed to the constructor
   */
  public String getConnectorDescription()
  {
    return connectorDescription;
  }

  /**
   * @return the user name passed to the constructor
   */
  public String getUserName()
  {
    return userName;
  }

  /**
   * @return the password passed to the constructor
   */
  public String getPassword()
  {
    return password;
  }

  /**
   * Returns the meta data of this connection and marks the connection as modified.
   *
   * @throws IllegalStateException
   *           if this connection is closed
   */
  public ConnectionMetaData getMetaData()
  {
    ensureOpen();
    setModified();
    return metaData;
  }

  /**
   * Returns the client ID and marks the connection as modified, i.e. {@link #setClientID(String)} is rejected
   * afterwards.
   *
   * @throws IllegalStateException
   *           if this connection is closed
   */
  public String getClientID()
  {
    ensureOpen();
    setModified();
    return clientID;
  }

  /**
   * Assigns the client ID. This is only allowed while no client ID has been assigned yet and before any call that
   * marks the connection as modified.
   *
   * @throws IllegalStateException
   *           if this connection is closed, already has a client ID, or has been modified
   */
  public void setClientID(String clientID)
  {
    ensureOpen();

    // Test the currently assigned ID (the field), not the argument that is about to be assigned.
    if (this.clientID != null)
    {
      throw new IllegalStateException("clientID != null"); //$NON-NLS-1$
    }

    if (modified)
    {
      throw new IllegalStateException("modified == true"); //$NON-NLS-1$
    }

    this.clientID = clientID;
  }

  /**
   * Returns the registered exception listener and marks the connection as modified.
   *
   * @throws IllegalStateException
   *           if this connection is closed
   */
  public ExceptionListener getExceptionListener()
  {
    ensureOpen();
    setModified();
    return exceptionListener;
  }

  /**
   * Registers the exception listener and marks the connection as modified.
   *
   * @throws IllegalStateException
   *           if this connection is closed
   */
  public void setExceptionListener(ExceptionListener listener)
  {
    ensureOpen();
    setModified();
    exceptionListener = listener;
  }

  /**
   * @return the configured send timeout
   */
  public long getSendTimeout()
  {
    return sendTimeout;
  }

  /**
   * @param sendTimeout
   *          the new send timeout
   */
  public void setSendTimeout(long sendTimeout)
  {
    this.sendTimeout = sendTimeout;
  }

  /**
   * Activates all sessions if the connection is currently stopped; does nothing if it is already started.
   *
   * @throws JMSException
   *           if activating a session fails; the failure is logged and attached as linked exception and cause. The
   *           connection stays in the stopped state in that case.
   * @throws IllegalStateException
   *           if this connection is closed
   */
  public synchronized void start() throws JMSException
  {
    ensureOpen();
    setModified();

    if (!stopped)
    {
      return;
    }

    for (SessionImpl session : getSessions())
    {
      try
      {
        session.activate();
      }
      catch (Exception ex)
      {
        OM.LOG.error(ex);
        throw toJMSException(ex);
      }
    }

    stopped = false;
  }

  /**
   * Deactivates all sessions if the connection is currently started; does nothing if it is already stopped.
   *
   * @throws IllegalStateException
   *           if this connection is closed
   */
  public synchronized void stop()
  {
    ensureOpen();
    setModified();

    if (stopped)
    {
      return;
    }

    for (SessionImpl session : getSessions())
    {
      session.deactivate();
    }

    stopped = true;
  }

  /**
   * Stops the connection, closes all sessions and closes the protocol. Calling this on an already closed connection
   * has no effect.
   */
  public synchronized void close()
  {
    if (protocol == null)
    {
      return;
    }

    stop();

    for (SessionImpl session : getSessions())
    {
      session.close();
    }

    // Unregister first so that the channel deactivation caused by closing the protocol does not re-enter close().
    protocol.getChannel().removeListener(channelListener);
    protocol.close();
    protocol = null;
  }

  /**
   * @return the protocol of this connection, or <code>null</code> if the connection is closed
   * @since 2.0
   */
  public JMSClientProtocol getProtocol()
  {
    return protocol;
  }

  /**
   * Forwards a message received from the server to the consumer of the addressed session.
   *
   * @param sessionID
   *          the ID (table index) of the target session
   * @param consumerID
   *          the ID of the consumer inside that session
   * @param message
   *          the message to deliver
   * @throws IllegalStateException
   *           if no session is registered under <code>sessionID</code>
   */
  public void handleMessageFromSignal(int sessionID, long consumerID, MessageImpl message)
  {
    SessionImpl session = null;

    synchronized (sessions)
    {
      if (sessionID >= 0 && sessionID < sessions.size())
      {
        session = sessions.get(sessionID);
      }
    }

    if (session == null)
    {
      throw new IllegalStateException("No session with ID " + sessionID); //$NON-NLS-1$
    }

    // Deliver outside the lock so that message handling never blocks session bookkeeping.
    session.handleServerMessage(consumerID, message);
  }

  /**
   * @return a snapshot of all currently registered sessions, without the empty slots
   */
  public SessionImpl[] getSessions()
  {
    List<SessionImpl> result;

    synchronized (sessions)
    {
      result = new ArrayList<SessionImpl>(sessions.size());
      for (SessionImpl session : sessions)
      {
        if (session != null)
        {
          result.add(session);
        }
      }
    }

    return result.toArray(new SessionImpl[result.size()]);
  }

  /**
   * @return the registered sessions as the elements of this container
   */
  public Session[] getElements()
  {
    return getSessions();
  }

  /**
   * @return <code>true</code> if no session is currently registered
   */
  @Override
  public boolean isEmpty()
  {
    return getSessions().length == 0;
  }

  /**
   * @return the index of the first empty slot in the session table, or the table size if there is none
   */
  private int findFreeSessionID()
  {
    synchronized (sessions)
    {
      int size = sessions.size();
      for (int i = 0; i < size; i++)
      {
        if (sessions.get(i) == null)
        {
          return i;
        }
      }

      return size;
    }
  }

  /**
   * Stores the session in the slot given by its ID, growing the table with empty slots as needed.
   */
  private void addSession(SessionImpl session)
  {
    synchronized (sessions)
    {
      int sessionID = session.getID();
      while (sessionID >= sessions.size())
      {
        sessions.add(null);
      }

      sessions.set(sessionID, session);
    }
  }

  /**
   * Empties the slot of the given session and unregisters the session listener from it, provided the slot still
   * holds exactly this session.
   *
   * @return <code>true</code> if the session was removed, <code>false</code> if its slot holds something else
   */
  private boolean removeSession(SessionImpl session)
  {
    synchronized (sessions)
    {
      int sessionID = session.getID();
      if (sessionID < sessions.size() && sessions.get(sessionID) == session)
      {
        session.removeListener(sessionListener);
        sessions.set(sessionID, null);
        return true;
      }

      return false;
    }
  }

  /**
   * Records that the connection has been used, which disables {@link #setClientID(String)}.
   */
  private void setModified()
  {
    modified = true;
  }

  /**
   * @throws IllegalStateException
   *           if this connection has no protocol, i.e. is closed
   */
  private void ensureOpen() throws IllegalStateException
  {
    if (protocol == null)
    {
      throw new IllegalStateException("protocol == null"); //$NON-NLS-1$
    }
  }

  /**
   * Wraps an arbitrary exception into a {@link JMSException} with the same message, keeping the original exception
   * reachable both as linked exception and as cause.
   */
  private static JMSException toJMSException(Exception ex)
  {
    JMSException jmsException = new JMSException(ex.getMessage());
    jmsException.setLinkedException(ex);
    jmsException.initCause(ex);
    return jmsException;
  }
}