blob: 64fb487d0610133ac44600f8d55b9fe0e0a4ff2b [file] [log] [blame]
/*
* Copyright (c) 2020 Kentyou.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Kentyou - initial API and implementation
*/
package org.eclipse.sensinact.gateway.generic;
import org.eclipse.sensinact.gateway.common.bundle.Mediator;
import org.eclipse.sensinact.gateway.common.primitive.Name;
import org.eclipse.sensinact.gateway.core.InvalidServiceProviderException;
import org.eclipse.sensinact.gateway.core.ServiceProvider;
import org.eclipse.sensinact.gateway.generic.packet.InvalidPacketException;
import org.eclipse.sensinact.gateway.generic.packet.Packet;
import org.eclipse.sensinact.gateway.generic.packet.PacketReader;
import org.eclipse.sensinact.gateway.generic.packet.PayloadFragment;
import org.eclipse.sensinact.gateway.generic.packet.TaskIdValuePair;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
* Manages IO between a {@link ProtocolStackEndpoint} and
* a set of {@link ServiceProvider}s
*
* @author <a href="mailto:cmunilla@kentyou.com">Christophe Munilla</a>
*/
public class Connector<P extends Packet> extends TaskManager {
/**
* map of managed {@link ExtModelInstance}s
*/
protected final List<ExtModelInstance<?>> instances;
/**
* The global XML formated sensiNact resource model
* configuration
*/
protected final ExtModelConfiguration extModelConfiguration;
/**
* The model instance builder
*/
protected final ExtModelInstanceBuilder extModelInstanceBuilder;
/**
* the {@link ConnectorCustomizer} handling the {@link PacketReader}
* initialization
*/
protected ConnectorCustomizer<P> customizer;
/**
* Initial lock status
*/
protected boolean locked;
/**
* Constructor
*
* @param context The associated {@link BundleContext}
* @param locked Defines the initial lock state of the
* {@link TokenEventProvider} to instantiate
*/
public Connector(Mediator mediator,
ProtocolStackEndpoint<?> endpoint,
ExtModelConfiguration extModelConfiguration,
ConnectorCustomizer<P> customizer) {
super(mediator, endpoint, extModelConfiguration.isLockedAtInitializationTime(), extModelConfiguration.isDesynchronized());
this.extModelConfiguration = extModelConfiguration;
this.extModelInstanceBuilder = new ExtModelInstanceBuilder(mediator);
this.extModelInstanceBuilder.withConnector(this);
this.locked = extModelConfiguration.isLockedAtInitializationTime();
this.instances = new ArrayList<ExtModelInstance<?>>();
this.customizer = customizer;
this.configureCustomizer();
}
/**
* Constructor
*
* @param context The associated {@link BundleContext}
* @param locked Defines the initial lock state of the
* {@link TokenEventProvider} to instantiate
*/
public Connector(Mediator mediator, ProtocolStackEndpoint<?> endpoint, ExtModelConfiguration ExtModelConfiguration) {
this(mediator, endpoint, ExtModelConfiguration, null);
}
/**
* Configures this Connector's ConnectorCustomiser according
* to initial configuration properties
*
* @param packetType this Connector's Packet type
* @throws InvalidPacketTypeException
*/
protected void configureCustomizer() {
try {
if (this.customizer == null) {
this.customizer = new DefaultConnectorCustomizer<P>(mediator, this.extModelConfiguration);
}
} catch (Exception e) {
mediator.error(e);
}
}
/**
* @throws InvalidServiceProviderException
* @inheritDoc
* @see PacketReader#process(Packet)
*/
public void process(P packet) throws InvalidPacketException {
if (!this.customizer.preProcessing(packet)) {
if (super.mediator.isDebugLoggable()) {
super.mediator.debug("Do not process the received packet : exiting");
}
return;
}
PacketReader<P> reader = this.customizer.newPacketReader(packet);
if (reader == null) {
throw new InvalidPacketException("Unable to create an appropriate reader");
}
Iterator<TaskIdValuePair> iterator = reader.getTaskIdValuePairs();
while (iterator.hasNext()) {
TaskIdValuePair taskIdValuePair = iterator.next();
String taskIdentifier = taskIdValuePair.taskIdentifier;
// No need to process if the protocol allows
// to identify the response according to the
// initial Task object
List<Task> tasks = super.remove(taskIdentifier);
Iterator<Task> taskIterator = tasks.iterator();
boolean treated = false;
while (taskIterator.hasNext()) {
Task task = taskIterator.next();
if (task != null && !task.isResultAvailable()) {
task.setResult(taskIdValuePair.getValue(), taskIdValuePair.getTimestamp());
treated = true;
}
}
if (treated) {
reader.treated(taskIdentifier);
}
}
Iterator<PayloadFragment> subPacketIterator = reader.iterator();
while (subPacketIterator.hasNext()) {
PayloadFragment subPacket = subPacketIterator.next();
String serviceProviderName = subPacket.getServiceProviderIdentifier();
if (serviceProviderName == null) {
if (this.mediator.isDebugLoggable()) {
this.mediator.debug("Unable to identify the targeted service provider");
}
continue;
}
int index = -1;
ExtModelInstance<?> instance = null;
ExtServiceProviderImpl serviceProvider = null;
if ((index = this.instances.indexOf(new Name<ExtModelInstance<?>>(serviceProviderName))) > -1) {
instance = this.instances.get(index);
}
if (subPacket.isGoodByeMessage()) {
this.processGoodbye(instance);
if (index > -1) {
this.instances.remove(index);
}
continue;
}
if (instance == null) {
try {
instance = this.addModelInstance(subPacket.getProfileId(), serviceProviderName);
if (instance == null) {
continue;
}
super.mediator.debug("Service provider discovered : %s", serviceProviderName);
} catch (InvalidServiceProviderException e) {
throw new InvalidPacketException(e);
}
}
serviceProvider = instance.getRootElement();
if (subPacket.isHelloMessage()) {
this.processHello(serviceProvider);
}
serviceProvider.process(subPacket);
this.customizer.postProcessing(serviceProvider, reader);
}
}
/**
* Processes an 'hello' message sent by the {@link ServiceProvider}
* passed as parameter
*
* @param serviceProvider the {@link ServiceProvider} joining the network
*/
protected void processHello(ExtServiceProviderImpl serviceProvider) {
if (ServiceProvider.LifecycleStatus.INACTIVE.equals(serviceProvider.getStatus())) {
if (super.mediator.isDebugLoggable()) {
super.mediator.debug(new StringBuilder().append("Service provider ").append(serviceProvider.getName()).append("activated").toString());
}
serviceProvider.start();
}
}
/**
* Processes a 'goodbye' message sent by the {@link ServiceProvider}
* passed as parameter
*
* @param serviceProvider the {@link ServiceProvider} leaving the network
*/
protected void processGoodbye(final ExtModelInstance<?> instance) {
if (instance == null) {
if (super.mediator.isDebugLoggable()) {
super.mediator.debug("An unknown model instance is leaving the network");
}
return;
}
if (super.mediator.isInfoLoggable()) {
super.mediator.info(new StringBuilder().append("Service provider '").append(instance.getName()).append("' is leaving the network").toString());
}
instance.unregister();
}
/**
* Creates and returns a new {@link ExtModelInstance} holding
* an {@link ExtServiceProviderImpl} instance whose profile and
* name string are passed as parameter
*
* @param profileId string identifier of the
* profile of the {@link ExtServiceProviderImpl} held by
* the {@link ExtModelInstance} to be instantiated
* @param serviceProviderName string name of the
* {@link ExtServiceProviderImpl} held by
* the {@link ExtModelInstance} to be instantiated
* @return a new {@link ExtModelInstance} instance
* @throws InvalidServiceProviderException
*/
@SuppressWarnings({ "unchecked", "rawtypes", "unused" })
protected synchronized ExtModelInstance<?> addModelInstance(String profileId,
final String serviceProviderName) throws InvalidServiceProviderException {
ExtModelInstance<?> instance = this.extModelInstanceBuilder.<ExtModelConfiguration,
ExtModelInstance>build(serviceProviderName, profileId,this.extModelConfiguration);
if (instance != null) {
this.instances.add(instance);
}
return instance;
}
/**
* Returns the {@link ExtModelInstance} whose name
* is passed as parameter
*
* @param instanceName the name of the {@link ExtModelInstance}
* to return
* @return the {@link ExtModelInstance} with the specified
* name
*/
public ExtModelInstance<?> getModelInstance(String instanceName) {
int index = this.instances.indexOf(new Name<ExtModelInstance<?>>(instanceName));
if (index < 0) {
return null;
}
return this.instances.get(index);
}
/**
* Stops this factory, the created {@link TokenEventProvider} and all
* {@link ExtServiceProviderImpl} instances
*/
public void stop() {
super.stop();
if (this.instances == null || this.instances.size() == 0) {
return;
}
synchronized (this.instances) {
int length = this.instances.size();
for (; length > 0; ) {
try {
this.instances.remove(0).unregister();
} catch (IllegalStateException e) {
}
length = this.instances.size();
}
this.instances.clear();
}
}
}