| /** |
| * |
| */ |
| package org.eclipse.smila.solr; |
| |
| import java.util.concurrent.ExecutionException; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.solr.client.solrj.SolrClient; |
| import org.apache.solr.client.solrj.impl.CloudSolrClient; |
| import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient; |
| import org.apache.solr.client.solrj.impl.HttpSolrClient; |
| import org.apache.solr.client.solrj.impl.LBHttpSolrClient; |
| import org.eclipse.smila.datamodel.AnyMap; |
| import org.eclipse.smila.datamodel.AnySeq; |
| |
| import com.google.common.base.Joiner; |
| import com.google.common.base.Strings; |
| import com.google.common.cache.CacheBuilder; |
| import com.google.common.cache.CacheLoader; |
| import com.google.common.cache.LoadingCache; |
| |
| /** |
| * @author pwissel |
| * |
| */ |
| public class SolrClients { |
| |
| private static final String IMPL_CLOUD = "CloudSolrClient"; |
| |
| private static final String IMPL_HTTP = "HttpSolrClient"; |
| |
| private static final String IMPL_CONCURRENT_UPDATE = "ConcurrentUpdateSolrClient"; |
| |
| private static final String IMPL_DELEGATION_TOKEN_HTTP = "DelegationTokenHttpSolrClient"; |
| |
| private static final String IMPL_LB_HTTP = "LBHttpSolrClient"; |
| |
| private static final String PARAM_ZK_HOSTS = "zkHosts"; |
| |
| private static final String PARAM_ZK_CHROOT = "zkChroot"; |
| |
| private static final String PARAM_SEND_UPDATES_ONLY_TO_SHARD_LEADERS = "sendUpdatesOnlyToShardLeaders"; |
| |
| private static final String PARAM_SEND_DIRECT_UPDATES_TO_SHARD_LEADERS_ONLY = |
| "sendDirectUpdatesToShardLeadersOnly"; |
| |
| private static final String PARAM_BASE_SOLR_URL = "baseSolrUrl"; |
| |
| private static final String PARAM_ALLOW_COMPRESSION = "allowCompression"; |
| |
| private static final String PARAM_QUEUE_SIZE = "queueSize"; |
| |
| private static final String PARAM_THREAD_COUNT = "threadCount"; |
| |
| private static final String PARAM_STREAM_DELETES = "streamDeletes"; |
| |
| private static final String PARAM_DELEGATION_TOKEN = "delegationToken"; |
| |
| private static final String PARAM_BASE_SOLR_URLS = "baseSolrUrls"; |
| |
| private final Log _log = LogFactory.getLog(getClass()); |
| |
| private final Joiner _joiner = Joiner.on(',').skipNulls(); |
| |
| private final SolrConfiguration _configuration; |
| |
| final LoadingCache<String, SolrClient> _clients = CacheBuilder.newBuilder().build( |
| new CacheLoader<String, SolrClient>() { |
| |
| @Override |
| public SolrClient load(String clientName) throws Exception { |
| clientName = clientName != null ? clientName : _configuration.getClientDefault(); |
| return createClient(clientName); |
| } |
| }); |
| |
| public SolrClients(final SolrConfiguration configuration) { |
| _configuration = configuration; |
| } |
| |
| private SolrClient createClient(final String clientName) { |
| final AnyMap connection = _configuration.getConnection(clientName); |
| final String impl = connection.getStringValue(SolrConfiguration.IMPL); |
| switch (impl) { |
| case IMPL_CLOUD: |
| return createCloudSolrClient(clientName, connection, impl); |
| case IMPL_HTTP: |
| return createHttpSolrClient(clientName, connection, impl); |
| case IMPL_CONCURRENT_UPDATE: |
| return createConcurrentUpdateSolrClient(clientName, connection, impl); |
| case IMPL_DELEGATION_TOKEN_HTTP: |
| return createDelegationTokenHttpSolrClient(clientName, connection, impl); |
| case IMPL_LB_HTTP: |
| return createLBHttpSolrClient(clientName, connection, impl); |
| default: |
| final String message = String.format("Unknown Solr implementation. impl: '%s'.", impl); |
| throw new IllegalArgumentException(message); |
| } |
| } |
| |
| private CloudSolrClient createCloudSolrClient(final String clientName, final AnyMap connection, final String impl) { |
| // read client configuration |
| final AnySeq zkHosts = connection.getSeq(PARAM_ZK_HOSTS); |
| final String zkChroot = connection.getStringValue(PARAM_ZK_CHROOT); |
| final Boolean sendUpdatesOnlyToShardLeaders = |
| connection.getBooleanValue(PARAM_SEND_UPDATES_ONLY_TO_SHARD_LEADERS); |
| final Boolean sendDirectUpdatesToShardLeadersOnly = |
| connection.getBooleanValue(PARAM_SEND_DIRECT_UPDATES_TO_SHARD_LEADERS_ONLY); |
| // log client configuration |
| if (_log.isDebugEnabled()) { |
| final String message = |
| String |
| .format( |
| "Create '%s' with zkHosts: '%s'; zkChroot: '%s'; sendUpdatesOnlyToShardLeaders: '%s'; sendDirectUpdatesToShardLeadersOnly: '%s'.", |
| impl, _joiner.join(zkHosts), zkChroot, sendUpdatesOnlyToShardLeaders, |
| sendDirectUpdatesToShardLeadersOnly); |
| _log.debug(message); |
| } |
| // set client configuration via builder |
| final org.apache.solr.client.solrj.impl.CloudSolrClient.Builder builder = new CloudSolrClient.Builder(); |
| // zkHosts |
| builder.withZkHost(zkHosts.asStrings()); |
| // zkChroot |
| if (!Strings.isNullOrEmpty(zkChroot)) { |
| builder.withZkChroot(zkChroot); |
| } |
| // sendUpdatesOnlyToShardLeaders |
| if (sendUpdatesOnlyToShardLeaders != null) { |
| if (sendUpdatesOnlyToShardLeaders.booleanValue()) { |
| builder.sendUpdatesOnlyToShardLeaders(); |
| } else { |
| builder.sendUpdatesToAllReplicasInShard(); |
| } |
| } |
| // sendDirectUpdatesToShardLeadersOnly |
| if (sendDirectUpdatesToShardLeadersOnly != null) { |
| if (sendDirectUpdatesToShardLeadersOnly.booleanValue()) { |
| builder.sendDirectUpdatesToShardLeadersOnly(); |
| } else { |
| builder.sendDirectUpdatesToAnyShardReplica(); |
| } |
| } |
| // build solr client |
| return builder.build(); |
| } |
| |
| private HttpSolrClient createHttpSolrClient(final String clientName, final AnyMap connection, final String impl) { |
| // read client configuration |
| final String baseSolrUrl = connection.getStringValue(PARAM_BASE_SOLR_URL); |
| final Boolean allowCompression = connection.getBooleanValue(PARAM_ALLOW_COMPRESSION); |
| // log client configuration |
| if (_log.isDebugEnabled()) { |
| final String message = String.format("Create '%s' with baseSolrUrl: '%s'.", impl, baseSolrUrl); |
| _log.debug(message); |
| } |
| // set client configuration via builder |
| final org.apache.solr.client.solrj.impl.HttpSolrClient.Builder builder = |
| new HttpSolrClient.Builder(baseSolrUrl); |
| // allowCompression |
| if (allowCompression != null) { |
| builder.allowCompression(allowCompression.booleanValue()); |
| } |
| // build solr client |
| return builder.build(); |
| } |
| |
| private ConcurrentUpdateSolrClient createConcurrentUpdateSolrClient(final String clientName, |
| final AnyMap connection, final String impl) { |
| // read client configuration |
| final String baseSolrUrl = connection.getStringValue(PARAM_BASE_SOLR_URL); |
| final Long queueSize = connection.getLongValue(PARAM_QUEUE_SIZE); |
| final Long threadCount = connection.getLongValue(PARAM_THREAD_COUNT); |
| final Boolean streamDeletes = connection.getBooleanValue(PARAM_STREAM_DELETES); |
| // log client configuration |
| if (_log.isDebugEnabled()) { |
| final String message = |
| String.format( |
| "Create '%s' with baseSolrUrl: '%s'; queueSize: '%s'; threadCount: '%s'; streamDeletes: '%s'.", impl, |
| baseSolrUrl, queueSize, threadCount, streamDeletes); |
| _log.debug(message); |
| } |
| // set client configuration via builder |
| final org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient.Builder builder = |
| new ConcurrentUpdateSolrClient.Builder(baseSolrUrl); |
| // queueSize |
| if (queueSize != null) { |
| builder.withQueueSize(queueSize.intValue()); |
| } |
| // threadCount |
| if (threadCount != null) { |
| builder.withThreadCount(threadCount.intValue()); |
| } |
| // streamDeletes |
| if (streamDeletes != null) { |
| if (streamDeletes.booleanValue()) { |
| builder.alwaysStreamDeletes(); |
| } else { |
| builder.neverStreamDeletes(); |
| } |
| } |
| // build solr client |
| return builder.build(); |
| } |
| |
| private HttpSolrClient createDelegationTokenHttpSolrClient(final String clientName, final AnyMap connection, |
| final String impl) { |
| // read client configuration |
| final String baseSolrUrl = connection.getStringValue(PARAM_BASE_SOLR_URL); |
| final String delegationToken = connection.getStringValue(PARAM_DELEGATION_TOKEN); |
| // log client configuration |
| if (_log.isDebugEnabled()) { |
| final String message = |
| String.format("Create '%s' with baseSolrUrl: '%s'; delegationToken: '%s'.", impl, baseSolrUrl, |
| delegationToken); |
| _log.debug(message); |
| } |
| // set client configuration via builder |
| final org.apache.solr.client.solrj.impl.HttpSolrClient.Builder builder = |
| new HttpSolrClient.Builder(baseSolrUrl); |
| // delegationToken |
| if (delegationToken != null) { |
| builder.withDelegationToken(delegationToken); |
| } |
| // build solr client |
| return builder.build(); |
| } |
| |
| private LBHttpSolrClient createLBHttpSolrClient(final String clientName, final AnyMap connection, |
| final String impl) { |
| // read client configuration |
| final AnySeq baseSolrUrls = connection.getSeq(PARAM_BASE_SOLR_URLS); |
| // log client configuration |
| if (_log.isDebugEnabled()) { |
| final String message = |
| String.format("Create '%s' with baseSolrUrls: '%s'.", impl, _joiner.join(baseSolrUrls)); |
| _log.debug(message); |
| } |
| // set client configuration via builder |
| final org.apache.solr.client.solrj.impl.LBHttpSolrClient.Builder builder = new LBHttpSolrClient.Builder(); |
| // baseSolrUrls |
| builder.withBaseSolrUrls(baseSolrUrls.asStrings().toArray(new String[baseSolrUrls.size()])); |
| // build solr client |
| return builder.build(); |
| } |
| |
| public SolrClient get(final String clientName) throws ExecutionException { |
| return _clients.get(clientName); |
| } |
| |
| } |