blob: 16fee90aae9ea10fc12e578724c2beb425883ff6 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2004, 2007 Boeing.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Boeing - initial API and implementation
*******************************************************************************/
package org.eclipse.osee.ote.messaging.dds.entity;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArrayList;
import org.eclipse.osee.ote.messaging.dds.Data;
import org.eclipse.osee.ote.messaging.dds.DataSample;
import org.eclipse.osee.ote.messaging.dds.DataStoreItem;
import org.eclipse.osee.ote.messaging.dds.IDestination;
import org.eclipse.osee.ote.messaging.dds.ISource;
import org.eclipse.osee.ote.messaging.dds.NotImplementedException;
import org.eclipse.osee.ote.messaging.dds.ReturnCode;
import org.eclipse.osee.ote.messaging.dds.StatusKind;
import org.eclipse.osee.ote.messaging.dds.listener.DataWriterListener;
import org.eclipse.osee.ote.messaging.dds.listener.Listener;
import org.eclipse.osee.ote.messaging.dds.listener.PublisherListener;
import org.eclipse.osee.ote.messaging.dds.service.TopicDescription;
/**
* Provides functionality for applications to create <code>DataWriter</code> 's, and control when data written from the
* writers is published.
* <p>
* This class also provides the middleware the ability to publish data directly (i.e. without a <code>DataWriter</code>
* ).
*
* @author Robert A. Fisher
* @author David Diepenbrock
*/
public class Publisher extends DomainEntity implements EntityFactory {
private final DomainParticipant participant;
private boolean publicationsAllowed;
private final CopyOnWriteArrayList<DataWriter> dataWriters;
private final Collection<DataStoreItem> publicationQueue; // Stores pending publications to be processed, possibly be a seperate thread
private final Collection<DataStoreItem> pendingQueue; // Stores publications while publications are suspended
/**
* Constructor for <code>Publisher</code> with the provided listener attached, and enabled status set as passed. This
* constructor is only visible to the DDS system, applications should use
* {@link DomainParticipant#createPublisher(PublisherListener)}to create a <code>Publisher</code>..
*
* @param participant The participant creating this <code>Publisher</code>. This is also used as the parentEntity for
* enabling purposes.
* @param enabled If <b>true </b>, the created <code>Publisher</code> will be enabled iff the parentEntity is
* enabled.
* @param listener The listener to attach to the created publisher.
*/
Publisher(DomainParticipant participant, boolean enabled, PublisherListener listener) {
super(enabled, listener, participant);
this.participant = participant;
publicationsAllowed = true;
dataWriters = new CopyOnWriteArrayList<>();
publicationQueue = new ArrayList<>();
pendingQueue = new ArrayList<>();
}
/**
* Gets the attached listener.
*
* @return The <code>PublisherListener</code> attached to this.
* @see Entity#getBaseListener()
*/
public PublisherListener getListener() {
return (PublisherListener) super.getBaseListener();
}
/**
* Sets the <code>PublisherListener</code> attached to this. If a listener is already set, this will replace it.
*
* @see Entity#setBaseListener(Listener, StatusKind)
*/
public ReturnCode setListener(PublisherListener listener, StatusKind mask) {
return super.setBaseListener(listener, mask);
}
/**
* Creates a <code>DataWriter</code> for the given <code>Topic</code> and adds it to this <code>Publisher</code>.
*
* @param topic The <code>Topic</code> associated with the <code>DataWriter</code> to be created.
* @param listener The <code>DataWriterListener</code> to be attached to created <code>DataWriter</code>.
* @return The <code>DataWriter</code> created, or null if it could not be created.
*/
public DataWriter createDataWriter(Topic topic, DataWriterListener listener) {
DataWriter dataWriter = null;
//TUNE - get the classloader & constructor during Topic creation instead of here
try {
// Class writerClass = topic.getTypeSignature().getClassLoader().loadClass(topic.getTypeSignature().getWriterName());
//
// Constructor constructor = writerClass.getConstructor(new Class[] {Topic.class, Publisher.class, Boolean.class, DataWriterListener.class,
// EntityFactory.class});
// dataWriter = (DataWriter) constructor.newInstance(new Object[] {topic, this, new Boolean(this.isEnabled()), listener, this});
dataWriter = new DataWriter(topic, this, new Boolean(this.isEnabled()), listener, this);
}
// catch (InstantiationException ex) {
// ex.printStackTrace();
// }
// catch (IllegalAccessException ex) {
// ex.printStackTrace();
// }
// catch (ClassNotFoundException ex) {
// ex.printStackTrace();
// }
catch (IllegalArgumentException ex) {
ex.printStackTrace();
} catch (SecurityException ex) {
ex.printStackTrace();
}
// catch (InvocationTargetException ex) {
// ex.printStackTrace();
// }
// catch (NoSuchMethodException ex) {
// ex.printStackTrace();
// }
catch (OutOfMemoryError er) {
er.printStackTrace();
}
// Only keep this writer if it was successfully created.
if (dataWriter != null) {
dataWriters.add(dataWriter);
}
return dataWriter;
}
/**
* Removes the <code>DataWriter</code> from this <code>Publisher</code>. If the writer was already deleted, or was
* not created by this <code>Publisher</code>, an error is returned.
*
* @param dataWriter The writer to delete.
* @return {@link ReturnCode#OK}if the writer was successfully deleted, otherwise {@link ReturnCode#NOT_ENABLED}if
* this <code>Publisher</code> is not enabled, or {@link ReturnCode#ALREADY_DELETED}if the writer has already been
* deleted, or {@link ReturnCode#PRECONDITION_NOT_MET}if dataWriter was not created by this <code>Publisher</code>.
*/
public ReturnCode deleteDataWriter(DataWriter dataWriter) {
// Check that the Entity is enabled before proceeding (See description of enable on the Entity object in the DDS spec)
if (!isEnabled()) {
return ReturnCode.NOT_ENABLED;
}
// Check that the writer is not already marked as deleted (in case others kept a reference to it
if (dataWriter.isDeleted()) {
return ReturnCode.ALREADY_DELETED;
}
dataWriter.markAsDeleted();
if (dataWriters.remove(dataWriter)) {
return ReturnCode.OK;
} else {
// If the data writer wasn't found, then return PRECONDITION_NOT_MET
return ReturnCode.PRECONDITION_NOT_MET;
}
}
/**
* This method is here for future functionality that is described in the DDS specification but has not been
* implemented or used.
*/
public DataWriter lookupDataWriter(String topicName) {
// UNSURE This method has not been implemented, but is called out in the spec
if (true) {
throw new NotImplementedException();
}
return null;
}
/**
* Suspends published data from being processed (sent to the subscribers) until <code>resumePublications()</code> is
* called.
*
* @return {@link ReturnCode#OK}if publications are successfully suspended, or {@link ReturnCode#NOT_ENABLED}if this
* <code>Publisher</code> has not been enabled.
*/
public ReturnCode suspendPublications() {
// Check that the Entity is enabled before proceeding (See description of enable on the Entity object in the DDS spec)
if (!isEnabled()) {
return ReturnCode.NOT_ENABLED;
}
publicationsAllowed = false;
return ReturnCode.OK;
}
/**
* Resumes publications that were suspended by calling <code>suspendPublications()</code>.
*
* @return {@link ReturnCode#OK}if publications are successfully suspended, or {@link ReturnCode#NOT_ENABLED}if this
* <code>Publisher</code> has not been enabled, or {@link ReturnCode#PRECONDITION_NOT_MET}if publications were not
* suspended.
*/
public ReturnCode resumePublications() {
// Check that the Entity is enabled before proceeding (See description of enable on the Entity object in the DDS spec)
if (!isEnabled()) {
return ReturnCode.NOT_ENABLED;
}
if (publicationsAllowed) {
return ReturnCode.PRECONDITION_NOT_MET;
}
// Move the pending items in to the publishQueue
synchronized (pendingQueue) {
publicationQueue.addAll(pendingQueue);
pendingQueue.clear();
// Now that the queues have been updated, mark publications as allowed
publicationsAllowed = true;
}
publishQueuedData();
return ReturnCode.OK;
}
/**
* This method is here for future functionality that is described in the DDS specification but has not been
* implemented or used.
*/
public ReturnCode beginCoherentChanges() {
// UNSURE This method has not been implemented, but is called out in the spec
if (true) {
throw new NotImplementedException();
}
// Check that the Entity is enabled before proceeding (See description of enable on the Entity object in the DDS spec)
if (!isEnabled()) {
return ReturnCode.NOT_ENABLED;
}
return ReturnCode.ERROR;
}
/**
* This method is here for future functionality that is described in the DDS specification but has not been
* implemented or used.
*/
public ReturnCode endCoherentChanges() {
// UNSURE This method has not been implemented, but is called out in the spec
if (true) {
throw new NotImplementedException();
}
// Check that the Entity is enabled before proceeding (See description of enable on the Entity object in the DDS spec)
if (!isEnabled()) {
return ReturnCode.NOT_ENABLED;
}
return ReturnCode.ERROR;
}
/**
* @return the <code>DomainParticipant</code> to which this belongs.
*/
public DomainParticipant getParticipant() {
// NOTE this method is part of the DDS spec.
return participant;
}
/**
* Deletes all of the <code>DataWriter</code> objects currently in this <code>Publisher</code>.
*
* @return {@link ReturnCode#OK}if the writers were successfully deleted, otherwise {@link ReturnCode#NOT_ENABLED}if
* this <code>Publisher</code> is not enabled.
*/
public ReturnCode deleteContainedEntities() {
// Check that the Entity is enabled before proceeding (See description of enable on the Entity object in the DDS spec)
if (!isEnabled()) {
return ReturnCode.NOT_ENABLED;
}
for (DataWriter writer : dataWriters) {
writer.markAsDeleted();
}
dataWriters.clear();
return ReturnCode.OK;
}
/**
* @return <b>true </b> iff this <code>Publisher</code> currently has any data writers.
*/
public boolean hasDataWriters() {
return !dataWriters.isEmpty();
}
/**
* Allows the MiddlewarePublisher to publish data without providing the <code>Topic</code> object, but instead
* specifying the Topic as a string. This method will only publish data when called by the
* <code>MiddlewarePublisher</code> of the <code>DomainParticipant</code>. This is intended for an outside System to
* be able to publish any data it wants, regardless of if the <code>Topic</code> has been created in the DDS system.
* If a <code>Topic</code> with the provided name does not exist in this publisher's <code>DomainParticipant</code>,
* the data will not be published.
*
* @param theData The Data to be published
* @param namespace The name of the Topic associated with theData
* @throws IllegalStateException When this method is called by any publisher other than the MiddlewarePublisher
*/
public void publishData(Data theData, String namespace, String topic) {
if (theData == null) {
throw new NullPointerException("data cannot be null");
}
if (namespace == null) {
throw new NullPointerException("namespace cannot be null");
}
if (topic == null) {
throw new NullPointerException("topic cannot be null");
}
if (this.participant.getMiddlewarePublisher() == this) {
TopicDescription theTopicDescription = this.participant.lookupTopicDescription(namespace, topic);
if (theTopicDescription != null) {
DataSample dataSample = new DataSample(theData, new SampleInfo());
publishData(null, null, new DataStoreItem(dataSample, theTopicDescription, null));
}
} else {
// This should never be called by anything other than the middleware publisher, so throw an exception
throw new IllegalStateException("Must be the MiddlewarePublisher to call this method!");
}
}
/**
* If threading is enabled and publications are not suspended, places the data on the queue to be processed by the
* publication thread. If threading is not enabled and publications are not suspended, sends the data to the
* <code>DomainParticipant</code> to be sent to be immediately made available to the subscribers. If publications are
* suspended the data is queued so it can be processed once publications are resumed.
*
* @param destination TODO
* @param source TODO
* @param dataStoreItem The published data to be processed
*/
void publishData(IDestination destination, ISource source, DataStoreItem dataStoreItem) { // package scope
// If publications are not allowed, then add the item to the queue, and return
synchronized (pendingQueue) {
if (!publicationsAllowed) {
pendingQueue.add(dataStoreItem);
return;
}
}
// Otherwise, publications are allowed, so process appropriately
participant.processPublishedData(destination, source, dataStoreItem);
}
/**
* Processes data queued either because we are using a publication thread or because publications were suspended and
* have now been resumed. In either case, the data queued is immediately ready to be published and will be completely
* processed even if publications are again suspended before processing has completed.
*/
void publishQueuedData() { // package scope so that the publisher thread can make use of this
// Only process if there are items in the queue - don't do synchronize if not necessary
if (!publicationQueue.isEmpty()) {
synchronized (publicationQueue) {
Iterator<DataStoreItem> iter = publicationQueue.iterator();
while (iter.hasNext()) {
participant.processPublishedData(null, null, iter.next());
}
publicationQueue.clear();
}
}
}
public void dispose() {
if (deleteContainedEntities() == ReturnCode.NOT_ENABLED) {
System.err.println("failed to delete publisher");
}
publicationQueue.clear();
pendingQueue.clear();
}
}