blob: 5be2921c959444dac53d767f058f2d6f384cb607 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.catalina.tribes.transport;
import java.io.IOException;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.ChannelReceiver;
import org.apache.catalina.tribes.MessageListener;
import org.apache.catalina.tribes.io.ListenCallback;
import org.apache.catalina.tribes.util.ExecutorFactory;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
/**
* <p>Title: </p>
*
* <p>Description: </p>
*
* <p>Company: </p>
*
* @author not attributable
* @version 1.0
*/
public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, RxTaskPool.TaskCreator {
public static final int OPTION_DIRECT_BUFFER = 0x0004;
private static final Log log = LogFactory.getLog(ReceiverBase.class);
private MessageListener listener;
private String host = "auto";
private InetAddress bind;
private int port = 4000;
private int udpPort = -1;
private int securePort = -1;
private int rxBufSize = 43800;
private int txBufSize = 25188;
private int udpRxBufSize = 43800;
private int udpTxBufSize = 25188;
private boolean listen = false;
private RxTaskPool pool;
private boolean direct = true;
private long tcpSelectorTimeout = 5000;
//how many times to search for an available socket
private int autoBind = 100;
private int maxThreads = 15;
private int minThreads = 6;
private int maxTasks = 100;
private int minTasks = 10;
private boolean tcpNoDelay = true;
private boolean soKeepAlive = false;
private boolean ooBInline = true;
private boolean soReuseAddress = true;
private boolean soLingerOn = true;
private int soLingerTime = 3;
private int soTrafficClass = 0x04 | 0x08 | 0x010;
private int timeout = 3000; //3 seconds
private boolean useBufferPool = true;
private boolean daemon = true;
private long maxIdleTime = 60000;
private ExecutorService executor;
public ReceiverBase() {
}
public void start() throws IOException {
if ( executor == null ) {
//executor = new ThreadPoolExecutor(minThreads,maxThreads,60,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>());
TaskThreadFactory tf = new TaskThreadFactory("Tribes-Task-Receiver-");
executor = ExecutorFactory.newThreadPool(minThreads, maxThreads, maxIdleTime, TimeUnit.MILLISECONDS, tf);
}
}
public void stop() {
if ( executor != null ) executor.shutdownNow();//ignore left overs
executor = null;
}
/**
* getMessageListener
*
* @return MessageListener
* TODO Implement this org.apache.catalina.tribes.ChannelReceiver method
*/
public MessageListener getMessageListener() {
return listener;
}
/**
*
* @return The port
* TODO Implement this org.apache.catalina.tribes.ChannelReceiver method
*/
public int getPort() {
return port;
}
public int getRxBufSize() {
return rxBufSize;
}
public int getTxBufSize() {
return txBufSize;
}
/**
* @deprecated use getMinThreads()/getMaxThreads()
* @return int
*/
@Deprecated
public int getTcpThreadCount() {
return getMaxThreads();
}
/**
* setMessageListener
*
* @param listener MessageListener
* TODO Implement this org.apache.catalina.tribes.ChannelReceiver method
*/
public void setMessageListener(MessageListener listener) {
this.listener = listener;
}
/**
* @deprecated use setPort
* @param tcpListenPort int
*/
@Deprecated
public void setTcpListenPort(int tcpListenPort) {
setPort(tcpListenPort);
}
/**
* @deprecated use setAddress
* @param tcpListenHost String
*/
@Deprecated
public void setTcpListenAddress(String tcpListenHost) {
setAddress(tcpListenHost);
}
public void setRxBufSize(int rxBufSize) {
this.rxBufSize = rxBufSize;
}
public void setTxBufSize(int txBufSize) {
this.txBufSize = txBufSize;
}
/**
* @deprecated use setMaxThreads/setMinThreads
* @param tcpThreadCount int
*/
@Deprecated
public void setTcpThreadCount(int tcpThreadCount) {
setMaxThreads(tcpThreadCount);
setMinThreads(tcpThreadCount);
}
/**
* @return Returns the bind.
*/
public InetAddress getBind() {
if (bind == null) {
try {
if ("auto".equals(host)) {
host = java.net.InetAddress.getLocalHost().getHostAddress();
}
if (log.isDebugEnabled())
log.debug("Starting replication listener on address:"+ host);
bind = java.net.InetAddress.getByName(host);
} catch (IOException ioe) {
log.error("Failed bind replication listener on address:"+ host, ioe);
}
}
return bind;
}
/**
* recursive bind to find the next available port
* @param socket ServerSocket
* @param portstart int
* @param retries int
* @return int
* @throws IOException
*/
protected int bind(ServerSocket socket, int portstart, int retries) throws IOException {
InetSocketAddress addr = null;
while ( retries > 0 ) {
try {
addr = new InetSocketAddress(getBind(), portstart);
socket.bind(addr);
setPort(portstart);
log.info("Receiver Server Socket bound to:"+addr);
return 0;
}catch ( IOException x) {
retries--;
if ( retries <= 0 ) {
log.info("Unable to bind server socket to:"+addr+" throwing error.");
throw x;
}
portstart++;
try {Thread.sleep(25);}catch( InterruptedException ti){Thread.interrupted();}
retries = bind(socket,portstart,retries);
}
}
return retries;
}
/**
* Same as bind() except it does it for the UDP port
* @param socket
* @param portstart
* @param retries
* @return int
* @throws IOException
*/
protected int bindUdp(DatagramSocket socket, int portstart, int retries) throws IOException {
InetSocketAddress addr = null;
while ( retries > 0 ) {
try {
addr = new InetSocketAddress(getBind(), portstart);
socket.bind(addr);
setUdpPort(portstart);
log.info("UDP Receiver Server Socket bound to:"+addr);
return 0;
}catch ( IOException x) {
retries--;
if ( retries <= 0 ) {
log.info("Unable to bind UDP socket to:"+addr+" throwing error.");
throw x;
}
portstart++;
try {Thread.sleep(25);}catch( InterruptedException ti){Thread.interrupted();}
retries = bindUdp(socket,portstart,retries);
}
}
return retries;
}
public void messageDataReceived(ChannelMessage data) {
if ( this.listener != null ) {
if ( listener.accept(data) ) listener.messageReceived(data);
}
}
public int getWorkerThreadOptions() {
int options = 0;
if ( getDirect() ) options = options | OPTION_DIRECT_BUFFER;
return options;
}
/**
* @param bind The bind to set.
*/
public void setBind(java.net.InetAddress bind) {
this.bind = bind;
}
/**
* @deprecated use getPort
* @return int
*/
@Deprecated
public int getTcpListenPort() {
return getPort();
}
public boolean getDirect() {
return direct;
}
public void setDirect(boolean direct) {
this.direct = direct;
}
public String getAddress() {
getBind();
return this.host;
}
public String getHost() {
return getAddress();
}
public long getSelectorTimeout() {
return tcpSelectorTimeout;
}
/**
* @deprecated use getSelectorTimeout
* @return long
*/
@Deprecated
public long getTcpSelectorTimeout() {
return getSelectorTimeout();
}
public boolean doListen() {
return listen;
}
public MessageListener getListener() {
return listener;
}
public RxTaskPool getTaskPool() {
return pool;
}
/**
* @deprecated use getAddress
* @return String
*/
@Deprecated
public String getTcpListenAddress() {
return getAddress();
}
public int getAutoBind() {
return autoBind;
}
public int getMaxThreads() {
return maxThreads;
}
public int getMinThreads() {
return minThreads;
}
public boolean getTcpNoDelay() {
return tcpNoDelay;
}
public boolean getSoKeepAlive() {
return soKeepAlive;
}
public boolean getOoBInline() {
return ooBInline;
}
public boolean getSoLingerOn() {
return soLingerOn;
}
public int getSoLingerTime() {
return soLingerTime;
}
public boolean getSoReuseAddress() {
return soReuseAddress;
}
public int getSoTrafficClass() {
return soTrafficClass;
}
public int getTimeout() {
return timeout;
}
public boolean getUseBufferPool() {
return useBufferPool;
}
public int getSecurePort() {
return securePort;
}
public int getMinTasks() {
return minTasks;
}
public int getMaxTasks() {
return maxTasks;
}
public ExecutorService getExecutor() {
return executor;
}
public boolean isListening() {
return listen;
}
/**
* @deprecated use setSelectorTimeout
* @param selTimeout long
*/
@Deprecated
public void setTcpSelectorTimeout(long selTimeout) {
setSelectorTimeout(selTimeout);
}
public void setSelectorTimeout(long selTimeout) {
tcpSelectorTimeout = selTimeout;
}
public void setListen(boolean doListen) {
this.listen = doListen;
}
public void setAddress(String host) {
this.host = host;
}
public void setHost(String host) {
setAddress(host);
}
public void setListener(MessageListener listener) {
this.listener = listener;
}
public void setPool(RxTaskPool pool) {
this.pool = pool;
}
public void setPort(int port) {
this.port = port;
}
public void setAutoBind(int autoBind) {
this.autoBind = autoBind;
if ( this.autoBind <= 0 ) this.autoBind = 1;
}
public void setMaxThreads(int maxThreads) {
this.maxThreads = maxThreads;
}
public void setMinThreads(int minThreads) {
this.minThreads = minThreads;
}
public void setTcpNoDelay(boolean tcpNoDelay) {
this.tcpNoDelay = tcpNoDelay;
}
public void setSoKeepAlive(boolean soKeepAlive) {
this.soKeepAlive = soKeepAlive;
}
public void setOoBInline(boolean ooBInline) {
this.ooBInline = ooBInline;
}
public void setSoLingerOn(boolean soLingerOn) {
this.soLingerOn = soLingerOn;
}
public void setSoLingerTime(int soLingerTime) {
this.soLingerTime = soLingerTime;
}
public void setSoReuseAddress(boolean soReuseAddress) {
this.soReuseAddress = soReuseAddress;
}
public void setSoTrafficClass(int soTrafficClass) {
this.soTrafficClass = soTrafficClass;
}
public void setTimeout(int timeout) {
this.timeout = timeout;
}
public void setUseBufferPool(boolean useBufferPool) {
this.useBufferPool = useBufferPool;
}
public void setSecurePort(int securePort) {
this.securePort = securePort;
}
public void setMinTasks(int minTasks) {
this.minTasks = minTasks;
}
public void setMaxTasks(int maxTasks) {
this.maxTasks = maxTasks;
}
public void setExecutor(ExecutorService executor) {
this.executor = executor;
}
public void heartbeat() {
//empty operation
}
public int getUdpPort() {
return udpPort;
}
public void setUdpPort(int udpPort) {
this.udpPort = udpPort;
}
public int getUdpRxBufSize() {
return udpRxBufSize;
}
public void setUdpRxBufSize(int udpRxBufSize) {
this.udpRxBufSize = udpRxBufSize;
}
public int getUdpTxBufSize() {
return udpTxBufSize;
}
public void setUdpTxBufSize(int udpTxBufSize) {
this.udpTxBufSize = udpTxBufSize;
}
// ---------------------------------------------- ThreadFactory Inner Class
class TaskThreadFactory implements ThreadFactory {
final ThreadGroup group;
final AtomicInteger threadNumber = new AtomicInteger(1);
final String namePrefix;
TaskThreadFactory(String namePrefix) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
this.namePrefix = namePrefix;
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement());
t.setDaemon(daemon);
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
public boolean isDaemon() {
return daemon;
}
public long getMaxIdleTime() {
return maxIdleTime;
}
public void setDaemon(boolean daemon) {
this.daemon = daemon;
}
public void setMaxIdleTime(long maxIdleTime) {
this.maxIdleTime = maxIdleTime;
}
}