feature[ats_TW2133]: Adding command-line for datagram sending. This command line option will allow us to switch the DatagramChannel.send between blocking and non-blocking. The default will be blocking (as it was before) but if we ever need this thread to exexute as fast as possible it can be set to non-blocking (set property to false). Change-Id: Ib3e23c036d97e3d9ea9acc6352ecae7e542bd45d
diff --git a/org.eclipse.ote.io/src/org/eclipse/ote/io/BasicDatagramChannelRunnable.java b/org.eclipse.ote.io/src/org/eclipse/ote/io/BasicDatagramChannelRunnable.java index 1df753d..565f6ef 100644 --- a/org.eclipse.ote.io/src/org/eclipse/ote/io/BasicDatagramChannelRunnable.java +++ b/org.eclipse.ote.io/src/org/eclipse/ote/io/BasicDatagramChannelRunnable.java
@@ -3,12 +3,14 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.util.List; public class BasicDatagramChannelRunnable extends DatagramChannelRunnable { - private static final int SEND_BUFFER_SIZE = 1024 * 512; + private static final int SEND_BUFFER_SIZE = 1024 * 512 * 10; + private static final boolean BLOCKING = System.getProperty("ote.pubsub.send.block", "true").equalsIgnoreCase("true"); public BasicDatagramChannelRunnable(InetSocketAddress address) { super(address); @@ -25,15 +27,17 @@ @Override public void doSend(DatagramChannel channel, DatagramChannelData data) throws IOException { - data.getByteBuffer().flip(); + ByteBuffer byteBuffer = data.getByteBuffer(); + byteBuffer.flip(); List<SocketAddress> addresses = data.getAddresses(); int innerSize = addresses.size(); for(int j = 0; j < innerSize; j++){ - channel.send(data.getByteBuffer(), addresses.get(j)); - data.getByteBuffer().rewind(); + SocketAddress address = addresses.get(j); + channel.send(byteBuffer, address); + byteBuffer.rewind(); } } - + @Override public DatagramChannel openAndInitializeDatagramChannel(InetSocketAddress address) throws IOException { DatagramChannel channel = DatagramChannel.open(); @@ -42,7 +46,7 @@ } channel.socket().setReuseAddress(true); channel.socket().bind(address); - channel.configureBlocking(true); + channel.configureBlocking(BLOCKING); return channel; }