Bug 479433 - Add better support for multiple input streams.

Change-Id: I6234928c5ef6129cda3b77fd7e6467c0e817d7b4
Signed-off-by: Greg Watson <g.watson@computer.org>
diff --git a/bundles/org.eclipse.remote.jsch.core/src/org/eclipse/remote/internal/jsch/core/JSchConnection.java b/bundles/org.eclipse.remote.jsch.core/src/org/eclipse/remote/internal/jsch/core/JSchConnection.java
index c9e2dc1..3ea70d3 100644
--- a/bundles/org.eclipse.remote.jsch.core/src/org/eclipse/remote/internal/jsch/core/JSchConnection.java
+++ b/bundles/org.eclipse.remote.jsch.core/src/org/eclipse/remote/internal/jsch/core/JSchConnection.java
@@ -257,7 +257,7 @@
 	private final Map<String, String> fProperties = new HashMap<String, String>();
 	private final List<Session> fSessions = new ArrayList<Session>();
 
-	private ChannelSftp fSftpChannel;
+	private ChannelSftp fSftpCommandChannel;
 	private boolean isFullySetup; // including sftp channel and environment
 
 	private static final Map<IRemoteConnection, JSchConnection> connectionMap = new HashMap<>();
@@ -363,11 +363,11 @@
 	}
 
 	private synchronized void cleanup() {
-		if (fSftpChannel != null) {
-			if (fSftpChannel.isConnected()) {
-				fSftpChannel.disconnect();
+		if (fSftpCommandChannel != null) {
+			if (fSftpCommandChannel.isConnected()) {
+				fSftpCommandChannel.disconnect();
 			}
-			fSftpChannel = null;
+			fSftpCommandChannel = null;
 		}
 		for (Session session : fSessions) {
 			if (session.isConnected()) {
@@ -677,24 +677,40 @@
 	}
 
 	/**
-	 * Open an sftp channel to the remote host. Always use the second session if available.
+	 * Open an sftp command channel to the remote host. This channel is for commands that do not require any
+	 * state being preserved and should not be closed. Long running commands (such as get/put) should use a separate channel
+	 * obtained via {#link #newSftpChannel()}.
+	 * 
+	 * Always use the second session if available.
 	 *
-	 * @return sftp channel or null if the progress monitor was cancelled
+	 * @return sftp channel
 	 * @throws RemoteConnectionException
 	 *             if a channel could not be opened
 	 */
-	public ChannelSftp getSftpChannel() throws RemoteConnectionException {
-		if (fSftpChannel == null || fSftpChannel.isClosed()) {
-			Session session = fSessions.get(0);
-			if (fSessions.size() > 1) {
-				session = fSessions.get(1);
-			}
-			fSftpChannel = openSftpChannel(session);
-			if (fSftpChannel == null) {
-				throw new RemoteConnectionException(Messages.JSchConnection_Unable_to_open_sftp_channel);
-			}
+	public ChannelSftp getSftpCommandChannel() throws RemoteConnectionException {
+		if (fSftpCommandChannel == null || fSftpCommandChannel.isClosed()) {
+			fSftpCommandChannel = newSftpChannel();
 		}
-		return fSftpChannel;
+		return fSftpCommandChannel;
+	}
+
+	/**
+	 * Open a channel for long running commands. This channel should be closed when the command is completed.
+	 * 
+	 * @return sftp channel
+	 * @throws RemoteConnectionException
+	 *             if a channel could not be opened
+	 */
+	public ChannelSftp newSftpChannel() throws RemoteConnectionException {
+		Session session = fSessions.get(0);
+		if (fSessions.size() > 1) {
+			session = fSessions.get(1);
+		}
+		ChannelSftp channel = openSftpChannel(session);
+		if (channel == null) {
+			throw new RemoteConnectionException(Messages.JSchConnection_Unable_to_open_sftp_channel);
+		}
+		return channel;
 	}
 
 	public Channel getStreamForwarder(String host, int port) throws RemoteConnectionException {
diff --git a/bundles/org.eclipse.remote.jsch.core/src/org/eclipse/remote/internal/jsch/core/commands/AbstractRemoteCommand.java b/bundles/org.eclipse.remote.jsch.core/src/org/eclipse/remote/internal/jsch/core/commands/AbstractRemoteCommand.java
index 3d49cba..8649e9a 100755
--- a/bundles/org.eclipse.remote.jsch.core/src/org/eclipse/remote/internal/jsch/core/commands/AbstractRemoteCommand.java
+++ b/bundles/org.eclipse.remote.jsch.core/src/org/eclipse/remote/internal/jsch/core/commands/AbstractRemoteCommand.java
@@ -188,7 +188,7 @@
 		private ChannelSftp fSftpChannel;
 
 		private Future<T1> asyncCmdInThread() throws RemoteConnectionException {
-			setChannel(fConnection.getSftpChannel());
+			setChannel(fConnection.getSftpCommandChannel());
 			return fPool.submit(this);
 		}
 
@@ -244,7 +244,6 @@
 					if (e.getCause() instanceof SftpException) {
 						throw (SftpException) e.getCause();
 					}
-					getChannel().disconnect();
 					throw new RemoteConnectionException(e.getMessage());
 				}
 				getProgressMonitor().worked(1);
@@ -253,7 +252,6 @@
 				Thread.currentThread().interrupt(); // set current thread flag
 			}
 			future.cancel(true);
-			getChannel().disconnect();
 			throw new RemoteConnectionException(Messages.AbstractRemoteCommand_Operation_cancelled_by_user);
 		}
 	}
diff --git a/bundles/org.eclipse.remote.jsch.core/src/org/eclipse/remote/internal/jsch/core/commands/GetInputStreamCommand.java b/bundles/org.eclipse.remote.jsch.core/src/org/eclipse/remote/internal/jsch/core/commands/GetInputStreamCommand.java
index 9c3743d..aa55234 100644
--- a/bundles/org.eclipse.remote.jsch.core/src/org/eclipse/remote/internal/jsch/core/commands/GetInputStreamCommand.java
+++ b/bundles/org.eclipse.remote.jsch.core/src/org/eclipse/remote/internal/jsch/core/commands/GetInputStreamCommand.java
@@ -11,12 +11,24 @@
 import org.eclipse.remote.internal.jsch.core.JSchConnection;
 import org.eclipse.remote.internal.jsch.core.messages.Messages;
 
+import com.jcraft.jsch.ChannelSftp;
 import com.jcraft.jsch.JSchException;
 import com.jcraft.jsch.SftpException;
 
+/**
+ * The JSch implementation does not support multiple streams open on a single channel, so we must create a new channel for each
+ * subsequent stream. This has the problem that there are usually only a limited number of channels that can be opened
+ * simultaneously, so it is possible that this call will fail unless the open streams are closed first.
+ * 
+ * This code will use the initial (command) channel first, or if that is already being used, will open a new stream. It must be
+ * careful not to close the command stream as other threads may still be using it.
+ */
 public class GetInputStreamCommand extends AbstractRemoteCommand<InputStream> {
 	private final IPath fRemotePath;
 
+	private static ChannelSftp commandChannel;
+	private ChannelSftp thisChannel;
+
 	public GetInputStreamCommand(JSchConnection connection, IPath path) {
 		super(connection);
 		fRemotePath = path;
@@ -25,19 +37,124 @@
 	@Override
 	public InputStream getResult(IProgressMonitor monitor) throws RemoteConnectionException {
 		final SubMonitor subMon = SubMonitor.convert(monitor, 10);
-		SftpCallable<InputStream> c = new SftpCallable<InputStream>() {
+
+		final SftpCallable<InputStream> c = new SftpCallable<InputStream>() {
+			private ChannelSftp newChannel() throws IOException {
+				synchronized (GetInputStreamCommand.class) {
+					if (commandChannel != null) {
+						try {
+							thisChannel = getConnection().newSftpChannel();
+							return thisChannel;
+						} catch (RemoteConnectionException e) {
+							throw new IOException(e.getMessage());
+						}
+					}
+					thisChannel = commandChannel = getChannel();
+					return commandChannel;
+				}
+			}
+
 			@Override
 			public InputStream call() throws JSchException, SftpException, IOException {
-				try {
-					return getConnection().getSftpChannel().get(fRemotePath.toString(),
-							new CommandProgressMonitor(NLS.bind(Messages.GetInputStreamCommand_Receiving, fRemotePath.toString()), getProgressMonitor()));
-				} catch (RemoteConnectionException e) {
-					throw new IOException(e.getMessage());
-				}
+				return newChannel().get(fRemotePath.toString(), new CommandProgressMonitor(
+						NLS.bind(Messages.GetInputStreamCommand_Receiving, fRemotePath.toString()), getProgressMonitor()));
 			}
 		};
 		try {
-			return c.getResult(subMon.newChild(10));
+			final InputStream stream = c.getResult(subMon.newChild(10));
+			return new InputStream() {
+				@Override
+				public int read() throws IOException {
+					return stream.read();
+				}
+
+				/*
+				 * (non-Javadoc)
+				 * 
+				 * @see java.io.InputStream#close()
+				 */
+				@Override
+				public void close() throws IOException {
+					stream.close();
+					synchronized (GetInputStreamCommand.class) {
+						if (thisChannel != commandChannel) {
+							thisChannel.disconnect();
+						} else {
+							commandChannel = null;
+						}
+					}
+				}
+
+				/*
+				 * (non-Javadoc)
+				 * 
+				 * @see java.io.InputStream#read(byte[])
+				 */
+				@Override
+				public int read(byte[] b) throws IOException {
+					return stream.read(b);
+				}
+
+				/*
+				 * (non-Javadoc)
+				 * 
+				 * @see java.io.InputStream#read(byte[], int, int)
+				 */
+				@Override
+				public int read(byte[] b, int off, int len) throws IOException {
+					return stream.read(b, off, len);
+				}
+
+				/*
+				 * (non-Javadoc)
+				 * 
+				 * @see java.io.InputStream#skip(long)
+				 */
+				@Override
+				public long skip(long n) throws IOException {
+					return stream.skip(n);
+				}
+
+				/*
+				 * (non-Javadoc)
+				 * 
+				 * @see java.io.InputStream#available()
+				 */
+				@Override
+				public int available() throws IOException {
+					return stream.available();
+				}
+
+				/*
+				 * (non-Javadoc)
+				 * 
+				 * @see java.io.InputStream#mark(int)
+				 */
+				@Override
+				public synchronized void mark(int readlimit) {
+					stream.mark(readlimit);
+				}
+
+				/*
+				 * (non-Javadoc)
+				 * 
+				 * @see java.io.InputStream#reset()
+				 */
+				@Override
+				public synchronized void reset() throws IOException {
+					stream.reset();
+				}
+
+				/*
+				 * (non-Javadoc)
+				 * 
+				 * @see java.io.InputStream#markSupported()
+				 */
+				@Override
+				public boolean markSupported() {
+					return stream.markSupported();
+				}
+			};
 		} catch (SftpException e) {
 			throw new RemoteConnectionException(e.getMessage());
 		}
diff --git a/tests/org.eclipse.remote.jsch.tests/src/org/eclipse/remote/jsch/tests/FileStoreTests.java b/tests/org.eclipse.remote.jsch.tests/src/org/eclipse/remote/jsch/tests/FileStoreTests.java
index 8c1e3ed..62d0dda 100644
--- a/tests/org.eclipse.remote.jsch.tests/src/org/eclipse/remote/jsch/tests/FileStoreTests.java
+++ b/tests/org.eclipse.remote.jsch.tests/src/org/eclipse/remote/jsch/tests/FileStoreTests.java
@@ -9,8 +9,6 @@
 import java.io.OutputStreamWriter;
 import java.net.URI;
 
-import junit.framework.TestCase;
-
 import org.eclipse.core.filesystem.EFS;
 import org.eclipse.core.filesystem.IFileInfo;
 import org.eclipse.core.filesystem.IFileStore;
@@ -24,6 +22,8 @@
 import org.eclipse.remote.core.IRemoteServicesManager;
 import org.eclipse.remote.internal.jsch.core.JSchConnection;
 
+import junit.framework.TestCase;
+
 public class FileStoreTests extends TestCase {
 	private static final String CONNECTION_NAME = "test_connection";
 	private static final String USERNAME = "test";
@@ -33,7 +33,9 @@
 	private static final String REMOTE_DIR = "/tmp/ptp_" + USERNAME + "/filestore_tests";
 	private static final String LOCAL_FILE = "local_file";
 	private static final String REMOTE_FILE = "remote_file";
+	private static final String REMOTE_FILE2 = "remote_file2";
 	private static final String TEST_STRING = "a string containing fairly *()(*&^$%## random text";
+	private static final String TEST_STRING2 = "a different string containing fairly *()(*&^$%## random text";
 
 	private IRemoteConnectionType fConnectionType;
 	private IRemoteConnection fRemoteConnection;
@@ -82,6 +84,67 @@
 		}
 	}
 
+	public void testMultiStreams() {
+		IFileStore remoteFileStore = fRemoteDir.getChild(REMOTE_FILE);
+		IFileStore remoteFileStore2 = fRemoteDir.getChild(REMOTE_FILE2);
+
+		try {
+			createFile(remoteFileStore, TEST_STRING);
+			createFile(remoteFileStore2, TEST_STRING2);
+		} catch (Exception e) {
+			fail(e.getMessage());
+		}
+
+		assertTrue(remoteFileStore.fetchInfo().exists());
+		assertTrue(remoteFileStore2.fetchInfo().exists());
+
+		/*
+		 * Check how many streams we can open
+		 */
+		InputStream streams[] = new InputStream[100];
+		int streamCount = 0;
+
+		for (; streamCount < streams.length; streamCount++) {
+			try {
+				streams[streamCount] = remoteFileStore.openInputStream(EFS.NONE, null);
+			} catch (Exception e) {
+				if (!e.getMessage().endsWith("channel is not opened.")) {
+					fail(e.getMessage());
+				}
+				break;
+			}
+		}
+
+		for (int i = 0; i < streamCount; i++) {
+			try {
+				streams[i].close();
+			} catch (IOException e) {
+				// No need to deal with this
+			}
+		}
+
+		for (int i = 0; i < streamCount / 2; i++) {
+			try {
+				InputStream stream = remoteFileStore.openInputStream(EFS.NONE, null);
+				assertNotNull(stream);
+				BufferedReader buf = new BufferedReader(new InputStreamReader(stream));
+				String line = buf.readLine().trim();
+				assertTrue(line.equals(TEST_STRING));
+
+				InputStream stream2 = remoteFileStore2.openInputStream(EFS.NONE, null);
+				assertNotNull(stream2);
+				BufferedReader buf2 = new BufferedReader(new InputStreamReader(stream2));
+				String line2 = buf2.readLine().trim();
+				assertTrue(line2.equals(TEST_STRING2));
+
+				stream.close();
+				stream2.close();
+			} catch (Exception e) {
+				fail(e.getMessage());
+			}
+		}
+	}
+
 	public void testCopy() {
 		final IFileStore localFileStore = fLocalDir.getChild(LOCAL_FILE);
 		final IFileStore remoteFileStore = fRemoteDir.getChild(REMOTE_FILE);