Bug 479433 - Add better support for multiple input streams.
Change-Id: I6234928c5ef6129cda3b77fd7e6467c0e817d7b4
Signed-off-by: Greg Watson <g.watson@computer.org>
diff --git a/bundles/org.eclipse.remote.jsch.core/src/org/eclipse/remote/internal/jsch/core/JSchConnection.java b/bundles/org.eclipse.remote.jsch.core/src/org/eclipse/remote/internal/jsch/core/JSchConnection.java
index c9e2dc1..3ea70d3 100644
--- a/bundles/org.eclipse.remote.jsch.core/src/org/eclipse/remote/internal/jsch/core/JSchConnection.java
+++ b/bundles/org.eclipse.remote.jsch.core/src/org/eclipse/remote/internal/jsch/core/JSchConnection.java
@@ -257,7 +257,7 @@
private final Map<String, String> fProperties = new HashMap<String, String>();
private final List<Session> fSessions = new ArrayList<Session>();
- private ChannelSftp fSftpChannel;
+ private ChannelSftp fSftpCommandChannel;
private boolean isFullySetup; // including sftp channel and environment
private static final Map<IRemoteConnection, JSchConnection> connectionMap = new HashMap<>();
@@ -363,11 +363,11 @@
}
private synchronized void cleanup() {
- if (fSftpChannel != null) {
- if (fSftpChannel.isConnected()) {
- fSftpChannel.disconnect();
+ if (fSftpCommandChannel != null) {
+ if (fSftpCommandChannel.isConnected()) {
+ fSftpCommandChannel.disconnect();
}
- fSftpChannel = null;
+ fSftpCommandChannel = null;
}
for (Session session : fSessions) {
if (session.isConnected()) {
@@ -677,24 +677,40 @@
}
/**
- * Open an sftp channel to the remote host. Always use the second session if available.
+ * Open an sftp command channel to the remote host. This channel is for commands that do not require any
+ * state to be preserved, and it should not be closed. Long running commands (such as get/put) should use a separate channel
+ * obtained via {@link #newSftpChannel()}.
+ *
+ * Always use the second session if available.
*
- * @return sftp channel or null if the progress monitor was cancelled
+ * @return sftp channel
* @throws RemoteConnectionException
* if a channel could not be opened
*/
- public ChannelSftp getSftpChannel() throws RemoteConnectionException {
- if (fSftpChannel == null || fSftpChannel.isClosed()) {
- Session session = fSessions.get(0);
- if (fSessions.size() > 1) {
- session = fSessions.get(1);
- }
- fSftpChannel = openSftpChannel(session);
- if (fSftpChannel == null) {
- throw new RemoteConnectionException(Messages.JSchConnection_Unable_to_open_sftp_channel);
- }
+ public ChannelSftp getSftpCommandChannel() throws RemoteConnectionException {
+ if (fSftpCommandChannel == null || fSftpCommandChannel.isClosed()) {
+ fSftpCommandChannel = newSftpChannel();
}
- return fSftpChannel;
+ return fSftpCommandChannel;
+ }
+
+ /**
+ * Open a channel for long running commands. This channel should be closed when the command is completed.
+ *
+ * @return sftp channel
+ * @throws RemoteConnectionException
+ * if a channel could not be opened
+ */
+ public ChannelSftp newSftpChannel() throws RemoteConnectionException {
+ Session session = fSessions.get(0);
+ if (fSessions.size() > 1) {
+ session = fSessions.get(1);
+ }
+ ChannelSftp channel = openSftpChannel(session);
+ if (channel == null) {
+ throw new RemoteConnectionException(Messages.JSchConnection_Unable_to_open_sftp_channel);
+ }
+ return channel;
}
public Channel getStreamForwarder(String host, int port) throws RemoteConnectionException {
diff --git a/bundles/org.eclipse.remote.jsch.core/src/org/eclipse/remote/internal/jsch/core/commands/AbstractRemoteCommand.java b/bundles/org.eclipse.remote.jsch.core/src/org/eclipse/remote/internal/jsch/core/commands/AbstractRemoteCommand.java
index 3d49cba..8649e9a 100755
--- a/bundles/org.eclipse.remote.jsch.core/src/org/eclipse/remote/internal/jsch/core/commands/AbstractRemoteCommand.java
+++ b/bundles/org.eclipse.remote.jsch.core/src/org/eclipse/remote/internal/jsch/core/commands/AbstractRemoteCommand.java
@@ -188,7 +188,7 @@
private ChannelSftp fSftpChannel;
private Future<T1> asyncCmdInThread() throws RemoteConnectionException {
- setChannel(fConnection.getSftpChannel());
+ setChannel(fConnection.getSftpCommandChannel());
return fPool.submit(this);
}
@@ -244,7 +244,6 @@
if (e.getCause() instanceof SftpException) {
throw (SftpException) e.getCause();
}
- getChannel().disconnect();
throw new RemoteConnectionException(e.getMessage());
}
getProgressMonitor().worked(1);
@@ -253,7 +252,6 @@
Thread.currentThread().interrupt(); // set current thread flag
}
future.cancel(true);
- getChannel().disconnect();
throw new RemoteConnectionException(Messages.AbstractRemoteCommand_Operation_cancelled_by_user);
}
}
diff --git a/bundles/org.eclipse.remote.jsch.core/src/org/eclipse/remote/internal/jsch/core/commands/GetInputStreamCommand.java b/bundles/org.eclipse.remote.jsch.core/src/org/eclipse/remote/internal/jsch/core/commands/GetInputStreamCommand.java
index 9c3743d..aa55234 100644
--- a/bundles/org.eclipse.remote.jsch.core/src/org/eclipse/remote/internal/jsch/core/commands/GetInputStreamCommand.java
+++ b/bundles/org.eclipse.remote.jsch.core/src/org/eclipse/remote/internal/jsch/core/commands/GetInputStreamCommand.java
@@ -11,12 +11,24 @@
import org.eclipse.remote.internal.jsch.core.JSchConnection;
import org.eclipse.remote.internal.jsch.core.messages.Messages;
+import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.SftpException;
+/**
+ * The JSch implementation does not support multiple streams open on a single channel, so we must create a new channel for each
+ * subsequent stream. This has the problem that there are usually only a limited number of channels that can be opened
+ * simultaneously, so it is possible that this call will fail unless the open streams are closed first.
+ *
+ * This code will use the initial (command) channel first, or if that is already being used, will open a new channel. It must be
+ * careful not to close the command channel as other threads may still be using it.
+ */
public class GetInputStreamCommand extends AbstractRemoteCommand<InputStream> {
private final IPath fRemotePath;
+ private static ChannelSftp commandChannel;
+ private ChannelSftp thisChannel;
+
public GetInputStreamCommand(JSchConnection connection, IPath path) {
super(connection);
fRemotePath = path;
@@ -25,19 +37,124 @@
@Override
public InputStream getResult(IProgressMonitor monitor) throws RemoteConnectionException {
final SubMonitor subMon = SubMonitor.convert(monitor, 10);
- SftpCallable<InputStream> c = new SftpCallable<InputStream>() {
+
+ final SftpCallable<InputStream> c = new SftpCallable<InputStream>() {
+ private ChannelSftp newChannel() throws IOException {
+ synchronized (GetInputStreamCommand.class) {
+ if (commandChannel != null) {
+ try {
+ thisChannel = getConnection().newSftpChannel();
+ return thisChannel;
+ } catch (RemoteConnectionException e) {
+ throw new IOException(e.getMessage());
+ }
+ }
+ thisChannel = commandChannel = getChannel();
+ return commandChannel;
+ }
+ }
+
@Override
public InputStream call() throws JSchException, SftpException, IOException {
- try {
- return getConnection().getSftpChannel().get(fRemotePath.toString(),
- new CommandProgressMonitor(NLS.bind(Messages.GetInputStreamCommand_Receiving, fRemotePath.toString()), getProgressMonitor()));
- } catch (RemoteConnectionException e) {
- throw new IOException(e.getMessage());
- }
+ return newChannel().get(fRemotePath.toString(), new CommandProgressMonitor(
+ NLS.bind(Messages.GetInputStreamCommand_Receiving, fRemotePath.toString()), getProgressMonitor()));
}
};
try {
- return c.getResult(subMon.newChild(10));
+ final InputStream stream = c.getResult(subMon.newChild(10));
+ return new InputStream() {
+ @Override
+ public int read() throws IOException {
+ return stream.read();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.io.InputStream#close()
+ */
+ @Override
+ public void close() throws IOException {
+ stream.close();
+ synchronized (GetInputStreamCommand.class) {
+ if (thisChannel != commandChannel) {
+ thisChannel.disconnect();
+ } else {
+ commandChannel = null;
+ }
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.io.InputStream#read(byte[])
+ */
+ @Override
+ public int read(byte[] b) throws IOException {
+ return stream.read(b);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.io.InputStream#read(byte[], int, int)
+ */
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ return stream.read(b, off, len);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.io.InputStream#skip(long)
+ */
+ @Override
+ public long skip(long n) throws IOException {
+ return stream.skip(n);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.io.InputStream#available()
+ */
+ @Override
+ public int available() throws IOException {
+ return stream.available();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.io.InputStream#mark(int)
+ */
+ @Override
+ public synchronized void mark(int readlimit) {
+ stream.mark(readlimit);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.io.InputStream#reset()
+ */
+ @Override
+ public synchronized void reset() throws IOException {
+ stream.reset();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.io.InputStream#markSupported()
+ */
+ @Override
+ public boolean markSupported() {
+ return stream.markSupported();
+ }
+ };
} catch (SftpException e) {
throw new RemoteConnectionException(e.getMessage());
}
diff --git a/tests/org.eclipse.remote.jsch.tests/src/org/eclipse/remote/jsch/tests/FileStoreTests.java b/tests/org.eclipse.remote.jsch.tests/src/org/eclipse/remote/jsch/tests/FileStoreTests.java
index 8c1e3ed..62d0dda 100644
--- a/tests/org.eclipse.remote.jsch.tests/src/org/eclipse/remote/jsch/tests/FileStoreTests.java
+++ b/tests/org.eclipse.remote.jsch.tests/src/org/eclipse/remote/jsch/tests/FileStoreTests.java
@@ -9,8 +9,6 @@
import java.io.OutputStreamWriter;
import java.net.URI;
-import junit.framework.TestCase;
-
import org.eclipse.core.filesystem.EFS;
import org.eclipse.core.filesystem.IFileInfo;
import org.eclipse.core.filesystem.IFileStore;
@@ -24,6 +22,8 @@
import org.eclipse.remote.core.IRemoteServicesManager;
import org.eclipse.remote.internal.jsch.core.JSchConnection;
+import junit.framework.TestCase;
+
public class FileStoreTests extends TestCase {
private static final String CONNECTION_NAME = "test_connection";
private static final String USERNAME = "test";
@@ -33,7 +33,9 @@
private static final String REMOTE_DIR = "/tmp/ptp_" + USERNAME + "/filestore_tests";
private static final String LOCAL_FILE = "local_file";
private static final String REMOTE_FILE = "remote_file";
+ private static final String REMOTE_FILE2 = "remote_file2";
private static final String TEST_STRING = "a string containing fairly *()(*&^$%## random text";
+ private static final String TEST_STRING2 = "a different string containing fairly *()(*&^$%## random text";
private IRemoteConnectionType fConnectionType;
private IRemoteConnection fRemoteConnection;
@@ -82,6 +84,67 @@
}
}
+ public void testMultiStreams() {
+ IFileStore remoteFileStore = fRemoteDir.getChild(REMOTE_FILE);
+ IFileStore remoteFileStore2 = fRemoteDir.getChild(REMOTE_FILE2);
+
+ try {
+ createFile(remoteFileStore, TEST_STRING);
+ createFile(remoteFileStore2, TEST_STRING2);
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+
+ assertTrue(remoteFileStore.fetchInfo().exists());
+ assertTrue(remoteFileStore2.fetchInfo().exists());
+
+ /*
+ * Check how many streams we can open
+ */
+ InputStream streams[] = new InputStream[100];
+ int streamCount = 0;
+
+ for (; streamCount < streams.length; streamCount++) {
+ try {
+ streams[streamCount] = remoteFileStore.openInputStream(EFS.NONE, null);
+ } catch (Exception e) {
+ if (!e.getMessage().endsWith("channel is not opened.")) {
+ fail(e.getMessage());
+ }
+ break;
+ }
+ }
+
+ for (int i = 0; i < streamCount; i++) {
+ try {
+ streams[i].close();
+ } catch (IOException e) {
+ // No need to deal with this
+ }
+ }
+
+ for (int i = 0; i < streamCount / 2; i++) {
+ try {
+ InputStream stream = remoteFileStore.openInputStream(EFS.NONE, null);
+ assertNotNull(stream);
+ BufferedReader buf = new BufferedReader(new InputStreamReader(stream));
+ String line = buf.readLine().trim();
+ assertTrue(line.equals(TEST_STRING));
+
+ InputStream stream2 = remoteFileStore2.openInputStream(EFS.NONE, null);
+ assertNotNull(stream2);
+ BufferedReader buf2 = new BufferedReader(new InputStreamReader(stream2));
+ String line2 = buf2.readLine().trim();
+ assertTrue(line2.equals(TEST_STRING2));
+
+ stream.close();
+ stream2.close();
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+ }
+
public void testCopy() {
final IFileStore localFileStore = fLocalDir.getChild(LOCAL_FILE);
final IFileStore remoteFileStore = fRemoteDir.getChild(REMOTE_FILE);