blob: 5b5b1069e7348cb5f270cf43ba96ee115479d196 [file] [log] [blame]
/*
* Copyright (c) 2009-2013 Eike Stepper (Berlin, Germany) and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Eike Stepper - initial API and implementation
*/
package org.eclipse.net4j.signal.heartbeat;
import org.eclipse.net4j.channel.IChannel;
import org.eclipse.net4j.channel.IChannelMultiplexer;
import org.eclipse.net4j.connector.IConnector;
import org.eclipse.net4j.signal.Indication;
import org.eclipse.net4j.signal.Request;
import org.eclipse.net4j.signal.SignalProtocol;
import org.eclipse.net4j.signal.SignalReactor;
import org.eclipse.net4j.util.WrappedException;
import org.eclipse.net4j.util.concurrent.Timeouter;
import org.eclipse.net4j.util.concurrent.TimerLifecycle;
import org.eclipse.net4j.util.container.IElementProcessor;
import org.eclipse.net4j.util.container.IManagedContainer;
import org.eclipse.net4j.util.container.IPluginContainer;
import org.eclipse.net4j.util.factory.ProductCreationException;
import org.eclipse.net4j.util.io.ExtendedDataInputStream;
import org.eclipse.net4j.util.io.ExtendedDataOutputStream;
import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
import org.eclipse.net4j.util.om.log.OMLogger;
import org.eclipse.internal.net4j.bundle.OM;
import org.eclipse.spi.net4j.ServerProtocolFactory;
import java.io.IOException;
import java.util.Timer;
import java.util.TimerTask;
/**
* A {@link SignalProtocol signal protocol} that keeps the {@link IConnector connector} of its {@link IChannel channel} open
* or provides early deactivation feedback by exchanging periodic heart beats.
*
* @author Eike Stepper
* @since 2.0
*/
public class HeartBeatProtocol extends SignalProtocol<Object>
{
public static final String TYPE = "heartbeat"; //$NON-NLS-1$
private static final short SIGNAL_START = 1;
private static final short SIGNAL_HEART_BEAT = 2;
private static final boolean HEART_BEAT = true;
private Timeouter timeouter;
private Timer timer;
/**
* @since 4.0
*/
protected HeartBeatProtocol(String type, IConnector connector, Timer timer)
{
super(type);
checkArg(timer, "timer"); //$NON-NLS-1$
checkArg(connector, "connector"); //$NON-NLS-1$
this.timer = timer;
open(connector);
}
public HeartBeatProtocol(IConnector connector, Timer timer)
{
this(TYPE, connector, timer);
}
/**
* @since 4.0
*/
public HeartBeatProtocol(IConnector connector, IManagedContainer container)
{
this(connector, getDefaultTimer(container));
}
public HeartBeatProtocol(IConnector connector)
{
this(connector, IPluginContainer.INSTANCE);
}
public Timer getTimer()
{
return timer;
}
/**
* Same as <code>start(rate, 2 * rate)</code>.
*
* @see #start(long, long)
*/
public void start(final long rate)
{
start(rate, 2L * rate);
}
public void start(final long rate, long timeout)
{
checkActive();
checkArg(rate > 0, "rate"); //$NON-NLS-1$
checkArg(timeout >= rate, "timeout"); //$NON-NLS-1$
try
{
new Request(this, SIGNAL_START, "Start") //$NON-NLS-1$
{
@Override
protected void requesting(ExtendedDataOutputStream out) throws Exception
{
requestingStart(out, rate);
}
}.sendAsync();
}
catch (Exception ex)
{
throw WrappedException.wrap(ex);
}
if (timeouter == null)
{
timeouter = new Timeouter(getTimer(), timeout)
{
@Override
protected void handleTimeout(long untouched)
{
HeartBeatProtocol.this.handleTimeout(untouched);
}
};
}
else
{
timeouter.setTimeout(timeout);
timeouter.touch();
}
}
@Override
protected SignalReactor createSignalReactor(short signalID)
{
if (signalID == SIGNAL_HEART_BEAT)
{
return new Indication(HeartBeatProtocol.this, SIGNAL_HEART_BEAT, "HeartBeat") //$NON-NLS-1$
{
@Override
protected void indicating(ExtendedDataInputStream in) throws Exception
{
checkState(in.readBoolean() == HEART_BEAT, "Invalid heart beat"); //$NON-NLS-1$
timeouter.touch();
}
};
}
return null;
}
protected void handleTimeout(long untouched)
{
IChannelMultiplexer multiplexer = getChannel().getMultiplexer();
LifecycleUtil.deactivate(multiplexer, OMLogger.Level.DEBUG);
}
@Override
protected void doDeactivate() throws Exception
{
if (timeouter != null)
{
timeouter.dispose();
timeouter = null;
}
super.doDeactivate();
}
/**
* @since 4.0
*/
protected void requestingStart(ExtendedDataOutputStream out, long rate) throws IOException
{
out.writeLong(rate);
}
public static Timer getDefaultTimer(IManagedContainer container)
{
return TimerLifecycle.DaemonFactory.getTimer(container, null);
}
/**
* The server-side implementation of a {@link HeartBeatProtocol heart beat protocol}.
*
* @author Eike Stepper
*/
public static class Server extends SignalProtocol<Object>
{
private long heartBeatRate;
private Timer heartBeatTimer;
private TimerTask heartBeatTimerTask;
/**
* @since 4.0
*/
protected Server(String type)
{
super(type);
}
public Server()
{
this(TYPE);
}
public Timer getHeartBeatTimer()
{
return heartBeatTimer;
}
public void setHeartBeatTimer(Timer heartBeatTimer)
{
checkInactive();
this.heartBeatTimer = heartBeatTimer;
}
@Override
protected SignalReactor createSignalReactor(short signalID)
{
if (signalID == SIGNAL_START)
{
return new Indication(Server.this, SIGNAL_START, "Start") //$NON-NLS-1$
{
@Override
protected void indicating(ExtendedDataInputStream in) throws Exception
{
indicatingStart(in);
}
};
}
return null;
}
@Override
protected void doBeforeActivate() throws Exception
{
super.doBeforeActivate();
checkState(heartBeatTimer, "heartBeatTimer"); //$NON-NLS-1$
}
@Override
protected void doDeactivate() throws Exception
{
try
{
cancelHeartBeatTask();
}
finally
{
super.doDeactivate();
}
}
/**
* @since 4.0
*/
protected void indicatingStart(ExtendedDataInputStream in) throws IOException
{
heartBeatRate = in.readLong();
cancelHeartBeatTask();
scheduleHeartBeatTask();
}
private void scheduleHeartBeatTask()
{
heartBeatTimerTask = new TimerTask()
{
@Override
public void run()
{
try
{
new Request(Server.this, SIGNAL_HEART_BEAT, "HeartBeat") //$NON-NLS-1$
{
@Override
protected void requesting(ExtendedDataOutputStream out) throws Exception
{
out.writeBoolean(HEART_BEAT);
}
}.sendAsync();
}
catch (Exception ex)
{
OM.LOG.error("HeartBeatProtocolTask failed", ex);
}
}
};
heartBeatTimer.schedule(heartBeatTimerTask, 0L, heartBeatRate);
}
private void cancelHeartBeatTask()
{
if (heartBeatTimerTask != null)
{
heartBeatTimerTask.cancel();
heartBeatTimerTask = null;
}
}
/**
* Creates server-side {@link Server heart beat protocol} instances.
*
* @author Eike Stepper
*/
public static class Factory extends ServerProtocolFactory
{
public Factory()
{
super(TYPE);
}
public Object create(String description) throws ProductCreationException
{
return new HeartBeatProtocol.Server();
}
}
/**
* An {@link IElementProcessor element post processor} that injects a {@link #getTimer(IManagedContainer) timer}
* into server-side {@link Server heart beat protocol} instances.
*
* @author Eike Stepper
*/
public static class TimerInjector implements IElementProcessor
{
public TimerInjector()
{
}
public Object process(IManagedContainer container, String productGroup, String factoryType, String description,
Object element)
{
if (element instanceof Server)
{
Server server = (Server)element;
if (server.getHeartBeatTimer() == null)
{
server.setHeartBeatTimer(getTimer(container));
}
}
return element;
}
protected Timer getTimer(IManagedContainer container)
{
return getDefaultTimer(container);
}
}
}
}