// blob: 716c38a2df5bcd17265e58bb13bb6b6b98de375f [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2004, 2007 Boeing.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Boeing - initial API and implementation
*******************************************************************************/
package org.eclipse.osee.framework.search.engine.internal;
import java.util.Collection;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Set;
import java.util.logging.Level;
import org.eclipse.osee.framework.core.data.JoinUtility;
import org.eclipse.osee.framework.core.data.JoinUtility.JoinItem;
import org.eclipse.osee.framework.db.connection.DbTransaction;
import org.eclipse.osee.framework.db.connection.OseeConnection;
import org.eclipse.osee.framework.db.connection.exception.OseeCoreException;
import org.eclipse.osee.framework.db.connection.exception.OseeDataStoreException;
import org.eclipse.osee.framework.logging.OseeLog;
import org.eclipse.osee.framework.search.engine.ITagListener;
import org.eclipse.osee.framework.search.engine.attribute.AttributeData;
import org.eclipse.osee.framework.search.engine.attribute.AttributeDataStore;
import org.eclipse.osee.framework.search.engine.data.IAttributeLocator;
import org.eclipse.osee.framework.search.engine.data.SearchTag;
import org.eclipse.osee.framework.search.engine.utility.ITagCollector;
import org.eclipse.osee.framework.search.engine.utility.SearchTagDataStore;
/**
 * Runnable that tags every attribute queued under a single tag-queue query id and reports progress to the
 * registered {@link ITagListener}s.
 * <p>
 * Wait time is measured from construction of this object until {@link #run()} starts; processing time is the time
 * spent inside {@link #run()}. Both values are handed to the listeners once the run has finished.
 * <p>
 * The listener set is a plain {@link HashSet} with no synchronization.
 * NOTE(review): listeners are presumably registered before this runnable is handed to an executor - confirm.
 *
 * @author Roberto E. Escobar
 */
class TaggerRunnable implements Runnable {
   private final Set<ITagListener> listeners;
   private final int tagQueueQueryId;
   private final boolean isCacheAll;
   private final int cacheLimit;
   private long processingTime;
   private final long waitStart;
   private long waitTime;

   /**
    * @param tagQueueQueryId id of the queued query whose attributes are to be tagged
    * @param isCacheAll when <code>true</code> tags are only stored once all attributes have been processed; the
    * cache limit is ignored
    * @param cacheLimit cumulative cache size at which collected tags are stored early (only used when
    * <code>isCacheAll</code> is <code>false</code>)
    */
   TaggerRunnable(int tagQueueQueryId, boolean isCacheAll, int cacheLimit) {
      this.listeners = new HashSet<ITagListener>();
      this.tagQueueQueryId = tagQueueQueryId;
      this.waitStart = System.currentTimeMillis();
      this.waitTime = 0;
      this.processingTime = 0;
      this.cacheLimit = cacheLimit;
      this.isCacheAll = isCacheAll;
   }

   /**
    * @return the tag-queue query id this runnable was created for
    */
   public int getTagQueueQueryId() {
      return tagQueueQueryId;
   }

   /**
    * Registers a listener to be notified of tagging events. <code>null</code> is ignored.
    *
    * @param listener listener to add; may be <code>null</code>
    */
   public void addListener(ITagListener listener) {
      if (listener != null) {
         this.listeners.add(listener);
      }
   }

   /**
    * Executes the tagging transaction. Any exception is logged rather than propagated; listeners are always told
    * that the query id has completed, after which the listener set is emptied.
    *
    * @see java.lang.Runnable#run()
    */
   @Override
   public void run() {
      this.waitTime = System.currentTimeMillis() - this.waitStart;
      long processStart = System.currentTimeMillis();
      try {
         AttributeToTagTx attributeToTagTx = new AttributeToTagTx();
         attributeToTagTx.execute();
      } catch (Exception ex) {
         OseeLog.log(Activator.class, Level.SEVERE, String.format("Unable to tag - tagQueueQueryId [%d]",
               getTagQueueQueryId()), ex);
      } finally {
         this.processingTime = System.currentTimeMillis() - processStart;
         notifyOnTagQueryIdTagComplete();
      }
      this.listeners.clear();
   }

