| /*********************************************************************************************************************** |
| * Copyright (c) 2008, 2013 Empolis Information Management GmbH and brox IT Solutions GmbH. All rights reserved. This |
| * program and the accompanying materials are made available under the terms of the Eclipse Public License v1.0 which |
| * accompanies this distribution, and is available at http://www.eclipse.org/legal/epl-v10.html |
| * |
| * Contributors: Juergen Schumacher (Empolis Information Management GmbH) - implementation |
| **********************************************************************************************************************/ |
| package org.eclipse.smila.zookeeper; |
| |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.WatchedEvent; |
| import org.apache.zookeeper.Watcher; |
| import org.eclipse.smila.utils.config.ConfigurationUpdateWatcher; |
| import org.osgi.service.component.ComponentContext; |
| |
| /** |
| * Uses ZooKeeper to coordinate configuration updating in a SMILA cluster. |
| */ |
| public class ZkConfigurationUpdateWatcher implements ConfigurationUpdateWatcher, Watcher { |
| |
| private static final String BASEPATH = "/smila/configversions/"; |
| |
| /** interval to poll for changes, in seconds. */ |
| private static final int POLL_INTERVAL = 60; |
| |
| /** local logger. */ |
| private final Log _log = LogFactory.getLog(getClass()); |
| |
| /** ZooKeeper service reference. */ |
| private ZooKeeperService _zkService; |
| |
| /** processor to notify about update from other nodes. */ |
| private UpdateableService _service; |
| |
| private String _configType; |
| |
| /** znode name to use for our data. */ |
| private String _rootPath; |
| |
| /** conection to zookeeper. */ |
| private ZkConnection _zk; |
| |
| /** executes {@link #checkConfigVersions()} regularly. */ |
| private ScheduledExecutorService _scheduler; |
| |
| /** true if a watch is currently installed. */ |
| private boolean _watcherStarted; |
| |
| /** true if a watch is currently installed. */ |
| private boolean _watcherInstalled; |
| |
| /** |
| * distributed map in ZooKeeper: each custom configuration is a key in this map, the ZK version of the key's node is |
| * used to track updates. |
| */ |
| private ZkConcurrentMap _clusterVersions; |
| |
| /** |
| * Versions of the znodes representing each configuration. If the actual version of the znode differs from this value, |
| * the configuration should be updated on this node. |
| */ |
| private final Map<String, Integer> _localVersions = new HashMap<String, Integer>(); |
| |
| public ZkConfigurationUpdateWatcher() { |
| // configType must be provided by DS property. |
| } |
| |
| public ZkConfigurationUpdateWatcher(final ZooKeeperService zkService, final String configType) { |
| _zkService = zkService; |
| _configType = configType; |
| } |
| |
| /** service reference bind method. */ |
| public void setZkService(final ZooKeeperService zkService) { |
| _zkService = zkService; |
| } |
| |
| /** service reference unbind method. */ |
| public void unsetZkService(final ZooKeeperService zkService) { |
| if (_zkService == zkService) { |
| _zkService = null; |
| } |
| } |
| |
| @Override |
| public synchronized void registerService(final UpdateableService service) { |
| _service = service; |
| } |
| |
| /** activate the service. */ |
| protected void activate(final ComponentContext context) { |
| try { |
| _configType = context.getProperties().get("configType").toString(); |
| initialize(); |
| } catch (final KeeperException ex) { |
| _log.error("Failed to create znode " + _rootPath + ", watcher service will be disfunctional.", ex); |
| } |
| } |
| |
| /** connect to Zookeeper and create root node. */ |
| public void initialize() throws KeeperException { |
| _rootPath = BASEPATH + _configType; |
| _zk = new ZkConnection(_zkService); |
| _zk.ensurePathExists(_rootPath); |
| _clusterVersions = new ZkConcurrentMap(_zk, _rootPath); |
| } |
| |
| @Override |
| public synchronized boolean startWatching() { |
| _watcherStarted = true; |
| reinstallWatch(); |
| return _watcherInstalled; |
| } |
| |
| @Override |
| public synchronized void stopWatching() { |
| _watcherStarted = false; |
| _watcherInstalled = false; |
| } |
| |
| @Override |
| public void startPolling() { |
| startPolling(POLL_INTERVAL); |
| } |
| |
| @Override |
| public synchronized void startPolling(final int pollIntervalSeconds) { |
| if (_scheduler == null) { |
| _scheduler = Executors.newScheduledThreadPool(1); |
| _scheduler.scheduleWithFixedDelay(new Runnable() { |
| @Override |
| public void run() { |
| checkConfigVersions(); |
| // ensure that watch is installed if watching was started. |
| installWatch(); |
| } |
| }, pollIntervalSeconds, pollIntervalSeconds, TimeUnit.SECONDS); |
| _log.info("Started: polling for " + _configType + " configuration updates each " + pollIntervalSeconds |
| + " seconds."); |
| } |
| } |
| |
| @Override |
| public synchronized void stopPolling() { |
| if (_scheduler != null) { |
| _scheduler.shutdownNow(); |
| _scheduler = null; |
| _log.info("Stopped watcher for " + _configType + " configuration updates."); |
| } |
| } |
| |
| @Override |
| public synchronized void configLoadedOnStart(final String configurationName, final String timestamp) { |
| try { |
| _clusterVersions.putIfAbsent(configurationName, timestamp); |
| _localVersions.put(configurationName, _clusterVersions.getVersion(configurationName)); |
| } catch (final RuntimeException ex) { |
| throw new RuntimeException("Failed to initialize for configuration '" + configurationName + "'."); |
| } |
| } |
| |
| @Override |
| public synchronized void configUpdated(final String configurationName, final String timestamp) { |
| try { |
| final String oldTimestamp = _clusterVersions.getString(configurationName); |
| Integer version; |
| if (oldTimestamp == null) { |
| _clusterVersions.put(configurationName, timestamp); |
| version = _clusterVersions.getVersion(configurationName); |
| } else { |
| version = _clusterVersions.replaceAndGetVersion(configurationName, oldTimestamp, timestamp); |
| if (version == null) { |
| throw new RuntimeException("Failed to send update notification for configuration '" + configurationName |
| + "'"); |
| } |
| } |
| _localVersions.put(configurationName, version); |
| sendNotification(configurationName + " updated at " + timestamp); |
| } catch (final RuntimeException ex) { |
| throw new RuntimeException("Failed to update for configuration '" + configurationName + "'."); |
| } |
| } |
| |
| @Override |
| public synchronized void configDeleted(final String configurationName) { |
| try { |
| _clusterVersions.remove(configurationName); |
| _localVersions.remove(configurationName); |
| sendNotification(configurationName + " deleted"); |
| } catch (final RuntimeException ex) { |
| throw new RuntimeException("Failed to update for configuration '" + configurationName + "'."); |
| } |
| } |
| |
| @Override |
| public synchronized void checkConfigVersions() { |
| if (_log.isDebugEnabled()) { |
| _log.debug("checking versions of " + _configType + " configurations deployed in the cluster"); |
| } |
| try { |
| final Set<String> obsoleteConfigs = new HashSet<String>(_localVersions.keySet()); |
| for (final String configurationName : _clusterVersions.keySet()) { |
| obsoleteConfigs.remove(configurationName); |
| checkConfigVersion(configurationName); |
| } |
| for (final String obsoleteConfig : obsoleteConfigs) { |
| deleteConfig(obsoleteConfig); |
| } |
| } catch (final Exception ex) { |
| _log.warn("Error getting cluster versions of " + _configType |
| + " configurations, maybe we are losing an update now. We'll retry later.", ex); |
| } |
| } |
| |
| /** compare cluster version of a single with local version and update local deployment. */ |
| private synchronized void checkConfigVersion(final String configurationName) { |
| try { |
| final Integer clusterVersion = _clusterVersions.getVersion(configurationName); |
| if (clusterVersion != null) { |
| final Integer localVersion = _localVersions.get(configurationName); |
| if (localVersion == null || !clusterVersion.equals(localVersion)) { |
| try { |
| _service.synchronizeConfiguration(configurationName, false); |
| _localVersions.put(configurationName, clusterVersion); |
| } catch (final Exception ex) { |
| _log.warn("Error updating " + _configType + " configuration '" + configurationName |
| + "', old version will stay active.", ex); |
| } |
| } |
| } |
| } catch (final Exception ex) { |
| _log.warn("Error getting cluster version of " + _configType + " configuration '" + configurationName |
| + "', maybe we are losing an update now.", ex); |
| } |
| } |
| |
| /** remove an obsolete configuration from the cluster. */ |
| private synchronized void deleteConfig(final String configurationName) { |
| try { |
| _service.synchronizeConfiguration(configurationName, true); |
| _localVersions.remove(configurationName); |
| } catch (final Exception ex) { |
| _log.warn("Error deleting " + _configType + " configuration '" + configurationName |
| + "', old version will stay active.", ex); |
| } |
| } |
| |
| /** touch root znode to trigger watches set by other nodes. */ |
| private void sendNotification(final String text) { |
| try { |
| _zk.setData(_rootPath, text.getBytes("utf-8")); |
| } catch (final Exception ex) { |
| _log.warn("Failed to update notification " + _rootPath + " for " + text |
| + ". Watches may not be triggered, remove configuration updates may need more time.", ex); |
| } |
| } |
| |
| /** set watch on root node, if watching is enabled and no watch is supposed to be installed currently. */ |
| private synchronized void installWatch() { |
| if (_watcherStarted && !_watcherInstalled) { |
| try { |
| _zk.exists(_rootPath, this); |
| _watcherInstalled = true; |
| } catch (final KeeperException ex) { |
| _log.warn("Starting to watch for updates failed, pipeline update notifications may take a bit longer: " |
| + "Could not install watch on znode" + _rootPath, ex); |
| } |
| } |
| } |
| |
| private synchronized void reinstallWatch() { |
| _watcherInstalled = false; |
| installWatch(); |
| } |
| |
| /** reinstalls the watch and calls {@link #checkConfigVersions()}. */ |
| @Override |
| public void process(final WatchedEvent event) { |
| if (_log.isDebugEnabled()) { |
| _log.debug("watch triggered for " + _configType + ": " + event); |
| } |
| reinstallWatch(); |
| checkConfigVersions(); |
| } |
| |
| } |