blob: 44b6360f9f0c13cb28a3d80588624c31edd2869b [file] [log] [blame]
/*******************************************************************************
* Copyright (c)2010 REMAIN B.V. The Netherlands. (http://www.remainsoftware.com).
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Wim Jongman - initial API and implementation
* Ahmed Aadel - initial API and implementation
*******************************************************************************/
package org.eclipse.ecf.provider.zookeeper.core;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.zookeeper.server.NIOServerCnxn;
import org.apache.zookeeper.server.PurgeTxnLog;
import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.NIOServerCnxn.Factory;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.eclipse.core.runtime.Assert;
import org.eclipse.ecf.core.ContainerConnectException;
import org.eclipse.ecf.core.identity.ID;
import org.eclipse.ecf.core.identity.Namespace;
import org.eclipse.ecf.core.security.IConnectContext;
import org.eclipse.ecf.discovery.AbstractDiscoveryContainerAdapter;
import org.eclipse.ecf.discovery.IServiceInfo;
import org.eclipse.ecf.discovery.IServiceListener;
import org.eclipse.ecf.discovery.IServiceTypeListener;
import org.eclipse.ecf.discovery.identity.IServiceID;
import org.eclipse.ecf.discovery.identity.IServiceTypeID;
import org.eclipse.ecf.provider.zookeeper.core.internal.Advertiser;
import org.eclipse.ecf.provider.zookeeper.core.internal.Configuration;
import org.eclipse.ecf.provider.zookeeper.core.internal.Configurator;
import org.eclipse.ecf.provider.zookeeper.core.internal.Localizer;
import org.eclipse.ecf.provider.zookeeper.core.internal.Notification;
import org.eclipse.ecf.provider.zookeeper.node.internal.WatchManager;
import org.eclipse.ecf.provider.zookeeper.util.Geo;
import org.eclipse.ecf.provider.zookeeper.util.Logger;
import org.eclipse.ecf.provider.zookeeper.util.PrettyPrinter;
import org.osgi.framework.ServiceReference;
import org.osgi.service.log.LogService;
public class ZooDiscoveryContainer extends AbstractDiscoveryContainerAdapter {
// Lazily created singleton instance; see getSingleton().
private static ZooDiscoveryContainer discovery;
// Shared cached thread pool. doStart() submits its server start-up tasks here.
// NOTE(review): public and non-final, so other code may replace it - confirm
// that this is intended.
public static ExecutorService CACHED_THREAD_POOL = Executors
.newCachedThreadPool();
// Peer created by startQuorumPeer() and stopped by shutdown().
private QuorumPeer quorumPeer;
// Initialised to an empty Properties in the constructor; exposed through
// setDiscoveryProperties()/getDiscoveryProperties().
private Properties DiscoveryProperties;
// Assigned in doStart() from Advertiser.getSingleton(watchManager).
protected Advertiser advertiser;
// Assigned in doStart() from Localizer.getSingleton(); closed by shutdown().
protected Localizer localizer;
// Local server created by startStandAlone(); exposed through getLocalServer().
private ZooKeeperServer zooKeeperServer;
// Target this container was started for; reset to null by disconnect() and
// shutdown().
private ID targetId;
// Set to true once startQuorumPeer() has started a new peer, false when that
// attempt failed.
protected boolean isQuorumPeerReady;
// Returned by both getConnectNamespace() and getServicesNamespace().
private ZooDiscoveryNamespace namespace;
// True between a successful connect() and the next disconnect()/shutdown().
private boolean isConnected;
// Set by shutdown(); cleared again by every getSingleton() call.
private boolean isDisposed;
// Created by doStart(); publishes services and holds the known services.
private WatchManager watchManager;
/**
 * The deployment flavors this container distinguishes. The string form of
 * each constant is the matching flavor constant of
 * {@link DefaultDiscoveryConfig}.
 */
public enum FLAVOR {
    STANDALONE, CENTRALIZED, REPLICATED;

