blob: 5006b800cde865266544492b91c5542b77d6d1d8 [file] [log] [blame]
/*******************************************************************************
* Copyright (c)2011 REMAIN B.V. The Netherlands. (http://www.remainsoftware.com).
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Wim Jongman - initial API and implementation
* Ahmed Aadel - initial API and implementation
*******************************************************************************/
package org.eclipse.ecf.provider.zookeeper.node.internal;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.eclipse.core.runtime.Assert;
import org.eclipse.ecf.provider.zookeeper.core.DiscoverdService;
import org.eclipse.ecf.provider.zookeeper.core.ZooDiscoveryContainer;
import org.eclipse.ecf.provider.zookeeper.util.Geo;
import org.eclipse.ecf.provider.zookeeper.util.Logger;
import org.osgi.service.log.LogService;
public class ReadRoot implements Watcher, ChildrenCallback {
ZooKeeper readKeeper;
String ip;
private WatchManager watchManager;
private boolean isConnected;
private Map<String, NodeReader> nodeReaders = Collections
.synchronizedMap(new HashMap<String, NodeReader>());
private Map<String, DiscoverdService> discoverdServices;
private Map<String, List<DiscoverdService>> perTypeDiscoverdServices;
private Object connectionLock = new Object();
ReadRoot(String ip, WatchManager watchManager) {
Assert.isNotNull(ip);
Assert.isNotNull(watchManager);
this.ip = ip;
this.watchManager = watchManager;
discoverdServices = Collections
.synchronizedMap(new HashMap<String, DiscoverdService>());
perTypeDiscoverdServices = Collections
.synchronizedMap(new HashMap<String, List<DiscoverdService>>());
connect();
}
public void process(final WatchedEvent event) {
ZooDiscoveryContainer.CACHED_THREAD_POOL.execute(new Runnable() {
public void run() {
synchronized (connectionLock) {
if (watchManager.isDisposed())
return;
switch (event.getState()) {
case Disconnected:
ReadRoot.this.isConnected = false;
connect();
break;
case Expired:
ReadRoot.this.isConnected = false;
connect();
break;
case SyncConnected:
if (!ReadRoot.this.isConnected) {
ReadRoot.this.isConnected = true;
ReadRoot.this.watchManager
.addZooKeeper(ReadRoot.this.readKeeper);
ReadRoot.this.readKeeper.exists(INode.ROOT,
ReadRoot.this, null, null);
ReadRoot.this.readKeeper.getChildren(INode.ROOT,
ReadRoot.this, ReadRoot.this, null);
}
break;
// ignore @deprecated cases
}
switch (event.getType()) {
case NodeDeleted:
if (event.getPath() == null
|| event.getPath().equals(INode.ROOT))
break;
ReadRoot.this.nodeReaders.remove(event.getPath());
break;
case NodeChildrenChanged:
if (ReadRoot.this.isConnected) {
ReadRoot.this.readKeeper.exists(INode.ROOT,
ReadRoot.this, null, null);
ReadRoot.this.readKeeper.getChildren(INode.ROOT,
ReadRoot.this, ReadRoot.this, null);
}
break;
}
}
}
});
}
private void connect() {
synchronized (connectionLock) {
if (this.isConnected || watchManager.isDisposed())
return;
this.nodeReaders.clear();
if (this.readKeeper != null) {
// discard the current stale reader
try {
this.readKeeper.close();
} catch (InterruptedException e) {
Logger.log(
LogService.LOG_ERROR,
"Error while closing the current ZooKeeper: "
+ e.getMessage(), e);
}
this.watchManager.removeZooKeeper(this.readKeeper);
this.readKeeper = null;
}
try {
// try reconnecting
this.readKeeper = new ZooKeeper(this.ip, 3000, this);
} catch (IOException ioe) {
Logger.log(LogService.LOG_ERROR,
"Cannot initiate a new ZooKeeper: " + ioe.getMessage(),
ioe);
}
}
}
public void processResult(int rc, final String path, Object ctx,
final List<String> children) {
ZooDiscoveryContainer.CACHED_THREAD_POOL.execute(new Runnable() {
public void run() {
synchronized (connectionLock) {
if (watchManager.isDisposed())
return;
if (path == null || children == null
|| children.size() == 0) {
/* No children available yet, set a watch on it */
ReadRoot.this.readKeeper.getChildren(INode.ROOT,
ReadRoot.this, ReadRoot.this, null);
return;
}
for (String p : children) {
if (Geo.isOwnPublication(p)) {
/* own publications need not to be discovered */
continue;
}
if (!ReadRoot.this.nodeReaders.containsKey(p)) {
/* launch a new reader to handle this node's data */
NodeReader nr = new NodeReader(p, ReadRoot.this);
/* watch this very path for deletion */
ReadRoot.this.readKeeper.exists(
nr.getAbsolutePath(), ReadRoot.this, null,
null);
ReadRoot.this.nodeReaders.put(nr.getPath(), nr);
}
}
}
}
});
}
public ZooKeeper getReadKeeper() {
return this.readKeeper;
}
public String getIp() {
return this.ip;
}
public Map<String, DiscoverdService> getDiscoverdServices() {
return discoverdServices;
}
public Map<String, List<DiscoverdService>> getPerTypeDiscoverdServices() {
return perTypeDiscoverdServices;
}
}