blob: 9054eb1632d1cce131e62a5409a84d880dcb197f [file] [log] [blame]
/**
*
*/
package org.eclipse.smila.solr;
import java.util.concurrent.ExecutionException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.datamodel.AnySeq;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
/**
 * Creates and caches SolrJ {@link SolrClient} instances for the named connections of a {@link SolrConfiguration}.
 * <p>
 * A client is built the first time its name is requested via {@link #get(String)}; later requests for the same name
 * are answered from the cache. Which SolrJ implementation is built is selected by the connection's
 * {@link SolrConfiguration#IMPL} value. This class does not close the clients it creates.
 *
 * @author pwissel
 */
public class SolrClients {
  private static final String IMPL_CLOUD = "CloudSolrClient";
  private static final String IMPL_HTTP = "HttpSolrClient";
  private static final String IMPL_CONCURRENT_UPDATE = "ConcurrentUpdateSolrClient";
  private static final String IMPL_DELEGATION_TOKEN_HTTP = "DelegationTokenHttpSolrClient";
  private static final String IMPL_LB_HTTP = "LBHttpSolrClient";
  private static final String PARAM_ZK_HOSTS = "zkHosts";
  private static final String PARAM_ZK_CHROOT = "zkChroot";
  private static final String PARAM_SEND_UPDATES_ONLY_TO_SHARD_LEADERS = "sendUpdatesOnlyToShardLeaders";
  private static final String PARAM_SEND_DIRECT_UPDATES_TO_SHARD_LEADERS_ONLY =
    "sendDirectUpdatesToShardLeadersOnly";
  private static final String PARAM_BASE_SOLR_URL = "baseSolrUrl";
  private static final String PARAM_ALLOW_COMPRESSION = "allowCompression";
  private static final String PARAM_QUEUE_SIZE = "queueSize";
  private static final String PARAM_THREAD_COUNT = "threadCount";
  private static final String PARAM_STREAM_DELETES = "streamDeletes";
  private static final String PARAM_DELEGATION_TOKEN = "delegationToken";
  private static final String PARAM_BASE_SOLR_URLS = "baseSolrUrls";
  /** Placeholder written to the log instead of a configured secret. */
  private static final String LOG_SECRET_SET = "<hidden>";
  /** Placeholder written to the log when no secret is configured. */
  private static final String LOG_SECRET_UNSET = "<none>";
  private final Log _log = LogFactory.getLog(getClass());
  private final Joiner _joiner = Joiner.on(',').skipNulls();
  private final SolrConfiguration _configuration;
  /**
   * Clients by connection name. Keys are never null: the cache rejects null keys, so the default client name is
   * resolved in {@link #get(String)} before the lookup.
   */
  final LoadingCache<String, SolrClient> _clients = CacheBuilder.newBuilder().build(
    new CacheLoader<String, SolrClient>() {
      @Override
      public SolrClient load(final String clientName) throws Exception {
        return createClient(clientName);
      }
    });

  /**
   * @param configuration
   *          source of the connection definitions and of the default client name.
   */
  public SolrClients(final SolrConfiguration configuration) {
    _configuration = configuration;
  }

  /**
   * Returns the client for the given connection name, creating it on first use.
   *
   * @param clientName
   *          name of the configured connection; if null, the configuration's default client name is used.
   * @return the cached or newly created client.
   * @throws ExecutionException
   *           if creating the client failed with a checked exception. Unchecked failures (e.g. an
   *           {@link IllegalArgumentException} for an invalid connection) are rethrown by the cache wrapped in an
   *           unchecked exception.
   * @throws IllegalArgumentException
   *           if clientName is null and the configuration defines no default client.
   */
  public SolrClient get(final String clientName) throws ExecutionException {
    // The default must be resolved here: the cache throws NullPointerException on a null key before the loader runs.
    final String effectiveName = clientName != null ? clientName : _configuration.getClientDefault();
    if (effectiveName == null) {
      throw new IllegalArgumentException("No Solr client name given and no default client configured.");
    }
    return _clients.get(effectiveName);
  }

