/**
 * Solr update pipelet of the SMILA Solr integration.
 */
package org.eclipse.smila.solr.update;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.lang.NullArgumentException;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrInputDocument;
import org.eclipse.smila.blackboard.Blackboard;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.datamodel.Record;
import org.eclipse.smila.datamodel.ipc.IpcAnyWriter;
import org.eclipse.smila.processing.Pipelet;
import org.eclipse.smila.processing.ProcessingException;
import org.eclipse.smila.processing.util.ProcessingConstants;
import org.eclipse.smila.processing.util.ResultCollector;
import org.eclipse.smila.search.api.QueryConstants;
import org.eclipse.smila.solr.SolrServerService;
import org.eclipse.smila.solr.SolrUtils;
import org.eclipse.smila.solr.params.UpdateParams;
import org.eclipse.smila.solr.params.UpdateParams.Operation;
import org.eclipse.smila.utils.service.ServiceUtils;
/**
 * Pipelet that applies the records of a blackboard to Solr. For every record the operation resolved by
 * {@link UpdateParams} decides whether the record is added, deleted by id, or whether a delete-by-query is sent to the
 * Solr server of the record's index.
 *
 * Two processing modes exist: record-by-record (default) and the deprecated "bunch" mode, which groups records by
 * index and operation and sends one request per group.
 *
 * @author pwissel
 */
public class SolrUpdatePipelet implements Pipelet {

  /** Name of the boolean parameter that switches to the deprecated bunch processing mode. */
  public static final String PROCESS_AS_BUNCH = "processAsBunch";

  /** Default value of {@link #PROCESS_AS_BUNCH}: records are processed one by one. */
  public static final boolean PROCESS_AS_BUNCH_DEFAULT = false;

  private final Log _log = LogFactory.getLog(getClass());

  /** Resolved lazily on first use by {@link #getServer(String)}. */
  private SolrServerService _servers;

  /** Pipelet configuration as handed over in {@link #configure(AnyMap)}. */
  private AnyMap _configuration;

  /*
   * (non-Javadoc)
   *
   * @see org.eclipse.smila.processing.Pipelet#configure(org.eclipse.smila.datamodel.AnyMap)
   */
  @Override
  public void configure(AnyMap configuration) throws ProcessingException {
    _configuration = configuration;
  }

  /*
   * (non-Javadoc)
   *
   * @see org.eclipse.smila.processing.Pipelet#process(org.eclipse.smila.blackboard.Blackboard, java.lang.String[])
   */
  @Override
  public String[] process(Blackboard blackboard, String[] recordIds) throws ProcessingException {
    // prepare importing params
    final UpdateParams params = new UpdateParams(blackboard, _configuration);
    final boolean dropOnError =
      params.getBooleanParameter(ProcessingConstants.KEY_DROP_ON_ERROR, ProcessingConstants.DROP_ON_ERROR_DEFAULT);
    final ResultCollector results = new ResultCollector(params, _log, dropOnError);
    final boolean bunchMode = params.getBooleanParameter(PROCESS_AS_BUNCH, PROCESS_AS_BUNCH_DEFAULT);
    // nothing to iterate over when no ids are given; the collector then simply has no entries
    if (recordIds != null) {
      if (bunchMode) {
        processAsBunch(blackboard, recordIds, params, results);
      } else {
        processAsRecord(blackboard, recordIds, params, results);
      }
    }
    return results.getResultIds();
  }

  /**
   * Sends one Solr request per record. A record is reported as result when the request succeeded and as failed result
   * when anything went wrong for it (missing index name, unsupported operation, Solr error, ...).
   *
   * @param blackboard
   *          blackboard holding the records.
   * @param recordIds
   *          ids of the records to process, must not be null.
   * @param params
   *          parameter accessor; its current record is switched to each id in turn.
   * @param results
   *          collector receiving successful and failed ids.
   * @throws ProcessingException
   *           declared because of {@code results.addFailedResult(...)}; presumably raised when errors must not be
   *           dropped - confirm against ResultCollector.
   */
  private void processAsRecord(final Blackboard blackboard, final String[] recordIds, final UpdateParams params,
    final ResultCollector results) throws ProcessingException {
    for (final String id : recordIds) {
      params.setCurrentRecord(id);
      try {
        final String index = params.getIndexName();
        if (index == null) {
          throw new NullArgumentException(QueryConstants.INDEXNAME);
        }
        final SolrServer server = getServer(index);
        final Operation operation = params.getOperation();
        final int commitWithinMs = params.getCommitWithinMs(true);
        final UpdateResponse response;
        String query = null;
        switch (operation) {
          case ADD:
            // the record content is only needed for ADD, so it is not loaded for the delete operations
            final Record record = blackboard.getRecord(id);
            final SolrInputDocument document = convertToSolrInputDocument(record, params);
            response = server.add(document, commitWithinMs);
            break;
          case DELETE_BY_ID:
            response = server.deleteById(id, commitWithinMs);
            break;
          case DELETE_BY_QUERY:
            query = params.getDeleteQeuery();
            if (query == null) {
              throw new ProcessingException(
                "Parameter '" + UpdateParams.DELETE_QUERY + "' is required when using operation DELETE_BY_QUERY");
            }
            response = server.deleteByQuery(query, commitWithinMs);
            break;
          default:
            throw new NotImplementedException(operation.toString());
        }
        // throws if Solr reported an error status, so the record ends up as failed result below
        processResponse(response, index, operation.toString(), id, query);
        results.addResult(id);
      } catch (final Exception exception) {
        restoreInterruptFlag(exception);
        results.addFailedResult(id, exception);
      }
    }
  }

