blob: 534078d82cfe0ecda2cc972dda103784bf8f5c7a [file] [log] [blame]
/***********************************************************************************************************************
* Copyright (c) 2008, 2013 Empolis Information Management GmbH and brox IT Solutions GmbH. All rights reserved. This
* program and the accompanying materials are made available under the terms of the Eclipse Public License v1.0 which
* accompanies this distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Juergen Schumacher (Empolis Information Management GmbH) - implementation
**********************************************************************************************************************/
package org.eclipse.smila.zookeeper;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.eclipse.smila.utils.config.ConfigurationUpdateWatcher;
import org.osgi.service.component.ComponentContext;
/**
 * Uses ZooKeeper to coordinate configuration updating in a SMILA cluster.
 *
 * <p>
 * For each configuration type a root znode <code>/smila/configversions/&lt;configType&gt;</code> is used. Each
 * configuration is stored as a key of a {@link ZkConcurrentMap} below that root node, and the ZooKeeper version of the
 * key's znode is compared with a locally remembered version to detect updates done by other nodes. Changes are
 * detected either by a ZooKeeper watch on the root node (see {@link #startWatching()}) or by regular polling (see
 * {@link #startPolling()}).
 * </p>
 *
 * <p>
 * All methods that access the watch flags, the scheduler, the registered service or the local version map are
 * synchronized on this instance.
 * </p>
 */
public class ZkConfigurationUpdateWatcher implements ConfigurationUpdateWatcher, Watcher {
  /** prefix of the root znode path, the configuration type is appended to it. */
  private static final String BASEPATH = "/smila/configversions/";

  /** interval to poll for changes, in seconds. */
  private static final int POLL_INTERVAL = 60;

  /** local logger. */
  private final Log _log = LogFactory.getLog(getClass());

  /** ZooKeeper service reference. */
  private ZooKeeperService _zkService;

  /** processor to notify about update from other nodes. */
  private UpdateableService _service;

  /** type of the watched configurations, used as last segment of the root znode path and in log messages. */
  private String _configType;

  /** znode name to use for our data. */
  private String _rootPath;

  /** connection to zookeeper. */
  private ZkConnection _zk;

  /** executes {@link #checkConfigVersions()} regularly. */
  private ScheduledExecutorService _scheduler;

  /** true if watching has been requested by {@link #startWatching()} and not been stopped since. */
  private boolean _watcherStarted;

  /** true if a watch is currently installed. */
  private boolean _watcherInstalled;

  /**
   * distributed map in ZooKeeper: each custom configuration is a key in this map, the ZK version of the key's node is
   * used to track updates.
   */
  private ZkConcurrentMap _clusterVersions;

  /**
   * Versions of the znodes representing each configuration. If the actual version of the znode differs from this value,
   * the configuration should be updated on this node.
   */
  private final Map<String, Integer> _localVersions = new HashMap<String, Integer>();

  /** create instance for use as a DS component: configType must be provided by DS property. */
  public ZkConfigurationUpdateWatcher() {
    // configType must be provided by DS property.
  }

  /**
   * create instance for use without DS. {@link #initialize()} must be called before the instance can be used.
   *
   * @param zkService
   *          ZooKeeper service to create the connection from.
   * @param configType
   *          type of watched configurations, appended to the base path to build the root znode path.
   */
  public ZkConfigurationUpdateWatcher(final ZooKeeperService zkService, final String configType) {
    _zkService = zkService;
    _configType = configType;
  }

  /** service reference bind method. */
  public void setZkService(final ZooKeeperService zkService) {
    _zkService = zkService;
  }

  /** service reference unbind method. Only clears the reference if it is the currently bound instance. */
  public void unsetZkService(final ZooKeeperService zkService) {
    if (_zkService == zkService) {
      _zkService = null;
    }
  }

  /** remember the service that is called when a configuration must be synchronized on this node. */
  @Override
  public synchronized void registerService(final UpdateableService service) {
    _service = service;
  }

  /**
   * activate the service: read the "configType" component property and {@link #initialize()}. A failure to create the
   * root znode is only logged.
   *
   * @throws IllegalStateException
   *           if the component property "configType" is not set.
   */
  protected void activate(final ComponentContext context) {
    final Object configType = context.getProperties().get("configType");
    if (configType == null) {
      // fail with a descriptive message instead of an anonymous NullPointerException.
      throw new IllegalStateException("Required component property 'configType' is not set.");
    }
    try {
      _configType = configType.toString();
      initialize();
    } catch (final KeeperException ex) {
      _log.error("Failed to create znode " + _rootPath + ", watcher service will be disfunctional.", ex);
    }
  }

