blob: 6374f97f4203258187bf874895cce65b7f481267 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2012, 2016 Ericsson
* Copyright (c) 2010, 2011 École Polytechnique de Montréal
* Copyright (c) 2010, 2011 Alexandre Montplaisir <alexandre.montplaisir@gmail.com>
*
* All rights reserved. This program and the accompanying materials are
* made available under the terms of the Eclipse Public License 2.0 which
* accompanies this distribution, and is available at
* https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Alexandre Montplaisir - Initial API and implementation
*******************************************************************************/
package org.eclipse.tracecompass.internal.statesystem.core.backend.historytree;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.tracecompass.common.core.collect.BufferedBlockingQueue;
import org.eclipse.tracecompass.common.core.log.TraceCompassLog;
import org.eclipse.tracecompass.common.core.log.TraceCompassLogUtils;
import org.eclipse.tracecompass.internal.provisional.datastore.core.condition.IntegerRangeCondition;
import org.eclipse.tracecompass.internal.provisional.datastore.core.condition.TimeRangeCondition;
import org.eclipse.tracecompass.internal.statesystem.core.Activator;
import org.eclipse.tracecompass.statesystem.core.exceptions.StateSystemDisposedException;
import org.eclipse.tracecompass.statesystem.core.exceptions.TimeRangeException;
import org.eclipse.tracecompass.statesystem.core.interval.ITmfStateInterval;
import org.eclipse.tracecompass.statesystem.core.statevalue.TmfStateValue;
import com.google.common.collect.Iterables;
/**
 * Variant of the HistoryTreeBackend which runs all the interval-insertion logic
 * in a separate thread.
 *
 * Intervals handed to {@link #insertPastState} are placed on a queue. The
 * thread started by the constructors executes {@link #run()}, which takes the
 * intervals off that queue one by one and inserts them in the history tree.
 * The query methods look in the tree first and then in the queue, so that
 * intervals which have not been written yet can still be found.
 *
 * @author Alexandre Montplaisir
 */
