blob: 743d97161ceed34ae36689f24f0fc5b6e92a0963 [file] [log] [blame]
package org.apache.solr.update;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.io.InputStream;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.http.HttpResponse;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.BinaryResponseParser;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.Diagnostics;
import org.apache.solr.update.processor.DistributedUpdateProcessor.RequestReplicationTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SolrCmdDistributor {
private static final int MAX_RETRIES_ON_FORWARD = 25;
public static Logger log = LoggerFactory.getLogger(SolrCmdDistributor.class);
private StreamingSolrServers servers;
private int retryPause = 500;
private int maxRetriesOnForward = MAX_RETRIES_ON_FORWARD;
private final List<Error> allErrors = new ArrayList<>();
private final List<Error> errors = Collections.synchronizedList(new ArrayList<Error>());
private final ExecutorService updateExecutor;
private final CompletionService<Object> completionService;
private final Set<Future<Object>> pending = new HashSet<>();
public static interface AbortCheck {
public boolean abortCheck();
}
public SolrCmdDistributor(UpdateShardHandler updateShardHandler) {
this.servers = new StreamingSolrServers(updateShardHandler);
this.updateExecutor = updateShardHandler.getUpdateExecutor();
this.completionService = new ExecutorCompletionService<>(updateExecutor);
}
public SolrCmdDistributor(StreamingSolrServers servers, int maxRetriesOnForward, int retryPause) {
this.servers = servers;
this.maxRetriesOnForward = maxRetriesOnForward;
this.retryPause = retryPause;
this.updateExecutor = servers.getUpdateExecutor();
completionService = new ExecutorCompletionService<>(updateExecutor);
}
public void finish() {
try {
blockAndDoRetries();
} finally {
servers.shutdown();
}
}
private void doRetriesIfNeeded() {
// NOTE: retries will be forwards to a single url
List<Error> errors = new ArrayList<>(this.errors);
errors.addAll(servers.getErrors());
List<Error> resubmitList = new ArrayList<>();
for (Error err : errors) {
try {
String oldNodeUrl = err.req.node.getUrl();
// if there is a retry url, we want to retry...
boolean isRetry = err.req.node.checkRetry();
boolean doRetry = false;
int rspCode = err.statusCode;
if (testing_errorHook != null) Diagnostics.call(testing_errorHook,
err.e);
// this can happen in certain situations such as shutdown
if (isRetry) {
if (rspCode == 404 || rspCode == 403 || rspCode == 503) {
doRetry = true;
}
// if its a connect exception, lets try again
if (err.e instanceof SolrServerException) {
if (((SolrServerException) err.e).getRootCause() instanceof ConnectException) {
doRetry = true;
}
}
if (err.e instanceof ConnectException) {
doRetry = true;
}
if (err.req.retries < maxRetriesOnForward && doRetry) {
err.req.retries++;
SolrException.log(SolrCmdDistributor.log, "forwarding update to "
+ oldNodeUrl + " failed - retrying ... retries: "
+ err.req.retries + " " + err.req.cmdString + " params:"
+ err.req.uReq.getParams() + " rsp:" + rspCode, err.e);
try {
Thread.sleep(retryPause);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn(null, e);
}
resubmitList.add(err);
} else {
allErrors.add(err);
}
} else {
allErrors.add(err);
}
} catch (Exception e) {
// continue on
log.error("Unexpected Error while doing request retries", e);
}
}
servers.clearErrors();
this.errors.clear();
for (Error err : resubmitList) {
submit(err.req, false);
}
if (resubmitList.size() > 0) {
blockAndDoRetries();
}
}
public void distribDelete(DeleteUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params) throws IOException {
distribDelete(cmd, nodes, params, false);
}
public void distribDelete(DeleteUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params, boolean sync) throws IOException {
for (Node node : nodes) {
UpdateRequest uReq = new UpdateRequest();
uReq.setParams(params);
if (cmd.isDeleteById()) {
uReq.deleteById(cmd.getId(), cmd.getVersion());
} else {
uReq.deleteByQuery(cmd.query);
}
submit(new Req(cmd.toString(), node, uReq, sync), false);
}
}
public void distribAdd(AddUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params) throws IOException {
distribAdd(cmd, nodes, params, false, null);
}
public void distribAdd(AddUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params, boolean synchronous) throws IOException {
distribAdd(cmd, nodes, params, synchronous, null);
}
public void distribAdd(AddUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params, boolean synchronous, RequestReplicationTracker rrt) throws IOException {
for (Node node : nodes) {
UpdateRequest uReq = new UpdateRequest();
uReq.setParams(params);
uReq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
submit(new Req(cmd.toString(), node, uReq, synchronous, rrt), false);
}
}
public void distribCommit(CommitUpdateCommand cmd, List<Node> nodes,
ModifiableSolrParams params) throws IOException {
// we need to do any retries before commit...
blockAndDoRetries();
UpdateRequest uReq = new UpdateRequest();
uReq.setParams(params);
addCommit(uReq, cmd);
log.debug("Distrib commit to: {} params: {}", nodes, params);
for (Node node : nodes) {
submit(new Req(cmd.toString(), node, uReq, false), true);
}
}
private void blockAndDoRetries() {
servers.blockUntilFinished();
// wait for any async commits to complete
while (pending != null && pending.size() > 0) {
Future<Object> future = null;
try {
future = completionService.take();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("blockAndDoRetries interrupted", e);
}
if (future == null) break;
pending.remove(future);
}
doRetriesIfNeeded();
}
void addCommit(UpdateRequest ureq, CommitUpdateCommand cmd) {
if (cmd == null) return;
ureq.setAction(cmd.optimize ? AbstractUpdateRequest.ACTION.OPTIMIZE
: AbstractUpdateRequest.ACTION.COMMIT, false, cmd.waitSearcher, cmd.maxOptimizeSegments, cmd.softCommit, cmd.expungeDeletes, cmd.openSearcher);
}
private void submit(final Req req, boolean isCommit) {
if (req.synchronous) {
blockAndDoRetries();
HttpSolrServer server = new HttpSolrServer(req.node.getUrl(),
servers.getHttpClient());
try {
server.request(req.uReq);
} catch (Exception e) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Failed synchronous update on shard " + req.node + " update: " + req.uReq , e);
} finally {
server.shutdown();
}
return;
}
if (log.isDebugEnabled()) {
log.debug("sending update to "
+ req.node.getUrl() + " retry:"
+ req.retries + " " + req.cmdString + " params:" + req.uReq.getParams());
}
if (isCommit) {
// a commit using ConncurrentUpdateSolrServer is not async,
// so we make it async to prevent commits from happening
// serially across multiple nodes
pending.add(completionService.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
doRequest(req);
return null;
}
}));
} else {
doRequest(req);
}
}
private void doRequest(final Req req) {
try {
SolrServer solrServer = servers.getSolrServer(req);
solrServer.request(req.uReq);
} catch (Exception e) {
SolrException.log(log, e);
Error error = new Error();
error.e = e;
error.req = req;
if (e instanceof SolrException) {
error.statusCode = ((SolrException) e).code();
}
errors.add(error);
}
}
public static class Req {
public Node node;
public UpdateRequest uReq;
public int retries;
public boolean synchronous;
public String cmdString;
public RequestReplicationTracker rfTracker;
public Req(String cmdString, Node node, UpdateRequest uReq, boolean synchronous) {
this(cmdString, node, uReq, synchronous, null);
}
public Req(String cmdString, Node node, UpdateRequest uReq, boolean synchronous, RequestReplicationTracker rfTracker) {
this.node = node;
this.uReq = uReq;
this.synchronous = synchronous;
this.cmdString = cmdString;
this.rfTracker = rfTracker;
}
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("SolrCmdDistributor$Req: cmd=").append(String.valueOf(cmdString));
sb.append("; node=").append(String.valueOf(node));
return sb.toString();
}
public void trackRequestResult(HttpResponse resp, boolean success) {
if (rfTracker != null) {
Integer rf = null;
if (resp != null) {
// need to parse out the rf from requests that were forwards to another leader
InputStream inputStream = null;
try {
inputStream = resp.getEntity().getContent();
BinaryResponseParser brp = new BinaryResponseParser();
NamedList<Object> nl= brp.processResponse(inputStream, null);
Object hdr = nl.get("responseHeader");
if (hdr != null && hdr instanceof NamedList) {
NamedList<Object> hdrList = (NamedList<Object>)hdr;
Object rfObj = hdrList.get(UpdateRequest.REPFACT);
if (rfObj != null && rfObj instanceof Integer) {
rf = (Integer)rfObj;
}
}
} catch (Exception e) {
log.warn("Failed to parse response from "+node+" during replication factor accounting due to: "+e);
} finally {
if (inputStream != null) {
try {
inputStream.close();
} catch (Exception ignore){}
}
}
}
rfTracker.trackRequestResult(node, success, rf);
}
}
}
public static Diagnostics.Callable testing_errorHook; // called on error when forwarding request. Currently data=[this, Request]
public static class Response {
public List<Error> errors = new ArrayList<>();
}
public static class Error {
public Exception e;
public int statusCode = -1;
public Req req;
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("SolrCmdDistributor$Error: statusCode=").append(statusCode);
sb.append("; exception=").append(String.valueOf(e));
sb.append("; req=").append(String.valueOf(req));
return sb.toString();
}
}
public static abstract class Node {
public abstract String getUrl();
public abstract boolean checkRetry();
public abstract String getCoreName();
public abstract String getBaseUrl();
public abstract ZkCoreNodeProps getNodeProps();
public abstract String getCollection();
public abstract String getShardId();
}
public static class StdNode extends Node {
protected ZkCoreNodeProps nodeProps;
protected String collection;
protected String shardId;
public StdNode(ZkCoreNodeProps nodeProps) {
this(nodeProps, null, null);
}
public StdNode(ZkCoreNodeProps nodeProps, String collection, String shardId) {
this.nodeProps = nodeProps;
this.collection = collection;
this.shardId = shardId;
}
public String getCollection() {
return collection;
}
public String getShardId() {
return shardId;
}
@Override
public String getUrl() {
return nodeProps.getCoreUrl();
}
@Override
public String toString() {
return this.getClass().getSimpleName() + ": " + nodeProps.getCoreUrl();
}
@Override
public boolean checkRetry() {
return false;
}
@Override
public String getBaseUrl() {
return nodeProps.getBaseUrl();
}
@Override
public String getCoreName() {
return nodeProps.getCoreName();
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
String baseUrl = nodeProps.getBaseUrl();
String coreName = nodeProps.getCoreName();
String url = nodeProps.getCoreUrl();
result = prime * result + ((baseUrl == null) ? 0 : baseUrl.hashCode());
result = prime * result + ((coreName == null) ? 0 : coreName.hashCode());
result = prime * result + ((url == null) ? 0 : url.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null) return false;
if (getClass() != obj.getClass()) return false;
StdNode other = (StdNode) obj;
String baseUrl = nodeProps.getBaseUrl();
String coreName = nodeProps.getCoreName();
String url = nodeProps.getCoreUrl();
if (baseUrl == null) {
if (other.nodeProps.getBaseUrl() != null) return false;
} else if (!baseUrl.equals(other.nodeProps.getBaseUrl())) return false;
if (coreName == null) {
if (other.nodeProps.getCoreName() != null) return false;
} else if (!coreName.equals(other.nodeProps.getCoreName())) return false;
if (url == null) {
if (other.nodeProps.getCoreUrl() != null) return false;
} else if (!url.equals(other.nodeProps.getCoreUrl())) return false;
return true;
}
@Override
public ZkCoreNodeProps getNodeProps() {
return nodeProps;
}
}
// RetryNodes are used in the case of 'forward to leader' where we want
// to try the latest leader on a fail in the case the leader just went down.
public static class RetryNode extends StdNode {
private ZkStateReader zkStateReader;
public RetryNode(ZkCoreNodeProps nodeProps, ZkStateReader zkStateReader, String collection, String shardId) {
super(nodeProps, collection, shardId);
this.zkStateReader = zkStateReader;
this.collection = collection;
this.shardId = shardId;
}
@Override
public boolean checkRetry() {
ZkCoreNodeProps leaderProps;
try {
leaderProps = new ZkCoreNodeProps(zkStateReader.getLeaderRetry(
collection, shardId));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
} catch (Exception e) {
// we retry with same info
log.warn(null, e);
return true;
}
this.nodeProps = leaderProps;
return true;
}
@Override
public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result
+ ((collection == null) ? 0 : collection.hashCode());
result = prime * result + ((shardId == null) ? 0 : shardId.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (!super.equals(obj)) return false;
if (getClass() != obj.getClass()) return false;
RetryNode other = (RetryNode) obj;
if (nodeProps.getCoreUrl() == null) {
if (other.nodeProps.getCoreUrl() != null) return false;
} else if (!nodeProps.getCoreUrl().equals(other.nodeProps.getCoreUrl())) return false;
return true;
}
}
public List<Error> getErrors() {
return allErrors;
}
}