  /**
   * Builds a new client for the named connection, dispatching on the connection's implementation name.
   *
   * @throws IllegalArgumentException
   *           if the connection or a required parameter is missing, or the implementation name is unknown.
   */
  private SolrClient createClient(final String clientName) {
    final AnyMap connection = _configuration.getConnection(clientName);
    if (connection == null) {
      // NOTE(review): getConnection may already reject unknown names itself; this guard is harmless in that case.
      final String message = String.format("No Solr connection configured for client '%s'.", clientName);
      throw new IllegalArgumentException(message);
    }
    final String impl = requireString(connection, clientName, SolrConfiguration.IMPL);
    switch (impl) {
      case IMPL_CLOUD:
        return createCloudSolrClient(clientName, connection, impl);
      case IMPL_HTTP:
        return createHttpSolrClient(clientName, connection, impl);
      case IMPL_CONCURRENT_UPDATE:
        return createConcurrentUpdateSolrClient(clientName, connection, impl);
      case IMPL_DELEGATION_TOKEN_HTTP:
        return createDelegationTokenHttpSolrClient(clientName, connection, impl);
      case IMPL_LB_HTTP:
        return createLBHttpSolrClient(clientName, connection, impl);
      default:
        final String message = String.format("Unknown Solr implementation. impl: '%s'.", impl);
        throw new IllegalArgumentException(message);
    }
  }

  /** Reads a string parameter and fails with a descriptive message if it is missing or empty. */
  private static String requireString(final AnyMap connection, final String clientName, final String param) {
    final String value = connection.getStringValue(param);
    if (Strings.isNullOrEmpty(value)) {
      final String message =
        String.format("Missing required parameter '%s' for Solr client '%s'.", param, clientName);
      throw new IllegalArgumentException(message);
    }
    return value;
  }

  /** Reads a sequence parameter and fails with a descriptive message if it is missing. */
  private static AnySeq requireSeq(final AnyMap connection, final String clientName, final String param) {
    final AnySeq value = connection.getSeq(param);
    if (value == null) {
      final String message =
        String.format("Missing required parameter '%s' for Solr client '%s'.", param, clientName);
      throw new IllegalArgumentException(message);
    }
    return value;
  }

  /**
   * Builds a {@link CloudSolrClient} from {@code zkHosts} (required) and the optional {@code zkChroot},
   * {@code sendUpdatesOnlyToShardLeaders} and {@code sendDirectUpdatesToShardLeadersOnly} parameters. Optional
   * parameters that are not set leave the builder's defaults untouched.
   */
  private CloudSolrClient createCloudSolrClient(final String clientName, final AnyMap connection, final String impl) {
    // read client configuration
    final AnySeq zkHosts = requireSeq(connection, clientName, PARAM_ZK_HOSTS);
    final String zkChroot = connection.getStringValue(PARAM_ZK_CHROOT);
    final Boolean sendUpdatesOnlyToShardLeaders =
      connection.getBooleanValue(PARAM_SEND_UPDATES_ONLY_TO_SHARD_LEADERS);
    final Boolean sendDirectUpdatesToShardLeadersOnly =
      connection.getBooleanValue(PARAM_SEND_DIRECT_UPDATES_TO_SHARD_LEADERS_ONLY);
    // log client configuration
    if (_log.isDebugEnabled()) {
      final String message =
        String
          .format(
            "Create '%s' with zkHosts: '%s'; zkChroot: '%s'; sendUpdatesOnlyToShardLeaders: '%s'; sendDirectUpdatesToShardLeadersOnly: '%s'.",
            impl, _joiner.join(zkHosts), zkChroot, sendUpdatesOnlyToShardLeaders,
            sendDirectUpdatesToShardLeadersOnly);
      _log.debug(message);
    }
    // set client configuration via builder
    final CloudSolrClient.Builder builder = new CloudSolrClient.Builder();
    builder.withZkHost(zkHosts.asStrings());
    if (!Strings.isNullOrEmpty(zkChroot)) {
      builder.withZkChroot(zkChroot);
    }
    if (sendUpdatesOnlyToShardLeaders != null) {
      if (sendUpdatesOnlyToShardLeaders.booleanValue()) {
        builder.sendUpdatesOnlyToShardLeaders();
      } else {
        builder.sendUpdatesToAllReplicasInShard();
      }
    }
    if (sendDirectUpdatesToShardLeadersOnly != null) {
      if (sendDirectUpdatesToShardLeadersOnly.booleanValue()) {
        builder.sendDirectUpdatesToShardLeadersOnly();
      } else {
        builder.sendDirectUpdatesToAnyShardReplica();
      }
    }
    return builder.build();
  }