public final class ThreadedHistoryTreeBackend extends HistoryTreeBackend
implements Runnable {
/* Logger used for the scoped log entry emitted by query2D(). */
private static final @NonNull Logger LOGGER = TraceCompassLog.getLogger(ThreadedHistoryTreeBackend.class);
/*
 * Second argument given to the BufferedBlockingQueue. The constructors also
 * divide the requested queue size by this value to obtain the first argument.
 */
private static final int CHUNK_SIZE = 127;
/* Intervals waiting to be inserted in the history tree by shtThread. */
private final @NonNull BufferedBlockingQueue<HTInterval> intervalQueue;
/* Thread executing run(); it is created and started by the constructors. */
private final @NonNull Thread shtThread;
/**
 * The backend tracks its end time separately from the tree, to take into
 * consideration intervals in the queue.
 *
 * NOTE(review): this field is written by insertPastState() and read by
 * getEndTime() without synchronization or volatile - confirm whether those
 * two methods can be called from different threads.
 */
private long fEndTime;
/**
* New state history constructor
*
* Note that it usually doesn't make sense to use a Threaded HT if you're
* opening an existing state-file, but you know what you're doing...
*
* @param ssid
* The state system's id
* @param newStateFile
* The name of the history file that will be created. Should end
* in ".ht"
* @param providerVersion
* Version of of the state provider. We will only try to reopen
* existing files if this version matches the one in the
* framework.
* @param startTime
* The earliest timestamp stored in the history
* @param queueSize
* The size of the interval insertion queue. 2000 - 10000 usually
* works well
* @param blockSize
* The size of the blocks in the file
* @param maxChildren
* The maximum number of children allowed for each core node
* @throws IOException
* If there was a problem opening the history file for writing
*/
public ThreadedHistoryTreeBackend(@NonNull String ssid,
File newStateFile,
int providerVersion,
long startTime,
int queueSize,
int blockSize,
int maxChildren)
throws IOException {
super(ssid, newStateFile, providerVersion, startTime, blockSize, maxChildren);
fEndTime = startTime;
intervalQueue = new BufferedBlockingQueue<>(queueSize / CHUNK_SIZE, CHUNK_SIZE);
shtThread = new Thread(this, "History Tree Thread"); //$NON-NLS-1$
shtThread.start();
}
/**
* New State History constructor. This version provides default values for
* blockSize and maxChildren.
*
* @param ssid
* The state system's id
* @param newStateFile
* The name of the history file that will be created. Should end
* in ".ht"
* @param providerVersion
* Version of of the state provider. We will only try to reopen
* existing files if this version matches the one in the
* framework.
* @param startTime
* The earliest timestamp stored in the history
* @param queueSize
* The size of the interval insertion queue. 2000 - 10000 usually
* works well
* @throws IOException
* If there was a problem opening the history file for writing
*/
public ThreadedHistoryTreeBackend(@NonNull String ssid,
File newStateFile,
int providerVersion,
long startTime,
int queueSize)
throws IOException {
super(ssid, newStateFile, providerVersion, startTime);
fEndTime = startTime;
intervalQueue = new BufferedBlockingQueue<>(queueSize / CHUNK_SIZE, CHUNK_SIZE);
shtThread = new Thread(this, "History Tree Thread"); //$NON-NLS-1$
shtThread.start();
}
/*
* The Threaded version does not specify an "existing file" constructor:
* in that case the history is already built, and the other thread is only
* used during building. Just use a plain HistoryTreeBackend instead.
*
* TODO but what about streaming??
*/
/**
 * Queues an interval for insertion instead of writing it to the history
 * tree directly. The thread executing run() takes it from the queue later.
 * The backend's own end time is advanced if the interval ends after it.
 */
@Override
public void insertPastState(long stateStartTime, long stateEndTime,
        int quark, Object value) throws TimeRangeException {
    /* Hand the interval over to the insertion thread through the queue */
    intervalQueue.put(new HTInterval(stateStartTime, stateEndTime, quark, value));

    /* Only reached once the interval has been built and queued */
    long latestEnd = Math.max(fEndTime, stateEndTime);
    fEndTime = latestEnd;
}
/**
 * Returns the end time tracked by this backend, which is updated as soon
 * as an interval is queued by insertPastState().
 */
@Override
public long getEndTime() {
    return this.fEndTime;
}
/**
 * Ends the construction phase: the insertion thread is stopped with the
 * given end time, then the backend is flagged as finished building.
 */
@Override
public void finishedBuilding(long endTime) {
    /*
     * Everything must be committed to the History Tree, and the standalone
     * thread must be stopped, before control goes back to the
     * StateHistorySystem. The SHS writes the Attribute Tree to the file
     * next, and that must not overlap with the writing of the last nodes.
     */
    stopRunningThread(endTime);
    setFinishedBuilding(true);
}
/**
 * Disposes of the backend. If the history was still being built, the
 * insertion thread is stopped first.
 */
@Override
public void dispose() {
    boolean stillBuilding = !isFinishedBuilding();
    if (stillBuilding) {
        /*
         * isFinishedBuilding is left at false on purpose, so that the
         * superclass asks the back-end to delete the file.
         */
        stopRunningThread(Long.MAX_VALUE);
    }
    super.dispose();
}
/**
 * Stops the insertion thread, if it is still alive, and waits for it to
 * terminate.
 *
 * A "poison pill" interval (start time Long.MIN_VALUE, attribute -1) is put
 * in the queue. When run() reaches it, it closes the tree using the pill's
 * end time and returns.
 *
 * @param endTime
 *            The end time carried by the poison pill, which run() passes to
 *            closeTree()
 */
private void stopRunningThread(long endTime) {
    if (!shtThread.isAlive()) {
        /* Nothing to stop */
        return;
    }
    try {
        HTInterval pill = new HTInterval(Long.MIN_VALUE, endTime, -1, TmfStateValue.nullValue());
        intervalQueue.put(pill);
        /*
         * Presumably pushes out elements still held in the queue's input
         * buffer, so the pill is visible to the consumer - see
         * BufferedBlockingQueue.
         */
        intervalQueue.flushInputBuffer();
        /* Wait for the HT to finish its closeTree() */
        shtThread.join();
    } catch (TimeRangeException e) {
        Activator.getDefault().logError("Error closing state system", e); //$NON-NLS-1$
    } catch (InterruptedException e) {
        Activator.getDefault().logError("State system interrupted", e); //$NON-NLS-1$
        /*
         * Catching the exception clears the interrupt flag; set it again so
         * that the code calling us can still notice the interruption.
         */
        Thread.currentThread().interrupt();
    }
}
/**
 * Body of the insertion thread. Intervals are read from the queue and
 * inserted in the history tree until the "poison pill" (start time
 * Long.MIN_VALUE and attribute -1) shows up, at which point the tree is
 * closed with the pill's end time.
 */
@Override
public void run() {
    try {
        while (true) {
            HTInterval head = intervalQueue.blockingPeek();
            boolean isPoisonPill = head.getStartTime() == Long.MIN_VALUE && head.getAttribute() == -1;
            if (isPoisonPill) {
                /*
                 * We've been told we're done, let's write down everything
                 * and quit. The end time of this "signal interval" is
                 * actually correct.
                 */
                getSHT().closeTree(head.getEndTime());
                break;
            }

            /* Send the interval to the History Tree */
            getSHT().insertInterval(head);

            /*
             * Only now is the interval actually removed from the queue.
             */
            // FIXME Replace with remove() once it is implemented.
            intervalQueue.take();
        }
    } catch (TimeRangeException e) {
        /* This should not happen */
        Activator.getDefault().logError("Error starting the state system", e); //$NON-NLS-1$
    }
}
// ------------------------------------------------------------------------
// Query methods
// ------------------------------------------------------------------------
/**
 * Full query at time t. The superclass fills the list from the history
 * tree; while the history is still being built, every entry it left null
 * is then looked up individually through doSingularQuery().
 */
@Override
public void doQuery(List<ITmfStateInterval> currentStateInfo, long t)
        throws TimeRangeException, StateSystemDisposedException {
    super.doQuery(currentStateInfo, t);

    if (!isFinishedBuilding()) {
        /*
         * Construction is still ongoing, so some intervals may have been
         * sitting in the queue while the tree was being queried. Fill in
         * whatever is missing with "unicast" queries.
         */
        for (int quark = 0; quark < currentStateInfo.size(); quark++) {
            if (currentStateInfo.get(quark) != null) {
                continue;
            }
            currentStateInfo.set(quark, doSingularQuery(t, quark));
        }
    }
    /*
     * Otherwise the history tree is the only place to look for intervals,
     * and there is nothing more to do.
     */
}
/**
 * Single-attribute query at time t. The history tree is searched first,
 * then the queue of pending intervals, then the history tree once more.
 */
@Override
public ITmfStateInterval doSingularQuery(long t, int attributeQuark)
        throws TimeRangeException, StateSystemDisposedException {
    ITmfStateInterval found = super.doSingularQuery(t, attributeQuark);
    if (found == null) {
        /*
         * Not in the history tree: the interval may still be waiting in
         * the intervalQueue. BufferedBlockingQueue's iterator() is
         * thread-safe, so the queue does not need to be locked.
         */
        for (ITmfStateInterval queued : intervalQueue) {
            if (queued.getAttribute() != attributeQuark || !queued.intersects(t)) {
                continue;
            }
            return queued;
        }

        /*
         * Still nothing, which means the interval moved from the queue to
         * the tree *while we were iterating* on the queue. A last pass in
         * the tree should find it.
         *
         * This is really rare, hence the second pass at the end only when
         * needed, rather than systematically checking the queue first
         * (which is slow).
         */
        found = super.doSingularQuery(t, attributeQuark);
    }
    return found;
}
/**
 * 2D query. Returns the matching intervals of the history tree, followed
 * by the matching intervals still waiting in the queue. Queued intervals
 * are only reported while the history is not finished building.
 */
@Override
public Iterable<@NonNull ITmfStateInterval> query2D(IntegerRangeCondition quarks, TimeRangeCondition times)
        throws TimeRangeException {
    try (TraceCompassLogUtils.ScopeLog log = new TraceCompassLogUtils.ScopeLog(LOGGER, Level.FINEST,
            "ThreadedHistoryTreeBackend:query2D", //$NON-NLS-1$
            "ssid", getSSID(), //$NON-NLS-1$
            "quarks", quarks, //$NON-NLS-1$
            "timeCondition", times)) { //$NON-NLS-1$
        /*
         * Intervals can still be in the queue. The result is the
         * HistoryTreeBackend's intervals first, then the queue's. Being
         * lazy, the Iterables only evaluate the BBQ once the
         * HistoryTreeBackend part is consumed, and only if construction
         * still isn't done at that point.
         */
        Iterable<@NonNull HTInterval> pending = Iterables.filter(intervalQueue, queued -> {
            if (isFinishedBuilding()) {
                return false;
            }
            return quarks.test(queued.getAttribute())
                    && times.intersects(queued.getStartTime(), queued.getEndTime());
        });
        return Iterables.concat(super.query2D(quarks, times), pending);
    }
}
}