blob: 6441478aa3ad4ada0ee5f953ebc2d5da3de735e1 [file] [log] [blame]
/*
* Copyright (c) 2008, 2009, 2011, 2012, 2014 Eike Stepper (Berlin, Germany) and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Eike Stepper - initial API and implementation
*/
package org.eclipse.net4j.signal;
import org.eclipse.net4j.buffer.BufferInputStream;
import org.eclipse.net4j.buffer.BufferOutputStream;
import org.eclipse.net4j.util.ImplementationError;
import org.eclipse.net4j.util.concurrent.ConcurrencyUtil;
import org.eclipse.net4j.util.io.ExtendedDataInputStream;
import org.eclipse.net4j.util.io.ExtendedDataOutputStream;
import org.eclipse.net4j.util.om.monitor.OMMonitor;
import org.eclipse.internal.net4j.bundle.OM;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
/**
* Represents the sender side of a two-way {@link IndicationWithResponse signal} with additional support for remote progress monitoring.
*
* @author Eike Stepper
* @since 2.0
*/
public abstract class RequestWithMonitoring<RESULT> extends RequestWithConfirmation<RESULT>
{
/**
* @since 2.0
*/
public static final long DEFAULT_CANCELATION_POLL_INTERVAL = 100;
/**
* @since 2.0
*/
public static final int DEFAULT_MONITOR_PROGRESS_SECONDS = 1;
/**
* @since 2.0
*/
public static final int DEFAULT_MONITOR_TIMEOUT_SECONDS = 10;
private OMMonitor mainMonitor;
private OMMonitor remoteMonitor;
private Object monitorLock = new Object();
/**
* @since 2.0
*/
public RequestWithMonitoring(SignalProtocol<?> protocol, short id, String name)
{
super(protocol, id, name);
}
/**
* @since 2.0
*/
public RequestWithMonitoring(SignalProtocol<?> protocol, short signalID)
{
super(protocol, signalID);
}
/**
* @since 2.0
*/
public RequestWithMonitoring(SignalProtocol<?> protocol, Enum<?> literal)
{
super(protocol, literal);
}
@Override
public Future<RESULT> sendAsync()
{
return super.sendAsync();
}
public Future<RESULT> sendAsync(OMMonitor monitor)
{
mainMonitor = monitor;
return super.sendAsync();
}
@Override
public RESULT send() throws Exception, RemoteException
{
return super.send();
}
@Override
public RESULT send(long timeout) throws Exception, RemoteException
{
return super.send(timeout);
}
public RESULT send(OMMonitor monitor) throws Exception, RemoteException
{
mainMonitor = monitor;
return super.send();
}
public RESULT send(long timeout, OMMonitor monitor) throws Exception, RemoteException
{
mainMonitor = monitor;
return super.send(timeout);
}
@Override
protected final void requesting(ExtendedDataOutputStream out) throws Exception
{
OMMonitor fork = null;
boolean useMonitor = mainMonitor != null;
out.writeBoolean(useMonitor);
if (useMonitor)
{
double remoteWork = OMMonitor.HUNDRED - getRequestingWorkPercent() - getConfirmingWorkPercent();
if (remoteWork < OMMonitor.ZERO)
{
throw new ImplementationError("Remote work must not be negative: " + remoteWork); //$NON-NLS-1$
}
mainMonitor.begin(OMMonitor.HUNDRED);
OMMonitor subMonitor = mainMonitor.fork(remoteWork);
synchronized (monitorLock)
{
remoteMonitor = subMonitor;
}
ExecutorService executorService = getCancelationExecutorService();
if (executorService != null)
{
executorService.execute(new Runnable()
{
public void run()
{
while (mainMonitor != null)
{
ConcurrencyUtil.sleep(getCancelationPollInterval());
if (mainMonitor != null && mainMonitor.isCanceled())
{
try
{
new MonitorCanceledRequest(getProtocol(), getCorrelationID()).sendAsync();
}
catch (Exception ex)
{
OM.LOG.error(ex);
}
return;
}
}
}
});
}
out.writeInt(getMonitorProgressSeconds());
fork = mainMonitor.fork(getRequestingWorkPercent());
}
out.writeInt(getMonitorTimeoutSeconds());
requesting(out, fork);
}
@Override
protected final RESULT confirming(ExtendedDataInputStream in) throws Exception
{
OMMonitor monitor = mainMonitor != null ? mainMonitor.fork(getConfirmingWorkPercent()) : null;
return confirming(in, monitor);
}
protected abstract void requesting(ExtendedDataOutputStream out, OMMonitor monitor) throws Exception;
/**
* <b>Important Note:</b> The confirmation must not be empty, i.e. the stream must be used at least to read a
* <code>boolean</code>. Otherwise synchronization problems will result!
*/
protected abstract RESULT confirming(ExtendedDataInputStream in, OMMonitor monitor) throws Exception;
/**
* @since 2.0
*/
protected ExecutorService getCancelationExecutorService()
{
return getProtocol().getExecutorService();
}
/**
* @since 2.0
*/
protected long getCancelationPollInterval()
{
return DEFAULT_CANCELATION_POLL_INTERVAL;
}
/**
* @since 2.0
*/
protected int getMonitorProgressSeconds()
{
return DEFAULT_MONITOR_PROGRESS_SECONDS;
}
/**
* @since 2.0
*/
protected int getMonitorTimeoutSeconds()
{
return DEFAULT_MONITOR_TIMEOUT_SECONDS;
}
/**
* @since 2.0
*/
protected int getRequestingWorkPercent()
{
return 2;
}
/**
* @since 2.0
*/
protected int getConfirmingWorkPercent()
{
return 1;
}
@Override
void doExecute(BufferInputStream in, BufferOutputStream out) throws Exception
{
try
{
super.doExecute(in, out);
}
finally
{
synchronized (monitorLock)
{
try
{
if (remoteMonitor != null)
{
remoteMonitor.done();
}
}
catch (Exception ex)
{
OM.LOG.error(ex);
}
finally
{
remoteMonitor = null;
}
}
try
{
if (mainMonitor != null)
{
mainMonitor.done();
}
}
finally
{
mainMonitor = null;
}
}
}
void setMonitorProgress(double totalWork, double work)
{
getBufferInputStream().restartTimeout();
synchronized (monitorLock)
{
if (remoteMonitor != null)
{
if (!remoteMonitor.hasBegun())
{
remoteMonitor.begin(totalWork);
remoteMonitor.worked(work);
}
else
{
double oldRatio = remoteMonitor.getWork() / remoteMonitor.getTotalWork();
double newRatio = work / totalWork;
double newWork = newRatio - oldRatio;
newWork *= remoteMonitor.getTotalWork();
if (newWork >= OMMonitor.ZERO)
{
remoteMonitor.worked(newWork);
}
}
}
}
}
}