/***********************************************************************************************************************
 * Copyright (c) 2008 empolis GmbH and brox IT Solutions GmbH. All rights reserved. This program and the accompanying
 * materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors: Peter Wissel (brox IT Solutions GmbH) - initial API and implementation
 **********************************************************************************************************************/
package org.eclipse.smila.solr.index; | |
import static java.lang.String.format; | |
import static org.apache.commons.lang.StringUtils.defaultString; | |
import static org.apache.commons.lang.StringUtils.isBlank; | |
import static org.apache.commons.lang.StringUtils.isNotBlank; | |
import static org.apache.commons.lang.StringUtils.trimToNull; | |
import java.io.IOException; | |
import java.io.UnsupportedEncodingException; | |
import java.text.MessageFormat; | |
import java.util.ArrayList; | |
import java.util.Collection; | |
import java.util.HashMap; | |
import java.util.Map; | |
import java.util.Map.Entry; | |
import org.apache.commons.lang.NotImplementedException; | |
import org.apache.commons.lang.StringUtils; | |
import org.apache.commons.logging.Log; | |
import org.apache.commons.logging.LogFactory; | |
import org.apache.solr.client.solrj.SolrServer; | |
import org.apache.solr.client.solrj.SolrServerException; | |
import org.apache.solr.client.solrj.response.UpdateResponse; | |
import org.apache.solr.common.SolrInputDocument; | |
import org.eclipse.smila.blackboard.Blackboard; | |
import org.eclipse.smila.blackboard.BlackboardAccessException; | |
import org.eclipse.smila.datamodel.Any; | |
import org.eclipse.smila.datamodel.AnyMap; | |
import org.eclipse.smila.datamodel.AnySeq; | |
import org.eclipse.smila.datamodel.Record; | |
import org.eclipse.smila.datamodel.xml.XmlSerializationUtils; | |
import org.eclipse.smila.processing.Pipelet; | |
import org.eclipse.smila.processing.ProcessingException; | |
import org.eclipse.smila.processing.parameters.MissingParameterException; | |
import org.eclipse.smila.solr.Activator; | |
import org.eclipse.smila.solr.SolrConstants; | |
import org.eclipse.smila.solr.SolrConstants.AttributeOrAttachment; | |
import org.eclipse.smila.solr.SolrConstants.ExecutionMode; | |
import org.eclipse.smila.solr.SolrManager; | |
import org.eclipse.smila.solr.util.SolrQueryUtils; | |
/** | |
* SolrIndexPipelet class. | |
* | |
* @author pwissel | |
* | |
*/ | |
public class SolrIndexPipelet implements Pipelet { | |
/** | |
* The configuration error text. | |
*/ | |
private static final String CONF_ERROR = "Invalid Pipelet configuration -> "; | |
/** | |
* The configuration parameter missing error text. | |
*/ | |
private static final String CONF_ERROR_PARAMETER_MISSING = CONF_ERROR + "Parameter missing: "; | |
/** | |
* The log. | |
*/ | |
private final Log _log = LogFactory.getLog(SolrIndexPipelet.class); | |
/** | |
* The configuration map. | |
*/ | |
private AnyMap _configuration; | |
/** | |
* The core fields sequence. | |
*/ | |
private AnySeq _coreFieldsSeq; | |
/** | |
* The execution mode. | |
*/ | |
private ExecutionMode _executionMode; | |
/** | |
* The solr server instance that corresponds to {@link #_defaultCoreName}, attempted to be assigned on each execution | |
* of the pipelet. | |
*/ | |
private SolrServer _defaultTargetCore; | |
/** | |
* Optional, if given then it must be alive, otherwise a warning is logged each pipelet execution and processing will | |
* continue. It is not an error because it still might be overridden by a dynamic core name. | |
*/ | |
private String _defaultCoreName; | |
/** | |
* {@inheritDoc} | |
* | |
* @see org.eclipse.smila.processing.IPipelet#configure(org.eclipse.smila.processing.configuration.PipeletConfiguration) | |
*/ | |
@Override | |
public void configure(AnyMap configuration) throws ProcessingException { | |
if (_log.isDebugEnabled()) { | |
_log.debug("Configure SolrIndexPipelet"); | |
} | |
_configuration = configuration; | |
try { | |
if (StringUtils.isBlank(configuration.getStringValue(SolrConstants.EXECUTION_MODE))) { | |
throw new MissingParameterException(CONF_ERROR_PARAMETER_MISSING + SolrConstants.EXECUTION_MODE); | |
} | |
_executionMode = ExecutionMode.valueOf(_configuration.getStringValue(SolrConstants.EXECUTION_MODE)); | |
switch (_executionMode) { | |
case DELETE: | |
break; | |
case ADD: | |
case UPDATE: | |
// nothing much happens here yet, see the TODOs in addRecords() | |
_coreFieldsSeq = _configuration.getSeq(SolrConstants.CORE_FIELDS); | |
for (Any coreFieldMap : _coreFieldsSeq) { | |
if (!coreFieldMap.isMap()) { | |
throw new ProcessingException("all items must be of type AnyMap in pipelet config: " | |
+ SolrConstants.CORE_FIELDS); | |
} | |
} | |
break; | |
default: | |
throw new NotImplementedException("_executionMode: " + _executionMode); | |
} | |
// this does nothing except to issue a one time warning about the missing config. | |
_defaultCoreName = trimToNull(_configuration.getStringValue(SolrConstants.CORE_NAME)); | |
if (isBlank(_defaultCoreName)) { | |
_log.warn("there is no default core configured in the pipelet config! " | |
+ "Target core must be set now on each record dynamically via: " + SolrConstants.DYNAMIC_TARGET_CORE); | |
} | |
} catch (Exception exception) { | |
throw new ProcessingException("Error while configure SolrIndexPipelet", exception); | |
} | |
} | |
/** | |
* {@inheritDoc} | |
* | |
* @see org.eclipse.smila.processing.Pipelet#process(org.eclipse.smila.blackboard.Blackboard, | |
* org.eclipse.smila.datamodel.id.String[]) | |
*/ | |
@Override | |
public String[] process(Blackboard blackboard, String[] recordIds) throws ProcessingException { | |
if (recordIds != null) { | |
if (_defaultCoreName != null) { | |
try { | |
_defaultTargetCore = getSolrManager().getSolrServer(_defaultCoreName); | |
} catch (Exception e) { | |
_log | |
.warn(format( | |
"default core not reachable: %s. Records must have a dynamic core specified or their processing will fail. Exception: %s", | |
_defaultCoreName, e.getMessage())); | |
} | |
} | |
switch (_executionMode) { | |
case ADD: | |
addRecords(blackboard, recordIds); | |
break; | |
case DELETE: | |
deleteRecords(blackboard, recordIds); | |
break; | |
default: | |
throw new NotImplementedException("executionMode: " + _executionMode); | |
} | |
} | |
return recordIds; | |
} | |
/** | |
* Add to solr. | |
* | |
* @param blackboard | |
* the blackboard. | |
* @param recordIds | |
* the record ids. | |
* @param server | |
* @throws ProcessingException | |
* ProcessingException. | |
*/ | |
private void addRecords(Blackboard blackboard, String[] recordIds) throws ProcessingException { | |
// since the target core can be set dynamically we must collect all docs per core and this is what this map does | |
final Map<SolrServer, Collection<SolrInputDocument>> coreToDocsMap = | |
new HashMap<SolrServer, Collection<SolrInputDocument>>(); | |
final int recordCount = recordIds.length; | |
for (String id : recordIds) { | |
traceRecord(blackboard, id); | |
try { | |
final AnyMap metadata = blackboard.getMetadata(id); | |
final String dynamicCoreValue = metadata.getStringValue(SolrConstants.DYNAMIC_TARGET_CORE); | |
final Collection<SolrInputDocument> solrDocs = | |
getInputDocumentsForCore(coreToDocsMap, recordCount, dynamicCoreValue); | |
final SolrInputDocument doc = createInputDocument(blackboard, id, metadata); | |
solrDocs.add(doc); | |
if (_log.isInfoEnabled()) { | |
_log.info(format("record added to unsent in-memory document collection. id: %s core: %s", id, | |
defaultString(dynamicCoreValue, _defaultCoreName))); | |
if (_log.isTraceEnabled()) { | |
_log.trace("solr document: " + doc.toString()); | |
} | |
} | |
} catch (Exception e) { | |
final String msg = "Error while adding record with id: " + id; | |
throw new ProcessingException(msg, e); | |
} | |
} | |
try { | |
for (Entry<SolrServer, Collection<SolrInputDocument>> entry : coreToDocsMap.entrySet()) { | |
final SolrServer server = entry.getKey(); | |
final Collection<SolrInputDocument> docs = entry.getValue(); | |
addDocumentsToServer(server, docs); | |
} | |
} catch (Exception e) { | |
throw new ProcessingException("Error while adding document collection to solr server.", e); | |
} | |
} | |
/** | |
* get or create the collection of {@link SolrInputDocument}s to add the record to. | |
* | |
* @param dynamicCoreValue | |
* may be null | |
*/ | |
private Collection<SolrInputDocument> getInputDocumentsForCore( | |
final Map<SolrServer, Collection<SolrInputDocument>> coreToDocsMap, int recordCount, String dynamicCoreValue) | |
throws SolrServerException, IOException, ProcessingException { | |
final SolrServer targetCore = getTargetCore(dynamicCoreValue); | |
Collection<SolrInputDocument> solrDocs = coreToDocsMap.get(targetCore); | |
if (solrDocs == null) { | |
solrDocs = new ArrayList<SolrInputDocument>(recordCount); | |
coreToDocsMap.put(targetCore, solrDocs); | |
} | |
return solrDocs; | |
} | |
/** | |
* get the target Solr core for the given record. | |
* | |
* @param dynamicCoreValue | |
* TODO | |
*/ | |
private SolrServer getTargetCore(String dynamicCoreValue) throws SolrServerException, IOException, | |
ProcessingException { | |
if (isNotBlank(dynamicCoreValue)) { | |
return getSolrManager().getSolrServer(dynamicCoreValue); | |
} else { | |
if (_defaultTargetCore == null) { | |
throw new ProcessingException( | |
"no dynamic core in record given while default core is null. check if the pipelet defines a valid default core or that the record carries the dynamicCore attribute"); | |
} | |
return _defaultTargetCore; | |
} | |
} | |
/** create a {@link SolrInputDocument} from the given record using the specified mapping. */ | |
private SolrInputDocument createInputDocument(Blackboard blackboard, String id, final AnyMap metadata) | |
throws ProcessingException, BlackboardAccessException, UnsupportedEncodingException { | |
final SolrInputDocument doc = new SolrInputDocument(); | |
// add id field | |
doc.addField(SolrConstants.CORE_FIELD_ID, id); | |
// set optional boost factor | |
final Double boostFactor = metadata.getDoubleValue(SolrConstants.DOC_BOOST); | |
if (boostFactor != null) { | |
doc.setDocumentBoost(boostFactor.floatValue()); | |
} | |
/* | |
* PERF: move these checks into config() | TM @ Jun 7, 2011 | |
* | |
* PERF: transform the map structure into a faster structure and do conversion and setting of default values there | | |
* TM @ Jun 7, 2011 | |
* | |
* BETTER: check that no target core field is mapped twice (from diff. sourc fields), as it will be overwritten. | |
* since there are rare use cases for this (e.g. processing ensures that just on of the source field is actually | |
* set), we just should issue a warning here) | TM @ Jun 7, 2011 | |
*/ | |
for (Any coreFieldMap : _coreFieldsSeq) { | |
final AnyMap fieldMap = coreFieldMap.asMap(); // configure validates this in advance | |
final String fieldName = getFieldName(fieldMap); | |
final String recSourceName = getRecordSourceName(fieldMap, fieldName); | |
final AttributeOrAttachment recSourceType = getRecordSourceType(fieldMap, fieldName); | |
/* TODO | TM | add boost on field level | TM @ May 20, 2011 */ | |
switch (recSourceType) { | |
case ATTRIBUTE: | |
createFieldFromAttribute(doc, fieldName, metadata, recSourceName); | |
break; | |
case ATTACHMENT: | |
createFieldFromAttachment(doc, fieldName, blackboard, id, recSourceName); | |
break; | |
default: | |
throw new NotImplementedException("recSourceType: " + recSourceType); | |
} | |
} | |
return doc; | |
} | |
/** get the field name from the record-document-mapping. */ | |
private String getFieldName(final AnyMap fieldMap) throws MissingParameterException { | |
final String fieldName = fieldMap.getStringValue(SolrConstants.CORE_FIELD_NAME); | |
if (StringUtils.isBlank(fieldName)) { | |
throw new MissingParameterException(CONF_ERROR_PARAMETER_MISSING + SolrConstants.CORE_FIELD_NAME); | |
} | |
return fieldName; | |
} | |
/** get the attribute or attachment name from the record-document-mapping. */ | |
private String getRecordSourceName(final AnyMap fieldMap, final String fieldName) { | |
String recSourceName = fieldMap.getStringValue(SolrConstants.SOURCE_NAME); | |
if (StringUtils.isBlank(recSourceName)) { | |
recSourceName = fieldName; | |
if (_log.isTraceEnabled()) { | |
_log.trace(format("core field mapping %s: no %s config'ed. Defaulting to field name", fieldName, | |
SolrConstants.SOURCE_NAME)); | |
} | |
} | |
return recSourceName; | |
} | |
/** get the record source type from record-document-mapping. */ | |
private AttributeOrAttachment getRecordSourceType(final AnyMap fieldMap, final String fieldName) { | |
final String recSourceTypeString = fieldMap.getStringValue(SolrConstants.SOURCE_TYPE); | |
final AttributeOrAttachment recSourceType; | |
if (isBlank(recSourceTypeString)) { | |
recSourceType = AttributeOrAttachment.ATTRIBUTE; | |
if (_log.isTraceEnabled()) { | |
_log.trace(format("core field mapping %s: no %s config'ed. Defaulting to %s", fieldName, | |
SolrConstants.SOURCE_TYPE, recSourceType)); | |
} | |
} else { | |
recSourceType = AttributeOrAttachment.valueOf(recSourceTypeString.toUpperCase()); | |
} | |
return recSourceType; | |
} | |
/** create a Solr field from a record attribute value. */ | |
private void createFieldFromAttribute(final SolrInputDocument doc, final String fieldName, final AnyMap metadata, | |
String recordAttribute) throws ProcessingException { | |
final Any any = metadata.get(recordAttribute); | |
if (any != null) { | |
if (any.isValue()) { | |
doc.addField(fieldName, any.asValue().getObject()); | |
} else if (any.isSeq()) { | |
final AnySeq asSeq = any.asSeq(); | |
for (Any seqValues : asSeq) { | |
doc.addField(fieldName, seqValues.asValue().getObject()); | |
} | |
} else { | |
throw new ProcessingException( | |
"value type for indexing in solr not supported. Must be one of Value, Seq but is: " + any.getValueType()); | |
} | |
} else { | |
if (_log.isTraceEnabled()) {// this case is quite common and hence only logged @ trace | |
_log.trace("Record doesn't contain an attribute named: " + recordAttribute); | |
} | |
} | |
} | |
/** create a Solr field from a record attachment. */ | |
private void createFieldFromAttachment(final SolrInputDocument doc, final String fieldName, | |
Blackboard blackboard, String id, String attachmentName) throws BlackboardAccessException, | |
UnsupportedEncodingException { | |
final byte[] value = blackboard.getAttachmentAsBytes(id, attachmentName); | |
if (value != null) { | |
final String string = new String(value, "UTF-8"); | |
doc.addField(fieldName, string); | |
} else { | |
// this case is quite common and hence only logged @ trace | |
if (_log.isTraceEnabled() && !blackboard.hasAttachment(id, attachmentName)) { | |
_log.trace("Record doesn't have an attachment named: " + attachmentName); | |
} | |
} | |
} | |
/** send the created {@link SolrInputDocument}s to a server. */ | |
private void addDocumentsToServer(final SolrServer server, final Collection<SolrInputDocument> docs) | |
throws SolrServerException, IOException, ProcessingException { | |
final UpdateResponse solrResponse = server.add(docs); | |
if (SolrQueryUtils.responseStatusIsError(solrResponse)) { | |
throw new ProcessingException("Error reported by solr reponse with status: " + solrResponse.getStatus()); | |
} | |
// TODO: What is best way to commit? | |
// _defaultTargetCore.commit(); // use autoCommit in solrconfig.xml | |
if (_log.isDebugEnabled()) { | |
_log.info(MessageFormat.format("document collection was added to solr in {1} ms, doc count = {0} ", // | |
docs.size(), solrResponse.getElapsedTime())); | |
} | |
} | |
/** Log serialized record on trace level. */ | |
private void traceRecord(Blackboard blackboard, String id) { | |
if (_log.isTraceEnabled()) { | |
try { | |
final Record record = blackboard.getRecord(id); | |
final String serialize2string = XmlSerializationUtils.serialize2string(record).replace('\n', ' '); | |
_log.trace("processing record: " + serialize2string); | |
} catch (BlackboardAccessException e) { | |
_log.trace("error on serializing the record for logging: ", e); | |
} | |
} | |
} | |
/** Delete from solr. */ | |
private void deleteRecords(Blackboard blackboard, String[] recordIds) throws ProcessingException { | |
for (String id : recordIds) { | |
try { | |
final AnyMap metadata = blackboard.getMetadata(id); | |
final String indexName = | |
StringUtils.defaultIfEmpty(metadata.getStringValue(SolrConstants.DYNAMIC_TARGET_CORE), _defaultCoreName); | |
final SolrServer server = getSolrManager().getSolrServer(indexName); | |
/* | |
* NOTE: solr offers to delete a set of ids. but we deliberately don't do this for logging purposes | tmenzel @ | |
* May 20, 2011 | |
*/ | |
final UpdateResponse deleteResponse = server.deleteById(id); | |
if (SolrQueryUtils.responseStatusIsError(deleteResponse)) { | |
throw new ProcessingException("Error reported by solr reponse while delete record with id: " + id); | |
} | |
if (_log.isDebugEnabled()) { | |
final String msg = | |
MessageFormat.format("Record deleted: Id: {0} Index: {1} time: {2}ms.", id, | |
deleteResponse.getRequestUrl(), deleteResponse.getElapsedTime()); | |
_log.debug(msg); | |
} | |
} catch (Exception e) { | |
throw new ProcessingException("Error while delete record with id: " + id, e); | |
} | |
} | |
} | |
/** | |
* Get SolrManager. | |
* | |
* @return the SolrManager. | |
*/ | |
public SolrManager getSolrManager() { | |
/* | |
* WORKAROUND: must be here instead of ctor/configure() due to init problems. suspicion: pipelet tracker calling | |
* configure() before the activator is called, despite it's lazy setting | TM @ Jun 7, 2011 | |
*/ | |
return Activator.getInstance().getSolrManager(); | |
} | |
} |