/*******************************************************************************
 * Copyright (c) 2007 IBM Corporation and Others
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 *    Hideki TAI - initial API and implementation
 *******************************************************************************/
package org.eclipse.actf.util.httpproxy.core.impl;

import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

import org.eclipse.actf.util.httpproxy.core.IHTTPRequestMessage;
import org.eclipse.actf.util.httpproxy.core.IHTTPResponseMessage;
import org.eclipse.actf.util.httpproxy.util.Logger;
import org.eclipse.actf.util.httpproxy.util.TimeoutException;



public abstract class ServerConnection implements Runnable {
    private static final Logger LOGGER = Logger.getLogger(ServerConnection.class);

    private static class SocketOpener implements Runnable {
        private ServerConnection fSocketReceiver;

        private String fHost;

        private int fPort;

        private int fSOTimeout;

        private boolean isValid;

        SocketOpener(String host, int port, int soTimeout, ServerConnection socketReceiver) {
            fHost = host;
            fPort = port;
            fSOTimeout = soTimeout;
            fSocketReceiver = socketReceiver;
            isValid = true;
        }

        synchronized void setValid(boolean v) {
            isValid = v;
        }

        synchronized boolean isValid() {
            return isValid;
        }

        public void run() {
            fSocketReceiver.DEBUG("SocketOpener started");
            try {
                Socket sock = null;
                OutputStream out = null;
                InputStream in = null;
                if (isValid()) {
                    if (LOGGER.isDebugEnabled()) {
                        fSocketReceiver.DEBUG("Trying to create a Socket: " + fHost + "@" + fPort);
                    }
                    sock = new Socket(fHost, fPort);
                    // sock = new Socket();
                    // sock.setSoTimeout((int) fSOTimeout);
                    // sock.setTcpNoDelay(true);
                    // SocketAddress dest = new
                    // InetSocketAddress(fInfo.getHost(), fInfo.getPort());
                    // sock.connect(dest, (int) fSOTimeout);
                    sock.setSoTimeout(1);
                    out = sock.getOutputStream();
                    in = sock.getInputStream();
                    if (LOGGER.isDebugEnabled()) {
                        fSocketReceiver.DEBUG("Created a Socket: " + fHost + "@" + fPort);
                    }
                }
                if (sock != null && out != null && in != null && isValid()) {
                    if (LOGGER.isDebugEnabled()) {
                        fSocketReceiver.DEBUG("Set a Socket to the ServerConnection: " + fHost + "@" + fPort);
                    }
                    fSocketReceiver.notifyConnected(sock, out, in);
                }
            } catch (IOException e) {
                // e.printStackTrace();
                if (LOGGER.isDebugEnabled()) {
                    fSocketReceiver.DEBUG("Failed to create a Socket (" + e.getClass().getName() + "): " + fHost + "@" + fPort);
                }
                if (isValid()) {
                    fSocketReceiver.notifyConnectFailed(e);
                }
            }
        }
    }

    private static class Status {
        private int fStat = STAT_INIT;

        private int fNumWaiters = 0;

        Status() {
            // nop
        }

        synchronized void set(int stat) {
            fStat = stat;
            this.notifyAll();
        }

        synchronized int get() {
            return fStat;
        }

        synchronized boolean equals(int stat) {
            return (fStat == stat);
        }

        synchronized boolean waitFor(int stat, long timeout)
            throws InterruptedException, TimeoutException {
            if (fStat == stat) return true;
            if (fStat == STAT_CLOSED) return false;

            fNumWaiters += 1;
            long deadline = System.currentTimeMillis() + timeout;
            long wait = timeout;
            while (wait > 0) {
                this.wait(wait);
                if (fStat == stat) return true;
                if (fStat == STAT_CLOSED) return false;
                wait = deadline - System.currentTimeMillis();
            }
            throw new TimeoutException();
        }

        public String toString() {
            switch (fStat) {
            case STAT_CLOSED:
                return "CLOSED";
            case STAT_INIT:
                return "INIT";
            case STAT_CONNECTING:
                return "CONNECTING";
            case STAT_CONNECTED:
                return "CONNECTED";
            default:
                return "Unkown";
            }
        }
    }

