// blob: 684cfe84957d82d56f77107bae3c031ff8399b88 [file] [log] [blame]
/**********************************************************************
* Copyright (c) 2019 École Polytechnique de Montréal
*
* All rights reserved. This program and the accompanying materials are
* made available under the terms of the Eclipse Public License v1.0 which
* accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
**********************************************************************/
package org.eclipse.tracecompass.incubator.internal.ros.core.analysis.model.messageflow;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.tracecompass.incubator.internal.ros.core.Activator;
import org.eclipse.tracecompass.incubator.internal.ros.core.analysis.AbstractRosStateProvider;
import org.eclipse.tracecompass.incubator.internal.ros.core.analysis.messageflow.TargetMessageInfo;
import org.eclipse.tracecompass.incubator.internal.ros.core.analysis.messageflow.TargetMessageInfo.RosQueueType;
import org.eclipse.tracecompass.incubator.internal.ros.core.analysis.model.connections.RosConnectionEndpoint;
import org.eclipse.tracecompass.incubator.internal.ros.core.analysis.model.messageflow.RosMessageFlowSegment.SegmentType;
import org.eclipse.tracecompass.incubator.internal.ros.core.analysis.model.messagestransport.IRosMessagesTransportModel;
import org.eclipse.tracecompass.incubator.internal.ros.core.analysis.model.messagestransport.RosMessageTransport;
import org.eclipse.tracecompass.statesystem.core.ITmfStateSystem;
import org.eclipse.tracecompass.statesystem.core.exceptions.AttributeNotFoundException;
import org.eclipse.tracecompass.statesystem.core.exceptions.StateSystemDisposedException;
import org.eclipse.tracecompass.statesystem.core.interval.ITmfStateInterval;
/**
* {@link IRosMessageFlowModel} implementation
*
* @author Christophe Bedard
*/
public class RosMessageFlowModel implements IRosMessageFlowModel {
/** Information identifying the target message from which the flow starts */
private final @NonNull TargetMessageInfo fInfo;
/** State system queried for queue, drop and callback states */
private final @NonNull ITmfStateSystem fQueuesSs;
/** Model used to look up the next message transport after a publisher queue */
private final @NonNull IRosMessagesTransportModel fMsgsTransportModel;
/** First segment of the flow; {@code null} until a segment has been created */
private RosMessageFlowSegment fInitialSegment = null;
/** Set to {@code true} once processing has finished without a caught exception */
private boolean fIsModelDone = false;
/**
 * Creates a message flow model for a given target message. Nothing is
 * computed here; the model is only built when {@link #generateModel()} is
 * called.
 *
 * @param targetInfo
 *            the target message info
 * @param queuesStateSystem
 *            the queues state system
 * @param msgsTransportModel
 *            the messages transport model
 */
public RosMessageFlowModel(
        @NonNull TargetMessageInfo targetInfo,
        @NonNull ITmfStateSystem queuesStateSystem,
        @NonNull IRosMessagesTransportModel msgsTransportModel) {
    this.fInfo = targetInfo;
    this.fQueuesSs = queuesStateSystem;
    this.fMsgsTransportModel = msgsTransportModel;
}
/**
 * @return the first segment of the flow, or {@code null} if no segment has
 *         been created (yet)
 */
@Override
public @Nullable RosMessageFlowSegment getFirstSegment() {
    return this.fInitialSegment;
}
/**
 * @return {@code true} once model generation has finished without a caught
 *         exception, {@code false} otherwise
 */
@Override
public boolean isModelDone() {
    return this.fIsModelDone;
}
/**
* Contains necessary basic information to get/process the next segment.
* Serves as a mutable container.
*/
private class NextSegmentInfo {
/** Type of the next segment */
private SegmentType fType = null;
/** Relevant quark for the next segment */
private Integer fQuark = null;
/** Relevant timestamp for the next segment */
private Long fTimestamp = null;
/** The next segment that was created */
private RosMessageFlowSegment fSegment = null;
/** The previous segment that was created */
private RosMessageFlowSegment fPreviousSegment = null;
public NextSegmentInfo() {
}
public SegmentType getType() {
return fType;
}
public void setType(SegmentType type) {
fType = type;
}
public Integer getQuark() {
return fQuark;
}
public void setQuark(int quark) {
fQuark = quark;
}
public Long getTimestamp() {
return fTimestamp;
}
public void setTimestamp(long t) {
fTimestamp = t;
}
public RosMessageFlowSegment getNextSegment() {
return fSegment;
}
public void setNextSegment(RosMessageFlowSegment segment) {
fPreviousSegment = fSegment;
fSegment = segment;
}
public RosMessageFlowSegment getPreviousSegment() {
return fPreviousSegment;
}
}
/**
 * Builds the model. The model is only marked as done when processing
 * completes; if a state system exception is caught, it is logged and the
 * model stays "not done".
 */
@Override
public void generateModel() {
    try {
        process();
    } catch (AttributeNotFoundException | StateSystemDisposedException ex) {
        Activator.getInstance().logError("Error while processing! ", ex); //$NON-NLS-1$
        return;
    }
    fIsModelDone = true;
}
/**
 * Follows the target message from its initial queue, creating one segment
 * per step and chaining each new segment to the previous one, until a step
 * reports {@link SegmentType#INVALID} as the next type.
 *
 * If the target queue type cannot be mapped to a segment type, or if a
 * segment type without a handler is encountered, processing stops instead
 * of failing on a {@code null} switch or looping forever on a type that no
 * handler will ever change.
 *
 * @throws AttributeNotFoundException
 *             propagated from the state system lookups
 * @throws StateSystemDisposedException
 *             propagated from the state system queries
 */
private void process() throws AttributeNotFoundException, StateSystemDisposedException {
    // Find type of initial segment corresponding to target message
    SegmentType initialType = getSegmentTypeFromQueueType(fInfo.getQueueType());
    if (initialType == null) {
        // Unsupported queue type (already logged by the lookup): there is
        // nothing to start from, and switching on null would throw
        return;
    }

    // Find other necessary info about first segment
    int queueQuark = getTargetQueueQuark();
    int queuePos = fInfo.getQueuePosition();
    int queuePosQuark = fQueuesSs.getQuarkRelative(queueQuark, String.valueOf(queuePos));

    NextSegmentInfo next = new NextSegmentInfo();
    next.setType(initialType);
    next.setQuark(queuePosQuark);
    next.setTimestamp(fInfo.getMsgTimestamp());

    while (next.getType() != SegmentType.INVALID) {
        // process
        switch (next.getType()) {
        case PUB_QUEUE:
            processPubQueue(next);
            break;
        case SUB_QUEUE:
            processSubQueue(next);
            break;
        case SUB_CALLBACK:
            processCallback(next);
            break;
        case INVALID:
        default:
            Activator.getInstance().logError("Case " + next.getType().name() + " should be handled!"); //$NON-NLS-1$ //$NON-NLS-2$
            // No handler ran, so no new segment exists and the type would
            // never change: stop here rather than re-linking the same
            // segment on every iteration
            return;
        }

        // Link previous to current
        RosMessageFlowSegment nextSegment = next.getNextSegment();
        RosMessageFlowSegment previousSegment = next.getPreviousSegment();
        if (previousSegment == null) {
            fInitialSegment = nextSegment;
        } else {
            previousSegment.addNext(nextSegment);
        }
    }
}
/**
 * Creates the segment for the time a message spends in a publisher queue,
 * then prepares the information for the following subscriber queue segment
 * by looking up the next message transport.
 *
 * The next type is set to {@link SegmentType#INVALID} when the message was
 * dropped, when no transition could be determined, or when no transport
 * was found.
 *
 * @param next
 *            the container holding the quark/timestamp of the message in
 *            the publisher queue; updated with the created segment and
 *            the information for the next step
 * @throws StateSystemDisposedException
 *             propagated from the state system queries
 * @throws AttributeNotFoundException
 *             propagated from the state system lookups
 */
private void processPubQueue(NextSegmentInfo next) throws StateSystemDisposedException, AttributeNotFoundException {
    int queuePosQuark = next.getQuark();
    long msgStart = next.getTimestamp();
    ITmfStateInterval firstState = fQueuesSs.querySingleState(msgStart, queuePosQuark);
    ITmfStateInterval lastState = getLastStateOfMessageInQueue(firstState, queuePosQuark);
    long start = firstState.getStartTime();
    long end = lastState.getEndTime();

    // Walk up the attribute tree:
    // position -> queue -> topic -> publishers list -> node
    int queueQuark = fQueuesSs.getParentAttributeQuark(queuePosQuark);
    int topicQuark = fQueuesSs.getParentAttributeQuark(queueQuark);
    int nodeQuark = fQueuesSs.getParentAttributeQuark(fQueuesSs.getParentAttributeQuark(topicQuark));
    String nodeName = fQueuesSs.getAttributeName(nodeQuark);
    String topicName = fQueuesSs.getAttributeName(topicQuark);

    RosMessageFlowSegment segment = new RosMessageFlowSegment(start, end, next.getType(), nodeName, topicName);
    next.setNextSegment(segment);

    // This is always a publisher queue, whatever the queue type of the
    // initial target message was (the flow may have started in a
    // subscriber queue and reached this queue through a callback)
    QueueSegmentTransition transition = getTransitionFromLastQueueState(lastState, RosQueueType.PUB, queueQuark);
    if (transition == null) {
        Activator.getInstance().logError("Could not find transition!"); //$NON-NLS-1$
    }
    if (transition == null || transition == QueueSegmentTransition.DROP) {
        next.setType(SegmentType.INVALID);
        return;
    }

    // Find first state after network
    RosMessageTransport transport = fMsgsTransportModel.getNextMessageTransport(end, nodeName, topicName);
    if (transport == null) {
        Activator.getInstance().logError("Could not find next message transport!"); //$NON-NLS-1$
        next.setType(SegmentType.INVALID);
        return;
    }

    // Find the subscriber queue from connection
    RosConnectionEndpoint sub = transport.getConnection().getSub();
    int subQueueQuark = fQueuesSs.getQuarkAbsolute(sub.getNodeName(), AbstractRosStateProvider.SUBSCRIBERS_LIST, sub.getTopicName(), AbstractRosStateProvider.QUEUE);
    ITmfStateInterval stateBeforeMsg = getStateBeforeMessageAddedToQueue(subQueueQuark, transport.getDestinationTimestamp());

    // The message's own state starts right after the state that precedes
    // it, at the same queue position
    next.setType(SegmentType.SUB_QUEUE);
    next.setQuark(stateBeforeMsg.getAttribute());
    next.setTimestamp(stateBeforeMsg.getEndTime() + 1);
}
/**
 * Creates the segment for the time a message spends in a subscriber queue,
 * then prepares the information for the following callback segment.
 *
 * The next type is set to {@link SegmentType#INVALID} when the message was
 * dropped or when no transition could be determined.
 *
 * @param next
 *            the container holding the quark/timestamp of the message in
 *            the subscriber queue; updated with the created segment and
 *            the information for the next step
 * @throws AttributeNotFoundException
 *             propagated from the state system lookups
 * @throws StateSystemDisposedException
 *             propagated from the state system queries
 */
private void processSubQueue(NextSegmentInfo next) throws AttributeNotFoundException, StateSystemDisposedException {
    int posQuark = next.getQuark();
    long stateStart = next.getTimestamp();
    ITmfStateInterval first = fQueuesSs.querySingleState(stateStart, posQuark);
    ITmfStateInterval last = getLastStateOfMessageInQueue(first, posQuark);

    // Walk up the attribute tree:
    // position -> queue -> topic -> subscribers list -> node
    int queueQuark = fQueuesSs.getParentAttributeQuark(posQuark);
    int topicQuark = fQueuesSs.getParentAttributeQuark(queueQuark);
    int subsQuark = fQueuesSs.getParentAttributeQuark(topicQuark);
    int nodeQuark = fQueuesSs.getParentAttributeQuark(subsQuark);
    String nodeName = fQueuesSs.getAttributeName(nodeQuark);
    String topicName = fQueuesSs.getAttributeName(topicQuark);

    next.setNextSegment(new RosMessageFlowSegment(first.getStartTime(), last.getEndTime(), next.getType(), nodeName, topicName));

    QueueSegmentTransition transition = getTransitionFromLastQueueState(last, RosQueueType.SUB, queueQuark);
    if (transition == null) {
        Activator.getInstance().logError("Could not find transition!"); //$NON-NLS-1$
    }
    if (transition == null || transition == QueueSegmentTransition.DROP) {
        next.setType(SegmentType.INVALID);
        return;
    }

    next.setType(SegmentType.SUB_CALLBACK);
    // Callbacks are not processed per-topic, but per-node/globally, so the
    // callbacks attribute sits under the subscribers list
    int callbackQuark = fQueuesSs.getQuarkRelative(subsQuark, AbstractRosStateProvider.CALLBACKS);
    next.setQuark(callbackQuark);
    // The callback state starts right after the last queue state
    next.setTimestamp(last.getEndTime() + 1);
}
/**
 * Creates the segment for a subscriber callback and looks for a message
 * published during that callback.
 *
 * If no published message is found, the segment covers the whole callback
 * state and the next type is set to {@link SegmentType#INVALID}. Otherwise
 * the segment ends right before the first published message found, and the
 * next step is set up for that message's publisher queue.
 *
 * @param next
 *            the container holding the callback quark/timestamp; updated
 *            with the created segment and the information for the next
 *            step
 * @throws StateSystemDisposedException
 *             propagated from the state system queries
 */
private void processCallback(NextSegmentInfo next) throws StateSystemDisposedException {
    long callbackStart = next.getTimestamp();
    int callbackQuark = next.getQuark();
    ITmfStateInterval callback = fQueuesSs.querySingleState(callbackStart, callbackQuark);
    int callbackParentQuark = fQueuesSs.getParentAttributeQuark(callbackQuark);
    int nodeQuark = fQueuesSs.getParentAttributeQuark(callbackParentQuark);

    // Find messages added to the pub queues during the callback
    List<ITmfStateInterval> published = new ArrayList<>();
    try {
        int pubListQuark = fQueuesSs.getQuarkRelative(nodeQuark, AbstractRosStateProvider.PUBLISHERS_LIST);
        List<@NonNull Integer> topicQuarks = fQueuesSs.getSubAttributes(pubListQuark, false);
        for (Integer topicQuark : topicQuarks) {
            String pubTopicName = fQueuesSs.getAttributeName(topicQuark);
            // Rejecting rosout for now
            // TODO put back in and support
            if (pubTopicName.contains("rosout")) { //$NON-NLS-1$
                continue;
            }
            int queueQuark = fQueuesSs.getQuarkRelative(topicQuark, AbstractRosStateProvider.QUEUE);
            ITmfStateInterval before = getStateBeforeMessageAddedToQueue(queueQuark, callback.getStartTime() - 1);
            // A new state starting before the callback ends means a message
            // was added to this queue during the callback
            if (before.getEndTime() < callback.getEndTime()) {
                published.add(fQueuesSs.querySingleState(before.getEndTime() + 1, before.getAttribute()));
            }
        }
    } catch (AttributeNotFoundException ignored) {
        // Do nothing, assume it means there are no more messages to be
        // found
    }

    boolean noPublishedMsg = published.isEmpty();
    ITmfStateInterval firstPubMsg = null;
    long end;
    if (noPublishedMsg) {
        // No messages found
        // Still add a segment for time spent in callback
        end = callback.getEndTime();
    } else {
        // We do not support non-linear flow graphs, so just warn for now
        // and take the first message
        // TODO support more than one published message
        if (published.size() > 1) {
            Activator.getInstance().logWarning("Found more than one published message during callback!"); //$NON-NLS-1$
        }
        firstPubMsg = published.get(0);
        end = firstPubMsg.getStartTime() - 1;
    }

    // No specific topic associated with a callback
    String nodeName = fQueuesSs.getAttributeName(nodeQuark);
    next.setNextSegment(new RosMessageFlowSegment(callback.getStartTime(), end, next.getType(), nodeName, StringUtils.EMPTY));

    if (firstPubMsg == null) {
        next.setType(SegmentType.INVALID);
    } else {
        next.setQuark(firstPubMsg.getAttribute());
        next.setTimestamp(firstPubMsg.getStartTime());
        next.setType(SegmentType.PUB_QUEUE);
    }
}
/**
 * Get the state before a new message is (possibly) added to a queue after a
 * specific timestamp.
 *
 * The queue size at the given timestamp is read (a {@code null} value
 * counts as 0); the state returned is the one found at that same timestamp
 * for queue position {@code size + 1}, i.e. the position an incoming
 * message would take.
 *
 * @param queueQuark
 *            the quark of the queue to consider
 * @param initialTimestamp
 *            the last timestamp before the new message could be added to
 *            the queue (e.g. 1 ns before)
 * @return the state before a new message is (possibly) added to the queue
 * @throws StateSystemDisposedException
 *             propagated from the state system queries
 * @throws AttributeNotFoundException
 *             propagated from the lookup of the queue position attribute
 */
private ITmfStateInterval getStateBeforeMessageAddedToQueue(int queueQuark, long initialTimestamp) throws StateSystemDisposedException, AttributeNotFoundException {
    // Size of the queue right when the message is about to be added to it
    ITmfStateInterval queueState = fQueuesSs.querySingleState(initialTimestamp, queueQuark);
    int queueSize = 0;
    if (queueState.getValue() != null) {
        queueSize = queueState.getValueInt();
    }

    // The state of the new message should come right after the returned
    // state, with the same quark/queue position
    String incomingPosName = String.valueOf(queueSize + 1);
    int incomingPosQuark = fQueuesSs.getQuarkRelative(queueQuark, incomingPosName);
    return fQueuesSs.querySingleState(initialTimestamp, incomingPosQuark);
}
/**
 * Get the queue segment type corresponding to a queue type
 *
 * @param queueType
 *            the queue type
 * @return the corresponding segment type, or {@code null} (after logging
 *         an error) if the queue type is not handled
 */
private static SegmentType getSegmentTypeFromQueueType(RosQueueType queueType) {
    SegmentType segmentType;
    switch (queueType) {
    case PUB:
        segmentType = SegmentType.PUB_QUEUE;
        break;
    case SUB:
        segmentType = SegmentType.SUB_QUEUE;
        break;
    default:
        Activator.getInstance().logError("Case " + queueType.name() + " should be handled!"); //$NON-NLS-1$ //$NON-NLS-2$
        segmentType = null;
        break;
    }
    return segmentType;
}
/**
 * Get the message's next transition after a queue.
 *
 * A non-null value in the topic's "drops" attribute at the start of the
 * last state means the message was dropped. Otherwise the transition only
 * depends on the queue type.
 *
 * @param lastState
 *            the last state of the message in the queue (at position 1)
 * @param queueType
 *            the type of the queue the message was in
 * @param queueQuark
 *            the quark of the queue the message was in
 * @return the next transition, or {@code null} if unknown
 * @throws StateSystemDisposedException
 *             propagated from the state system query
 */
private QueueSegmentTransition getTransitionFromLastQueueState(ITmfStateInterval lastState, RosQueueType queueType, int queueQuark) throws StateSystemDisposedException {
    // TODO check for "dropped" latched messages in pub queues!
    // Check for drop
    int topicQuark = fQueuesSs.getParentAttributeQuark(queueQuark);
    boolean dropped = false;
    try {
        int dropsQuark = fQueuesSs.getQuarkRelative(topicQuark, AbstractRosStateProvider.DROPS);
        ITmfStateInterval dropState = fQueuesSs.querySingleState(lastState.getStartTime(), dropsQuark);
        dropped = dropState.getValue() != null;
    } catch (AttributeNotFoundException ignored) {
        // Do nothing, we know there's no drop if there is no "drops" quark
    }
    if (dropped) {
        return QueueSegmentTransition.DROP;
    }

    if (queueType == RosQueueType.PUB) {
        return QueueSegmentTransition.NETWORK;
    }
    if (queueType == RosQueueType.SUB) {
        return QueueSegmentTransition.CALLBACK;
    }
    Activator.getInstance().logError("Case " + queueType.name() + " should be handled!"); //$NON-NLS-1$ //$NON-NLS-2$
    return null;
}
/**
 * Get the last state of a given message in a queue.
 *
 * Starting from the given state, the message is followed one queue
 * position at a time down to position 1: the next state is the one that
 * starts right after the current one ends, at the position just below. A
 * warning is logged when the value of the next state differs from the
 * value of the current one, but the walk continues.
 *
 * @param message
 *            one of the states of the message
 * @param queuePosQuark
 *            the quark corresponding to the message state
 * @return the last state of this message in the queue
 * @throws AttributeNotFoundException
 *             propagated from the lookup of a queue position attribute
 * @throws StateSystemDisposedException
 *             propagated from the state system queries
 */
private ITmfStateInterval getLastStateOfMessageInQueue(ITmfStateInterval message, int queuePosQuark) throws AttributeNotFoundException, StateSystemDisposedException {
    ITmfStateInterval current = message;
    int queueQuark = fQueuesSs.getParentAttributeQuark(queuePosQuark);
    // The attribute name of a queue position is the position itself
    int startPos = Integer.parseInt(fQueuesSs.getAttributeName(queuePosQuark));
    long expectedRef = current.getValueLong();
    for (int pos = startPos; pos > 1; pos--) {
        int lowerPos = pos - 1;
        int lowerPosQuark = fQueuesSs.getQuarkRelative(queueQuark, String.valueOf(lowerPos));
        current = fQueuesSs.querySingleState(current.getEndTime() + 1, lowerPosQuark);
        // Check continuity
        long actualRef = current.getValueLong();
        if (actualRef != expectedRef) {
            Activator.getInstance().logWarning("References do not match! previous=" + expectedRef + " vs. now=" + actualRef); //$NON-NLS-1$ //$NON-NLS-2$
        }
        expectedRef = actualRef;
    }
    return current;
}
/**
 * Get the quark of the queue that contains the target message, using the
 * path: node / publishers or subscribers list / topic / queue
 *
 * @return the quark of the target message's queue
 * @throws AttributeNotFoundException
 *             propagated from the absolute quark lookup
 */
private int getTargetQueueQuark() throws AttributeNotFoundException {
    String node = fInfo.getNode();
    String queueTypeName = getQueueTypeName(fInfo.getQueueType());
    String topic = fInfo.getTopic();
    return fQueuesSs.getQuarkAbsolute(node, queueTypeName, topic, AbstractRosStateProvider.QUEUE);
}
/**
 * Possible transitions for a message after waiting in a queue. An unknown
 * transition is represented by {@code null} rather than by a constant.
 */
private enum QueueSegmentTransition {
/** Transition to a callback (message leaving a subscriber queue) */
CALLBACK,
/** Sent through the network (message write; message leaving a publisher queue) */
NETWORK,
/** Dropped from the queue (the topic's "drops" attribute had a value) */
DROP
}
/**
 * Get the name of the state system attribute under which the queues of a
 * given type are stored
 *
 * @param type
 *            the queue type
 * @return the publishers or subscribers list attribute name, or
 *         {@code null} (after logging an error) if the type is not handled
 */
private static @Nullable String getQueueTypeName(RosQueueType type) {
    String listName;
    switch (type) {
    case SUB:
        listName = AbstractRosStateProvider.SUBSCRIBERS_LIST;
        break;
    case PUB:
        listName = AbstractRosStateProvider.PUBLISHERS_LIST;
        break;
    default:
        Activator.getInstance().logError("Case " + type.name() + " should be handled!"); //$NON-NLS-1$ //$NON-NLS-2$
        listName = null;
        break;
    }
    return listName;
}
}