package org.eclipse.jemo.sys;
import org.eclipse.jemo.AbstractJemo;
import org.eclipse.jemo.api.KeyValue;
import org.eclipse.jemo.api.ModuleLimit;
import org.eclipse.jemo.internal.model.CloudProvider;
import org.eclipse.jemo.sys.internal.Util;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
* this class will contain methods for running the scheduler engine.
* @author Christopher Stura ""
public class JemoScheduler extends Thread {
public static class Nomination {
private final long nominatedOn;
private final String instanceId;
public Nomination(String instanceId,long nominatedOn) {
this.nominatedOn = nominatedOn;
this.instanceId = instanceId;
public long getNominatedOn() {
return nominatedOn;
public String getInstanceId() {
return instanceId;
public boolean isExpired() {
long timeSinceNomination = System.currentTimeMillis() - nominatedOn;
return timeSinceNomination > TimeUnit.MINUTES.toMillis(5);
protected static class ModuleActivityMap {
private final AbstractJemo jemoServer;
private final ModuleInfo moduleInfo;
private final Set<String> instances;
private final ModuleLimit limits;
public ModuleActivityMap(AbstractJemo jemoServer, ModuleInfo moduleInfo, Set<String> instances, ModuleLimit limits) {
this.moduleInfo = moduleInfo;
this.instances = instances;
this.limits = limits;
this.jemoServer = jemoServer;
public int getCurrentGSMActivity() {
return jemoServer.getPluginManager().getNumModuleEventsRunningOnGSM(moduleInfo.getId(), moduleInfo.getVersion(), moduleInfo.getImplementation());
public int getCurrentInstanceActivity(String instanceId) {
return JemoPluginManager.getNumModuleEventsRunning(instanceId, moduleInfo.getId(), moduleInfo.getVersion(), moduleInfo.getImplementation());
public int getCurrentLocationActivity(String location) {
return jemoServer.getPluginManager().getNumModuleEventsRunningOnLocation(location,moduleInfo.getId(), moduleInfo.getVersion(), moduleInfo.getImplementation());
public int getGSMMaximum() {
return limits == null ? -1 : limits.getMaxActiveBatchesPerGSM();
public int getInstanceMaximum() {
return limits == null ? -1 : limits.getMaxActiveBatchesPerInstance();
public int getLocationMaximum() {
return limits == null ? -1 : limits.getMaxActiveBatchesPerLocation();
public boolean isValidLocation(String location) {
return limits == null ? true : (limits.getBatchLocations() == null || limits.getBatchLocations().length == 0 ? true : Arrays.asList(limits.getBatchLocations()).contains(location));
public boolean isValidForFrequencyInterval() {
if(limits == null) {
return true;
if(limits.getBatchFrequency() == null) {
return true;
long lastExecutionDate = jemoServer.getPluginManager().getLastLaunchedModuleEventOnGSM(moduleInfo.getId(), moduleInfo.getVersion(), moduleInfo.getImplementation());
if(lastExecutionDate == 0) {
return true;
return (System.currentTimeMillis() - lastExecutionDate >= limits.getBatchFrequency().getUnit().toMillis(limits.getBatchFrequency().getValue()));
private AbstractJemo jemoServer = null;
private final Random NOMINATION_RND = new Random(System.currentTimeMillis());
private Nomination currentNomination = null;
private final AtomicBoolean RUNNING = new AtomicBoolean(false);
public JemoScheduler(AbstractJemo jemoServer) {
this.jemoServer = jemoServer;
public synchronized void start() {
super.start();
public Nomination getCurrentNomination() {
String nominee = CloudProvider.getInstance().getRuntime().retrieve(STORAGE_SCHEDULER_NOMINEE, String.class);
Long LAST_NOMINATION = CloudProvider.getInstance().getRuntime().retrieve(STORAGE_SCHEDULER_NOMINATEDON, Long.class);
if(nominee != null && LAST_NOMINATION != null) {
long timeSinceNomination = System.currentTimeMillis() - LAST_NOMINATION;
if(timeSinceNomination > TimeUnit.MINUTES.toMillis(5)) { //the scheduler will hold it's nomination for 5 minutes (this will make Jemo cheaper to run)
//this means that the last nomination is stale because of the time in which the nomination was made.
return null;
} else if(!jemoServer.getPluginManager().isInstanceActive(nominee)) {
return null;
} else {
return new Nomination(nominee, LAST_NOMINATION);
return null;
* this method will produce a new nominated instance. the instance that decides this nomination will be the instance
* with the lowest CRC 32 of their instance id, the actual nominated instance will be chosen randomly.
* @return a reference to the new nomination only if we were selected as the nominated instance otherwise null.
public Nomination newNomination() {
Nomination nomination = null;
Set<String> activeInstanceList = jemoServer.getPluginManager().getActiveInstanceList();
String nominatingInstanceId =
.map(inst -> new KeyValue<>(inst, Util.crc(inst.getBytes(Util.UTF8_CHARSET))))
.min((k1,k2) -> k1.getValue().compareTo(k2.getValue()))
if(nominatingInstanceId != null) { //if at least 1 instance is active in the GSM.
if(nominatingInstanceId.equals(jemoServer.getINSTANCE_ID())) {
//we are the nominating instance
int nominatedInstancePos = NOMINATION_RND.nextInt(activeInstanceList.size());
final String nominatedInstanceId = (activeInstanceList.toArray(new String[] {}))[nominatedInstancePos];
nomination = new Nomination(nominatedInstanceId, System.currentTimeMillis());
CloudProvider.getInstance().getRuntime().store(STORAGE_SCHEDULER_NOMINEE, nominatedInstanceId);
CloudProvider.getInstance().getRuntime().store(STORAGE_SCHEDULER_NOMINATEDON, nomination.getNominatedOn());
if(nominatedInstanceId.equals(jemoServer.getINSTANCE_ID())) {
return nomination;
return null;
public void interrupt() {
super.interrupt();
jemoServer.LOG(Level.INFO, "[%s][%s] THE SCHEDULER HAS BEEN INTERUPTED", getClass().getSimpleName(), jemoServer.getINSTANCE_ID());
public void run() {
while(RUNNING.get()) { //run as long as nobody has interrupted us.
if (!jemoServer.isInInstallationMode()) {
if (currentNomination == null) {
currentNomination = getCurrentNomination();
if (currentNomination == null || currentNomination.isExpired()) {
currentNomination = newNomination();
if(currentNomination != null && currentNomination.getInstanceId().equals(jemoServer.getINSTANCE_ID())) {
//1. we will need to know the module list for each active instance in the cluster.
jemoServer.LOG(Level.INFO, "[%s][%s] NOMINATED SCHEDULER IS %s - RUNNING STATUS IS %s", getClass().getSimpleName(), jemoServer.getINSTANCE_ID(), currentNomination.getInstanceId(), String.valueOf(RUNNING.get()));
//we need to know where things are being executed so we can filter out modules which are over their limits.
Set<String> activeInstanceList = jemoServer.getPluginManager().getActiveInstanceList();
Map<String,String> instanceLocationMap = jemoServer.getPluginManager().getInstanceLocationMap(activeInstanceList.toArray(new String[] {}));
Map<String,List<ModuleInfo>> instanceModuleMap = activeInstanceList
.map(inst -> new KeyValue<>(inst,
.filter(mod -> mod.isBatch())
.collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue));
List<ModuleActivityMap> activityMap = instanceModuleMap.values().stream()
.map(m -> new ModuleActivityMap(jemoServer, m, activeInstanceList, jemoServer.getPluginManager().getModuleLimits(m.getId(), m.getVersion(),m.getImplementation())))
.filter(act -> act.getGSMMaximum() == -1 || act.getCurrentGSMActivity() < act.getGSMMaximum()) //filter out any modules that have reached their GSM activity maximum
.filter(act -> act.isValidForFrequencyInterval()) //remove modules with an incompatible frequency.
//remove modules from the instance map that have incompatible activity.
instanceModuleMap.values().forEach(mList ->
mList.removeIf(m -> !
.anyMatch(act -> act.moduleInfo.getId() == m.getId() && act.moduleInfo.getImplementation().equals(m.getImplementation()))));
//2. now we need an inverted map that will tell us the modules per instance (all module versions are available on all instances)
Map<ModuleInfo,Set<String>> moduleInstanceMap = instanceModuleMap.entrySet().stream()
.flatMap(e -> e.getValue().stream().map(mod -> new AbstractMap.SimpleEntry<>(mod,e.getKey())))
.collect(Collectors.groupingBy(AbstractMap.SimpleEntry::getKey,Collectors.mapping(AbstractMap.SimpleEntry::getValue, Collectors.toSet())));
//3. we now need to figure out of the instances which can potentially run the batch which one we should use.
Map<String,List<String>> moduleInstanceTargetMap = moduleInstanceMap.entrySet().stream()
.filter(e -> !e.getValue().isEmpty())
.map(e -> {
final KeyValue<List<String>> result = new KeyValue<>(e.getKey().getImplementation() + "_" + e.getKey().getVersion(),new ArrayList<>());
ModuleActivityMap modActivity = -> act.moduleInfo.getImplementation().equals(e.getKey().getImplementation()) && act.moduleInfo.getId() == e.getKey().getId())
if(modActivity != null) {
if(modActivity.getInstanceMaximum() != -1) { //this means that we should have at least 1 per instance and possibly more if the maximum is higher.
.filter(inst -> modActivity.getInstanceMaximum() > modActivity.getCurrentInstanceActivity(inst)) //keep only the instances that are under their maximum
.forEach(inst -> result.getValue().add(inst));
} else if(modActivity.getLocationMaximum() != -1) {
//we need to find the locations where the instances that can process this module are located.
.filter(me -> e.getValue().contains(me.getKey()))
.map(me -> me.getValue())
.filter(l -> modActivity.isValidLocation(l))
.filter(l -> modActivity.getLocationMaximum() > modActivity.getCurrentLocationActivity(l)) //remove any locations that have already reached their maximum
.forEach(l -> result.getValue().add(l));
} else {
//we need to filter the available instances by those with a valid location for this module.
String[] validInstances = e.getValue().stream()
.filter(inst -> modActivity.isValidLocation(instanceLocationMap.get(inst)))
if(validInstances != null && validInstances.length > 0) {
} /*else {
result.getValue().add((e.getValue().toArray(new String[] {}))[NOMINATION_RND.nextInt(e.getValue().size())]);
}*/ //this should never happen.
return result; //new KeyValue<>(e.getKey(), (e.getValue().toArray(new String[] {}))[NOMINATION_RND.nextInt(e.getValue().size())]);
//4. now we can dispatch the execution messages since we know what to run and where to run it.
List<String> instanceQueueUrlList = CloudProvider.getInstance().getRuntime().listQueueIds(null, false);
.flatMap(e -> e.getValue().stream().map(inst -> new KeyValue<>(inst,e.getKey())))
.forEach(e -> {
final String queueUrl = -> qId.endsWith(e.getKey())).findFirst().orElse(e.getKey());
if(queueUrl != null) {
final String[] moduleClassNameAndVersion = e.getValue().split("_");
final int moduleId = instanceModuleMap.values().stream()
.filter(m -> m.getImplementation().equals(moduleClassNameAndVersion[0]))
.map(m -> m.getId())
if(moduleId != -1) {
if(RUNNING.get()) {
jemoServer.sendRunBatchMessage(moduleId, moduleClassNameAndVersion[0], Double.parseDouble(moduleClassNameAndVersion[1]), queueUrl);
//now we should sleep for a bit (1 second) before trying again.
try { TimeUnit.SECONDS.sleep(1); }catch(InterruptedException irrEx) {}