| /******************************************************************************* |
| * Copyright (c) 2012, 2016 Ericsson |
| * Copyright (c) 2010, 2011 École Polytechnique de Montréal |
| * Copyright (c) 2010, 2011 Alexandre Montplaisir <alexandre.montplaisir@gmail.com> |
| * |
| * All rights reserved. This program and the accompanying materials are |
| * made available under the terms of the Eclipse Public License 2.0 which |
| * accompanies this distribution, and is available at |
| * https://www.eclipse.org/legal/epl-2.0/ |
| * |
| * SPDX-License-Identifier: EPL-2.0 |
| * |
| * Contributors: |
| * Alexandre Montplaisir - Initial API and implementation |
| *******************************************************************************/ |
| |
| package org.eclipse.tracecompass.internal.statesystem.core.backend.historytree; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.List; |
| import java.util.logging.Level; |
| import java.util.logging.Logger; |
| |
| import org.eclipse.jdt.annotation.NonNull; |
| import org.eclipse.tracecompass.common.core.collect.BufferedBlockingQueue; |
| import org.eclipse.tracecompass.common.core.log.TraceCompassLog; |
| import org.eclipse.tracecompass.common.core.log.TraceCompassLogUtils; |
| import org.eclipse.tracecompass.internal.provisional.datastore.core.condition.IntegerRangeCondition; |
| import org.eclipse.tracecompass.internal.provisional.datastore.core.condition.TimeRangeCondition; |
| import org.eclipse.tracecompass.internal.statesystem.core.Activator; |
| import org.eclipse.tracecompass.statesystem.core.exceptions.StateSystemDisposedException; |
| import org.eclipse.tracecompass.statesystem.core.exceptions.TimeRangeException; |
| import org.eclipse.tracecompass.statesystem.core.interval.ITmfStateInterval; |
| import org.eclipse.tracecompass.statesystem.core.statevalue.TmfStateValue; |
| |
| import com.google.common.collect.Iterables; |
| |
| /** |
| * Variant of the HistoryTreeBackend which runs all the interval-insertion logic |
| * in a separate thread. |
| * |
| * @author Alexandre Montplaisir |
| */ |
| public final class ThreadedHistoryTreeBackend extends HistoryTreeBackend |
| implements Runnable { |
| |
| private static final @NonNull Logger LOGGER = TraceCompassLog.getLogger(ThreadedHistoryTreeBackend.class); |
| |
| private static final int CHUNK_SIZE = 127; |
| private final @NonNull BufferedBlockingQueue<HTInterval> intervalQueue; |
| private final @NonNull Thread shtThread; |
| /** |
| * The backend tracks its end time separately from the tree, to take into |
| * consideration intervals in the queue. |
| */ |
| private long fEndTime; |
| |
| /** |
| * New state history constructor |
| * |
| * Note that it usually doesn't make sense to use a Threaded HT if you're |
| * opening an existing state-file, but you know what you're doing... |
| * |
| * @param ssid |
| * The state system's id |
| * @param newStateFile |
| * The name of the history file that will be created. Should end |
| * in ".ht" |
| * @param providerVersion |
| * Version of of the state provider. We will only try to reopen |
| * existing files if this version matches the one in the |
| * framework. |
| * @param startTime |
| * The earliest timestamp stored in the history |
| * @param queueSize |
| * The size of the interval insertion queue. 2000 - 10000 usually |
| * works well |
| * @param blockSize |
| * The size of the blocks in the file |
| * @param maxChildren |
| * The maximum number of children allowed for each core node |
| * @throws IOException |
| * If there was a problem opening the history file for writing |
| */ |
| public ThreadedHistoryTreeBackend(@NonNull String ssid, |
| File newStateFile, |
| int providerVersion, |
| long startTime, |
| int queueSize, |
| int blockSize, |
| int maxChildren) |
| throws IOException { |
| super(ssid, newStateFile, providerVersion, startTime, blockSize, maxChildren); |
| fEndTime = startTime; |
| |
| intervalQueue = new BufferedBlockingQueue<>(queueSize / CHUNK_SIZE, CHUNK_SIZE); |
| shtThread = new Thread(this, "History Tree Thread"); //$NON-NLS-1$ |
| shtThread.start(); |
| } |
| |
| /** |
| * New State History constructor. This version provides default values for |
| * blockSize and maxChildren. |
| * |
| * @param ssid |
| * The state system's id |
| * @param newStateFile |
| * The name of the history file that will be created. Should end |
| * in ".ht" |
| * @param providerVersion |
| * Version of of the state provider. We will only try to reopen |
| * existing files if this version matches the one in the |
| * framework. |
| * @param startTime |
| * The earliest timestamp stored in the history |
| * @param queueSize |
| * The size of the interval insertion queue. 2000 - 10000 usually |
| * works well |
| * @throws IOException |
| * If there was a problem opening the history file for writing |
| */ |
| public ThreadedHistoryTreeBackend(@NonNull String ssid, |
| File newStateFile, |
| int providerVersion, |
| long startTime, |
| int queueSize) |
| throws IOException { |
| super(ssid, newStateFile, providerVersion, startTime); |
| fEndTime = startTime; |
| |
| intervalQueue = new BufferedBlockingQueue<>(queueSize / CHUNK_SIZE, CHUNK_SIZE); |
| shtThread = new Thread(this, "History Tree Thread"); //$NON-NLS-1$ |
| shtThread.start(); |
| } |
| |
| /* |
| * The Threaded version does not specify an "existing file" constructor, |
| * since the history is already built (and we only use the other thread |
| * during building). Just use a plain HistoryTreeProvider in this case. |
| * |
| * TODO but what about streaming?? |
| */ |
| |
| @Override |
| public void insertPastState(long stateStartTime, long stateEndTime, |
| int quark, Object value) throws TimeRangeException { |
| /* |
| * Here, instead of directly inserting the elements in the History Tree |
| * underneath, we'll put them in the Queue. They will then be taken and |
| * processed by the other thread executing the run() method. |
| */ |
| HTInterval interval = new HTInterval(stateStartTime, stateEndTime, |
| quark, value); |
| intervalQueue.put(interval); |
| fEndTime = Math.max(fEndTime, stateEndTime); |
| } |
| |
| @Override |
| public long getEndTime() { |
| return fEndTime; |
| } |
| |
| @Override |
| public void finishedBuilding(long endTime) { |
| /* |
| * We need to commit everything in the History Tree and stop the |
| * standalone thread before returning to the StateHistorySystem. (SHS |
| * will then write the Attribute Tree to the file, that must not happen |
| * at the same time we are writing the last nodes!) |
| */ |
| |
| stopRunningThread(endTime); |
| setFinishedBuilding(true); |
| return; |
| } |
| |
| @Override |
| public void dispose() { |
| if (!isFinishedBuilding()) { |
| stopRunningThread(Long.MAX_VALUE); |
| } |
| /* |
| * isFinishedBuilding remains false, so the superclass will ask the |
| * back-end to delete the file. |
| */ |
| super.dispose(); |
| } |
| |
| private void stopRunningThread(long endTime) { |
| if (!shtThread.isAlive()) { |
| return; |
| } |
| |
| /* |
| * Send a "poison pill" in the queue, then wait for the HT to finish its |
| * closeTree() |
| */ |
| try { |
| HTInterval pill = new HTInterval(Long.MIN_VALUE, endTime, -1, TmfStateValue.nullValue()); |
| intervalQueue.put(pill); |
| intervalQueue.flushInputBuffer(); |
| shtThread.join(); |
| } catch (TimeRangeException e) { |
| Activator.getDefault().logError("Error closing state system", e); //$NON-NLS-1$ |
| } catch (InterruptedException e) { |
| Activator.getDefault().logError("State system interrupted", e); //$NON-NLS-1$ |
| } |
| } |
| |
| @Override |
| public void run() { |
| try { |
| HTInterval currentInterval = intervalQueue.blockingPeek(); |
| while (currentInterval.getStartTime() != Long.MIN_VALUE || currentInterval.getAttribute() != -1) { |
| /* Send the interval to the History Tree */ |
| getSHT().insertInterval(currentInterval); |
| /* Actually remove the interval from the queue */ |
| // FIXME Replace with remove() once it is implemented. |
| intervalQueue.take(); |
| currentInterval = intervalQueue.blockingPeek(); |
| } |
| /* |
| * We've been told we're done, let's write down everything and quit. |
| * The end time of this "signal interval" is actually correct. |
| */ |
| getSHT().closeTree(currentInterval.getEndTime()); |
| } catch (TimeRangeException e) { |
| /* This should not happen */ |
| Activator.getDefault().logError("Error starting the state system", e); //$NON-NLS-1$ |
| } |
| } |
| |
| // ------------------------------------------------------------------------ |
| // Query methods |
| // ------------------------------------------------------------------------ |
| |
| @Override |
| public void doQuery(List<ITmfStateInterval> currentStateInfo, long t) |
| throws TimeRangeException, StateSystemDisposedException { |
| super.doQuery(currentStateInfo, t); |
| |
| if (isFinishedBuilding()) { |
| /* |
| * The history tree is the only place to look for intervals once |
| * construction is finished. |
| */ |
| return; |
| } |
| |
| /* |
| * It is possible we may have missed some intervals due to them being in |
| * the queue while the query was ongoing. Go over the results to see if |
| * we missed any. |
| */ |
| for (int i = 0; i < currentStateInfo.size(); i++) { |
| if (currentStateInfo.get(i) == null) { |
| /* Query the missing interval via "unicast" */ |
| ITmfStateInterval interval = doSingularQuery(t, i); |
| currentStateInfo.set(i, interval); |
| } |
| } |
| } |
| |
| @Override |
| public ITmfStateInterval doSingularQuery(long t, int attributeQuark) |
| throws TimeRangeException, StateSystemDisposedException { |
| ITmfStateInterval ret = super.doSingularQuery(t, attributeQuark); |
| if (ret != null) { |
| return ret; |
| } |
| |
| /* |
| * We couldn't find the interval in the history tree. It's possible that |
| * it is currently in the intervalQueue. Look for it there. Note that |
| * BufferedBlockingQueue's iterator() is thread-safe (no need to lock |
| * the queue). |
| */ |
| for (ITmfStateInterval interval : intervalQueue) { |
| if (interval.getAttribute() == attributeQuark && interval.intersects(t)) { |
| return interval; |
| } |
| } |
| |
| /* |
| * If we missed it again, it's because it got inserted in the tree |
| * *while we were iterating* on the queue. One last pass in the tree |
| * should find it. |
| * |
| * This case is really rare, which is why we do a second pass at the end |
| * if needed, instead of systematically checking in the queue first |
| * (which is slow). |
| */ |
| return super.doSingularQuery(t, attributeQuark); |
| } |
| |
| @Override |
| public Iterable<@NonNull ITmfStateInterval> query2D(IntegerRangeCondition quarks, TimeRangeCondition times) |
| throws TimeRangeException { |
| try (TraceCompassLogUtils.ScopeLog log = new TraceCompassLogUtils.ScopeLog(LOGGER, Level.FINEST, "ThreadedHistoryTreeBackend:query2D", //$NON-NLS-1$ |
| "ssid", getSSID(), //$NON-NLS-1$ |
| "quarks", quarks, //$NON-NLS-1$ |
| "timeCondition", times)) { //$NON-NLS-1$ |
| /* |
| * There can still be intervals in the queue, search the |
| * HistoryTreeBackend, then the queue for the intervals we need. |
| * Iterables will lazily evaluate the BBQ only once the |
| * HistoryTreeBackend is consumed and if the construction still |
| * isn't done. |
| */ |
| Iterable<@NonNull HTInterval> queuedIntervals = Iterables.filter(intervalQueue, |
| interval -> !isFinishedBuilding() && quarks.test(interval.getAttribute()) |
| && times.intersects(interval.getStartTime(), interval.getEndTime())); |
| return Iterables.concat(super.query2D(quarks, times), queuedIntervals); |
| } |
| } |
| } |