blob: a0b53b75e454d77dda9c537acaf711166b740ae5 [file] [log] [blame]
/***************************************************************************
* Copyright (c) 2004 - 2008 Eike Stepper, Germany.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Simon McDuff - initial API and implementation
* Eike Stepper - maintenance
**************************************************************************/
package org.eclipse.emf.cdo.internal.server;
import org.eclipse.emf.cdo.common.query.CDOQueryInfo;
import org.eclipse.emf.cdo.internal.server.bundle.OM;
import org.eclipse.emf.cdo.server.IQueryContext;
import org.eclipse.emf.cdo.server.IQueryHandler;
import org.eclipse.emf.cdo.server.IRepository;
import org.eclipse.emf.cdo.server.IRepositoryElement;
import org.eclipse.emf.cdo.server.IView;
import org.eclipse.emf.cdo.server.StoreThreadLocal;
import org.eclipse.net4j.util.container.SingleDeltaContainerEvent;
import org.eclipse.net4j.util.container.IContainerDelta.Kind;
import org.eclipse.net4j.util.event.IEvent;
import org.eclipse.net4j.util.event.IListener;
import org.eclipse.net4j.util.lifecycle.Lifecycle;
import org.eclipse.net4j.util.om.trace.ContextTracer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* @author Simon McDuff
* @since 2.0
*/
public class QueryManager extends Lifecycle implements IRepositoryElement
{
private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_SESSION, QueryManager.class);
private IRepository repository;
private Map<Integer, QueryContext> queryContexts = new ConcurrentHashMap<Integer, QueryContext>();
private ExecutorService executors;
private int nextQuery;
public QueryManager()
{
}
public IRepository getRepository()
{
return repository;
}
public void setRepository(IRepository repository)
{
this.repository = repository;
}
public synchronized ExecutorService getExecutors()
{
if (executors == null)
{
executors = Executors.newFixedThreadPool(10);
}
return executors;
}
public void setExecutors(ExecutorService executors)
{
this.executors = executors;
}
public QueryResult execute(IView view, CDOQueryInfo queryInfo)
{
QueryResult queryResult = new QueryResult(view, queryInfo, nextQuery());
QueryContext queryContext = new QueryContext(queryResult);
execute(queryContext);
return queryResult;
}
public boolean isRunning(int queryID)
{
QueryContext queryContext = queryContexts.get(queryID);
return queryContext != null;
}
public void cancel(int queryID)
{
QueryContext queryContext = queryContexts.get(queryID);
if (queryContext == null || queryContext.getFuture().isDone())
{
throw new RuntimeException("Query " + queryID + " is not running anymore");
}
if (TRACER.isEnabled())
{
TRACER.trace("Cancelling query for context: " + queryContext);
}
queryContext.cancel();
}
public synchronized void register(final QueryContext queryContext)
{
queryContexts.put(queryContext.getQueryResult().getQueryID(), queryContext);
queryContext.addListener();
}
public synchronized void unregister(final QueryContext queryContext)
{
if (queryContexts.remove(queryContext.getQueryResult().getQueryID()) != null)
{
queryContext.removeListener();
}
}
public synchronized int nextQuery()
{
return nextQuery++;
}
private Future<?> execute(QueryContext queryContext)
{
Future<?> future = getExecutors().submit(queryContext);
queryContext.setFuture(future);
register(queryContext);
return future;
}
/**
* @author Simon McDuff
* @since 2.0
*/
private class QueryContext implements IQueryContext, Runnable
{
private long timeStamp;
private QueryResult queryResult;
private boolean started;
private boolean cancelled;
private int resultCount;
private Future<?> future;
private IListener sessionListener = new IListener()
{
public void notifyEvent(IEvent event)
{
if (event instanceof SingleDeltaContainerEvent)
{
IView view = getQueryResult().getView();
SingleDeltaContainerEvent<?> deltaEvent = (SingleDeltaContainerEvent<?>)event;
if (deltaEvent.getDeltaKind() == Kind.REMOVED && deltaEvent.getDeltaElement() == view)
{
// Cancel the query when view is closing
cancel();
}
}
}
};
public QueryContext(QueryResult queryResult)
{
this.queryResult = queryResult;
// Remember the timeStamp because it can change for audits
View view = (View)getView();
timeStamp = view.getTimeStamp();
}
public QueryResult getQueryResult()
{
return queryResult;
}
public IView getView()
{
return queryResult.getView();
}
public long getTimeStamp()
{
return timeStamp;
}
public Future<?> getFuture()
{
return future;
}
public void setFuture(Future<?> future)
{
this.future = future;
}
public void cancel()
{
cancelled = true;
if (future != null)
{
future.cancel(true);
}
if (!started)
{
unregister(this);
}
}
public boolean addResult(Object object)
{
if (resultCount == 0)
{
throw new IllegalStateException("Maximum number of results exceeded");
}
queryResult.getQueue().add(object);
return !cancelled && --resultCount > 0;
}
public void run()
{
Session session = (Session)queryResult.getView().getSession();
StoreThreadLocal.setSession(session);
try
{
started = true;
CDOQueryInfo info = queryResult.getQueryInfo();
resultCount = info.getMaxResults() < 0 ? Integer.MAX_VALUE : info.getMaxResults();
IQueryHandler handler = repository.getQueryHandler(info);
handler.executeQuery(info, this);
}
catch (Throwable exception)
{
queryResult.getQueue().setException(exception);
}
finally
{
queryResult.getQueue().close();
unregister(this);
StoreThreadLocal.release();
}
}
public void addListener()
{
IView view = getQueryResult().getView();
view.getSession().addListener(sessionListener);
}
public void removeListener()
{
IView view = getQueryResult().getView();
view.getSession().removeListener(sessionListener);
}
}
}