| /** |
| * |
| */ |
| package org.eclipse.smila.solr.update; |
| |
| import java.io.IOException; |
| import java.io.UnsupportedEncodingException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.apache.commons.lang.NotImplementedException; |
| import org.apache.commons.lang.NullArgumentException; |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.solr.client.solrj.SolrServer; |
| import org.apache.solr.client.solrj.SolrServerException; |
| import org.apache.solr.client.solrj.response.UpdateResponse; |
| import org.apache.solr.common.SolrInputDocument; |
| import org.eclipse.smila.blackboard.Blackboard; |
| import org.eclipse.smila.datamodel.AnyMap; |
| import org.eclipse.smila.datamodel.Record; |
| import org.eclipse.smila.datamodel.ipc.IpcAnyWriter; |
| import org.eclipse.smila.processing.Pipelet; |
| import org.eclipse.smila.processing.ProcessingException; |
| import org.eclipse.smila.processing.util.ProcessingConstants; |
| import org.eclipse.smila.processing.util.ResultCollector; |
| import org.eclipse.smila.search.api.QueryConstants; |
| import org.eclipse.smila.solr.SolrServerService; |
| import org.eclipse.smila.solr.SolrUtils; |
| import org.eclipse.smila.solr.params.UpdateParams; |
| import org.eclipse.smila.solr.params.UpdateParams.Operation; |
| import org.eclipse.smila.utils.service.ServiceUtils; |
| |
| /** |
| * @author pwissel |
| * |
| */ |
| public class SolrUpdatePipelet implements Pipelet { |
| |
| public static final String PROCESS_AS_BUNCH = "processAsBunch"; |
| |
| public static final boolean PROCESS_AS_BUNCH_DEFAULT = false; |
| |
| private final Log _log = LogFactory.getLog(getClass()); |
| |
| private SolrServerService _servers; |
| |
| private AnyMap _configuration; |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see org.eclipse.smila.processing.Pipelet#configure(org.eclipse.smila.datamodel.AnyMap) |
| */ |
| @Override |
| public void configure(AnyMap configuration) throws ProcessingException { |
| _configuration = configuration; |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see org.eclipse.smila.processing.Pipelet#process(org.eclipse.smila.blackboard.Blackboard, java.lang.String[]) |
| */ |
| @Override |
| public String[] process(Blackboard blackboard, String[] recordIds) throws ProcessingException { |
| // prepare importing params |
| final UpdateParams params = new UpdateParams(blackboard, _configuration); |
| final boolean dropOnError = |
| params.getBooleanParameter(ProcessingConstants.KEY_DROP_ON_ERROR, ProcessingConstants.DROP_ON_ERROR_DEFAULT); |
| final ResultCollector results = new ResultCollector(params, _log, dropOnError); |
| final boolean processAsBunch = params.getBooleanParameter(PROCESS_AS_BUNCH, PROCESS_AS_BUNCH_DEFAULT); |
| if (processAsBunch) { |
| processAsBunch(blackboard, recordIds, params, results); |
| } else { |
| processAsRecord(blackboard, recordIds, params, results); |
| } |
| return results.getResultIds(); |
| } |
| |
| private void processAsRecord(final Blackboard blackboard, final String[] recordIds, final UpdateParams params, |
| final ResultCollector results) throws ProcessingException { |
| // process records |
| for (final String id : recordIds) { |
| params.setCurrentRecord(id); |
| try { |
| // get server |
| final String index = params.getIndexName(); |
| if (index == null) { |
| throw new NullArgumentException(QueryConstants.INDEXNAME); |
| } |
| Record record = blackboard.getRecord(id); |
| final SolrServer server = getServer(index); |
| // process operation |
| final Operation operation = params.getOperation(); |
| final int commitWithinMs = params.getCommitWithinMs(true); |
| UpdateResponse response = null; |
| String query = null; |
| switch (operation) { |
| case ADD: |
| final SolrInputDocument document = convertToSolrInputDocument(record, params); |
| response = server.add(document, commitWithinMs); |
| break; |
| case DELETE_BY_ID: |
| response = server.deleteById(id, commitWithinMs); |
| break; |
| case DELETE_BY_QUERY: |
| query = params.getDeleteQeuery(); |
| if (query == null) { |
| throw new ProcessingException( |
| "Parameter '" + UpdateParams.DELETE_QUERY + "' is required when using operation DELETE_BY_QUERY"); |
| } |
| response = server.deleteByQuery(query, commitWithinMs); |
| break; |
| default: |
| throw new NotImplementedException(operation.toString()); |
| } |
| processResponse(response, index, operation.toString(), id, query); |
| results.addResult(id); |
| } catch (Exception exception) { |
| results.addFailedResult(id, exception); |
| } |
| } |
| } |
| |
| private void processResponse(final UpdateResponse response, final String index, final String operation, |
| final String id, final String query) { |
| final int status = response.getStatus(); |
| if (status == 0) { |
| if (_log.isInfoEnabled()) { |
| final long duration = response.getElapsedTime(); |
| String message = null; |
| if (query != null) { |
| message = String.format( |
| "Process record on Solr server: _recordid: '%s'; index: '%s'; operation: %s; query: '%s'; duration: %sms.", |
| id, index, operation, query, duration); |
| } else { |
| message = String.format( |
| "Process record on Solr server: _recordid: '%s'; index: '%s'; operation: %s; duration: %sms.", id, |
| index, operation, duration); |
| } |
| _log.info(message); |
| } |
| } else { |
| if (_log.isErrorEnabled()) { |
| final String message = |
| String.format("Cannot process record on Solr server: _recordid: %s; index: %s; operation: %s; status:%s.", |
| id, index, operation, status); |
| _log.error(message); |
| } |
| } |
| } |
| |
| @Deprecated |
| private void processAsBunch(final Blackboard blackboard, final String[] ids, final UpdateParams params, |
| final ResultCollector results) throws ProcessingException { |
| final Map<String, Map<Operation, Collection<String>>> serversAndOperations = |
| collectServersAndOperations(ids, params); |
| // walk thru servers |
| for (final String index : serversAndOperations.keySet()) { |
| final Map<Operation, Collection<String>> operations = serversAndOperations.get(index); |
| // walk thru operations |
| for (final Operation operation : operations.keySet()) { |
| final Collection<String> records = operations.get(operation); |
| int commitWithinMs = calculateLowestCommitWithinMs(records, params); |
| // process operation |
| UpdateResponse response; |
| try { |
| final SolrServer server = getServer(index); |
| switch (operation) { |
| case ADD: |
| // convert documents |
| final Collection<SolrInputDocument> documents = new ArrayList<SolrInputDocument>(); |
| for (final String id : records) { |
| params.setCurrentRecord(id); |
| final Record record = blackboard.getRecord(id); |
| final SolrInputDocument document = convertToSolrInputDocument(record, params); |
| documents.add(document); |
| } |
| response = server.add(documents, commitWithinMs); |
| break; |
| case DELETE_BY_ID: |
| response = server.deleteById((List<String>) records, commitWithinMs); |
| break; |
| default: |
| throw new NotImplementedException(operation.toString()); |
| } |
| // handle result |
| handleResponse(response); |
| for (String id : records) { |
| results.addResult(id); |
| } |
| } catch (Exception exception) { |
| // handle error |
| for (String id : records) { |
| results.addFailedResult(id, exception); |
| } |
| } |
| } |
| } |
| } |
| |
| @Deprecated |
| private Map<String, Map<Operation, Collection<String>>> collectServersAndOperations(final String[] ids, |
| final UpdateParams params) { |
| final Map<String, Map<Operation, Collection<String>>> servers = |
| new HashMap<String, Map<Operation, Collection<String>>>(); |
| for (final String id : ids) { |
| params.setCurrentRecord(id); |
| final String server = params.getIndexName(); |
| Map<Operation, Collection<String>> operations = servers.get(server); |
| if (operations == null) { |
| operations = new HashMap<Operation, Collection<String>>(); |
| servers.put(server, operations); |
| } |
| try { |
| final Operation operation = params.getOperation(); |
| Collection<String> records = operations.get(operation); |
| if (records == null) { |
| records = new ArrayList<String>(); |
| operations.put(operation, records); |
| } |
| records.add(id); |
| } catch (final IllegalArgumentException exception) { |
| if (_log.isWarnEnabled()) { |
| _log.warn("Unknown operation.", exception); |
| continue; |
| } |
| } |
| } |
| return servers; |
| } |
| |
| @Deprecated |
| private int calculateLowestCommitWithinMs(final Collection<String> records, final UpdateParams params) { |
| int commitWithinMs = UpdateParams.COMMIT_WITHIN_MS_DEFAULT; |
| for (final String id : records) { |
| params.setCurrentRecord(id); |
| final Integer param = params.getCommitWithinMs(false); |
| if (param != null) { |
| commitWithinMs = commitWithinMs == UpdateParams.COMMIT_WITHIN_MS_DEFAULT || commitWithinMs > param ? param |
| : commitWithinMs; |
| } |
| } |
| return commitWithinMs; |
| } |
| |
| private SolrInputDocument convertToSolrInputDocument(final Record record, final UpdateParams params) |
| throws UnsupportedEncodingException { |
| SolrInputDocument document; |
| AnyMap mapping; |
| // convert record |
| if ((mapping = params.getMapping(false)) != null) { |
| document = new SolrDocumentConverter().toSolrDocument(record, mapping); |
| } else { |
| final boolean attachments = params.getAttachments(true); |
| document = new SolrDocumentConverter().toSolrDocument(record, attachments); |
| } |
| return document; |
| } |
| |
| @Deprecated |
| private void handleResponse(final UpdateResponse response, final String... id) throws ProcessingException { |
| if (SolrUtils.isErrorResponse(response)) { |
| final String message = String.format("Error returned by Solr (status=%s).", response.getStatus()); |
| if (_log.isDebugEnabled()) { |
| final AnyMap error = SolrUtils.parseNamedList(response.getResponse()); |
| try { |
| final String json = new IpcAnyWriter(true).writeJsonObject(error); |
| _log.debug(message + "\n" + json); |
| } catch (IOException exception) { |
| ; // ignore |
| } |
| } |
| throw new ProcessingException(message); |
| } |
| if (_log.isInfoEnabled()) { |
| final String message = String.format("processed record with ids: [%s]", StringUtils.join(id, ", ")); |
| _log.info(message); |
| } |
| } |
| |
| private SolrServer getServer(final String name) throws SolrServerException, InterruptedException { |
| if (_servers == null) { |
| _servers = ServiceUtils.getService(SolrServerService.class); |
| } |
| return _servers.getServer(name); |
| } |
| |
| } |