blob: 85f51ab688d617566f6dd0cf3a2e711aa42aa59f [file] [log] [blame]
/********************************************************************************
* Copyright (c) 2015-2019 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* SPDX-License-Identifier: EPL-2.0
*
********************************************************************************/
package org.eclipse.mdm.api.odsadapter.notification.avalon;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.asam.ods.T_LONGLONG;
import org.eclipse.mdm.api.base.adapter.EntityType;
import org.eclipse.mdm.api.base.model.ContextComponent;
import org.eclipse.mdm.api.base.model.ContextDescribable;
import org.eclipse.mdm.api.base.model.ContextRoot;
import org.eclipse.mdm.api.base.model.User;
import org.eclipse.mdm.api.base.notification.NotificationException;
import org.eclipse.mdm.api.base.notification.NotificationFilter;
import org.eclipse.mdm.api.base.notification.NotificationFilter.ModificationType;
import org.eclipse.mdm.api.base.notification.NotificationListener;
import org.eclipse.mdm.api.base.notification.NotificationService;
import org.eclipse.mdm.api.base.query.DataAccessException;
import org.eclipse.mdm.api.base.query.QueryService;
import org.eclipse.mdm.api.odsadapter.lookup.config.EntityConfig.Key;
import org.eclipse.mdm.api.odsadapter.notification.NotificationEntityLoader;
import org.eclipse.mdm.api.odsadapter.query.ODSModelManager;
import org.eclipse.mdm.api.odsadapter.utils.ODSConverter;
import org.omg.CORBA.ORB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.highqsoft.avalonCorbaNotification.notification.MODE_DELETE;
import com.highqsoft.avalonCorbaNotification.notification.MODE_INSERT;
import com.highqsoft.avalonCorbaNotification.notification.MODE_MODIFYRIGHTS;
import com.highqsoft.avalonCorbaNotification.notification.MODE_REPLACE;
/**
* Notification manager for handling notifications from the Avalon Notification
* Service
*
* ModificationType.MODEL_MODIFIED is not supported!
*
* @since 1.0.0
* @author Matthias Koller, Peak Solution GmbH
*
*/
public class AvalonNotificationManager implements NotificationService {
private static final Logger LOGGER = LoggerFactory.getLogger(AvalonNotificationManager.class);
private final Map<String, EventProcessor> eventProcessors = new HashMap<>();
private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
private final ODSModelManager modelManager;
private final String serviceName;
private final String nameServiceURL;
private long pollingInterval = 500L;
private final NotificationEntityLoader loader;
private final ORB orb = ORB.init(new String[] {}, System.getProperties());
/**
* Creates a new AvalonNotificationManager.
*
* @param modelManager ODSModelManager used to laod entities.
* @param serviceName name of the notification service.
* @param nameServiceURL URL of the name service.
* @param loadContextDescribable if true, notifications for {@link ContextRoot}
* and {@link ContextComponent} will load their
* parent {@link ContextDescribable}.
* @param pollingInterval polling interval in milleseconds
*/
public AvalonNotificationManager(ODSModelManager modelManager, QueryService queryService, String serviceName,
String nameServiceURL, boolean loadContextDescribable, long pollingInterval) {
this.modelManager = modelManager;
this.serviceName = serviceName;
this.nameServiceURL = nameServiceURL;
this.pollingInterval = pollingInterval;
loader = new NotificationEntityLoader(modelManager, queryService, loadContextDescribable);
}
@Override
public void register(String registration, NotificationFilter filter, final NotificationListener listener)
throws NotificationException {
try {
EventProcessor consumer = new EventProcessor(orb, listener, this, nameServiceURL, serviceName);
List<String> aids = filter.getEntityTypes().stream().map(e -> e.getId()).collect(Collectors.toList());
Set<ModificationType> modes = filter.getTypes().stream()
.filter(m -> !ModificationType.MODEL_MODIFIED.equals(m)).collect(Collectors.toSet());
consumer.connect();
consumer.setFilter(aids, modes);
ScheduledFuture<?> future = executor.scheduleAtFixedRate(consumer, 0, pollingInterval,
TimeUnit.MILLISECONDS);
consumer.setFuture(future);
eventProcessors.put(registration, consumer);
} catch (Exception e) {
throw new NotificationException("Exception creating notification listener registration!", e);
}
}
@Override
public void deregister(String registration) {
EventProcessor processor = eventProcessors.get(registration);
if (processor != null) {
processor.disconnect();
eventProcessors.remove(registration);
}
}
@Override
public void close(boolean isDeregisterAll) throws NotificationException {
LOGGER.info("Closing NotificationManager...");
for (String registration : eventProcessors.keySet()) {
LOGGER.debug("Disconnecting registration '" + registration + "'.");
deregister(registration);
}
try {
executor.shutdown();
boolean terminated = executor.awaitTermination(10, TimeUnit.SECONDS);
if (!terminated) {
throw new NotificationException("Could not close all registrations!");
}
} catch (InterruptedException e) {
throw new NotificationException("Could not close all registrations!", e);
}
}
void processException(Exception e) {
LOGGER.error("Exception during notification processing!", e);
}
void processNotification(short mode, T_LONGLONG aeId, T_LONGLONG ieId, T_LONGLONG userId, String timestamp,
NotificationListener notificationListener) {
try {
User user = loader.load(new Key<>(User.class), Long.toString(ODSConverter.fromODSLong(userId)));
LOGGER.debug("User loaded");
EntityType entityType = modelManager.getEntityTypeById(Long.toString(ODSConverter.fromODSLong(aeId)));
List<String> ids = Arrays.asList(Long.toString(ODSConverter.fromODSLong(ieId)));
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Notification event with: entityType=" + entityType + ", user=" + user);
}
switch (mode) {
case MODE_INSERT.value:
notificationListener.instanceCreated(loader.loadEntities(entityType, ids), user);
break;
case MODE_REPLACE.value:
notificationListener.instanceModified(loader.loadEntities(entityType, ids), user);
break;
case MODE_DELETE.value:
notificationListener.instanceDeleted(entityType, ids, user);
break;
case MODE_MODIFYRIGHTS.value:
notificationListener.securityModified(entityType, ids, user);
break;
default:
processException(new NotificationException("Invalid notification type!"));
}
} catch (DataAccessException e) {
processException(new NotificationException("Cannot load data for notification!", e));
}
}
}