blob: 8f0a374c9a19fe23aa3cd49683be3922bf5fbe91 [file] [log] [blame]
/*
* Copyright (c) 2008, 2009, 2011, 2012 Eike Stepper (Berlin, Germany) and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Eike Stepper - initial API and implementation
* Teerawat Chaiyakijpichet (No Magic Asia Ltd.) - SSL
*/
package org.eclipse.net4j.tests;
import org.eclipse.net4j.Net4jUtil;
import org.eclipse.net4j.buffer.IBuffer;
import org.eclipse.net4j.buffer.IBufferProvider;
import org.eclipse.net4j.channel.ChannelInputStream;
import org.eclipse.net4j.channel.ChannelOutputStream;
import org.eclipse.net4j.channel.IChannel;
import org.eclipse.net4j.connector.IConnector;
import org.eclipse.net4j.tests.data.HugeData;
import org.eclipse.net4j.util.container.IContainerDelta;
import org.eclipse.net4j.util.container.IContainerEvent;
import org.eclipse.net4j.util.event.IEvent;
import org.eclipse.net4j.util.event.IListener;
import org.eclipse.net4j.util.factory.IFactory;
import org.eclipse.net4j.util.factory.ProductCreationException;
import org.eclipse.net4j.util.io.IOUtil;
import org.eclipse.spi.net4j.ClientProtocolFactory;
import org.eclipse.spi.net4j.Connector;
import org.eclipse.spi.net4j.Protocol;
import org.eclipse.spi.net4j.ServerProtocolFactory;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.util.StringTokenizer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* @author Eike Stepper
*/
public abstract class TransportTest extends AbstractProtocolTest
{
public TransportTest()
{
}
@Override
protected abstract boolean useJVMTransport();
protected IBuffer provideBuffer()
{
return provideBuffer(null);
}
protected IBuffer provideBuffer(IConnector iConnector)
{
IBuffer buffer = null;
if (!useJVMTransport() && useSSLTransport())
{
// cannot use buffer provider from net4j need to use SSL Buffer inside the SSLConnector.
buffer = ((Connector)iConnector).provideBuffer();
}
else
{
IBufferProvider bufferProvider = Net4jUtil.getBufferProvider(container);
buffer = bufferProvider.provideBuffer();
}
return buffer;
}
private void registerClientFactory(IFactory factory)
{
if (!useJVMTransport() && useSSLTransport())
{
// need separate container between client and server for SSL.
separateContainer.registerFactory(factory);
}
else
{
container.registerFactory(factory);
}
}
protected IBufferProvider provideBufferProvider(IConnector iConnector)
{
IBufferProvider bufferProvider = null;
if (!useJVMTransport() && useSSLTransport())
{
// cannot use buffer provider from net4j need to use SSL Buffer inside the SSLConnector.
bufferProvider = ((Connector)iConnector).getConfig().getBufferProvider();
}
else
{
bufferProvider = Net4jUtil.getBufferProvider(container);
}
return bufferProvider;
}
public void testConnect() throws Exception
{
startTransport();
}
public void testSendBuffer() throws Exception
{
startTransport();
IConnector connecter = getConnector();
IChannel channel = connecter.openChannel();
for (int i = 0; i < 3; i++)
{
IBuffer buffer = provideBuffer(connecter);
ByteBuffer byteBuffer = buffer.startPutting(channel.getID());
byteBuffer.putInt(1970);
channel.sendBuffer(buffer);
}
}
public void testSendEmptyBuffer() throws Exception
{
startTransport();
IConnector connecter = getConnector();
IChannel channel = connecter.openChannel();
for (int i = 0; i < 3; i++)
{
IBuffer buffer = provideBuffer(connecter);
buffer.startPutting(channel.getID());
channel.sendBuffer(buffer);
}
}
public void testSendEmptyBuffer2() throws Exception
{
startTransport();
IConnector connecter = getConnector();
IChannel channel = connecter.openChannel();
for (int i = 0; i < 3; i++)
{
IBuffer buffer = provideBuffer(connecter);
channel.sendBuffer(buffer);
}
}
public void testHandleBuffer() throws Exception
{
final int COUNT = 3;
final CountDownLatch counter = new CountDownLatch(COUNT);
container.registerFactory(new TestProtocol.ServerFactory(counter));
// need to handle about separating container between client and server for SSL.
registerClientFactory(new TestProtocol.ClientFactory());
startTransport();
IConnector iConnecter = getConnector();
IChannel channel = iConnecter.openChannel(TestProtocol.ClientFactory.TYPE, null);
for (int i = 0; i < COUNT; i++)
{
IBuffer buffer = provideBuffer(iConnecter);
ByteBuffer byteBuffer = buffer.startPutting(channel.getID());
byteBuffer.putInt(1970);
channel.sendBuffer(buffer);
sleep(50);
}
assertEquals(true, counter.await(2, TimeUnit.SECONDS));
}
public void testHandleEmptyBuffer() throws Exception
{
final int COUNT = 3;
final CountDownLatch counter = new CountDownLatch(COUNT);
container.registerFactory(new TestProtocol.ServerFactory(counter));
// need to handle about separating container between client and server for SSL.
registerClientFactory(new TestProtocol.ClientFactory());
startTransport();
IConnector connecter = getConnector();
IChannel channel = connecter.openChannel(TestProtocol.ClientFactory.TYPE, null);
for (int i = 0; i < COUNT; i++)
{
IBuffer buffer = provideBuffer(connecter);
buffer.startPutting(channel.getID());
channel.sendBuffer(buffer);
sleep(50);
}
assertEquals(COUNT, counter.getCount());
}
public void testHandleEmptyBuffer2() throws Exception
{
final int COUNT = 3;
final CountDownLatch counter = new CountDownLatch(COUNT);
container.registerFactory(new TestProtocol.ServerFactory(counter));
// need to handle about separating container between client and server for SSL.
registerClientFactory(new TestProtocol.ClientFactory());
startTransport();
IConnector connecter = getConnector();
IChannel channel = connecter.openChannel(TestProtocol.ClientFactory.TYPE, null);
for (int i = 0; i < COUNT; i++)
{
IBuffer buffer = provideBuffer(connecter);
channel.sendBuffer(buffer);
sleep(50);
}
assertEquals(COUNT, counter.getCount());
}
public void testStreaming() throws Exception
{
final int COUNT = 1;
final CountDownLatch counter = new CountDownLatch(COUNT);
final ChannelInputStream[] inputStream = new ChannelInputStream[1];
getAcceptor().addListener(new IListener()
{
public void notifyEvent(IEvent event)
{
if (event instanceof IContainerEvent<?>)
{
@SuppressWarnings("unchecked")
IContainerEvent<IConnector> e = (IContainerEvent<IConnector>)event;
e.getDeltaElement().addListener(new IListener()
{
public void notifyEvent(IEvent event)
{
if (event instanceof IContainerEvent<?>)
{
@SuppressWarnings("unchecked")
IContainerEvent<IChannel> e = (IContainerEvent<IChannel>)event;
if (e.getDeltaKind() == IContainerDelta.Kind.ADDED)
{
inputStream[0] = new ChannelInputStream(e.getDeltaElement(), 2000);
counter.countDown();
}
}
}
});
}
}
});
IChannel channel = getConnector().openChannel();
assertEquals(true, counter.await(2, TimeUnit.SECONDS));
assertNotNull(inputStream[0]);
ChannelOutputStream outputStream = new ChannelOutputStream(channel);
outputStream.write(HugeData.getBytes());
outputStream.flushWithEOS();
outputStream.close();
try
{
InputStreamReader isr = new InputStreamReader(inputStream[0]);
BufferedReader reader = new BufferedReader(isr);
String line;
while ((line = reader.readLine()) != null)
{
msg(line);
}
isr.close();
}
catch (RuntimeException ex)
{
IOUtil.print(ex);
}
}
/**
* TODO Fails occasionally ;-( Caused by: java.lang.IllegalStateException: selectionKey == null
*/
public void testTextStreaming() throws Exception
{
final int COUNT = 1;
final CountDownLatch counter = new CountDownLatch(COUNT);
final ChannelInputStream[] inputStream = new ChannelInputStream[1];
getAcceptor().addListener(new IListener()
{
public void notifyEvent(IEvent event)
{
if (event instanceof IContainerEvent<?>)
{
@SuppressWarnings("unchecked")
IContainerEvent<IConnector> e = (IContainerEvent<IConnector>)event;
e.getDeltaElement().addListener(new IListener()
{
public void notifyEvent(IEvent event)
{
if (event instanceof IContainerEvent<?>)
{
@SuppressWarnings("unchecked")
IContainerEvent<IChannel> e = (IContainerEvent<IChannel>)event;
if (e.getDeltaKind() == IContainerDelta.Kind.ADDED)
{
inputStream[0] = new ChannelInputStream(e.getDeltaElement(), 2000);
counter.countDown();
}
}
}
});
}
}
});
IChannel channel = getConnector().openChannel();
assertEquals(true, counter.await(2, TimeUnit.SECONDS));
assertNotNull(inputStream[0]);
ChannelOutputStream outputStream = new ChannelOutputStream(channel);
PrintStream printer = new PrintStream(outputStream);
StringTokenizer tokenizer = HugeData.getTokenizer();
while (tokenizer.hasMoreTokens())
{
String token = tokenizer.nextToken();
printer.println(token);
}
outputStream.flushWithEOS();
outputStream.close();
try
{
InputStreamReader isr = new InputStreamReader(inputStream[0]);
BufferedReader reader = new BufferedReader(isr);
String line;
while ((line = reader.readLine()) != null)
{
msg(line);
}
isr.close();
}
catch (RuntimeException ex)
{
IOUtil.print(ex);
}
}
public void testTextStreamingDecoupled() throws Exception
{
final int COUNT = 1;
final CountDownLatch counter = new CountDownLatch(COUNT);
final ChannelInputStream[] inputStream = new ChannelInputStream[1];
getAcceptor().addListener(new IListener()
{
public void notifyEvent(IEvent event)
{
if (event instanceof IContainerEvent<?>)
{
@SuppressWarnings("unchecked")
IContainerEvent<IConnector> e = (IContainerEvent<IConnector>)event;
e.getDeltaElement().addListener(new IListener()
{
public void notifyEvent(IEvent event)
{
if (event instanceof IContainerEvent<?>)
{
@SuppressWarnings("unchecked")
IContainerEvent<IChannel> e = (IContainerEvent<IChannel>)event;
if (e.getDeltaKind() == IContainerDelta.Kind.ADDED)
{
inputStream[0] = new ChannelInputStream(e.getDeltaElement(), 2000);
counter.countDown();
}
}
}
});
}
}
});
final IConnector iConnector = getConnector();
final IChannel channel = iConnector.openChannel();
assertEquals(true, counter.await(2, TimeUnit.SECONDS));
assertNotNull(inputStream[0]);
new Thread()
{
@Override
public void run()
{
try
{
IBufferProvider bufferProvider = provideBufferProvider(iConnector);
ChannelOutputStream outputStream = new ChannelOutputStream(channel, bufferProvider);
PrintStream printer = new PrintStream(outputStream);
StringTokenizer tokenizer = HugeData.getTokenizer();
while (tokenizer.hasMoreTokens())
{
String token = tokenizer.nextToken();
printer.println(token);
}
outputStream.flushWithEOS();
outputStream.close();
}
catch (IOException ex)
{
IOUtil.print(ex);
fail(ex.getLocalizedMessage());
}
}
}.start();
try
{
InputStreamReader isr = new InputStreamReader(inputStream[0]);
BufferedReader reader = new BufferedReader(isr);
String line;
while ((line = reader.readLine()) != null)
{
msg(line);
}
isr.close();
}
catch (RuntimeException ex)
{
IOUtil.print(ex);
}
}
public void testDataStreaming() throws Exception
{
final int COUNT = 1;
final CountDownLatch counter = new CountDownLatch(COUNT);
final ChannelInputStream[] inputStream = new ChannelInputStream[1];
getAcceptor().addListener(new IListener()
{
public void notifyEvent(IEvent event)
{
if (event instanceof IContainerEvent<?>)
{
@SuppressWarnings("unchecked")
IContainerEvent<IConnector> e = (IContainerEvent<IConnector>)event;
e.getDeltaElement().addListener(new IListener()
{
public void notifyEvent(IEvent event)
{
if (event instanceof IContainerEvent<?>)
{
@SuppressWarnings("unchecked")
IContainerEvent<IChannel> e = (IContainerEvent<IChannel>)event;
if (e.getDeltaKind() == IContainerDelta.Kind.ADDED)
{
inputStream[0] = new ChannelInputStream(e.getDeltaElement(), 2000);
counter.countDown();
}
}
}
});
}
}
});
IChannel channel = getConnector().openChannel();
assertEquals(true, counter.await(2, TimeUnit.SECONDS));
ChannelOutputStream outputStream = new ChannelOutputStream(channel);
DataOutputStream dataOutput = new DataOutputStream(outputStream);
byte[] data = HugeData.getBytes();
dataOutput.writeInt(data.length);
dataOutput.write(data);
dataOutput.flush();
dataOutput.close();
outputStream.flush();
DataInputStream dataInput = new DataInputStream(inputStream[0]);
int size = dataInput.readInt();
byte[] b = new byte[size];
dataInput.read(b);
dataInput.close();
msg(new String(b));
}
/**
* @author Eike Stepper
*/
public static final class TestProtocol extends Protocol<CountDownLatch>
{
public TestProtocol(CountDownLatch counter)
{
super(ServerFactory.TYPE);
setInfraStructure(counter);
}
public void handleBuffer(IBuffer buffer)
{
IOUtil.OUT().println("BUFFER ARRIVED"); //$NON-NLS-1$
buffer.release();
getInfraStructure().countDown();
}
/**
* @author Eike Stepper
*/
public static class ServerFactory extends ServerProtocolFactory
{
public static final String TYPE = "test.protocol"; //$NON-NLS-1$
private CountDownLatch counter;
public ServerFactory(CountDownLatch counter)
{
super(TYPE);
this.counter = counter;
}
public TestProtocol create(String description) throws ProductCreationException
{
return new TestProtocol(counter);
}
}
/**
* @author Eike Stepper
*/
public static class ClientFactory extends ClientProtocolFactory
{
public static final String TYPE = ServerFactory.TYPE;
public ClientFactory()
{
super(TYPE);
}
public TestProtocol create(String description) throws ProductCreationException
{
return new TestProtocol(null);
}
}
}
/**
* @author Eike Stepper
*/
public static final class TCP extends TransportTest
{
@Override
protected boolean useJVMTransport()
{
return false;
}
@Override
protected boolean useSSLTransport()
{
return false;
}
}
/**
* @author Eike Stepper
*/
public static final class JVM extends TransportTest
{
@Override
protected boolean useJVMTransport()
{
return true;
}
@Override
protected boolean useSSLTransport()
{
return false;
}
}
/**
* @author Teerawat Chaiyakijpichet (No Magic Asia Ltd.)
*/
public static final class SSL extends TransportTest
{
@Override
protected boolean useJVMTransport()
{
return false;
}
@Override
protected boolean useSSLTransport()
{
return true;
}
}
}