blob: 425229231c9f45854755cc5fa6a4469c161ba92c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openejb.util;
import org.apache.openejb.loader.Options;
import org.apache.openejb.util.executor.OfferRejectedExecutionHandler;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @version $Rev$ $Date$
*/
public class ExecutorBuilder {
private int size = 10;
private String prefix = "Pool";
private ThreadFactory threadFactory;
private RejectedExecutionHandler rejectedExecutionHandler;
public ExecutorBuilder() {
}
public ExecutorBuilder size(final int size) {
this.size = size;
return this;
}
public ExecutorBuilder prefix(final String prefix) {
this.prefix = prefix;
return this;
}
public ExecutorBuilder threadFactory(final ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
return this;
}
public ExecutorBuilder rejectedExecutionHandler(final RejectedExecutionHandler rejectedExecutionHandler) {
this.rejectedExecutionHandler = rejectedExecutionHandler;
return this;
}
public ThreadPoolExecutor build(final Options options) {
int corePoolSize = options.get(prefix + ".CorePoolSize", size);
if (corePoolSize < 1) {
corePoolSize = 1;
}
// Default setting is for a fixed pool size, MaximumPoolSize==CorePoolSize
int maximumPoolSize = Math.max(options.get(prefix + ".MaximumPoolSize", corePoolSize), corePoolSize);
if (maximumPoolSize < corePoolSize) {
maximumPoolSize = corePoolSize;
}
// Default QueueSize is bounded using the corePoolSize, else bounded pools will never grow
final int qsize = options.get(prefix + ".QueueSize", corePoolSize);
// Keep Threads inactive threads alive for 60 seconds by default
final Duration keepAliveTime = options.get(prefix + ".KeepAliveTime", new Duration(60, TimeUnit.SECONDS));
// All threads can be timed out by default
final boolean allowCoreThreadTimeout = options.get(prefix + ".AllowCoreThreadTimeOut", true);
// If the user explicitly set the QueueSize to 0, we default QueueType to SYNCHRONOUS
final QueueType defaultQueueType = (qsize < 1) ? QueueType.SYNCHRONOUS : QueueType.LINKED;
final BlockingQueue<Runnable> queue = options.get(prefix + ".QueueType", defaultQueueType).create(options, prefix, qsize);
ThreadFactory factory = this.threadFactory;
if (factory == null) {
factory = new DaemonThreadFactory(prefix);
}
RejectedExecutionHandler handler = this.rejectedExecutionHandler;
if (handler == null) {
final Duration duration = options.get(prefix + ".OfferTimeout", new Duration(30, TimeUnit.SECONDS));
handler = new OfferRejectedExecutionHandler(duration);
}
final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize
, maximumPoolSize
, keepAliveTime.getTime()
, keepAliveTime.getUnit() != null ? keepAliveTime.getUnit() : TimeUnit.SECONDS
, queue
, factory
, handler
);
threadPoolExecutor.allowCoreThreadTimeOut(allowCoreThreadTimeout);
return threadPoolExecutor;
}
/**
* @version $Rev$ $Date$
*/
public static enum QueueType {
ARRAY,
LINKED,
PRIORITY,
SYNCHRONOUS;
public BlockingQueue<Runnable> create(final Options options, final String prefix, final int queueSize) {
switch (this) {
case ARRAY: {
return new ArrayBlockingQueue<Runnable>(queueSize > 0 ? queueSize : 1);
}
case LINKED: {
return new LinkedBlockingQueue<Runnable>(queueSize > 0 ? queueSize : 1);
}
case PRIORITY: {
return new PriorityBlockingQueue<Runnable>();
}
case SYNCHRONOUS: {
return new SynchronousQueue<Runnable>(options.get(prefix + ".QueueFair", false));
}
default: {
// The Options class will throw an error if the user supplies an unknown enum string
// The only way we can reach this is if we add a new QueueType element and forget to
// implement it in the above switch statement.
throw new IllegalArgumentException("Unknown QueueType type: " + this);
}
}
}
}
}