blob: 70130bdc5037e37f0ef25e44f1eeb8c101156662 [file] [log] [blame]
/***********************************************************************************************************************
* Copyright (c) 2008 empolis GmbH and brox IT Solutions GmbH. All rights reserved. This program and the accompanying
* materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Peter Wissel (brox IT Solutions GmbH) - initial API and implementation
**********************************************************************************************************************/
package org.eclipse.smila.solr.index;
import static java.lang.String.format;
import static org.apache.commons.lang.StringUtils.defaultString;
import static org.apache.commons.lang.StringUtils.isBlank;
import static org.apache.commons.lang.StringUtils.isNotBlank;
import static org.apache.commons.lang.StringUtils.trimToNull;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrInputDocument;
import org.eclipse.smila.blackboard.Blackboard;
import org.eclipse.smila.blackboard.BlackboardAccessException;
import org.eclipse.smila.datamodel.Any;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.datamodel.AnySeq;
import org.eclipse.smila.datamodel.Record;
import org.eclipse.smila.datamodel.xml.XmlSerializationUtils;
import org.eclipse.smila.processing.Pipelet;
import org.eclipse.smila.processing.ProcessingException;
import org.eclipse.smila.processing.parameters.MissingParameterException;
import org.eclipse.smila.solr.Activator;
import org.eclipse.smila.solr.SolrConstants;
import org.eclipse.smila.solr.SolrConstants.AttributeOrAttachment;
import org.eclipse.smila.solr.SolrConstants.ExecutionMode;
import org.eclipse.smila.solr.SolrManager;
import org.eclipse.smila.solr.util.SolrQueryUtils;
/**
 * Pipelet that writes SMILA records to a Solr index (execution mode {@code ADD}/{@code UPDATE}) or removes them from
 * it ({@code DELETE}). The target core is either the statically configured default core or a per-record dynamic core
 * taken from the record metadata attribute {@link SolrConstants#DYNAMIC_TARGET_CORE}.
 *
 * @author pwissel
 */
public class SolrIndexPipelet implements Pipelet {

  /**
   * Prefix for configuration error messages.
   */
  private static final String CONF_ERROR = "Invalid Pipelet configuration -> ";

  /**
   * Error message prefix used when a required configuration parameter is missing.
   */
  private static final String CONF_ERROR_PARAMETER_MISSING = CONF_ERROR + "Parameter missing: ";

  /**
   * The log.
   */
  private final Log _log = LogFactory.getLog(SolrIndexPipelet.class);

  /**
   * The pipelet configuration map, as passed to {@link #configure(AnyMap)}.
   */
  private AnyMap _configuration;

  /**
   * The mapping of record attributes/attachments to Solr core fields. Only read in ADD/UPDATE mode.
   */
  private AnySeq _coreFieldsSeq;

  /**
   * The execution mode (ADD, UPDATE or DELETE), parsed in {@link #configure(AnyMap)}.
   */
  private ExecutionMode _executionMode;

  /**
   * The solr server instance that corresponds to {@link #_defaultCoreName}, attempted to be assigned on each execution
   * of the pipelet.
   */
  private SolrServer _defaultTargetCore;

  /**
   * Optional, if given then it must be alive, otherwise a warning is logged each pipelet execution and processing will
   * continue. It is not an error because it still might be overridden by a dynamic core name.
   */
  private String _defaultCoreName;

