blob: 187bdcfd2b56b228db0c800411bc83217a988687 [file] [log] [blame]
/*
* Copyright (c) 2007-2012, 2015 Eike Stepper (Berlin, Germany) and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Eike Stepper - initial API and implementation
*/
package org.eclipse.net4j.jms.internal.server;
import org.eclipse.net4j.internal.jms.ConnectionFactoryImpl;
import org.eclipse.net4j.internal.jms.JMSDestination;
import org.eclipse.net4j.internal.jms.MessageImpl;
import org.eclipse.net4j.jms.internal.server.bundle.OM;
import org.eclipse.net4j.jms.internal.server.messages.Messages;
import org.eclipse.net4j.jms.server.IDestination;
import org.eclipse.net4j.jms.server.IServer;
import org.eclipse.net4j.jms.server.IStore;
import org.eclipse.net4j.jms.server.IStoreTransaction;
import org.eclipse.net4j.util.HexUtil;
import org.eclipse.net4j.util.concurrent.NonBlockingLongCounter;
import org.eclipse.net4j.util.concurrent.QueueWorker;
import org.eclipse.net4j.util.om.OMPlatform;
import javax.jms.Destination;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* @author Eike Stepper
*/
public class Server extends QueueWorker<MessageImpl>implements IServer
{
public static final Server INSTANCE = new Server();
private static final boolean REBIND_DESTINATIONS = true;
private NonBlockingLongCounter messageIDCounter = new NonBlockingLongCounter();
private NonBlockingLongCounter consumerIDCounter = new NonBlockingLongCounter();
private IStore store = null;
private Context jndiContext;
private ConcurrentMap<String, ServerDestination> destinations = new ConcurrentHashMap<String, ServerDestination>();
private Set<ServerConnection> connections = new HashSet<ServerConnection>();
private ConcurrentMap<Long, ServerConsumer> consumers = new ConcurrentHashMap<Long, ServerConsumer>();
public Server()
{
}
public IStore getStore()
{
return store;
}
public void setStore(IStore store)
{
this.store = store;
}
public IDestination createDestination(String name, IDestination.Type type)
{
ServerDestination destination = new ServerDestination(name, type);
ServerDestination existing = destinations.putIfAbsent(name, destination);
if (existing != null)
{
throw new IllegalStateException("Destination " + type + " " + name + " does already exist"); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
}
IStoreTransaction transaction = store.startTransaction();
transaction.destinationAdded(destination);
store.commitTransaction(transaction);
try
{
destination.bind(jndiContext, REBIND_DESTINATIONS);
}
catch (NamingException ex)
{
OM.LOG.error(ex);
destinations.remove(name);
destination = null;
}
return destination;
}
public ServerDestination getDestination(String name)
{
return destinations.get(name);
}
public ServerDestination getServerDestination(Destination jmsDestination)
{
String name = ((JMSDestination)jmsDestination).getName();
ServerDestination destination = destinations.get(name);
if (destination == null)
{
OM.LOG.error(Messages.getString("Server.3") + name); //$NON-NLS-1$
return null;
}
return destination;
}
public ServerConnection logon(String userName, String password)
{
ServerConnection connection = new ServerConnection(this, userName);
synchronized (connections)
{
connections.add(connection);
}
return connection;
}
public ServerConsumer createConsumer(ServerDestination destination, String messageSelector, boolean noLocal,
boolean durable)
{
long consumerID = consumerIDCounter.increment();
ServerConsumer consumer = new ServerConsumer(consumerID, destination, messageSelector, noLocal, durable);
consumers.put(consumer.getID(), consumer);
return consumer;
}
public ServerConsumer getConsumer(long consumerID)
{
return consumers.get(consumerID);
}
public String[] handleClientMessages(MessageImpl[] messages)
{
IStoreTransaction transaction = store.startTransaction();
String[] messageIDs = handleClientMessagesInTransaction(transaction, messages);
store.commitTransaction(transaction);
return messageIDs;
}
public String[] handleClientMessagesInTransaction(IStoreTransaction transaction, MessageImpl[] messages)
{
String[] messageIDs = new String[messages.length];
for (int i = 0; i < messages.length; i++)
{
MessageImpl message = messages[i];
if (getServerDestination(message.getJMSDestination()) == null)
{
return null;
}
messageIDs[i] = "ID:NET4J:" + HexUtil.longToHex(messageIDCounter.increment()); //$NON-NLS-1$
message.setJMSMessageID(messageIDs[i]);
}
for (MessageImpl message : messages)
{
transaction.messageReceived(message);
addWork(message);
}
return messageIDs;
}
@Override
protected String getThreadName()
{
return "jms-server"; //$NON-NLS-1$
}
@Override
protected void work(WorkContext context, MessageImpl message)
{
ServerDestination destination = getServerDestination(message.getJMSDestination());
IStoreTransaction transaction = store.startTransaction();
destination.handleClientMessage(transaction, message);
store.commitTransaction(transaction);
}
@Override
protected void doBeforeActivate() throws Exception
{
super.doBeforeActivate();
if (store == null)
{
throw new IllegalStateException("store == null"); //$NON-NLS-1$
}
}
@Override
protected void doActivate() throws Exception
{
super.doActivate();
String name = OMPlatform.INSTANCE.getProperty("net4j.jms.connection.factory.name", "net4j.jms.ConnectionFactory"); //$NON-NLS-1$ //$NON-NLS-2$
String type = OMPlatform.INSTANCE.getProperty("net4j.jms.connector.type", "tcp"); //$NON-NLS-1$ //$NON-NLS-2$
String desc = OMPlatform.INSTANCE.getProperty("net4j.jms.connector.description", "localhost"); //$NON-NLS-1$ //$NON-NLS-2$
jndiContext = new InitialContext();
jndiContext.rebind(name, new ConnectionFactoryImpl(type, desc));
}
@Override
protected void doDeactivate() throws Exception
{
synchronized (connections)
{
for (ServerConnection connection : connections)
{
try
{
connection.close();
}
catch (Exception ex)
{
OM.LOG.error(ex);
}
}
}
jndiContext.close();
jndiContext = null;
super.doDeactivate();
}
}