blob: 4a2bc3ab2ae5db32b46c0ac6a73ba2d6689def7c [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2007 IBM Corporation and Others
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Hideki TAI - initial API and implementation
*******************************************************************************/
package org.eclipse.actf.util.httpproxy.core.impl;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import org.eclipse.actf.util.httpproxy.core.IHTTPRequestMessage;
import org.eclipse.actf.util.httpproxy.core.IHTTPResponseMessage;
import org.eclipse.actf.util.httpproxy.util.Logger;
import org.eclipse.actf.util.httpproxy.util.TimeoutException;
public abstract class ServerConnection implements Runnable {
private static final Logger LOGGER = Logger.getLogger(ServerConnection.class);
private static class SocketOpener implements Runnable {
private ServerConnection fSocketReceiver;
private String fHost;
private int fPort;
private int fSOTimeout;
private boolean isValid;
SocketOpener(String host, int port, int soTimeout, ServerConnection socketReceiver) {
fHost = host;
fPort = port;
fSOTimeout = soTimeout;
fSocketReceiver = socketReceiver;
isValid = true;
}
synchronized void setValid(boolean v) {
isValid = v;
}
synchronized boolean isValid() {
return isValid;
}
public void run() {
fSocketReceiver.DEBUG("SocketOpener started");
try {
Socket sock = null;
OutputStream out = null;
InputStream in = null;
if (isValid()) {
if (LOGGER.isDebugEnabled()) {
fSocketReceiver.DEBUG("Trying to create a Socket: " + fHost + "@" + fPort);
}
sock = new Socket(fHost, fPort);
// sock = new Socket();
// sock.setSoTimeout((int) fSOTimeout);
// sock.setTcpNoDelay(true);
// SocketAddress dest = new
// InetSocketAddress(fInfo.getHost(), fInfo.getPort());
// sock.connect(dest, (int) fSOTimeout);
sock.setSoTimeout(1);
out = sock.getOutputStream();
in = sock.getInputStream();
if (LOGGER.isDebugEnabled()) {
fSocketReceiver.DEBUG("Created a Socket: " + fHost + "@" + fPort);
}
}
if (sock != null && out != null && in != null && isValid()) {
if (LOGGER.isDebugEnabled()) {
fSocketReceiver.DEBUG("Set a Socket to the ServerConnection: " + fHost + "@" + fPort);
}
fSocketReceiver.notifyConnected(sock, out, in);
}
} catch (IOException e) {
// e.printStackTrace();
if (LOGGER.isDebugEnabled()) {
fSocketReceiver.DEBUG("Failed to create a Socket (" + e.getClass().getName() + "): " + fHost + "@" + fPort);
}
if (isValid()) {
fSocketReceiver.notifyConnectFailed(e);
}
}
}
}
private static class Status {
private int fStat = STAT_INIT;
private int fNumWaiters = 0;
Status() {
// nop
}
synchronized void set(int stat) {
fStat = stat;
this.notifyAll();
}
synchronized int get() {
return fStat;
}
synchronized boolean equals(int stat) {
return (fStat == stat);
}
synchronized boolean waitFor(int stat, long timeout)
throws InterruptedException, TimeoutException {
if (fStat == stat) return true;
if (fStat == STAT_CLOSED) return false;
fNumWaiters += 1;
long deadline = System.currentTimeMillis() + timeout;
long wait = timeout;
while (wait > 0) {
this.wait(wait);
if (fStat == stat) return true;
if (fStat == STAT_CLOSED) return false;
wait = deadline - System.currentTimeMillis();
}
throw new TimeoutException();
}
public String toString() {
switch (fStat) {
case STAT_CLOSED:
return "CLOSED";
case STAT_INIT:
return "INIT";
case STAT_CONNECTING:
return "CONNECTING";
case STAT_CONNECTED:
return "CONNECTED";
default:
return "Unkown";
}
}
}
public static final int STAT_CLOSED = -2;
public static final int STAT_FINALIZING = -1;
public static final int STAT_INIT = 0;
public static final int STAT_CONNECTING = 1;
public static final int STAT_CONNECTED = 2;
private final ServerKey fKey;
private final String name;
private final String host;
public String getHost() {
return host;
}
private final int port;
public int getPort() {
return port;
}
private final RequestDispatcher fDispatcher;
private final int fRetryTime;
private final int timeout;
private long fActivatedTime;
private Thread fThread;
private SocketOpener fSocketOpener = null;
private Thread fSocketOpenerThread = null;
private IHTTPRequestMessage fRequest;
private long fFirstTimeout = 0;
private Socket fSocket = null;
private IOException fSocketException = null;
private InputStream fInputStream = null;
InputStream getInputStream() {
return fInputStream;
}
private BufferedOutputStream fOutputStream = null;
OutputStream getOutputStream() {
return fOutputStream;
}
private HTTPResponseReader fReader = null;
private long fMessageSerial;
private Status fStat = new Status();
/**
*
*/
protected ServerConnection(String name, String host, int port, int group, int index, RequestDispatcher dispatcher,
int retryTime, int timeout) {
this.name = name;
this.host = host;
this.port = port;
fKey = new ServerKey(group, index);
fRetryTime = retryTime;
fDispatcher = dispatcher;
this.timeout = timeout;
fRequest = null;
fMessageSerial = 0;
}
protected abstract HTTPResponseMessage createHTTPResponseMessage(long msgSerial);
public synchronized void reset() {
deactivate();
setStat(STAT_INIT);
}
public ServerKey getKey() {
return fKey;
}
public synchronized boolean isActive() {
return (fThread != null);
}
// synchronized
public synchronized void activate() {
if (isActive()) {
DEBUG("activate: already active");
return;
}
DEBUG("activate");
setStat(STAT_CONNECTING);
fActivatedTime = System.currentTimeMillis();
fMessageSerial = 0;
setTimeout(false);
fThread = new Thread(this, "ServerConnection-" + name);
fThread.start();
fSocket = null;
fSocketException = null;
fOutputStream = null;
fReader = null;
fSocketOpener = new SocketOpener(host, port, (int) timeout * 2, this);
fSocketOpenerThread = new Thread(fSocketOpener, "SocketOpener-" + this.toString());
fSocketOpenerThread.start();
}
public void activateAndConnect(long timeout) throws IOException, TimeoutException, InterruptedException {
activate();
waitUntilConnected(timeout);
}
public void waitUntilConnected(long timeout) throws IOException, TimeoutException, InterruptedException {
DEBUG("waitUntilConnected");
int stat;
synchronized (fStat) {
if (fStat.equals(STAT_CONNECTING)) {
fStat.waitFor(STAT_CONNECTED, timeout);
}
stat = fStat.get();
}
if (stat == STAT_CONNECTED) {
DEBUG("connected");
return;
} else if (stat == STAT_CONNECTING) {
throw new TimeoutException("ServerConnection.waitUntilConnected");
} else if (stat == STAT_INIT) {
throw new IOException("This connection is not activated yet");
} else if (stat == STAT_CLOSED) {
throw new IOException("This connection is already closed");
}
}
/**
* Deactivate this connection.
*/
public synchronized void deactivate() {
DEBUG("deactivate");
if (!isActive()) {
DEBUG("deactivate: already deactive");
return;
}
// Stop the thread
try {
if (fSocket != null && !fSocket.isOutputShutdown()) {
fSocket.shutdownOutput();
}
} catch (IOException e) {
// ignore
WARNING("Failed to shutdown a socket (IOException): " + e.getMessage());
}
fThread.interrupt();
fThread = null;
fSocketOpener.setValid(false);
fSocketOpenerThread.interrupt();
fSocketOpener = null;
fSocketOpenerThread = null;
fSocket = null;
fOutputStream = null;
fReader = null;
fActivatedTime = 0;
fMessageSerial = 0;
setTimeout(false);
setStat(STAT_CLOSED);
}
synchronized void notifyConnected(Socket sock, OutputStream out, InputStream in) {
if (sock == null) {
throw new IllegalArgumentException("null");
}
DEBUG("setSocket");
fSocket = sock;
fSocketException = null;
fOutputStream = new BufferedOutputStream(new SocketTimeoutRetryOutputStream(out));
fInputStream = in;
fReader = fDispatcher.createHTTPResponseReader(in);
setStat(STAT_CONNECTED);
}
synchronized void notifyConnectFailed(IOException e) {
WARNING("setSocketException: failed to create a socket (IOException): " + e.getMessage());
fSocket = null;
fSocketException = e;
fOutputStream = null;
fReader = null;
deactivate();
}
public synchronized void setTimeout(boolean timeout) {
if (timeout) {
if (fFirstTimeout == 0) {
fFirstTimeout = System.currentTimeMillis();
}
} else {
fFirstTimeout = 0;
}
}
private synchronized boolean isTimeout() {
return (fFirstTimeout != 0);
}
public synchronized boolean isInvalid() {
if (fFirstTimeout == 0) {
return false;
}
return (System.currentTimeMillis() - fFirstTimeout < fRetryTime);
}
public int getStat() {
return fStat.get();
}
private void setStat(int stat) {
fStat.set(stat);
}
// synchronized
private boolean startSession(long msgSerial, long timeout) throws TimeoutException, InterruptedException {
if (fMessageSerial != 0) {
if (msgSerial == fMessageSerial) {
// Avoid sending the same message
return false;
}
// There is a message whose response is not yet arrived.
// Wait until a response arrives, or timeout.
long deadline = System.currentTimeMillis() + timeout;
long wait = timeout;
while (wait > 0) {
this.wait(wait); // <--
if (fMessageSerial == 0) {
break;
}
wait = deadline - System.currentTimeMillis();
}
if (fMessageSerial != 0) {
throw new TimeoutException("ServerConnection.startSession");
}
}
// (fMessageSerial == 0) && (fMessageSerial != msgSerial)
fMessageSerial = msgSerial;
if (LOGGER.isDebugEnabled()) {
DEBUG("Session started: msgSerial=" + fMessageSerial);
}
return true;
}
private synchronized void finishSession() {
if (LOGGER.isDebugEnabled()) {
DEBUG("Session finished: msgSerial=" + fMessageSerial);
}
assert (fMessageSerial != 0);
fMessageSerial = 0; // -->startSession
this.notifyAll();
}
/**
* Send a message asynchronously.
*
* @param req
* @param timeout
* @return
*/
public synchronized void putRequest(IHTTPRequestMessage req, long timeout)
throws TimeoutException, InterruptedException
{
if (LOGGER.isDebugEnabled()) {
DEBUG("putRequest: msgSerial=" + req.getSerial() + ", tid=" + req.getTid());
}
boolean sessionStarted = startSession(req.getSerial(), timeout);
assert (req.getSerial() == fMessageSerial);
if (sessionStarted) {
// fMessageSerial != null
fRequest = req; // -->nextRequest
this.notifyAll();
}
}
/*
public synchronized HTTPResponseMessage getResponse(long timeout)
throws TimeoutException, InterruptedException
{
while (fResponse == null) {
this.wait();
}
HTTPResponseMessage response = fResponse;
fResponse = null;
fMessageSerial = 0; //-->startSession (putRequest)
this.notifyAll();
return response;
}
*/
private synchronized IHTTPRequestMessage nextRequest() throws InterruptedException {
while (fRequest == null || fMessageSerial == 0) {
if (LOGGER.isDebugEnabled()) {
DEBUG("nextRequest waiting: request=" + fRequest + ", msgSerial=" + fMessageSerial);
}
this.wait();
}
// (fRequest != null) && (fMessageSerial != 0)
if (LOGGER.isDebugEnabled()) {
DEBUG("nextRequest waiting done: request=" + fRequest + ", msgSerial=" + fMessageSerial);
}
IHTTPRequestMessage req = fRequest;
fRequest = null;
return req;
}
// run()
private IHTTPResponseMessage receiveResponse(long timeout, boolean isBodyEmpty) throws InterruptedException,
IOException, TimeoutException {
HTTPResponseReader reader;
long msgSerial;
synchronized (this) {
assert (fStat.equals(STAT_CONNECTED)) && (fReader != null) && (fMessageSerial != 0);
if (fReader == null || fMessageSerial == 0) {
throw new IOException("Deactivated");
}
reader = fReader;
msgSerial = fMessageSerial;
}
// if (fMessageSerial == 0) {
// long start = System.currentTimeMillis();
// long deadline = start + timeout;
// long wait = timeout;
// while (wait > 0) {
// this.wait(wait);
// if (fMessageSerial != 0) {
// break;
// }
// wait = deadline - System.currentTimeMillis();
// }
// timeout -= System.currentTimeMillis() - start;
// }
// if (fMessageSerial == 0 || timeout <= 0) {
// throw new TimeoutException("ServerConnection.receiveResponse");
// }
// (fMessageSerial != 0) && (timeout > 0)
HTTPResponseMessage response = createHTTPResponseMessage(msgSerial);
DEBUG("Try to read response...");
reader.readMessage(response, timeout, isBodyEmpty);
// if (LOGGER.isDebugEnabled()) {
// DEBUG("Received a response: msgSerial=" + response.getSerial());
// System.out.println("#######Response from server group " + fGroup.getIndex());
// try { response.write(System.out); } catch (Exception e) {}
// System.out.println("####################################################################");
// }
// fInfo.decrementProcessingRequest();
// if (response.getFFFResultCode() == FFFServletHeader.RC_SUCCESS) {
// fInfo.updateLastActivityTime();
// }
return response;
}
private synchronized void sendRequest(IHTTPRequestMessage request,
long timeout)
throws IOException, InterruptedException, TimeoutException {
if (LOGGER.isDebugEnabled()) {
DEBUG("sendRequest....");
}
long t0 = System.currentTimeMillis();
waitUntilConnected(timeout);
assert (fOutputStream != null);
long t1 = System.currentTimeMillis();
timeout -= t1 - t0;
if (timeout <= 0) {
throw new TimeoutException();
}
if (LOGGER.isDebugEnabled()) {
DEBUG("Sent a request: msgSerial=" + request.getSerial() + ", tid=" + request.getTid());
StringBuffer sb = new StringBuffer();
ByteArrayOutputStream ob = new ByteArrayOutputStream();
BifurcatedOutputStream o = new BifurcatedOutputStream(fOutputStream, ob);
request.write(timeout, o);
ob.close();
sb.append("\n===============>====>====>====>=============>\n");
sb.append(ob.toString());
sb.append("===============<====<====<====<=============<\n");
DEBUG(sb.toString());
} else {
request.write(timeout, fOutputStream);
}
fOutputStream.flush();
fMessageSerial = request.getSerial();
// fInfo.incrementProcessingRequest();
}
/**
* Reciever a response from the server. Keep trying to read a response and
* pass it to the dispatcher.
*/
public void run() {
DEBUG("receiver thread started");
// long lastActivityTime = System.currentTimeMillis();
try {
boolean serverError = false;
int counter = 0;
while (!Thread.interrupted() && !serverError) {
counter += 1;
IHTTPRequestMessage request = nextRequest();
// Send a request to the server
try {
if (request.isConnectionShutdownRequired()) {
request.setConnectionHeader(false);
sendRequest(request, timeout);
fSocket.shutdownOutput();
setStat(STAT_FINALIZING);
} else {
request.setConnectionHeader(true);
sendRequest(request, timeout);
}
// fInfo.notifySuccessfulSend();
// lastActivityTime = System.currentTimeMillis();
} catch (TimeoutException timeout) {
DEBUG("sendRequest() timeout");
setTimeout(true);
continue;
}
assert (fStat.equals(STAT_CONNECTED) || fStat.equals(STAT_FINALIZING));
// Recieve a resposne from the server
int loop = 0;
long start = System.currentTimeMillis();
while (!Thread.currentThread().isInterrupted() && !serverError) {
try {
DEBUG("ReceiveResponse...");
IHTTPResponseMessage response = receiveResponse(timeout, request.isResponseBodyEmpty());
// Send the response to the client
DEBUG("ResponseArrived...");
if (response.isConnectionToBeClosed()) {
DEBUG("Since the response is not keep-alive, we do not reuse this connection.");
setStat(STAT_FINALIZING);
}
fDispatcher.responseArrived(this, response); // Interrupted
finishSession();
break;
} catch (TimeoutException timeout) {
// continue;
// setTimeout(true);
if (LOGGER.isDebugEnabled()) {
DEBUG("response receiving timeout (" + timeout.getMessage() + "): retry=" + loop
+ ", elapsed=" + (System.currentTimeMillis() - start));
}
}
loop += 1;
long delay = System.currentTimeMillis() - start;
if (delay > 60 * 1000) {
throw new IOException("Shutdown this connection");
}
}
}
} catch (HTTPConnectionException e) {
DEBUG("The connection is closed by the peer.");
} catch (IOException e) {
DEBUG("IOException: " + e.getMessage());
HTTPMalformedResponseMessage response = new HTTPMalformedResponseMessage(fMessageSerial, e);
try {
fDispatcher.responseArrived(this, response);
} catch (InterruptedException interrupted) {
}
finishSession();
} catch (InterruptedException e) {
DEBUG("reader thread interrupted");
// ignore
} finally {
DEBUG("receiver thread stopped");
deactivate();
}
}
public String toString() {
StringBuffer sb = new StringBuffer();
sb.append(host).append(':').append(port).append('.').append(fKey.toString());
return sb.toString();
}
private final void DEBUG(String msg) {
if (LOGGER.isDebugEnabled()) {
StringBuffer sb = new StringBuffer();
sb.append(name).append(" [");
sb.append(this.toString()).append("] ").append(msg);
LOGGER.debug(sb.toString());
}
}
private final void WARNING(String msg) {
StringBuffer sb = new StringBuffer();
sb.append(name).append(" [");
sb.append(this.toString()).append("] ").append(msg);
LOGGER.warning(sb.toString());
}
public synchronized String dump() {
StringBuffer sb = new StringBuffer();
sb.append(this.toString()).append(": stat=").append(fStat);
sb.append(", thread=");
sb.append((fThread == null) ? "null" : Boolean.toString(fThread.isAlive()));
sb.append(", socketOpener=");
sb.append((fSocketOpener == null) ? "null" : "exists");
sb.append(", socketOpenerThread=");
sb.append((fSocketOpenerThread == null) ? "null" : Boolean.toString(fSocketOpenerThread.isAlive()));
sb.append(", socket=");
sb.append((fSocket == null) ? "null" : Boolean.toString(fSocket.isConnected()));
sb.append(", invalid=").append(isInvalid());
return sb.toString();
}
}