    public static final int STAT_CLOSED = -2;

    public static final int STAT_FINALIZING = -1;

    public static final int STAT_INIT = 0;

    public static final int STAT_CONNECTING = 1;

    public static final int STAT_CONNECTED = 2;

    private final ServerKey fKey;

    private final String name;

    private final String host;

    public String getHost() {
        return host;
    }

    private final int port;

    public int getPort() {
        return port;
    }

    private final RequestDispatcher fDispatcher;

    private final int fRetryTime;

    private final int timeout;

    private long fActivatedTime;

    private Thread fThread;

    private SocketOpener fSocketOpener = null;

    private Thread fSocketOpenerThread = null;

    private IHTTPRequestMessage fRequest;

    private long fFirstTimeout = 0;

    private Socket fSocket = null;

    private IOException fSocketException = null;

    private InputStream fInputStream = null;

    InputStream getInputStream() {
        return fInputStream;
    }

    private BufferedOutputStream fOutputStream = null;

    OutputStream getOutputStream() {
        return fOutputStream;
    }

    private HTTPResponseReader fReader = null;

    private long fMessageSerial;

    private Status fStat = new Status();

    /**
     * 
     */
    protected ServerConnection(String name, String host, int port, int group, int index, RequestDispatcher dispatcher,
            int retryTime, int timeout) {
        this.name = name;
        this.host = host;
        this.port = port;
        fKey = new ServerKey(group, index);
        fRetryTime = retryTime;
        fDispatcher = dispatcher;
        this.timeout = timeout;
        fRequest = null;
        fMessageSerial = 0;
    }

    protected abstract HTTPResponseMessage createHTTPResponseMessage(long msgSerial);
    
    public synchronized void reset() {
        deactivate();
        setStat(STAT_INIT);
    }

    public ServerKey getKey() {
        return fKey;
    }

    public synchronized boolean isActive() {
        return (fThread != null);
    }

    // synchronized
    public synchronized void activate() {
        if (isActive()) {
            DEBUG("activate: already active");
            return;
        }
        DEBUG("activate");
        setStat(STAT_CONNECTING);
        fActivatedTime = System.currentTimeMillis();
        fMessageSerial = 0;
        setTimeout(false);
        fThread = new Thread(this, "ServerConnection-" + name);
        fThread.start();

        fSocket = null;
        fSocketException = null;
        fOutputStream = null;
        fReader = null;

        fSocketOpener = new SocketOpener(host, port, (int) timeout * 2, this);
        fSocketOpenerThread = new Thread(fSocketOpener, "SocketOpener-" + this.toString());
        fSocketOpenerThread.start();
    }

    public void activateAndConnect(long timeout) throws IOException, TimeoutException, InterruptedException {
        activate();
        waitUntilConnected(timeout);
    }

    public void waitUntilConnected(long timeout) throws IOException, TimeoutException, InterruptedException {
        DEBUG("waitUntilConnected");
        int stat;
        synchronized (fStat) {
            if (fStat.equals(STAT_CONNECTING)) {
                fStat.waitFor(STAT_CONNECTED, timeout);
            }
            stat = fStat.get();
        }
        if (stat == STAT_CONNECTED) {
            DEBUG("connected");
            return;
        } else if (stat == STAT_CONNECTING) {
            throw new TimeoutException("ServerConnection.waitUntilConnected");
        } else if (stat == STAT_INIT) {
            throw new IOException("This connection is not activated yet");
        } else if (stat == STAT_CLOSED) {
            throw new IOException("This connection is already closed");
        }
    }

    /**
     * Deactivate this connection.
     */
    public synchronized void deactivate() {
        DEBUG("deactivate");
        if (!isActive()) {
            DEBUG("deactivate: already deactive");
            return;
        }

        // Stop the thread
        try {
            if (fSocket != null && !fSocket.isOutputShutdown()) {
                fSocket.shutdownOutput();
            }
        } catch (IOException e) {
            // ignore
            WARNING("Failed to shutdown a socket (IOException): " + e.getMessage());
        }
        fThread.interrupt();
        fThread = null;

        fSocketOpener.setValid(false);
        fSocketOpenerThread.interrupt();

        fSocketOpener = null;
        fSocketOpenerThread = null;
        fSocket = null;
        fOutputStream = null;
        fReader = null;

        fActivatedTime = 0;
        fMessageSerial = 0;
        setTimeout(false);
        setStat(STAT_CLOSED);
    }

