[245847] reduce warnings
diff --git a/plugins/org.eclipse.actf.util.httpproxy/src/org/eclipse/actf/util/internal/httpproxy/ObjectPoolImpl.java b/plugins/org.eclipse.actf.util.httpproxy/src/org/eclipse/actf/util/internal/httpproxy/ObjectPoolImpl.java
index fb9ac81..3946609 100644
--- a/plugins/org.eclipse.actf.util.httpproxy/src/org/eclipse/actf/util/internal/httpproxy/ObjectPoolImpl.java
+++ b/plugins/org.eclipse.actf.util.httpproxy/src/org/eclipse/actf/util/internal/httpproxy/ObjectPoolImpl.java
@@ -9,56 +9,52 @@
* Hisashi MIYASHITA - initial API and implementation
*******************************************************************************/
-
package org.eclipse.actf.util.internal.httpproxy;
import java.util.ArrayList;
-
-
-
-
public class ObjectPoolImpl implements IObjectPool {
- private final String name;
- private ArrayList pool;
- private int waitInLine;
+ private final String name;
+ private ArrayList<Object> pool;
+ private int waitInLine;
- synchronized public boolean add(Object o) {
- boolean flag = pool.add(o);
- if (flag && (waitInLine > 0)) notify();
- return flag;
- }
+ synchronized public boolean add(Object o) {
+ boolean flag = pool.add(o);
+ if (flag && (waitInLine > 0))
+ notify();
+ return flag;
+ }
- synchronized public Object take() {
- if (pool.size() == 0) return null;
- return pool.remove(0);
- }
+ synchronized public Object take() {
+ if (pool.size() == 0)
+ return null;
+ return pool.remove(0);
+ }
- synchronized public Object take(int timeout)
- throws InterruptedException {
- long startTime = System.currentTimeMillis();
+ synchronized public Object take(int timeout) throws InterruptedException {
+ long startTime = System.currentTimeMillis();
- waitInLine++;
- while (pool.size() == 0) {
- wait(timeout);
- if (timeout > 0) {
- if ((System.currentTimeMillis() - startTime) > timeout) {
- waitInLine--;
- return null;
- }
- }
- }
- waitInLine--;
- return pool.remove(0);
- }
+ waitInLine++;
+ while (pool.size() == 0) {
+ wait(timeout);
+ if (timeout > 0) {
+ if ((System.currentTimeMillis() - startTime) > timeout) {
+ waitInLine--;
+ return null;
+ }
+ }
+ }
+ waitInLine--;
+ return pool.remove(0);
+ }
- public ObjectPoolImpl(String name) {
- this.name = name;
- this.pool = new ArrayList();
- this.waitInLine = 0;
- }
+ public ObjectPoolImpl(String name) {
+ this.name = name;
+ this.pool = new ArrayList<Object>();
+ this.waitInLine = 0;
+ }
- public String getName() {
- return name;
- }
+ public String getName() {
+ return name;
+ }
}
diff --git a/plugins/org.eclipse.actf.util.httpproxy/src/org/eclipse/actf/util/internal/httpproxy/WorkpileControllerImpl.java b/plugins/org.eclipse.actf.util.httpproxy/src/org/eclipse/actf/util/internal/httpproxy/WorkpileControllerImpl.java
index 93a97e4..d7d8339 100644
--- a/plugins/org.eclipse.actf.util.httpproxy/src/org/eclipse/actf/util/internal/httpproxy/WorkpileControllerImpl.java
+++ b/plugins/org.eclipse.actf.util.httpproxy/src/org/eclipse/actf/util/internal/httpproxy/WorkpileControllerImpl.java
@@ -9,37 +9,32 @@
* Hisashi MIYASHITA - initial API and implementation
*******************************************************************************/
-
package org.eclipse.actf.util.internal.httpproxy;
import java.util.ArrayList;
-
-
-
-
public class WorkpileControllerImpl implements IWorkpileController {
- private final String name;
- private final ThreadGroup threadGroup;
- private final ArrayList workpile;
+ private final String name;
+ private final ThreadGroup threadGroup;
+ private final ArrayList<Thread> workpile;
- public void input(Runnable work) {
- Thread th;
- synchronized (this) {
- th = new Thread(threadGroup, work, name + "-" + workpile.size());
- workpile.add(th);
- }
- th.setDaemon(true);
- th.start();
- }
+ public void input(Runnable work) {
+ Thread th;
+ synchronized (this) {
+ th = new Thread(threadGroup, work, name + "-" + workpile.size());
+ workpile.add(th);
+ }
+ th.setDaemon(true);
+ th.start();
+ }
- public String toString() {
- return "WPC:[" + name + "]:" + threadGroup.toString();
- }
+ public String toString() {
+ return "WPC:[" + name + "]:" + threadGroup.toString();
+ }
- public WorkpileControllerImpl(String name) {
- this.name = name;
- this.threadGroup = new ThreadGroup(name);
- this.workpile = new ArrayList();
- }
+ public WorkpileControllerImpl(String name) {
+ this.name = name;
+ this.threadGroup = new ThreadGroup(name);
+ this.workpile = new ArrayList<Thread>();
+ }
}
diff --git a/plugins/org.eclipse.actf.util.httpproxy/src/org/eclipse/actf/util/internal/httpproxy/core/FixedSizeQueue.java b/plugins/org.eclipse.actf.util.httpproxy/src/org/eclipse/actf/util/internal/httpproxy/core/FixedSizeQueue.java
index bc109d0..7d50990 100644
--- a/plugins/org.eclipse.actf.util.httpproxy/src/org/eclipse/actf/util/internal/httpproxy/core/FixedSizeQueue.java
+++ b/plugins/org.eclipse.actf.util.httpproxy/src/org/eclipse/actf/util/internal/httpproxy/core/FixedSizeQueue.java
@@ -14,158 +14,160 @@
import org.eclipse.actf.util.httpproxy.core.TimeoutException;
-
public class FixedSizeQueue {
- private Object[] fQueue;
- private int fSize;
- private int fNextIndex = 0;
- private int fMinIndex = 0;
-
- public FixedSizeQueue(int queueSize) {
- fQueue = new Object[queueSize];
- fSize = queueSize;
- }
-
- public int getQueueSize() {
- return fSize;
- }
+ private Object[] fQueue;
+ private int fSize;
+ private int fNextIndex = 0;
+ private int fMinIndex = 0;
- public synchronized void clear() {
- if (fMinIndex != fNextIndex) {
- do {
- fQueue[fMinIndex++] = null;
- if (fMinIndex > fSize) {
- fMinIndex = 0;
- }
- } while (fMinIndex != fNextIndex);
- }
- fNextIndex = 0;
- fMinIndex = 0;
- fQueue[0] = null;
- }
-
- public synchronized boolean isEmpty() {
- return (fNextIndex == fMinIndex);
- }
-
- public synchronized int getSize() {
- int d = fNextIndex - fMinIndex;
- return (d >= 0) ? d : d + fSize;
- }
-
- public synchronized void put(Object obj) throws InterruptedException {
- if (obj == null) {
- throw new IllegalArgumentException("null");
- }
- while (fQueue[fNextIndex] != null) {
- this.wait();
- }
- fQueue[fNextIndex++] = obj;
- if (fNextIndex >= fSize) {
- fNextIndex = 0;
- }
- this.notifyAll();
- }
-
- public synchronized void put(Object obj, long timeout) throws TimeoutException, InterruptedException {
- if (timeout == 0) {
- put(obj);
- } else {
- if (obj == null) {
- throw new IllegalArgumentException("null");
- }
- if (fQueue[fNextIndex] != null) {
- long t0 = System.currentTimeMillis();
- long wait = timeout;
- boolean timedout = true;
- do {
- this.wait(wait);
- if (fQueue[fNextIndex] == null) {
- timedout = false;
- break;
- }
- long elapsed = System.currentTimeMillis() - t0;
- wait = timeout - elapsed;
- } while (wait > 0);
- if (timedout) {
- throw new TimeoutException("FixedSizeQueue.put");
- }
- }
- fQueue[fNextIndex++] = obj;
- if (fNextIndex >= fSize) {
- fNextIndex = 0;
- }
- this.notifyAll();
- }
- }
+ public FixedSizeQueue(int queueSize) {
+ fQueue = new Object[queueSize];
+ fSize = queueSize;
+ }
- public synchronized Object remove() throws InterruptedException {
- Object req;
- while ((req = fQueue[fMinIndex]) == null) {
- this.wait();
- }
- fQueue[fMinIndex++] = null;
- if (fMinIndex >= fSize) {
- fMinIndex = 0;
- }
- this.notifyAll();
- return req;
- }
-
- public synchronized Object remove(long timeout) throws TimeoutException, InterruptedException {
- Object req = fQueue[fMinIndex];
- if (req == null) {
- long t0 = System.currentTimeMillis();
- long wait = timeout;
- do {
- this.wait(wait);
- if ((req = fQueue[fMinIndex]) != null) {
- break;
- }
- long elapsed = System.currentTimeMillis() - t0;
- wait = timeout - elapsed;
- } while (wait > 0);
- if (req == null) {
- throw new TimeoutException("FixedSizeQueue.remove");
- }
- }
- fQueue[fMinIndex++] = null;
- if (fMinIndex >= fSize) {
- fMinIndex = 0;
- }
- this.notifyAll();
- return req;
- }
-
- public synchronized Object nonBlockingRemove() {
- Object req = fQueue[fMinIndex];
- if (req != null) {
- fQueue[fMinIndex++] = null;
- if (fMinIndex >= fSize) {
- fMinIndex = 0;
- }
- this.notifyAll();
- }
- return req;
- }
-
- public synchronized Object matchAndRemove(Object o, Comparator comparator) {
- if (o == null) {
- throw new IllegalArgumentException();
- }
- Object req = fQueue[fMinIndex];
- if (req == null) {
- return null;
- }
- if (comparator.compare(o, req) == 0) {
- fQueue[fMinIndex++] = null;
- if (fMinIndex >= fSize) {
- fMinIndex = 0;
- }
- this.notifyAll();
- return req;
- } else {
- return null;
- }
- }
+ public int getQueueSize() {
+ return fSize;
+ }
+
+ public synchronized void clear() {
+ if (fMinIndex != fNextIndex) {
+ do {
+ fQueue[fMinIndex++] = null;
+ if (fMinIndex > fSize) {
+ fMinIndex = 0;
+ }
+ } while (fMinIndex != fNextIndex);
+ }
+ fNextIndex = 0;
+ fMinIndex = 0;
+ fQueue[0] = null;
+ }
+
+ public synchronized boolean isEmpty() {
+ return (fNextIndex == fMinIndex);
+ }
+
+ public synchronized int getSize() {
+ int d = fNextIndex - fMinIndex;
+ return (d >= 0) ? d : d + fSize;
+ }
+
+ public synchronized void put(Object obj) throws InterruptedException {
+ if (obj == null) {
+ throw new IllegalArgumentException("null");
+ }
+ while (fQueue[fNextIndex] != null) {
+ this.wait();
+ }
+ fQueue[fNextIndex++] = obj;
+ if (fNextIndex >= fSize) {
+ fNextIndex = 0;
+ }
+ this.notifyAll();
+ }
+
+ public synchronized void put(Object obj, long timeout)
+ throws TimeoutException, InterruptedException {
+ if (timeout == 0) {
+ put(obj);
+ } else {
+ if (obj == null) {
+ throw new IllegalArgumentException("null");
+ }
+ if (fQueue[fNextIndex] != null) {
+ long t0 = System.currentTimeMillis();
+ long wait = timeout;
+ boolean timedout = true;
+ do {
+ this.wait(wait);
+ if (fQueue[fNextIndex] == null) {
+ timedout = false;
+ break;
+ }
+ long elapsed = System.currentTimeMillis() - t0;
+ wait = timeout - elapsed;
+ } while (wait > 0);
+ if (timedout) {
+ throw new TimeoutException("FixedSizeQueue.put");
+ }
+ }
+ fQueue[fNextIndex++] = obj;
+ if (fNextIndex >= fSize) {
+ fNextIndex = 0;
+ }
+ this.notifyAll();
+ }
+ }
+
+ public synchronized Object remove() throws InterruptedException {
+ Object req;
+ while ((req = fQueue[fMinIndex]) == null) {
+ this.wait();
+ }
+ fQueue[fMinIndex++] = null;
+ if (fMinIndex >= fSize) {
+ fMinIndex = 0;
+ }
+ this.notifyAll();
+ return req;
+ }
+
+ public synchronized Object remove(long timeout) throws TimeoutException,
+ InterruptedException {
+ Object req = fQueue[fMinIndex];
+ if (req == null) {
+ long t0 = System.currentTimeMillis();
+ long wait = timeout;
+ do {
+ this.wait(wait);
+ if ((req = fQueue[fMinIndex]) != null) {
+ break;
+ }
+ long elapsed = System.currentTimeMillis() - t0;
+ wait = timeout - elapsed;
+ } while (wait > 0);
+ if (req == null) {
+ throw new TimeoutException("FixedSizeQueue.remove");
+ }
+ }
+ fQueue[fMinIndex++] = null;
+ if (fMinIndex >= fSize) {
+ fMinIndex = 0;
+ }
+ this.notifyAll();
+ return req;
+ }
+
+ public synchronized Object nonBlockingRemove() {
+ Object req = fQueue[fMinIndex];
+ if (req != null) {
+ fQueue[fMinIndex++] = null;
+ if (fMinIndex >= fSize) {
+ fMinIndex = 0;
+ }
+ this.notifyAll();
+ }
+ return req;
+ }
+
+ public synchronized Object matchAndRemove(Object o,
+ Comparator<Object> comparator) {
+ if (o == null) {
+ throw new IllegalArgumentException();
+ }
+ Object req = fQueue[fMinIndex];
+ if (req == null) {
+ return null;
+ }
+ if (comparator.compare(o, req) == 0) {
+ fQueue[fMinIndex++] = null;
+ if (fMinIndex >= fSize) {
+ fMinIndex = 0;
+ }
+ this.notifyAll();
+ return req;
+ } else {
+ return null;
+ }
+ }
}
diff --git a/plugins/org.eclipse.actf.util.httpproxy/src/org/eclipse/actf/util/internal/httpproxy/core/HTTPResponseReader.java b/plugins/org.eclipse.actf.util.httpproxy/src/org/eclipse/actf/util/internal/httpproxy/core/HTTPResponseReader.java
index e74fe27..267d4d9 100644
--- a/plugins/org.eclipse.actf.util.httpproxy/src/org/eclipse/actf/util/internal/httpproxy/core/HTTPResponseReader.java
+++ b/plugins/org.eclipse.actf.util.httpproxy/src/org/eclipse/actf/util/internal/httpproxy/core/HTTPResponseReader.java
@@ -20,7 +20,8 @@
public class HTTPResponseReader extends HTTPMessageReader {
private static final Logger LOGGER = Logger.getLogger(HTTPResponseReader.class);
- private final int id;
+ @SuppressWarnings("unused")
+ private final int id;
// Request = Status-Line
// *(( general-header
@@ -87,7 +88,6 @@
}
return msg;
}
- */
private final void WARNING(String msg) {
StringBuffer sb = new StringBuffer();
@@ -109,5 +109,6 @@
sb.append(msg);
LOGGER.fatal(sb.toString());
}
+ */
}
diff --git a/plugins/org.eclipse.actf.util.httpproxy/src/org/eclipse/actf/util/internal/httpproxy/core/ServerConnection.java b/plugins/org.eclipse.actf.util.httpproxy/src/org/eclipse/actf/util/internal/httpproxy/core/ServerConnection.java
index 32a2699..8700322 100644
--- a/plugins/org.eclipse.actf.util.httpproxy/src/org/eclipse/actf/util/internal/httpproxy/core/ServerConnection.java
+++ b/plugins/org.eclipse.actf.util.httpproxy/src/org/eclipse/actf/util/internal/httpproxy/core/ServerConnection.java
@@ -22,668 +22,697 @@
import org.eclipse.actf.util.httpproxy.core.TimeoutException;
import org.eclipse.actf.util.httpproxy.util.Logger;
-
-
public abstract class ServerConnection implements Runnable {
- private static final Logger LOGGER = Logger.getLogger(ServerConnection.class);
+ private static final Logger LOGGER = Logger
+ .getLogger(ServerConnection.class);
- private static class SocketOpener implements Runnable {
- private ServerConnection fSocketReceiver;
+ private static class SocketOpener implements Runnable {
+ private ServerConnection fSocketReceiver;
- private String fHost;
+ private String fHost;
- private int fPort;
+ private int fPort;
- private int fSOTimeout;
+// private int fSOTimeout;
- private boolean isValid;
+ private boolean isValid;
- SocketOpener(String host, int port, int soTimeout, ServerConnection socketReceiver) {
- fHost = host;
- fPort = port;
- fSOTimeout = soTimeout;
- fSocketReceiver = socketReceiver;
- isValid = true;
- }
+ SocketOpener(String host, int port, int soTimeout,
+ ServerConnection socketReceiver) {
+ fHost = host;
+ fPort = port;
+// fSOTimeout = soTimeout;
+ fSocketReceiver = socketReceiver;
+ isValid = true;
+ }
- synchronized void setValid(boolean v) {
- isValid = v;
- }
+ synchronized void setValid(boolean v) {
+ isValid = v;
+ }
- synchronized boolean isValid() {
- return isValid;
- }
+ synchronized boolean isValid() {
+ return isValid;
+ }
- public void run() {
- fSocketReceiver.DEBUG("SocketOpener started");
- try {
- Socket sock = null;
- OutputStream out = null;
- InputStream in = null;
- if (isValid()) {
- if (LOGGER.isDebugEnabled()) {
- fSocketReceiver.DEBUG("Trying to create a Socket: " + fHost + "@" + fPort);
- }
- sock = new Socket(fHost, fPort);
- // sock = new Socket();
- // sock.setSoTimeout((int) fSOTimeout);
- // sock.setTcpNoDelay(true);
- // SocketAddress dest = new
- // InetSocketAddress(fInfo.getHost(), fInfo.getPort());
- // sock.connect(dest, (int) fSOTimeout);
- sock.setSoTimeout(1);
- out = sock.getOutputStream();
- in = sock.getInputStream();
- if (LOGGER.isDebugEnabled()) {
- fSocketReceiver.DEBUG("Created a Socket: " + fHost + "@" + fPort);
- }
- }
- if (sock != null && out != null && in != null && isValid()) {
- if (LOGGER.isDebugEnabled()) {
- fSocketReceiver.DEBUG("Set a Socket to the ServerConnection: " + fHost + "@" + fPort);
- }
- fSocketReceiver.notifyConnected(sock, out, in);
- }
- } catch (IOException e) {
- // e.printStackTrace();
- if (LOGGER.isDebugEnabled()) {
- fSocketReceiver.DEBUG("Failed to create a Socket (" + e.getClass().getName() + "): " + fHost + "@" + fPort);
- }
- if (isValid()) {
- fSocketReceiver.notifyConnectFailed(e);
- }
- }
- }
- }
+ public void run() {
+ fSocketReceiver.DEBUG("SocketOpener started");
+ try {
+ Socket sock = null;
+ OutputStream out = null;
+ InputStream in = null;
+ if (isValid()) {
+ if (LOGGER.isDebugEnabled()) {
+ fSocketReceiver.DEBUG("Trying to create a Socket: "
+ + fHost + "@" + fPort);
+ }
+ sock = new Socket(fHost, fPort);
+ // sock = new Socket();
+ // sock.setSoTimeout((int) fSOTimeout);
+ // sock.setTcpNoDelay(true);
+ // SocketAddress dest = new
+ // InetSocketAddress(fInfo.getHost(), fInfo.getPort());
+ // sock.connect(dest, (int) fSOTimeout);
+ sock.setSoTimeout(1);
+ out = sock.getOutputStream();
+ in = sock.getInputStream();
+ if (LOGGER.isDebugEnabled()) {
+ fSocketReceiver.DEBUG("Created a Socket: " + fHost
+ + "@" + fPort);
+ }
+ }
+ if (sock != null && out != null && in != null && isValid()) {
+ if (LOGGER.isDebugEnabled()) {
+ fSocketReceiver
+ .DEBUG("Set a Socket to the ServerConnection: "
+ + fHost + "@" + fPort);
+ }
+ fSocketReceiver.notifyConnected(sock, out, in);
+ }
+ } catch (IOException e) {
+ // e.printStackTrace();
+ if (LOGGER.isDebugEnabled()) {
+ fSocketReceiver.DEBUG("Failed to create a Socket ("
+ + e.getClass().getName() + "): " + fHost + "@"
+ + fPort);
+ }
+ if (isValid()) {
+ fSocketReceiver.notifyConnectFailed(e);
+ }
+ }
+ }
+ }
- private static class Status {
- private int fStat = STAT_INIT;
+ private static class Status {
+ private int fStat = STAT_INIT;
- private int fNumWaiters = 0;
+ private int fNumWaiters = 0;
- Status() {
- // nop
- }
+ Status() {
+ // nop
+ }
- synchronized void set(int stat) {
- fStat = stat;
- this.notifyAll();
- }
+ synchronized void set(int stat) {
+ fStat = stat;
+ this.notifyAll();
+ }
- synchronized int get() {
- return fStat;
- }
+ synchronized int get() {
+ return fStat;
+ }
- synchronized boolean equals(int stat) {
- return (fStat == stat);
- }
+ synchronized boolean equals(int stat) {
+ return (fStat == stat);
+ }
- synchronized boolean waitFor(int stat, long timeout)
- throws InterruptedException, TimeoutException {
- if (fStat == stat) return true;
- if (fStat == STAT_CLOSED) return false;
+ synchronized boolean waitFor(int stat, long timeout)
+ throws InterruptedException, TimeoutException {
+ if (fStat == stat)
+ return true;
+ if (fStat == STAT_CLOSED)
+ return false;
- fNumWaiters += 1;
- long deadline = System.currentTimeMillis() + timeout;
- long wait = timeout;
- while (wait > 0) {
- this.wait(wait);
- if (fStat == stat) return true;
- if (fStat == STAT_CLOSED) return false;
- wait = deadline - System.currentTimeMillis();
- }
- throw new TimeoutException();
- }
+ fNumWaiters += 1;
+ long deadline = System.currentTimeMillis() + timeout;
+ long wait = timeout;
+ while (wait > 0) {
+ this.wait(wait);
+ if (fStat == stat)
+ return true;
+ if (fStat == STAT_CLOSED)
+ return false;
+ wait = deadline - System.currentTimeMillis();
+ }
+ throw new TimeoutException();
+ }
- public String toString() {
- switch (fStat) {
- case STAT_CLOSED:
- return "CLOSED";
- case STAT_INIT:
- return "INIT";
- case STAT_CONNECTING:
- return "CONNECTING";
- case STAT_CONNECTED:
- return "CONNECTED";
- default:
- return "Unkown";
- }
- }
- }
+ public String toString() {
+ switch (fStat) {
+ case STAT_CLOSED:
+ return "CLOSED";
+ case STAT_INIT:
+ return "INIT";
+ case STAT_CONNECTING:
+ return "CONNECTING";
+ case STAT_CONNECTED:
+ return "CONNECTED";
+ default:
+ return "Unkown";
+ }
+ }
+ }
- public static final int STAT_CLOSED = -2;
+ public static final int STAT_CLOSED = -2;
- public static final int STAT_FINALIZING = -1;
+ public static final int STAT_FINALIZING = -1;
- public static final int STAT_INIT = 0;
+ public static final int STAT_INIT = 0;
- public static final int STAT_CONNECTING = 1;
+ public static final int STAT_CONNECTING = 1;
- public static final int STAT_CONNECTED = 2;
+ public static final int STAT_CONNECTED = 2;
- private final ServerKey fKey;
+ private final ServerKey fKey;
- private final String name;
+ private final String name;
- private final String host;
+ private final String host;
- public String getHost() {
- return host;
- }
+ public String getHost() {
+ return host;
+ }
- private final int port;
+ private final int port;
- public int getPort() {
- return port;
- }
+ public int getPort() {
+ return port;
+ }
- private final RequestDispatcher fDispatcher;
+ private final RequestDispatcher fDispatcher;
- private final int fRetryTime;
+ private final int fRetryTime;
- private final int timeout;
+ private final int timeout;
- private long fActivatedTime;
+// private long fActivatedTime;
- private Thread fThread;
+ private Thread fThread;
- private SocketOpener fSocketOpener = null;
+ private SocketOpener fSocketOpener = null;
- private Thread fSocketOpenerThread = null;
+ private Thread fSocketOpenerThread = null;
- private IHTTPRequestMessage fRequest;
+ private IHTTPRequestMessage fRequest;
- private long fFirstTimeout = 0;
+ private long fFirstTimeout = 0;
- private Socket fSocket = null;
+ private Socket fSocket = null;
- private IOException fSocketException = null;
+// private IOException fSocketException = null;
- private InputStream fInputStream = null;
+ private InputStream fInputStream = null;
- InputStream getInputStream() {
- return fInputStream;
- }
+ InputStream getInputStream() {
+ return fInputStream;
+ }
- private BufferedOutputStream fOutputStream = null;
+ private BufferedOutputStream fOutputStream = null;
- OutputStream getOutputStream() {
- return fOutputStream;
- }
+ OutputStream getOutputStream() {
+ return fOutputStream;
+ }
- private HTTPResponseReader fReader = null;
+ private HTTPResponseReader fReader = null;
- private long fMessageSerial;
+ private long fMessageSerial;
- private Status fStat = new Status();
+ private Status fStat = new Status();
- /**
- *
- */
- protected ServerConnection(String name, String host, int port, int group, int index, RequestDispatcher dispatcher,
- int retryTime, int timeout) {
- this.name = name;
- this.host = host;
- this.port = port;
- fKey = new ServerKey(group, index);
- fRetryTime = retryTime;
- fDispatcher = dispatcher;
- this.timeout = timeout;
- fRequest = null;
- fMessageSerial = 0;
- }
+ /**
+ *
+ */
+ protected ServerConnection(String name, String host, int port, int group,
+ int index, RequestDispatcher dispatcher, int retryTime, int timeout) {
+ this.name = name;
+ this.host = host;
+ this.port = port;
+ fKey = new ServerKey(group, index);
+ fRetryTime = retryTime;
+ fDispatcher = dispatcher;
+ this.timeout = timeout;
+ fRequest = null;
+ fMessageSerial = 0;
+ }
- protected abstract HTTPResponseMessage createHTTPResponseMessage(long msgSerial);
-
- public synchronized void reset() {
- deactivate();
- setStat(STAT_INIT);
- }
+ protected abstract HTTPResponseMessage createHTTPResponseMessage(
+ long msgSerial);
- public ServerKey getKey() {
- return fKey;
- }
+ public synchronized void reset() {
+ deactivate();
+ setStat(STAT_INIT);
+ }
- public synchronized boolean isActive() {
- return (fThread != null);
- }
+ public ServerKey getKey() {
+ return fKey;
+ }
- // synchronized
- public synchronized void activate() {
- if (isActive()) {
- DEBUG("activate: already active");
- return;
- }
- DEBUG("activate");
- setStat(STAT_CONNECTING);
- fActivatedTime = System.currentTimeMillis();
- fMessageSerial = 0;
- setTimeout(false);
- fThread = new Thread(this, "ServerConnection-" + name);
- fThread.start();
+ public synchronized boolean isActive() {
+ return (fThread != null);
+ }
- fSocket = null;
- fSocketException = null;
- fOutputStream = null;
- fReader = null;
+ // synchronized
+ public synchronized void activate() {
+ if (isActive()) {
+ DEBUG("activate: already active");
+ return;
+ }
+ DEBUG("activate");
+ setStat(STAT_CONNECTING);
+// fActivatedTime = System.currentTimeMillis();
+ fMessageSerial = 0;
+ setTimeout(false);
+ fThread = new Thread(this, "ServerConnection-" + name);
+ fThread.start();
- fSocketOpener = new SocketOpener(host, port, (int) timeout * 2, this);
- fSocketOpenerThread = new Thread(fSocketOpener, "SocketOpener-" + this.toString());
- fSocketOpenerThread.start();
- }
+ fSocket = null;
+// fSocketException = null;
+ fOutputStream = null;
+ fReader = null;
- public void activateAndConnect(long timeout) throws IOException, TimeoutException, InterruptedException {
- activate();
- waitUntilConnected(timeout);
- }
+ fSocketOpener = new SocketOpener(host, port, (int) timeout * 2, this);
+ fSocketOpenerThread = new Thread(fSocketOpener, "SocketOpener-"
+ + this.toString());
+ fSocketOpenerThread.start();
+ }
- public void waitUntilConnected(long timeout) throws IOException, TimeoutException, InterruptedException {
- DEBUG("waitUntilConnected");
- int stat;
- synchronized (fStat) {
- if (fStat.equals(STAT_CONNECTING)) {
- fStat.waitFor(STAT_CONNECTED, timeout);
- }
- stat = fStat.get();
- }
- if (stat == STAT_CONNECTED) {
- DEBUG("connected");
- return;
- } else if (stat == STAT_CONNECTING) {
- throw new TimeoutException("ServerConnection.waitUntilConnected");
- } else if (stat == STAT_INIT) {
- throw new IOException("This connection is not activated yet");
- } else if (stat == STAT_CLOSED) {
- throw new IOException("This connection is already closed");
- }
- }
+ public void activateAndConnect(long timeout) throws IOException,
+ TimeoutException, InterruptedException {
+ activate();
+ waitUntilConnected(timeout);
+ }
- /**
- * Deactivate this connection.
- */
- public synchronized void deactivate() {
- DEBUG("deactivate");
- if (!isActive()) {
- DEBUG("deactivate: already deactive");
- return;
- }
+ public void waitUntilConnected(long timeout) throws IOException,
+ TimeoutException, InterruptedException {
+ DEBUG("waitUntilConnected");
+ int stat;
+ synchronized (fStat) {
+ if (fStat.equals(STAT_CONNECTING)) {
+ fStat.waitFor(STAT_CONNECTED, timeout);
+ }
+ stat = fStat.get();
+ }
+ if (stat == STAT_CONNECTED) {
+ DEBUG("connected");
+ return;
+ } else if (stat == STAT_CONNECTING) {
+ throw new TimeoutException("ServerConnection.waitUntilConnected");
+ } else if (stat == STAT_INIT) {
+ throw new IOException("This connection is not activated yet");
+ } else if (stat == STAT_CLOSED) {
+ throw new IOException("This connection is already closed");
+ }
+ }
- // Stop the thread
- try {
- if (fSocket != null && !fSocket.isOutputShutdown()) {
- fSocket.shutdownOutput();
- }
- } catch (IOException e) {
- // ignore
- WARNING("Failed to shutdown a socket (IOException): " + e.getMessage());
- }
- fThread.interrupt();
- fThread = null;
+ /**
+ * Deactivate this connection.
+ */
+ public synchronized void deactivate() {
+ DEBUG("deactivate");
+ if (!isActive()) {
+ DEBUG("deactivate: already deactive");
+ return;
+ }
- fSocketOpener.setValid(false);
- fSocketOpenerThread.interrupt();
+ // Stop the thread
+ try {
+ if (fSocket != null && !fSocket.isOutputShutdown()) {
+ fSocket.shutdownOutput();
+ }
+ } catch (IOException e) {
+ // ignore
+ WARNING("Failed to shutdown a socket (IOException): "
+ + e.getMessage());
+ }
+ fThread.interrupt();
+ fThread = null;
- fSocketOpener = null;
- fSocketOpenerThread = null;
- fSocket = null;
- fOutputStream = null;
- fReader = null;
+ fSocketOpener.setValid(false);
+ fSocketOpenerThread.interrupt();
- fActivatedTime = 0;
- fMessageSerial = 0;
- setTimeout(false);
- setStat(STAT_CLOSED);
- }
+ fSocketOpener = null;
+ fSocketOpenerThread = null;
+ fSocket = null;
+ fOutputStream = null;
+ fReader = null;
- synchronized void notifyConnected(Socket sock, OutputStream out, InputStream in) {
- if (sock == null) {
- throw new IllegalArgumentException("null");
- }
- DEBUG("setSocket");
- fSocket = sock;
- fSocketException = null;
- fOutputStream = new BufferedOutputStream(new SocketTimeoutRetryOutputStream(out));
- fInputStream = in;
- fReader = fDispatcher.createHTTPResponseReader(in);
- setStat(STAT_CONNECTED);
- }
+// fActivatedTime = 0;
+ fMessageSerial = 0;
+ setTimeout(false);
+ setStat(STAT_CLOSED);
+ }
- synchronized void notifyConnectFailed(IOException e) {
- WARNING("setSocketException: failed to create a socket (IOException): " + e.getMessage());
- fSocket = null;
- fSocketException = e;
- fOutputStream = null;
- fReader = null;
- deactivate();
- }
+ synchronized void notifyConnected(Socket sock, OutputStream out,
+ InputStream in) {
+ if (sock == null) {
+ throw new IllegalArgumentException("null");
+ }
+ DEBUG("setSocket");
+ fSocket = sock;
+ //fSocketException = null;
+ fOutputStream = new BufferedOutputStream(
+ new SocketTimeoutRetryOutputStream(out));
+ fInputStream = in;
+ fReader = fDispatcher.createHTTPResponseReader(in);
+ setStat(STAT_CONNECTED);
+ }
- public synchronized void setTimeout(boolean timeout) {
- if (timeout) {
- if (fFirstTimeout == 0) {
- fFirstTimeout = System.currentTimeMillis();
- }
- } else {
- fFirstTimeout = 0;
- }
- }
+ synchronized void notifyConnectFailed(IOException e) {
+ WARNING("setSocketException: failed to create a socket (IOException): "
+ + e.getMessage());
+ fSocket = null;
+ //fSocketException = e;
+ fOutputStream = null;
+ fReader = null;
+ deactivate();
+ }
- private synchronized boolean isTimeout() {
- return (fFirstTimeout != 0);
- }
+ public synchronized void setTimeout(boolean timeout) {
+ if (timeout) {
+ if (fFirstTimeout == 0) {
+ fFirstTimeout = System.currentTimeMillis();
+ }
+ } else {
+ fFirstTimeout = 0;
+ }
+ }
+
+ /*
+ * private synchronized boolean isTimeout() { return (fFirstTimeout != 0); }
+ */
+
+ public synchronized boolean isInvalid() {
+ if (fFirstTimeout == 0) {
+ return false;
+ }
+ return (System.currentTimeMillis() - fFirstTimeout < fRetryTime);
+ }
+
+ public int getStat() {
+ return fStat.get();
+ }
+
+ private void setStat(int stat) {
+ fStat.set(stat);
+ }
- public synchronized boolean isInvalid() {
- if (fFirstTimeout == 0) {
- return false;
- }
- return (System.currentTimeMillis() - fFirstTimeout < fRetryTime);
- }
+ // synchronized
+ private boolean startSession(long msgSerial, long timeout)
+ throws TimeoutException, InterruptedException {
+ if (fMessageSerial != 0) {
+ if (msgSerial == fMessageSerial) {
+ // Avoid sending the same message
+ return false;
+ }
+ // There is a message whose response is not yet arrived.
+ // Wait until a response arrives, or timeout.
+ long deadline = System.currentTimeMillis() + timeout;
+ long wait = timeout;
+ while (wait > 0) {
+ this.wait(wait); // <--
+ if (fMessageSerial == 0) {
+ break;
+ }
+ wait = deadline - System.currentTimeMillis();
+ }
+ if (fMessageSerial != 0) {
+ throw new TimeoutException("ServerConnection.startSession");
+ }
+ }
+ // (fMessageSerial == 0) && (fMessageSerial != msgSerial)
+ fMessageSerial = msgSerial;
+ if (LOGGER.isDebugEnabled()) {
+ DEBUG("Session started: msgSerial=" + fMessageSerial);
+ }
+ return true;
+ }
- public int getStat() {
- return fStat.get();
- }
+ private synchronized void finishSession() {
+ if (LOGGER.isDebugEnabled()) {
+ DEBUG("Session finished: msgSerial=" + fMessageSerial);
+ }
+ assert (fMessageSerial != 0);
+ fMessageSerial = 0; // -->startSession
+ this.notifyAll();
+ }
- private void setStat(int stat) {
- fStat.set(stat);
- }
+ /**
+ * Send a message asynchronously.
+ *
+ * @param req
+ * @param timeout
+ * @return
+ */
+ public synchronized void putRequest(IHTTPRequestMessage req, long timeout)
+ throws TimeoutException, InterruptedException {
+ if (LOGGER.isDebugEnabled()) {
+ DEBUG("putRequest: msgSerial=" + req.getSerial() + ", tid="
+ + req.getTid());
+ }
+ boolean sessionStarted = startSession(req.getSerial(), timeout);
+ assert (req.getSerial() == fMessageSerial);
+ if (sessionStarted) {
+ // fMessageSerial != null
+ fRequest = req; // -->nextRequest
+ this.notifyAll();
+ }
+ }
- // synchronized
- private boolean startSession(long msgSerial, long timeout) throws TimeoutException, InterruptedException {
- if (fMessageSerial != 0) {
- if (msgSerial == fMessageSerial) {
- // Avoid sending the same message
- return false;
- }
- // There is a message whose response is not yet arrived.
- // Wait until a response arrives, or timeout.
- long deadline = System.currentTimeMillis() + timeout;
- long wait = timeout;
- while (wait > 0) {
- this.wait(wait); // <--
- if (fMessageSerial == 0) {
- break;
- }
- wait = deadline - System.currentTimeMillis();
- }
- if (fMessageSerial != 0) {
- throw new TimeoutException("ServerConnection.startSession");
- }
- }
- // (fMessageSerial == 0) && (fMessageSerial != msgSerial)
- fMessageSerial = msgSerial;
- if (LOGGER.isDebugEnabled()) {
- DEBUG("Session started: msgSerial=" + fMessageSerial);
- }
- return true;
- }
+ /*
+ * public synchronized HTTPResponseMessage getResponse(long timeout) throws
+ * TimeoutException, InterruptedException { while (fResponse == null) {
+ * this.wait(); } HTTPResponseMessage response = fResponse; fResponse =
+ * null; fMessageSerial = 0; //-->startSession (putRequest)
+ * this.notifyAll(); return response; }
+ */
- private synchronized void finishSession() {
- if (LOGGER.isDebugEnabled()) {
- DEBUG("Session finished: msgSerial=" + fMessageSerial);
- }
- assert (fMessageSerial != 0);
- fMessageSerial = 0; // -->startSession
- this.notifyAll();
- }
+ private synchronized IHTTPRequestMessage nextRequest()
+ throws InterruptedException {
+ while (fRequest == null || fMessageSerial == 0) {
+ if (LOGGER.isDebugEnabled()) {
+ DEBUG("nextRequest waiting: request=" + fRequest
+ + ", msgSerial=" + fMessageSerial);
+ }
+ this.wait();
+ }
+ // (fRequest != null) && (fMessageSerial != 0)
+ if (LOGGER.isDebugEnabled()) {
+ DEBUG("nextRequest waiting done: request=" + fRequest
+ + ", msgSerial=" + fMessageSerial);
+ }
+ IHTTPRequestMessage req = fRequest;
+ fRequest = null;
+ return req;
+ }
- /**
- * Send a message asynchronously.
- *
- * @param req
- * @param timeout
- * @return
- */
- public synchronized void putRequest(IHTTPRequestMessage req, long timeout)
- throws TimeoutException, InterruptedException
- {
- if (LOGGER.isDebugEnabled()) {
- DEBUG("putRequest: msgSerial=" + req.getSerial() + ", tid=" + req.getTid());
- }
- boolean sessionStarted = startSession(req.getSerial(), timeout);
- assert (req.getSerial() == fMessageSerial);
- if (sessionStarted) {
- // fMessageSerial != null
- fRequest = req; // -->nextRequest
- this.notifyAll();
- }
- }
+ // run()
+ private IHTTPResponseMessage receiveResponse(long timeout,
+ boolean isBodyEmpty) throws InterruptedException, IOException,
+ TimeoutException {
+ HTTPResponseReader reader;
+ long msgSerial;
+ synchronized (this) {
+ assert (fStat.equals(STAT_CONNECTED)) && (fReader != null)
+ && (fMessageSerial != 0);
+ if (fReader == null || fMessageSerial == 0) {
+ throw new IOException("Deactivated");
+ }
+ reader = fReader;
+ msgSerial = fMessageSerial;
+ }
- /*
- public synchronized HTTPResponseMessage getResponse(long timeout)
- throws TimeoutException, InterruptedException
- {
- while (fResponse == null) {
- this.wait();
- }
- HTTPResponseMessage response = fResponse;
- fResponse = null;
- fMessageSerial = 0; //-->startSession (putRequest)
- this.notifyAll();
- return response;
- }
- */
-
- private synchronized IHTTPRequestMessage nextRequest() throws InterruptedException {
- while (fRequest == null || fMessageSerial == 0) {
- if (LOGGER.isDebugEnabled()) {
- DEBUG("nextRequest waiting: request=" + fRequest + ", msgSerial=" + fMessageSerial);
- }
- this.wait();
- }
- // (fRequest != null) && (fMessageSerial != 0)
- if (LOGGER.isDebugEnabled()) {
- DEBUG("nextRequest waiting done: request=" + fRequest + ", msgSerial=" + fMessageSerial);
- }
- IHTTPRequestMessage req = fRequest;
- fRequest = null;
- return req;
- }
+ // if (fMessageSerial == 0) {
+ // long start = System.currentTimeMillis();
+ // long deadline = start + timeout;
+ // long wait = timeout;
+ // while (wait > 0) {
+ // this.wait(wait);
+ // if (fMessageSerial != 0) {
+ // break;
+ // }
+ // wait = deadline - System.currentTimeMillis();
+ // }
+ // timeout -= System.currentTimeMillis() - start;
+ // }
+ // if (fMessageSerial == 0 || timeout <= 0) {
+ // throw new TimeoutException("ServerConnection.receiveResponse");
+ // }
+ // (fMessageSerial != 0) && (timeout > 0)
- // run()
- private IHTTPResponseMessage receiveResponse(long timeout, boolean isBodyEmpty) throws InterruptedException,
- IOException, TimeoutException {
- HTTPResponseReader reader;
- long msgSerial;
- synchronized (this) {
- assert (fStat.equals(STAT_CONNECTED)) && (fReader != null) && (fMessageSerial != 0);
- if (fReader == null || fMessageSerial == 0) {
- throw new IOException("Deactivated");
- }
- reader = fReader;
- msgSerial = fMessageSerial;
- }
+ HTTPResponseMessage response = createHTTPResponseMessage(msgSerial);
+ DEBUG("Try to read response...");
+ reader.readMessage(response, timeout, isBodyEmpty);
- // if (fMessageSerial == 0) {
- // long start = System.currentTimeMillis();
- // long deadline = start + timeout;
- // long wait = timeout;
- // while (wait > 0) {
- // this.wait(wait);
- // if (fMessageSerial != 0) {
- // break;
- // }
- // wait = deadline - System.currentTimeMillis();
- // }
- // timeout -= System.currentTimeMillis() - start;
- // }
- // if (fMessageSerial == 0 || timeout <= 0) {
- // throw new TimeoutException("ServerConnection.receiveResponse");
- // }
- // (fMessageSerial != 0) && (timeout > 0)
+ // if (LOGGER.isDebugEnabled()) {
+ // DEBUG("Received a response: msgSerial=" + response.getSerial());
+ // System.out.println("#######Response from server group " +
+ // fGroup.getIndex());
+ // try { response.write(System.out); } catch (Exception e) {}
+ // System.out.println("####################################################################");
+ // }
+ // fInfo.decrementProcessingRequest();
+ // if (response.getFFFResultCode() == FFFServletHeader.RC_SUCCESS) {
+ // fInfo.updateLastActivityTime();
+ // }
+ return response;
+ }
- HTTPResponseMessage response = createHTTPResponseMessage(msgSerial);
- DEBUG("Try to read response...");
- reader.readMessage(response, timeout, isBodyEmpty);
-
- // if (LOGGER.isDebugEnabled()) {
- // DEBUG("Received a response: msgSerial=" + response.getSerial());
- // System.out.println("#######Response from server group " + fGroup.getIndex());
- // try { response.write(System.out); } catch (Exception e) {}
- // System.out.println("####################################################################");
- // }
- // fInfo.decrementProcessingRequest();
- // if (response.getFFFResultCode() == FFFServletHeader.RC_SUCCESS) {
- // fInfo.updateLastActivityTime();
- // }
- return response;
- }
-
- private synchronized void sendRequest(IHTTPRequestMessage request,
- long timeout)
- throws IOException, InterruptedException, TimeoutException {
- if (LOGGER.isDebugEnabled()) {
- DEBUG("sendRequest....");
- }
- long t0 = System.currentTimeMillis();
- waitUntilConnected(timeout);
- assert (fOutputStream != null);
- long t1 = System.currentTimeMillis();
- timeout -= t1 - t0;
- if (timeout <= 0) {
- throw new TimeoutException();
- }
+ private synchronized void sendRequest(IHTTPRequestMessage request,
+ long timeout) throws IOException, InterruptedException,
+ TimeoutException {
+ if (LOGGER.isDebugEnabled()) {
+ DEBUG("sendRequest....");
+ }
+ long t0 = System.currentTimeMillis();
+ waitUntilConnected(timeout);
+ assert (fOutputStream != null);
+ long t1 = System.currentTimeMillis();
+ timeout -= t1 - t0;
+ if (timeout <= 0) {
+ throw new TimeoutException();
+ }
- if (LOGGER.isDebugEnabled()) {
- DEBUG("Sent a request: msgSerial=" + request.getSerial() + ", tid=" + request.getTid());
- StringBuffer sb = new StringBuffer();
- ByteArrayOutputStream ob = new ByteArrayOutputStream();
- BifurcatedOutputStream o = new BifurcatedOutputStream(fOutputStream, ob);
- request.write(timeout, o);
- ob.close();
- sb.append("\n===============>====>====>====>=============>\n");
- sb.append(ob.toString());
- sb.append("===============<====<====<====<=============<\n");
- DEBUG(sb.toString());
- } else {
- request.write(timeout, fOutputStream);
- }
- fOutputStream.flush();
+ if (LOGGER.isDebugEnabled()) {
+ DEBUG("Sent a request: msgSerial=" + request.getSerial() + ", tid="
+ + request.getTid());
+ StringBuffer sb = new StringBuffer();
+ ByteArrayOutputStream ob = new ByteArrayOutputStream();
+ BifurcatedOutputStream o = new BifurcatedOutputStream(
+ fOutputStream, ob);
+ request.write(timeout, o);
+ ob.close();
+ sb.append("\n===============>====>====>====>=============>\n");
+ sb.append(ob.toString());
+ sb.append("===============<====<====<====<=============<\n");
+ DEBUG(sb.toString());
+ } else {
+ request.write(timeout, fOutputStream);
+ }
+ fOutputStream.flush();
- fMessageSerial = request.getSerial();
+ fMessageSerial = request.getSerial();
- // fInfo.incrementProcessingRequest();
- }
+ // fInfo.incrementProcessingRequest();
+ }
- /**
- * Reciever a response from the server. Keep trying to read a response and
- * pass it to the dispatcher.
- */
- public void run() {
- DEBUG("receiver thread started");
- // long lastActivityTime = System.currentTimeMillis();
- try {
- boolean serverError = false;
- int counter = 0;
- while (!Thread.interrupted() && !serverError) {
- counter += 1;
- IHTTPRequestMessage request = nextRequest();
+ /**
+ * Reciever a response from the server. Keep trying to read a response and
+ * pass it to the dispatcher.
+ */
+ public void run() {
+ DEBUG("receiver thread started");
+ // long lastActivityTime = System.currentTimeMillis();
+ try {
+ boolean serverError = false;
+ int counter = 0;
+ while (!Thread.interrupted() && !serverError) {
+ counter += 1;
+ IHTTPRequestMessage request = nextRequest();
- // Send a request to the server
- try {
- if (request.isConnectionShutdownRequired()) {
- request.setConnectionHeader(false);
- sendRequest(request, timeout);
- fSocket.shutdownOutput();
- setStat(STAT_FINALIZING);
- } else {
- request.setConnectionHeader(true);
- sendRequest(request, timeout);
- }
- // fInfo.notifySuccessfulSend();
- // lastActivityTime = System.currentTimeMillis();
- } catch (TimeoutException timeout) {
- DEBUG("sendRequest() timeout");
- setTimeout(true);
- continue;
- }
+ // Send a request to the server
+ try {
+ if (request.isConnectionShutdownRequired()) {
+ request.setConnectionHeader(false);
+ sendRequest(request, timeout);
+ fSocket.shutdownOutput();
+ setStat(STAT_FINALIZING);
+ } else {
+ request.setConnectionHeader(true);
+ sendRequest(request, timeout);
+ }
+ // fInfo.notifySuccessfulSend();
+ // lastActivityTime = System.currentTimeMillis();
+ } catch (TimeoutException timeout) {
+ DEBUG("sendRequest() timeout");
+ setTimeout(true);
+ continue;
+ }
- assert (fStat.equals(STAT_CONNECTED) || fStat.equals(STAT_FINALIZING));
+ assert (fStat.equals(STAT_CONNECTED) || fStat
+ .equals(STAT_FINALIZING));
- // Recieve a resposne from the server
- int loop = 0;
- long start = System.currentTimeMillis();
- while (!Thread.currentThread().isInterrupted() && !serverError) {
- try {
- DEBUG("ReceiveResponse...");
- IHTTPResponseMessage response = receiveResponse(timeout, request.isResponseBodyEmpty());
+ // Recieve a resposne from the server
+ int loop = 0;
+ long start = System.currentTimeMillis();
+ while (!Thread.currentThread().isInterrupted() && !serverError) {
+ try {
+ DEBUG("ReceiveResponse...");
+ IHTTPResponseMessage response = receiveResponse(
+ timeout, request.isResponseBodyEmpty());
- // Send the response to the client
- DEBUG("ResponseArrived...");
- if (response.isConnectionToBeClosed()) {
- DEBUG("Since the response is not keep-alive, we do not reuse this connection.");
- setStat(STAT_FINALIZING);
- }
- fDispatcher.responseArrived(this, response); // Interrupted
- finishSession();
+ // Send the response to the client
+ DEBUG("ResponseArrived...");
+ if (response.isConnectionToBeClosed()) {
+ DEBUG("Since the response is not keep-alive, we do not reuse this connection.");
+ setStat(STAT_FINALIZING);
+ }
+ fDispatcher.responseArrived(this, response); // Interrupted
+ finishSession();
- break;
- } catch (TimeoutException timeout) {
- // continue;
- // setTimeout(true);
- if (LOGGER.isDebugEnabled()) {
- DEBUG("response receiving timeout (" + timeout.getMessage() + "): retry=" + loop
- + ", elapsed=" + (System.currentTimeMillis() - start));
- }
- }
- loop += 1;
- long delay = System.currentTimeMillis() - start;
- if (delay > 60 * 1000) {
- throw new IOException("Shutdown this connection");
- }
- }
- }
- } catch (HTTPConnectionException e) {
- DEBUG("The connection is closed by the peer.");
- } catch (IOException e) {
- DEBUG("IOException: " + e.getMessage());
- HTTPMalformedResponseMessage response = new HTTPMalformedResponseMessage(fMessageSerial, e);
- try {
- fDispatcher.responseArrived(this, response);
- } catch (InterruptedException interrupted) {
- }
- finishSession();
- } catch (InterruptedException e) {
- DEBUG("reader thread interrupted");
- // ignore
- } finally {
- DEBUG("receiver thread stopped");
- deactivate();
- }
- }
+ break;
+ } catch (TimeoutException timeout) {
+ // continue;
+ // setTimeout(true);
+ if (LOGGER.isDebugEnabled()) {
+ DEBUG("response receiving timeout ("
+ + timeout.getMessage() + "): retry=" + loop
+ + ", elapsed="
+ + (System.currentTimeMillis() - start));
+ }
+ }
+ loop += 1;
+ long delay = System.currentTimeMillis() - start;
+ if (delay > 60 * 1000) {
+ throw new IOException("Shutdown this connection");
+ }
+ }
+ }
+ } catch (HTTPConnectionException e) {
+ DEBUG("The connection is closed by the peer.");
+ } catch (IOException e) {
+ DEBUG("IOException: " + e.getMessage());
+ HTTPMalformedResponseMessage response = new HTTPMalformedResponseMessage(
+ fMessageSerial, e);
+ try {
+ fDispatcher.responseArrived(this, response);
+ } catch (InterruptedException interrupted) {
+ }
+ finishSession();
+ } catch (InterruptedException e) {
+ DEBUG("reader thread interrupted");
+ // ignore
+ } finally {
+ DEBUG("receiver thread stopped");
+ deactivate();
+ }
+ }
- public String toString() {
- StringBuffer sb = new StringBuffer();
- sb.append(host).append(':').append(port).append('.').append(fKey.toString());
- return sb.toString();
- }
+ public String toString() {
+ StringBuffer sb = new StringBuffer();
+ sb.append(host).append(':').append(port).append('.').append(
+ fKey.toString());
+ return sb.toString();
+ }
- private final void DEBUG(String msg) {
- if (LOGGER.isDebugEnabled()) {
- StringBuffer sb = new StringBuffer();
- sb.append(name).append(" [");
- sb.append(this.toString()).append("] ").append(msg);
- LOGGER.debug(sb.toString());
- }
- }
+ private final void DEBUG(String msg) {
+ if (LOGGER.isDebugEnabled()) {
+ StringBuffer sb = new StringBuffer();
+ sb.append(name).append(" [");
+ sb.append(this.toString()).append("] ").append(msg);
+ LOGGER.debug(sb.toString());
+ }
+ }
- private final void WARNING(String msg) {
- StringBuffer sb = new StringBuffer();
- sb.append(name).append(" [");
- sb.append(this.toString()).append("] ").append(msg);
- LOGGER.warning(sb.toString());
- }
+ private final void WARNING(String msg) {
+ StringBuffer sb = new StringBuffer();
+ sb.append(name).append(" [");
+ sb.append(this.toString()).append("] ").append(msg);
+ LOGGER.warning(sb.toString());
+ }
- public synchronized String dump() {
- StringBuffer sb = new StringBuffer();
- sb.append(this.toString()).append(": stat=").append(fStat);
- sb.append(", thread=");
- sb.append((fThread == null) ? "null" : Boolean.toString(fThread.isAlive()));
- sb.append(", socketOpener=");
- sb.append((fSocketOpener == null) ? "null" : "exists");
- sb.append(", socketOpenerThread=");
- sb.append((fSocketOpenerThread == null) ? "null" : Boolean.toString(fSocketOpenerThread.isAlive()));
- sb.append(", socket=");
- sb.append((fSocket == null) ? "null" : Boolean.toString(fSocket.isConnected()));
- sb.append(", invalid=").append(isInvalid());
- return sb.toString();
- }
+ public synchronized String dump() {
+ StringBuffer sb = new StringBuffer();
+ sb.append(this.toString()).append(": stat=").append(fStat);
+ sb.append(", thread=");
+ sb.append((fThread == null) ? "null" : Boolean.toString(fThread
+ .isAlive()));
+ sb.append(", socketOpener=");
+ sb.append((fSocketOpener == null) ? "null" : "exists");
+ sb.append(", socketOpenerThread=");
+ sb.append((fSocketOpenerThread == null) ? "null" : Boolean
+ .toString(fSocketOpenerThread.isAlive()));
+ sb.append(", socket=");
+ sb.append((fSocket == null) ? "null" : Boolean.toString(fSocket
+ .isConnected()));
+ sb.append(", invalid=").append(isInvalid());
+ return sb.toString();
+ }
}
diff --git a/plugins/org.eclipse.actf.util.httpproxy/src/org/eclipse/actf/util/internal/httpproxy/proxy/ClientStateManager.java b/plugins/org.eclipse.actf.util.httpproxy/src/org/eclipse/actf/util/internal/httpproxy/proxy/ClientStateManager.java
index a43fdb7..d9097f9 100644
--- a/plugins/org.eclipse.actf.util.httpproxy/src/org/eclipse/actf/util/internal/httpproxy/proxy/ClientStateManager.java
+++ b/plugins/org.eclipse.actf.util.httpproxy/src/org/eclipse/actf/util/internal/httpproxy/proxy/ClientStateManager.java
@@ -9,47 +9,52 @@
* Hisashi MIYASHITA - initial API and implementation
*******************************************************************************/
-
package org.eclipse.actf.util.internal.httpproxy.proxy;
import java.util.HashMap;
import org.eclipse.actf.util.httpproxy.proxy.IClientStateManager;
-
-
public class ClientStateManager implements IClientStateManager {
- private Object key;
- private static HashMap clientStateManagers = new HashMap();
+ @SuppressWarnings("unused")
+ private Object key;
+
+ private static HashMap<Object, IClientStateManager> clientStateManagers = new HashMap<Object, IClientStateManager>();
- private HashMap stateMap;
+ private HashMap<Object, Object> stateMap;
- // We should use read-write lock instead of mutex.
- /* (non-Javadoc)
- * @see org.eclipse.actf.util.httpproxy.proxy.IClientStateManager#put(java.lang.Object, java.lang.Object)
+ // We should use read-write lock instead of mutex.
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.actf.util.httpproxy.proxy.IClientStateManager#put(java.lang.Object,
+ * java.lang.Object)
*/
- public synchronized void put(Object stateKey, Object stateValue) {
- stateMap.put(stateKey, stateValue);
- }
- /* (non-Javadoc)
+ public synchronized void put(Object stateKey, Object stateValue) {
+ stateMap.put(stateKey, stateValue);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
* @see org.eclipse.actf.util.httpproxy.proxy.IClientStateManager#get(java.lang.Object)
*/
- public synchronized Object get(Object stateKey) {
- return stateMap.get(stateKey);
- }
+ public synchronized Object get(Object stateKey) {
+ return stateMap.get(stateKey);
+ }
- private ClientStateManager(Object key) {
- this.key = key;
- this.stateMap = new HashMap();
- }
-
- public static IClientStateManager getClientStateManager(Object key) {
- IClientStateManager csm = (IClientStateManager) clientStateManagers.get(key);
- if (csm == null) {
- csm = new ClientStateManager(key);
- clientStateManagers.put(key, csm);
- }
- return csm;
- }
+ private ClientStateManager(Object key) {
+ this.key = key;
+ this.stateMap = new HashMap<Object, Object>();
+ }
+
+ public static IClientStateManager getClientStateManager(Object key) {
+ IClientStateManager csm = (IClientStateManager) clientStateManagers
+ .get(key);
+ if (csm == null) {
+ csm = new ClientStateManager(key);
+ clientStateManagers.put(key, csm);
+ }
+ return csm;
+ }
}
-
diff --git a/plugins/org.eclipse.actf.util.httpproxy/src/org/eclipse/actf/util/internal/httpproxy/proxy/HTTPRequestDispatcher.java b/plugins/org.eclipse.actf.util.httpproxy/src/org/eclipse/actf/util/internal/httpproxy/proxy/HTTPRequestDispatcher.java
index b80d21f..57f877c 100644
--- a/plugins/org.eclipse.actf.util.httpproxy/src/org/eclipse/actf/util/internal/httpproxy/proxy/HTTPRequestDispatcher.java
+++ b/plugins/org.eclipse.actf.util.httpproxy/src/org/eclipse/actf/util/internal/httpproxy/proxy/HTTPRequestDispatcher.java
@@ -29,7 +29,6 @@
import org.eclipse.actf.util.httpproxy.util.Logger;
import org.eclipse.actf.util.httpproxy.util.ParseURI;
import org.eclipse.actf.util.internal.httpproxy.core.HTTPResponseInMemoryMessage;
-import org.eclipse.actf.util.internal.httpproxy.core.HeaderToAdd;
import org.eclipse.actf.util.internal.httpproxy.core.RequestDispatcher;
import org.eclipse.actf.util.internal.httpproxy.core.ServerConnection;
import org.eclipse.actf.util.internal.httpproxy.core.Session;
@@ -41,7 +40,7 @@
private final IClientStateManager clientStateManager;
private final HTTPProxyConnection fClient;
-
+
private final ExternalProxyConfig fExProxyConf;
private final int timeout;
@@ -60,9 +59,9 @@
}
private static CacheConfig[] cacheConfigs = { // new
- // CacheConfig("http://disney\\.go\\.com/.*",
- // "max-age=" + 24 * 60 *
- // 60),
+ // CacheConfig("http://disney\\.go\\.com/.*",
+ // "max-age=" + 24 * 60 *
+ // 60),
new CacheConfig("http://.*\\abcnews\\.go\\.com/Video/.*", "no-cache") };
private IHTTPResponseMessage addCacheControl(String uriStr,
@@ -91,38 +90,40 @@
this.fExProxyConf = client.getExternalProxyConfig();
this.clientStateManager = clientStateManager;
this.timeout = timeout;
-
- //this.localServer = new HTTPLocalServerSWF(client.getSecretManager());
- //this.overrider = new SWFBootloader(getDispatcherId());
- IHTTPLocalServerFactory localServerFactory = client.getProxy().getLocalServerFactory();
- IHTTPSessionOverriderFactory overriderFactory = client.getProxy().getSessionOverriderFactory();
-
- if(null!=localServerFactory){
+ // this.localServer = new HTTPLocalServerSWF(client.getSecretManager());
+ // this.overrider = new SWFBootloader(getDispatcherId());
+
+ IHTTPLocalServerFactory localServerFactory = client.getProxy()
+ .getLocalServerFactory();
+ IHTTPSessionOverriderFactory overriderFactory = client.getProxy()
+ .getSessionOverriderFactory();
+
+ if (null != localServerFactory) {
this.localServer = localServerFactory.newInstance();
}
- if(null!=overriderFactory){
+ if (null != overriderFactory) {
this.overrider = overriderFactory.newInstance(getDispatcherId());
- }
+ }
}
private boolean processLocalServerRequest(IHTTPRequestMessage request)
throws InterruptedException, IOException {
- if(null==localServer){
+ if (null == localServer) {
return false;
}
return localServer.processRequest(getDispatcherId(), fClient, request,
transcoder);
}
- private ArrayList serverConnectionCache = new ArrayList();
+ private ArrayList<HTTPServerConnection> serverConnectionCache = new ArrayList<HTTPServerConnection>();
private HTTPServerConnection getConnection(String host, int port) {
HTTPServerConnection sc;
int len = serverConnectionCache.size();
int i = 0;
while (i < len) {
- sc = (HTTPServerConnection) serverConnectionCache.get(i);
+ sc = serverConnectionCache.get(i);
if (host.equals(sc.getHost()) && (port == sc.getPort())) {
if (sc.getStat() < 0) {
sc.deactivate();
@@ -169,7 +170,7 @@
host = ParseURI.parseHost(authority);
port = ParseURI.parsePort(authority, HTTP_WELL_KNOWN_PORT);
- HeaderToAdd header = new HeaderToAdd();
+ //HeaderToAdd header = new HeaderToAdd();
// header.init(HTTPHeader.HOST, host);
// request.addHeader(header);
request.setHeader(IHTTPHeader.HOST_A, host.getBytes());
@@ -181,7 +182,8 @@
request.setRequestURIString(absPath);
}
- IHTTPHeader pcHeader = request.getHeader(IHTTPHeader.PROXY_CONNECTION_A);
+ IHTTPHeader pcHeader = request
+ .getHeader(IHTTPHeader.PROXY_CONNECTION_A);
if (pcHeader != null) {
pcHeader.setRemoved(true);
request.setHeader(IHTTPHeader.CONNECTION_A, pcHeader.getValue());
@@ -217,25 +219,21 @@
throws InterruptedException, IOException {
fClient.sendResponse(new HTTPResponseInMemoryMessage(request
.getSerial(), IHTTPHeader.HTTP_VERSION_1_0_A, "504".getBytes(),
- "Gateway Timeout".getBytes(),
- IHTTPResponseMessage.EMPTY_BODY));
+ "Gateway Timeout".getBytes(), IHTTPResponseMessage.EMPTY_BODY));
}
private void sendBadGateway(IHTTPRequestMessage request)
throws InterruptedException, IOException {
fClient.sendResponse(new HTTPResponseInMemoryMessage(request
.getSerial(), IHTTPHeader.HTTP_VERSION_1_0_A, "502".getBytes(),
- "Bad Gateway".getBytes(),
- IHTTPResponseMessage.EMPTY_BODY));
+ "Bad Gateway".getBytes(), IHTTPResponseMessage.EMPTY_BODY));
}
private void sendNotFound(IHTTPRequestMessage request)
throws InterruptedException, IOException {
- fClient
- .sendResponse(new HTTPResponseInMemoryMessage(request
- .getSerial(), IHTTPHeader.HTTP_VERSION_1_0_A, "404"
- .getBytes(), "Not Found".getBytes(),
- IHTTPResponseMessage.EMPTY_BODY));
+ fClient.sendResponse(new HTTPResponseInMemoryMessage(request
+ .getSerial(), IHTTPHeader.HTTP_VERSION_1_0_A, "404".getBytes(),
+ "Not Found".getBytes(), IHTTPResponseMessage.EMPTY_BODY));
}
public void run() {
@@ -272,7 +270,9 @@
modeConnect = false;
}
- if (null!=overrider && overrider.replaceRequest(clientStateManager, request)) {
+ if (null != overrider
+ && overrider
+ .replaceRequest(clientStateManager, request)) {
request = overrider.getSessionRequest();
response = overrider.getSessionResponse();
if (response != null) {
@@ -315,8 +315,7 @@
DEBUG("Trying to send a request to " + conn);
}
if (conn.getStat() == ServerConnection.STAT_CONNECTED) {
- if (modeConnect
- && !fExProxyConf.getExternalProxyFlag()) {
+ if (modeConnect && !fExProxyConf.getExternalProxyFlag()) {
fClient.allowTunnel(request, conn, timeout);
clearNextRequest();
request = null;
@@ -327,8 +326,8 @@
} else {
fClient.sendResponse(new HTTPResponseInMemoryMessage(
request.getSerial(),
- IHTTPHeader.HTTP_VERSION_1_0_A,
- "503".getBytes(), "Service Unavailable"
+ IHTTPHeader.HTTP_VERSION_1_0_A, "503"
+ .getBytes(), "Service Unavailable"
.getBytes(),
IHTTPResponseMessage.EMPTY_BODY));
continue;
@@ -363,11 +362,11 @@
} else {
IHTTPResponseMessage newResponse = transcode(
request, response);
- if (null!=overrider && overrider.replaceResponse(
- clientStateManager, request, response,
- timeout)) {
- newResponse = overrider
- .getSessionResponse();
+ if (null != overrider
+ && overrider.replaceResponse(
+ clientStateManager, request,
+ response, timeout)) {
+ newResponse = overrider.getSessionResponse();
}
fClient.sendResponse(timeout, newResponse);
}