  /**
   * {@inheritDoc}
   *
   * @see org.eclipse.smila.processing.Pipelet#configure(org.eclipse.smila.datamodel.AnyMap)
   */
  @Override
  public void configure(AnyMap configuration) throws ProcessingException {
    if (_log.isDebugEnabled()) {
      _log.debug("Configure SolrIndexPipelet");
    }
    _configuration = configuration;
    try {
      if (isBlank(configuration.getStringValue(SolrConstants.EXECUTION_MODE))) {
        throw new MissingParameterException(CONF_ERROR_PARAMETER_MISSING + SolrConstants.EXECUTION_MODE);
      }
      _executionMode = ExecutionMode.valueOf(configuration.getStringValue(SolrConstants.EXECUTION_MODE));
      switch (_executionMode) {
        case DELETE:
          // no field mapping needed for deletion.
          break;
        case ADD:
        case UPDATE:
          // validate the field mapping up front so that per-record processing can rely on asMap() succeeding.
          // nothing much happens here yet, see the TODOs in addRecords()
          _coreFieldsSeq = configuration.getSeq(SolrConstants.CORE_FIELDS);
          for (Any coreFieldMap : _coreFieldsSeq) {
            if (!coreFieldMap.isMap()) {
              throw new ProcessingException("all items must be of type AnyMap in pipelet config: "
                + SolrConstants.CORE_FIELDS);
            }
          }
          break;
        default:
          throw new NotImplementedException("_executionMode: " + _executionMode);
      }
      // this does nothing except to issue a one time warning about the missing config.
      _defaultCoreName = trimToNull(configuration.getStringValue(SolrConstants.CORE_NAME));
      if (_defaultCoreName == null) {
        _log.warn("there is no default core configured in the pipelet config! "
          + "Target core must be set now on each record dynamically via: " + SolrConstants.DYNAMIC_TARGET_CORE);
      }
    } catch (Exception exception) {
      throw new ProcessingException("Error while configure SolrIndexPipelet", exception);
    }
  }

  /**
   * {@inheritDoc}
   *
   * @see org.eclipse.smila.processing.Pipelet#process(org.eclipse.smila.blackboard.Blackboard, java.lang.String[])
   */
  @Override
  public String[] process(Blackboard blackboard, String[] recordIds) throws ProcessingException {
    if (recordIds != null) {
      if (_defaultCoreName != null) {
        // resolve the default core freshly on each execution; failure is only a warning because every record
        // may still carry a dynamic target core.
        try {
          _defaultTargetCore = getSolrManager().getSolrServer(_defaultCoreName);
        } catch (Exception e) {
          _log
            .warn(format(
              "default core not reachable: %s. Records must have a dynamic core specified or their processing will fail. Exception: %s",
              _defaultCoreName, e.getMessage()));
        }
      }
      // NOTE(review): configure() accepts UPDATE, but this switch only dispatches ADD and DELETE, so an
      // UPDATE-configured pipelet fails here with NotImplementedException — confirm whether UPDATE should map to
      // addRecords().
      switch (_executionMode) {
        case ADD:
          addRecords(blackboard, recordIds);
          break;
        case DELETE:
          deleteRecords(blackboard, recordIds);
          break;
        default:
          throw new NotImplementedException("executionMode: " + _executionMode);
      }
    }
    return recordIds;
  }

  /**
   * Converts the given records into {@link SolrInputDocument}s, groups them by target core and sends each group to
   * its Solr server.
   *
   * @param blackboard
   *          the blackboard.
   * @param recordIds
   *          the record ids.
   * @throws ProcessingException
   *           if a record cannot be converted or a document collection cannot be sent to its server.
   */
  private void addRecords(Blackboard blackboard, String[] recordIds) throws ProcessingException {
    // since the target core can be set dynamically we must collect all docs per core and this is what this map does
    final Map<SolrServer, Collection<SolrInputDocument>> coreToDocsMap =
      new HashMap<SolrServer, Collection<SolrInputDocument>>();
    final int recordCount = recordIds.length;
    for (String id : recordIds) {
      traceRecord(blackboard, id);
      try {
        final AnyMap metadata = blackboard.getMetadata(id);
        final String dynamicCoreValue = metadata.getStringValue(SolrConstants.DYNAMIC_TARGET_CORE);
        final Collection<SolrInputDocument> solrDocs =
          getInputDocumentsForCore(coreToDocsMap, recordCount, dynamicCoreValue);
        final SolrInputDocument doc = createInputDocument(blackboard, id, metadata);
        solrDocs.add(doc);
        if (_log.isInfoEnabled()) {
          _log.info(format("record added to unsent in-memory document collection. id: %s core: %s", id,
            defaultString(dynamicCoreValue, _defaultCoreName)));
          if (_log.isTraceEnabled()) {
            _log.trace("solr document: " + doc.toString());
          }
        }
      } catch (Exception e) {
        final String msg = "Error while adding record with id: " + id;
        throw new ProcessingException(msg, e);
      }
    }
    try {
      // flush the collected documents, one add request per target core.
      for (Entry<SolrServer, Collection<SolrInputDocument>> entry : coreToDocsMap.entrySet()) {
        final SolrServer server = entry.getKey();
        final Collection<SolrInputDocument> docs = entry.getValue();
        addDocumentsToServer(server, docs);
      }
    } catch (Exception e) {
      throw new ProcessingException("Error while adding document collection to solr server.", e);
    }
  }

