blob: fc4afbde7bb14f91f9e45d0d95922d4735f08241 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2004, 2007 Boeing.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Boeing - initial API and implementation
*******************************************************************************/
package org.eclipse.osee.ote.message.data;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Locale;
import java.util.logging.Level;
import org.eclipse.osee.framework.logging.OseeLog;
import org.eclipse.osee.ote.core.CopyOnWriteNoIteratorList;
import org.eclipse.osee.ote.core.GCHelper;
import org.eclipse.osee.ote.core.log.Env;
import org.eclipse.osee.ote.message.IMessageDisposeListener;
import org.eclipse.osee.ote.message.IMessageHeader;
import org.eclipse.osee.ote.message.IMessageSendListener;
import org.eclipse.osee.ote.message.Message;
import org.eclipse.osee.ote.message.MessageSystemException;
import org.eclipse.osee.ote.message.MessageSystemTestEnvironment;
import org.eclipse.osee.ote.message.enums.DataType;
import org.eclipse.osee.ote.message.interfaces.Namespace;
import org.eclipse.osee.ote.messaging.dds.Data;
import org.eclipse.osee.ote.messaging.dds.DataSample;
import org.eclipse.osee.ote.messaging.dds.IDestination;
import org.eclipse.osee.ote.messaging.dds.ISource;
import org.eclipse.osee.ote.messaging.dds.ReturnCode;
import org.eclipse.osee.ote.messaging.dds.entity.DataReader;
import org.eclipse.osee.ote.messaging.dds.entity.DataWriter;
import org.eclipse.osee.ote.messaging.dds.listener.DataReaderListener;
import org.eclipse.osee.ote.messaging.dds.listener.DataWriterListener;
import org.eclipse.osee.ote.messaging.dds.service.Key;
import org.eclipse.osee.ote.messaging.dds.service.TypeSupport;
import org.eclipse.osee.ote.messaging.dds.status.LivelinessChangedStatus;
import org.eclipse.osee.ote.messaging.dds.status.LivelinessLostStatus;
import org.eclipse.osee.ote.messaging.dds.status.OfferedDeadlineMissedStatus;
import org.eclipse.osee.ote.messaging.dds.status.OfferedIncompatibleQosStatus;
import org.eclipse.osee.ote.messaging.dds.status.PublicationMatchStatus;
import org.eclipse.osee.ote.messaging.dds.status.RequestedDeadlineMissedStatus;
import org.eclipse.osee.ote.messaging.dds.status.RequestedIncompatibleQosStatus;
import org.eclipse.osee.ote.messaging.dds.status.SampleLostStatus;
import org.eclipse.osee.ote.messaging.dds.status.SampleRejectedStatus;
import org.eclipse.osee.ote.messaging.dds.status.SubscriptionMatchStatus;
import org.eclipse.osee.ote.properties.OtePropertiesCore;
/**
* @author Andrew M. Finkbeiner
*/
public abstract class MessageData implements DataReaderListener, DataWriterListener, Data, Key {
private static long debugTimeout = OtePropertiesCore.timeDebugTimeout.getLongValue();
private static boolean debugTime = OtePropertiesCore.timeDebug.getBooleanValue();
private DataWriter writer;
private DataReader reader;
private DataSample myDataSample;
private final MemoryResource mem;
private final String typeName;
private final String name;
private final CopyOnWriteNoIteratorList<Message> messages = new CopyOnWriteNoIteratorList<>(Message.class);
private final CopyOnWriteNoIteratorList<IMessageSendListener> messageSendListeners = new CopyOnWriteNoIteratorList<>(IMessageSendListener.class);
private final int defaultDataByteSize;
private final DataType memType;
private final boolean isEnabled = true;
private long activityCount = 0;
private long sentCount;
private int currentLength;
private boolean isScheduled = false;
private long time = -1;
public MessageData(String typeName, String name, int dataByteSize, int offset, DataType memType) {
mem = new MemoryResource(new byte[dataByteSize], offset, dataByteSize - offset);
myDataSample = new DataSample(this);
this.typeName = typeName;
this.name = name;
this.defaultDataByteSize = dataByteSize;
this.currentLength = dataByteSize;
this.memType = memType;
}
public MessageData(String typeName, String name, MemoryResource mem, DataType memType) {
this.mem = mem;
myDataSample = new DataSample(this);
this.typeName = typeName;
this.name = name;
this.defaultDataByteSize = mem.getLength();
this.currentLength = mem.getLength();
this.memType = memType;
GCHelper.getGCHelper().addRefWatch(this);
}
public MessageData(String name, int dataByteSize, int offset, DataType memType) {
this(name, name, dataByteSize, offset, memType);
}
public MessageData(byte[] data, int dataByteSize, int offset) {
this.mem = new MemoryResource(data, offset, dataByteSize - offset);
this.typeName = "";
this.name = "";
this.defaultDataByteSize = dataByteSize;
this.currentLength = dataByteSize;
this.memType = null;
GCHelper.getGCHelper().addRefWatch(this);
}
public MessageData(MemoryResource memoryResource) {
this("", memoryResource);
}
public MessageData(String name, MemoryResource memoryResource) {
this.mem = memoryResource;
this.typeName = "";
this.name = name;
this.defaultDataByteSize = memoryResource.getLength();
this.currentLength = memoryResource.getLength();
this.memType = null;
GCHelper.getGCHelper().addRefWatch(this);
}
public abstract IMessageHeader getMsgHeader();
public DataType getType() {
return memType;
}
/**
* Returns the number of byte words in the payload of this message.
*
* @return the number of bytes in the message payload
*/
public int getPayloadSize() {
return currentLength;
}
public String getName() {
return name;
}
/**
* adds a {@link Message} who are mapped to this data object
*/
public void addMessage(Message message) {
if (!messages.contains(message)) {
messages.add(message);
message.addPreMessageDisposeListener(disposeListener);
}
}
/**
* returns a list of the message that this data is a source for. <BR>
*
* @return a collection of messages
*/
public Collection<Message> getMessages() {
return messages.fillCollection(new ArrayList<Message>());
// return new ArrayList<Message>(messages);
}
/**
* @return Returns the activityCount.
*/
public long getActivityCount() {
return activityCount;
}
/**
* @param activityCount The activityCount to set.
*/
public void setActivityCount(long activityCount) {
this.activityCount = activityCount;
}
public void incrementActivityCount() {
activityCount++;
}
public void incrementSentCount() {
sentCount++;
}
public long getSentCount() {
return sentCount;
}
public boolean isEnabled() {
return isEnabled;
}
public abstract void visit(IMessageDataVisitor visitor);
public void dispose() {
try{
Message[] msgs = messages.get();
for (int i = 0; i < msgs.length; i++){
Message local = msgs[i];
if(local != null){
local.removePreMessageDisposeListener(disposeListener);
}
}
} catch (Throwable th){
OseeLog.log(getClass(), Level.SEVERE, "failed to remove message dispose listener.", th);
}
messages.clear();
if (writer != null) {
writer.getPublisher().deleteDataWriter(writer);
writer.dispose(this, null);
writer = null;
} else if (reader != null && reader.getSubscriber() != null) {
reader.getSubscriber().deleteDataReader(reader);
reader.dispose();
reader = null;
}
disposeListener = null;
}
public void copyData(int destOffset, byte[] data, int srcOffset, int length) {
setCurrentLength(length + destOffset);
mem.copyData(destOffset, data, srcOffset, length);
}
public void copyData(int destOffset, ByteBuffer data, int length) throws MessageSystemException {
try {
setCurrentLength(destOffset + length);
mem.copyData(destOffset, data, length);
} catch (MessageSystemException ex) {
OseeLog.logf(MessageSystemTestEnvironment.class, Level.INFO, ex,
"increasing backing store for %s to %d. prev length: %d, recv cnt: %d", getName(), destOffset + length,
mem.getData().length, this.activityCount);
setNewBackingBuffer(data, destOffset, length);
}
}
public void copyData(ByteBuffer data) {
copyData(0, data, data.remaining());
}
/**
* Notifies all {@link Message}s that have this registered as a data source of the update
*/
public void notifyListeners() throws MessageSystemException {
final DataType memType = getType();
Message[] ref = messages.get();
for (int i = 0; i < ref.length; i++) {
Message message = ref[i];
try {
if (!message.isDestroyed()) {
message.notifyListeners(this, memType);
}
} catch (Throwable t) {
final String msg =
String.format("Problem during listener notification for message %s. Data=%s, MemType=%s",
message.getName(), this.getName(), this.getType());
OseeLog.log(MessageSystemTestEnvironment.class, Level.SEVERE, msg, t);
}
}
}
/**
* @return the currentLength
*/
public int getCurrentLength() {
return currentLength;
}
/**
* @param currentLength the currentLength to set
*/
public void setCurrentLength(int currentLength) {
this.currentLength = currentLength;
}
/**
* Override this method if you need to set some default data in the backing buffer.
*/
public void setNewBackingBuffer(byte[] data) {
setCurrentLength(data.length);
this.mem.setData(data);
if (this.getMsgHeader() != null) {
initializeDefaultHeaderValues();
} else {
// System.out.println("what??-- bad HeaderData");
}
}
public void setNewBackingBuffer(ByteBuffer buffer) {
byte[] data = new byte[buffer.remaining()];
buffer.get(data);
this.mem.setData(data);
setCurrentLength(data.length);
if (this.getMsgHeader() != null) {
initializeDefaultHeaderValues();
} else {
// System.out.println("what??-- bad HeaderData");
}
}
public void setNewBackingBuffer(ByteBuffer buffer, int offset, int length) {
byte[] data = new byte[offset + length];
buffer.get(data, offset, length);
this.mem.setData(data);
setCurrentLength(data.length);
if (this.getMsgHeader() != null) {
initializeDefaultHeaderValues();
} else {
// System.out.println("what??-- bad HeaderData");
}
}
public abstract void initializeDefaultHeaderValues();
/**
* @return the mem
*/
public MemoryResource getMem() {
return mem;
}
public int getDefaultDataByteSize() {
return defaultDataByteSize;
}
@Override
public synchronized void onDataAvailable(DataReader theReader) {
// System.out.println(String.format("data available %s %s", this.getName(),
// this.getNamespace()));
if (isEnabled()) {
ReturnCode val = theReader.takeNextSample(myDataSample);
if (val == ReturnCode.OK) {
incrementActivityCount();
notifyListeners();
} else {
Env.getInstance().severe(val.getDescription());
}
}
}
@Override
public void onLivelinessChanged(DataReader theReader, LivelinessChangedStatus status) {
}
@Override
public void onRequestedDeadlineMissed(DataReader theReader, RequestedDeadlineMissedStatus status) {
}
@Override
public void onRequestedIncompatibleQos(DataReader theReader, RequestedIncompatibleQosStatus status) {
}
@Override
public void onSampleLost(DataReader theReader, SampleLostStatus status) {
}
@Override
public void onSampleRejected(DataReader theReader, SampleRejectedStatus status) {
}
@Override
public void onSubscriptionMatch(DataReader theReader, SubscriptionMatchStatus status) {
}
@Override
public synchronized void onDataSentToMiddleware(DataWriter theWriter) {
// header.setSequenceNumber(header.getSequenceNumber() + 1);
notifyListeners();
}
@Override
public void onLivelinessLost(DataWriter theWriter, LivelinessLostStatus status) {
}
@Override
public void onOfferedDeadlineMissed(DataWriter theWriter, OfferedDeadlineMissedStatus status) {
}
@Override
public void onOfferedIncompatibleQos(DataWriter theWriter, OfferedIncompatibleQosStatus status) {
}
@Override
public void onPublicationMatch(DataWriter theWriter, PublicationMatchStatus status) {
}
@Override
public Object getKeyValue() {
return null;
}
@Override
public void setFromByteArray(byte[] input) {
try {
copyData(0, input, 0, input.length);
} catch (MessageSystemException ex) {
OseeLog.logf(MessageSystemTestEnvironment.class, Level.WARNING,
"Copy Failed: setting new backing buffer. msg[%s], oldSize[%d] newSize[%d]", this.getName(),
this.mem.getData().length, input.length);
setNewBackingBuffer(input);
}
}
@Override
public void setFromByteBuffer(ByteBuffer buffer) {
try {
copyData(buffer);
} catch (Exception e) {
OseeLog.logf(MessageSystemTestEnvironment.class, Level.SEVERE,
"Copy Failed: setting new backing buffer. msg[%s], oldSize[%d] newSize[%d]", this.getName(),
this.mem.getData().length, buffer.limit());
setNewBackingBuffer(buffer);
}
}
@Override
public ByteBuffer toByteBuffer() {
return mem.getAsBuffer();
}
public void setFromByteArray(byte[] input, int length) {
try {
copyData(0, input, 0, length);
} catch (MessageSystemException ex) {
OseeLog.logf(MessageSystemTestEnvironment.class, Level.SEVERE,
"Copy Failed: setting new backing buffer. msg[%s], oldSize[%d] newSize[%d]", this.getName(),
this.mem.getData().length, length);
setNewBackingBuffer(input);
}
}
public void setFromByteArray(int destOffset, byte[] input, int srcOffset, int length) {
try {
copyData(destOffset, input, srcOffset, length);
} catch (MessageSystemException ex) {
OseeLog.logf(MessageSystemTestEnvironment.class, Level.SEVERE,
"Copy Failed: setting new backing buffer. msg[%s], oldSize[%d] newSize[%d]", this.getName(),
this.mem.getData().length, length);
setNewBackingBuffer(input);
}
}
public void setFromByteArray(ByteBuffer input, int length) {
try {
copyData(0, input, length);
} catch (MessageSystemException ex) {
OseeLog.logf(MessageSystemTestEnvironment.class, Level.SEVERE,
"Copy Failed: setting new backing buffer. msg[%s], oldSize[%d] newSize[%d]", this.getName(),
this.mem.getData().length, length);
setNewBackingBuffer(input);
}
}
@Override
public byte[] toByteArray() {
return mem.getData();
}
public void setReader(DataReader reader) {
this.reader = reader;
}
public void setWriter(DataWriter writer) {
this.writer = writer;
}
public void send() throws MessageSystemException {
if (writer == null) {
OseeLog.log(MessageSystemTestEnvironment.class, Level.SEVERE, getName() + " - the writer is null");
} else if (shouldSendData()) {
try {
notifyPreSendListeners();
long start = 0, elapsed;
if(debugTime){
start = System.nanoTime();
}
getMem().setDataHasChanged(false);
writer.write(null, null, this, null);
incrementSentCount();
if(debugTime){
elapsed = System.nanoTime() - start;
if(elapsed > debugTimeout){
Locale.setDefault(Locale.US);
System.out.printf("%s SLOW IOSEND %,d\n", getName(), elapsed);
}
}
notifyPostSendListeners();
} catch (Throwable ex) {
throw new MessageSystemException("Could not send message data " + getName(), Level.SEVERE, ex);
}
}
}
protected void sendTo(IDestination destination, ISource source) throws MessageSystemException {
if (writer == null) {
OseeLog.log(MessageSystemTestEnvironment.class, Level.WARNING, getName() + " - the writer is null");
} else if (shouldSendData()) {
try {
notifyPreSendListeners();
// this.initializeDefaultHeaderValues();
getMem().setDataHasChanged(false);
writer.write(destination, source, this, null);
incrementSentCount();
notifyPostSendListeners();
} catch (Throwable ex) {
throw new MessageSystemException("Could not send message data " + getName(), Level.SEVERE, ex);
}
}
}
/**
* Override this method if you want to specialize the send criteria in a data source. For example, if you only want
* to send data to the MUX driver if the data has changed.
*/
protected boolean shouldSendData() {
return true;
}
public TypeSupport getTypeSupport() {
return new DDSTypeSupport(this, getName(), getName(), getPayloadSize());
}
public String getTopicName() {
return getName();
}
public String getTypeName() {
return typeName;
}
@Override
public boolean isSameInstance(byte[] data1, byte[] data2) {
return true;
}
public Namespace getNamespace() {
if (isWriter()) {
return new Namespace(writer.getTopic().getNamespace());
} else {
return new Namespace(reader.getTopicDescription().getNamespace());
}
}
/*
* each type that extends DDSData needs to have it's own namespace.... we need to go through each DDSData child and
* determine all of it's possible namespaces
*/
public boolean isWriter() {
if (writer != null && reader == null) {
return true;
} else if (writer == null && reader != null) {
return false;
} else {
throw new MessageSystemException(
"This is an illegal message it has neither a reader or a writer [" + this.getName() + "].", Level.SEVERE);
}
}
private IMessageDisposeListener disposeListener = new IMessageDisposeListener() {
@Override
public void onPreDispose(Message message) {
messages.remove(message);
}
@Override
public void onPostDispose(Message message) {
}
};
@Override
public void copyFrom(Data data) {
ByteBuffer buffer = data.toByteBuffer();
copyData(data.getOffset(), buffer, buffer.remaining());
}
@Override
public String toString() {
return getClass().getName() + ": name=" + getName();
}
@Override
public int getOffset() {
return 0;
}
/**
* @return the isScheduled
*/
public boolean isScheduled() {
return isScheduled;
}
/**
* @param isScheduled the isScheduled to set
*/
public void setScheduled(boolean isScheduled) {
this.isScheduled = isScheduled;
}
private void notifyPostSendListeners() {
try {
long start = 0, elapsed;
IMessageSendListener[] listeners = messageSendListeners.get();
for (int i = 0; i < listeners.length; i++) {
IMessageSendListener listener = listeners[i];
if(debugTime){
start = System.nanoTime();
}
listener.onPostSend(this);
if(debugTime){
elapsed = System.nanoTime() - start;
if(elapsed > debugTimeout){
Locale.setDefault(Locale.US);
System.out.printf("%s %s SLOW POST SEND %,d\n", getName(), listener.getClass().getName(), elapsed);
}
}
}
} catch (Exception ex) {
OseeLog.log(Message.class, Level.SEVERE, ex);
}
}
private void notifyPreSendListeners() {
try {
long start = 0, elapsed;
IMessageSendListener[] listeners = messageSendListeners.get();
for (int i = 0; i < listeners.length; i++) {
IMessageSendListener listener = listeners[i];
if(debugTime){
start = System.nanoTime();
}
listener.onPreSend(this);
if(debugTime){
elapsed = System.nanoTime() - start;
if(elapsed > debugTimeout){
Locale.setDefault(Locale.US);
System.out.printf("%s %s SLOW PRE SEND %,d\n", getName(), listener.getClass().getName(), elapsed);
}
}
}
} catch (Exception ex) {
OseeLog.log(Message.class, Level.SEVERE, ex);
}
}
public void addSendListener(IMessageSendListener listener) {
messageSendListeners.add(listener);
}
public void removeSendListener(IMessageSendListener listener) {
messageSendListeners.remove(listener);
}
public boolean containsSendListener(IMessageSendListener listener) {
return messageSendListeners.contains(listener);
}
public boolean isMessageCollectionNotEmpty() {
return messages.get().length > 0;
}
public void zeroize() {
final byte[] data = toByteArray();
Arrays.fill(data, getMsgHeader().getHeaderSize(), data.length, (byte) 0);
}
/**
* A time value associated with this message.
* The time value will have different meanings or may not be used depending on the context and usage.
*/
public long getTime() {
return time;
}
public void setTime(long time) {
this.time = time;
}
}