    synchronized void notifyConnected(Socket sock, OutputStream out, InputStream in) {
        if (sock == null) {
            throw new IllegalArgumentException("null");
        }
        DEBUG("setSocket");
        fSocket = sock;
        fSocketException = null;
        fOutputStream = new BufferedOutputStream(new SocketTimeoutRetryOutputStream(out));
        fInputStream = in;
        fReader = fDispatcher.createHTTPResponseReader(in);
        setStat(STAT_CONNECTED);
    }

    synchronized void notifyConnectFailed(IOException e) {
        WARNING("setSocketException: failed to create a socket (IOException): " + e.getMessage());
        fSocket = null;
        fSocketException = e;
        fOutputStream = null;
        fReader = null;
        deactivate();
    }

    public synchronized void setTimeout(boolean timeout) {
        if (timeout) {
            if (fFirstTimeout == 0) {
                fFirstTimeout = System.currentTimeMillis();
            }
        } else {
            fFirstTimeout = 0;
        }
    }

    private synchronized boolean isTimeout() {
        return (fFirstTimeout != 0);
    }

    public synchronized boolean isInvalid() {
        if (fFirstTimeout == 0) {
            return false;
        }
        return (System.currentTimeMillis() - fFirstTimeout < fRetryTime);
    }

    public int getStat() {
        return fStat.get();
    }

    private void setStat(int stat) {
        fStat.set(stat);
    }

    // synchronized
    private boolean startSession(long msgSerial, long timeout) throws TimeoutException, InterruptedException {
        if (fMessageSerial != 0) {
            if (msgSerial == fMessageSerial) {
                // Avoid sending the same message
                return false;
            }
            // There is a message whose response is not yet arrived.
            // Wait until a response arrives, or timeout.
            long deadline = System.currentTimeMillis() + timeout;
            long wait = timeout;
            while (wait > 0) {
                this.wait(wait); // <--
                if (fMessageSerial == 0) {
                    break;
                }
                wait = deadline - System.currentTimeMillis();
            }
            if (fMessageSerial != 0) {
                throw new TimeoutException("ServerConnection.startSession");
            }
        }
        // (fMessageSerial == 0) && (fMessageSerial != msgSerial)
        fMessageSerial = msgSerial;
        if (LOGGER.isDebugEnabled()) {
            DEBUG("Session started: msgSerial=" + fMessageSerial);
        }
        return true;
    }

    private synchronized void finishSession() {
        if (LOGGER.isDebugEnabled()) {
            DEBUG("Session finished: msgSerial=" + fMessageSerial);
        }
        assert (fMessageSerial != 0);
        fMessageSerial = 0; // -->startSession
        this.notifyAll();
    }

    /**
     * Send a message asynchronously.
     * 
     * @param req
     * @param timeout
     * @return
     */
    public synchronized void putRequest(IHTTPRequestMessage req, long timeout)
        throws TimeoutException, InterruptedException
    {
        if (LOGGER.isDebugEnabled()) {
            DEBUG("putRequest: msgSerial=" + req.getSerial() + ", tid=" + req.getTid());
        }
        boolean sessionStarted = startSession(req.getSerial(), timeout);
        assert (req.getSerial() == fMessageSerial);
        if (sessionStarted) {
            // fMessageSerial != null
            fRequest = req; // -->nextRequest
            this.notifyAll();
        }
    }

    /*
      public synchronized HTTPResponseMessage getResponse(long timeout) 
      throws TimeoutException, InterruptedException 
      {
      while (fResponse == null) {
      this.wait();
      }
      HTTPResponseMessage response = fResponse;
      fResponse = null;
      fMessageSerial = 0;  //-->startSession (putRequest) 
      this.notifyAll();
      return response;
      }
    */
        