    /**
     * @return the {@link DefaultDiscoveryConfig} flavor string for this
     *         constant
     */
    @Override
    public String toString() {
        if (this == STANDALONE) {
            return DefaultDiscoveryConfig.ZOODISCOVERY_FLAVOR_STANDALONE;
        }
        if (this == CENTRALIZED) {
            return DefaultDiscoveryConfig.ZOODISCOVERY_FLAVOR_CENTRALIZED;
        }
        if (this == REPLICATED) {
            return DefaultDiscoveryConfig.ZOODISCOVERY_FLAVOR_REPLICATED;
        }
        // Only reachable if a constant is added without a mapping above.
        throw new AssertionError("Unsupported configuration");
    }
}
private ZooDiscoveryContainer() {
super(ZooDiscoveryNamespace.NAME, Configurator.INSTANCE);
DiscoveryProperties = new Properties();
this.namespace = new ZooDiscoveryNamespace();
if (System.getProperty(DefaultDiscoveryConfig.ZOODISCOVERY_PREFIX
+ DefaultDiscoveryConfig.ZOOKEEPER_AUTOSTART) != null) {
try {
this.targetId = this.getConnectNamespace().createInstance(
new String[] { DefaultDiscoveryConfig
.getDefaultTarget() });
init(targetId);
} catch (Exception e) {
Logger.log(LogService.LOG_ERROR, e.getMessage(), e);
}
}
PrettyPrinter.prompt(PrettyPrinter.ACTIVATED, null);
}
/**
 * Returns the single container instance, creating it on first use.
 * Every call also clears the instance's disposed flag, so a container that
 * was shut down becomes usable again once it is fetched through this method.
 *
 * @return the shared container instance, never <code>null</code>
 */
public synchronized static ZooDiscoveryContainer getSingleton() {
    ZooDiscoveryContainer instance = discovery;
    if (instance == null) {
        instance = new ZooDiscoveryContainer();
        discovery = instance;
    }
    instance.setDisposed(false);
    return instance;
}
/**
 * Builds a configuration from the given service reference and starts the
 * container with it.
 *
 * @param reference
 *            the service reference handed to the configurator
 */
public void init(ServiceReference reference) {
    doStart(Configurator.INSTANCE.createConfig(reference).configure());
}
/**
 * Builds a configuration from the given target id and starts the container
 * with it.
 *
 * @param targetID
 *            the target id handed to the configurator
 */
private void init(ID targetID) {
    doStart(Configurator.INSTANCE.createConfig(targetID).configure());
}
/**
 * Starts discovery for the given configuration. Does nothing when a watch
 * manager exists and has not been disposed.
 * <ul>
 * <li>centralized: when this host is the first configured server, a local
 * server is started in the background before watching; otherwise watching
 * starts right away on the calling thread;</li>
 * <li>quorum: a quorum peer is started in the background before watching;</li>
 * <li>standalone: a local server is started in the background before
 * watching.</li>
 * </ul>
 *
 * @param conf
 *            the configuration to start with
 */
private void doStart(final Configuration conf) {
    final boolean alreadyActive = watchManager != null
            && !watchManager.isDisposed();
    if (alreadyActive) {
        return;
    }
    watchManager = new WatchManager(conf);
    this.advertiser = Advertiser.getSingleton(watchManager);
    this.localizer = Localizer.getSingleton();

    if (conf.isCentralized()) {
        final String localHost = Geo.getHost();
        final String serverHost = conf.getServerIps().split(":")[0]; //$NON-NLS-1$
        if (localHost.equals(serverHost)) {
            // This host is the central server: bring it up first.
            CACHED_THREAD_POOL.execute(bootstrapTask(conf, false));
        } else {
            watchManager.watch();
            this.localizer.init();
        }
    } else if (conf.isQuorum()) {
        CACHED_THREAD_POOL.execute(bootstrapTask(conf, true));
    } else if (conf.isStandAlone()) {
        CACHED_THREAD_POOL.execute(bootstrapTask(conf, false));
    }
}

/**
 * Creates the background task that starts the local server (quorum peer or
 * standalone), then starts watching and initialises the localizer.
 */
private Runnable bootstrapTask(final Configuration conf,
        final boolean asQuorumPeer) {
    return new Runnable() {
        public void run() {
            if (asQuorumPeer) {
                startQuorumPeer(conf);
            } else {
                startStandAlone(conf);
            }
            watchManager.watch();
            ZooDiscoveryContainer.this.localizer.init();
        }
    };
}
/**
 * Start a ZooKeeper server locally to write nodes to. Implied by
 * {@link IDiscoveryConfig#ZOODISCOVERY_FLAVOR_STANDALONE} configuration.
 * <p>
 * A server that is already running is left alone. A server that exists but
 * is not running is started again; only if that fails (logged at debug
 * level) is a brand-new server created. Failure to create the new server is
 * logged at error level and not propagated.
 * <p>
 * NOTE(review): the connection factory created here is not kept in a field,
 * so nothing in this class can shut it down later - confirm whether the
 * client port is released on shutdown.
 *
 * @param conf
 *            supplies the data directory, tick time and client port
 */
void startStandAlone(final Configuration conf) {
    final ZooKeeperServer existing = this.zooKeeperServer;
    if (existing != null) {
        if (existing.isRunning()) {
            return;
        }
        try {
            existing.startup();
            return;
        } catch (Exception e) {
            Logger.log(LogService.LOG_DEBUG,
                    "Zookeeper server cannot be started! ", e);//$NON-NLS-1$
        }
    }
    try {
        final ZooKeeperServer server = new ZooKeeperServer();
        this.zooKeeperServer = server;
        // The same directory is used for both constructor arguments.
        server.setTxnLogFactory(new FileTxnSnapLog(
                conf.getZookeeperDataFile(), conf.getZookeeperDataFile()));
        server.setTickTime(conf.getTickTime());
        final InetSocketAddress clientAddress = new InetSocketAddress(
                conf.getClientPort());
        final Factory cnxnFactory = new NIOServerCnxn.Factory(clientAddress);
        cnxnFactory.startup(server);
    } catch (Exception e) {
        Logger.log(
                LogService.LOG_ERROR,
                "Zookeeper server cannot be started! Possibly another instance is already running on the same port. ",
                e);
    }
}
/**
 * Start a local ZooKeeper server to write nodes to. It plays as a peer
 * within a replicated servers configuration. Implied by
 * {@link IDiscoveryConfig#ZOODISCOVERY_FLAVOR_REPLICATED} configuration.
 * <p>
 * A peer that is already alive is left alone. A peer that was created but
 * never started is started. A peer whose thread has already terminated
 * cannot be started again, so a new peer is built from the configuration
 * file in that case. Failures while building or starting a new peer are
 * logged at error level and recorded in {@link #isQuorumPeerReady}.
 *
 * @param conf
 *            supplies the quorum configuration file
 */
void startQuorumPeer(final Configuration conf) {
    final QuorumPeer existing = this.quorumPeer;
    if (existing != null) {
        if (existing.isAlive()) {
            return;
        }
        if (existing.getState() == Thread.State.NEW) {
            // Created earlier but never started.
            existing.start();
            return;
        }
        // A terminated thread throws IllegalThreadStateException on
        // start(); fall through and create a fresh peer instead.
    }
    try {
        final QuorumPeerConfig quorumPeerConfig = new QuorumPeerConfig();
        quorumPeerConfig.parse(conf.getConfFile());
        QuorumPeer.Factory qpFactory = new QuorumPeer.Factory() {
            public QuorumPeer create(NIOServerCnxn.Factory cnxnFactory)
                    throws IOException {
                ServerConfig serverConfig = new ServerConfig();
                serverConfig.readFrom(quorumPeerConfig);
                QuorumPeer peer = new QuorumPeer(quorumPeerConfig
                        .getServers(), new File(serverConfig.getDataDir()),
                        new File(serverConfig.getDataLogDir()),
                        quorumPeerConfig.getElectionAlg(), quorumPeerConfig
                                .getServerId(), quorumPeerConfig
                                .getTickTime(), quorumPeerConfig
                                .getInitLimit(), quorumPeerConfig
                                .getSyncLimit(), cnxnFactory,
                        quorumPeerConfig.getQuorumVerifier());
                ZooDiscoveryContainer.this.quorumPeer = peer;
                return peer;
            }

            public NIOServerCnxn.Factory createConnectionFactory()
                    throws IOException {
                return new NIOServerCnxn.Factory(quorumPeerConfig
                        .getClientPortAddress());
            }
        };
        quorumPeer = qpFactory.create(qpFactory.createConnectionFactory());
        // The daemon flag must be set before start(): setting it on a live
        // thread throws IllegalThreadStateException, which previously made a
        // successfully started peer be reported as failed.
        quorumPeer.setDaemon(true);
        quorumPeer.start();
        isQuorumPeerReady = true;
    } catch (Exception e) {
        Logger.log(LogService.LOG_ERROR,
                "Zookeeper quorum cannot be started! ", e); //$NON-NLS-1$
        isQuorumPeerReady = false;
    }
}
/**
 * Replaces the discovery properties held by this container.
 *
 * @param discoveryProperties
 *            the new properties; stored as given, without copying
 */
public void setDiscoveryProperties(Properties discoveryProperties) {
    DiscoveryProperties = discoveryProperties;
}
/**
 * @return the discovery properties currently held by this container (the
 *         stored instance itself, not a copy)
 */
public Properties getDiscoveryProperties() {
    return DiscoveryProperties;
}
/**
 * Stops discovery and any locally started ZooKeeper server, then marks this
 * container as disconnected and disposed. Does nothing when the container is
 * already disposed.
 * <p>
 * Each clean-up step is attempted on its own: a failure is logged at error
 * level and the remaining steps still run, so that a problem in one step
 * (for example purging old logs) cannot leave a server or its sockets
 * running behind a container that reports itself as disposed.
 */
public synchronized void shutdown() {
    if (isDisposed) {
        return;
    }
    try {
        if (watchManager != null) {
            watchManager.dispose();
        }
    } catch (Throwable t) {
        logShutdownFailure(t);
    }
    try {
        if (this.localizer != null) {
            this.localizer.close();
        }
    } catch (Throwable t) {
        logShutdownFailure(t);
    }
    stopStandaloneServer();
    stopQuorumPeer();
    targetId = null;
    isConnected = false;
    isDisposed = true;
}

/** Purges old snapshots/logs of the local server, then shuts it down. */
private void stopStandaloneServer() {
    if (this.zooKeeperServer == null) {
        return;
    }
    try {
        // purge snaps and logs. Keep only last three of each
        PurgeTxnLog.purge(this.zooKeeperServer.getTxnLogFactory()
                .getDataDir(), this.zooKeeperServer.getTxnLogFactory()
                .getSnapDir(), 3);
    } catch (Throwable t) {
        logShutdownFailure(t);
    }
    try {
        this.zooKeeperServer.shutdown();
    } catch (Throwable t) {
        logShutdownFailure(t);
    }
}

/** Purges old snapshots/logs of the quorum peer, then stops it and its sockets. */
private void stopQuorumPeer() {
    if (this.quorumPeer == null) {
        return;
    }
    try {
        // purge snaps and logs. Keep only last three of each
        PurgeTxnLog.purge(this.quorumPeer.getTxnFactory().getDataDir(),
                this.quorumPeer.getTxnFactory().getSnapDir(), 3);
    } catch (Throwable t) {
        logShutdownFailure(t);
    }
    try {
        // shut down server
        if (this.quorumPeer.isAlive()) {
            this.quorumPeer.shutdown();
        }
    } catch (Throwable t) {
        logShutdownFailure(t);
    }
    try {
        // shutdown sockets
        this.quorumPeer.getCnxnFactory().shutdown();
    } catch (Throwable t) {
        logShutdownFailure(t);
    }
}

/** Logs a failed clean-up step at error level without aborting shutdown. */
private void logShutdownFailure(Throwable t) {
    Logger.log(LogService.LOG_ERROR, t.getMessage(), t);
}
/**
 * @return the ZooKeeper server created by {@code startStandAlone}, or
 *         <code>null</code> if none has been created
 */
public ZooKeeperServer getLocalServer() {
    return zooKeeperServer;
}
/**
 * Connects this container to the given target and starts discovery.
 *
 * @param id
 *            the target to connect to; when <code>null</code> the default
 *            target of {@link DefaultDiscoveryConfig} is used
 * @param connectContext
 *            not used by this implementation
 * @throws ContainerConnectException
 *             if the container is disposed or already connected
 */
public void connect(ID id, IConnectContext connectContext)
        throws ContainerConnectException {
    if (isDisposed) {
        throw new ContainerConnectException("Container already disposed!");
    }
    if (isConnected) {
        throw new ContainerConnectException("Container already connected!");
    }
    targetId = id;
    if (id == null) {
        // No explicit target given: fall back to the configured default.
        targetId = getConnectNamespace().createInstance(
                new String[] { DefaultDiscoveryConfig.getDefaultTarget() });
    }
    init(targetId);
    isConnected = true;
}
/**
 * Disposes the watch manager, if there is one, and resets the connection
 * state. Locally started servers are not stopped here.
 */
public void disconnect() {
    final WatchManager manager = watchManager;
    if (manager != null) {
        manager.dispose();
    }
    targetId = null;
    isConnected = false;
}
/**
 * @return the namespace used to create connect target ids; the same
 *         instance that {@code getServicesNamespace()} returns
 */
public Namespace getConnectNamespace() {
    return namespace;
}
/**
 * @return the current target id while the container is connected and not
 *         disposed, otherwise <code>null</code>
 */
public ID getConnectedID() {
    final boolean usable = isConnected && !isDisposed;
    return usable ? this.targetId : null;
}
/**
 * Looks up a known service by the name of its service id.
 *
 * @param serviceID
 *            the id of the service to look up; must not be
 *            <code>null</code>
 * @return the matching service info, or <code>null</code> when the service
 *         is not known or this container has not been started yet
 */
public IServiceInfo getServiceInfo(IServiceID serviceID) {
    Assert.isNotNull(serviceID);
    final WatchManager manager = this.watchManager;
    if (manager == null) {
        // Not started yet: nothing can be known, same as getServices().
        return null;
    }
    return manager.getAllKnownServices().get(serviceID.getName());
}
/**
 * Returns the service type of every currently known service, one entry per
 * service (so a type shared by several services appears several times).
 *
 * @return the service type ids; empty when no service is known
 */
public IServiceTypeID[] getServiceTypes() {
    // Take one snapshot: asking getServices() again on every iteration could
    // see a different set of services than the one the array was sized for.
    final IServiceInfo[] known = getServices();
    final IServiceTypeID[] ids = new IServiceTypeID[known.length];
    for (int i = 0; i < known.length; i++) {
        ids[i] = known[i].getServiceID().getServiceTypeID();
    }
    return ids;
}
/**
 * Returns all services currently known to the watch manager.
 *
 * @return the known services; an empty array when this container has not
 *         been started yet
 */
public IServiceInfo[] getServices() {
    final WatchManager manager = this.watchManager;
    if (manager == null) {
        return new IServiceInfo[0];
    }
    // Query the known services once and let toArray size the result itself:
    // presizing from a separate size() call can yield a null-padded array
    // when the set shrinks between the two calls.
    final Collection<? extends IServiceInfo> known = manager
            .getAllKnownServices().values();
    return known.toArray(new IServiceInfo[0]);
}
/**
 * Returns the known services whose service type has the same internal
 * representation as the given type.
 *
 * @param type
 *            the service type to filter by; must not be <code>null</code>
 * @return the matching services; an empty array when none match or this
 *         container has not been started yet
 */
public IServiceInfo[] getServices(IServiceTypeID type) {
    Assert.isNotNull(type);
    final WatchManager manager = this.watchManager;
    if (manager == null) {
        return new IServiceInfo[0];
    }
    final String wanted = type.getInternal();
    final List<IServiceInfo> services = new ArrayList<IServiceInfo>();
    for (IServiceInfo sinfo : manager.getAllKnownServices().values()) {
        final String candidate = sinfo.getServiceID().getServiceTypeID()
                .getInternal();
        // Compare by value: '==' only matches when both ids happen to share
        // the very same String instance.
        if (wanted == null ? candidate == null : wanted.equals(candidate)) {
            services.add(sinfo);
        }
    }
    return services.toArray(new IServiceInfo[services.size()]);
}
/**
 * @return the namespace used for service ids; the same instance that
 *         {@code getConnectNamespace()} returns
 */
public Namespace getServicesNamespace() {
    return namespace;
}
/**
 * Publishes the given service through the watch manager and notifies the
 * localizer that it is available. When no target has been set yet, the
 * container is first initialised against the default target.
 *
 * @param serviceInfo
 *            the service to publish; must not be <code>null</code>
 */
public synchronized void registerService(IServiceInfo serviceInfo) {
    Assert.isNotNull(serviceInfo);
    if (targetId == null) {
        final String[] defaultTarget = { DefaultDiscoveryConfig
                .getDefaultTarget() };
        this.targetId = this.getConnectNamespace().createInstance(
                defaultTarget);
        init(this.targetId);
    }
    // Wrap plain service infos; already advertised ones are published as is.
    final AdvertisedService advertised = (serviceInfo instanceof AdvertisedService)
            ? (AdvertisedService) serviceInfo
            : new AdvertisedService(serviceInfo);
    watchManager.publish(advertised);
    final Notification available = new Notification(serviceInfo,
            Notification.AVAILABLE);
    Localizer.getSingleton().localize(available);
}
/**
 * Unpublishes every service published through the watch manager. Does
 * nothing when this container has not been started yet, since nothing can
 * have been published in that case.
 */
public void unregisterAllServices() {
    final WatchManager manager = this.watchManager;
    if (manager == null) {
        return;
    }
    manager.unpublishAll();
}
/**
 * Unpublishes the given service and notifies the localizer that it is no
 * longer available. Does nothing when this container has not been started
 * yet, since the service cannot have been published through it.
 *
 * @param serviceInfo
 *            the service to unpublish; must not be <code>null</code>
 */
public void unregisterService(IServiceInfo serviceInfo) {
    Assert.isNotNull(serviceInfo);
    final WatchManager manager = this.watchManager;
    if (manager == null) {
        return;
    }
    manager.unpublish(serviceInfo.getServiceID().getServiceTypeID()
            .getInternal());
    Localizer.getSingleton().localize(
            new Notification(serviceInfo, Notification.UNAVAILABLE));
}
/**
 * @return the inherited collection of listeners registered for all
 *         services (the collection itself, not a copy)
 */
public Collection<IServiceListener> getAllServiceListeners() {
    return allServiceListeners;
}
/**
 * Exposes the superclass lookup of listeners for one service type.
 *
 * @param type
 *            the service type to look up listeners for
 * @return whatever the superclass' {@code getListeners(type)} returns
 */
public Collection<IServiceListener> getServiceListenersForType(
        IServiceTypeID type) {
    final Collection<IServiceListener> listeners = super.getListeners(type);
    return listeners;
}
/**
 * @return the inherited collection of service type listeners (the
 *         collection itself, not a copy)
 */
public Collection<IServiceTypeListener> getServiceTypeListeners() {
    return serviceTypeListeners;
}
/**
 * Shuts this container down and then lets the superclass dispose of its own
 * state.
 */
@Override
public void dispose() {
    this.shutdown();
    super.dispose();
}
/**
 * @return the id reported by the shared {@link Configurator} instance
 */
public ID getID() {
    final ID containerId = Configurator.INSTANCE.getID();
    return containerId;
}
/**
 * @return <code>true</code> once {@code shutdown()} has completed and the
 *         flag has not been cleared again by {@code getSingleton()}
 */
public boolean isDisposed() {
    return isDisposed;
}
/**
 * Sets the disposed flag.
 *
 * @param d
 *            the new value of the flag
 */
private void setDisposed(boolean d) {
    isDisposed = d;
}
}