blob: 3d362e6a97f3cf78e19f677ffff16746953055a8 [file] [log] [blame]
/*******************************************************************************
* Copyright (c)2010 REMAIN B.V. The Netherlands. (http://www.remainsoftware.com).
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Wim Jongman - initial API and implementation
* Ahmed Aadel - initial API and implementation
*******************************************************************************/
package org.eclipse.ecf.provider.zookeeper.node.internal;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.eclipse.core.runtime.Assert;
import org.eclipse.ecf.provider.zookeeper.core.ZooDiscoveryContainer;
import org.eclipse.ecf.provider.zookeeper.util.Logger;
import org.eclipse.ecf.provider.zookeeper.util.PrettyPrinter;
import org.osgi.service.log.LogService;
/**
 * Holds the ZooKeeper client this node writes through and reacts to its
 * connection state changes.
 * <p>
 * The instance registers itself as the {@link Watcher} of every client it
 * creates. When the connection is reported as disconnected or expired, all
 * published data is unpublished through the {@link WatchManager} and a new
 * client is created; when the connection is (re)established the client is
 * handed to the {@link WatchManager} and everything is republished.
 * <p>
 * State changes are guarded by an internal lock. The connection flag and the
 * current client are additionally volatile because the accessors read them
 * without taking that lock.
 */
class WriteRoot implements Watcher {

    /** Session timeout in milliseconds passed to every ZooKeeper client created here. */
    private static final int SESSION_TIMEOUT = 3000;

    /** Current write client; replaced by {@link #connect()}, may be read lock-free. */
    private volatile ZooKeeper writeKeeper;
    /** Connect string; adjusted once from the configuration in {@link #initWriteKeeper()}. */
    private String ip;
    private final WatchManager watchManager;
    /** Written under {@link #connectionLock}, read lock-free by {@link #isConnected()}. */
    private volatile boolean isConnected;
    private final Object connectionLock = new Object();

    /**
     * Creates the write client and blocks until the root node has been checked
     * (and created when missing) or the watch manager is disposed.
     *
     * @param ip
     *            address used as (the base of) the connect string, not null
     * @param watchManager
     *            manager supplying the configuration and receiving
     *            publish/unpublish requests, not null
     * @throws IllegalStateException
     *             if the ZooKeeper client cannot be created
     */
    WriteRoot(String ip, WatchManager watchManager) {
        Assert.isNotNull(ip);
        Assert.isNotNull(watchManager);
        this.ip = ip;
        this.watchManager = watchManager;
        initWriteKeeper();
    }

    /**
     * Handles a ZooKeeper event on a thread of the shared pool so that the
     * ZooKeeper event thread is never blocked by the connection lock.
     *
     * @param event
     *            the event delivered by the ZooKeeper client
     */
    @SuppressWarnings({ "incomplete-switch" })
    public void process(final WatchedEvent event) {
        ZooDiscoveryContainer.CACHED_THREAD_POOL.execute(new Runnable() {
            public void run() {
                synchronized (connectionLock) {
                    switch (event.getState()) {
                    case Disconnected:
                    case Expired:
                        // Both states are handled the same way: withdraw what
                        // was published and start over with a new client.
                        isConnected = false;
                        watchManager.unpublishAll();
                        connect();
                        break;
                    case SyncConnected:
                        if (!isConnected) {
                            isConnected = true;
                            watchManager.addZooKeeper(writeKeeper);
                            watchManager.republishAll();
                        }
                        break;
                    // ignore @deprecated cases
                    }
                }
            }
        });
    }

    /**
     * Replaces the current write client with a new one unless we are already
     * connected or the watch manager has been disposed. Failures are logged
     * and leave {@link #writeKeeper} unset.
     */
    private void connect() {
        synchronized (connectionLock) {
            if (this.isConnected || watchManager.isDisposed()) {
                return;
            }
            try {
                ZooKeeper stale = writeKeeper;
                if (stale != null) {
                    try {
                        stale.close();
                    } catch (InterruptedException e) {
                        // Closing is best effort; an interrupted close must not
                        // keep the stale client registered or prevent the
                        // reconnect below. Keep the interrupt for the caller.
                        Thread.currentThread().interrupt();
                        Logger.log(LogService.LOG_DEBUG, e.getMessage(), e);
                    }
                    watchManager.removeZooKeeper(stale);
                    writeKeeper = null;
                }
                writeKeeper = new ZooKeeper(this.ip, SESSION_TIMEOUT, this);
            } catch (Exception e) {
                Logger.log(LogService.LOG_DEBUG, e.getMessage(), e);
            }
        }
    }

