blob: ed2e6be4c306760bb59b7d49f1a9efdb8a531164 [file] [log] [blame]
package org.eclipse.osee.ote.endpoint;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.DatagramChannel;
import java.util.Date;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.logging.Level;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
import org.eclipse.osee.framework.logging.OseeLog;
import org.eclipse.osee.ote.OTEException;
import org.eclipse.osee.ote.message.event.OteEventMessage;
import org.eclipse.osee.ote.message.event.OteEventMessageHeader;
import org.eclipse.osee.ote.message.event.OteEventMessageUtil;
public class OteEndpointReceiveRunnable implements Runnable {
private static final int DATA_SIZE = 65536;
private static final int UDP_TIMEOUT = 240000; // 4 MINUTES
private static final int ONE_MEG = 1024 * 1024;
private volatile boolean run = true;
private volatile boolean debugOutput = false;
private Class<OteEndpointReceiveRunnable> logger = OteEndpointReceiveRunnable.class;
private final InetSocketAddress address;
private static final int MAGIC_NUMBER = ByteBuffer.wrap(OteEndpointSendRunnable.MAGIC_NUMBER).getInt();
private final Inflater inflater = new Inflater();
private CopyOnWriteArrayList<EndpointDataProcessor> dataProcessors = new CopyOnWriteArrayList<>();
public OteEndpointReceiveRunnable(InetSocketAddress address){
this.address = address;
}
public void stop(){
run = false;
}
@Override
public void run() {
ByteBuffer buffer = ByteBuffer.allocate(DATA_SIZE);
DatagramChannel channel = null;
try{
while(run){
try {
channel = DatagramChannel.open();
channel.socket().setReuseAddress(true);
channel.socket().bind(address);
channel.socket().setSoTimeout(UDP_TIMEOUT);
channel.socket().setReceiveBufferSize(ONE_MEG);
channel.configureBlocking(true);
while (run) {
try{
buffer.clear();
channel.receive(buffer);
buffer.flip();
processBuffer(buffer);
} catch(ClosedByInterruptException ex){
stop();
} catch (Throwable th){
th.printStackTrace();
}
}
} catch (BindException ex) {
if(debugOutput){
OseeLog.log(logger, Level.FINEST, ex);
}
channel.close();
Thread.sleep(1000);
}
}
}catch (InterruptedIOException ex) {
Thread.interrupted();
if (run && debugOutput) {
OseeLog.log(logger, Level.WARNING, "Unexpected interruption", ex);
}
} catch (ClosedByInterruptException ie) {
Thread.interrupted();
if (run && debugOutput) {
OseeLog.log(logger, Level.WARNING, "Unexpected interruption", ie);
}
} catch (Throwable t) {
throw new OTEException(t);
} finally {
try {
if (channel != null) {
channel.close();
}
} catch (IOException ex) {
if(debugOutput){
ex.printStackTrace();
}
}
}
}
private void processBuffer(ByteBuffer buffer) {
int magicNumber = 0;
if(buffer.remaining() > 4) {
magicNumber = buffer.getInt(0);
}
// compressed stream
if(magicNumber == MAGIC_NUMBER) {
inflater.reset();
inflater.setInput(buffer.array(), 4, buffer.remaining() - 4);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream(buffer.remaining() - 4);
byte[] tempBuf = new byte[1024];
try {
while (!inflater.finished()) {
int count = inflater.inflate(tempBuf);
outputStream.write(tempBuf, 0, count);
}
outputStream.close();
buffer = ByteBuffer.wrap(outputStream.toByteArray());
}
catch (DataFormatException e) {
OseeLog.log(getClass(), Level.SEVERE, e);
}
catch (IOException e) {
OseeLog.log(getClass(), Level.SEVERE, e);
}
}
int typeId = buffer.getShort(0) & 0xFFFF;
if(typeId == OteEventMessageHeader.MARKER_VALUE){
byte[] data = new byte[buffer.remaining()];
buffer.get(data);
OteEventMessage msg = new OteEventMessage((byte[])data);
msg.getHeader().TTL.setNoLog(1);
if(debugOutput){
try {
System.out.printf("[%s] received: [%s][%d] from [%s:%d]\n", new Date(), msg.getHeader().TOPIC.getValue(), msg.getData().length, msg.getHeader().ADDRESS.getAddress().getHostAddress(), msg.getHeader().ADDRESS.getPort());
} catch (UnknownHostException e) {
e.printStackTrace();
}
}
OteEventMessageUtil.postEvent(msg);
} else {
for(EndpointDataProcessor processor: dataProcessors){
if(processor.getTypeId() == typeId){
try{
processor.processBuffer(buffer);
} catch (Throwable th){
th.printStackTrace();
}
}
}
}
}
public void setDebugOutput(boolean enable) {
debugOutput = enable;
}
public InetSocketAddress getAddress() {
return address;
}
public void addDataProcessor(EndpointDataProcessor processor) {
dataProcessors.add(processor);
}
public void removeDataProcessor(EndpointDataProcessor processor) {
dataProcessors.remove(processor);
}
}