   /**
    * Logs a failure raised by a listener callback so that one misbehaving listener cannot prevent the remaining
    * listeners from being notified.
    */
   private void logListenerError(ITagListener listener, Exception ex) {
      OseeLog.log(TaggerRunnable.class, Level.SEVERE, String.format("Error notifying listener: [%s] ",
            listener.getClass().getName()), ex);
   }

   /**
    * Tells every listener that tagging of a single attribute has finished.
    *
    * @param gammaId gamma id of the attribute that was processed
    * @param totalTags total number of tags reported by the attribute's {@link SearchTag}
    * @param processingTime milliseconds spent on this attribute
    */
   private void notifyOnAttributeTagComplete(long gammaId, int totalTags, long processingTime) {
      for (ITagListener listener : listeners) {
         try {
            listener.onAttributeTagComplete(tagQueueQueryId, gammaId, totalTags, processingTime);
         } catch (Exception ex) {
            logListenerError(listener, ex);
         }
      }
   }

   /**
    * Tells every listener that processing of the whole query id has finished, passing the measured wait and
    * processing times.
    */
   private void notifyOnTagQueryIdTagComplete() {
      for (ITagListener listener : listeners) {
         try {
            listener.onTagQueryIdTagComplete(tagQueueQueryId, waitTime, processingTime);
         } catch (Exception ex) {
            logListenerError(listener, ex);
         }
      }
   }

   /**
    * Tells every listener that a tag has been added to the attribute currently being processed.
    *
    * @param gammaId gamma id of the attribute being tagged
    * @param word word the tag was produced for
    * @param codedTag encoded tag value
    */
   private void notifyOnAttributeAddTagEvent(long gammaId, String word, long codedTag) {
      for (ITagListener listener : listeners) {
         try {
            listener.onAttributeAddTagEvent(tagQueueQueryId, gammaId, word, codedTag);
         } catch (Exception ex) {
            logListenerError(listener, ex);
         }
      }
   }

   /**
    * Transaction that loads the queued attributes, deletes their existing tags, re-tags each attribute and stores
    * the result. It is also the {@link ITagCollector} handed to the tagger, so tags produced for the attribute
    * currently being processed are collected in {@link #currentTag}.
    */
   private final class AttributeToTagTx extends DbTransaction implements ITagCollector {
      private static final int TOTAL_RETRIES = 10;
      /** Pause between two attempts to read the queued attributes. */
      private static final long RETRY_WAIT_MILLIS = 2000;

      private final Deque<SearchTag> searchTags;
      private SearchTag currentTag;

      public AttributeToTagTx() throws OseeCoreException {
         super();
         this.searchTags = new LinkedList<SearchTag>();
         this.currentTag = null;
      }

      /**
       * Reads the attributes queued under <code>queryId</code>, re-trying while the result is empty.
       * <p>
       * If the thread is interrupted while waiting, the interrupt status is restored and the method stops
       * retrying, returning whatever the last read produced.
       *
       * @param connection connection used for the lookups
       * @param queryId tag-queue query id to read
       * @param numberOfRetries maximum number of additional reads after the first one
       * @return the queued attributes; empty if none were found
       */
      private Collection<AttributeData> getDataFromQueryId(OseeConnection connection, int queryId, final int numberOfRetries) throws OseeDataStoreException {
         Collection<AttributeData> attributeDatas = AttributeDataStore.getAttribute(connection, queryId);
         // Re-try in case query id hasn't been committed to the database
         for (int retry = 0; attributeDatas.isEmpty() && retry < numberOfRetries; retry++) {
            try {
               Thread.sleep(RETRY_WAIT_MILLIS);
            } catch (InterruptedException ex) {
               OseeLog.log(Activator.class, Level.WARNING, ex);
               // Keep the interrupt visible to the caller and stop waiting; a further sleep would fail at once.
               Thread.currentThread().interrupt();
               break;
            }
            attributeDatas = AttributeDataStore.getAttribute(connection, queryId);
         }
         return attributeDatas;
      }