  /**
   * Logs the outcome of a single-record request and rejects error responses.
   *
   * @param response
   *          response returned by the Solr server.
   * @param index
   *          name of the index the request was sent to (used for logging only).
   * @param operation
   *          name of the executed operation (used for logging only).
   * @param id
   *          id of the processed record (used for logging only).
   * @param query
   *          delete query, or null if the operation did not use one.
   * @throws ProcessingException
   *           if the response status is not 0.
   */
  private void processResponse(final UpdateResponse response, final String index, final String operation,
    final String id, final String query) throws ProcessingException {
    final int status = response.getStatus();
    if (status != 0) {
      final String message =
        String.format("Cannot process record on Solr server: _recordid: %s; index: %s; operation: %s; status:%s.",
          id, index, operation, status);
      if (_log.isErrorEnabled()) {
        _log.error(message);
      }
      // consistent with handleResponse(): an error status must not be reported as a successful result
      throw new ProcessingException(message);
    }
    if (_log.isInfoEnabled()) {
      final long duration = response.getElapsedTime();
      final String message;
      if (query != null) {
        message = String.format(
          "Process record on Solr server: _recordid: '%s'; index: '%s'; operation: %s; query: '%s'; duration: %sms.",
          id, index, operation, query, duration);
      } else {
        message = String.format(
          "Process record on Solr server: _recordid: '%s'; index: '%s'; operation: %s; duration: %sms.", id, index,
          operation, duration);
      }
      _log.info(message);
    }
  }

  /**
   * Groups the records by index and operation and sends one Solr request per group. Only ADD and DELETE_BY_ID are
   * supported in this mode; if a group fails, all of its records are reported as failed.
   *
   * @param blackboard
   *          blackboard holding the records.
   * @param ids
   *          ids of the records to process, must not be null.
   * @param params
   *          parameter accessor; its current record is changed while grouping and converting.
   * @param results
   *          collector receiving successful and failed ids.
   * @throws ProcessingException
   *           declared because of {@code results.addFailedResult(...)}; presumably raised when errors must not be
   *           dropped - confirm against ResultCollector.
   */
  @Deprecated
  private void processAsBunch(final Blackboard blackboard, final String[] ids, final UpdateParams params,
    final ResultCollector results) throws ProcessingException {
    final Map<String, Map<Operation, Collection<String>>> serversAndOperations =
      collectServersAndOperations(ids, params);
    // walk thru servers
    for (final Map.Entry<String, Map<Operation, Collection<String>>> serverEntry : serversAndOperations.entrySet()) {
      final String index = serverEntry.getKey();
      // walk thru operations
      for (final Map.Entry<Operation, Collection<String>> operationEntry : serverEntry.getValue().entrySet()) {
        final Operation operation = operationEntry.getKey();
        final Collection<String> records = operationEntry.getValue();
        final int commitWithinMs = calculateLowestCommitWithinMs(records, params);
        try {
          // same precondition as in record mode: without an index name no server can be selected
          if (index == null) {
            throw new NullArgumentException(QueryConstants.INDEXNAME);
          }
          final SolrServer server = getServer(index);
          final UpdateResponse response;
          switch (operation) {
            case ADD:
              // convert documents
              final Collection<SolrInputDocument> documents = new ArrayList<SolrInputDocument>(records.size());
              for (final String id : records) {
                params.setCurrentRecord(id);
                final Record record = blackboard.getRecord(id);
                documents.add(convertToSolrInputDocument(record, params));
              }
              response = server.add(documents, commitWithinMs);
              break;
            case DELETE_BY_ID:
              // copy instead of down-casting the collection to List
              final List<String> idList = new ArrayList<String>(records);
              response = server.deleteById(idList, commitWithinMs);
              break;
            default:
              throw new NotImplementedException(operation.toString());
          }
          // handle result; pass the ids so that the success log actually lists them
          handleResponse(response, records.toArray(new String[records.size()]));
          for (final String id : records) {
            results.addResult(id);
          }
        } catch (final Exception exception) {
          restoreInterruptFlag(exception);
          // handle error: the request covered the whole group, so every record of it failed
          for (final String id : records) {
            results.addFailedResult(id, exception);
          }
        }
      }
    }
  }

