blob: db53f7a1cce958fb5659f684210973520c218cb7 [file] [log] [blame]
package org.eclipse.ote.io;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.DatagramChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.logging.Level;
import org.eclipse.osee.framework.logging.OseeLog;
public abstract class DatagramChannelRunnable implements Runnable {
private ArrayBlockingQueue<DatagramChannelData> data;
private InetSocketAddress address;
public DatagramChannelRunnable(InetSocketAddress address){
this.address = address;
}
public DatagramChannelRunnable(){
}
void setQueue(ArrayBlockingQueue<DatagramChannelData> data) {
this.data = data;
}
@Override
public void run() {
DatagramChannel channel = null;
try {
channel = openAndInitializeDatagramChannel(address);
boolean keepRunning = true;
final List<DatagramChannelData> dataToSend = new ArrayList<DatagramChannelData>(32);
while(keepRunning){
try{
dataToSend.clear();
if (data.drainTo(dataToSend) < 1) {
try {
// block until something is available
dataToSend.add(data.take());
} catch (InterruptedException e) {
keepRunning = false;
continue;
}
}
int size = dataToSend.size();
for (int i = 0; i < size; i++) {
DatagramChannelData data = dataToSend.get(i);
if (data == DatagramChannelWorker.POISON_PILL) {
keepRunning = false;
break;
}
}
if(keepRunning){
doSend(channel, dataToSend);
}
} catch (ClosedByInterruptException ex){
OseeLog.log(getClass(), Level.SEVERE, "Error trying to send data", ex);
channel = openAndInitializeDatagramChannel(address);
} catch (AsynchronousCloseException ex){
OseeLog.log(getClass(), Level.SEVERE, "Error trying to send data", ex);
channel = openAndInitializeDatagramChannel(address);
} catch (ClosedChannelException ex){
OseeLog.log(getClass(), Level.SEVERE, "Error trying to send data", ex);
channel = openAndInitializeDatagramChannel(address);
} catch (IOException ex){
OseeLog.log(getClass(), Level.SEVERE, "Error trying to send data", ex);
} finally {
int size = dataToSend.size();
for (int i = 0; i < size; i++) {
DatagramChannelData data = dataToSend.get(i);
data.postProcess();
}
}
}
} catch (IOException ex){
OseeLog.log(getClass(), Level.SEVERE, "Error opening DatagramChannel. Ending DatagramChannelRunnable unexpectedly.", ex);
} finally{
try {
if (channel != null) {
channel.close();
}
} catch (IOException e) {
OseeLog.log(getClass(), Level.SEVERE, "Error trying to send data", e);
}
}
}
public abstract void doSend(DatagramChannel channel2, List<DatagramChannelData> dataToSend) throws ClosedChannelException, AsynchronousCloseException, ClosedByInterruptException, IOException;
public abstract DatagramChannel openAndInitializeDatagramChannel(InetSocketAddress address) throws IOException;
}