  /**
   * get or create the collection of {@link SolrInputDocument}s to add the record to.
   *
   * @param coreToDocsMap
   *          documents collected so far, keyed by target server.
   * @param recordCount
   *          total number of records in this call, used as initial capacity for a new collection.
   * @param dynamicCoreValue
   *          the per-record target core name, may be null (then the default core is used).
   * @return the (possibly freshly created) document collection for the record's target core.
   */
  private Collection<SolrInputDocument> getInputDocumentsForCore(
    final Map<SolrServer, Collection<SolrInputDocument>> coreToDocsMap, int recordCount, String dynamicCoreValue)
    throws SolrServerException, IOException, ProcessingException {
    final SolrServer targetCore = getTargetCore(dynamicCoreValue);
    Collection<SolrInputDocument> solrDocs = coreToDocsMap.get(targetCore);
    if (solrDocs == null) {
      solrDocs = new ArrayList<SolrInputDocument>(recordCount);
      coreToDocsMap.put(targetCore, solrDocs);
    }
    return solrDocs;
  }

  /**
   * get the target Solr core for the given record.
   *
   * @param dynamicCoreValue
   *          the per-record target core name, may be null or blank (then the default core is used).
   * @return the server for the dynamic core if given, else the default core.
   * @throws ProcessingException
   *           if neither a dynamic core is given nor a default core is available.
   */
  private SolrServer getTargetCore(String dynamicCoreValue) throws SolrServerException, IOException,
    ProcessingException {
    if (isNotBlank(dynamicCoreValue)) {
      return getSolrManager().getSolrServer(dynamicCoreValue);
    } else {
      if (_defaultTargetCore == null) {
        throw new ProcessingException(
          "no dynamic core in record given while default core is null. check if the pipelet defines a valid default core or that the record carries the dynamicCore attribute");
      }
      return _defaultTargetCore;
    }
  }

  /** create a {@link SolrInputDocument} from the given record using the specified mapping. */
  private SolrInputDocument createInputDocument(Blackboard blackboard, String id, final AnyMap metadata)
    throws ProcessingException, BlackboardAccessException, UnsupportedEncodingException {
    final SolrInputDocument doc = new SolrInputDocument();
    // add id field
    doc.addField(SolrConstants.CORE_FIELD_ID, id);
    // set optional boost factor
    final Double boostFactor = metadata.getDoubleValue(SolrConstants.DOC_BOOST);
    if (boostFactor != null) {
      doc.setDocumentBoost(boostFactor.floatValue());
    }
    /*
     * PERF: move these checks into config() | TM @ Jun 7, 2011
     *
     * PERF: transform the map structure into a faster structure and do conversion and setting of default values there |
     * TM @ Jun 7, 2011
     *
     * BETTER: check that no target core field is mapped twice (from diff. source fields), as it will be overwritten.
     * since there are rare use cases for this (e.g. processing ensures that just one of the source fields is actually
     * set), we just should issue a warning here) | TM @ Jun 7, 2011
     */
    for (Any coreFieldMap : _coreFieldsSeq) {
      final AnyMap fieldMap = coreFieldMap.asMap(); // configure validates this in advance
      final String fieldName = getFieldName(fieldMap);
      final String recSourceName = getRecordSourceName(fieldMap, fieldName);
      final AttributeOrAttachment recSourceType = getRecordSourceType(fieldMap, fieldName);
      /* TODO | TM | add boost on field level | TM @ May 20, 2011 */
      switch (recSourceType) {
        case ATTRIBUTE:
          createFieldFromAttribute(doc, fieldName, metadata, recSourceName);
          break;
        case ATTACHMENT:
          createFieldFromAttachment(doc, fieldName, blackboard, id, recSourceName);
          break;
        default:
          throw new NotImplementedException("recSourceType: " + recSourceType);
      }
    }
    return doc;
  }

