blob: 283dee611188b3f095f837630bb74f3f7ad6d898 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2004, 2007 Boeing.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Boeing - initial API and implementation
*******************************************************************************/
package org.eclipse.osee.framework.search.engine.internal;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import org.eclipse.osee.framework.core.server.CoreServerActivator;
import org.eclipse.osee.framework.db.connection.DbTransaction;
import org.eclipse.osee.framework.db.connection.OseeConnection;
import org.eclipse.osee.framework.db.connection.exception.OseeCoreException;
import org.eclipse.osee.framework.search.engine.ISearchEngineTagger;
import org.eclipse.osee.framework.search.engine.ITagListener;
import org.eclipse.osee.framework.search.engine.ITaggerStatistics;
import org.eclipse.osee.framework.search.engine.utility.SearchTagDataStore;
/**
 * {@link ISearchEngineTagger} implementation that runs tagging work on a fixed pool of worker threads and keeps
 * track of every pending queue-query tagging task so it can be counted and cancelled.
 * <p>
 * Pending tasks are held in a synchronized map keyed by tag queue query id. Compound operations on that map
 * (iteration, check-then-remove) explicitly lock on the map itself, as required by
 * {@link Collections#synchronizedMap(Map)}.
 * </p>
 *
 * @author Roberto E. Escobar
 */
public final class SearchEngineTagger implements ISearchEngineTagger {
   private static final int CACHE_LIMIT = 1000;
   private static final int WORKER_THREAD_COUNT = 3;
   private static final long START_UP_DELAY_MILLIS = 3000;

   private final ExecutorService executor;
   private final Map<Integer, FutureTask<?>> futureTasks;
   private final TaggerStatistics statistics;

   /**
    * Creates the tagger, its worker pool and its statistics collector, and schedules a one-shot
    * {@code StartUpRunnable} to run {@value #START_UP_DELAY_MILLIS} ms after construction.
    */
   public SearchEngineTagger() {
      this.statistics = new TaggerStatistics();
      this.futureTasks = Collections.synchronizedMap(new HashMap<Integer, FutureTask<?>>());
      this.executor =
         Executors.newFixedThreadPool(WORKER_THREAD_COUNT, CoreServerActivator.createNewThreadFactory("tagger.worker"));

      // All fields are assigned before "this" is handed to the start-up task.
      Timer timer = new Timer("Start-Up Tagger");
      timer.schedule(new StartUpRunnable(this), START_UP_DELAY_MILLIS);
   }

   /**
    * Deletes tags for the given join query inside a database transaction.
    *
    * @param joinQueryId id of the join query passed to {@code SearchTagDataStore.deleteTags}
    * @return the row count reported by the data store, or {@code -1} if the transaction work never ran
    * @throws Exception if the transaction fails
    */
   @Override
   public int deleteTags(int joinQueryId) throws Exception {
      DeleteTagsTx deleteTransaction = new DeleteTagsTx(joinQueryId);
      deleteTransaction.execute();
      return deleteTransaction.rowsDeleted();
   }

   /**
    * Queues tagging for the given tag queue query id without a caller-supplied listener.
    *
    * @param queryId tag queue query id to process
    */
   @Override
   public void tagByQueueQueryId(int queryId) {
      tagByQueueQueryId(null, queryId);
   }

   /**
    * Queues tagging for the given tag queue query id on the worker pool and registers the task so that it can
    * later be cancelled through {@link #stopTaggingByQueueQueryId(int...)}.
    *
    * @param listener optional listener; when non-null it is attached to the task and notified of the submission
    * @param queryId tag queue query id to process
    */
   @Override
   public void tagByQueueQueryId(ITagListener listener, int queryId) {
      TaggerRunnable runnable = new TaggerRunnable(queryId, false, CACHE_LIMIT);
      runnable.addListener(statistics);
      if (listener != null) {
         runnable.addListener(listener);
         listener.onTagQueryIdSubmit(queryId);
      }
      FutureTask<Object> futureTask = new FutureTaggingTask(runnable);
      // Register before handing the task to the pool so done() always finds its own entry.
      this.futureTasks.put(queryId, futureTask);
      // The task already is a FutureTask; execute() avoids wrapping it in a second, discarded future.
      this.executor.execute(futureTask);
   }

   /**
    * Submits a {@code BranchTaggerRunnable} for the given branch to the worker pool.
    *
    * @param listener optional listener forwarded to the branch tagger; may be {@code null}
    * @param branchId id of the branch to tag
    * @throws OseeCoreException declared by the interface
    */
   @Override
   public void tagByBranchId(ITagListener listener, int branchId) throws OseeCoreException {
      this.executor.submit(new BranchTaggerRunnable(this, listener, branchId, false, CACHE_LIMIT));
   }

