| /******************************************************************************* |
| * Copyright (c) 2004, 2007 Boeing. |
| * All rights reserved. This program and the accompanying materials |
| * are made available under the terms of the Eclipse Public License v1.0 |
| * which accompanies this distribution, and is available at |
| * http://www.eclipse.org/legal/epl-v10.html |
| * |
| * Contributors: |
| * Boeing - initial API and implementation |
| *******************************************************************************/ |
| package org.eclipse.osee.ote.messaging.dds.entity; |
| |
| import java.util.Collection; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import org.eclipse.osee.ote.messaging.dds.DataStoreItem; |
| import org.eclipse.osee.ote.messaging.dds.NotImplementedException; |
| import org.eclipse.osee.ote.messaging.dds.ReturnCode; |
| import org.eclipse.osee.ote.messaging.dds.StatusKind; |
| import org.eclipse.osee.ote.messaging.dds.listener.DataReaderListener; |
| import org.eclipse.osee.ote.messaging.dds.listener.Listener; |
| import org.eclipse.osee.ote.messaging.dds.listener.SubscriberListener; |
| import org.eclipse.osee.ote.messaging.dds.service.TopicDescription; |
| import org.eclipse.osee.ote.messaging.dds.status.SampleLostStatus; |
| |
| /** |
| * Provides functionality for applications to create <code>DataReader</code>'s to receive the appropriate published |
| * data. Maintains the data which is made available to those <code>DataReader</code> 's. |
| * |
| * @author Robert A. Fisher |
| * @author David Diepenbrock |
| */ |
| public class Subscriber extends DomainEntity implements EntityFactory { |
| |
| private final ConcurrentHashMap<TopicDescription, CopyOnWriteArrayList<DataReader>> topicMap = |
| new ConcurrentHashMap<TopicDescription, CopyOnWriteArrayList<DataReader>>(512); |
| private final DomainParticipant participant; |
| private final ExecutorService threadService; |
| |
| /** |
| * Constructor for <code>Subscriber</code> with the provided listener attached, and enabled status set as passed. |
| * This constructor is only visible to the DDS system, applications should use |
| * {@link DomainParticipant#createSubscriber(SubscriberListener)}to create a <code>Subscriber</code>. |
| * |
| * @param participant The participant creating this <code>Subscriber</code>. This is also used as the parentEntity |
| * for enabling purposes. |
| * @param enabled If <b>true </b>, the created <code>Subscriber</code> will be enabled iff the parentEntity is |
| * enabled. |
| * @param listener The listener to attach to the created subscriber. |
| */ |
| Subscriber(DomainParticipant participant, boolean enabled, Listener listener) { |
| super(enabled, listener, participant); |
| this.participant = participant; |
| threadService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1); |
| } |
| |
| /** |
| * Gets the attached listener. |
| * |
| * @return The <code>SubscriberListener</code> attached to this. |
| * @see Entity#getBaseListener() |
| */ |
| public SubscriberListener getListener() { |
| return (SubscriberListener) super.getBaseListener(); |
| } |
| |
| /** |
| * Sets the <code>SubscriberListener</code> attached to this. If a listener is already set, this will replace it. |
| * |
| * @see Entity#setBaseListener(Listener, StatusKind) |
| */ |
| public ReturnCode setListener(SubscriberListener listener, StatusKind mask) { |
| return super.setBaseListener(listener, mask); |
| } |
| |
| /** |
| * Creates a <code>DataReader</code> for the given <code>Topic</code> and adds it to this <code>Subscriber</code>. |
| * |
| * @param topicDescription The <code>Topic</code> associated with the <code>DataReader</code> to be created. |
| * @param listener The <code>DataReaderListener</code> to be attached to the created <code>DataReader</code>. |
| * @return The <code>DataReader</code> created, or null if it could not be created. |
| */ |
| public DataReader createDataReader(TopicDescription topicDescription, DataReaderListener listener) { |
| DataReader dataReader = null; |
| |
| if (true != topicDescription instanceof Topic) { |
| return null; |
| } |
| |
| //TUNE - get the classloader & constructor during Topic creation instead of here |
| try { |
| // Class readerClass = topic.getTypeSignature().getClassLoader().loadClass(topic.getTypeSignature().getReaderName()); |
| // |
| // Constructor constructor = readerClass.getConstructor(new Class[] {TopicDescription.class, Subscriber.class, Boolean.class, DataReaderListener.class, |
| // EntityFactory.class}); |
| // |
| // dataReader = (DataReader) constructor.newInstance(new Object[] {topicDescription, this, new Boolean(this.isEnabled()), listener, this}); |
| |
| dataReader = new DataReader(topicDescription, this, new Boolean(this.isEnabled()), listener, this); |
| } |
| // catch (InstantiationException ex) { |
| // ex.printStackTrace(); |
| // } |
| // catch (IllegalAccessException ex) { |
| // ex.printStackTrace(); |
| // } |
| // catch (ClassNotFoundException ex) { |
| // ex.printStackTrace(); |
| // } |
| catch (IllegalArgumentException ex) { |
| ex.printStackTrace(); |
| } catch (SecurityException ex) { |
| ex.printStackTrace(); |
| } |
| // catch (InvocationTargetException ex) { |
| // ex.printStackTrace(); |
| // } |
| // catch (NoSuchMethodException ex) { |
| // ex.printStackTrace(); |
| // } |
| catch (OutOfMemoryError er) { |
| er.printStackTrace(); |
| } |
| |
| // Only keep this writer if it was successfully created. |
| if (dataReader != null) { |
| CopyOnWriteArrayList<DataReader> readers = topicMap.get(topicDescription); |
| if (readers == null) { |
| readers = new CopyOnWriteArrayList<>(); |
| topicMap.put(topicDescription, readers); |
| } |
| readers.add(dataReader); |
| } |
| return dataReader; |
| } |
| |
| /** |
| * Removes the <code>DataReader</code> from this <code>Subscriber</code>. If the reader was already deleted, or was |
| * not created by this <code>Subscriber</code>, an error is returned. |
| * |
| * @param dataReader The reader to delete. |
| * @return {@link ReturnCode#OK}if the reader was successfully deleted, otherwise {@link ReturnCode#NOT_ENABLED}if |
| * this <code>Subscriber</code> is not enabled, or {@link ReturnCode#ALREADY_DELETED}if the reader has already been |
| * deleted, or {@link ReturnCode#PRECONDITION_NOT_MET}if dataReader was not created by this <code>Subscriber</code>. |
| */ |
| public ReturnCode deleteDataReader(DataReader dataReader) { |
| |
| // Check that the Entity is enabled before proceeding (See description of enable on the Entity object in the DDS spec) |
| if (!isEnabled()) { |
| return ReturnCode.NOT_ENABLED; |
| } |
| |
| // Check that the writer is not already marked as deleted (in case others kept a reference to it |
| if (dataReader.isDeleted()) { |
| return ReturnCode.ALREADY_DELETED; |
| } |
| |
| // boolean found = dataReaders.remove(dataReader); |
| |
| if (topicMap.remove(dataReader.getTopicDescription()) != null) { |
| dataReader.markAsDeleted(); |
| return ReturnCode.OK; |
| } else { |
| return ReturnCode.PRECONDITION_NOT_MET; |
| } |
| |
| } |
| |
| /** |
| * This method is here for future functionality that is described in the DDS specification but has not been |
| * implemented or used. |
| */ |
| public DataReader lookupDataReader(String topicName) { |
| // UNSURE This method has not been implemented, but is called out in the spec |
| if (true) { |
| throw new NotImplementedException(); |
| } |
| return null; |
| } |
| |
| /** |
| * This method is here for future functionality that is described in the DDS specification but has not been |
| * implemented or used. |
| */ |
| public ReturnCode beginAccess() { |
| // DONT_NEED This method has not been implemented, but is called out in the spec |
| if (true) { |
| throw new NotImplementedException(); |
| } |
| |
| // Check that the Entity is enabled before proceeding (See description of enable on the Entity object in the DDS spec) |
| if (!isEnabled()) { |
| return ReturnCode.NOT_ENABLED; |
| } |
| |
| return ReturnCode.ERROR; |
| } |
| |
| /** |
| * This method is here for future functionality that is described in the DDS specification but has not been |
| * implemented or used. |
| */ |
| public ReturnCode endAccess() { |
| // DONT_NEED This method has not been implemented, but is called out in the spec |
| if (true) { |
| throw new NotImplementedException(); |
| } |
| |
| // Check that the Entity is enabled before proceeding (See description of enable on the Entity object in the DDS spec) |
| if (!isEnabled()) { |
| return ReturnCode.NOT_ENABLED; |
| } |
| |
| return ReturnCode.ERROR; |
| } |
| |
| /** |
| * This method is here for future functionality that is described in the DDS specification but has not been |
| * implemented or used. |
| */ |
| public ReturnCode getDataReaders(Collection<DataReader> dataReaders, Collection<?> sampleStateKind, Collection<?> viewStateKind, Collection<?> instanceStateKind) { |
| // UNSURE This method has not been implemented, but is called out in the spec |
| if (true) { |
| throw new NotImplementedException(); |
| } |
| |
| // Check that the Entity is enabled before proceeding (See description of enable on the Entity object in the DDS spec) |
| if (!isEnabled()) { |
| return ReturnCode.NOT_ENABLED; |
| } |
| |
| return ReturnCode.ERROR; |
| } |
| |
| /** |
| * This method is here for future functionality that is described in the DDS specification but has not been |
| * implemented or used. |
| */ |
| public void notifyDataReaders() { |
| // UNSURE This method has not been implemented, but is called out in the spec |
| if (true) { |
| throw new NotImplementedException(); |
| } |
| } |
| |
| /** |
| * This method is here for future functionality that is described in the DDS specification but has not been |
| * implemented or used. |
| */ |
| public SampleLostStatus getSampleLostStatus() { |
| // UNSURE This method has not been implemented, but is called out in the spec |
| if (true) { |
| throw new NotImplementedException(); |
| } |
| return null; |
| } |
| |
| /** |
| * @return the <code>DomainParticipant</code> to which this belongs. |
| */ |
| public DomainParticipant getParticipant() { |
| // NOTE this method is part of the DDS spec. |
| return participant; |
| } |
| |
| /** |
| * Deletes all of the <code>DataReader</code> objects currently in this <code>Subscriber</code>. |
| * |
| * @return {@link ReturnCode#OK}if the readers were successfully deleted, otherwise {@link ReturnCode#NOT_ENABLED}if |
| * this <code>Subscriber</code> is not enabled. |
| */ |
| public ReturnCode deleteContainedEntities() { |
| /* |
| * PARTIAL per the DDS spec, this should also delete the ReadCondition and QueryCondition objects stored in the |
| * <code> DataReader </code> 's, however because we haven't implemented the conditions they are not deleted here. |
| */ |
| |
| // Check that the Entity is enabled before proceeding (See description of enable on the Entity object in the DDS spec) |
| if (!isEnabled()) { |
| return ReturnCode.NOT_ENABLED; |
| } |
| |
| for (CopyOnWriteArrayList<DataReader> readers : topicMap.values()) { |
| for (DataReader reader : readers) { |
| reader.markAsDeleted(); |
| } |
| readers.clear(); |
| } |
| topicMap.clear(); |
| return ReturnCode.OK; |
| } |
| |
| /** |
| * @return <b>true </b> iff this <code>Subscriber</code> currently has any data readers. |
| */ |
| public boolean hasDataReaders() { |
| return !topicMap.isEmpty(); |
| } |
| |
| /** |
| * Processes new data received. If this Subscriber has at least one DataReader who is interested in this data, and |
| * the <code>Subscriber</code> has an attached listener, notify it via |
| * {@link SubscriberListener#onDataOnReaders(CopyOfSubscriber)}. Otherwise, if there is no |
| * <code>SubscriberListener</code> then notify all of the interested <code>DataReader</code>'s via |
| * {@link DataReaderListener#onDataAvailable(DataReader)}. The data is stored so that it is available for a reader to |
| * read or take. |
| * |
| * @param dataStoreItem The newly published data. |
| */ |
| void processNewData(final DataStoreItem dataStoreItem) { // This has package scope since it is a system-level type call |
| final Collection<DataReader> readers = topicMap.get(dataStoreItem.getTheTopicDescription()); |
| if (readers != null && !readers.isEmpty()) { |
| |
| // SPEC NOTE: The following listener logic is based on paragraph "Listener access to Read Communication Status" |
| // in the DDS specification. |
| |
| // Invoke the SubscriberListener if available |
| SubscriberListener listener = getListener(); |
| if (listener != null) { |
| listener.onDataOnReaders(this); |
| |
| // Otherwise, invoke the DataReaderListeners |
| } else { |
| // Check all DataReader objects for listeners |
| for (final DataReader reader : readers) { |
| reader.processNewData(dataStoreItem); |
| /* |
| * threadService.submit(new Runnable() { public void run() { reader.processNewData(dataStoreItem); } }); |
| */ |
| |
| } |
| } |
| } |
| } |
| } |