    private synchronized IHTTPRequestMessage nextRequest() throws InterruptedException {
        while (fRequest == null || fMessageSerial == 0) {
            if (LOGGER.isDebugEnabled()) {
                DEBUG("nextRequest waiting: request=" + fRequest + ", msgSerial=" + fMessageSerial);
            }
            this.wait();
        }
        // (fRequest != null) && (fMessageSerial != 0)
        if (LOGGER.isDebugEnabled()) {
            DEBUG("nextRequest waiting done: request=" + fRequest + ", msgSerial=" + fMessageSerial);
        }
        IHTTPRequestMessage req = fRequest;
        fRequest = null;
        return req;
    }

    // run()
    private IHTTPResponseMessage receiveResponse(long timeout, boolean isBodyEmpty) throws InterruptedException,
            IOException, TimeoutException {
        HTTPResponseReader reader;
        long msgSerial;
        synchronized (this) {
            assert (fStat.equals(STAT_CONNECTED)) && (fReader != null) && (fMessageSerial != 0);
            if (fReader == null || fMessageSerial == 0) {
                throw new IOException("Deactivated");
            }
            reader = fReader;
            msgSerial = fMessageSerial;
        }

        // if (fMessageSerial == 0) {
        // long start = System.currentTimeMillis();
        // long deadline = start + timeout;
        // long wait = timeout;
        // while (wait > 0) {
        // this.wait(wait);
        // if (fMessageSerial != 0) {
        // break;
        // }
        // wait = deadline - System.currentTimeMillis();
        // }
        // timeout -= System.currentTimeMillis() - start;
        // }
        // if (fMessageSerial == 0 || timeout <= 0) {
        // throw new TimeoutException("ServerConnection.receiveResponse");
        // }
        // (fMessageSerial != 0) && (timeout > 0)

        HTTPResponseMessage response = createHTTPResponseMessage(msgSerial);
        DEBUG("Try to read response...");
        reader.readMessage(response, timeout, isBodyEmpty);
        
        //              if (LOGGER.isDebugEnabled()) {
        //                      DEBUG("Received a response: msgSerial=" + response.getSerial());
        //                      System.out.println("#######Response from server group " + fGroup.getIndex());
        //                      try { response.write(System.out); } catch (Exception e) {}
        //                      System.out.println("####################################################################");
        //              }
        //              fInfo.decrementProcessingRequest();
        //              if (response.getFFFResultCode() == FFFServletHeader.RC_SUCCESS) {
        //                      fInfo.updateLastActivityTime();
        //              }
        return response;
    }
        
    private synchronized void sendRequest(IHTTPRequestMessage request,
                                          long timeout)
        throws IOException, InterruptedException, TimeoutException {
        if (LOGGER.isDebugEnabled()) {
            DEBUG("sendRequest....");
        }
        long t0 = System.currentTimeMillis();
        waitUntilConnected(timeout);
        assert (fOutputStream != null);
        long t1 = System.currentTimeMillis();
        timeout -= t1 - t0;
        if (timeout <= 0) {
            throw new TimeoutException(); 
        }

        if (LOGGER.isDebugEnabled()) {
            DEBUG("Sent a request: msgSerial=" + request.getSerial() + ", tid=" + request.getTid());
            StringBuffer sb = new StringBuffer();
            ByteArrayOutputStream ob = new ByteArrayOutputStream();
            BifurcatedOutputStream o = new BifurcatedOutputStream(fOutputStream, ob);
            request.write(timeout, o);
            ob.close();
            sb.append("\n===============>====>====>====>=============>\n");
            sb.append(ob.toString());
            sb.append("===============<====<====<====<=============<\n");
            DEBUG(sb.toString());
        } else {
            request.write(timeout, fOutputStream);
        }
        fOutputStream.flush();

        fMessageSerial = request.getSerial();

        // fInfo.incrementProcessingRequest();
    }

