// blob: 3ae2f4e789de7102c428c36b51dfa68069ea8e98 [file] [log] [blame]
package org.eclipse.osee.ote.endpoint;
import java.net.InetSocketAddress;
import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import org.eclipse.osee.ote.OTEException;
import org.eclipse.osee.ote.collections.ObjectPool;
import org.eclipse.osee.ote.collections.ObjectPoolConfiguration;
import org.eclipse.osee.ote.message.event.OteEventMessage;
/**
 * Queues {@link OteEventMessage} payloads for delivery to a single UDP endpoint and launches the daemon thread that
 * monitors that queue.
 * <p>
 * Each call to {@link #send(OteEventMessage)} copies the message data into a pooled {@link AddressBuffer}, tags it
 * with this sender's address and places it on a bounded queue. The queue, the buffer pool and the debug flag are
 * handed to an {@code OteEndpointSendRunnable} which runs on the worker thread created by {@link #start()}.
 *
 * @author b1528444
 *
 */
public class OteUdpEndpointSender implements OteEndpointSender {

   /** Sentinel placed on the queue by {@link #stop()}; presumably tells the send runnable to exit (verify there). */
   static final AddressBuffer POISON_PILL = new AddressBuffer();

   /** Maximum number of buffers that may wait in the send queue before {@link #send(OteEventMessage)} blocks. */
   private static final int SEND_QUEUE_CAPACITY = 5000;

   private final ObjectPool<AddressBuffer> buffers;
   private final ArrayBlockingQueue<AddressBuffer> toSend;
   private final InetSocketAddress address;

   /** Guards creation of the worker thread so that concurrent callers cannot launch duplicate workers. */
   private final Object threadLock = new Object();

   // Written by setDebug() and read by send(), possibly on different threads, hence volatile like the other flags.
   private volatile boolean debug = false;
   private volatile boolean isClosed = false;
   private volatile Thread thread;

   /**
    * Creates a sender for the given endpoint. No thread is created here; see {@link #start()}.
    *
    * @param address the UDP endpoint every message handed to {@link #send(OteEventMessage)} is addressed to
    */
   public OteUdpEndpointSender(InetSocketAddress address) {
      toSend = new ArrayBlockingQueue<>(SEND_QUEUE_CAPACITY);
      // NOTE(review): the meaning of (50, true) is defined by ObjectPoolConfiguration - presumably a pool size and
      // a pre-allocation flag; confirm against that class.
      buffers = new ObjectPool<AddressBuffer>(new ObjectPoolConfiguration<AddressBuffer>(50, true) {
         @Override
         public AddressBuffer make() {
            return new AddressBuffer();
         }
      });
      this.address = address;
   }

   /**
    * Creates and starts a new daemon worker thread named after the endpoint address. The current value of the debug
    * flag is passed to the worker's runnable at this point.
    */
   @Override
   public void start() {
      synchronized (threadLock) {
         Thread worker = new Thread(new OteEndpointSendRunnable(toSend, buffers, debug));
         worker.setName(String.format("OTE Endpoint Sender[%s]", address.toString()));
         worker.setDaemon(true);
         worker.start();
         // Publish only after the thread is running so that readers never observe a not-yet-started thread.
         thread = worker;
      }
   }

   /**
    * Enqueues {@link #POISON_PILL} and then marks this sender as closed.
    * <p>
    * NOTE(review): {@link #send(OteEventMessage)} does not consult the closed flag, so a send after stop still
    * enqueues and may start a new worker - confirm whether that is intended.
    *
    * @throws InterruptedException if interrupted while waiting for space on the send queue; in that case the sender
    * is not marked closed
    */
   @Override
   public void stop() throws InterruptedException {
      toSend.put(POISON_PILL);
      isClosed = true;
   }

   /**
    * @return the endpoint address supplied at construction
    */
   @Override
   public InetSocketAddress getAddress() {
      return address;
   }

   /**
    * Copies the message data into a pooled buffer, queues it for this sender's endpoint and makes sure a worker
    * thread is running. Blocks while the send queue is full.
    *
    * @param message the message whose data is to be sent
    * @throws OTEException if the calling thread is interrupted while waiting for queue space; the thread's interrupt
    * status is restored before the exception is thrown
    */
   @Override
   public void send(OteEventMessage message) {
      if (debug) {
         System.out.printf("[%s] sending: [%s] to [%s] [%d]\n", new Date(), message.getHeader().TOPIC.getValue(), address.toString(), message.getData().length);
      }
      AddressBuffer obj = buffers.getObject();
      obj.getBuffer().clear();
      obj.getBuffer().put(message.getData());
      obj.getBuffer().flip();
      obj.setAddress(address);
      try {
         toSend.put(obj);
      } catch (InterruptedException e) {
         // Restore the interrupt status so code further up the stack can still observe the interruption.
         Thread.currentThread().interrupt();
         throw new OTEException(e);
      }
      ensureWorkerRunning();
   }

   /**
    * Starts a worker thread if none has been started yet or if the previous one is no longer alive. The check is
    * repeated under the lock so that concurrent senders start at most one replacement.
    */
   private void ensureWorkerRunning() {
      Thread current = thread;
      if (current != null && current.isAlive()) {
         return;
      }
      synchronized (threadLock) {
         current = thread;
         if (current == null || !current.isAlive()) {
            // our thread has sat idle for too long and self terminated (or was never started) go ahead and start a new one
            start();
         }
      }
   }

   /**
    * @return {@code true} once {@link #stop()} has completed
    */
   @Override
   public boolean isClosed() {
      return isClosed;
   }

   /**
    * Enables or disables the debug trace printed by {@link #send(OteEventMessage)}. A worker that is already running
    * keeps the flag value it was created with; the new value is handed to the next worker started.
    *
    * @param debug {@code true} to enable debug output
    */
   @Override
   public void setDebug(boolean debug) {
      this.debug = debug;
   }
}