  /**
   * connect to Zookeeper and create root node.
   *
   * @throws KeeperException
   *           if ensuring the existence of the root znode fails.
   */
  public void initialize() throws KeeperException {
    _rootPath = BASEPATH + _configType;
    _zk = new ZkConnection(_zkService);
    _zk.ensurePathExists(_rootPath);
    _clusterVersions = new ZkConcurrentMap(_zk, _rootPath);
  }

  /**
   * enable watching and try to install a watch on the root znode.
   *
   * @return true if the watch could be installed. If false, watching stays enabled and the polling task (if started)
   *         retries the installation.
   */
  @Override
  public synchronized boolean startWatching() {
    _watcherStarted = true;
    reinstallWatch();
    return _watcherInstalled;
  }

  /** disable watching: no new watch will be installed by this instance until watching is started again. */
  @Override
  public synchronized void stopWatching() {
    _watcherStarted = false;
    _watcherInstalled = false;
  }

  /** start polling with the default interval of {@value #POLL_INTERVAL} seconds. */
  @Override
  public void startPolling() {
    startPolling(POLL_INTERVAL);
  }

  /**
   * start a single-threaded scheduler that regularly checks the cluster versions and ensures that the watch is
   * installed. Does nothing if polling is already running.
   *
   * @param pollIntervalSeconds
   *          initial delay and delay between two checks, in seconds.
   */
  @Override
  public synchronized void startPolling(final int pollIntervalSeconds) {
    if (_scheduler != null) {
      return;
    }
    _scheduler = Executors.newScheduledThreadPool(1);
    _scheduler.scheduleWithFixedDelay(new Runnable() {
      @Override
      public void run() {
        try {
          checkConfigVersions();
          // ensure that watch is installed if watching was started.
          installWatch();
        } catch (final RuntimeException ex) {
          // an exception escaping from here would suppress all subsequent executions of this task,
          // i.e. polling would stop silently. Log it and try again at the next interval.
          _log.warn("Unexpected error while polling for " + _configType
            + " configuration updates, will retry at next poll.", ex);
        }
      }
    }, pollIntervalSeconds, pollIntervalSeconds, TimeUnit.SECONDS);
    _log.info("Started: polling for " + _configType + " configuration updates each " + pollIntervalSeconds
      + " seconds.");
  }

  /** shut down the polling scheduler, if it is running. */
  @Override
  public synchronized void stopPolling() {
    if (_scheduler != null) {
      _scheduler.shutdownNow();
      _scheduler = null;
      _log.info("Stopped watcher for " + _configType + " configuration updates.");
    }
  }

  /**
   * register a configuration that was loaded at startup: add it to the cluster map if it is not contained yet and
   * remember the current version of its znode as local version.
   *
   * @throws RuntimeException
   *           if accessing the cluster map fails, the original exception is attached as cause.
   */
  @Override
  public synchronized void configLoadedOnStart(final String configurationName, final String timestamp) {
    try {
      _clusterVersions.putIfAbsent(configurationName, timestamp);
      _localVersions.put(configurationName, _clusterVersions.getVersion(configurationName));
    } catch (final RuntimeException ex) {
      throw new RuntimeException("Failed to initialize for configuration '" + configurationName + "'.", ex);
    }
  }

  /**
   * publish a local configuration update: write the new timestamp to the cluster map, remember the resulting znode
   * version as local version and touch the root znode to notify other nodes.
   *
   * @throws RuntimeException
   *           if the cluster map could not be updated, the original exception is attached as cause.
   */
  @Override
  public synchronized void configUpdated(final String configurationName, final String timestamp) {
    try {
      final String oldTimestamp = _clusterVersions.getString(configurationName);
      final Integer version;
      if (oldTimestamp == null) {
        // configuration is not known in the cluster yet.
        _clusterVersions.put(configurationName, timestamp);
        version = _clusterVersions.getVersion(configurationName);
      } else {
        version = _clusterVersions.replaceAndGetVersion(configurationName, oldTimestamp, timestamp);
        if (version == null) {
          throw new RuntimeException("Failed to send update notification for configuration '" + configurationName
            + "'");
        }
      }
      _localVersions.put(configurationName, version);
      sendNotification(configurationName + " updated at " + timestamp);
    } catch (final RuntimeException ex) {
      throw new RuntimeException("Failed to update for configuration '" + configurationName + "'.", ex);
    }
  }

