/*
 * Copyright (c) OSGi Alliance (2017). All Rights Reserved.
 * 
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.osgi.util.promise;

import static org.osgi.util.promise.PromiseImpl.uncaughtException;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.RunnableScheduledFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.osgi.annotation.versioning.ConsumerType;
import org.osgi.util.promise.PromiseImpl.Result;

/**
 * Promise factory to create Deferred and Promise objects.
 * <p>
 * Instances of this class can be used to create Deferred and Promise objects
 * which use the executors used to construct this object for any callback or
 * scheduled operation execution.
 * 
 * @Immutable
 * @author $Id$
 * @since 1.1
 */
@ConsumerType
public class PromiseFactory {
	/**
	 * The default factory which uses the default callback executor and default
	 * scheduled executor.
	 */
	final static PromiseFactory				defaultFactory	= new PromiseFactory(
			null, null);

	/**
	 * The executor to use for callbacks. If {@code null}, the default
	 * callback executor is used.
	 */
	private final Executor					callbackExecutor;
	/**
	 * The executor to use for scheduled operations. If {@code null}, the
	 * default scheduled executor is used.
	 */
	private final ScheduledExecutorService	scheduledExecutor;


	/**
	 * Create a new PromiseFactory with the specified callback executor.
	 * <p>
	 * The default scheduled executor will be used.
	 * 
	 * @param callbackExecutor The executor to use for callbacks. {@code null}
	 *            can be specified for the default callback executor.
	 */
	public PromiseFactory(Executor callbackExecutor) {
		this(callbackExecutor, null);
	}

	/**
	 * Create a new PromiseFactory with the specified callback executor and
	 * specified scheduled executor.
	 * 
	 * @param callbackExecutor The executor to use for callbacks. {@code null}
	 *            can be specified for the default callback executor.
	 * @param scheduledExecutor The scheduled executor for use for scheduled
	 *            operations. {@code null} can be specified for the default
	 *            scheduled executor.
	 */
	public PromiseFactory(Executor callbackExecutor,
			ScheduledExecutorService scheduledExecutor) {
		this.callbackExecutor = callbackExecutor;
		this.scheduledExecutor = scheduledExecutor;
	}

	/**
	 * Returns the executor to use for callbacks.
	 * 
	 * @return The executor to use for callbacks. This will be the default
	 *         callback executor if {@code null} was specified for the callback
	 *         executor when this PromiseFactory was created.
	 */
	public Executor executor() {
		if (callbackExecutor == null) {
			return DefaultExecutors.callbackExecutor();
		}
		return callbackExecutor;
	}

	/**
	 * Returns the scheduled executor to use for scheduled operations.
	 * 
	 * @return The scheduled executor to use for scheduled operations. This will
	 *         be the default scheduled executor if {@code null} was specified
	 *         for the scheduled executor when this PromiseFactory was created.
	 */
	public ScheduledExecutorService scheduledExecutor() {
		if (scheduledExecutor == null) {
			return DefaultExecutors.scheduledExecutor();
		}
		return scheduledExecutor;
	}

	/**
	 * Create a new Deferred with the callback executor and scheduled executor
	 * of this PromiseFactory object.
	 * <p>
	 * Use this method instead of {@link Deferred#Deferred()} to create a new
	 * {@link Deferred} whose associated Promise uses executors other than the
	 * default executors.
	 * 
	 * @param <T> The value type associated with the returned Deferred.
	 * @return A new {@link Deferred} with the callback and scheduled executors
	 *         of this PromiseFactory object
	 */
	public <T> Deferred<T> deferred() {
		return new Deferred<>(this);
	}

	/**
	 * Returns a new Promise that has been resolved with the specified value.
	 * <p>
	 * The returned Promise uses the callback executor and scheduled executor of
	 * this PromiseFactory object
	 * <p>
	 * Use this method instead of {@link Promises#resolved(Object)} to create a
	 * Promise which uses executors other than the default executors.
	 * 
	 * @param <T> The value type associated with the returned Promise.
	 * @param value The value of the resolved Promise.
	 * @return A new Promise that has been resolved with the specified value.
	 */
	public <T> Promise<T> resolved(T value) {
		return new ResolvedPromiseImpl<>(value, this);
	}

	/**
	 * Returns a new Promise that has been resolved with the specified failure.
	 * <p>
	 * The returned Promise uses the callback executor and scheduled executor of
	 * this PromiseFactory object
	 * <p>
	 * Use this method instead of {@link Promises#failed(Throwable)} to create a
	 * Promise which uses executors other than the default executors.
	 * 
	 * @param <T> The value type associated with the returned Promise.
	 * @param failure The failure of the resolved Promise. Must not be
	 *            {@code null}.
	 * @return A new Promise that has been resolved with the specified failure.
	 */
	public <T> Promise<T> failed(Throwable failure) {
		return new FailedPromiseImpl<>(failure, this);
	}

