blob: 9b13bb42eba2477becf8d53680c1b2b923839f93 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2018 Aston University.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the Eclipse
* Public License, v. 2.0 are satisfied: GNU General Public License, version 3.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-3.0
*
* Contributors:
* Antonio Garcia-Dominguez - initial API and implementation
******************************************************************************/
package org.eclipse.hawk.greycat.lucene;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.UUID;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.DoublePoint;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.BooleanQuery.Builder;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.PrefixQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.SimpleCollector;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.SortedNumericSortField;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TotalHitCountCollector;
import org.apache.lucene.search.WildcardQuery;
import org.eclipse.hawk.core.graph.IGraphIterable;
import org.eclipse.hawk.core.graph.IGraphNode;
import org.eclipse.hawk.core.graph.timeaware.ITimeAwareGraphNode;
import org.eclipse.hawk.core.graph.timeaware.ITimeAwareGraphNodeIndex;
import org.eclipse.hawk.greycat.AbstractGreycatDatabase;
import org.eclipse.hawk.greycat.GreycatNode;
import org.eclipse.hawk.greycat.lucene.IntervalCollector.Interval;
import org.eclipse.hawk.greycat.lucene.SoftTxLucene.SearcherCloseable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
/**
* <p>Integration between Greycat and Apache Lucene, to allow it to have the type
* of advanced indexing that we need for Hawk.</p>
*
* <p>The standard approach for Lucene is to commit only every so often, in a background
* thread: commits are extremely expensive!</p>
*
* <p>We want to follow the same approach, while being able to react to real time queries
* easily. To do this, we keep "soft" tx with a rollback log: should a soft rollback be
* requested, the various operations since the previous soft commit will be undone in
* reverse order in-memory.</p>
*
* <p>We have a background thread that will do a real commit if the rollback log is
* empty. There is also an explicit commit when this indexer shuts down.</p>
*
* <p>TODO: add support for multiple worlds to this index. This may require keeping track
* of how worlds branch off from each other.</p>
*/
public class GreycatLuceneIndexer {
private static final Logger LOGGER = LoggerFactory.getLogger(GreycatLuceneNodeIndex.class);
private static final String ATTRIBUTE_PREFIX = "a_";
private static final String UUID_FIELD = "h_id";
private static final String INDEX_FIELD = "h_index";
private static final String DOCTYPE_FIELD = "h_doctype";
private static final String FIELDS_FIELD = "h_fields";
private static final String INDEX_DOCTYPE = "indexdecl";
/** Node ID, given by Greycat. */
private static final String NODEID_FIELD = "h_nodeid";
/**
* Timepoint from which this index entry is valid. This is set to the timepoint
* of the node being indexed.
*/
private static final String VALIDFROM_FIELD = "h_from";
/**
* Timepoint up to which (itself included) this index entry is valid. This is
* initially set to {@link Long#MAX_VALUE}, but it may be reduced if the index
* entry is overridden or removed later.
*/
private static final String VALIDTO_FIELD = "h_to";
protected class MatchExistsCollector extends SimpleCollector {
private boolean matchFound = false;
@Override
public boolean needsScores() {
return false;
}
@Override
public void collect(int doc) throws IOException {
matchFound = true;
throw new CollectionTerminatedException();
}
public boolean isMatchFound() {
return matchFound;
}
}
protected final class NodeListCollector extends ListCollector {
private final Long timepoint;
protected NodeListCollector(IndexSearcher searcher, Long timepoint) {
super(searcher);
this.timepoint = timepoint;
}
public Iterator<GreycatNode> getNodeIterator() throws IOException {
final List<Document> docs = getDocuments();
final Iterator<Document> itDocs = docs.iterator();
return new Iterator<GreycatNode>() {
@Override
public boolean hasNext() {
return itDocs.hasNext();
}
@Override
public GreycatNode next() {
Document document = itDocs.next();
return getNodeByDocumentAt(document, timepoint);
}
};
}
}
protected final class LuceneGraphIterable implements IGraphIterable<GreycatNode> {
private final Query query;
private final Long timepoint;
protected LuceneGraphIterable(Query query, Long timepoint) {
this.query = query;
this.timepoint = timepoint;
}
@Override
public Iterator<GreycatNode> iterator() {
try (SearcherCloseable sc = lucene.getSearcher()) {
final IndexSearcher searcher = sc.get();
final NodeListCollector lc = new NodeListCollector(searcher, timepoint);
searcher.search(query, lc);
return lc.getNodeIterator();
} catch (IOException e) {
LOGGER.error("Failed to obtain result", e);
return Collections.emptyIterator();
}
}
@Override
public int size() {
try (SearcherCloseable sc = lucene.getSearcher()) {
final IndexSearcher searcher = sc.get();
final TotalHitCountCollector collector = new TotalHitCountCollector();
searcher.search(query, collector);
return collector.getTotalHits();
} catch (IOException e) {
LOGGER.error("Failed to obtain size", e);
return 0;
}
}
@Override
public GreycatNode getSingle() {
try (SearcherCloseable sc = lucene.getSearcher()) {
final IndexSearcher searcher = sc.get();
TopDocs results = searcher.search(query, 1);
if (results.totalHits > 0) {
final Document document = searcher.doc(results.scoreDocs[0].doc);
final GreycatNode node = getNodeByDocumentAt(document, timepoint);
return node;
}
} catch (IOException e) {
LOGGER.error("Failed to obtain single result", e);
}
throw new NoSuchElementException();
}
}
/**
* Implements a node index as a collection of documents, with a single document
* representing the existence of the index itself. The default timepoint can be
* optionally specified upon creation: if not set, we will refer to the
* database's current time.
*/
public final class GreycatLuceneNodeIndex implements ITimeAwareGraphNodeIndex {
private final String name;
/**
* Timepoint which may override the current graph time, if not <code>null</code>.
* If <code>null</code> (the default), the current graph timepoint will be used.
*/
private final Long timepoint;
private static final String NODE_DOCTYPE = "node";
public GreycatLuceneNodeIndex(String name) {
this(name, null);
}
public GreycatLuceneNodeIndex(String name, Long timepoint) {
this.name = name;
this.timepoint = timepoint;
}
@Override
public void remove(IGraphNode n, String key, Object value) {
try (SearcherCloseable sc = lucene.getSearcher()) {
final IndexSearcher searcher = sc.get();
final GreycatNode gn = (GreycatNode) n;
/*
* All documents from this point in time in the index need to be revised. One
* may be still valid, others just need to have the future value removed.
*
* If both key and value are present, we can add those to the query to reduce
* the number of documents to be changed.
*/
final Builder queryBuilder = getIndexQueryBuilder()
.add(LongPoint.newExactQuery(NODEID_FIELD, gn.getId()), Occur.FILTER)
.add(LongPoint.newRangeQuery(VALIDTO_FIELD, gn.getTime(), Long.MAX_VALUE), Occur.FILTER);
if (key != null && value != null) {
queryBuilder.add(getValueQuery(key, value), Occur.FILTER);
}
final Query query = queryBuilder.build();
final ListCollector collector = new ListCollector(searcher);
searcher.search(query, collector);
if (key == null) {
for (Document doc : collector.getDocuments()) {
removeValue(doc, gn, value);
}
} else {
for (Document doc : collector.getDocuments()) {
removeKeyValue(doc, gn, key, value);
}
}
} catch (IOException e) {
LOGGER.error("Could not remove node from index", e);
}
}
protected void removeKeyValue(final Document oldDocument, final GreycatNode gn, String key, Object value) throws IOException {
final Document updated = new Document();
// Copy all other fields as we go
boolean anyMatched = false;
for (IndexableField field : oldDocument.getFields()) {
boolean matched = false;
if (field.name().equals(ATTRIBUTE_PREFIX + key)) {
if (value == null) {
matched = true;
} else if (value instanceof Float) {
final float fValue = field.numericValue() == null ? Float.valueOf(field.stringValue()) : field.numericValue().floatValue();
matched = fValue == ((float) value);
} else if (value instanceof Double) {
final double fValue = field.numericValue() == null ? Double.valueOf(field.stringValue()) : field.numericValue().doubleValue();
matched = fValue == ((double) value);
} else if (value instanceof Number) {
final Long fValue = field.numericValue() == null ? Long.valueOf(field.stringValue()) : field.numericValue().longValue();
matched = ((Number) value).longValue() == fValue;
} else if (value.equals(field.stringValue())) {
matched = true;
}
}
if (!matched) {
copyField(field, updated);
}
anyMatched = anyMatched || matched;
}
if (anyMatched) {
replaceDocumentAtTimepoint(gn, oldDocument, updated);
}
}
private void replaceDocumentAtTimepoint(final GreycatNode gn, final Document oldDocument, final Document newDocument) throws IOException {
assert oldDocument != null : "Old document should not be null";
assert newDocument != null : "New document should not be null";
assert newDocument.getField(VALIDFROM_FIELD) != null : "New document should have a starting point";
assert newDocument.getField(VALIDTO_FIELD) != null : "New document should have an ending point";
assert oldDocument.getField(UUID_FIELD).stringValue().equals(newDocument.getField(UUID_FIELD).stringValue()) : "Both documents should have same UUID";
final long lOldFrom = oldDocument.getField(VALIDFROM_FIELD).numericValue().longValue();
final long lOldTo = oldDocument.getField(VALIDTO_FIELD).numericValue().longValue();
// Is the old document currently in effect? If so, we need to shorten its lifespan.
if (gn.getTime() >= lOldFrom && gn.getTime() <= lOldTo) {
// the old document is currently in effect, we have to shorten its lifespan
final long lOldNewTo = gn.getTime() - 1;
if (lOldNewTo < lOldFrom) {
updateOrDelete(oldDocument, newDocument);
} else {
// shorten lifespan of old document, generate new UUID
final Document shortenedDoc = copy(oldDocument);
replaceRawField(shortenedDoc, VALIDTO_FIELD, lOldNewTo);
lucene.update(new Term(UUID_FIELD, oldDocument.get(UUID_FIELD)), oldDocument, shortenedDoc);
// generate new UUID for the other document and set starting timepoint
if (newDocument.getField(FIELDS_FIELD) != null) {
final String newUUID = UUID.randomUUID().toString();
replaceRawField(newDocument, UUID_FIELD, newUUID);
replaceRawField(newDocument, VALIDFROM_FIELD, gn.getTime());
lucene.update(new Term(UUID_FIELD, newUUID), null, newDocument);
}
}
} else {
// the old document is not in effect - just replace the values there
updateOrDelete(oldDocument, newDocument);
}
}
private void updateOrDelete(final Document oldDocument, final Document newDocument) throws IOException {
if (newDocument.getField(FIELDS_FIELD) == null) {
// no fields left at this point, just delete
lucene.delete(new Term(UUID_FIELD, oldDocument.get(UUID_FIELD)));
} else {
// old document would not have a lifespan - just replace
lucene.update(new Term(UUID_FIELD, oldDocument.get(UUID_FIELD)), oldDocument, newDocument);
}
}
protected void removeValue(final Document oldDocument, final GreycatNode gn, Object value) throws IOException {
final Document updated = new Document();
boolean matched = false;
for (IndexableField field : oldDocument.getFields()) {
if (field.name().startsWith(ATTRIBUTE_PREFIX)) {
final String existingValue = field.stringValue();
if (value == null || existingValue.equals(value)) {
matched = true;
} else {
copyField(field, updated);
}
} else {
copyField(field, updated);
}
}
if (matched) {
replaceDocumentAtTimepoint(gn, oldDocument, updated);
}
}
@Override
public void remove(IGraphNode n) {
try {
final GreycatNode gn = (GreycatNode) n;
// All documents for this node starting in the future must be deleted
final Query queryToDelete = getIndexQueryBuilder()
.add(findNodeQuery(gn), Occur.FILTER)
.add(LongPoint.newRangeQuery(VALIDFROM_FIELD, gn.getTime() + 1, Long.MAX_VALUE), Occur.FILTER)
.build();
lucene.delete(queryToDelete);
// Currently valid documents must be invalidated from this timepoint
final Query queryToInvalidate = getIndexQueryBuilder()
.add(findValidNodeDocuments(gn), Occur.FILTER)
.build();
invalidateAtTimepoint(gn, queryToInvalidate);
} catch (IOException e) {
LOGGER.error(String.format("Could not remove node with id %d from index %s", n.getId(), name), e);
}
}
@Override
public IGraphIterable<GreycatNode> query(String key, Number from, Number to, boolean fromInclusive, boolean toInclusive) {
Query query;
if (from instanceof Float || to instanceof Double) {
final double dFrom = from.doubleValue(), dTo = to.doubleValue();
query = DoublePoint.newRangeQuery(ATTRIBUTE_PREFIX + key, fromInclusive ? dFrom : DoublePoint.nextUp(dFrom), toInclusive ? dTo : DoublePoint.nextDown(dTo));
} else {
final long lFrom = from.longValue(), lTo = to.longValue();
query = LongPoint.newRangeQuery(ATTRIBUTE_PREFIX + key, fromInclusive ? lFrom : Math.addExact(lFrom, 1), toInclusive ? lTo : Math.addExact(lTo, -1));
}
// Also filter by index and timepoint (using database for now)
query = getIndexQueryBuilder()
.add(query, Occur.FILTER)
.add(findValidDocumentsAtTimepoint(getTimepoint()), Occur.FILTER)
.build();
return new LuceneGraphIterable(query, timepoint);
}
/**
* Counts all the document in this node index. Mostly useful for internal
* testing.
*/
public int countAll() {
return countAll(getIndexQueryBuilder().build());
}
private int countAll(Query query) {
try (SearcherCloseable sc = lucene.getSearcher()) {
final IndexSearcher searcher = sc.get();
final TotalHitCountCollector collector = new TotalHitCountCollector();
searcher.search(query, collector);
return collector.getTotalHits();
} catch (IOException e) {
LOGGER.error("Failed to obtain size", e);
return 0;
}
}
@Override
public IGraphIterable<GreycatNode> query(String key, Object valueExpr) {
final String sValueExpr = valueExpr.toString();
Query valueQuery = null;
if ("*".equals(key)) {
if (!"*".equals(valueExpr)) {
throw new UnsupportedOperationException("*:non-null not implemented yet for query");
} else {
// We can just delegate on the query == null case below
}
} else if ("*".equals(valueExpr)) {
valueQuery = new TermQuery(new Term(FIELDS_FIELD, key));
} else if (valueExpr instanceof Float || valueExpr instanceof Double) {
valueQuery = DoublePoint.newExactQuery(ATTRIBUTE_PREFIX + key, ((Number) valueExpr).doubleValue());
} else if (valueExpr instanceof Number) {
valueQuery = LongPoint.newExactQuery(ATTRIBUTE_PREFIX + key, ((Number) valueExpr).longValue());
} else {
final int starIdx = sValueExpr.indexOf('*');
if (starIdx == -1) {
valueQuery = new TermQuery(new Term(ATTRIBUTE_PREFIX + key, sValueExpr));
} else if (starIdx > 0 && starIdx == sValueExpr.length() - 1) {
final String prefix = sValueExpr.substring(0, sValueExpr.length() - 1);
valueQuery = new PrefixQuery(new Term(ATTRIBUTE_PREFIX + key, prefix));
} else {
valueQuery = new WildcardQuery(new Term(ATTRIBUTE_PREFIX + key, sValueExpr));
}
}
final Builder builder = getIndexQueryBuilder()
.add(findValidDocumentsAtTimepoint(getTimepoint()), Occur.FILTER);
if (valueQuery != null) {
builder.add(valueQuery, Occur.FILTER);
}
final Query query = builder.build();
return new LuceneGraphIterable(query, timepoint);
}
@Override
public String getName() {
return name;
}
@Override
public IGraphIterable<GreycatNode> get(String key, Object valueExpr) {
final Query valueQuery = getValueQuery(key, valueExpr);
final Query query = getIndexQueryBuilder()
.add(valueQuery, Occur.FILTER)
.add(findValidDocumentsAtTimepoint(getTimepoint()), Occur.FILTER)
.build();
return new LuceneGraphIterable(query, timepoint);
}
@Override
public List<Long> getVersions(ITimeAwareGraphNode gn, String key, Object valueExpr, long startTimepointIncluded) {
final Query valueQuery = getValueQuery(key, valueExpr);
final Query query = getIndexQueryBuilder()
.add(valueQuery, Occur.FILTER)
.add(LongPoint.newRangeQuery(VALIDTO_FIELD, startTimepointIncluded, Long.MAX_VALUE), Occur.FILTER)
.add(LongPoint.newExactQuery(NODEID_FIELD, (long) gn.getId()), Occur.FILTER)
.build();
try (SearcherCloseable sc = lucene.getSearcher()) {
final IndexSearcher searcher = sc.get();
/*
* The index is structured in intervals, NOT in specific timepoints - this means
* that a specific interval may contain multiple timepoints. We can ask the node
* for those timepoints - this also ensures composability in whenAnnotated('...')
* operations.
*/
final IntervalCollector<Long> fc = new IntervalCollector<>(
searcher, VALIDFROM_FIELD, VALIDTO_FIELD, f -> f.numericValue().longValue()
);
searcher.search(query, fc);
final List<Interval<Long>> intervals = fc.getValues();
Collections.sort(intervals, (a, b) -> -Long.compare(a.getFrom(), b.getFrom()));
final List<Long> timepoints = new ArrayList<>();
for (Interval<Long> interval : intervals) {
List<Long> intervalTimepoints = gn.getInstantsBetween(interval.getFrom(), interval.getTo());
timepoints.addAll(intervalTimepoints);
}
return timepoints;
} catch (IOException e) {
LOGGER.error("Failed to obtain result", e);
return Collections.emptyList();
}
}
@Override
public Long getEarliestVersionSince(ITimeAwareGraphNode gn, String key, Object valueExpr) {
final Query valueQuery = getValueQuery(key, valueExpr);
final Query query = getIndexQueryBuilder()
.add(valueQuery, Occur.FILTER)
.add(LongPoint.newExactQuery(NODEID_FIELD, (long) gn.getId()), Occur.FILTER)
.add(LongPoint.newRangeQuery(VALIDFROM_FIELD, gn.getTime(), Long.MAX_VALUE), Occur.FILTER)
.build();
try (SearcherCloseable sc = lucene.getSearcher()) {
final IndexSearcher searcher = sc.get();
final Sort sort = new Sort(new SortedNumericSortField(VALIDFROM_FIELD, SortField.Type.LONG));
final ScoreDoc[] hits = searcher.search(query, 1, sort).scoreDocs;
if (hits.length == 0) {
return null;
}
final Document doc = searcher.doc(hits[0].doc);
final long from = doc.getField(VALIDFROM_FIELD).numericValue().longValue();
final long to = doc.getField(VALIDTO_FIELD).numericValue().longValue();
final List<Long> versions = gn.getInstantsBetween(from, to);
if (versions.isEmpty()) {
return null;
}
return versions.get(versions.size() - 1);
} catch (IOException e) {
LOGGER.error("Failed to obtain result", e);
return null;
}
}
private Query getValueQuery(String key, Object valueExpr) {
Query valueQuery;
if (valueExpr instanceof Float || valueExpr instanceof Double) {
valueQuery = DoublePoint.newExactQuery(ATTRIBUTE_PREFIX + key, ((Number) valueExpr).doubleValue());
} else if (valueExpr instanceof Number) {
valueQuery = LongPoint.newExactQuery(ATTRIBUTE_PREFIX + key, ((Number) valueExpr).longValue());
} else {
final Term term = new Term(ATTRIBUTE_PREFIX + key, valueExpr.toString());
valueQuery = new TermQuery(term);
}
return valueQuery;
}
@Override
public void flush() {
lucene.flush();
}
@Override
public void delete() {
// This operation is NOT time-aware: it will drop the entire index in one go.
try {
lucene.delete(new TermQuery(new Term(INDEX_FIELD, name)));
nodeIndexCache.invalidate(name);
} catch (IOException e) {
LOGGER.error("Could not delete index " + name, e);
}
}
@Override
public void add(IGraphNode n, String key, Object value) {
if (value != null) {
add(n, Collections.singletonMap(key, value));
}
}
@Override
public void add(IGraphNode n, Map<String, Object> values) {
if (values == null) {
return;
}
final GreycatNode gn = (GreycatNode)n;
try (SearcherCloseable sc = lucene.getSearcher()) {
final IndexSearcher searcher = sc.get();
// We want to find the currently valid document for this node and update it
final Query latestVersionQuery = getIndexQueryBuilder()
.add(findNodeQuery(gn), Occur.FILTER)
.add(findValidDocumentsAtTimepoint(gn.getTime()), Occur.FILTER)
.build();
final TopDocs results = searcher.search(latestVersionQuery, 1);
Long validTo = null;
if (results.totalHits > 0) {
final Document oldDocument = searcher.doc(results.scoreDocs[0].doc);
if (differenceFound(oldDocument, values)) {
validTo = extendCurrentDocument(gn, values, searcher, results);
}
} else {
validTo = addNewDocument(gn, values, searcher);
}
// If this document does not last forever, we need to update future documents too.
// No need to manipulate lifespans in this case.
if (validTo != null && validTo < Long.MAX_VALUE) {
extendFutureDocuments(gn, values, searcher, validTo);
}
} catch (IOException e) {
LOGGER.error(e.getMessage(), e);
}
}
private boolean differenceFound(Document oldDocument, Map<String, Object> values) {
for (Entry<String, Object> e : values.entrySet()) {
boolean matched = false;
for (IndexableField f : oldDocument.getFields(ATTRIBUTE_PREFIX + e.getKey())) {
if (f.numericValue() == null) {
matched = matched || f.stringValue().equals(e.getValue());
} else {
matched = matched || f.numericValue().equals(e.getValue());
}
}
if (!matched) {
return true;
}
}
// All fields have equivalent values - no need to do anything
return false;
}
private void extendFutureDocuments(final GreycatNode gn, Map<String, Object> values,
final IndexSearcher searcher, long validTo) throws IOException {
final Query allFutureQuery = getIndexQueryBuilder()
.add(findNodeQuery(gn), Occur.FILTER)
.add(LongPoint.newRangeQuery(VALIDFROM_FIELD, validTo + 1, Long.MAX_VALUE), Occur.FILTER)
.build();
final ListCollector lc = new ListCollector(searcher);
searcher.search(allFutureQuery, lc);
for (final Document doc : lc.getDocuments()) {
final Document updatedFuture = copy(doc);
addAttributes(updatedFuture, values);
final String uuid = updatedFuture.getField(UUID_FIELD).stringValue();
lucene.update(new Term(UUID_FIELD, uuid), doc, updatedFuture);
}
}
private long addNewDocument(final GreycatNode gn, Map<String, Object> values, final IndexSearcher searcher) throws IOException {
// 'valid to' depends on future entries - need to compute!
final long validTo = computeValidToForNewDocument(gn, searcher);
return addNewDocument(gn, values, validTo);
}
private long addNewDocument(final GreycatNode gn, Map<String, Object> values, final long validTo) throws IOException {
final String uuid = UUID.randomUUID().toString();
final Document newDocument = new Document();
addRawField(newDocument, NODEID_FIELD, gn.getId());
addRawField(newDocument, DOCTYPE_FIELD, NODE_DOCTYPE);
addRawField(newDocument, INDEX_FIELD, name);
addRawField(newDocument, UUID_FIELD, uuid);
// 'valid from' is easy - from now onwards
addRawField(newDocument, VALIDFROM_FIELD, gn.getTime());
addRawField(newDocument, VALIDTO_FIELD, validTo);
addAttributes(newDocument, values);
lucene.update(new Term(UUID_FIELD, uuid), null, newDocument);
return validTo;
}
private long extendCurrentDocument(final GreycatNode gn, Map<String, Object> values,
final IndexSearcher searcher, final TopDocs results) throws IOException {
final Document oldDocument = searcher.doc(results.scoreDocs[0].doc);
final Document updatedDocument = new Document();
addRawField(updatedDocument, NODEID_FIELD, gn.getId());
addRawField(updatedDocument, DOCTYPE_FIELD, NODE_DOCTYPE);
addRawField(updatedDocument, INDEX_FIELD, name);
addRawField(updatedDocument, VALIDFROM_FIELD, gn.getTime());
for (IndexableField oldField : oldDocument.getFields()) {
final String rawOldFieldName = oldField.name();
if (rawOldFieldName.startsWith(ATTRIBUTE_PREFIX)
|| rawOldFieldName.equals(UUID_FIELD)
|| rawOldFieldName.equals(VALIDTO_FIELD)) {
copyField(oldField, updatedDocument);
}
}
addAttributes(updatedDocument, values);
replaceDocumentAtTimepoint(gn, oldDocument, updatedDocument);
final long validTo = updatedDocument.getField(VALIDTO_FIELD).numericValue().longValue();
return validTo;
}
private long computeValidToForNewDocument(final GreycatNode gn, final IndexSearcher searcher) throws IOException {
final Query afterStartQuery = getIndexQueryBuilder()
.add(findNodeQuery(gn), Occur.FILTER)
.add(LongPoint.newRangeQuery(VALIDFROM_FIELD, gn.getTime() + 1, Long.MAX_VALUE), Occur.FILTER)
.build();
final ListCollector lc = new ListCollector(searcher);
searcher.search(afterStartQuery, lc);
Long minFrom = null;
for (Document dAfterStart : lc.getDocuments()) {
final long from = dAfterStart.getField(VALIDFROM_FIELD).numericValue().longValue();
if (minFrom == null) {
minFrom = from;
} else {
minFrom = Math.min(from, minFrom);
}
}
return minFrom == null ? Long.MAX_VALUE : minFrom - 1;
}
@Override
public GreycatLuceneNodeIndex travelInTime(long timepoint) {
return new GreycatLuceneNodeIndex(name, timepoint);
}
private long getTimepoint() {
if (timepoint == null) {
return database.getTime();
} else {
return timepoint;
}
}
protected Query findNodeQuery(final GreycatNode n) {
return LongPoint.newExactQuery(NODEID_FIELD, n.getId());
}
protected BooleanQuery.Builder getIndexQueryBuilder() {
return new BooleanQuery.Builder()
.add(new TermQuery(new Term(INDEX_FIELD, name)), Occur.FILTER)
.add(new TermQuery(new Term(DOCTYPE_FIELD, NODE_DOCTYPE)), Occur.FILTER);
}
@Override
public void annotate(IGraphNode n, String name) {
try {
final GreycatNode gn = (GreycatNode)n;
addNewDocument(gn, Collections.singletonMap(name, true), gn.getTime());
} catch (Exception ex) {
LOGGER.error(ex.getMessage(), ex);
}
}
}
private final AbstractGreycatDatabase database;
private final Cache<String, GreycatLuceneNodeIndex> nodeIndexCache =
CacheBuilder.newBuilder().maximumSize(100).build();
private final SoftTxLucene lucene;
public GreycatLuceneIndexer(AbstractGreycatDatabase db, File dir) throws IOException {
this.database = db;
this.lucene = new SoftTxLucene(dir);
}
public GreycatLuceneNodeIndex getIndex(String name) throws Exception {
return nodeIndexCache.get(name, () -> {
try (SearcherCloseable sc = lucene.getSearcher()) {
final IndexSearcher searcher = sc.get();
final Query query = new BooleanQuery.Builder()
.add(new TermQuery(new Term(INDEX_FIELD, name)), Occur.FILTER)
.add(new TermQuery(new Term(DOCTYPE_FIELD, INDEX_DOCTYPE)), Occur.FILTER).build();
final TotalHitCountCollector thc = new TotalHitCountCollector();
searcher.search(query, thc);
if (thc.getTotalHits() == 0) {
Document doc = new Document();
doc.add(new StringField(INDEX_FIELD, name, Store.YES));
doc.add(new StringField(DOCTYPE_FIELD, INDEX_DOCTYPE, Store.YES));
lucene.update(new Term(INDEX_FIELD, name), null, doc);
}
return new GreycatLuceneNodeIndex(name);
}
});
}
public Set<String> getIndexNames() {
try (SearcherCloseable sc = lucene.getSearcher()) {
final IndexSearcher searcher = sc.get();
final ListCollector lc = new ListCollector(searcher);
searcher.search(new TermQuery(new Term(DOCTYPE_FIELD, INDEX_DOCTYPE)), lc);
final Set<String> names = new HashSet<>();
for (Document doc : lc.getDocuments()) {
names.add(doc.getField(INDEX_FIELD).stringValue());
}
return names;
} catch (IOException e) {
LOGGER.error("Could not list index name", e.getMessage());
return Collections.emptySet();
}
}
public boolean indexExists(String name) {
try (SearcherCloseable sc = lucene.getSearcher()) {
final IndexSearcher searcher = sc.get();
Query query = new BooleanQuery.Builder()
.add(new TermQuery(new Term(DOCTYPE_FIELD, INDEX_DOCTYPE)), Occur.FILTER)
.add(new TermQuery(new Term(INDEX_FIELD, name)), Occur.FILTER)
.build();
final TotalHitCountCollector collector = new TotalHitCountCollector();
searcher.search(query, collector);
return collector.getTotalHits() > 0;
} catch (IOException e) {
LOGGER.error(String.format("Could not check if %s exists", name), e);
return false;
}
}
/**
* Removes this node from all indices, from the timepoint of the node onwards.
*/
public void remove(GreycatNode gn) {
try {
// To be removed - all documents on the node valid after this timepoint
final Query queryToDelete = new BooleanQuery.Builder()
.add(LongPoint.newExactQuery(NODEID_FIELD, gn.getId()), Occur.FILTER)
.add(LongPoint.newRangeQuery(VALIDFROM_FIELD, gn.getTime() + 1, Long.MAX_VALUE), Occur.FILTER)
.build();
lucene.delete(queryToDelete);
// To be updated - all documents valid up to now
final Query queryToRevise = findValidNodeDocuments(gn);
invalidateAtTimepoint(gn, queryToRevise);
} catch (IOException e) {
LOGGER.error(String.format(
"Could not remove node %s in world %d from time %d onwards",
gn.getId(), gn.getWorld(), gn.getTime()
), e);
}
}
/**
* Commits all changes to the index. This is a soft-commit: real Lucene
* commits are only done periodically in the background, when the rollback
* log is empty.
*
* @throws IOException
* Failed to commit the changes.
*/
public void commit() throws IOException {
lucene.commit();
}
/**
* Rolls back all changes to the index. This is a soft-rollback: changes that
* have not been committed yet are undone in memory.
*
* @throws IOException
* Failed to roll back the changes.
*/
public void rollback() throws IOException {
lucene.rollback();
}
/**
* Commits all pending changes and shuts down Lucene.
*/
public void shutdown() {
lucene.shutdown();
}
protected static void replaceRawField(Document document, final String fieldName, final Object value) {
document.removeFields(fieldName);
addRawField(document, fieldName, value);
}
protected static void addAttributes(final Document updated, Map<String, Object> values) {
for (Entry<String, Object> entry : values.entrySet()) {
final String attributeFieldName = ATTRIBUTE_PREFIX + entry.getKey();
addRawField(updated, attributeFieldName, entry.getValue());
}
}
protected static void addRawField(Document document, final String fieldName, final Object value) {
/*
* Point classes are very useful for fast range queries, but they do not store
* the value in the document. We need to add a StoredField so we can use the
* full version of remove (key, value and node).
*
* TODO: do we get these back after a soft rollback? We need tests for this.
*/
// add check to avoid having the same field multiple times
IndexableField[] existing = document.getFields(fieldName);
for (IndexableField f : existing) {
if (f.numericValue() == null) {
if (f.stringValue().equals(value)) {
return;
}
} else if (f.numericValue().equals(value)) {
// nothing to do - same string present!
return;
}
}
if (value instanceof Float || value instanceof Double) {
final double doubleValue = ((Number)value).doubleValue();
document.add(new DoublePoint(fieldName, doubleValue));
document.add(new StoredField(fieldName, doubleValue));
} else if (value instanceof Number) {
final long longValue = ((Number)value).longValue();
if (document.getFields(fieldName).length == 0) {
// Can only have one docvalue per field!
document.add(new NumericDocValuesField(fieldName, longValue));
}
document.add(new LongPoint(fieldName, longValue));
document.add(new StoredField(fieldName, longValue));
} else {
document.add(new StringField(fieldName, value.toString(), Store.YES));
}
if (fieldName.startsWith(ATTRIBUTE_PREFIX)) {
document.add(new StringField(FIELDS_FIELD, fieldName.substring(ATTRIBUTE_PREFIX.length()), Store.YES));
}
}
/**
* Copies and recreates an entire document, including IntPoint and DoublePoint fields.
*/
protected static Document copy(Document doc) {
if (doc == null) {
return null;
}
final Document newDoc = new Document();
for (IndexableField f : doc.getFields()) {
copyField(f, newDoc);
}
return newDoc;
}
/**
* Copies an existing field into a document, as long as it is not the `meta`
* {@link #FIELDS_FIELD} that is used to indicate that an attribute has been
* set.
*/
protected static void copyField(IndexableField field, final Document copy) {
if (!FIELDS_FIELD.equals(field.name())) {
if (field.numericValue() instanceof Number) {
addRawField(copy, field.name(), field.numericValue());
} else {
addRawField(copy, field.name(), field.stringValue());
}
}
}
protected GreycatNode getNodeByDocumentAt(Document document, Long timepoint) {
final long id = document.getField(NODEID_FIELD).numericValue().longValue();
return database.getNodeByIdAt(id, timepoint);
}
protected Query findValidNodeDocuments(GreycatNode gn) {
return new BooleanQuery.Builder()
.add(LongPoint.newExactQuery(NODEID_FIELD, gn.getId()), Occur.FILTER)
.add(findValidDocumentsAtTimepoint(gn.getTime()), Occur.FILTER)
.build();
}
protected Query findValidDocumentsAtTimepoint(final long time) {
return new BooleanQuery.Builder()
.add(LongPoint.newRangeQuery(VALIDFROM_FIELD, Long.MIN_VALUE, time), Occur.FILTER)
.add(LongPoint.newRangeQuery(VALIDTO_FIELD, time, Long.MAX_VALUE), Occur.FILTER)
.build();
}
protected void invalidateAtTimepoint(GreycatNode gn, final Query queryToRevise) throws IOException {
try (SearcherCloseable sc = lucene.getSearcher()) {
final IndexSearcher searcher = sc.get();
final ListCollector lc = new ListCollector(searcher);
searcher.search(queryToRevise, lc);
for (Document doc : lc.getDocuments()) {
invalidateAtTimepoint(gn, doc);
}
}
}
protected void invalidateAtTimepoint(GreycatNode gn, Document doc) throws IOException {
final long lFrom = doc.getField(VALIDFROM_FIELD).numericValue().longValue();
if (lFrom == gn.getTime()) {
// Document was only valid at this very timepoint: simply delete
lucene.delete(new Term(UUID_FIELD, doc.get(UUID_FIELD)));
} else {
// Document was valid before this timepoint: shorten lifespan
Document revisedDoc = copy(doc);
replaceRawField(revisedDoc, VALIDTO_FIELD, gn.getTime() - 1);
lucene.update(new Term(UUID_FIELD, doc.get(UUID_FIELD)), doc, revisedDoc);
}
}
}