blob: efa081aa02b6eef7f544fce26384cccf811af525 [file] [log] [blame]
/*
* Copyright (c) 2017 CEA.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* CEA - initial API and implementation
*/
package org.eclipse.sensinact.gateway.generic;
import org.eclipse.sensinact.gateway.common.bundle.Mediator;
import org.eclipse.sensinact.gateway.common.execution.Executable;
import org.eclipse.sensinact.gateway.core.ResourceConfig;
import org.eclipse.sensinact.gateway.core.method.AccessMethod;
import org.eclipse.sensinact.gateway.util.UriUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/**
* Manages {@link Task}s to execute
*
* @author <a href="mailto:christophe.munilla@cea.fr">Christophe Munilla</a>
*/
public abstract class TaskManager {
public static final char IDENTIFIER_SEP_CHAR = '#';
/**
* Locker object
*/
private final Object lock = new Object();
/**
* tasks storage attempting for free token events
*/
protected final Deque<Task> waitingTasks;
/**
* executed tasks storage mapped to requirer
* ServiceProvider's identifier
*/
protected final Map<String, List<Task>> executedTasks;
/**
* the associated {@link TokenEventProvider}
*/
protected TaskDesynchronizer desynchronizer;
/**
* the {@link TaskTranslator} to which
* to ask for Task creation and transmission
*/
protected final TaskTranslator connector;
/**
* the associated {@link Mediator}
*/
protected Mediator mediator;
/**
* Constructor
*
* @param requireTokenEventProvider
* @param desynchronizer the associated token provider
*/
public TaskManager(Mediator mediator, TaskTranslator connector, boolean initialLockState, boolean isDesynchronized) {
this.mediator = mediator;
this.connector = connector;
this.waitingTasks = new LinkedList<Task>();
this.executedTasks = new HashMap<String, List<Task>>();
if (isDesynchronized) {
this.desynchronizer = new TaskDesynchronizer(mediator);
this.desynchronizer.setLocked(initialLockState);
new Thread(this.desynchronizer).start();
}
}
/**
* @inheritDoc
* @see org.eclipse.sensinact.gateway.generic.core.Desynchronizable#
* nextTask()
*/
public void nextTask() {
Task task = null;
synchronized (this.waitingTasks) {
if (!this.waitingTasks.isEmpty()) {
task = this.waitingTasks.poll();
}
}
if (task != null) {
if (!task.isResultAvailable()) {
String taskIdentifier = task.getTaskIdentifier();
if (taskIdentifier != null) {
this.add(UriUtils.getRoot(task.getPath()).substring(1), task);
}
task.execute();
if (taskIdentifier == null) {
//if no identifier avoid a task running in the
//background for a result that will never come
task.abort(AccessMethod.EMPTY);
}
}
} else if (this.mediator.isDebugLoggable()) {
this.mediator.debug("No task left to execute");
}
synchronized (lock) {
if (this.desynchronizer != null) {
this.desynchronizer.freeingToken();
}
}
}
/**
* Returns the {@link Task} whose identifier is
* passed as parameter
*
* @param taskIdentifier the string identifier of the {@link Task}
* to return
* @return the {@link Task} with the specified identifier
*/
protected List<Task> remove(String taskIdentifier) {
if (taskIdentifier == null) {
return Collections.<Task>emptyList();
}
int index = taskIdentifier.indexOf(IDENTIFIER_SEP_CHAR);
if (index == -1) {
return Collections.<Task>emptyList();
}
String serviceProviderIdentifier = taskIdentifier.substring(0, index);
List<Task> tasks = new ArrayList<Task>();
synchronized (this.executedTasks) {
List<Task> executeds = this.executedTasks.get(serviceProviderIdentifier);
if (executeds == null) {
return tasks;
}
Iterator<Task> iterator = executeds.iterator();
while (iterator.hasNext()) {
Task task = iterator.next();
if (taskIdentifier.equals(task.getTaskIdentifier())) {
iterator.remove();
tasks.add(task);
}
}
}
return tasks;
}
/**
* Adds the {@link Task} passed as parameter to the list of
* ones mapped to the specified identifier of the {@link
* ServiceProvider} requirer
*
* @param task the {@link Task} to add
* @param serviceProviderIdentifier the string identifier of the {@link ServiceProvider}
* requirer
*/
private void add(String serviceProviderIdentifier, Task task) {
List<Task> tasks = null;
synchronized (this.executedTasks) {
tasks = this.executedTasks.get(serviceProviderIdentifier);
if (tasks == null) {
tasks = new ArrayList<Task>();
this.executedTasks.put(serviceProviderIdentifier, tasks);
}
tasks.add(task);
task.registerCallBack(new TaskCallBack(new Executable<Task, Void>() {
@Override
public Void execute(Task task) throws Exception {
TaskManager.this.remove(task.getTaskIdentifier());
return null;
}
}));
}
}
/**
* Asks for an asynchronous task execution. Builds a {@link Task} object and
* stores it in the list of waiting ones
*
* @param identifier the requirer {@link PacketProcessor} string identifier
* @param command CommandType : GET, SET, ACT, SUBSCRIBE,
* UNSUBSCRIBE OR SERVICES_ENUMERATION
* @param profileId
* @param resourceConfig the {@link ExtResourceConfig} mapped to the resource on which
* applies the method
* @param parameters the objects array parameterizing the method invocation
* @return the {@link Future} object associated to the result of the
* task execution
*/
public Task execute(Task.CommandType command, String path, String profileId, ResourceConfig resourceConfig, Object[] parameters) {
Task task = null;
switch (command) {
case GET:
task = this.connector.createTask(this.mediator, Task.CommandType.GET, path, profileId, resourceConfig, parameters);
break;
case SET:
task = this.connector.createTask(this.mediator, Task.CommandType.SET, path, profileId, resourceConfig, parameters);
break;
case ACT:
task = this.connector.createTask(this.mediator, Task.CommandType.ACT, path, profileId, resourceConfig, parameters);
break;
case SUBSCRIBE:
task = this.connector.createTask(this.mediator, Task.CommandType.SUBSCRIBE, path, profileId, resourceConfig, parameters);
break;
case UNSUBSCRIBE:
task = this.connector.createTask(this.mediator, Task.CommandType.UNSUBSCRIBE, path, profileId, resourceConfig, parameters);
break;
case SERVICES_ENUMERATION:
task = this.connector.createTask(this.mediator, Task.CommandType.SERVICES_ENUMERATION, path, profileId, resourceConfig, parameters);
break;
default:
;
}
if (task != null) {
return this.execute(task);
}
return null;
}
/**
* Asks for an asynchronous task execution
*
* @param task the task to treat
*/
public Task execute(Task task) {
synchronized (this.waitingTasks) {
this.waitingTasks.offer(task);
}
synchronized (lock) {
if (this.desynchronizer == null) {
this.nextTask();
} else {
this.desynchronizer.require(this);
}
}
return task;
}
/**
* Stops the associated {@link TokenEventProvider} and
* {@link ExecutorService} if not null
*/
public void stop() {
synchronized (lock) {
if (this.desynchronizer != null) {
this.desynchronizer.stop();
this.desynchronizer = null;
}
}
synchronized (this.waitingTasks) {
this.waitingTasks.clear();
}
}
/**
* Calls the <code>freeingToken</code> method of the
* associated {@link TokenEventProvider} to unlock it
*/
protected void unlock() {
synchronized (lock) {
if (this.desynchronizer != null) {
this.desynchronizer.freeingToken();
}
}
}
}