| /******************************************************************************** |
| * Copyright (c) 2015-2020 Contributors to the Eclipse Foundation |
| * |
| * See the NOTICE file(s) distributed with this work for additional |
| * information regarding copyright ownership. |
| * |
| * This program and the accompanying materials are made available under the |
| * terms of the Eclipse Public License v. 2.0 which is available at |
| * http://www.eclipse.org/legal/epl-2.0. |
| * |
| * SPDX-License-Identifier: EPL-2.0 |
| * |
| ********************************************************************************/ |
| |
| package org.eclipse.mdm.freetextindexer.boundary; |
| |
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Locale;

import javax.annotation.PostConstruct;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.inject.Inject;

import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpMethod;
import org.apache.commons.httpclient.methods.ByteArrayRequestEntity;
import org.apache.commons.httpclient.methods.DeleteMethod;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.methods.PutMethod;
import org.eclipse.mdm.api.base.model.Entity;
import org.eclipse.mdm.freetextindexer.entities.MDMEntityResponse;
import org.eclipse.mdm.property.GlobalProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;
| |
| /** |
| * This Boundary is back-end only to the ElasticSearch Server. It is responsible |
| * for the actual indexing work. |
| * |
| * @author CWE |
| * |
| */ |
| @TransactionAttribute(value = TransactionAttributeType.NOT_SUPPORTED) |
| @Stateless |
| public class ElasticsearchBoundary { |
| |
| private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchBoundary.class); |
| |
| private ObjectMapper jsonMapper; |
| private HttpClient client; |
| private URI esUri; |
| |
| @Inject |
| @GlobalProperty(value = "elasticsearch.url") |
| String esAddress; |
| |
| @Inject |
| @GlobalProperty(value = "freetext.active") |
| String active; |
| |
| /** |
| * Connects to the ElasticSearch Server |
| * |
| * @throws IOException |
| */ |
| public ElasticsearchBoundary() { |
| jsonMapper = new ObjectMapper(); |
| jsonMapper.setDateFormat(new SimpleDateFormat("yyyyMMdd'T'HHmmssZ")); |
| client = new HttpClient(); |
| } |
| |
| @PostConstruct |
| public void init() { |
| try { |
| esUri = URI.create(esAddress + "/"); |
| } catch (IllegalArgumentException e) { |
| throw new IllegalArgumentException( |
| "Value '" + esAddress + "' for parameter 'elasticsearch.url' is invalid!", e); |
| } |
| } |
| |
| public void index(MDMEntityResponse document) { |
| try { |
| PutMethod put = new PutMethod(getPath(document)); |
| |
| byte[] json = jsonMapper.writeValueAsBytes(document); |
| LOGGER.trace("Document {}: {}", put.getPath(), new String(json)); |
| |
| put.setRequestEntity(new ByteArrayRequestEntity(json, "application/json")); |
| |
| execute(put); |
| |
| } catch (IOException e) { |
| throw new IllegalStateException(e); |
| } |
| } |
| |
| /** |
| * @param document |
| * @return the path of the {@link Entity} within elasticsearch |
| */ |
| private String getPath(MDMEntityResponse document) { |
| return getPath(document.source, document.type, document.id); |
| } |
| |
| /** |
| * |
| * @param source Datasource of the {@link Entity} |
| * @param type Type of the {@link Entity} |
| * @param id Instance id of the {@link Entity} |
| * @return the path of the {@link Entity} within elasticsearch |
| */ |
| private String getPath(String source, String type, String id) { |
| return esUri.resolve(source.toLowerCase() + "/_doc/" + getEsCompositeId(type, id)).toString(); |
| } |
| |
| /** |
| * |
| * @param type Type of the {@link Entity} |
| * @param id Instance id of the {@link Entity} |
| * @return the composite id of the {@link Entity} within elasticsearch |
| */ |
| private String getEsCompositeId(String type, String id) { |
| return type + "-" + id; |
| } |
| |
| private void execute(HttpMethod put) { |
| try { |
| int status = client.executeMethod(put); |
| |
| checkError(status, put); |
| } catch (IOException e) { |
| throw new IllegalStateException("Problems querying ElasticSearch.", e); |
| } |
| } |
| |
| private void checkError(int status, HttpMethod method) { |
| String text = String.format("ElasticSearch answered %d. ", status); |
| |
| int httpCategory = status / 100; |
| switch (httpCategory) { |
| case 4: |
| text = text + "This indicates a Client error: "; |
| break; |
| case 5: |
| text = text + "This indicates a Server error. The ES instance must be checked (" + esUri.toString() + "): "; |
| break; |
| } |
| |
| try { |
| if (httpCategory != 2) { |
| throw new IllegalStateException(text + method.getResponseBodyAsString()); |
| } |
| } catch (IOException e) { |
| throw new IllegalStateException(text + "\nError occured during reading the elastic search response!", e); |
| } |
| } |
| |
| public void delete(String api, String type, String id) { |
| String path = getPath(api.toLowerCase(), type, id); |
| DeleteMethod put = new DeleteMethod(esUri.resolve(path).toString()); |
| |
| execute(put); |
| |
| if (LOGGER.isDebugEnabled()) { |
| LOGGER.debug("Document '{}' has been deleted!", path); |
| } |
| } |
| |
| public boolean hasIndex(String source) { |
| boolean hasIndex = false; |
| |
| if (active()) { |
| try { |
| GetMethod get = new GetMethod(esUri.resolve(source.toLowerCase()).toString()); |
| int status = client.executeMethod(get); |
| LOGGER.info("Checking index {}: {}", source, status); |
| |
| hasIndex = status / 100 == 2; |
| } catch (IOException e) { |
| LOGGER.warn("Querying ElasticSearch for the Index failed... Assuming no index is there!", e); |
| hasIndex = false; |
| } |
| } |
| |
| return hasIndex; |
| } |
| |
| public void createIndex(String source) { |
| if (Boolean.valueOf(active)) { |
| execute(new PutMethod(esUri.resolve("/" + source.toLowerCase()).toString())); |
| LOGGER.info("New Index created!"); |
| } |
| } |
| |
| public void validateConnectionIsPossible() { |
| if (active()) { |
| try { |
| GetMethod get = new GetMethod(esUri.toString()); |
| int status = client.executeMethod(get); |
| if (status / 100 != 2) { |
| LOGGER.error( |
| "Cannot connect to elasticsearch at {} but free text search is enabled! http status was: {}", |
| esUri, status); |
| throw new RuntimeException("Cannot connect to elasticsearch."); |
| } else { |
| LOGGER.info("Successfully connected to elasticsearch!"); |
| } |
| |
| } catch (IOException e) { |
| LOGGER.error("Cannot connect to elasticsearch at {} but free text search is enabled!", esUri, e); |
| throw new RuntimeException("Cannot connect to elasticsearch.", e); |
| } |
| } |
| } |
| |
| public boolean active() { |
| return Boolean.valueOf(active); |
| } |
| } |