  /**
   * get the field name from the record-document-mapping.
   *
   * @throws MissingParameterException
   *           if the mapping entry has no (or a blank) field name.
   */
  private String getFieldName(final AnyMap fieldMap) throws MissingParameterException {
    final String fieldName = fieldMap.getStringValue(SolrConstants.CORE_FIELD_NAME);
    if (isBlank(fieldName)) {
      throw new MissingParameterException(CONF_ERROR_PARAMETER_MISSING + SolrConstants.CORE_FIELD_NAME);
    }
    return fieldName;
  }

  /**
   * get the attribute or attachment name from the record-document-mapping. Defaults to the field name if the mapping
   * does not configure a source name.
   */
  private String getRecordSourceName(final AnyMap fieldMap, final String fieldName) {
    String recSourceName = fieldMap.getStringValue(SolrConstants.SOURCE_NAME);
    if (isBlank(recSourceName)) {
      recSourceName = fieldName;
      if (_log.isTraceEnabled()) {
        _log.trace(format("core field mapping %s: no %s config'ed. Defaulting to field name", fieldName,
          SolrConstants.SOURCE_NAME));
      }
    }
    return recSourceName;
  }

  /**
   * get the record source type from record-document-mapping. Defaults to {@link AttributeOrAttachment#ATTRIBUTE} if
   * the mapping does not configure a source type.
   */
  private AttributeOrAttachment getRecordSourceType(final AnyMap fieldMap, final String fieldName) {
    final String recSourceTypeString = fieldMap.getStringValue(SolrConstants.SOURCE_TYPE);
    final AttributeOrAttachment recSourceType;
    if (isBlank(recSourceTypeString)) {
      recSourceType = AttributeOrAttachment.ATTRIBUTE;
      if (_log.isTraceEnabled()) {
        _log.trace(format("core field mapping %s: no %s config'ed. Defaulting to %s", fieldName,
          SolrConstants.SOURCE_TYPE, recSourceType));
      }
    } else {
      recSourceType = AttributeOrAttachment.valueOf(recSourceTypeString.toUpperCase());
    }
    return recSourceType;
  }

  /**
   * create a Solr field from a record attribute value. Single values are added as-is, sequences are added as
   * multi-value fields; any other Any type is rejected.
   */
  private void createFieldFromAttribute(final SolrInputDocument doc, final String fieldName, final AnyMap metadata,
    String recordAttribute) throws ProcessingException {
    final Any any = metadata.get(recordAttribute);
    if (any != null) {
      if (any.isValue()) {
        doc.addField(fieldName, any.asValue().getObject());
      } else if (any.isSeq()) {
        final AnySeq asSeq = any.asSeq();
        for (Any seqValues : asSeq) {
          doc.addField(fieldName, seqValues.asValue().getObject());
        }
      } else {
        throw new ProcessingException(
          "value type for indexing in solr not supported. Must be one of Value, Seq but is: " + any.getValueType());
      }
    } else {
      if (_log.isTraceEnabled()) { // this case is quite common and hence only logged @ trace
        _log.trace("Record doesn't contain an attribute named: " + recordAttribute);
      }
    }
  }

