blob: e855a37ab50ce2af4dc75a1f03bbbe0c642e8b81 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.catalina.tribes.transport.nio;
import java.io.IOException;
import java.net.UnknownHostException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.UniqueId;
import org.apache.catalina.tribes.io.ChannelData;
import org.apache.catalina.tribes.io.XByteBuffer;
import org.apache.catalina.tribes.transport.AbstractSender;
import org.apache.catalina.tribes.transport.MultiPointSender;
import org.apache.catalina.tribes.transport.SenderState;
import org.apache.catalina.tribes.util.Logs;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
/**
* <p>Title: </p>
*
* <p>Description: </p>
*
* <p>Company: </p>
*
* @author not attributable
* @version 1.0
*/
public class ParallelNioSender extends AbstractSender implements MultiPointSender {
private static final Log log = LogFactory.getLog(ParallelNioSender.class);
protected long selectTimeout = 5000; //default 5 seconds, same as send timeout
protected Selector selector;
protected HashMap<Member, NioSender> nioSenders =
new HashMap<Member, NioSender>();
public ParallelNioSender() throws IOException {
selector = Selector.open();
setConnected(true);
}
@Override
public synchronized void sendMessage(Member[] destination, ChannelMessage msg) throws ChannelException {
long start = System.currentTimeMillis();
this.setUdpBased((msg.getOptions()&Channel.SEND_OPTIONS_UDP) == Channel.SEND_OPTIONS_UDP);
byte[] data = XByteBuffer.createDataPackage((ChannelData)msg);
NioSender[] senders = setupForSend(destination);
connect(senders);
setData(senders,data);
int remaining = senders.length;
ChannelException cx = null;
try {
//loop until complete, an error happens, or we timeout
long delta = System.currentTimeMillis() - start;
boolean waitForAck = (Channel.SEND_OPTIONS_USE_ACK & msg.getOptions()) == Channel.SEND_OPTIONS_USE_ACK;
while ( (remaining>0) && (delta<getTimeout()) ) {
try {
remaining -= doLoop(selectTimeout, getMaxRetryAttempts(),waitForAck,msg);
} catch (Exception x ) {
if (log.isTraceEnabled()) log.trace("Error sending message", x);
int faulty = (cx == null)?0:cx.getFaultyMembers().length;
if ( cx == null ) {
if ( x instanceof ChannelException ) cx = (ChannelException)x;
else cx = new ChannelException("Parallel NIO send failed.", x);
} else {
if (x instanceof ChannelException) cx.addFaultyMember( ( (ChannelException) x).getFaultyMembers());
}
//count down the remaining on an error
if (faulty<cx.getFaultyMembers().length) remaining -= (cx.getFaultyMembers().length-faulty);
}
//bail out if all remaining senders are failing
if ( cx != null && cx.getFaultyMembers().length == remaining ) throw cx;
delta = System.currentTimeMillis() - start;
}
if ( remaining > 0 ) {
//timeout has occurred
ChannelException cxtimeout = new ChannelException("Operation has timed out("+getTimeout()+" ms.).");
if ( cx==null ) cx = new ChannelException("Operation has timed out("+getTimeout()+" ms.).");
for (int i=0; i<senders.length; i++ ) {
if (!senders[i].isComplete() ) cx.addFaultyMember(senders[i].getDestination(),cxtimeout);
}
throw cx;
} else if ( cx != null ) {
//there was an error
throw cx;
}
} catch (Exception x ) {
try { this.disconnect(); } catch (Exception e) {/*Ignore*/}
if ( x instanceof ChannelException ) throw (ChannelException)x;
else throw new ChannelException(x);
}
}
private int doLoop(long selectTimeOut, int maxAttempts, boolean waitForAck, ChannelMessage msg) throws IOException, ChannelException {
int completed = 0;
int selectedKeys = selector.select(selectTimeOut);
if (selectedKeys == 0) {
return 0;
}
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey sk = it.next();
it.remove();
int readyOps = sk.readyOps();
sk.interestOps(sk.interestOps() & ~readyOps);
NioSender sender = (NioSender) sk.attachment();
try {
if (sender.process(sk,waitForAck)) {
completed++;
sender.setComplete(true);
if ( Logs.MESSAGES.isTraceEnabled() ) {
Logs.MESSAGES.trace("ParallelNioSender - Sent msg:" + new UniqueId(msg.getUniqueId()) + " at " +new java.sql.Timestamp(System.currentTimeMillis())+ " to "+sender.getDestination().getName());
}
SenderState.getSenderState(sender.getDestination()).setReady();
}//end if
} catch (Exception x) {
if (log.isTraceEnabled()) {
log.trace("Error while processing send to " + sender.getDestination().getName(), x);
}
SenderState state = SenderState.getSenderState(sender.getDestination());
int attempt = sender.getAttempt()+1;
boolean retry = (sender.getAttempt() <= maxAttempts && maxAttempts>0);
synchronized (state) {
//sk.cancel();
if (state.isSuspect()) state.setFailing();
if (state.isReady()) {
state.setSuspect();
if ( retry )
log.warn("Member send is failing for:" + sender.getDestination().getName() +" ; Setting to suspect and retrying.");
else
log.warn("Member send is failing for:" + sender.getDestination().getName() +" ; Setting to suspect.", x);
}
}
if ( !isConnected() ) {
log.warn("Not retrying send for:" + sender.getDestination().getName() + "; Sender is disconnected.");
ChannelException cx = new ChannelException("Send failed, and sender is disconnected. Not retrying.",x);
cx.addFaultyMember(sender.getDestination(),x);
throw cx;
}
byte[] data = sender.getMessage();
if ( retry ) {
try {
sender.disconnect();
sender.connect();
sender.setAttempt(attempt);
sender.setMessage(data);
}catch ( Exception ignore){
state.setFailing();
}
} else {
ChannelException cx = new ChannelException("Send failed, attempt:"+sender.getAttempt()+" max:"+maxAttempts,x);
cx.addFaultyMember(sender.getDestination(),x);
throw cx;
}//end if
}
}
return completed;
}
private void connect(NioSender[] senders) throws ChannelException {
ChannelException x = null;
for (int i=0; i<senders.length; i++ ) {
try {
senders[i].connect();
}catch ( IOException io ) {
if ( x==null ) x = new ChannelException(io);
x.addFaultyMember(senders[i].getDestination(),io);
}
}
if ( x != null ) throw x;
}
private void setData(NioSender[] senders, byte[] data) throws ChannelException {
ChannelException x = null;
for (int i=0; i<senders.length; i++ ) {
try {
senders[i].setMessage(data);
}catch ( IOException io ) {
if ( x==null ) x = new ChannelException(io);
x.addFaultyMember(senders[i].getDestination(),io);
}
}
if ( x != null ) throw x;
}
private NioSender[] setupForSend(Member[] destination) throws ChannelException {
ChannelException cx = null;
NioSender[] result = new NioSender[destination.length];
for ( int i=0; i<destination.length; i++ ) {
NioSender sender = nioSenders.get(destination[i]);
try {
if (sender == null) {
sender = new NioSender();
AbstractSender.transferProperties(this, sender);
nioSenders.put(destination[i], sender);
}
sender.reset();
sender.setDestination(destination[i]);
sender.setSelector(selector);
sender.setUdpBased(isUdpBased());
result[i] = sender;
}catch ( UnknownHostException x ) {
if (cx == null) cx = new ChannelException("Unable to setup NioSender.", x);
cx.addFaultyMember(destination[i], x);
}
}
if ( cx != null ) throw cx;
else return result;
}
@Override
public void connect() {
//do nothing, we connect on demand
setConnected(true);
}
private synchronized void close() throws ChannelException {
ChannelException x = null;
Object[] members = nioSenders.keySet().toArray();
for (int i=0; i<members.length; i++ ) {
Member mbr = (Member)members[i];
try {
NioSender sender = nioSenders.get(mbr);
sender.disconnect();
}catch ( Exception e ) {
if ( x == null ) x = new ChannelException(e);
x.addFaultyMember(mbr,e);
}
nioSenders.remove(mbr);
}
if ( x != null ) throw x;
}
@Override
public void add(Member member) {
// NOOP
}
@Override
public void remove(Member member) {
//disconnect senders
NioSender sender = nioSenders.remove(member);
if ( sender != null ) sender.disconnect();
}
@Override
public synchronized void disconnect() {
setConnected(false);
try {close(); }catch (Exception x){/*Ignore*/}
}
@Override
public void finalize() {
try {disconnect(); }catch ( Exception e){/*Ignore*/}
try {
selector.close();
}catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("Failed to close selector", e);
}
}
}
@Override
public boolean keepalive() {
boolean result = false;
for ( Iterator<Entry<Member, NioSender>> i = nioSenders.entrySet().iterator(); i.hasNext();) {
Map.Entry<Member, NioSender> entry = i.next();
NioSender sender = entry.getValue();
if ( sender.keepalive() ) {
//nioSenders.remove(entry.getKey());
i.remove();
result = true;
} else {
try {
sender.read(null);
}catch ( IOException x ) {
sender.disconnect();
sender.reset();
//nioSenders.remove(entry.getKey());
i.remove();
result = true;
}catch ( Exception x ) {
log.warn("Error during keepalive test for sender:"+sender,x);
}
}
}
//clean up any cancelled keys
if ( result ) try { selector.selectNow(); }catch (Exception e){/*Ignore*/}
return result;
}
}