/*******************************************************************************
 * Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
 * accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
 * distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors: Andreas Schank (Attensity Europe GmbH) - initial implementation
 *******************************************************************************/
package org.eclipse.smila.clusterconfig.simple; | |
import java.io.IOException; | |
import java.io.InputStream; | |
import java.net.InetAddress; | |
import java.net.UnknownHostException; | |
import java.util.ArrayList; | |
import java.util.Collection; | |
import java.util.List; | |
import java.util.Map.Entry; | |
import org.apache.commons.io.IOUtils; | |
import org.apache.commons.logging.Log; | |
import org.apache.commons.logging.LogFactory; | |
import org.eclipse.smila.clusterconfig.ClusterConfigService; | |
import org.eclipse.smila.datamodel.Any; | |
import org.eclipse.smila.datamodel.AnyMap; | |
import org.eclipse.smila.datamodel.AnySeq; | |
import org.eclipse.smila.datamodel.DataFactory; | |
import org.eclipse.smila.datamodel.ipc.IpcAnyReader; | |
import org.eclipse.smila.utils.config.ConfigUtils; | |
import org.osgi.service.component.ComponentContext; | |
/** | |
* Simple Cluster Configuration Service implementation. | |
* | |
* @see ClusterConfigService | |
*/ | |
public class SimpleClusterConfigService implements ClusterConfigService { | |
/** bundle id. */ | |
public static final String BUNDLE_ID = "org.eclipse.smila.clusterconfig.simple"; | |
// ---- default values ---- | |
/** default zookeeper garbage collection interval. */ | |
public static final long DEFAULT_ZK_GC_INTERVAL = 60; | |
/** default number of nodes that may fail until zookeeper stops to work. */ | |
public static final long DEFAULT_FAILSAFETY_LEVEL = 0; | |
/** default max scale up is -1 which means: unlimited. */ | |
public static final long DEFAULT_MAX_SCALE_UP = -1; | |
/** default max retries for a task. */ | |
public static final long DEFAULT_MAX_RETRIES = 10; | |
/** default time to live for a task. */ | |
public static final long DEFAULT_TIME_TO_LIVE = 300; | |
/** default resumeJobs. */ | |
public static final boolean DEFAULT_RESUME_JOBS = false; | |
// ---- other constants ---- | |
/** properties file name. */ | |
private static final String CONFIG_FILE_NAME = "clusterconfig.json"; | |
// ---- keys ---- | |
/** key for cluster nodes. */ | |
private static final String KEY_CLUSTER_NODES = "clusterNodes"; | |
/** key for specified local host name. */ | |
private static final String KEY_LOCAL_HOST_NAME = "localHost"; | |
/** key for Zookeeper garbage collection interval. */ | |
private static final String KEY_ZK_GC_INTERVAL = "zkGcInterval"; | |
/** key for Zookeeper garbage collection interval. */ | |
private static final String KEY_FAILSAFETY_LEVEL = "failsafetyLevel"; | |
/** key for maximum scale up. */ | |
private static final String KEY_MAX_SCALE_UP = "maxScaleUp"; | |
/** key for workers section. */ | |
private static final String KEY_WORKERS = "workers"; | |
/** key for services section. */ | |
private static final String KEY_SERVICES = "services"; | |
/** key for httpPort. */ | |
private static final String KEY_HTTP_PORT = "httpPort"; | |
/** key for taskmanager section. */ | |
private static final String KEY_TASKMANAGER = "taskmanager"; | |
/** key for timeToLive. */ | |
private static final String KEY_TIME_TO_LIVE = "timeToLive"; | |
/** key for maxRetries section. */ | |
private static final String KEY_MAX_RETRIES = "maxRetries"; | |
/** key for resumeJobs. */ | |
private static final String KEY_RESUME_JOBS = "resumeJobs"; | |
// ---- other fields ---- | |
/** private log. */ | |
private final Log _log = LogFactory.getLog(getClass()); | |
/** properties from file. */ | |
private AnyMap _properties = DataFactory.DEFAULT.createAnyMap(); | |
/** could we read the configuration? */ | |
private boolean _isConfigured; | |
/** | |
* Default Constructor for OSGi DS. | |
*/ | |
public SimpleClusterConfigService() { | |
} | |
/** | |
* OSGi Declarative Services service activation method. | |
* | |
* @param context | |
* OSGi service component context. | |
*/ | |
protected void activate(final ComponentContext context) { | |
try { | |
readConfiguration(); | |
if (_log.isDebugEnabled()) { | |
_log.debug("successfully activated"); | |
} | |
} catch (final Throwable e) { | |
final String msg = "Error while activating " + BUNDLE_ID; | |
if (_log.isErrorEnabled()) { | |
_log.error(msg, e); | |
} | |
throw new RuntimeException(msg, e); | |
} | |
} | |
/** | |
* OSGi Declarative Services service deactivation method. | |
* | |
* @param context | |
* OSGi service component context. | |
*/ | |
protected void deactivate(final ComponentContext context) { | |
if (_log.isDebugEnabled()) { | |
_log.debug("deactivate"); | |
} | |
} | |
@Override | |
public AnyMap getAllProperties() { | |
return _properties; | |
} | |
/** | |
* {@inheritDoc} | |
* | |
* <p> | |
* If no cluster nodes are specified, the returned cluster consists of the local host only (as returned by | |
* {@link #getLocalHost()}. | |
* </p> | |
*/ | |
@Override | |
public List<String> getClusterNodes() { | |
final AnySeq clusterNodes; | |
if (_properties.containsKey(KEY_CLUSTER_NODES)) { | |
if (_properties.get(KEY_CLUSTER_NODES).isSeq()) { | |
clusterNodes = _properties.getSeq(KEY_CLUSTER_NODES); | |
} else { | |
clusterNodes = DataFactory.DEFAULT.createAnySeq(); | |
clusterNodes.add(_properties.getStringValue(KEY_CLUSTER_NODES)); | |
} | |
} else { | |
clusterNodes = DataFactory.DEFAULT.createAnySeq(); | |
clusterNodes.add(getLocalHost()); | |
_properties.put(KEY_CLUSTER_NODES, clusterNodes); | |
} | |
return clusterNodes.asStrings(); | |
} | |
/** | |
* {@inheritDoc} | |
* | |
* <p> | |
* If no name is specified in the configuration, the host name determined by the local {@link InetAddress} is | |
* returned. | |
* </p> | |
*/ | |
@Override | |
public String getLocalHost() { | |
final String localHost; | |
if (_properties.containsKey(KEY_LOCAL_HOST_NAME)) { | |
localHost = _properties.getStringValue(KEY_LOCAL_HOST_NAME); | |
} else { | |
String hostName = "localhost"; | |
try { | |
final InetAddress address = InetAddress.getLocalHost(); | |
hostName = address.getHostName(); | |
} catch (final UnknownHostException e) { | |
_log.warn("Could not retrieve local host name. Using " + hostName); | |
} | |
localHost = hostName; | |
_properties.put(KEY_LOCAL_HOST_NAME, localHost); | |
} | |
return localHost; | |
} | |
/** | |
* {@inheritDoc} | |
* | |
* <p> | |
* If no value is specified, the default value of 60 is returned. | |
* </p> | |
*/ | |
@Override | |
public long getZkGcInterval() { | |
return getLongPropertyOrSetDefault(KEY_ZK_GC_INTERVAL, DEFAULT_ZK_GC_INTERVAL); | |
} | |
/** {@inheritDoc} */ | |
@Override | |
public long getFailSafetyLevel() { | |
return getLongPropertyOrSetDefault(KEY_FAILSAFETY_LEVEL, DEFAULT_FAILSAFETY_LEVEL); | |
} | |
/** {@inheritDoc} */ | |
@Override | |
public boolean isConfigured() { | |
return _isConfigured; | |
} | |
/** | |
* {@inheritDoc} | |
* | |
* <p> | |
* If no limit is configured, -1 is returned. | |
* </p> | |
*/ | |
@Override | |
public long getMaxScaleUp() { | |
final AnyMap taskManagerSection; | |
if (_properties.containsKey(KEY_TASKMANAGER)) { | |
taskManagerSection = _properties.getMap(KEY_TASKMANAGER); | |
} else { | |
taskManagerSection = DataFactory.DEFAULT.createAnyMap(); | |
_properties.put(KEY_TASKMANAGER, taskManagerSection); | |
} | |
return getLongPropertyOrSetDefault(taskManagerSection, KEY_MAX_SCALE_UP, DEFAULT_MAX_SCALE_UP); | |
} | |
@Override | |
public boolean isResumeJobs() { | |
final AnyMap taskManagerSection; | |
if (_properties.containsKey(KEY_TASKMANAGER)) { | |
taskManagerSection = _properties.getMap(KEY_TASKMANAGER); | |
} else { | |
taskManagerSection = DataFactory.DEFAULT.createAnyMap(); | |
_properties.put(KEY_TASKMANAGER, taskManagerSection); | |
} | |
return getBooleanPropertyOrSetDefault(taskManagerSection, KEY_RESUME_JOBS, DEFAULT_RESUME_JOBS); | |
} | |
/** {@inheritDoc} */ | |
@Override | |
public long getWorkerScaleUp(final String worker) { | |
final AnyMap workersSection; | |
if (_properties.containsKey(KEY_WORKERS)) { | |
workersSection = _properties.getMap(KEY_WORKERS); | |
} else { | |
workersSection = DataFactory.DEFAULT.createAnyMap(); | |
_properties.put(KEY_WORKERS, workersSection); | |
} | |
final AnyMap workerProperties; | |
if (workersSection.containsKey(worker)) { | |
workerProperties = workersSection.getMap(worker); | |
} else { | |
workerProperties = DataFactory.DEFAULT.createAnyMap(); | |
workersSection.put(worker, workerProperties); | |
} | |
return getLongPropertyOrSetDefault(workerProperties, KEY_MAX_SCALE_UP, DEFAULT_MAX_SCALE_UP); | |
} | |
/** {@inheritDoc} */ | |
@Override | |
public Collection<String> getWorkersWithScaleUp() { | |
final Collection<String> workers = new ArrayList<String>(); | |
if (_properties.containsKey(KEY_WORKERS)) { | |
for (final Entry<String, Any> entry : _properties.getMap(KEY_WORKERS).entrySet()) { | |
final AnyMap workerMap = (AnyMap) entry.getValue(); | |
if (getLongPropertyOrSetDefault(workerMap, KEY_MAX_SCALE_UP, DEFAULT_MAX_SCALE_UP) >= 0) { | |
workers.add(entry.getKey()); | |
} | |
} | |
} | |
return workers; | |
} | |
/** {@inheritDoc} */ | |
@Override | |
public int getHttpPort(final String serviceName) { | |
final AnyMap servicesSectionMap = _properties.getMap(KEY_SERVICES); | |
if (servicesSectionMap != null && servicesSectionMap.containsKey(serviceName)) { | |
final AnyMap serviceMap = servicesSectionMap.getMap(serviceName); | |
if (serviceMap.containsKey(KEY_HTTP_PORT)) { | |
return serviceMap.getLongValue(KEY_HTTP_PORT).intValue(); | |
} | |
} | |
if (_log.isInfoEnabled()) { | |
_log.info("HTTP port for service '" + serviceName + "' not defined."); | |
} | |
return -1; | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
public long getMaxRetries() { | |
final AnyMap taskManagerSection; | |
if (_properties.containsKey(KEY_TASKMANAGER)) { | |
taskManagerSection = _properties.getMap(KEY_TASKMANAGER); | |
} else { | |
taskManagerSection = DataFactory.DEFAULT.createAnyMap(); | |
_properties.put(KEY_TASKMANAGER, taskManagerSection); | |
} | |
return getLongPropertyOrSetDefault(taskManagerSection, KEY_MAX_RETRIES, DEFAULT_MAX_RETRIES); | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
public long getTimeToLive() { | |
final AnyMap taskManagerSection; | |
if (_properties.containsKey(KEY_TASKMANAGER)) { | |
taskManagerSection = _properties.getMap(KEY_TASKMANAGER); | |
} else { | |
taskManagerSection = DataFactory.DEFAULT.createAnyMap(); | |
_properties.put(KEY_TASKMANAGER, taskManagerSection); | |
} | |
return getLongPropertyOrSetDefault(taskManagerSection, KEY_TIME_TO_LIVE, DEFAULT_TIME_TO_LIVE); | |
} | |
/** | |
* Looks up the long value with the given key, if this value is not set, the given defaultvalue will be set. | |
* | |
* @param key | |
* the key of the property | |
* @param defaultValueToSet | |
* the default value | |
* @return the value from the properties or the default value if not set. | |
*/ | |
protected long getLongPropertyOrSetDefault(final String key, final long defaultValueToSet) { | |
return getLongPropertyOrSetDefault(_properties, key, defaultValueToSet); | |
} | |
/** | |
* Looks up the long value with the given key in the givent properties map, if this value is not set, the given | |
* defaultvalue will be set. | |
* | |
* @param key | |
* the key of the property | |
* @param defaultValueToSet | |
* the default value | |
* @return the value from the properties or the default value if not set. | |
*/ | |
protected long getLongPropertyOrSetDefault(final AnyMap properties, final String key, final long defaultValueToSet) { | |
final long value; | |
if (properties.containsKey(key)) { | |
value = properties.getLongValue(key); | |
} else { | |
value = defaultValueToSet; | |
properties.put(key, value); | |
} | |
return value; | |
} | |
/** | |
* Looks up the long value with the given key, if this value is not set, the given defaultvalue will be set. | |
* | |
* @param key | |
* the key of the property | |
* @param defaultValueToSet | |
* the default value | |
* @return the value from the properties or the default value if not set. | |
*/ | |
protected boolean getBooleanPropertyOrSetDefault(final String key, final boolean defaultValueToSet) { | |
return getBooleanPropertyOrSetDefault(_properties, key, defaultValueToSet); | |
} | |
/** | |
* Looks up the long value with the given key in the givent properties map, if this value is not set, the given | |
* defaultvalue will be set. | |
* | |
* @param key | |
* the key of the property | |
* @param defaultValueToSet | |
* the default value | |
* @return the value from the properties or the default value if not set. | |
*/ | |
protected boolean getBooleanPropertyOrSetDefault(final AnyMap properties, final String key, | |
final boolean defaultValueToSet) { | |
final boolean value; | |
if (properties.containsKey(key)) { | |
value = properties.getBooleanValue(key); | |
} else { | |
value = defaultValueToSet; | |
properties.put(key, value); | |
} | |
return value; | |
} | |
/** | |
* Read the configuration from the configuration file. | |
* | |
* @throws IOException | |
* error during loading of configuration file. | |
*/ | |
protected void readConfiguration() throws IOException { | |
final IpcAnyReader anyReader = new IpcAnyReader(); | |
InputStream configurationFileStream = null; | |
try { | |
configurationFileStream = ConfigUtils.getConfigStream(BUNDLE_ID, CONFIG_FILE_NAME); | |
_properties = (AnyMap) anyReader.readJsonStream(configurationFileStream); | |
_isConfigured = true; | |
} catch (final IOException ex) { | |
throw new IOException("Could not read configuration property file " + CONFIG_FILE_NAME + ": " + ex.toString()); | |
} finally { | |
IOUtils.closeQuietly(configurationFileStream); | |
} | |
} | |
} |