  /**
   * Builds an {@link HttpSolrClient} from {@code baseSolrUrl} (required) and the optional {@code allowCompression}
   * parameter.
   */
  private HttpSolrClient createHttpSolrClient(final String clientName, final AnyMap connection, final String impl) {
    // read client configuration
    final String baseSolrUrl = requireString(connection, clientName, PARAM_BASE_SOLR_URL);
    final Boolean allowCompression = connection.getBooleanValue(PARAM_ALLOW_COMPRESSION);
    // log client configuration (all parameters read above, like the other factory methods)
    if (_log.isDebugEnabled()) {
      final String message =
        String.format("Create '%s' with baseSolrUrl: '%s'; allowCompression: '%s'.", impl, baseSolrUrl,
          allowCompression);
      _log.debug(message);
    }
    // set client configuration via builder
    final HttpSolrClient.Builder builder = new HttpSolrClient.Builder(baseSolrUrl);
    if (allowCompression != null) {
      builder.allowCompression(allowCompression.booleanValue());
    }
    return builder.build();
  }

  /**
   * Builds a {@link ConcurrentUpdateSolrClient} from {@code baseSolrUrl} (required) and the optional
   * {@code queueSize}, {@code threadCount} and {@code streamDeletes} parameters.
   */
  private ConcurrentUpdateSolrClient createConcurrentUpdateSolrClient(final String clientName,
    final AnyMap connection, final String impl) {
    // read client configuration
    final String baseSolrUrl = requireString(connection, clientName, PARAM_BASE_SOLR_URL);
    final Long queueSize = connection.getLongValue(PARAM_QUEUE_SIZE);
    final Long threadCount = connection.getLongValue(PARAM_THREAD_COUNT);
    final Boolean streamDeletes = connection.getBooleanValue(PARAM_STREAM_DELETES);
    // log client configuration
    if (_log.isDebugEnabled()) {
      final String message =
        String.format(
          "Create '%s' with baseSolrUrl: '%s'; queueSize: '%s'; threadCount: '%s'; streamDeletes: '%s'.", impl,
          baseSolrUrl, queueSize, threadCount, streamDeletes);
      _log.debug(message);
    }
    // set client configuration via builder
    final ConcurrentUpdateSolrClient.Builder builder = new ConcurrentUpdateSolrClient.Builder(baseSolrUrl);
    if (queueSize != null) {
      // the configuration value is a long, the builder takes an int
      builder.withQueueSize(queueSize.intValue());
    }
    if (threadCount != null) {
      builder.withThreadCount(threadCount.intValue());
    }
    if (streamDeletes != null) {
      if (streamDeletes.booleanValue()) {
        builder.alwaysStreamDeletes();
      } else {
        builder.neverStreamDeletes();
      }
    }
    return builder.build();
  }

  /**
   * Builds an {@link HttpSolrClient} from {@code baseSolrUrl} (required), passing the optional
   * {@code delegationToken} to the builder when it is configured. The token value itself is never logged.
   */
  private HttpSolrClient createDelegationTokenHttpSolrClient(final String clientName, final AnyMap connection,
    final String impl) {
    // read client configuration
    final String baseSolrUrl = requireString(connection, clientName, PARAM_BASE_SOLR_URL);
    final String delegationToken = connection.getStringValue(PARAM_DELEGATION_TOKEN);
    // log client configuration; the token is a credential, so only log whether one is present
    if (_log.isDebugEnabled()) {
      final String tokenForLog = delegationToken != null ? LOG_SECRET_SET : LOG_SECRET_UNSET;
      final String message =
        String.format("Create '%s' with baseSolrUrl: '%s'; delegationToken: '%s'.", impl, baseSolrUrl,
          tokenForLog);
      _log.debug(message);
    }
    // set client configuration via builder
    final HttpSolrClient.Builder builder = new HttpSolrClient.Builder(baseSolrUrl);
    if (delegationToken != null) {
      builder.withDelegationToken(delegationToken);
    }
    return builder.build();
  }

  /** Builds an {@link LBHttpSolrClient} over the URLs listed in {@code baseSolrUrls} (required). */
  private LBHttpSolrClient createLBHttpSolrClient(final String clientName, final AnyMap connection,
    final String impl) {
    // read client configuration
    final AnySeq baseSolrUrls = requireSeq(connection, clientName, PARAM_BASE_SOLR_URLS);
    // log client configuration
    if (_log.isDebugEnabled()) {
      final String message =
        String.format("Create '%s' with baseSolrUrls: '%s'.", impl, _joiner.join(baseSolrUrls));
      _log.debug(message);
    }
    // set client configuration via builder
    final LBHttpSolrClient.Builder builder = new LBHttpSolrClient.Builder();
    builder.withBaseSolrUrls(baseSolrUrls.asStrings().toArray(new String[baseSolrUrls.size()]));
    return builder.build();
  }
}