  /** create a Solr field from a record attachment, decoding the attachment bytes as UTF-8 text. */
  private void createFieldFromAttachment(final SolrInputDocument doc, final String fieldName,
    Blackboard blackboard, String id, String attachmentName) throws BlackboardAccessException,
    UnsupportedEncodingException {
    final byte[] value = blackboard.getAttachmentAsBytes(id, attachmentName);
    if (value != null) {
      final String string = new String(value, "UTF-8");
      doc.addField(fieldName, string);
    } else {
      // this case is quite common and hence only logged @ trace
      if (_log.isTraceEnabled() && !blackboard.hasAttachment(id, attachmentName)) {
        _log.trace("Record doesn't have an attachment named: " + attachmentName);
      }
    }
  }

  /**
   * send the created {@link SolrInputDocument}s to a server.
   *
   * @throws ProcessingException
   *           if the solr response reports an error status.
   */
  private void addDocumentsToServer(final SolrServer server, final Collection<SolrInputDocument> docs)
    throws SolrServerException, IOException, ProcessingException {
    final UpdateResponse solrResponse = server.add(docs);
    if (SolrQueryUtils.responseStatusIsError(solrResponse)) {
      throw new ProcessingException("Error reported by solr response with status: " + solrResponse.getStatus());
    }
    // TODO: What is best way to commit?
    // _defaultTargetCore.commit(); // use autoCommit in solrconfig.xml
    if (_log.isDebugEnabled()) {
      // FIX: log at debug level to match the isDebugEnabled() guard (was _log.info before).
      _log.debug(MessageFormat.format("document collection was added to solr in {1} ms, doc count = {0} ", //
        docs.size(), solrResponse.getElapsedTime()));
    }
  }

  /** Log serialized record on trace level. */
  private void traceRecord(Blackboard blackboard, String id) {
    if (_log.isTraceEnabled()) {
      try {
        final Record record = blackboard.getRecord(id);
        final String serialize2string = XmlSerializationUtils.serialize2string(record).replace('\n', ' ');
        _log.trace("processing record: " + serialize2string);
      } catch (BlackboardAccessException e) {
        // best-effort logging only; never fail processing because of it.
        _log.trace("error on serializing the record for logging: ", e);
      }
    }
  }

  /**
   * Delete the given records from solr, resolving the target core per record (dynamic core, falling back to the
   * default core name).
   */
  private void deleteRecords(Blackboard blackboard, String[] recordIds) throws ProcessingException {
    for (String id : recordIds) {
      try {
        final AnyMap metadata = blackboard.getMetadata(id);
        final String indexName =
          StringUtils.defaultIfEmpty(metadata.getStringValue(SolrConstants.DYNAMIC_TARGET_CORE), _defaultCoreName);
        final SolrServer server = getSolrManager().getSolrServer(indexName);
        /*
         * NOTE: solr offers to delete a set of ids. but we deliberately don't do this for logging purposes | tmenzel @
         * May 20, 2011
         */
        final UpdateResponse deleteResponse = server.deleteById(id);
        if (SolrQueryUtils.responseStatusIsError(deleteResponse)) {
          throw new ProcessingException("Error reported by solr response while delete record with id: " + id);
        }
        if (_log.isDebugEnabled()) {
          final String msg =
            MessageFormat.format("Record deleted: Id: {0} Index: {1} time: {2}ms.", id,
              deleteResponse.getRequestUrl(), deleteResponse.getElapsedTime());
          _log.debug(msg);
        }
      } catch (Exception e) {
        throw new ProcessingException("Error while delete record with id: " + id, e);
      }
    }
  }

  /**
   * Get SolrManager.
   *
   * @return the SolrManager.
   */
  public SolrManager getSolrManager() {
    /*
     * WORKAROUND: must be here instead of ctor/configure() due to init problems. suspicion: pipelet tracker calling
     * configure() before the activator is called, despite it's lazy setting | TM @ Jun 7, 2011
     */
    return Activator.getInstance().getSolrManager();
  }
}