blob: a09f096ff181803ed608a3b202b879b1e8838b36 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2004, 2007 Boeing.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Boeing - initial API and implementation
*******************************************************************************/
package org.eclipse.osee.ote.client.msg.core.internal;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.logging.Level;
import org.eclipse.osee.framework.logging.OseeLog;
import org.eclipse.osee.ote.client.msg.core.db.AbstractMessageDataBase;
import org.eclipse.osee.ote.client.msg.core.db.MessageInstance;
import org.eclipse.osee.ote.message.data.MessageData;
import org.eclipse.osee.ote.message.enums.DataType;
/**
* Handles processing of updates from a channel. The channel processor maintains an internal queue whose max size
* dictates the maximum number of concurrent updates. All updates are submitted to the given thread pool for execution.
*
* @author Ken J. Aguilar
*/
final public class ChannelProcessor {
private final ArrayBlockingQueue<Task> queue;
private final ExecutorService threadPool;
private final AbstractMessageDataBase msgDb;
private final DataType memType;
/**
* A task allows each channel to have multiple updates processed concurrently. Each task has its own buffers.
*
* @author Ken J. Aguilar
*/
private final class Task implements Runnable {
private final ByteBuffer buffer;
public Task(ByteBuffer buffer) {
this.buffer = buffer;
}
public void prepTask(ReadableByteChannel channel) throws IOException {
buffer.clear();
// read the data from the channel into the buffer
channel.read(buffer);
buffer.flip();
}
@Override
public void run() {
try {
final int id = buffer.getInt();
final long time = buffer.getLong();
final MessageInstance instance = msgDb.findById(id);
if (instance != null) {
onUpdate(instance, buffer, time);
}
// return to the queue
queue.put(this);
} catch (InterruptedException e) {
// do nothing
} catch (Exception ex) {
OseeLog.log(Activator.class, Level.SEVERE, "failed to process message update", ex);
}
}
}
public ChannelProcessor(int depth, int bufferSize, ExecutorService threadPool, AbstractMessageDataBase msgDb, DataType memType) {
this.queue = new ArrayBlockingQueue<>(depth);
try {
// fill the queue with pre-allocated tasks
for (int i = 0; i < depth; i++) {
queue.put(new Task(ByteBuffer.allocateDirect(bufferSize)));
}
} catch (InterruptedException ex) {
throw new Error("should never happen", ex);
}
this.threadPool = threadPool;
this.msgDb = msgDb;
this.memType = memType;
}
public final void process(final ReadableByteChannel channel) throws InterruptedException, IOException {
// get a free task
final Task task = queue.take();
// prep the task
task.prepTask(channel);
// the task is now ready for execution, submit it to the thread pool
threadPool.submit(task);
}
/**
* called when there is data to be processed from a channel. Can be called by one or more threads for the same data
* concurrently to so implementors need to be thread safe
*/
protected void onUpdate(MessageInstance instance, ByteBuffer buffer, long time) {
MessageData msgData = instance.getMessage().getActiveDataSource(memType);
if (msgData != null) {
msgData.setTime(time);
byte[] data = msgData.getMem().getData();
int remaining = buffer.remaining();
if (data.length < remaining) {
OseeLog.logf(Activator.class, Level.WARNING,
"Message [%s] changed it's backing data size from [%d] to [%d].", instance.getMessage().getName(),
data.length, remaining);
data = new byte[remaining];
buffer.get(data, 0, remaining);
msgData.setNewBackingBuffer(data);
msgData.incrementActivityCount();
msgData.notifyListeners();
return;
}
if (remaining < data.length) {
Arrays.fill(data, remaining, data.length, (byte) 0);
// msg.getActiveDataSource().setCurrentLength(remaining);
}
buffer.get(data, 0, remaining);
msgData.setCurrentLength(remaining);
msgData.incrementActivityCount();
msgData.notifyListeners();
}
}
}