  /**
   * Groups record ids first by index name and then by operation.
   *
   * NOTE(review): a record whose operation cannot be resolved (IllegalArgumentException) is only logged and left out
   * of the returned map, so it shows up neither as result nor as failed result - confirm that this is intended.
   *
   * @param ids
   *          ids of the records to group.
   * @param params
   *          parameter accessor; its current record is switched to each id in turn.
   * @return map from index name to a map from operation to the ids using that index and operation.
   */
  @Deprecated
  private Map<String, Map<Operation, Collection<String>>> collectServersAndOperations(final String[] ids,
    final UpdateParams params) {
    final Map<String, Map<Operation, Collection<String>>> servers =
      new HashMap<String, Map<Operation, Collection<String>>>();
    for (final String id : ids) {
      params.setCurrentRecord(id);
      final String server = params.getIndexName();
      Map<Operation, Collection<String>> operations = servers.get(server);
      if (operations == null) {
        operations = new HashMap<Operation, Collection<String>>();
        servers.put(server, operations);
      }
      final Operation operation;
      try {
        operation = params.getOperation();
      } catch (final IllegalArgumentException exception) {
        if (_log.isWarnEnabled()) {
          _log.warn("Unknown operation.", exception);
        }
        // skip the record regardless of the log level
        continue;
      }
      Collection<String> records = operations.get(operation);
      if (records == null) {
        records = new ArrayList<String>();
        operations.put(operation, records);
      }
      records.add(id);
    }
    return servers;
  }

  /**
   * Determines the commitWithin value for a group of records: the smallest value explicitly set for any of the
   * records, or {@link UpdateParams#COMMIT_WITHIN_MS_DEFAULT} if none of them sets one.
   *
   * @param records
   *          ids of the records in the group.
   * @param params
   *          parameter accessor; its current record is switched to each id in turn.
   * @return commitWithin value in milliseconds to use for the group request.
   */
  @Deprecated
  private int calculateLowestCommitWithinMs(final Collection<String> records, final UpdateParams params) {
    int commitWithinMs = UpdateParams.COMMIT_WITHIN_MS_DEFAULT;
    for (final String id : records) {
      params.setCurrentRecord(id);
      final Integer param = params.getCommitWithinMs(false);
      if (param == null) {
        continue;
      }
      // the first explicit value replaces the default, later ones only if they are lower
      if (commitWithinMs == UpdateParams.COMMIT_WITHIN_MS_DEFAULT || commitWithinMs > param) {
        commitWithinMs = param;
      }
    }
    return commitWithinMs;
  }

  /**
   * Converts a record to a Solr input document. If a mapping is available from the parameters it is used, otherwise
   * the record is converted with the attachments flag from the parameters.
   *
   * @param record
   *          record to convert.
   * @param params
   *          parameter accessor, positioned on the record.
   * @return the Solr document created by {@link SolrDocumentConverter}.
   * @throws UnsupportedEncodingException
   *           declared by the conversion.
   */
  private SolrInputDocument convertToSolrInputDocument(final Record record, final UpdateParams params)
    throws UnsupportedEncodingException {
    final AnyMap mapping = params.getMapping(false);
    if (mapping != null) {
      return new SolrDocumentConverter().toSolrDocument(record, mapping);
    }
    final boolean attachments = params.getAttachments(true);
    return new SolrDocumentConverter().toSolrDocument(record, attachments);
  }

  /**
   * Checks the response of a bunch request: throws on an error response, logs the processed ids otherwise.
   *
   * @param response
   *          response returned by the Solr server.
   * @param id
   *          ids of the records covered by the request (used for logging only).
   * @throws ProcessingException
   *           if {@link SolrUtils#isErrorResponse} reports an error for the response.
   */
  @Deprecated
  private void handleResponse(final UpdateResponse response, final String... id) throws ProcessingException {
    if (SolrUtils.isErrorResponse(response)) {
      final String message = String.format("Error returned by Solr (status=%s).", response.getStatus());
      if (_log.isDebugEnabled()) {
        final AnyMap error = SolrUtils.parseNamedList(response.getResponse());
        try {
          final String json = new IpcAnyWriter(true).writeJsonObject(error);
          _log.debug(message + "\n" + json);
        } catch (final IOException exception) {
          // the details are optional, but the error itself must still show up in the debug log
          _log.debug(message + " (error details could not be written as JSON)", exception);
        }
      }
      throw new ProcessingException(message);
    }
    if (_log.isInfoEnabled()) {
      final String message = String.format("processed record with ids: [%s]", StringUtils.join(id, ", "));
      _log.info(message);
    }
  }

  /**
   * Returns the Solr server for an index, looking up the {@link SolrServerService} on first use.
   *
   * NOTE(review): the lazy lookup is not synchronized - confirm whether a pipelet instance is used by several threads.
   *
   * @param name
   *          index name.
   * @return the server returned by the service for that name.
   * @throws SolrServerException
   *           declared by the service lookup / server access.
   * @throws InterruptedException
   *           declared by the service lookup / server access.
   */
  private SolrServer getServer(final String name) throws SolrServerException, InterruptedException {
    if (_servers == null) {
      _servers = ServiceUtils.getService(SolrServerService.class);
    }
    return _servers.getServer(name);
  }

  /** Re-sets the thread's interrupt flag if the caught exception is an InterruptedException. */
  private static void restoreInterruptFlag(final Exception exception) {
    if (exception instanceof InterruptedException) {
      Thread.currentThread().interrupt();
    }
  }
}