    /**
     * Resolves the connect string from the configuration, creates the first
     * write client and waits until the root node is known to exist.
     *
     * @throws IllegalStateException
     *             if the ZooKeeper client cannot be created
     */
    private void initWriteKeeper() {
        try {
            resolveConnectString();
        } catch (RuntimeException e) {
            Logger.log(LogService.LOG_DEBUG, e.getMessage(), e);
            return;
        }
        // Deliberately outside any catch-all: a failure here is fatal and has
        // to reach the caller instead of being logged and forgotten.
        writeKeeper = createWriteKeeper();
        try {
            awaitRoot();
            synchronized (this) {
                this.notifyAll();
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever owns this thread.
            Thread.currentThread().interrupt();
            Logger.log(LogService.LOG_DEBUG, e.getMessage(), e);
        } catch (Exception e) {
            Logger.log(LogService.LOG_DEBUG, e.getMessage(), e);
        }
    }

    /** Derives the connect string from the flavor configured in the watch manager. */
    private void resolveConnectString() {
        if (watchManager.getConfig().isQuorum()
                || watchManager.getConfig().isStandAlone()) {
            // we write nodes locally but we should check for client port.
            int port = watchManager.getConfig().getClientPort();
            if (port != 0) {
                ip += ":" + port;//$NON-NLS-1$
            }
        } else if (watchManager.getConfig().isCentralized()) {
            // we write nodes to the machine with this specified IP address.
            ip = watchManager.getConfig().getServerIps();
        }
    }

    /**
     * Creates a client for the current connect string with this instance as
     * its watcher.
     *
     * @return the new client, never null
     * @throws IllegalStateException
     *             wrapping whatever prevented the client from being created
     */
    private ZooKeeper createWriteKeeper() {
        try {
            return new ZooKeeper(this.ip, SESSION_TIMEOUT, this);
        } catch (Exception e) {
            // FATAL
            Logger.log(LogService.LOG_ERROR,
                    "Fatal error while initializing a zookeeper client to write to: "
                            + ip, e);
            // halt here before the NPE's get out of house in
            // Publisher.publish()
            throw new IllegalStateException(e);
        }
    }

    /**
     * Loops until the root node could be queried (creating it when absent) or
     * the watch manager is disposed, in which case the client is closed.
     *
     * @throws InterruptedException
     *             if the thread is interrupted while talking to the server
     */
    private void awaitRoot() throws InterruptedException {
        while (!this.isConnected) {
            synchronized (connectionLock) {
                if (watchManager.isDisposed()) {
                    // no need for connecting, we're disposed.
                    closeQuietly();
                    break;
                }
                try {
                    Stat s = this.writeKeeper.exists(INode.ROOT, this);
                    this.isConnected = true;
                    if (s == null) {
                        writeKeeper.create(INode.ROOT, new byte[0],
                                Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    }
                } catch (KeeperException e) {
                    KeeperException.Code code = e.code();
                    if (code == KeeperException.Code.CONNECTIONLOSS) {
                        isConnected = false;
                        PrettyPrinter.attemptingConnectionTo(this.ip);
                    } else if (code == KeeperException.Code.NODEEXISTS) {
                        // Someone else created the root between our exists()
                        // and create(); the node is there, which is all we need.
                    } else {
                        Logger.log(
                                LogService.LOG_ERROR,
                                "Error while trying to connect to " + this.ip, e); //$NON-NLS-1$
                    }
                }
            }
        }
    }

    /** Closes the current client, ignoring failures but keeping the interrupt status. */
    private void closeQuietly() {
        try {
            writeKeeper.close();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (Throwable t) {
            // ignore
        }
    }

    /**
     * @return the client currently used for writing; null while a replacement
     *         client could not be created
     */
    public ZooKeeper getWriteKeeper() {
        return writeKeeper;
    }

    /**
     * @return true once the connection has been established and not been
     *         reported lost since
     */
    public boolean isConnected() {
        return isConnected;
    }

    /**
     * @return the watch manager this instance publishes through
     */
    public WatchManager getWatchManager() {
        return watchManager;
    }
}