blob: 565f6ef9e348db6b6b12a824c8cfdaa4c73930e6 [file] [log] [blame]
package org.eclipse.ote.io;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.util.List;
public class BasicDatagramChannelRunnable extends DatagramChannelRunnable {
private static final int SEND_BUFFER_SIZE = 1024 * 512 * 10;
private static final boolean BLOCKING = System.getProperty("ote.pubsub.send.block", "true").equalsIgnoreCase("true");
public BasicDatagramChannelRunnable(InetSocketAddress address) {
super(address);
}
@Override
public void doSend(DatagramChannel channel, List<DatagramChannelData> dataToSend) throws IOException {
int size = dataToSend.size();
for(int i = 0; i < size; i++){
DatagramChannelData data = dataToSend.get(i);
doSend(channel, data);
}
}
@Override
public void doSend(DatagramChannel channel, DatagramChannelData data) throws IOException {
ByteBuffer byteBuffer = data.getByteBuffer();
byteBuffer.flip();
List<SocketAddress> addresses = data.getAddresses();
int innerSize = addresses.size();
for(int j = 0; j < innerSize; j++){
SocketAddress address = addresses.get(j);
channel.send(byteBuffer, address);
byteBuffer.rewind();
}
}
@Override
public DatagramChannel openAndInitializeDatagramChannel(InetSocketAddress address) throws IOException {
DatagramChannel channel = DatagramChannel.open();
if (channel.socket().getSendBufferSize() < SEND_BUFFER_SIZE) {
channel.socket().setSendBufferSize(SEND_BUFFER_SIZE);
}
channel.socket().setReuseAddress(true);
channel.socket().bind(address);
channel.configureBlocking(BLOCKING);
return channel;
}
}