blob: deb6a41bf8e43c6203035530138674f12f5b2978 [file] [log] [blame]
package org.eclipse.osee.ote.filetransfer;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
public class TcpFileTransfer {
private static final int SECRET_CODE = 0xF17E;
private static final int BLOCK_SIZE = 64*1024;
public static TcpFileTransferHandle sendFile(ExecutorService service, final File file, final InetSocketAddress address) throws IOException{
if (!file.exists()) {
throw new IllegalArgumentException("Invalid file specified: " + file.getAbsolutePath());
}
final TransferProgressMonitor monitor = new TransferProgressMonitor();
monitor.updateTransferStartTime(System.currentTimeMillis());
final SocketChannel sendChannel = SocketChannel.open(address);
final TcpFileTransfer fileTransfer = new TcpFileTransfer(Direction.FILE_TO_CHANNEL, file);
Callable<Boolean> c = new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
fileTransfer.openSender(monitor, sendChannel);
monitor.updateTransferCompleteTime(System.currentTimeMillis());
return true;
}
};
InetSocketAddress localAddress = (InetSocketAddress) sendChannel.socket().getLocalSocketAddress();
return new TcpFileTransferHandle(monitor, localAddress, service.submit(c));
}
/**
*
* @param service
* @param file
* @param port
* @return the transfer handle or null if the file to be written to is in use by another process
* @throws IOException
*/
public static TcpFileTransferHandle receiveFile(ExecutorService service, final File file, int port) throws IOException{
if (service == null) {
throw new NullPointerException("Service cannot be null");
}
final TcpFileTransfer fileTransfer;
try {
fileTransfer = new TcpFileTransfer(Direction.CHANNEL_TO_FILE, file);
} catch (FileNotFoundException e) {
// most likely due to file being used by another process
return null;
}
final TransferProgressMonitor monitor = new TransferProgressMonitor();
monitor.updateTransferStartTime(System.currentTimeMillis());
final ServerSocketChannel channel = ServerSocketChannel.open();
channel.configureBlocking(false);
channel.socket().bind(new InetSocketAddress(InetAddress.getLocalHost(), port));
InetSocketAddress localAddress = (InetSocketAddress) channel.socket().getLocalSocketAddress();
Callable<Boolean> c = new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
fileTransfer.openReceiver(monitor, channel);
monitor.updateTransferCompleteTime(System.currentTimeMillis());
return true;
}
};
return new TcpFileTransferHandle(monitor, localAddress, service.submit(c));
}
private static enum Direction {
FILE_TO_CHANNEL {
@Override
long transfer(FileChannel fileChannel, SocketChannel byteChannel, long position, long count) throws IOException{
return fileChannel.transferTo(position, count, byteChannel);
}
@Override
FileChannel createChannel(File file) throws IOException{
return new FileInputStream(file).getChannel();
}
},
CHANNEL_TO_FILE {
@Override
long transfer(FileChannel fileChannel, SocketChannel byteChannel, long position, long count) throws IOException{
return fileChannel.transferFrom(byteChannel, position, count);
}
@Override
FileChannel createChannel(File file) throws IOException {
FileChannel channel = new FileOutputStream(file).getChannel();
channel.lock();
return channel;
}
};
abstract long transfer(FileChannel fileChannel, SocketChannel byteChannel, long position, long count) throws IOException;
abstract FileChannel createChannel(File file) throws IOException;
}
private final Direction direction;
private final File file;
private TcpFileTransfer(Direction direction, File file) throws FileNotFoundException, IOException{
this.direction = direction;
this.file = file;
}
private void openSender(TransferProgressMonitor monitor, SocketChannel sendChannel) throws IOException{
try {
Selector selector = Selector.open();
try {
sendChannel.configureBlocking(false);
sendChannel.register(selector, SelectionKey.OP_WRITE);
final File[] files;
if (file.isDirectory()) {
files = file.listFiles();
} else {
files = new File[]{file};
}
int fileCounter = 1;
ByteBuffer buffer = ByteBuffer.allocate(1024);
for (File targetFile : files) {
long size = targetFile.length();
monitor.updateTotalTransferAmount(size);
buffer.clear();
buffer.putInt(SECRET_CODE);
buffer.putInt(files.length);
buffer.putInt(fileCounter++);
String name = targetFile.getName();
buffer.putInt(name.length());
buffer.put(name.getBytes());
buffer.putLong(size);
buffer.flip();
sendChannel.write(buffer);
transfer(monitor, sendChannel, selector, size, targetFile);
buffer.clear();
sendChannel.keyFor(selector).interestOps(SelectionKey.OP_READ);
waitForData(selector, sendChannel, buffer);
sendChannel.keyFor(selector).interestOps(SelectionKey.OP_WRITE);
}
} finally {
selector.close();
}
} finally {
sendChannel.close();
}
}
private void openReceiver(TransferProgressMonitor monitor, ServerSocketChannel serverChannel) throws IOException{
try {
Selector selector = Selector.open();
try {
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
// only way to timeout on accept with channels is to use a selector.
// ServerSocketChannel does not honor the socket.setSoTimeout() value
if (selector.select(10000) < 1) {
throw new SocketTimeoutException();
}
selector.selectedKeys().clear();
serverChannel.keyFor(selector).cancel();
SocketChannel receiveChannel = serverChannel.accept();
receiveChannel.configureBlocking(false);
receiveChannel.register(selector, SelectionKey.OP_READ);
// now we need to handle timeouts for the data channel
try {
boolean done = false;
ByteBuffer buffer = ByteBuffer.allocate(1024);
byte[] fileNameBytes = new byte[2048];
while (!done) {
buffer.clear();
// we only want to read the next 12 bytes, anything more than 12 is the file data
// and we want that left in the channel to be consumed later
buffer.limit(16);
waitForData(selector, receiveChannel, buffer);
buffer.flip();
int code = buffer.getInt();
if (code != SECRET_CODE) {
throw new IOException("Unexpected data during transfer negotiation");
}
int fileCount = buffer.getInt();
int currentFile = buffer.getInt();
monitor.updateNumberOfFiles(fileCount);
int fileNameLength = buffer.getInt();
buffer.clear();
buffer.limit(fileNameLength + 8);
waitForData(selector, receiveChannel, buffer);
buffer.flip();
buffer.get(fileNameBytes, 0, fileNameLength);
long fileSize = buffer.getLong();
monitor.updateTotalTransferAmount(fileSize);
File targetFile = file.isDirectory() ? new File(file, new String(fileNameBytes, 0, fileNameLength)) : file;
transfer(monitor, receiveChannel, selector, fileSize, targetFile);
buffer.clear();
buffer.putInt(0xF0F0);
buffer.flip();
receiveChannel.write(buffer);
done = currentFile == fileCount;
}
} finally {
receiveChannel.close();
}
} finally {
selector.close();
}
} finally {
serverChannel.close();
}
}
private static void waitForData(Selector selector, SocketChannel channel, ByteBuffer buffer) throws SocketTimeoutException, IOException{
if (selector.select(10000) < 1) {
throw new SocketTimeoutException();
}
selector.selectedKeys().clear();
channel.read(buffer);
}
private static void waitForChannel(Selector selector) throws SocketTimeoutException, IOException{
selector.select(10000);
Set<SelectionKey> set = selector.selectedKeys();
if (set.size() == 0) {
throw new SocketTimeoutException();
}
set.clear();
}
private void transfer(TransferProgressMonitor monitor, SocketChannel channel, Selector selector, long totalTransferAmount, File targetFile) throws IOException {
// we need a selector to deal with time outs
FileChannel fileChannel = direction.createChannel(targetFile);
try {
long position = 0;
while (position < totalTransferAmount) {
waitForChannel(selector);
long count = direction.transfer(fileChannel, channel, position, Math.min(BLOCK_SIZE, totalTransferAmount - position));
if (count < 0) {
channel.keyFor(selector).cancel();
continue;
}
position += count;
monitor.updateAmountTransferred(position);
}
} finally {
fileChannel.close();
}
}
}