blob: 01395ceda7adf47bf427746973a93cb8cfeaa0fb [file] [log] [blame]
/********************************************************************************
* Copyright (c) 2015-2020 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* SPDX-License-Identifier: EPL-2.0
*
********************************************************************************/
package org.eclipse.mdm.freetextindexer.boundary;
import java.security.Principal;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import javax.ejb.EJB;
import javax.ejb.Schedule;
import javax.ejb.Singleton;
import javax.ejb.Startup;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.enterprise.concurrent.ManagedScheduledExecutorService;
import javax.inject.Inject;
import org.eclipse.mdm.api.base.ConnectionException;
import org.eclipse.mdm.api.base.ServiceNotProvidedException;
import org.eclipse.mdm.api.base.adapter.EntityType;
import org.eclipse.mdm.api.base.model.Entity;
import org.eclipse.mdm.api.base.model.User;
import org.eclipse.mdm.api.base.notification.NotificationException;
import org.eclipse.mdm.api.base.notification.NotificationFilter;
import org.eclipse.mdm.api.base.notification.NotificationListener;
import org.eclipse.mdm.api.base.notification.NotificationService;
import org.eclipse.mdm.api.base.query.DataAccessException;
import org.eclipse.mdm.api.dflt.ApplicationContext;
import org.eclipse.mdm.api.dflt.EntityManager;
import org.eclipse.mdm.connector.boundary.ConnectorService;
import org.eclipse.mdm.freetextindexer.control.DatabaseLockHandler;
import org.eclipse.mdm.freetextindexer.control.LockListener;
import org.eclipse.mdm.freetextindexer.control.UpdateIndex;
import org.eclipse.mdm.freetextindexer.entities.MDMEntityResponse;
import org.eclipse.mdm.property.GlobalProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * This boundary is a back-end boundary to the openMDM API. It uses the search
 * server to build up MDDocuments.
 * <p>
 * When the free text indexer is active, the bean requests a database lock on
 * startup. Once the lock has been acquired it connects to the configured
 * sources, registers a {@link FreeTextNotificationListener} at each of them and
 * schedules a periodic session check that reconnects closed contexts.
 *
 * @author CWE
 *
 */
@TransactionAttribute(value = TransactionAttributeType.NOT_SUPPORTED)
@Startup
@Singleton
public class MdmApiBoundary {

    private static final Logger LOGGER = LoggerFactory.getLogger(MdmApiBoundary.class);

    /** Context parameter holding the name used to register for notifications. */
    private static final String FREETEXT_NOTIFICATION_NAME = "freetext.notificationName";

    /** Global property holding the session check interval in seconds. */
    private static final String FREETEXT_SESSION_CHECK_INTERVAL = "freetext.sessionCheckInterval";

    /** Session check interval used when the property is missing or invalid. */
    private static final long DEFAULT_SESSION_CHECK_INTERVAL_SECONDS = 60L;

    /**
     * Notification listener which forwards created, modified and deleted entities
     * of one source to the {@link UpdateIndex} of the enclosing boundary.
     */
    public class FreeTextNotificationListener implements NotificationListener {

        private final EntityManager entityManager;

        /**
         * @param entityManager entity manager of the source this listener is
         *                      registered at
         */
        public FreeTextNotificationListener(EntityManager entityManager) {
            this.entityManager = entityManager;
        }

        @Override
        public void instanceCreated(List<? extends Entity> entities, User arg1) {
            LOGGER.debug("{} entities created: {}", entities.size(), entities);
            indexAll(entities);
        }

        @Override
        public void instanceDeleted(EntityType entityType, List<String> ids, User user) {
            LOGGER.debug("{} entities deleted: {}", ids.size(), ids);
            ids.forEach(id -> update.delete(getApiName(entityManager), workaroundForTypeMapping(entityType), id));
        }

        @Override
        public void instanceModified(List<? extends Entity> entities, User arg1) {
            LOGGER.debug("{} entities modified: {}", entities.size(), entities);
            indexAll(entities);
        }

        @Override
        public void modelModified(EntityType arg0, User arg1) {
            // not needed
        }

        @Override
        public void securityModified(EntityType entityType, List<String> ids, User user) {
            // not needed
        }

        /** Passes every given entity as a change to the index updater. */
        private void indexAll(List<? extends Entity> entities) {
            entities.forEach(e -> update.change(MDMEntityResponse.build(e.getClass(), e, entityManager)));
        }
    }

    @Inject
    UpdateIndex update;

    @Resource
    ManagedScheduledExecutorService scheduler;

    @Inject
    @GlobalProperty(value = "freetext.active")
    private String active = "false";

    /** Created in the lock listener; stays {@code null} until the lock was acquired. */
    ConnectorService connectorService;