    /**
     * Reciever a response from the server. Keep trying to read a response and
     * pass it to the dispatcher.
     */
    public void run() {
        DEBUG("receiver thread started");
        // long lastActivityTime = System.currentTimeMillis();
        try {
            boolean serverError = false;
            int counter = 0;
            while (!Thread.interrupted() && !serverError) {
                counter += 1;
                IHTTPRequestMessage request = nextRequest();

                // Send a request to the server
                try {
                    if (request.isConnectionShutdownRequired()) {
                        request.setConnectionHeader(false);
                        sendRequest(request, timeout);
                        fSocket.shutdownOutput();
                        setStat(STAT_FINALIZING);
                    } else {
                        request.setConnectionHeader(true);
                        sendRequest(request, timeout);
                    }
                    // fInfo.notifySuccessfulSend();
                    // lastActivityTime = System.currentTimeMillis();
                } catch (TimeoutException timeout) {
                    DEBUG("sendRequest() timeout");
                    setTimeout(true);
                    continue;
                }

                assert (fStat.equals(STAT_CONNECTED) || fStat.equals(STAT_FINALIZING));

                // Recieve a resposne from the server
                int loop = 0;
                long start = System.currentTimeMillis();
                while (!Thread.currentThread().isInterrupted() && !serverError) {
                    try {
                        DEBUG("ReceiveResponse...");
                        IHTTPResponseMessage response = receiveResponse(timeout, request.isResponseBodyEmpty());

                        // Send the response to the client
                        DEBUG("ResponseArrived...");
                        if (response.isConnectionToBeClosed()) {
                            DEBUG("Since the response is not keep-alive, we do not reuse this connection.");
                            setStat(STAT_FINALIZING);
                        }
                        fDispatcher.responseArrived(this, response); // Interrupted
                        finishSession();

                        break;
                    } catch (TimeoutException timeout) {
                        // continue;
                        // setTimeout(true);
                        if (LOGGER.isDebugEnabled()) {
                            DEBUG("response receiving timeout (" + timeout.getMessage() + "): retry=" + loop
                                    + ", elapsed=" + (System.currentTimeMillis() - start));
                        }
                    }
                    loop += 1;
                    long delay = System.currentTimeMillis() - start;
                    if (delay > 60 * 1000) {
                        throw new IOException("Shutdown this connection");
                    }
                }
            }
        } catch (HTTPConnectionException e) {
        	DEBUG("The connection is closed by the peer.");
        } catch (IOException e) {
            DEBUG("IOException: " + e.getMessage());
            HTTPMalformedResponseMessage response = new HTTPMalformedResponseMessage(fMessageSerial, e);
            try {
                fDispatcher.responseArrived(this, response);
            } catch (InterruptedException interrupted) {
            }
            finishSession();
        } catch (InterruptedException e) {
            DEBUG("reader thread interrupted");
            // ignore
        } finally {
            DEBUG("receiver thread stopped");
            deactivate();
        }
    }

    public String toString() {
        StringBuffer sb = new StringBuffer();
        sb.append(host).append(':').append(port).append('.').append(fKey.toString());
        return sb.toString();
    }

    private final void DEBUG(String msg) {
        if (LOGGER.isDebugEnabled()) {
            StringBuffer sb = new StringBuffer();
            sb.append(name).append(" [");
            sb.append(this.toString()).append("] ").append(msg);
            LOGGER.debug(sb.toString());
        }
    }

    private final void WARNING(String msg) {
        StringBuffer sb = new StringBuffer();
        sb.append(name).append(" [");
        sb.append(this.toString()).append("] ").append(msg);
        LOGGER.warning(sb.toString());
    }

    public synchronized String dump() {
        StringBuffer sb = new StringBuffer();
        sb.append(this.toString()).append(": stat=").append(fStat);
        sb.append(", thread=");
        sb.append((fThread == null) ? "null" : Boolean.toString(fThread.isAlive()));
        sb.append(", socketOpener=");
        sb.append((fSocketOpener == null) ? "null" : "exists");
        sb.append(", socketOpenerThread=");
        sb.append((fSocketOpenerThread == null) ? "null" : Boolean.toString(fSocketOpenerThread.isAlive()));
        sb.append(", socket=");
        sb.append((fSocket == null) ? "null" : Boolean.toString(fSocket.isConnected()));
        sb.append(", invalid=").append(isInvalid());
        return sb.toString();
    }
}
