blob: 32ce31bca9085221357cfdfe6e902f174a0d7cec [file] [log] [blame]
package org.apache.solr.handler.loader;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.JsonRecordReader;
import org.noggit.JSONParser;
import org.noggit.ObjectBuilder;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.handler.RequestHandlerUtils;
import org.apache.solr.handler.UpdateRequestHandler;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.RollbackUpdateCommand;
import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @since solr 4.0
*/
public class JsonLoader extends ContentStreamLoader {
final static Logger log = LoggerFactory.getLogger( JsonLoader.class );
private static final String CHILD_DOC_KEY = "_childDocuments_";
@Override
public String getDefaultWT() {
return "json";
}
@Override
public void load(SolrQueryRequest req, SolrQueryResponse rsp,
ContentStream stream, UpdateRequestProcessor processor) throws Exception {
new SingleThreadedJsonLoader(req,rsp,processor).load(req, rsp, stream, processor);
}
static class SingleThreadedJsonLoader extends ContentStreamLoader {
protected final UpdateRequestProcessor processor;
protected final SolrQueryRequest req;
protected SolrQueryResponse rsp;
protected JSONParser parser;
protected final int commitWithin;
protected final boolean overwrite;
public SingleThreadedJsonLoader(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor processor) {
this.processor = processor;
this.req = req;
this.rsp = rsp;
commitWithin = req.getParams().getInt(UpdateParams.COMMIT_WITHIN, -1);
overwrite = req.getParams().getBool(UpdateParams.OVERWRITE, true);
}
@Override
public void load(SolrQueryRequest req,
SolrQueryResponse rsp,
ContentStream stream,
UpdateRequestProcessor processor) throws Exception {
Reader reader = null;
try {
reader = stream.getReader();
if (log.isTraceEnabled()) {
String body = IOUtils.toString(reader);
log.trace("body", body);
reader = new StringReader(body);
}
parser = new JSONParser(reader);
this.processUpdate();
}
finally {
IOUtils.closeQuietly(reader);
}
}
@SuppressWarnings("fallthrough")
void processUpdate() throws IOException
{
if("false".equals( req.getParams().get("json.command"))){
String split = req.getParams().get("split");
if(split != null){
handleSplitMode(split);
} else {
handleStreamingSingleDocs();
}
return;
}
int ev = parser.nextEvent();
while( ev != JSONParser.EOF ) {
switch( ev )
{
case JSONParser.ARRAY_START:
handleAdds();
break;
case JSONParser.STRING:
if( parser.wasKey() ) {
String v = parser.getString();
if( v.equals( UpdateRequestHandler.ADD ) ) {
int ev2 = parser.nextEvent();
if (ev2 == JSONParser.OBJECT_START) {
processor.processAdd( parseAdd() );
} else if (ev2 == JSONParser.ARRAY_START) {
handleAdds();
} else {
assertEvent(ev2, JSONParser.OBJECT_START);
}
}
else if( v.equals( UpdateRequestHandler.COMMIT ) ) {
CommitUpdateCommand cmd = new CommitUpdateCommand(req, false );
cmd.waitSearcher = true;
parseCommitOptions( cmd );
processor.processCommit( cmd );
}
else if( v.equals( UpdateRequestHandler.OPTIMIZE ) ) {
CommitUpdateCommand cmd = new CommitUpdateCommand(req, true );
cmd.waitSearcher = true;
parseCommitOptions( cmd );
processor.processCommit( cmd );
}
else if( v.equals( UpdateRequestHandler.DELETE ) ) {
handleDeleteCommand();
}
else if( v.equals( UpdateRequestHandler.ROLLBACK ) ) {
processor.processRollback( parseRollback() );
}
else {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown command: "+v+" ["+parser.getPosition()+"]" );
}
break;
}
// fall through
case JSONParser.LONG:
case JSONParser.NUMBER:
case JSONParser.BIGNUMBER:
case JSONParser.BOOLEAN:
case JSONParser.NULL:
log.info( "can't have a value here! "
+JSONParser.getEventString(ev)+" "+parser.getPosition() );
case JSONParser.OBJECT_START:
case JSONParser.OBJECT_END:
case JSONParser.ARRAY_END:
break;
default:
log.info("Noggit UNKNOWN_EVENT_ID:"+ev);
break;
}
// read the next event
ev = parser.nextEvent();
}
}
private void handleSplitMode(String split) throws IOException {
String[] fields = req.getParams().getParams("f");
req.getCore().getLatestSchema().getDefaultSearchFieldName();
final boolean echo = "true".equals( req.getParams().get("echo"));
JsonRecordReader jsonRecordReader = JsonRecordReader.getInst(split, Arrays.asList(fields));
jsonRecordReader.streamRecords(parser,new JsonRecordReader.Handler() {
ArrayList docs =null;
@Override
public void handle(Map<String, Object> record, String path) {
if(echo){
if(docs ==null) {
docs = new ArrayList();
rsp.add("docs",docs);
}
docs.add(record);
} else {
AddUpdateCommand cmd = new AddUpdateCommand(req);
cmd.commitWithin = commitWithin;
cmd.overwrite = overwrite;
cmd.solrDoc = new SolrInputDocument();
for (Map.Entry<String, Object> entry : record.entrySet()) {
cmd.solrDoc.setField(entry.getKey(),entry.getValue());
}
try {
processor.processAdd(cmd);
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "error inserting doc",e);
}
}
}
});
}
private void handleStreamingSingleDocs() throws IOException
{
while( true ) {
int ev = parser.nextEvent();
if(ev == JSONParser.EOF) return;
if(ev == JSONParser.OBJECT_START) {
assertEvent(ev, JSONParser.OBJECT_START);
AddUpdateCommand cmd = new AddUpdateCommand(req);
cmd.commitWithin = commitWithin;
cmd.overwrite = overwrite;
cmd.solrDoc = parseDoc(ev);
processor.processAdd(cmd);
} else if(ev == JSONParser.ARRAY_START){
handleAdds();
} else{
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unexpected event :"+ev);
}
}
}
//
// "delete":"id"
// "delete":["id1","id2"]
// "delete":{"id":"foo"}
// "delete":{"query":"myquery"}
//
void handleDeleteCommand() throws IOException {
int ev = parser.nextEvent();
switch (ev) {
case JSONParser.ARRAY_START:
handleDeleteArray(ev);
break;
case JSONParser.OBJECT_START:
handleDeleteMap(ev);
break;
default:
handleSingleDelete(ev);
}
}
// returns the string value for a primitive value, or null for the null value
String getString(int ev) throws IOException {
switch (ev) {
case JSONParser.STRING:
return parser.getString();
case JSONParser.BIGNUMBER:
case JSONParser.NUMBER:
case JSONParser.LONG:
return parser.getNumberChars().toString();
case JSONParser.BOOLEAN:
return Boolean.toString(parser.getBoolean());
case JSONParser.NULL:
return null;
default:
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Expected primitive JSON value but got: "+JSONParser.getEventString( ev )
+" at ["+parser.getPosition()+"]" );
}
}
void handleSingleDelete(int ev) throws IOException {
if (ev == JSONParser.OBJECT_START) {
handleDeleteMap(ev);
} else {
DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
cmd.commitWithin = commitWithin;
String id = getString(ev);
cmd.setId(id);
processor.processDelete(cmd);
}
}
void handleDeleteArray(int ev) throws IOException {
assert ev == JSONParser.ARRAY_START;
for (;;) {
ev = parser.nextEvent();
if (ev == JSONParser.ARRAY_END) return;
handleSingleDelete(ev);
}
}
void handleDeleteMap(int ev) throws IOException {
assert ev == JSONParser.OBJECT_START;
DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
cmd.commitWithin = commitWithin;
while( true ) {
ev = parser.nextEvent();
if( ev == JSONParser.STRING ) {
String key = parser.getString();
if( parser.wasKey() ) {
if( "id".equals( key ) ) {
cmd.setId(getString(parser.nextEvent()));
} else if( "query".equals(key) ) {
cmd.setQuery(parser.getString());
} else if( "commitWithin".equals(key) ) {
cmd.commitWithin = (int)parser.getLong();
} else if( "_version_".equals(key) ) {
cmd.setVersion(parser.getLong());
} else {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown key: "+key+" ["+parser.getPosition()+"]" );
}
}
else {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"invalid string: " + key
+" at ["+parser.getPosition()+"]" );
}
}
else if( ev == JSONParser.OBJECT_END ) {
if( cmd.getId() == null && cmd.getQuery() == null ) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Missing id or query for delete ["+parser.getPosition()+"]" );
}
processor.processDelete(cmd);
return;
}
else {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Got: "+JSONParser.getEventString( ev )
+" at ["+parser.getPosition()+"]" );
}
}
}
RollbackUpdateCommand parseRollback() throws IOException {
assertNextEvent( JSONParser.OBJECT_START );
assertNextEvent( JSONParser.OBJECT_END );
return new RollbackUpdateCommand(req);
}
void parseCommitOptions(CommitUpdateCommand cmd ) throws IOException
{
assertNextEvent( JSONParser.OBJECT_START );
final Map<String,Object> map = (Map)ObjectBuilder.getVal(parser);
// SolrParams currently expects string values...
SolrParams p = new SolrParams() {
@Override
public String get(String param) {
Object o = map.get(param);
return o == null ? null : o.toString();
}
@Override
public String[] getParams(String param) {
return new String[]{get(param)};
}
@Override
public Iterator<String> getParameterNamesIterator() {
return map.keySet().iterator();
}
};
RequestHandlerUtils.validateCommitParams(p);
p = SolrParams.wrapDefaults(p, req.getParams()); // default to the normal request params for commit options
RequestHandlerUtils.updateCommit(cmd, p);
}
AddUpdateCommand parseAdd() throws IOException
{
AddUpdateCommand cmd = new AddUpdateCommand(req);
cmd.commitWithin = commitWithin;
cmd.overwrite = overwrite;
float boost = 1.0f;
while( true ) {
int ev = parser.nextEvent();
if( ev == JSONParser.STRING ) {
if( parser.wasKey() ) {
String key = parser.getString();
if( "doc".equals( key ) ) {
if( cmd.solrDoc != null ) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "multiple docs in same add command" );
}
ev = assertNextEvent( JSONParser.OBJECT_START );
cmd.solrDoc = parseDoc( ev );
}
else if( UpdateRequestHandler.OVERWRITE.equals( key ) ) {
cmd.overwrite = parser.getBoolean(); // reads next boolean
}
else if( UpdateRequestHandler.COMMIT_WITHIN.equals( key ) ) {
cmd.commitWithin = (int)parser.getLong();
}
else if( "boost".equals( key ) ) {
boost = Float.parseFloat( parser.getNumberChars().toString() );
}
else {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown key: "+key+" ["+parser.getPosition()+"]" );
}
}
else {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Should be a key "
+" at ["+parser.getPosition()+"]" );
}
}
else if( ev == JSONParser.OBJECT_END ) {
if( cmd.solrDoc == null ) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,"missing solr document. "+parser.getPosition() );
}
cmd.solrDoc.setDocumentBoost( boost );
return cmd;
}
else {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Got: "+JSONParser.getEventString( ev )
+" at ["+parser.getPosition()+"]" );
}
}
}
void handleAdds() throws IOException
{
while( true ) {
AddUpdateCommand cmd = new AddUpdateCommand(req);
cmd.commitWithin = commitWithin;
cmd.overwrite = overwrite;
int ev = parser.nextEvent();
if (ev == JSONParser.ARRAY_END) break;
assertEvent(ev, JSONParser.OBJECT_START);
cmd.solrDoc = parseDoc(ev);
processor.processAdd(cmd);
}
}
int assertNextEvent(int expected ) throws IOException
{
int got = parser.nextEvent();
assertEvent(got, expected);
return got;
}
void assertEvent(int ev, int expected) {
if( ev != expected ) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Expected: "+JSONParser.getEventString( expected )
+" but got "+JSONParser.getEventString( ev )
+" at ["+parser.getPosition()+"]" );
}
}
private SolrInputDocument parseDoc(int ev) throws IOException {
assert ev == JSONParser.OBJECT_START;
SolrInputDocument sdoc = new SolrInputDocument();
for (;;) {
ev = parser.nextEvent();
if (ev == JSONParser.OBJECT_END) {
return sdoc;
}
String fieldName = parser.getString();
if(fieldName.equals(JsonLoader.CHILD_DOC_KEY)) {
ev = parser.nextEvent();
assertEvent(ev, JSONParser.ARRAY_START);
while( (ev = parser.nextEvent()) != JSONParser.ARRAY_END ) {
assertEvent(ev, JSONParser.OBJECT_START);
sdoc.addChildDocument(parseDoc(ev));
}
} else {
SolrInputField sif = new SolrInputField(fieldName);
parseFieldValue(sif);
// pulling out the pieces may seem weird, but it's because
// SolrInputDocument.addField will do the right thing
// if the doc already has another value for this field
// (ie: repeating fieldname keys)
sdoc.addField(sif.getName(), sif.getValue(), sif.getBoost());
}
}
}
private void parseFieldValue(SolrInputField sif) throws IOException {
int ev = parser.nextEvent();
if (ev == JSONParser.OBJECT_START) {
parseExtendedFieldValue(sif, ev);
} else {
Object val = parseNormalFieldValue(ev);
sif.setValue(val, 1.0f);
}
}
private void parseExtendedFieldValue(SolrInputField sif, int ev) throws IOException {
assert ev == JSONParser.OBJECT_START;
float boost = 1.0f;
Object normalFieldValue = null;
Map<String, Object> extendedInfo = null;
for (;;) {
ev = parser.nextEvent();
switch (ev) {
case JSONParser.STRING:
String label = parser.getString();
if ("boost".equals(label)) {
ev = parser.nextEvent();
if( ev != JSONParser.NUMBER &&
ev != JSONParser.LONG &&
ev != JSONParser.BIGNUMBER ) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "boost should have number! "+JSONParser.getEventString(ev) );
}
boost = (float)parser.getDouble();
} else if ("value".equals(label)) {
normalFieldValue = parseNormalFieldValue(parser.nextEvent());
} else {
// If we encounter other unknown map keys, then use a map
if (extendedInfo == null) {
extendedInfo = new HashMap<>(2);
}
// for now, the only extended info will be field values
// we could either store this as an Object or a SolrInputField
Object val = parseNormalFieldValue(parser.nextEvent());
extendedInfo.put(label, val);
}
break;
case JSONParser.OBJECT_END:
if (extendedInfo != null) {
if (normalFieldValue != null) {
extendedInfo.put("value",normalFieldValue);
}
sif.setValue(extendedInfo, boost);
} else {
sif.setValue(normalFieldValue, boost);
}
return;
default:
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Error parsing JSON extended field value. Unexpected "+JSONParser.getEventString(ev) );
}
}
}
private Object parseNormalFieldValue(int ev) throws IOException {
if (ev == JSONParser.ARRAY_START) {
List<Object> val = parseArrayFieldValue(ev);
return val;
} else {
Object val = parseSingleFieldValue(ev);
return val;
}
}
private Object parseSingleFieldValue(int ev) throws IOException {
switch (ev) {
case JSONParser.STRING:
return parser.getString();
case JSONParser.LONG:
return parser.getLong();
case JSONParser.NUMBER:
return parser.getDouble();
case JSONParser.BIGNUMBER:
return parser.getNumberChars().toString();
case JSONParser.BOOLEAN:
return parser.getBoolean();
case JSONParser.NULL:
parser.getNull();
return null;
case JSONParser.ARRAY_START:
return parseArrayFieldValue(ev);
default:
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Error parsing JSON field value. Unexpected "+JSONParser.getEventString(ev) );
}
}
private List<Object> parseArrayFieldValue(int ev) throws IOException {
assert ev == JSONParser.ARRAY_START;
ArrayList lst = new ArrayList(2);
for (;;) {
ev = parser.nextEvent();
if (ev == JSONParser.ARRAY_END) {
return lst;
}
Object val = parseSingleFieldValue(ev);
lst.add(val);
}
}
}
}