blob: 72d78115286991bd6d0402c0393fa0e54987411f [file] [log] [blame]
package org.eclipse.remote.proxy.tests;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.eclipse.remote.proxy.core.ChannelMultiplexer;
import org.eclipse.remote.proxy.core.MultiplexedChannel;
import org.eclipse.remote.proxy.core.ChannelMultiplexer.IChannelListener;
import junit.framework.TestCase;
public class MultiplexTests extends TestCase {
private static final int NUM_CHANS = 1;
private static final int FINISH = -1;
private class ChanReader implements Runnable {
private DataInputStream in;
private List<Integer> recvBufs;
private String name;
public ChanReader(MultiplexedChannel chan, List<Integer> recvBufs, String name) {
this.in = new DataInputStream(chan.getInputStream());
this.recvBufs = recvBufs;
this.name = name;
}
public String getName() {
return name;
}
@Override
public void run() {
try {
synchronized (MultiplexTests.this) {
System.out.println(getName() + " started");
}
try {
while (true) {
int val = in.readInt();
if (val == FINISH) {
break;
}
recvBufs.add(val);
}
} catch (EOFException e) {
// Finish
}
synchronized (MultiplexTests.this) {
System.out.println(getName() + " finished");
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private class ChanWriter implements Runnable {
private DataOutputStream out;
private List<Integer> sentBufs;
private Random r = new Random();
private String name;
public ChanWriter(MultiplexedChannel chan, List<Integer> sentBufs, String name) {
this.out = new DataOutputStream(chan.getOutputStream());
this.sentBufs = sentBufs;
this.name = name;
}
public String getName() {
return name;
}
@Override
public void run() {
try{
synchronized (MultiplexTests.this) {
System.out.println(getName() + " started");
}
for (int i = 0; i < 100; i++) {
int val = r.nextInt(1024);
out.writeInt(val);
out.flush();
sentBufs.add(val);
try {
Thread.sleep(r.nextInt(100));
} catch (InterruptedException e) {
fail(e.getMessage());
}
}
out.writeInt(FINISH);
out.flush();
synchronized (MultiplexTests.this) {
System.out.println(getName() + " finished");
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private class ChanReaderWriter implements Runnable {
private DataInputStream in;
private DataOutputStream out;
private String name;
public ChanReaderWriter(MultiplexedChannel chan, String name) {
this.in = new DataInputStream(chan.getInputStream());
this.out = new DataOutputStream(chan.getOutputStream());
this.name = name;
}
public String getName() {
return name;
}
@Override
public void run() {
try{
synchronized (MultiplexTests.this) {
System.out.println(getName() + " started");
}
try {
while (true) {
int val = in.readInt();
out.writeInt(val);
out.flush();
}
} catch (EOFException e) {
// Finish
}
synchronized (MultiplexTests.this) {
System.out.println(getName() + " finished");
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void testChannels() {
try {
final PipedInputStream inClnt = new PipedInputStream();
final PipedInputStream inSvr = new PipedInputStream();
final PipedOutputStream outClnt = new PipedOutputStream(inSvr);
final PipedOutputStream outSvr = new PipedOutputStream(inClnt);
final List<List<Integer>> clntSentBufs = new ArrayList<List<Integer>>();
final List<List<Integer>> clntRecvBufs = new ArrayList<List<Integer>>();
final Thread[] clntReaders = new Thread[NUM_CHANS];
final Thread[] clntWriters = new Thread[NUM_CHANS];
final Thread[] svrRW = new Thread[NUM_CHANS];
for (int i = 0; i < NUM_CHANS; i++) {
clntSentBufs.add(new ArrayList<Integer>());
clntRecvBufs.add(new ArrayList<Integer>());
}
// Must start server first or it will miss the new channel message
ChannelMultiplexer mpxSvr = startMpxServer(inSvr, outSvr, svrRW);
ChannelMultiplexer mpxClnt = startMpxClient(inClnt, outClnt);
List<MultiplexedChannel> channels = runChannelTest(mpxClnt, clntReaders, clntWriters, clntSentBufs, clntRecvBufs);
for (int i = 0; i < NUM_CHANS; i++) {
clntWriters[i].join();
clntReaders[i].join();
}
for (MultiplexedChannel channel : channels) {
channel.close();
}
for (int i = 0; i < NUM_CHANS; i++) {
svrRW[i].join();
}
for (int i = 0; i < NUM_CHANS; i++) {
assertEquals(clntSentBufs.get(i), clntRecvBufs.get(i));
}
} catch (IOException | InterruptedException e) {
fail(e.getMessage());
}
}
private List<MultiplexedChannel> runChannelTest(ChannelMultiplexer mpx, Thread[] readers, Thread[] writers, final List<List<Integer>> sentBufs, final List<List<Integer>> recvBufs) throws IOException {
List<MultiplexedChannel> channels = new ArrayList<MultiplexedChannel>();
for (int i = 0 ; i < NUM_CHANS; i++) {
MultiplexedChannel chan = mpx.openChannel(); // needs to be in same thread as reader
ChanReader reader = new ChanReader(chan, recvBufs.get(chan.getId()), "clnt reader thread " + chan.getId());
readers[chan.getId()] = new Thread(reader, reader.getName());
ChanWriter writer = new ChanWriter(chan, sentBufs.get(chan.getId()), "clnt writer thread " + chan.getId());
writers[chan.getId()] = new Thread(writer, writer.getName());
readers[chan.getId()].start();
writers[chan.getId()].start();
channels.add(chan);
}
return channels;
}
private ChannelMultiplexer startMpxClient(InputStream in, OutputStream out) {
final ChannelMultiplexer mpx = new ChannelMultiplexer("C", MultiplexTests.this, in, out);
mpx.start();
return mpx;
}
private ChannelMultiplexer startMpxServer(InputStream in, OutputStream out, final Thread[] rws) throws IOException {
final ChannelMultiplexer mpx = new ChannelMultiplexer("S", MultiplexTests.this, in, out);
mpx.addListener(new IChannelListener() {
@Override
public void newChannel(final MultiplexedChannel chan) {
synchronized (MultiplexTests.this) {
System.err.println("newChannel "+chan.getId());
}
ChanReaderWriter rw = new ChanReaderWriter(chan, "svr rw thread " + chan.getId());
rws[chan.getId()] = new Thread(rw, rw.getName());
rws[chan.getId()].start();
}
@Override
public void closeChannel(MultiplexedChannel chan) {
// readers[idx].interrupt();
// writers[idx].interrupt();
synchronized (MultiplexTests.this) {
System.err.println("closeChannel "+ chan.getId());
}
}
});
mpx.start();
return mpx;
}
@Override
protected void setUp() throws Exception {
}
@Override
protected void tearDown() throws Exception {
}
}