blob: 4d5a906d48d36555a6f7803c2e140c0cbb6659c1 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2013 Boeing.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Boeing - initial API and implementation
*******************************************************************************/
package org.eclipse.osee.ote.internal.message.event.send;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import org.eclipse.osee.framework.logging.OseeLog;
import org.eclipse.osee.ote.message.event.OteEventMessage;
import org.eclipse.osee.ote.message.event.OteEventMessageUtil;
import org.eclipse.osee.ote.message.event.send.OteEventMessageCallable;
import org.eclipse.osee.ote.message.event.send.OteEventMessageFuture;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventHandler;
/**
 * Future-like handle for a sent {@link OteEventMessage} that expects a response.
 * <p>
 * On construction the instance subscribes itself as an {@link EventHandler} on the response topic and
 * schedules a {@code TimeoutRunnable} on a private single-threaded daemon executor. When an event whose
 * header {@code RESPONSE_ID} equals the expected id arrives, the future tears itself down, invokes the
 * supplied callable with the sent and received messages, and wakes every thread blocked in
 * {@link #waitForCompletion()}.
 * <p>
 * Teardown (unregistering the handler, cancelling the timeout, shutting the executor down) is performed
 * at most once, no matter how many times {@link #cancel()} or {@link #complete()} are invoked.
 *
 * @param <T> type of the message that was sent
 * @param <R> type of the response message
 */
public class OteEventMessageFutureImpl<T extends OteEventMessage, R extends OteEventMessage> implements OteEventMessageFuture<T, R>, EventHandler {

   private final ServiceRegistration<EventHandler> reg;
   protected final OteEventMessageCallable<T, R> callable;
   protected final Class<R> recieveClasstype;
   protected final int responseId;
   protected final T sentMessage;
   private final ScheduledExecutorService ex;
   private final ReentrantLock lock;
   private final Condition condition;
   private final ScheduledFuture<?> wakeup;
   private final TimeoutRunnable<T, R> timeoutRunnable;

   /** Ensures the teardown in {@link #dispose()} runs only once. */
   private final AtomicBoolean disposed = new AtomicBoolean(false);

   protected volatile boolean gotResponse = false;
   protected volatile boolean isDone = false;

   /**
    * Subscribes to the response topic and arms the timeout.
    *
    * @param recieveClasstype class used to instantiate the response message; {@code newInstance()} is
    * called on it, so it needs an accessible no-argument constructor
    * @param callable callback invoked with the sent message, the decoded response and this future
    * @param sentMessage the message that was sent and for which a response is awaited
    * @param responseTopic event topic on which responses are delivered
    * @param responseId value the response header's {@code RESPONSE_ID} must equal
    * @param timeout delay in milliseconds before the timeout task runs
    */
   public OteEventMessageFutureImpl(Class<R> recieveClasstype, OteEventMessageCallable<T, R> callable, T sentMessage, String responseTopic, int responseId, long timeout) {
      this.callable = callable;
      this.responseId = responseId;
      this.sentMessage = sentMessage;
      this.recieveClasstype = recieveClasstype;
      // NOTE(review): 'this' is published to the event system before the remaining fields are assigned;
      // the order is kept as in the original because the timeout task needs the registration as well.
      reg = OteEventMessageUtil.subscribe(responseTopic, this);
      lock = new ReentrantLock();
      condition = lock.newCondition();
      ex = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
         @Override
         public Thread newThread(Runnable task) {
            Thread th = new Thread(task);
            th.setDaemon(true);
            th.setName("OteEventMessage Timeout");
            return th;
         }
      });
      timeoutRunnable = new TimeoutRunnable<>(lock, condition, sentMessage, callable, this);
      wakeup = ex.schedule(timeoutRunnable, timeout, TimeUnit.MILLISECONDS);
   }

   /**
    * Decodes the event into a new response message and, if its {@code RESPONSE_ID} matches the expected
    * id, cancels this future, records that a response was received and invokes the callable. Waiting
    * threads are woken even if the callable throws.
    *
    * @param event the event delivered on the response topic
    */
   @Override
   public void handleEvent(Event event) {
      try {
         R msg = recieveClasstype.newInstance();
         OteEventMessageUtil.putBytes(event, msg);
         if (msg.getHeader().RESPONSE_ID.getValue() == responseId) {
            cancel();
            gotResponse = true;
            try {
               callable.call(sentMessage, msg, this);
            } finally {
               // The timeout has already been cancelled, so this signal is the only thing that can
               // release waiters; it must happen even when the callback fails.
               executeCondition();
            }
         }
      } catch (InstantiationException | IllegalAccessException e) {
         OseeLog.log(getClass(), Level.SEVERE, e);
      }
   }

   /**
    * Wakes all threads currently blocked in {@link #waitForCompletion()}.
    */
   protected final void executeCondition() {
      lock.lock();
      try {
         // signalAll rather than signal: more than one thread may be waiting on this future.
         condition.signalAll();
      } finally {
         lock.unlock();
      }
   }

   /**
    * Unregisters the event handler, cancels the pending timeout and shuts down the timeout executor.
    * Safe to call repeatedly.
    */
   @Override
   public void cancel() {
      dispose();
   }

   /**
    * Blocks until the timeout task reports a timeout or the scheduled timeout future is done (which
    * includes having been cancelled). An interrupt does not abort the wait; it is logged and the
    * thread's interrupt status is restored before this method returns.
    */
   @Override
   public void waitForCompletion() {
      boolean interrupted = false;
      lock.lock();
      try {
         while (!timeoutRunnable.isTimedOut() && !this.wakeup.isDone()) {
            try {
               condition.await();
            } catch (InterruptedException e) {
               OseeLog.log(getClass(), Level.SEVERE, e);
               interrupted = true;
            }
         }
      } finally {
         lock.unlock();
         if (interrupted) {
            // Do not swallow the interrupt: hand it back to the caller.
            Thread.currentThread().interrupt();
         }
      }
   }

   /**
    * @return the timed-out state reported by the timeout task
    */
   @Override
   public boolean isTimedOut() {
      return timeoutRunnable.isTimedOut();
   }

   /**
    * @return {@code true} once a response with the expected id has been received
    */
   @Override
   public boolean gotResponse() {
      return gotResponse;
   }

   /**
    * Cancels this future and wakes all waiting threads.
    */
   @Override
   public void complete() {
      cancel();
      executeCondition();
   }

   /**
    * Performs the one-time teardown. Only the first caller does the work; later calls return
    * immediately, which avoids the {@code IllegalStateException} that
    * {@code ServiceRegistration.unregister()} throws for an already unregistered service.
    */
   private void dispose() {
      if (!disposed.compareAndSet(false, true)) {
         return;
      }
      try {
         reg.unregister();
      } finally {
         // Release the timer resources even if unregistering fails.
         wakeup.cancel(false);
         this.ex.shutdown();
         isDone = true;
      }
   }

   /**
    * @return {@code true} once this future has been disposed
    */
   @Override
   public boolean isDone() {
      return isDone;
   }
}