   /**
    * Submits branch tagging without a caller-supplied listener.
    *
    * @param branchId id of the branch to tag
    * @throws OseeCoreException declared by the interface
    */
   @Override
   public void tagByBranchId(int branchId) throws OseeCoreException {
      tagByBranchId(null, branchId);
   }

   /**
    * Runs an {@code InputStreamTagProcessor} over the given stream on the calling thread.
    *
    * @param listener optional listener forwarded to the processor; may be {@code null}
    * @param inputStream stream handed to the processor
    * @throws Exception if the processor fails
    */
   @Override
   public void tagFromXmlStream(ITagListener listener, InputStream inputStream) throws Exception {
      InputStreamTagProcessor inputStreamTagProcessor =
         new InputStreamTagProcessor(this, listener, inputStream, false, CACHE_LIMIT);
      inputStreamTagProcessor.execute();
   }

   /**
    * Processes the given stream without a caller-supplied listener.
    *
    * @param inputStream stream handed to the processor
    * @throws Exception if the processor fails
    */
   @Override
   public void tagFromXmlStream(InputStream inputStream) throws Exception {
      tagFromXmlStream(null, inputStream);
   }

   /**
    * @return number of queue-query tagging tasks currently registered (submitted and not yet completed or
    *         cancelled)
    */
   @Override
   public int getWorkersInQueue() {
      return futureTasks.size();
   }

   /**
    * Resets the collected tagging statistics.
    */
   @Override
   public void clearStatistics() {
      this.statistics.clear();
   }

   /**
    * @return a copy of the current statistics, or {@code TaggerStatistics.EMPTY_STATS} if the statistics object
    *         cannot be cloned
    */
   @Override
   public ITaggerStatistics getStatistics() {
      try {
         return this.statistics.clone();
      } catch (CloneNotSupportedException ex) {
         // Deliberate fallback: callers receive an empty snapshot rather than an exception.
         return TaggerStatistics.EMPTY_STATS;
      }
   }

   /**
    * Cancels the registered tagging tasks for the given query ids.
    *
    * @param queryId tag queue query ids whose tasks should be stopped
    * @return number of ids whose task was found and was either already done or successfully cancelled
    */
   @Override
   public int stopTaggingByQueueQueryId(int... queryId) {
      int toReturn = 0;
      for (int item : queryId) {
         FutureTask<?> task = futureTasks.get(item);
         // A task that already finished counts as stopped; otherwise count it only if cancel succeeds.
         if (task != null && (task.isDone() || task.cancel(true))) {
            toReturn++;
         }
      }
      return toReturn;
   }

   /**
    * Cancels every tagging task that is registered at the time of the call.
    *
    * @return number of tasks that were already done or successfully cancelled
    */
   @Override
   public int stopAllTagging() {
      final int[] toProcess;
      // Iterating a synchronizedMap view requires holding the map's lock; worker threads remove entries
      // concurrently from FutureTaggingTask.done().
      synchronized (futureTasks) {
         Set<Integer> queryIds = futureTasks.keySet();
         toProcess = new int[queryIds.size()];
         int index = 0;
         for (Integer queryId : queryIds) {
            toProcess[index++] = queryId;
         }
      }
      // Cancel outside the lock so workers finishing in parallel are not blocked on the map.
      return stopTaggingByQueueQueryId(toProcess);
   }

   /**
    * Future wrapper around a {@code TaggerRunnable} that unregisters itself from the pending-task map once it
    * completes or is cancelled.
    */
   private final class FutureTaggingTask extends FutureTask<Object> {
      private final TaggerRunnable runnable;

      public FutureTaggingTask(TaggerRunnable runnable) {
         super(runnable, null);
         this.runnable = runnable;
      }

      /**
       * Removes this task's registration. The entry is removed only if it still refers to this task, so a
       * newer task registered under the same query id is left in place.
       */
      @Override
      protected void done() {
         final int queryId = runnable.getTagQueueQueryId();
         synchronized (futureTasks) {
            if (futureTasks.get(queryId) == this) {
               futureTasks.remove(queryId);
            }
         }
      }
   }

   /**
    * Database transaction that deletes tags for one query id and records the resulting row count.
    */
   private static final class DeleteTagsTx extends DbTransaction {
      private final int queryId;
      private int updated;

      public DeleteTagsTx(int queryId) throws OseeCoreException {
         super();
         this.queryId = queryId;
         // -1 marks "transaction work has not run".
         this.updated = -1;
      }

      /**
       * @return value returned by the data store delete, or {@code -1} if the transaction work has not run
       */
      public int rowsDeleted() {
         return updated;
      }

      @Override
      protected void handleTxWork(OseeConnection connection) throws OseeCoreException {
         this.updated = SearchTagDataStore.deleteTags(connection, queryId);
      }
   }
}