blob: 5278c1d0adf87d885890d3c383d09a742fda7348 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2013 Boeing.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Boeing - initial API and implementation
*******************************************************************************/
package org.eclipse.ote.bytemessage.internal;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import org.eclipse.osee.framework.logging.OseeLog;
import org.eclipse.ote.bytemessage.OteByteMessage;
import org.eclipse.ote.bytemessage.OteByteMessageCallable;
import org.eclipse.ote.bytemessage.OteByteMessageFuture;
import org.eclipse.ote.bytemessage.OteByteMessageUtil;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventHandler;
/**
 * Tracks one request/response exchange. The instance subscribes itself to the response topic,
 * hands the first message whose header RESPONSE_ID equals the expected id to the supplied
 * callable, and schedules a {@link TimeoutRunnable} that runs if the timeout elapses first.
 * Threads may block in {@link #waitForCompletion()} until one of those outcomes (or a
 * {@link #cancel()}) occurs.
 *
 * @param <T> type of the message that was sent
 * @param <R> type of the expected response message
 */
public class OteByteMessageFutureImpl<T extends OteByteMessage, R extends OteByteMessage> implements OteByteMessageFuture<T, R>, EventHandler{

   private final ServiceRegistration<EventHandler> reg;
   private final OteByteMessageCallable<T, R> callable;
   private final Class<R> recieveClasstype;
   private final int responseId;
   private final T sentMessage;
   private final ScheduledExecutorService ex;
   private final ReentrantLock lock;
   private final Condition condition;
   private final ScheduledFuture<?> wakeup;
   private final TimeoutRunnable<T, R> timeoutRunnable;
   private volatile boolean gotResponse = false;
   /** Set once the resources have been released; guarded by {@link #lock}. */
   private boolean disposed = false;

   /**
    * Subscribes to the response topic and schedules the timeout.
    *
    * @param recieveClasstype class used to instantiate the response message; it is created
    * reflectively through its no-argument constructor
    * @param callable callback invoked with the sent message and the matching response
    * @param sentMessage the message this future is waiting on a response for
    * @param responseTopic topic on which the response is expected
    * @param responseId value the response header's RESPONSE_ID must equal
    * @param timeout delay in milliseconds before the timeout runnable is executed
    */
   public OteByteMessageFutureImpl(Class<R> recieveClasstype, OteByteMessageCallable<T, R> callable, T sentMessage, String responseTopic, int responseId, long timeout) {
      this.callable = callable;
      this.responseId = responseId;
      this.sentMessage = sentMessage;
      this.recieveClasstype = recieveClasstype;
      // NOTE(review): 'this' is published to the event system before the remaining fields are
      // assigned, so an event delivered during construction would observe null fields. The
      // original ordering is kept because the behavior of TimeoutRunnable is not visible here.
      reg = OteByteMessageUtil.subscribe(responseTopic, this);
      lock = new ReentrantLock();
      condition = lock.newCondition();
      ex = Executors.newSingleThreadScheduledExecutor(new ThreadFactory(){
         @Override
         public Thread newThread(Runnable arg0) {
            Thread th = new Thread(arg0);
            th.setName("OteByteMessage Timeout");
            return th;
         }
      });
      timeoutRunnable = new TimeoutRunnable<T, R>(lock, condition, sentMessage, callable, this);
      wakeup = ex.schedule(timeoutRunnable, timeout, TimeUnit.MILLISECONDS);
   }

   /**
    * Decodes the event into a new response message and, when its RESPONSE_ID matches, releases
    * the subscription and timeout, invokes the callable and wakes every waiting thread. A
    * matching event that arrives after this future has already been disposed is ignored.
    *
    * @param event the event carrying the response bytes
    */
   @Override
   public void handleEvent(Event event) {
      try {
         R msg = recieveClasstype.newInstance();
         OteByteMessageUtil.putBytes(event, msg);
         if(msg.getHeader().RESPONSE_ID.getValue() == responseId){
            if(!dispose()){
               // Already cancelled or answered; do not run the callback a second time.
               return;
            }
            gotResponse = true;
            try{
               callable.call(sentMessage, msg);
            } finally {
               // Wake waiters even if the callback throws, otherwise they would block forever
               // because the timeout has just been cancelled.
               signalWaiters();
            }
         }
      } catch (InstantiationException e) {
         OseeLog.log(getClass(), Level.SEVERE, e);
      } catch (IllegalAccessException e) {
         OseeLog.log(getClass(), Level.SEVERE, e);
      }
   }

   /**
    * Releases the subscription, the pending timeout and the timeout executor, then wakes any
    * thread blocked in {@link #waitForCompletion()}. Calling this more than once is harmless.
    */
   public void cancel(){
      if(dispose()){
         signalWaiters();
      }
   }

   /**
    * Blocks until the timeout runnable reports a timeout or the scheduled timeout task is done
    * (which includes it having been cancelled). An interrupt does not end the wait; it is
    * logged and the thread's interrupt status is restored before this method returns.
    */
   public void waitForCompletion(){
      boolean interrupted = false;
      lock.lock();
      try{
         while(!timeoutRunnable.isTimedOut() && !this.wakeup.isDone()){
            try {
               condition.await();
            } catch (InterruptedException e) {
               OseeLog.log(getClass(), Level.SEVERE, e);
               interrupted = true;
            }
         }
      } finally {
         lock.unlock();
         if(interrupted){
            // await() cleared the flag when it threw; put it back for the caller.
            Thread.currentThread().interrupt();
         }
      }
   }

   /**
    * @return the timed-out state reported by the timeout runnable
    */
   public boolean isTimedOut(){
      return timeoutRunnable.isTimedOut();
   }

   /**
    * @return true once a response with the expected id has been received
    */
   public boolean gotResponse(){
      return gotResponse ;
   }

   /**
    * Wakes every thread waiting on the condition.
    */
   private void signalWaiters(){
      lock.lock();
      try{
         condition.signalAll();
      } finally {
         lock.unlock();
      }
   }

   /**
    * Unregisters the event handler, cancels the timeout and shuts the executor down, at most
    * once. The guard matters because unregistering an already unregistered service
    * registration raises IllegalStateException.
    *
    * @return true if this call performed the release, false if it had already been done
    */
   private boolean dispose(){
      lock.lock();
      try{
         if(disposed){
            return false;
         }
         disposed = true;
      } finally {
         lock.unlock();
      }
      // The release itself runs outside the lock so foreign code is not called while holding it.
      reg.unregister();
      wakeup.cancel(false);
      this.ex.shutdown();
      return true;
   }
}