Merge branch 'master' of ssh://wjongman@git.eclipse.org/gitroot/ecf/org.eclipse.ecf.git
diff --git a/providers/bundles/org.eclipse.ecf.provider.zookeeper/src/org/eclipse/ecf/provider/zookeeper/node/internal/WriteRoot.java b/providers/bundles/org.eclipse.ecf.provider.zookeeper/src/org/eclipse/ecf/provider/zookeeper/node/internal/WriteRoot.java
index c29cf31..3d362e6 100644
--- a/providers/bundles/org.eclipse.ecf.provider.zookeeper/src/org/eclipse/ecf/provider/zookeeper/node/internal/WriteRoot.java
+++ b/providers/bundles/org.eclipse.ecf.provider.zookeeper/src/org/eclipse/ecf/provider/zookeeper/node/internal/WriteRoot.java
@@ -1,153 +1,166 @@
-/*******************************************************************************
- * Copyright (c)2010 REMAIN B.V. The Netherlands. (http://www.remainsoftware.com).
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Wim Jongman - initial API and implementation
- * Ahmed Aadel - initial API and implementation
- *******************************************************************************/
-
-package org.eclipse.ecf.provider.zookeeper.node.internal;
-
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-import org.eclipse.core.runtime.Assert;
-import org.eclipse.ecf.provider.zookeeper.util.Logger;
-import org.eclipse.ecf.provider.zookeeper.util.PrettyPrinter;
-import org.osgi.service.log.LogService;
-
-class WriteRoot implements Watcher {
- private ZooKeeper writeKeeper;
- private String ip;
- private WatchManager watchManager;
-
- private boolean isConnected;
-
- WriteRoot(String ip, WatchManager watchManager) {
- Assert.isNotNull(ip);
- Assert.isNotNull(watchManager);
- this.ip = ip;
- this.watchManager = watchManager;
- initWriteKeeper();
- }
-
- @SuppressWarnings({ "incomplete-switch" })
- public void process(WatchedEvent event) {
- switch (event.getState()) {
- case Disconnected:
- this.isConnected = false;
- this.watchManager.unpublishAll();
- connect();
- break;
- case Expired:
- this.isConnected = false;
- this.watchManager.unpublishAll();
-
- connect();
- break;
- case SyncConnected:
- if (!this.isConnected) {
- this.isConnected = true;
- this.watchManager.republishAll();
- }
- break;
- // ignore @deprecated cases
- }
- }
-
- private void connect() {
- if (this.isConnected || watchManager.isDisposed()) {
- return;
- }
- try {
- if (this.writeKeeper != null) {
- this.writeKeeper.close();
- this.writeKeeper = null;
- this.watchManager.removeZooKeeper(this.writeKeeper);
- }
- this.writeKeeper = new ZooKeeper(this.ip, 3000, this);
-
- } catch (Exception e) {
- Logger.log(LogService.LOG_DEBUG, e.getMessage(), e);
- }
- }
-
- private void initWriteKeeper() {
-
- if (watchManager.getConfig().isQuorum()
- || watchManager.getConfig().isStandAlone()) {
- // we write nodes locally but we should check for client port.
- int port = watchManager.getConfig().getClientPort();
- if (port != 0)
- ip += ":" + port;//$NON-NLS-1$
- } else if (watchManager.getConfig().isCentralized()) {
- // we write nodes to the machine with this specified IP address.
- ip = watchManager.getConfig().getServerIps();
- }
- try {
- this.writeKeeper = new ZooKeeper(this.ip, 3000, this);
- } catch (Exception e) {
- // FATAL
- Logger.log(LogService.LOG_ERROR,
- "Fatal error while initializing a zookeeper client to write to: "
- + ip, e);
- // halt here before the NPE's get out of house in
- // Publisher.publish()
- throw new IllegalStateException(e);
- }
- while (!this.isConnected) {
- if (watchManager.isDisposed()) {
- // no need for connecting, we're disposed.
- try {
- this.writeKeeper.close();
- } catch (Throwable t) {
- // ignore
- }
- break;
- }
- try {
- Stat s = this.writeKeeper.exists(INode.ROOT, this);
- this.isConnected = true;
- if (s == null) {
- this.writeKeeper.create(INode.ROOT, new byte[0],
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- }
-
- } catch (KeeperException e) {
- if (e.code().equals(KeeperException.Code.CONNECTIONLOSS)) {
- this.isConnected = false;
- PrettyPrinter.attemptingConnectionTo(this.ip);
- } else
- Logger.log(LogService.LOG_ERROR,
- "Error while trying to connect to " + this.ip, e); //$NON-NLS-1$
- } catch (InterruptedException e) {
- // ignore
- }
- }
- synchronized (this) {
- this.notifyAll();
- }
- this.watchManager.addZooKeeper(this.writeKeeper);
- }
-
- public ZooKeeper getWriteKeeper() {
- return this.writeKeeper;
- }
-
- public boolean isConnected() {
- return this.isConnected;
- }
-
- public WatchManager getWatchManager() {
- return watchManager;
- }
-
+/*******************************************************************************
+ * Copyright (c)2010 REMAIN B.V. The Netherlands. (http://www.remainsoftware.com).
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Wim Jongman - initial API and implementation
+ * Ahmed Aadel - initial API and implementation
+ *******************************************************************************/
+
+package org.eclipse.ecf.provider.zookeeper.node.internal;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.eclipse.core.runtime.Assert;
+import org.eclipse.ecf.provider.zookeeper.core.ZooDiscoveryContainer;
+import org.eclipse.ecf.provider.zookeeper.util.Logger;
+import org.eclipse.ecf.provider.zookeeper.util.PrettyPrinter;
+import org.osgi.service.log.LogService;
+
+class WriteRoot implements Watcher {
+ private ZooKeeper writeKeeper;
+ private String ip;
+ private WatchManager watchManager;
+ private boolean isConnected;
+ private Object connectionLock = new Object();
+
+ WriteRoot(String ip, WatchManager watchManager) {
+ Assert.isNotNull(ip);
+ Assert.isNotNull(watchManager);
+ this.ip = ip;
+ this.watchManager = watchManager;
+ initWriteKeeper();
+ }
+
+ @SuppressWarnings({ "incomplete-switch" })
+ public void process(final WatchedEvent event) {
+ ZooDiscoveryContainer.CACHED_THREAD_POOL.execute(new Runnable() {
+ public void run() {
+ synchronized (connectionLock) {
+ switch (event.getState()) {
+ case Disconnected:
+ isConnected = false;
+ watchManager.unpublishAll();
+ connect();
+ break;
+ case Expired:
+ isConnected = false;
+ watchManager.unpublishAll();
+ connect();
+ break;
+ case SyncConnected:
+ if (!isConnected) {
+ isConnected = true;
+ watchManager.addZooKeeper(writeKeeper);
+ watchManager.republishAll();
+ }
+ break;
+ // ignore @deprecated cases
+ }
+ }
+ }
+ });
+ }
+
+ private void connect() {
+ synchronized (connectionLock) {
+ if (this.isConnected || watchManager.isDisposed()) {
+ return;
+ }
+ try {
+ if (writeKeeper != null) {
+ writeKeeper.close();
+ watchManager.removeZooKeeper(writeKeeper);
+ writeKeeper = null;
+ }
+ writeKeeper = new ZooKeeper(this.ip, 3000, this);
+
+ } catch (Exception e) {
+ Logger.log(LogService.LOG_DEBUG, e.getMessage(), e);
+ }
+ }
+ }
+
+ private void initWriteKeeper() {
+ try {
+ if (watchManager.getConfig().isQuorum()
+ || watchManager.getConfig().isStandAlone()) {
+ // Nodes are written locally; append the configured client port (when non-zero) to the address.
+ int port = watchManager.getConfig().getClientPort();
+ if (port != 0)
+ ip += ":" + port;//$NON-NLS-1$
+ } else if (watchManager.getConfig().isCentralized()) {
+ // Centralized mode: nodes are written to the server address(es) taken from the configuration.
+ ip = watchManager.getConfig().getServerIps();
+ }
+ try {
+ writeKeeper = new ZooKeeper(this.ip, 3000, this);
+ } catch (Exception e) {
+ // FATAL
+ Logger.log(LogService.LOG_ERROR,
+ "Fatal error while initializing a zookeeper client to write to: "
+ + ip, e);
+ // Intended to halt here before NPEs surface later in Publisher.publish().
+ // NOTE(review): the enclosing catch (Exception) swallows this throw - confirm intent.
+ throw new IllegalStateException(e);
+ }
+ while (!this.isConnected) {
+ synchronized (connectionLock) {
+ if (watchManager.isDisposed()) {
+ // no need for connecting, we're disposed.
+ try {
+ writeKeeper.close();
+ } catch (Throwable t) {
+ // ignore
+ }
+ break;
+ }
+ try {
+ Stat s = this.writeKeeper.exists(INode.ROOT, this);
+ this.isConnected = true;
+ if (s == null) {
+ writeKeeper.create(INode.ROOT, new byte[0],
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+
+ } catch (KeeperException e) {
+ if (e.code()
+ .equals(KeeperException.Code.CONNECTIONLOSS)) {
+ isConnected = false;
+ PrettyPrinter.attemptingConnectionTo(this.ip);
+ } else
+ Logger.log(
+ LogService.LOG_ERROR,
+ "Error while trying to connect to " + this.ip, e); //$NON-NLS-1$
+ }
+ }
+ }
+ synchronized (this) {
+ this.notifyAll();
+ }
+
+ } catch (Exception e) {
+ Logger.log(LogService.LOG_DEBUG, e.getMessage(), e);
+ }
+ }
+
+ public ZooKeeper getWriteKeeper() {
+ return writeKeeper;
+ }
+
+ public boolean isConnected() {
+ return isConnected;
+ }
+
+ public WatchManager getWatchManager() {
+ return watchManager;
+ }
}
\ No newline at end of file