ssh channel pool
diff --git a/rse/plugins/org.eclipse.dltk.ssh.core/src/org/eclipse/dltk/ssh/core/SshConnectionManager.java b/rse/plugins/org.eclipse.dltk.ssh.core/src/org/eclipse/dltk/ssh/core/SshConnectionManager.java
index ae10e80..89ae9b2 100644
--- a/rse/plugins/org.eclipse.dltk.ssh.core/src/org/eclipse/dltk/ssh/core/SshConnectionManager.java
+++ b/rse/plugins/org.eclipse.dltk.ssh.core/src/org/eclipse/dltk/ssh/core/SshConnectionManager.java
@@ -46,8 +46,7 @@
Collection<SshConnection> values = connections.values();
for (ISshConnection connection : values) {
connection.disconnect();
- connection.setDisabled(60 * 1000 * 1000 * 1000); // Disable for
- // ever.
+ connection.setDisabled(60 * 1000 * 1000 * 1000); // Disable forever.
}
}
}
diff --git a/rse/plugins/org.eclipse.dltk.ssh.core/src/org/eclipse/dltk/ssh/internal/core/ChannelPool.java b/rse/plugins/org.eclipse.dltk.ssh.core/src/org/eclipse/dltk/ssh/internal/core/ChannelPool.java
new file mode 100644
index 0000000..d3760cf
--- /dev/null
+++ b/rse/plugins/org.eclipse.dltk.ssh.core/src/org/eclipse/dltk/ssh/internal/core/ChannelPool.java
@@ -0,0 +1,240 @@
+/*******************************************************************************
+ * Copyright (c) 2009 xored software, Inc.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * xored software, Inc. - initial API and Implementation (Alex Panchenko)
+ *******************************************************************************/
+package org.eclipse.dltk.ssh.internal.core;
+
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.eclipse.jsch.core.IJSchService;
+
+import com.jcraft.jsch.ChannelSftp;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import com.jcraft.jsch.UIKeyboardInteractive;
+import com.jcraft.jsch.UserInfo;
+
+public class ChannelPool {
+
+ private final String userName;
+ private final int port;
+ private final String hostName;
+
+ private String password;
+
+ private Session session;
+ private final List<ChannelSftp> freeChannels = new ArrayList<ChannelSftp>();
+ private final Map<ChannelSftp, ChannelUsageInfo> usedChannels = new IdentityHashMap<ChannelSftp, ChannelUsageInfo>();
+
+ private static class ChannelUsageInfo {
+ private final Object context;
+
+ public ChannelUsageInfo(Object context) {
+ this.context = context;
+ }
+
+ }
+
+ /**
+ * @param userName
+ * @param hostName
+ * @param port
+ */
+ public ChannelPool(String userName, String hostName, int port) {
+ this.userName = userName;
+ this.hostName = hostName;
+ this.port = port;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ private final class LocalUserInfo implements UserInfo,
+ UIKeyboardInteractive {
+ public void showMessage(String arg0) {
+ }
+
+ public boolean promptYesNo(String arg0) {
+ return false;
+ }
+
+ public boolean promptPassword(String arg0) {
+ return true;
+ }
+
+ public boolean promptPassphrase(String arg0) {
+ return false;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public String getPassphrase() {
+ return ""; //$NON-NLS-1$
+ }
+
+ public String[] promptKeyboardInteractive(String destination,
+ String name, String instruction, String[] prompt, boolean[] echo) {
+ final String p = password;
+ return p != null ? new String[] { p } : null;
+ }
+ }
+
+ protected synchronized void connectSession() throws JSchException {
+ if (session == null) {
+ IJSchService service = Activator.getDefault().getJSch();
+ session = service.createSession(hostName, port, userName);
+ session.setTimeout(0);
+ session.setServerAliveInterval(300000);
+ session.setServerAliveCountMax(6);
+ session.setPassword(password); // Set password
+ // directly
+ UserInfo ui = new LocalUserInfo();
+ session.setUserInfo(ui);
+ }
+
+ if (!session.isConnected()) {
+ // Connect with default timeout
+ session.connect(60 * 1000);
+ }
+ }
+
+ protected ChannelSftp acquireChannel(final Object context, int tryCount) {
+ for (;;) {
+ try {
+ return acquireChannel(context);
+ } catch (JSchException ex) {
+ Activator.log(ex);
+ if (--tryCount <= 0) {
+ return null;
+ }
+ }
+ }
+ }
+
+ protected synchronized ChannelSftp acquireChannel(Object context)
+ throws JSchException {
+ try {
+ connectSession();
+ while (!freeChannels.isEmpty()) {
+ final ChannelSftp channel = freeChannels.remove(freeChannels
+ .size() - 1);
+ if (channel.isConnected()) {
+ usedChannels.put(channel, createUsageInfo(context));
+ return channel;
+ }
+ }
+ final ChannelSftp channel = (ChannelSftp) session
+ .openChannel("sftp"); //$NON-NLS-1$
+ if (!channel.isConnected()) {
+ channel.connect();
+ }
+ usedChannels.put(channel, createUsageInfo(context));
+ return channel;
+ } catch (JSchException e) {
+ // String eToStr = e.toString();
+ boolean needLog = true;
+ // if (eToStr.indexOf("Auth cancel") >= 0 || eToStr.indexOf("Auth fail") >= 0 || eToStr.indexOf("session is down") >= 0) { //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
+ // if (session.isConnected()) {
+ // session.disconnect();
+ // session = null;
+ // }
+ // }
+ disconnect();
+ if (needLog) {
+ Activator.error("Failed to create direct connection", e); //$NON-NLS-1$
+ }
+ throw e;
+ // if (session != null) {
+ // session.disconnect();
+ // session = null;
+ // }
+ }
+ }
+
+ /**
+ * @return
+ */
+ private ChannelUsageInfo createUsageInfo(Object context) {
+ return new ChannelUsageInfo(context);
+ }
+
+ protected void releaseChannel(ChannelSftp channel) {
+ usedChannels.remove(channel);
+ freeChannels.add(channel);
+ }
+
+ public void disconnect() {
+ for (ChannelSftp channel : freeChannels) {
+ channel.disconnect();
+ }
+ freeChannels.clear();
+ // TODO log used connections
+ for (Map.Entry<ChannelSftp, ChannelUsageInfo> entry : usedChannels
+ .entrySet()) {
+ entry.getKey().disconnect();
+ }
+ usedChannels.clear();
+ if (session != null) {
+ session.disconnect();
+ session = null;
+ }
+ }
+
+ public boolean isConnected() {
+ return session != null && session.isConnected();
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result
+ + ((hostName == null) ? 0 : hostName.hashCode());
+ result = prime * result + port;
+ result = prime * result
+ + ((userName == null) ? 0 : userName.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ ChannelPool other = (ChannelPool) obj;
+ if (hostName == null) {
+ if (other.hostName != null)
+ return false;
+ } else if (!hostName.equals(other.hostName))
+ return false;
+ if (port != other.port)
+ return false;
+ if (userName == null) {
+ if (other.userName != null)
+ return false;
+ } else if (!userName.equals(other.userName))
+ return false;
+ return true;
+ }
+
+}
diff --git a/rse/plugins/org.eclipse.dltk.ssh.core/src/org/eclipse/dltk/ssh/internal/core/SshConnection.java b/rse/plugins/org.eclipse.dltk.ssh.core/src/org/eclipse/dltk/ssh/internal/core/SshConnection.java
index 2e524d9..62d3350 100644
--- a/rse/plugins/org.eclipse.dltk.ssh.core/src/org/eclipse/dltk/ssh/internal/core/SshConnection.java
+++ b/rse/plugins/org.eclipse.dltk.ssh.core/src/org/eclipse/dltk/ssh/internal/core/SshConnection.java
@@ -12,60 +12,28 @@
import org.eclipse.core.runtime.Path;
import org.eclipse.dltk.ssh.core.ISshConnection;
import org.eclipse.dltk.ssh.core.ISshFileHandle;
-import org.eclipse.jsch.core.IJSchService;
import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.JSchException;
-import com.jcraft.jsch.Session;
import com.jcraft.jsch.SftpATTRS;
import com.jcraft.jsch.SftpException;
-import com.jcraft.jsch.UIKeyboardInteractive;
-import com.jcraft.jsch.UserInfo;
import com.jcraft.jsch.ChannelSftp.LsEntry;
/**
* TODO: Add correct operation synchronization.
*
*/
-public class SshConnection implements ISshConnection {
+public class SshConnection extends ChannelPool implements ISshConnection {
private long disabledTime = 0;
- private final class LocalUserInfo implements UserInfo,
- UIKeyboardInteractive {
- public void showMessage(String arg0) {
- }
-
- public boolean promptYesNo(String arg0) {
- return false;
- }
-
- public boolean promptPassword(String arg0) {
- return true;
- }
-
- public boolean promptPassphrase(String arg0) {
- return false;
- }
-
- public String getPassword() {
- return password;
- }
-
- public String getPassphrase() {
- return ""; //$NON-NLS-1$
- }
-
- public String[] promptKeyboardInteractive(String destination,
- String name, String instruction, String[] prompt, boolean[] echo) {
- final String p = password;
- return p != null ? new String[] { p } : null;
- }
- }
-
private static abstract class Operation {
boolean finished = false;
- public abstract void perform() throws JSchException, SftpException;
+ public boolean isLongRunning() {
+ return false;
+ }
+
+ public abstract void perform(ChannelSftp channel) throws SftpException;
public void setFinished() {
finished = true;
@@ -75,11 +43,9 @@
return finished;
}
- public void failed() {
- }
}
- private class GetStatOperation extends Operation {
+ private static class GetStatOperation extends Operation {
private IPath path;
private SftpATTRS attrs;
@@ -93,8 +59,8 @@
}
@Override
- public void perform() throws JSchException, SftpException {
- attrs = getChannel().stat(path.toString());
+ public void perform(ChannelSftp channel) throws SftpException {
+ attrs = channel.stat(path.toString());
}
public SftpATTRS getAttrs() {
@@ -102,7 +68,7 @@
}
}
- private class ResolveLinkOperation extends Operation {
+ private static class ResolveLinkOperation extends Operation {
private IPath path;
private IPath resolvedPath;
@@ -116,7 +82,7 @@
}
@Override
- public void perform() throws JSchException, SftpException {
+ public void perform(ChannelSftp channel) throws SftpException {
SftpATTRS attrs = channel.stat(path.toString());
boolean isRoot = (path.segmentCount() == 0);
String linkTarget = null;
@@ -168,6 +134,8 @@
}
}
+ private static final int STREAM_BUFFER_SIZE = 32000;
+
private class GetOperation extends Operation {
private IPath path;
private InputStream stream;
@@ -177,9 +145,14 @@
}
@Override
- public void perform() throws JSchException, SftpException {
- stream = channel.get(path.toString());
- performStreamOperation = true;
+ public boolean isLongRunning() {
+ return true;
+ }
+
+ @Override
+ public void perform(ChannelSftp channel) throws SftpException {
+ stream = new GetOperationInputStream(channel.get(path.toString()),
+ channel);
}
@Override
@@ -187,34 +160,30 @@
return "Get input stream for file:" + path; //$NON-NLS-1$
}
- @Override
- public void failed() {
- // channel.disconnect();
- // channel = null;
- }
-
public InputStream getStream() {
- if (stream != null) {
- InputStream wrapperStream = new BufferedInputStream(stream,
- 32000) {
- @Override
- public void close() throws IOException {
- super.close();
- // channel.disconnect();
- // channel = null;
- doneStreamOperation();
- }
- };
- return wrapperStream;
- }
return stream;
}
}
- protected synchronized void doneStreamOperation() {
- performStreamOperation = false;
- notifyAll();
+ private class GetOperationInputStream extends BufferedInputStream {
+
+ private final ChannelSftp channel;
+
+ public GetOperationInputStream(InputStream in, ChannelSftp channel) {
+ super(in, STREAM_BUFFER_SIZE);
+ this.channel = channel;
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ super.close();
+ } finally {
+ releaseChannel(channel);
+ }
+ }
+
}
private class PutOperation extends Operation {
@@ -231,30 +200,44 @@
}
@Override
- public void perform() throws JSchException, SftpException {
- stream = channel.put(path.toString(), ChannelSftp.OVERWRITE);
- performStreamOperation = true;
+ public boolean isLongRunning() {
+ return true;
+ }
+
+ @Override
+ public void perform(ChannelSftp channel) throws SftpException {
+ stream = new PutOperationOutputStream(channel.put(path.toString(),
+ ChannelSftp.OVERWRITE), channel);
}
public OutputStream getStream() {
- if (stream != null) {
- OutputStream wrapperStream = new BufferedOutputStream(stream,
- 32000) {
- @Override
- public void close() throws IOException {
- super.close();
- channel.disconnect();
- channel = null;
- doneStreamOperation();
- }
- };
- return wrapperStream;
- }
return stream;
}
+
}
- private class ListFolderOperation extends Operation {
+ private class PutOperationOutputStream extends BufferedOutputStream {
+ private final ChannelSftp channel;
+
+ public PutOperationOutputStream(OutputStream out, ChannelSftp channel) {
+ super(out, STREAM_BUFFER_SIZE);
+ this.channel = channel;
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ super.close();
+ } finally {
+ releaseChannel(channel);
+ // TODO channel.disconnect();
+ // TODO channel = null;
+ }
+ }
+
+ }
+
+ private static class ListFolderOperation extends Operation {
private IPath path;
private Vector<LsEntry> v;
@@ -269,8 +252,8 @@
@Override
@SuppressWarnings("unchecked")
- public void perform() throws JSchException, SftpException {
- v = getChannel().ls(path.toString());
+ public void perform(ChannelSftp channel) throws SftpException {
+ v = channel.ls(path.toString());
}
public Vector<LsEntry> getVector() {
@@ -280,160 +263,64 @@
private static final int DEFAULT_RETRY_COUNT = 2;
- // private static final long TIMEOUT = 3000; // One second timeout
-
- private Session session;
- private String userName;
- private String password;
- private int port;
- private ChannelSftp channel;
-
- private String hostName;
-
public SshConnection(String userName, String hostName, int port) {
- this.userName = userName;
- this.hostName = hostName;
- this.port = port;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.eclipse.dltk.ssh.core.ISshConnection#setPassword(java.lang.String)
- */
- public void setPassword(String password) {
- this.password = password;
+ super(userName, hostName, port);
}
public boolean connect() {
- return connect(0);
- }
-
- public synchronized boolean connect(int trycount) {
try {
- if (session == null) {
- IJSchService service = Activator.getDefault().getJSch();
- session = service.createSession(hostName, port, userName);
- session.setTimeout(0);
- session.setServerAliveInterval(300000);
- session.setServerAliveCountMax(6);
- session.setPassword(password); // Set password directly
- UserInfo ui = new LocalUserInfo();
- session.setUserInfo(ui);
- }
-
- if (!session.isConnected()) {
- session.connect(60 * 1000); // Connect with defautl timeout
- }
-
- if (channel == null) {
- channel = (ChannelSftp) session.openChannel("sftp"); //$NON-NLS-1$
- }
- if (!channel.isConnected()) {
- channel.connect();
+ final ChannelSftp channel = acquireChannel("connect()"); //$NON-NLS-1$
+ try {
+ return true;
+ } finally {
+ releaseChannel(channel);
}
} catch (JSchException e) {
- String eToStr = e.toString();
- boolean needLog = true;
- if (eToStr.indexOf("Auth cancel") >= 0 || eToStr.indexOf("Auth fail") >= 0 || eToStr.indexOf("session is down") >= 0) { //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
- if (session.isConnected()) {
- session.disconnect();
- session = null;
- }
- }
- if (session != null) {
- session.disconnect();
- session = null;
- }
- if (needLog) {
- Activator.error("Failed to create direct connection", e); //$NON-NLS-1$
- }
- }
- if (session == null || !session.isConnected() || channel == null
- || !channel.isConnected()) {
- if (trycount > 0) {
- if (session == null || !session.isConnected()) {
- session = null;
- }
- channel = null;
- return connect(trycount - 1);
- } else {
- // Lets disable connection for a while.
- setDisabled(1000 * 10); // 10 seconds
- // seconds
- }
return false;
- } else {
- return true;
}
}
- /*
- * (non-Javadoc)
- *
- * @see org.eclipse.dltk.ssh.core.ISshConnection#disconnect()
- */
- public void disconnect() {
- if (channel != null && channel.isConnected()) {
- channel.disconnect();
- channel = null;
- }
- if (session != null && session.isConnected()) {
- session.disconnect();
- session = null;
- }
+ private void performOperation(final Operation op) {
+ performOperation(op, DEFAULT_RETRY_COUNT);
}
- private boolean performStreamOperation = false;
-
- private synchronized void performOperation(final Operation op, int tryCount) {
- while (performStreamOperation) {
+ private void performOperation(final Operation op, int tryCount) {
+ final ChannelSftp channel = acquireChannel(op, tryCount);
+ if (channel != null) {
+ boolean badChannel = false;
try {
- this.wait(10);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- try {
- connect();
- op.perform();
- op.setFinished();
- } catch (JSchException ex) {
- Activator.log(ex);
- // if (!channel.isConnected()) {
- if (tryCount > 0) {
- performOperation(op, tryCount - 1);
- }
- // }
- } catch (SftpException e) {
- if (e.id != ChannelSftp.SSH_FX_NO_SUCH_FILE) {
- if (e.id == ChannelSftp.SSH_FX_PERMISSION_DENIED) {
- Activator.log("Permission denied to perform:" //$NON-NLS-1$
- + op.toString());
- } else {
+ op.perform(channel);
+ op.setFinished();
+ } catch (SftpException e) {
+ if (e.id == ChannelSftp.SSH_FX_FAILURE
+ && e.getCause() instanceof JSchException) {
Activator.log(e);
+ badChannel = true;
+ disconnect();
+ if (tryCount > 0) {
+ performOperation(op, tryCount - 1);
+ }
+ } else if (e.id != ChannelSftp.SSH_FX_NO_SUCH_FILE) {
+ if (e.id == ChannelSftp.SSH_FX_PERMISSION_DENIED) {
+ Activator.log("Permission denied to perform:" //$NON-NLS-1$
+ + op.toString());
+ } else {
+ Activator.log(e);
+ }
+ }
+ } finally {
+ if (!badChannel && !op.isLongRunning()) {
+ releaseChannel(channel);
}
}
-
- } finally {
- if (!op.isFinished()) {
- op.failed();
- }
}
-
- }
-
- private ChannelSftp getChannel() {
- return channel;
}
/*
* (non-Javadoc)
*
- * @see
- * org.eclipse.dltk.ssh.core.ISshConnection#getHandle(org.eclipse.core.runtime
- * .IPath)
+ * @see org.eclipse.dltk.ssh.core.ISshConnection#getHandle(org.eclipse.core
+ * .runtime .IPath)
*/
public ISshFileHandle getHandle(IPath path) throws Exception {
if (isDisabled()) {
@@ -449,15 +336,15 @@
public boolean isDisabled() {
return disabledTime > System.currentTimeMillis();
- };
+ }
public void setDisabled(int timeout) {
disabledTime = System.currentTimeMillis() + timeout;
- };
+ }
SftpATTRS getAttrs(IPath path) {
GetStatOperation op = new GetStatOperation(path);
- performOperation(op, DEFAULT_RETRY_COUNT);
+ performOperation(op);
if (op.isFinished()) {
return op.getAttrs();
}
@@ -466,7 +353,7 @@
IPath getResolvedPath(IPath path) {
ResolveLinkOperation op = new ResolveLinkOperation(path);
- performOperation(op, DEFAULT_RETRY_COUNT);
+ performOperation(op);
if (op.isFinished()) {
return op.getResolvedPath();
}
@@ -475,7 +362,7 @@
Vector<LsEntry> list(IPath path) {
ListFolderOperation op = new ListFolderOperation(path);
- performOperation(op, DEFAULT_RETRY_COUNT);
+ performOperation(op);
if (op.isFinished()) {
return op.getVector();
}
@@ -485,43 +372,42 @@
void setLastModified(final IPath path, final long timestamp) {
Operation op = new Operation() {
@Override
- public void perform() throws JSchException, SftpException {
+ public void perform(ChannelSftp channel) throws SftpException {
Date date = new Date(timestamp);
System.out.println(date.toString());
- getChannel().setMtime(path.toString(),
- (int) (timestamp / 1000L));
+ channel.setMtime(path.toString(), (int) (timestamp / 1000L));
}
};
- performOperation(op, DEFAULT_RETRY_COUNT);
+ performOperation(op);
}
void delete(final IPath path, final boolean dir) {
Operation op = new Operation() {
@Override
- public void perform() throws JSchException, SftpException {
+ public void perform(ChannelSftp channel) throws SftpException {
if (!dir) {
- getChannel().rm(path.toString());
+ channel.rm(path.toString());
} else {
- getChannel().rmdir(path.toString());
+ channel.rmdir(path.toString());
}
}
};
- performOperation(op, DEFAULT_RETRY_COUNT);
+ performOperation(op);
}
void mkdir(final IPath path) {
Operation op = new Operation() {
@Override
- public void perform() throws JSchException, SftpException {
- getChannel().mkdir(path.toString());
+ public void perform(ChannelSftp channel) throws SftpException {
+ channel.mkdir(path.toString());
}
};
- performOperation(op, DEFAULT_RETRY_COUNT);
+ performOperation(op);
}
InputStream get(IPath path) {
GetOperation op = new GetOperation(path);
- performOperation(op, DEFAULT_RETRY_COUNT);
+ performOperation(op);
if (op.isFinished()) {
return op.getStream();
}
@@ -530,54 +416,11 @@
OutputStream put(IPath path) {
PutOperation op = new PutOperation(path);
- performOperation(op, DEFAULT_RETRY_COUNT);
+ performOperation(op);
if (op.isFinished()) {
return op.getStream();
}
return null;
}
- public boolean isConnected() {
- if (session != null) {
- return session.isConnected();
- }
- return false;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result
- + ((hostName == null) ? 0 : hostName.hashCode());
- result = prime * result + port;
- result = prime * result
- + ((userName == null) ? 0 : userName.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- SshConnection other = (SshConnection) obj;
- if (hostName == null) {
- if (other.hostName != null)
- return false;
- } else if (!hostName.equals(other.hostName))
- return false;
- if (port != other.port)
- return false;
- if (userName == null) {
- if (other.userName != null)
- return false;
- } else if (!userName.equals(other.userName))
- return false;
- return true;
- }
-
}