blob: f6212cfd5bbd3b4c45d7a12e6076746092948a4a [file] [log] [blame]
package org.eclipse.emf.henshin.sam.paf;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ArrayPipe<T> implements IPipe<T> {
private T[] data;
private int lastIndex = 0;
private FilterDispatcher filterDispatcher;
private Condition dequeuCond;
private Condition queueCond;
private Lock pipeLock;
private boolean closed = false;
public void clear() {
this.pipeLock.lock();
for (int i = 0; i < this.lastIndex; i++) {
this.data[i] = null;
}
this.lastIndex = 0;
this.pipeLock.unlock();
}
public T dequeue() throws IllegalStateException {
if (this.closed && this.lastIndex == 0) {
throw new IllegalStateException("Pipe is closed and empty!");
}
try {
this.pipeLock.lock();
while (lastIndex == 0) {
try {
this.dequeuCond.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
T result = this.data[this.lastIndex - 1];
this.lastIndex--;
if (this.lastIndex == this.data.length - 1)
this.queueCond.signal();
return result;
} finally {
this.pipeLock.unlock();
}
}
public void queue(T input) {
try {
this.pipeLock.lock();
while (this.lastIndex >= this.data.length) {
try {
this.queueCond.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
this.data[this.lastIndex] = input;
this.lastIndex++;
if (this.lastIndex == 1)
this.dequeuCond.signal();
} finally {
this.pipeLock.unlock();
}
}
public int sizeOfPipe() {
this.pipeLock.lock();
int res = this.lastIndex;
this.pipeLock.unlock();
return res;
}
public FilterDispatcher getFilterDispatcher() {
return this.filterDispatcher;
}
public boolean setFilterDispatcher(FilterDispatcher value) {
boolean changed = false;
if (value != this.filterDispatcher) {
FilterDispatcher oldValue = this.filterDispatcher;
if (oldValue != null) {
this.filterDispatcher = null;
oldValue.removeFromIDispatchable(this);
}
changed = true;
this.filterDispatcher = value;
if (value != null) {
value.addToIDispatchable(this);
}
}
return changed;
}
public void reset() {
this.clear();
}
@SuppressWarnings("unchecked")
public ArrayPipe(int size) {
if (size <= 0) {
throw new IllegalArgumentException(
"the pipe's size must not be less or equal 0");
}
this.data = (T[]) new Object[size];
this.pipeLock = new ReentrantLock();
this.queueCond = this.pipeLock.newCondition();
this.dequeuCond = this.pipeLock.newCondition();
}
public void close() {
this.closed = true;
}
public boolean isClosed() {
return this.closed;
}
private int readers = 0;
public void decrementReaders() {
this.readers--;
}
public void incrementReaders() {
this.readers++;
}
}