      /**
       * Deletes the existing tags of all queued attributes, tags each attribute again, stores the collected tags
       * and finally removes the query id from the tag queue. A failure while tagging a single attribute is logged
       * and does not stop the remaining attributes from being processed.
       *
       * @see org.eclipse.osee.framework.db.connection.DbTransaction#handleTxWork(OseeConnection)
       */
      @Override
      protected void handleTxWork(OseeConnection connection) throws OseeCoreException {
         Collection<AttributeData> attributeDatas = getDataFromQueryId(connection, getTagQueueQueryId(), TOTAL_RETRIES);
         if (attributeDatas.isEmpty()) {
            OseeLog.log(Activator.class, Level.WARNING, String.format("Empty gamma query id: %s",
                  getTagQueueQueryId()));
            return;
         }
         try {
            SearchTagDataStore.deleteTags(connection,
                  attributeDatas.toArray(new IAttributeLocator[attributeDatas.size()]));
            for (AttributeData attributeData : attributeDatas) {
               long startItemTime = System.currentTimeMillis();
               SearchTag tag = new SearchTag(attributeData.getGammaId());
               this.currentTag = tag;
               this.searchTags.add(tag);
               try {
                  Activator.getTaggerManager().tagIt(attributeData, this);
                  checkSizeStoreIfNeeded(connection);
               } catch (Throwable ex) {
                  // Deliberately broad: one attribute that cannot be tagged must not abort the whole queue entry.
                  OseeLog.log(Activator.class, Level.SEVERE, String.format("Unable to tag - [%s]", tag), ex);
               } finally {
                  notifyOnAttributeTagComplete(tag.getGammaId(), tag.getTotalTags(),
                        (System.currentTimeMillis() - startItemTime));
                  this.currentTag = null;
               }
            }
            store(connection, this.searchTags);
            JoinUtility.deleteQuery(connection, JoinItem.TAG_GAMMA_QUEUE, getTagQueueQueryId());
         } catch (Exception ex) {
            // NOTE(review): the failure is logged but not rethrown, so the surrounding DbTransaction never sees
            // it even though deleteTags may already have run - confirm this best-effort behavior is intended.
            OseeLog.log(Activator.class, Level.SEVERE, String.format("Unable to store tags - tagQueueQueryId [%d]",
                  getTagQueueQueryId()), ex);
         }
      }

      /**
       * Clears the cache of every collected {@link SearchTag} after the superclass clean-up has run.
       *
       * @see org.eclipse.osee.framework.db.connection.DbTransaction#handleTxFinally()
       */
      @Override
      protected void handleTxFinally() throws OseeCoreException {
         super.handleTxFinally();
         for (SearchTag searchTag : this.searchTags) {
            searchTag.clearCache();
         }
      }

      /**
       * Adds the coded tag to the attribute currently being processed and notifies the listeners. Calls made while
       * no attribute is being processed are ignored.
       *
       * @see org.eclipse.osee.framework.search.engine.utility.ITagCollector#addTag(java.lang.String, java.lang.Long)
       */
      @Override
      public void addTag(String word, Long codedTag) {
         if (this.currentTag != null) {
            this.currentTag.addTag(codedTag);
            notifyOnAttributeAddTagEvent(this.currentTag.getGammaId(), word, codedTag);
         }
      }

      /**
       * Stores all collected tags early once their cumulative cache size reaches the configured cache limit. Does
       * nothing when the runnable was configured to cache everything.
       */
      private void checkSizeStoreIfNeeded(OseeConnection connection) throws OseeDataStoreException {
         if (isCacheAll) {
            // The limit can never trigger a store in this mode, so there is nothing to measure.
            return;
         }
         int cumulative = 0;
         boolean needsStorage = false;
         for (SearchTag item : this.searchTags) {
            cumulative += item.cacheSize();
            if (cumulative >= cacheLimit) {
               needsStorage = true;
               break;
            }
         }
         if (needsStorage) {
            store(connection, this.searchTags);
         }
      }

      /**
       * Stores the given tags and then clears the cache of each one.
       */
      private void store(OseeConnection connection, Collection<SearchTag> toStore) throws OseeDataStoreException {
         SearchTagDataStore.storeTags(connection, toStore);
         for (SearchTag item : toStore) {
            item.clearCache();
         }
      }
   }
}