	/**
	 * Returns a new Promise that will hold the result of the specified task.
	 * <p>
	 * The returned Promise uses the callback executor and scheduled executor of
	 * this PromiseFactory object
	 * <p>
	 * The specified task will be executed on the {@link #executor() callback
	 * executor}.
	 * 
	 * @param <T> The value type associated with the returned Promise.
	 * @param task The task whose result will be available from the returned
	 *            Promise.
	 * @return A new Promise that will hold the result of the specified task.
	 */
	public <T> Promise<T> submit(Callable< ? extends T> task) {
		DeferredPromiseImpl<T> promise = new DeferredPromiseImpl<>(this);
		Runnable submit = promise.new Submit(task);
		try {
			executor().execute(submit);
		} catch (Exception t) {
			promise.tryResolve(null, t);
		}
		return promise;
	}

	/**
	 * Returns a new Promise that is a latch on the resolution of the specified
	 * Promises.
	 * <p>
	 * The returned Promise uses the callback executor and scheduled executor of
	 * this PromiseFactory object
	 * <p>
	 * The returned Promise acts as a gate and must be resolved after all of the
	 * specified Promises are resolved.
	 * 
	 * @param <T> The value type of the List value associated with the returned
	 *            Promise.
	 * @param <S> A subtype of the value type of the List value associated with
	 *            the returned Promise.
	 * @param promises The Promises which must be resolved before the returned
	 *            Promise must be resolved. Must not be {@code null} and all of
	 *            the elements in the collection must not be {@code null}.
	 * @return A Promise that must be successfully resolved with a List of the
	 *         values in the order of the specified Promises if all the
	 *         specified Promises are successfully resolved. The List in the
	 *         returned Promise is the property of the caller and is modifiable.
	 *         The returned Promise must be resolved with a failure of
	 *         {@link FailedPromisesException} if any of the specified Promises
	 *         are resolved with a failure. The failure
	 *         {@link FailedPromisesException} must contain all of the specified
	 *         Promises which resolved with a failure.
	 */
	public <T, S extends T> Promise<List<T>> all(
			Collection<Promise<S>> promises) {
		if (promises.isEmpty()) {
			List<T> result = new ArrayList<>();
			return resolved(result);
		}

		DeferredPromiseImpl<List<T>> promise = new DeferredPromiseImpl<>(this);
		/* make a copy and capture the ordering */
		List<Promise<S>> list = new ArrayList<>(promises);
		All<T,S> all = new All<>(promise, list);
		for (Promise<S> p : list) {
			p.onResolve(all);
		}
		return promise;
	}

	/**
	 * A callback used to resolve the specified Promise when the specified list
	 * of Promises are resolved for the {@link PromiseFactory#all(Collection)}
	 * method.
	 * 
	 * @ThreadSafe
	 */
	private static final class All<T, S extends T> implements Runnable {
		private final DeferredPromiseImpl<List<T>>	promise;
		private final List<Promise<S>>		promises;
		private final AtomicInteger			promiseCount;

		All(DeferredPromiseImpl<List<T>> promise,
				List<Promise<S>> promises) {
			this.promise = promise;
			this.promises = promises;
			this.promiseCount = new AtomicInteger(promises.size());
		}

		@Override
		public void run() {
			if (promiseCount.decrementAndGet() != 0) {
				return;
			}
			List<T> value = new ArrayList<>(promises.size());
			List<Promise< ? >> failed = new ArrayList<>(promises.size());
			Throwable cause = null;
			for (Promise<S> p : promises) {
				Result<S> result = PromiseImpl.collect(p);
				if (result.fail != null) {
					failed.add(p);
					if (cause == null) {
						cause = result.fail;
					}
				} else {
					value.add(result.value);
				}
			}
			if (failed.isEmpty()) {
				promise.tryResolve(value, null);
			} else {
				promise.tryResolve(null,
						new FailedPromisesException(failed, cause));
			}
		}
	}

	/**
	 * Returns an Executor implementation that executes tasks immediately on the
	 * thread calling the {@code Executor.execute} method.
	 * 
	 * @return An Executor implementation that executes tasks immediately on the
	 *         thread calling the {@code Executor.execute} method.
	 */
	public static Executor inlineExecutor() {
		return new InlineExecutor();
	}