    @Inject
    @GlobalProperty
    private Map<String, String> globalProperties = Collections.emptyMap();

    /** Handle of the periodic session check; {@code null} until it was scheduled. */
    private ScheduledFuture<?> scheduledFuture;

    @EJB
    DatabaseLockHandler databaseLockHandler;

    /**
     * Initializes the lock handler and requests the database lock. Does nothing if
     * the free text indexer is not active.
     */
    @PostConstruct
    public void initalize() {
        if (!isActive()) {
            return;
        }

        databaseLockHandler.init("free-text-indexer", new LockListener() {

            @Override
            public void onLockAcquired() {
                LOGGER.info("Acquired database lock, starting fretextindexer.");

                Principal principal = new Principal() {
                    @Override
                    public String getName() {
                        return null;
                    }
                };

                connectorService = new ConnectorService(principal, globalProperties);
                connectorService.connect();
                connectorService.getContexts().forEach(MdmApiBoundary.this::initializeContext);

                long intervalSeconds = readSessionCheckInterval();
                scheduledFuture = MdmApiBoundary.this.scheduler.scheduleAtFixedRate(
                        MdmApiBoundary.this::sessionCheck, intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
                LOGGER.info("Initialized session tick timer with interval {}s.", intervalSeconds);
            }

            @Override
            public void onLockLost() {
                deregister();
            }
        });

        databaseLockHandler.requestLock();
    }

    /**
     * Tries in intervals of one minute to acquire the database lock. If the lock
     * could be acquired, the onLockAcquired method on the registered listener is
     * invoked.
     */
    @Schedule(hour = "*", minute = "*/1", persistent = false)
    public void checkLock() {
        // The lock handler is only initialized (see initalize()) when the indexer is
        // active, so an inactive instance must not request the lock.
        if (!isActive()) {
            return;
        }
        databaseLockHandler.requestLock();
    }

    /**
     * Reads the session check interval from the global properties.
     *
     * @return the configured interval in seconds, or the default of 60 seconds if
     *         the value is missing, not a number or not positive
     */
    private long readSessionCheckInterval() {
        String configured = globalProperties.getOrDefault(FREETEXT_SESSION_CHECK_INTERVAL,
                String.valueOf(DEFAULT_SESSION_CHECK_INTERVAL_SECONDS));
        try {
            long parsed = Long.parseLong(configured);
            if (parsed > 0) {
                return parsed;
            }
            // scheduleAtFixedRate rejects periods <= 0 with an IllegalArgumentException
            LOGGER.warn("Value '{}' for parameter '{}' is not a positive number. Using default value '{}'.",
                    configured, FREETEXT_SESSION_CHECK_INTERVAL, DEFAULT_SESSION_CHECK_INTERVAL_SECONDS);
        } catch (NumberFormatException e) {
            LOGGER.warn("Could not parse value for parameter '{}'. Using default value '{}'.",
                    FREETEXT_SESSION_CHECK_INTERVAL, DEFAULT_SESSION_CHECK_INTERVAL_SECONDS);
        }
        return DEFAULT_SESSION_CHECK_INTERVAL_SECONDS;
    }

    /**
     * Registers a {@link FreeTextNotificationListener} at the notification service
     * of the given context.
     *
     * @param context context of the source to register at
     * @throws IllegalArgumentException if the notification service is missing or
     *                                  the registration fails
     */
    private void initializeContext(ApplicationContext context) {
        String source = getName(context);
        try {
            EntityManager entityManager = context.getEntityManager()
                    .orElseThrow(() -> new ServiceNotProvidedException(EntityManager.class));
            NotificationService manager = context.getNotificationService()
                    .orElseThrow(() -> new ConnectionException("Context has no NotificationManager!"));

            String notificationName = context.getParameters().getOrDefault(FREETEXT_NOTIFICATION_NAME, "mdm5");

            LOGGER.debug("Registering with name '{}' at source '{}'", notificationName, source);
            manager.register(notificationName, new NotificationFilter(),
                    new FreeTextNotificationListener(entityManager));
            LOGGER.info("Successfully registered for new notifications with name '{}' at source '{}'!",
                    notificationName, source);
        } catch (ConnectionException | NotificationException e) {
            throw new IllegalArgumentException(
                    "The ODS Server and/or the Notification Service cannot be accessed for source '" + source + "'!",
                    e);
        }
    }

    /**
     * Checks every known context by loading the logged on user. If that fails, the
     * context is reconnected and the notification listener is registered again. A
     * failed reconnect is logged and does not abort the check of the remaining
     * contexts.
     */
    public void sessionCheck() {
        LOGGER.debug("Session check on {} context(s).", connectorService.getContexts().size());
        for (ApplicationContext context : connectorService.getContexts()) {
            try {
                Optional<EntityManager> em = context.getEntityManager();
                if (em.isPresent()) {
                    em.get().loadLoggedOnUser();
                }
            } catch (Exception e) {
                LOGGER.warn("ApplicationContext seems to be closed. Trying to reconnect.", e);
                try {
                    ApplicationContext reconnected = connectorService.reconnect(context).orElseThrow(
                            () -> new IllegalStateException("Reconnect did not provide an ApplicationContext."));
                    initializeContext(reconnected);
                    LOGGER.info("Freetextindexer ApplicationContext reconnected.");
                } catch (Exception ex) {
                    LOGGER.warn("Could not reconnect Freetextindexer ApplicationContext!", ex);
                }
            }
        }
    }

    /**
     * Stops the session check, closes the notification services of all contexts,
     * disconnects from the sources and releases the database lock if it is held.
     * The lock is released even if closing a notification service fails.
     *
     * @throws IllegalStateException if a notification service could not be closed
     */
    @PreDestroy
    public void deregister() {
        try {
            if (isActive()) {
                // Both fields are only set once the lock was acquired; without the guards
                // shutting down an instance that never got the lock would fail with a
                // NullPointerException.
                if (scheduledFuture != null) {
                    scheduledFuture.cancel(true);
                }

                for (ApplicationContext context : getContexts().values()) {
                    try {
                        context.getNotificationService()
                                .orElseThrow(() -> new ConnectionException("Context has no NotificationManager!"))
                                .close(false);
                    } catch (ConnectionException | NotificationException e) {
                        throw new IllegalStateException(
                                "The NotificationManager could not be deregistered. In rare cases, this leads to a missed notification. This means the index might not be up-to-date.",
                                e);
                    }
                }

                if (connectorService != null) {
                    connectorService.disconnect();
                }
            }
        } finally {
            if (databaseLockHandler.hasDatabaseLock()) {
                databaseLockHandler.releaseDatabaseLock();
            }
        }
    }

    /**
     * Loads all entities of the given class from the given context and passes each
     * of them, wrapped in a {@link MDMEntityResponse}, to the executor. Does nothing
     * if the free text indexer is not active.
     *
     * @param entityClass class of the entities to load
     * @param context     context to load the entities from
     * @param executor    consumer invoked for every loaded entity
     * @throws IllegalStateException if the entities could not be loaded
     */
    public void doForAllEntities(Class<? extends Entity> entityClass, ApplicationContext context,
            Consumer<? super MDMEntityResponse> executor) {
        if (!isActive()) {
            return;
        }

        try {
            EntityManager entityManager = context.getEntityManager()
                    .orElseThrow(() -> new ServiceNotProvidedException(EntityManager.class));
            entityManager.loadAll(entityClass).stream().map(e -> this.buildEntity(e, entityManager))
                    .forEach(executor);
        } catch (DataAccessException e) {
            throw new IllegalStateException("MDM cannot be querried for new elements. Please check the MDM runtime",
                    e);
        }
    }

    /**
     * Returns the connected contexts mapped by their source name.
     *
     * @return map from source name to context; empty if no connection has been
     *         established yet
     */
    public Map<String, ApplicationContext> getContexts() {
        if (connectorService == null) {
            return Collections.emptyMap();
        }
        return connectorService.getContexts().stream().collect(Collectors.toMap(this::getName, Function.identity()));
    }

    /**
     * Returns the source name of the given context.
     *
     * @param context context to get the name for
     * @return source name of the environment loaded via the context's entity
     *         manager
     * @throws ServiceNotProvidedException if the context has no entity manager
     */
    public String getName(ApplicationContext context) {
        return context.getEntityManager().map(this::getApiName)
                .orElseThrow(() -> new ServiceNotProvidedException(EntityManager.class));
    }

    /** Returns the source name of the environment loaded by the entity manager. */
    private String getApiName(EntityManager entityManager) {
        return entityManager.loadEnvironment().getSourceName();
    }

    /** Returns whether the property {@code freetext.active} parses to {@code true}. */
    private boolean isActive() {
        return Boolean.parseBoolean(active);
    }

    /** Wraps the entity in a {@link MDMEntityResponse}. */
    private MDMEntityResponse buildEntity(Entity e, EntityManager entityManager) {
        return MDMEntityResponse.build(e.getClass(), e, entityManager);
    }

    /**
     * Simple workaround for naming mismatch between Adapter and Business object
     * names.
     *
     * @param entityType entity type
     * @return MDM business object name
     */
    private static String workaroundForTypeMapping(EntityType entityType) {
        switch (entityType.getName()) {
        case "StructureLevel":
            return "Pool";
        case "MeaResult":
            return "Measurement";
        case "SubMatrix":
            return "ChannelGroup";
        case "MeaQuantity":
            return "Channel";
        default:
            return entityType.getName();
        }
    }
}