blob: 6cd64904769b53481d7104825bbbdfa43d8c9140 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2004, 2007 Boeing.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Boeing - initial API and implementation
*******************************************************************************/
package org.eclipse.osee.ote.messaging.dds.entity;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.eclipse.osee.ote.messaging.dds.DataStoreItem;
import org.eclipse.osee.ote.messaging.dds.NotImplementedException;
import org.eclipse.osee.ote.messaging.dds.ReturnCode;
import org.eclipse.osee.ote.messaging.dds.StatusKind;
import org.eclipse.osee.ote.messaging.dds.listener.DataReaderListener;
import org.eclipse.osee.ote.messaging.dds.listener.Listener;
import org.eclipse.osee.ote.messaging.dds.listener.SubscriberListener;
import org.eclipse.osee.ote.messaging.dds.service.TopicDescription;
import org.eclipse.osee.ote.messaging.dds.status.SampleLostStatus;
/**
* Provides functionality for applications to create <code>DataReader</code>'s to receive the appropriate published
* data. Maintains the data which is made available to those <code>DataReader</code> 's.
*
* @author Robert A. Fisher
* @author David Diepenbrock
*/
public class Subscriber extends DomainEntity implements EntityFactory {
private final ConcurrentHashMap<TopicDescription, CopyOnWriteArrayList<DataReader>> topicMap =
new ConcurrentHashMap<TopicDescription, CopyOnWriteArrayList<DataReader>>(512);
private final DomainParticipant participant;
private final ExecutorService threadService;
/**
* Constructor for <code>Subscriber</code> with the provided listener attached, and enabled status set as passed.
* This constructor is only visible to the DDS system, applications should use
* {@link DomainParticipant#createSubscriber(SubscriberListener)}to create a <code>Subscriber</code>.
*
* @param participant The participant creating this <code>Subscriber</code>. This is also used as the parentEntity
* for enabling purposes.
* @param enabled If <b>true </b>, the created <code>Subscriber</code> will be enabled iff the parentEntity is
* enabled.
* @param listener The listener to attach to the created subscriber.
*/
Subscriber(DomainParticipant participant, boolean enabled, Listener listener) {
super(enabled, listener, participant);
this.participant = participant;
threadService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
}
/**
* Gets the attached listener.
*
* @return The <code>SubscriberListener</code> attached to this.
* @see Entity#getBaseListener()
*/
public SubscriberListener getListener() {
return (SubscriberListener) super.getBaseListener();
}
/**
* Sets the <code>SubscriberListener</code> attached to this. If a listener is already set, this will replace it.
*
* @see Entity#setBaseListener(Listener, StatusKind)
*/
public ReturnCode setListener(SubscriberListener listener, StatusKind mask) {
return super.setBaseListener(listener, mask);
}
/**
* Creates a <code>DataReader</code> for the given <code>Topic</code> and adds it to this <code>Subscriber</code>.
*
* @param topicDescription The <code>Topic</code> associated with the <code>DataReader</code> to be created.
* @param listener The <code>DataReaderListener</code> to be attached to the created <code>DataReader</code>.
* @return The <code>DataReader</code> created, or null if it could not be created.
*/
public DataReader createDataReader(TopicDescription topicDescription, DataReaderListener listener) {
DataReader dataReader = null;
if (true != topicDescription instanceof Topic) {
return null;
}
//TUNE - get the classloader & constructor during Topic creation instead of here
try {
// Class readerClass = topic.getTypeSignature().getClassLoader().loadClass(topic.getTypeSignature().getReaderName());
//
// Constructor constructor = readerClass.getConstructor(new Class[] {TopicDescription.class, Subscriber.class, Boolean.class, DataReaderListener.class,
// EntityFactory.class});
//
// dataReader = (DataReader) constructor.newInstance(new Object[] {topicDescription, this, new Boolean(this.isEnabled()), listener, this});
dataReader = new DataReader(topicDescription, this, new Boolean(this.isEnabled()), listener, this);
}
// catch (InstantiationException ex) {
// ex.printStackTrace();
// }
// catch (IllegalAccessException ex) {
// ex.printStackTrace();
// }
// catch (ClassNotFoundException ex) {
// ex.printStackTrace();
// }
catch (IllegalArgumentException ex) {
ex.printStackTrace();
} catch (SecurityException ex) {
ex.printStackTrace();
}
// catch (InvocationTargetException ex) {
// ex.printStackTrace();
// }
// catch (NoSuchMethodException ex) {
// ex.printStackTrace();
// }
catch (OutOfMemoryError er) {
er.printStackTrace();
}
// Only keep this writer if it was successfully created.
if (dataReader != null) {
CopyOnWriteArrayList<DataReader> readers = topicMap.get(topicDescription);
if (readers == null) {
readers = new CopyOnWriteArrayList<>();
topicMap.put(topicDescription, readers);
}
readers.add(dataReader);
}
return dataReader;
}
/**
* Removes the <code>DataReader</code> from this <code>Subscriber</code>. If the reader was already deleted, or was
* not created by this <code>Subscriber</code>, an error is returned.
*
* @param dataReader The reader to delete.
* @return {@link ReturnCode#OK}if the reader was successfully deleted, otherwise {@link ReturnCode#NOT_ENABLED}if
* this <code>Subscriber</code> is not enabled, or {@link ReturnCode#ALREADY_DELETED}if the reader has already been
* deleted, or {@link ReturnCode#PRECONDITION_NOT_MET}if dataReader was not created by this <code>Subscriber</code>.
*/
public ReturnCode deleteDataReader(DataReader dataReader) {
// Check that the Entity is enabled before proceeding (See description of enable on the Entity object in the DDS spec)
if (!isEnabled()) {
return ReturnCode.NOT_ENABLED;
}
// Check that the writer is not already marked as deleted (in case others kept a reference to it
if (dataReader.isDeleted()) {
return ReturnCode.ALREADY_DELETED;
}
// boolean found = dataReaders.remove(dataReader);
if (topicMap.remove(dataReader.getTopicDescription()) != null) {
dataReader.markAsDeleted();
return ReturnCode.OK;
} else {
return ReturnCode.PRECONDITION_NOT_MET;
}
}
/**
* This method is here for future functionality that is described in the DDS specification but has not been
* implemented or used.
*/
public DataReader lookupDataReader(String topicName) {
// UNSURE This method has not been implemented, but is called out in the spec
if (true) {
throw new NotImplementedException();
}
return null;
}
/**
* This method is here for future functionality that is described in the DDS specification but has not been
* implemented or used.
*/
public ReturnCode beginAccess() {
// DONT_NEED This method has not been implemented, but is called out in the spec
if (true) {
throw new NotImplementedException();
}
// Check that the Entity is enabled before proceeding (See description of enable on the Entity object in the DDS spec)
if (!isEnabled()) {
return ReturnCode.NOT_ENABLED;
}
return ReturnCode.ERROR;
}
/**
* This method is here for future functionality that is described in the DDS specification but has not been
* implemented or used.
*/
public ReturnCode endAccess() {
// DONT_NEED This method has not been implemented, but is called out in the spec
if (true) {
throw new NotImplementedException();
}
// Check that the Entity is enabled before proceeding (See description of enable on the Entity object in the DDS spec)
if (!isEnabled()) {
return ReturnCode.NOT_ENABLED;
}
return ReturnCode.ERROR;
}
/**
* This method is here for future functionality that is described in the DDS specification but has not been
* implemented or used.
*/
public ReturnCode getDataReaders(Collection<DataReader> dataReaders, Collection<?> sampleStateKind, Collection<?> viewStateKind, Collection<?> instanceStateKind) {
// UNSURE This method has not been implemented, but is called out in the spec
if (true) {
throw new NotImplementedException();
}
// Check that the Entity is enabled before proceeding (See description of enable on the Entity object in the DDS spec)
if (!isEnabled()) {
return ReturnCode.NOT_ENABLED;
}
return ReturnCode.ERROR;
}
/**
* This method is here for future functionality that is described in the DDS specification but has not been
* implemented or used.
*/
public void notifyDataReaders() {
// UNSURE This method has not been implemented, but is called out in the spec
if (true) {
throw new NotImplementedException();
}
}
/**
* This method is here for future functionality that is described in the DDS specification but has not been
* implemented or used.
*/
public SampleLostStatus getSampleLostStatus() {
// UNSURE This method has not been implemented, but is called out in the spec
if (true) {
throw new NotImplementedException();
}
return null;
}
/**
* @return the <code>DomainParticipant</code> to which this belongs.
*/
public DomainParticipant getParticipant() {
// NOTE this method is part of the DDS spec.
return participant;
}
/**
* Deletes all of the <code>DataReader</code> objects currently in this <code>Subscriber</code>.
*
* @return {@link ReturnCode#OK}if the readers were successfully deleted, otherwise {@link ReturnCode#NOT_ENABLED}if
* this <code>Subscriber</code> is not enabled.
*/
public ReturnCode deleteContainedEntities() {
/*
* PARTIAL per the DDS spec, this should also delete the ReadCondition and QueryCondition objects stored in the
* <code> DataReader </code> 's, however because we haven't implemented the conditions they are not deleted here.
*/
// Check that the Entity is enabled before proceeding (See description of enable on the Entity object in the DDS spec)
if (!isEnabled()) {
return ReturnCode.NOT_ENABLED;
}
for (CopyOnWriteArrayList<DataReader> readers : topicMap.values()) {
for (DataReader reader : readers) {
reader.markAsDeleted();
}
readers.clear();
}
topicMap.clear();
return ReturnCode.OK;
}
/**
* @return <b>true </b> iff this <code>Subscriber</code> currently has any data readers.
*/
public boolean hasDataReaders() {
return !topicMap.isEmpty();
}
/**
* Processes new data received. If this Subscriber has at least one DataReader who is interested in this data, and
* the <code>Subscriber</code> has an attached listener, notify it via
* {@link SubscriberListener#onDataOnReaders(CopyOfSubscriber)}. Otherwise, if there is no
* <code>SubscriberListener</code> then notify all of the interested <code>DataReader</code>'s via
* {@link DataReaderListener#onDataAvailable(DataReader)}. The data is stored so that it is available for a reader to
* read or take.
*
* @param dataStoreItem The newly published data.
*/
void processNewData(final DataStoreItem dataStoreItem) { // This has package scope since it is a system-level type call
final Collection<DataReader> readers = topicMap.get(dataStoreItem.getTheTopicDescription());
if (readers != null && !readers.isEmpty()) {
// SPEC NOTE: The following listener logic is based on paragraph "Listener access to Read Communication Status"
// in the DDS specification.
// Invoke the SubscriberListener if available
SubscriberListener listener = getListener();
if (listener != null) {
listener.onDataOnReaders(this);
// Otherwise, invoke the DataReaderListeners
} else {
// Check all DataReader objects for listeners
for (final DataReader reader : readers) {
reader.processNewData(dataStoreItem);
/*
* threadService.submit(new Runnable() { public void run() { reader.processNewData(dataStoreItem); } });
*/
}
}
}
}
}