blob: ef8fc804b262d46c61d396d99e2ea3ff323b146c [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Andreas Schank (Attensity Europe GmbH) - initial implementation
**********************************************************************************************************************/
package org.eclipse.smila.clusterconfig.simple;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map.Entry;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.smila.clusterconfig.ClusterConfigService;
import org.eclipse.smila.datamodel.Any;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.datamodel.AnySeq;
import org.eclipse.smila.datamodel.DataFactory;
import org.eclipse.smila.datamodel.ipc.IpcAnyReader;
import org.eclipse.smila.utils.config.ConfigUtils;
import org.osgi.service.component.ComponentContext;
/**
* Simple Cluster Configuration Service implementation.
*
* @see ClusterConfigService
*/
public class SimpleClusterConfigService implements ClusterConfigService {
/** bundle id. */
public static final String BUNDLE_ID = "org.eclipse.smila.clusterconfig.simple";
// ---- default values ----
/** default zookeeper garbage collection interval. */
public static final long DEFAULT_ZK_GC_INTERVAL = 60;
/** default number of nodes that may fail until zookeeper stops to work. */
public static final long DEFAULT_FAILSAFETY_LEVEL = 0;
/** default max scale up is -1 which means: unlimited. */
public static final long DEFAULT_MAX_SCALE_UP = -1;
/** default max retries for a task. */
public static final long DEFAULT_MAX_RETRIES = 10;
/** default time to live for a task. */
public static final long DEFAULT_TIME_TO_LIVE = 300;
/** default resumeJobs. */
public static final boolean DEFAULT_RESUME_JOBS = false;
// ---- other constants ----
/** properties file name. */
private static final String CONFIG_FILE_NAME = "clusterconfig.json";
// ---- keys ----
/** key for cluster nodes. */
private static final String KEY_CLUSTER_NODES = "clusterNodes";
/** key for specified local host name. */
private static final String KEY_LOCAL_HOST_NAME = "localHost";
/** key for Zookeeper garbage collection interval. */
private static final String KEY_ZK_GC_INTERVAL = "zkGcInterval";
/** key for Zookeeper garbage collection interval. */
private static final String KEY_FAILSAFETY_LEVEL = "failsafetyLevel";
/** key for maximum scale up. */
private static final String KEY_MAX_SCALE_UP = "maxScaleUp";
/** key for workers section. */
private static final String KEY_WORKERS = "workers";
/** key for services section. */
private static final String KEY_SERVICES = "services";
/** key for httpPort. */
private static final String KEY_HTTP_PORT = "httpPort";
/** key for taskmanager section. */
private static final String KEY_TASKMANAGER = "taskmanager";
/** key for timeToLive. */
private static final String KEY_TIME_TO_LIVE = "timeToLive";
/** key for maxRetries section. */
private static final String KEY_MAX_RETRIES = "maxRetries";
/** key for resumeJobs. */
private static final String KEY_RESUME_JOBS = "resumeJobs";
// ---- other fields ----
/** private log. */
private final Log _log = LogFactory.getLog(getClass());
/** properties from file. */
private AnyMap _properties = DataFactory.DEFAULT.createAnyMap();
/** could we read the configuration? */
private boolean _isConfigured;
/**
* Default Constructor for OSGi DS.
*/
public SimpleClusterConfigService() {
}
/**
* OSGi Declarative Services service activation method.
*
* @param context
* OSGi service component context.
*/
protected void activate(final ComponentContext context) {
try {
readConfiguration();
if (_log.isDebugEnabled()) {
_log.debug("successfully activated");
}
} catch (final Throwable e) {
final String msg = "Error while activating " + BUNDLE_ID;
if (_log.isErrorEnabled()) {
_log.error(msg, e);
}
throw new RuntimeException(msg, e);
}
}
/**
* OSGi Declarative Services service deactivation method.
*
* @param context
* OSGi service component context.
*/
protected void deactivate(final ComponentContext context) {
if (_log.isDebugEnabled()) {
_log.debug("deactivate");
}
}
@Override
public AnyMap getAllProperties() {
return _properties;
}
/**
* {@inheritDoc}
*
* <p>
* If no cluster nodes are specified, the returned cluster consists of the local host only (as returned by
* {@link #getLocalHost()}.
* </p>
*/
@Override
public List<String> getClusterNodes() {
final AnySeq clusterNodes;
if (_properties.containsKey(KEY_CLUSTER_NODES)) {
if (_properties.get(KEY_CLUSTER_NODES).isSeq()) {
clusterNodes = _properties.getSeq(KEY_CLUSTER_NODES);
} else {
clusterNodes = DataFactory.DEFAULT.createAnySeq();
clusterNodes.add(_properties.getStringValue(KEY_CLUSTER_NODES));
}
} else {
clusterNodes = DataFactory.DEFAULT.createAnySeq();
clusterNodes.add(getLocalHost());
_properties.put(KEY_CLUSTER_NODES, clusterNodes);
}
return clusterNodes.asStrings();
}
/**
* {@inheritDoc}
*
* <p>
* If no name is specified in the configuration, the host name determined by the local {@link InetAddress} is
* returned.
* </p>
*/
@Override
public String getLocalHost() {
final String localHost;
if (_properties.containsKey(KEY_LOCAL_HOST_NAME)) {
localHost = _properties.getStringValue(KEY_LOCAL_HOST_NAME);
} else {
String hostName = "localhost";
try {
final InetAddress address = InetAddress.getLocalHost();
hostName = address.getHostName();
} catch (final UnknownHostException e) {
_log.warn("Could not retrieve local host name. Using " + hostName);
}
localHost = hostName;
_properties.put(KEY_LOCAL_HOST_NAME, localHost);
}
return localHost;
}
/**
* {@inheritDoc}
*
* <p>
* If no value is specified, the default value of 60 is returned.
* </p>
*/
@Override
public long getZkGcInterval() {
return getLongPropertyOrSetDefault(KEY_ZK_GC_INTERVAL, DEFAULT_ZK_GC_INTERVAL);
}
/** {@inheritDoc} */
@Override
public long getFailSafetyLevel() {
return getLongPropertyOrSetDefault(KEY_FAILSAFETY_LEVEL, DEFAULT_FAILSAFETY_LEVEL);
}
/** {@inheritDoc} */
@Override
public boolean isConfigured() {
return _isConfigured;
}
/**
* {@inheritDoc}
*
* <p>
* If no limit is configured, -1 is returned.
* </p>
*/
@Override
public long getMaxScaleUp() {
final AnyMap taskManagerSection;
if (_properties.containsKey(KEY_TASKMANAGER)) {
taskManagerSection = _properties.getMap(KEY_TASKMANAGER);
} else {
taskManagerSection = DataFactory.DEFAULT.createAnyMap();
_properties.put(KEY_TASKMANAGER, taskManagerSection);
}
return getLongPropertyOrSetDefault(taskManagerSection, KEY_MAX_SCALE_UP, DEFAULT_MAX_SCALE_UP);
}
@Override
public boolean isResumeJobs() {
final AnyMap taskManagerSection;
if (_properties.containsKey(KEY_TASKMANAGER)) {
taskManagerSection = _properties.getMap(KEY_TASKMANAGER);
} else {
taskManagerSection = DataFactory.DEFAULT.createAnyMap();
_properties.put(KEY_TASKMANAGER, taskManagerSection);
}
return getBooleanPropertyOrSetDefault(taskManagerSection, KEY_RESUME_JOBS, DEFAULT_RESUME_JOBS);
}
/** {@inheritDoc} */
@Override
public long getWorkerScaleUp(final String worker) {
final AnyMap workersSection;
if (_properties.containsKey(KEY_WORKERS)) {
workersSection = _properties.getMap(KEY_WORKERS);
} else {
workersSection = DataFactory.DEFAULT.createAnyMap();
_properties.put(KEY_WORKERS, workersSection);
}
final AnyMap workerProperties;
if (workersSection.containsKey(worker)) {
workerProperties = workersSection.getMap(worker);
} else {
workerProperties = DataFactory.DEFAULT.createAnyMap();
workersSection.put(worker, workerProperties);
}
return getLongPropertyOrSetDefault(workerProperties, KEY_MAX_SCALE_UP, DEFAULT_MAX_SCALE_UP);
}
/** {@inheritDoc} */
@Override
public Collection<String> getWorkersWithScaleUp() {
final Collection<String> workers = new ArrayList<String>();
if (_properties.containsKey(KEY_WORKERS)) {
for (final Entry<String, Any> entry : _properties.getMap(KEY_WORKERS).entrySet()) {
final AnyMap workerMap = (AnyMap) entry.getValue();
if (getLongPropertyOrSetDefault(workerMap, KEY_MAX_SCALE_UP, DEFAULT_MAX_SCALE_UP) >= 0) {
workers.add(entry.getKey());
}
}
}
return workers;
}
/** {@inheritDoc} */
@Override
public int getHttpPort(final String serviceName) {
final AnyMap servicesSectionMap = _properties.getMap(KEY_SERVICES);
if (servicesSectionMap != null && servicesSectionMap.containsKey(serviceName)) {
final AnyMap serviceMap = servicesSectionMap.getMap(serviceName);
if (serviceMap.containsKey(KEY_HTTP_PORT)) {
return serviceMap.getLongValue(KEY_HTTP_PORT).intValue();
}
}
if (_log.isInfoEnabled()) {
_log.info("HTTP port for service '" + serviceName + "' not defined.");
}
return -1;
}
/**
* {@inheritDoc}
*/
@Override
public long getMaxRetries() {
final AnyMap taskManagerSection;
if (_properties.containsKey(KEY_TASKMANAGER)) {
taskManagerSection = _properties.getMap(KEY_TASKMANAGER);
} else {
taskManagerSection = DataFactory.DEFAULT.createAnyMap();
_properties.put(KEY_TASKMANAGER, taskManagerSection);
}
return getLongPropertyOrSetDefault(taskManagerSection, KEY_MAX_RETRIES, DEFAULT_MAX_RETRIES);
}
/**
* {@inheritDoc}
*/
@Override
public long getTimeToLive() {
final AnyMap taskManagerSection;
if (_properties.containsKey(KEY_TASKMANAGER)) {
taskManagerSection = _properties.getMap(KEY_TASKMANAGER);
} else {
taskManagerSection = DataFactory.DEFAULT.createAnyMap();
_properties.put(KEY_TASKMANAGER, taskManagerSection);
}
return getLongPropertyOrSetDefault(taskManagerSection, KEY_TIME_TO_LIVE, DEFAULT_TIME_TO_LIVE);
}
/**
* Looks up the long value with the given key, if this value is not set, the given defaultvalue will be set.
*
* @param key
* the key of the property
* @param defaultValueToSet
* the default value
* @return the value from the properties or the default value if not set.
*/
protected long getLongPropertyOrSetDefault(final String key, final long defaultValueToSet) {
return getLongPropertyOrSetDefault(_properties, key, defaultValueToSet);
}
/**
* Looks up the long value with the given key in the givent properties map, if this value is not set, the given
* defaultvalue will be set.
*
* @param key
* the key of the property
* @param defaultValueToSet
* the default value
* @return the value from the properties or the default value if not set.
*/
protected long getLongPropertyOrSetDefault(final AnyMap properties, final String key, final long defaultValueToSet) {
final long value;
if (properties.containsKey(key)) {
value = properties.getLongValue(key);
} else {
value = defaultValueToSet;
properties.put(key, value);
}
return value;
}
/**
* Looks up the long value with the given key, if this value is not set, the given defaultvalue will be set.
*
* @param key
* the key of the property
* @param defaultValueToSet
* the default value
* @return the value from the properties or the default value if not set.
*/
protected boolean getBooleanPropertyOrSetDefault(final String key, final boolean defaultValueToSet) {
return getBooleanPropertyOrSetDefault(_properties, key, defaultValueToSet);
}
/**
* Looks up the long value with the given key in the givent properties map, if this value is not set, the given
* defaultvalue will be set.
*
* @param key
* the key of the property
* @param defaultValueToSet
* the default value
* @return the value from the properties or the default value if not set.
*/
protected boolean getBooleanPropertyOrSetDefault(final AnyMap properties, final String key,
final boolean defaultValueToSet) {
final boolean value;
if (properties.containsKey(key)) {
value = properties.getBooleanValue(key);
} else {
value = defaultValueToSet;
properties.put(key, value);
}
return value;
}
/**
* Read the configuration from the configuration file.
*
* @throws IOException
* error during loading of configuration file.
*/
protected void readConfiguration() throws IOException {
final IpcAnyReader anyReader = new IpcAnyReader();
InputStream configurationFileStream = null;
try {
configurationFileStream = ConfigUtils.getConfigStream(BUNDLE_ID, CONFIG_FILE_NAME);
_properties = (AnyMap) anyReader.readJsonStream(configurationFileStream);
_isConfigured = true;
} catch (final IOException ex) {
throw new IOException("Could not read configuration property file " + CONFIG_FILE_NAME + ": " + ex.toString());
} finally {
IOUtils.closeQuietly(configurationFileStream);
}
}
}