  /**
   * publish a local configuration deletion: remove the configuration from the cluster map and the local versions and
   * touch the root znode to notify other nodes.
   *
   * @throws RuntimeException
   *           if the cluster map could not be updated, the original exception is attached as cause.
   */
  @Override
  public synchronized void configDeleted(final String configurationName) {
    try {
      _clusterVersions.remove(configurationName);
      _localVersions.remove(configurationName);
      sendNotification(configurationName + " deleted");
    } catch (final RuntimeException ex) {
      throw new RuntimeException("Failed to update for configuration '" + configurationName + "'.", ex);
    }
  }

  /**
   * compare the versions of all configurations in the cluster map with the local versions: changed or new
   * configurations are synchronized, configurations that are known locally but not contained in the cluster map
   * anymore are deleted locally. Errors are logged, not thrown.
   */
  @Override
  public synchronized void checkConfigVersions() {
    if (_log.isDebugEnabled()) {
      _log.debug("checking versions of " + _configType + " configurations deployed in the cluster");
    }
    try {
      // start with all locally known configurations, those still in the cluster are removed from the set below.
      final Set<String> obsoleteConfigs = new HashSet<String>(_localVersions.keySet());
      for (final String configurationName : _clusterVersions.keySet()) {
        obsoleteConfigs.remove(configurationName);
        checkConfigVersion(configurationName);
      }
      for (final String obsoleteConfig : obsoleteConfigs) {
        deleteConfig(obsoleteConfig);
      }
    } catch (final Exception ex) {
      _log.warn("Error getting cluster versions of " + _configType
        + " configurations, maybe we are losing an update now. We'll retry later.", ex);
    }
  }

  /** compare cluster version of a single configuration with local version and update local deployment. */
  private synchronized void checkConfigVersion(final String configurationName) {
    try {
      final Integer clusterVersion = _clusterVersions.getVersion(configurationName);
      if (clusterVersion == null) {
        return;
      }
      // equals() is false for a null local version, too, so unknown configurations are synchronized as well.
      if (!clusterVersion.equals(_localVersions.get(configurationName))) {
        try {
          _service.synchronizeConfiguration(configurationName, false);
          // only remember the new version if the update succeeded, so it is retried at the next check otherwise.
          _localVersions.put(configurationName, clusterVersion);
        } catch (final Exception ex) {
          _log.warn("Error updating " + _configType + " configuration '" + configurationName
            + "', old version will stay active.", ex);
        }
      }
    } catch (final Exception ex) {
      _log.warn("Error getting cluster version of " + _configType + " configuration '" + configurationName
        + "', maybe we are losing an update now.", ex);
    }
  }

  /** remove a configuration from this node that is not contained in the cluster map anymore. */
  private synchronized void deleteConfig(final String configurationName) {
    try {
      _service.synchronizeConfiguration(configurationName, true);
      // only forget the version if the deletion succeeded, so it is retried at the next check otherwise.
      _localVersions.remove(configurationName);
    } catch (final Exception ex) {
      _log.warn("Error deleting " + _configType + " configuration '" + configurationName
        + "', old version will stay active.", ex);
    }
  }

  /** touch root znode to trigger watches set by other nodes. Failures are logged only. */
  private void sendNotification(final String text) {
    try {
      _zk.setData(_rootPath, text.getBytes("utf-8"));
    } catch (final Exception ex) {
      _log.warn("Failed to update notification " + _rootPath + " for " + text
        + ". Watches may not be triggered, remove configuration updates may need more time.", ex);
    }
  }

  /** set watch on root node, if watching is enabled and no watch is supposed to be installed currently. */
  private synchronized void installWatch() {
    if (!_watcherStarted || _watcherInstalled) {
      return;
    }
    try {
      _zk.exists(_rootPath, this);
      _watcherInstalled = true;
    } catch (final KeeperException ex) {
      _log.warn("Starting to watch for updates failed, pipeline update notifications may take a bit longer: "
        + "Could not install watch on znode" + _rootPath, ex);
    }
  }

  /** mark the watch as not installed and try to install it again, if watching is enabled. */
  private synchronized void reinstallWatch() {
    _watcherInstalled = false;
    installWatch();
  }

  /** reinstalls the watch and calls {@link #checkConfigVersions()}. */
  @Override
  public void process(final WatchedEvent event) {
    if (_log.isDebugEnabled()) {
      _log.debug("watch triggered for " + _configType + ": " + event);
    }
    // install the watch again before checking, so the root znode is watched while the check is running.
    reinstallWatch();
    checkConfigVersions();
  }
}