/******************************************************************************* | |
* Copyright (C) 2017 ANSYS medini Technologies AG | |
* | |
* This program and the accompanying materials are made | |
* available under the terms of the Eclipse Public License 2.0 | |
* which is available at https://www.eclipse.org/legal/epl-2.0/ | |
* | |
* SPDX-License-Identifier: EPL-2.0 | |
* | |
* Contributors: | |
* ANSYS medini Technologies AG - initial API and implementation | |
******************************************************************************/ | |
package org.eclipse.opencert.elastic; | |
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.nio.client.DefaultAsyncUserTokenHandler;
import org.apache.http.nio.entity.NStringEntity;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
/** | |
* {@link DefaultAsyncUserTokenHandler} implementation of {@link ElasticClient}. | |
* | |
* @author mauersberger | |
*/ | |
public class ElasticClientImpl implements ElasticClient, AutoCloseable { | |
// the "real" client | |
private RestClient restClient; | |
@SuppressWarnings("unused") | |
private int lastStatus; | |
/** | |
* Factory method to create a new {@link ElasticClient}. | |
* | |
* @param host | |
* server name or IP | |
* @param port | |
* port | |
* @param scheme | |
* "http" or "https" | |
* @return the new {@link ElasticClient} instance | |
*/ | |
public static ElasticClient on(String host, int port, String scheme) { | |
ElasticClientImpl client = new ElasticClientImpl(); | |
client.restClient = RestClient.builder(new HttpHost(host, port, scheme)).build(); | |
return client; | |
} | |
/* | |
* Intentionally private. | |
*/ | |
private ElasticClientImpl() { | |
// just to avoid instantiation, force using the factory API | |
} | |
@Override | |
public ElasticClient store(ElasticDocument document) throws IOException { | |
HttpEntity entity = new NStringEntity(document.source.toString(), ContentType.APPLICATION_JSON); | |
Response response = this.restClient.performRequest("PUT", document.getEndPoint(), Collections.emptyMap(), entity); //$NON-NLS-1$ | |
lastStatus = response.getStatusLine().getStatusCode(); | |
return this; | |
} | |
@Override | |
public ElasticClient storeAll(Iterator<ElasticDocument> documents) throws IOException { | |
while (documents.hasNext()) { | |
store (documents.next()); | |
} | |
return this; | |
} | |
@Override | |
public Set<ElasticDocument> search(String indexName, String queryString, Map<String, Object> filters) throws Exception { | |
RestHighLevelClient highLevel = new RestHighLevelClient(this.restClient); | |
SearchRequest searchRequest = new SearchRequest(indexName); | |
// see example at https://dzone.com/articles/java-high-level-rest-client-elasticsearch | |
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); | |
searchRequest.source(sourceBuilder); | |
// find the best query builder depending on the arguments | |
QueryBuilder queryBuilder = QueryBuilders.matchAllQuery(); | |
if (queryString != null) { | |
queryBuilder = QueryBuilders.queryStringQuery(queryString); | |
} | |
sourceBuilder.query(queryBuilder); | |
SearchResponse response = highLevel.search(searchRequest); | |
System.out.println(response); | |
lastStatus = response.status().getStatus(); | |
Set<ElasticDocument> result = new HashSet<>(); | |
SearchHits hits = response.getHits(); | |
for (int i=0; i< hits.totalHits; i++) { | |
SearchHit hit = hits.getAt(i); | |
// convert Hit to Json | |
JsonObject json = new JsonObject(); | |
// put all source information back into JSON | |
Map<String, Object> fields = hit.getSource(); | |
Set<String> keys = fields.keySet(); | |
for (String key : keys) { | |
Object field = fields.get(key); | |
json.addProperty(key, (String) field.toString()); | |
} | |
ElasticDocument doc = new ElasticDocument(json, hit.getIndex(), hit.getType(), hit.getId()); | |
result.add(doc); | |
} | |
return result; | |
} | |
@Override | |
public ElasticClient ping() throws Exception { | |
Response response = this.restClient.performRequest("GET", "/"); //$NON-NLS-1$ //$NON-NLS-2$ | |
lastStatus = response.getStatusLine().getStatusCode(); | |
return this; | |
} | |
@Override | |
public ElasticClient delete(String indexName) throws Exception { | |
Response response = this.restClient.performRequest("DELETE", "/" + indexName, Collections.emptyMap()); //$NON-NLS-1$ //$NON-NLS-2$ | |
lastStatus = response.getStatusLine().getStatusCode(); | |
return this; | |
} | |
@Override | |
public String version() throws IOException { | |
Response response = this.restClient.performRequest("GET", "/"); //$NON-NLS-1$ //$NON-NLS-2$ | |
lastStatus = response.getStatusLine().getStatusCode(); | |
// we assume its a JSON response | |
try (InputStream content = response.getEntity().getContent()) { | |
InputStreamReader reader = new InputStreamReader(content, "UTF8"); //$NON-NLS-1$ | |
JsonObject json = new Gson().fromJson(reader, JsonObject.class); | |
return json.get("version").getAsJsonObject().get("number").getAsString(); //$NON-NLS-1$//$NON-NLS-2$ | |
} catch (Exception exception) { | |
throw new IOException("Unable to ping server"); //$NON-NLS-1$ | |
} | |
} | |
@Override | |
public String indexStatus(String indexName) throws Exception { | |
Response response = this.restClient.performRequest("GET", "/" + indexName); //$NON-NLS-1$ //$NON-NLS-2$ | |
lastStatus = response.getStatusLine().getStatusCode(); | |
// we assume its a JSON response | |
try (InputStream content = response.getEntity().getContent()) { | |
InputStreamReader reader = new InputStreamReader(content, "UTF8"); //$NON-NLS-1$ | |
JsonObject root = new Gson().fromJson(reader, JsonObject.class); | |
JsonObject index = root.get(indexName).getAsJsonObject(); | |
JsonObject settings = index.get("settings").getAsJsonObject(); //$NON-NLS-1$ | |
JsonObject indexSettings = settings.get("index").getAsJsonObject(); //$NON-NLS-1$ | |
long date = indexSettings.get("creation_date").getAsLong(); //$NON-NLS-1$ | |
return "Created " + new Date(date).toString(); //$NON-NLS-1$ | |
} catch (Exception exception) { | |
throw new IOException("Unable to ping index", exception); //$NON-NLS-1$ | |
} | |
} | |
@Override | |
public void close() throws IOException { | |
if (restClient != null) { | |
restClient.close(); | |
restClient = null; | |
} | |
} | |
} |