	/**
	 * An Executor implementation which executes the task immediately on the
	 * thread calling the {@code Executor.execute} method.
	 * 
	 * @Immutable
	 */
	private static final class InlineExecutor implements Executor {
		InlineExecutor() {}

		@Override
		public void execute(Runnable callback) {
			callback.run();
		}
	}

	/**
	 * Default executors for Promises.
	 * 
	 * @Immutable
	 */
	private static final class DefaultExecutors
			implements ThreadFactory, RejectedExecutionHandler, Runnable {
		private static final DefaultExecutors	callbacks;
		private static final ScheduledExecutor	scheduledExecutor;
		private static final ThreadPoolExecutor	callbackExecutor;
		static {
			callbacks = new DefaultExecutors();
			scheduledExecutor = new ScheduledExecutor(2, callbacks);
			callbackExecutor = new ThreadPoolExecutor(0, 64, 60L,
					TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
					callbacks, callbacks);
		}

		static Executor callbackExecutor() {
			return callbackExecutor;
		}

		static ScheduledExecutorService scheduledExecutor() {
			return scheduledExecutor;
		}

		private final AtomicBoolean	shutdownHookInstalled;
		private final ThreadFactory	delegateThreadFactory;

		private DefaultExecutors() {
			shutdownHookInstalled = new AtomicBoolean();
			delegateThreadFactory = Executors.defaultThreadFactory();
		}

		/**
		 * Executor threads should not prevent VM from exiting
		 */
		@Override
		public Thread newThread(Runnable r) {
			if (shutdownHookInstalled.compareAndSet(false, true)) {
				Thread shutdownThread = delegateThreadFactory.newThread(this);
				shutdownThread.setName(
						"ExecutorShutdownHook," + shutdownThread.getName());
				try {
					Runtime.getRuntime().addShutdownHook(shutdownThread);
				} catch (IllegalStateException e) {
					// VM is already shutting down...
					callbackExecutor.shutdown();
					scheduledExecutor.shutdown();
				}
			}
			Thread t = delegateThreadFactory.newThread(r);
			t.setName("PromiseFactory," + t.getName());
			t.setDaemon(true);
			return t;
		}

		/**
		 * Call the callback using the caller's thread because the thread pool
		 * rejected the execution.
		 */
		@Override
		public void rejectedExecution(Runnable callback,
				ThreadPoolExecutor executor) {
			try {
				callback.run();
			} catch (Throwable t) {
				uncaughtException(t);
			}
		}

		/**
		 * Shutdown hook
		 */
		@Override
		public void run() {
			// limit new thread creation
			callbackExecutor.setMaximumPoolSize(
					Math.max(1, callbackExecutor.getPoolSize()));
			// Run all delayed callbacks now
			scheduledExecutor.shutdown();
			BlockingQueue<Runnable> queue = scheduledExecutor.getQueue();
			if (!queue.isEmpty()) {
				for (Object r : queue.toArray()) {
					if (r instanceof RunnableScheduledFuture< ? >) {
						RunnableScheduledFuture< ? > future = (RunnableScheduledFuture< ? >) r;
						if ((future.getDelay(TimeUnit.NANOSECONDS) > 0L)
								&& queue.remove(future)) {
							future.run();
							scheduledExecutor.afterExecute(future, null);
						}
					}
				}
				scheduledExecutor.shutdown();
			}
			try {
				scheduledExecutor.awaitTermination(20, TimeUnit.SECONDS);
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
			}
			// Shutdown callback executor
			callbackExecutor.shutdown();
			try {
				callbackExecutor.awaitTermination(20, TimeUnit.SECONDS);
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
			}
		}

		/**
		 * ScheduledThreadPoolExecutor for scheduled execution.
		 * 
		 * @ThreadSafe
		 */
		private static final class ScheduledExecutor
				extends ScheduledThreadPoolExecutor {
			ScheduledExecutor(int corePoolSize, ThreadFactory threadFactory) {
				super(corePoolSize, threadFactory);
			}

			/**
			 * Handle uncaught exceptions
			 */
			@Override
			protected void afterExecute(Runnable r, Throwable t) {
				super.afterExecute(r, t);
				if ((t == null) && (r instanceof Future< ? >)) {
					boolean interrupted = Thread.interrupted();
					try {
						((Future< ? >) r).get();
					} catch (CancellationException e) {
						// ignore
					} catch (InterruptedException e) {
						interrupted = true;
					} catch (ExecutionException e) {
						t = e.getCause();
					} finally {
						if (interrupted) { // restore interrupt status
							Thread.currentThread().interrupt();
						}
					}
				}
				if (t != null) {
					uncaughtException(t);
				}
			}
		}
	}
}
