| /* |
| * Copyright (c) 2008-2013 Eike Stepper (Berlin, Germany) and others. |
| * All rights reserved. This program and the accompanying materials |
| * are made available under the terms of the Eclipse Public License v1.0 |
| * which accompanies this distribution, and is available at |
| * http://www.eclipse.org/legal/epl-v10.html |
| * |
| * Contributors: |
| * Simon McDuff - initial API and implementation |
| * Eike Stepper - maintenance |
| */ |
| package org.eclipse.emf.cdo.internal.server; |
| |
| import org.eclipse.emf.cdo.common.branch.CDOBranch; |
| import org.eclipse.emf.cdo.common.branch.CDOBranchPoint; |
| import org.eclipse.emf.cdo.common.util.CDOQueryInfo; |
| import org.eclipse.emf.cdo.common.util.CDOQueryQueue; |
| import org.eclipse.emf.cdo.internal.server.bundle.OM; |
| import org.eclipse.emf.cdo.server.IQueryContext; |
| import org.eclipse.emf.cdo.server.IQueryHandler; |
| import org.eclipse.emf.cdo.server.IRepository; |
| import org.eclipse.emf.cdo.server.IView; |
| import org.eclipse.emf.cdo.server.StoreThreadLocal; |
| import org.eclipse.emf.cdo.spi.common.branch.CDOBranchUtil; |
| import org.eclipse.emf.cdo.spi.server.InternalQueryManager; |
| import org.eclipse.emf.cdo.spi.server.InternalQueryResult; |
| import org.eclipse.emf.cdo.spi.server.InternalRepository; |
| import org.eclipse.emf.cdo.spi.server.InternalSession; |
| import org.eclipse.emf.cdo.spi.server.InternalView; |
| |
| import org.eclipse.net4j.util.concurrent.ConcurrencyUtil; |
| import org.eclipse.net4j.util.concurrent.ThreadPool; |
| import org.eclipse.net4j.util.container.IContainerDelta.Kind; |
| import org.eclipse.net4j.util.container.SingleDeltaContainerEvent; |
| import org.eclipse.net4j.util.event.IEvent; |
| import org.eclipse.net4j.util.event.IListener; |
| import org.eclipse.net4j.util.lifecycle.Lifecycle; |
| import org.eclipse.net4j.util.om.trace.ContextTracer; |
| |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Future; |
| |
| /** |
| * @author Simon McDuff |
| * @since 2.0 |
| */ |
| public class QueryManager extends Lifecycle implements InternalQueryManager |
| { |
| private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_SESSION, QueryManager.class); |
| |
| private InternalRepository repository; |
| |
| private Map<Integer, QueryContext> queryContexts = new ConcurrentHashMap<Integer, QueryContext>(); |
| |
| private ExecutorService executors; |
| |
| private boolean shutdownExecutorService; |
| |
| private int nextQuery; |
| |
| private boolean allowInterruptRunningQueries = true; |
| |
| public QueryManager() |
| { |
| } |
| |
| public InternalRepository getRepository() |
| { |
| return repository; |
| } |
| |
| public void setRepository(InternalRepository repository) |
| { |
| this.repository = repository; |
| |
| String value = repository.getProperties().get(IRepository.Props.ALLOW_INTERRUPT_RUNNING_QUERIES); |
| if (value != null) |
| { |
| allowInterruptRunningQueries = Boolean.parseBoolean(value); |
| } |
| } |
| |
| public synchronized ExecutorService getExecutors() |
| { |
| if (executors == null) |
| { |
| executors = ConcurrencyUtil.getExecutorService(repository); |
| |
| if (executors == null) |
| { |
| shutdownExecutorService = true; |
| executors = ThreadPool.create(); |
| } |
| } |
| |
| return executors; |
| } |
| |
| public synchronized void setExecutors(ExecutorService executors) |
| { |
| if (shutdownExecutorService) |
| { |
| if (this.executors != null) |
| { |
| this.executors.shutdown(); |
| } |
| |
| shutdownExecutorService = false; |
| } |
| |
| this.executors = executors; |
| } |
| |
| public InternalQueryResult execute(InternalView view, CDOQueryInfo queryInfo) |
| { |
| InternalQueryResult queryResult = new QueryResult(view, queryInfo, getNextQueryID()); |
| QueryContext queryContext = new QueryContext(queryResult); |
| execute(queryContext); |
| return queryResult; |
| } |
| |
| public boolean isRunning(int queryID) |
| { |
| QueryContext queryContext = queryContexts.get(queryID); |
| return queryContext != null; |
| } |
| |
| public void cancel(int queryID) |
| { |
| QueryContext queryContext = queryContexts.get(queryID); |
| if (queryContext == null || queryContext.getFuture().isDone()) |
| { |
| throw new RuntimeException("Query " + queryID + " is not running anymore"); //$NON-NLS-1$ //$NON-NLS-2$ |
| } |
| |
| if (TRACER.isEnabled()) |
| { |
| TRACER.trace("Cancelling query for context: " + queryContext); //$NON-NLS-1$ |
| } |
| |
| queryContext.cancel(); |
| } |
| |
| public synchronized void register(QueryContext queryContext) |
| { |
| int queryID = queryContext.getQueryResult().getQueryID(); |
| queryContexts.put(queryID, queryContext); |
| queryContext.addListener(); |
| } |
| |
| public synchronized void unregister(QueryContext queryContext) |
| { |
| int queryID = queryContext.getQueryResult().getQueryID(); |
| queryContexts.remove(queryID); |
| queryContext.removeListener(); |
| } |
| |
| public synchronized int getNextQueryID() |
| { |
| return ++nextQuery; |
| } |
| |
| @Override |
| protected void doDeactivate() throws Exception |
| { |
| super.doDeactivate(); |
| setExecutors(null); |
| } |
| |
| private Future<?> execute(QueryContext queryContext) |
| { |
| register(queryContext); |
| |
| Future<?> future = getExecutors().submit(queryContext); |
| queryContext.setFuture(future); |
| return future; |
| } |
| |
| /** |
| * @author Simon McDuff |
| * @since 2.0 |
| */ |
| private class QueryContext implements IQueryContext, Runnable |
| { |
| private CDOBranchPoint branchPoint; |
| |
| private InternalQueryResult queryResult; |
| |
| private boolean started; |
| |
| private boolean cancelled; |
| |
| private int resultCount; |
| |
| private Future<?> future; |
| |
| private IListener sessionListener = new IListener() |
| { |
| public void notifyEvent(IEvent event) |
| { |
| if (event instanceof SingleDeltaContainerEvent<?>) |
| { |
| IView view = getQueryResult().getView(); |
| SingleDeltaContainerEvent<?> deltaEvent = (SingleDeltaContainerEvent<?>)event; |
| if (deltaEvent.getDeltaKind() == Kind.REMOVED && deltaEvent.getDeltaElement() == view) |
| { |
| // Cancel the query when view is closing |
| cancel(); |
| } |
| } |
| } |
| }; |
| |
| public QueryContext(InternalQueryResult queryResult) |
| { |
| this.queryResult = queryResult; |
| |
| // Remember the branchPoint because it can change |
| InternalView view = getView(); |
| |
| // long timeStamp = view.getTimeStamp(); |
| // if (timeStamp == CDOBranchPoint.UNSPECIFIED_DATE && repository.isSupportingAudits()) |
| // { |
| // timeStamp = repository.getTimeStamp(); |
| // } |
| // |
| // branchPoint = view.getBranch().getPoint(timeStamp); |
| |
| branchPoint = CDOBranchUtil.copyBranchPoint(view); |
| } |
| |
| public InternalQueryResult getQueryResult() |
| { |
| return queryResult; |
| } |
| |
| public InternalView getView() |
| { |
| return queryResult.getView(); |
| } |
| |
| public CDOBranch getBranch() |
| { |
| return branchPoint.getBranch(); |
| } |
| |
| public long getTimeStamp() |
| { |
| return branchPoint.getTimeStamp(); |
| } |
| |
| public Future<?> getFuture() |
| { |
| return future; |
| } |
| |
| public void setFuture(Future<?> future) |
| { |
| this.future = future; |
| } |
| |
| public void cancel() |
| { |
| cancelled = true; |
| if (future != null) |
| { |
| future.cancel(allowInterruptRunningQueries); |
| } |
| |
| if (!started) |
| { |
| unregister(this); |
| } |
| } |
| |
| public int getResultCount() |
| { |
| return resultCount; |
| } |
| |
| public boolean addResult(Object object) |
| { |
| if (resultCount == 0) |
| { |
| throw new IllegalStateException("Maximum number of results exceeded"); //$NON-NLS-1$ |
| } |
| |
| CDOQueryQueue<Object> queue = queryResult.getQueue(); |
| queue.add(object); |
| |
| return !cancelled && --resultCount > 0; |
| } |
| |
| public void run() |
| { |
| CDOQueryQueue<Object> queue = queryResult.getQueue(); |
| |
| InternalSession session = queryResult.getView().getSession(); |
| StoreThreadLocal.setSession(session); |
| |
| try |
| { |
| started = true; |
| |
| CDOQueryInfo info = queryResult.getQueryInfo(); |
| resultCount = info.getMaxResults() < 0 ? Integer.MAX_VALUE : info.getMaxResults(); |
| IQueryHandler handler = repository.getQueryHandler(info); |
| |
| try |
| { |
| handler.executeQuery(info, this); |
| } |
| catch (Throwable executionException) |
| { |
| // int xxx; |
| // ConcurrencyUtil.sleep(2000); |
| |
| addResult(executionException); |
| return; |
| } |
| } |
| catch (Throwable initializationException) |
| { |
| queue.setException(initializationException); |
| } |
| finally |
| { |
| queue.close(); |
| unregister(this); |
| StoreThreadLocal.release(); |
| } |
| } |
| |
| public void addListener() |
| { |
| InternalSession session = getQueryResult().getView().getSession(); |
| session.addListener(sessionListener); |
| } |
| |
| public void removeListener() |
| { |
| InternalSession session = getQueryResult().getView().getSession(); |
| session.removeListener(sessionListener); |
| } |
| } |
| } |