| /******************************************************************************** |
| * Copyright (c) 2015-2019 Contributors to the Eclipse Foundation |
| * |
| * See the NOTICE file(s) distributed with this work for additional |
| * information regarding copyright ownership. |
| * |
| * This program and the accompanying materials are made available under the |
| * terms of the Eclipse Public License v. 2.0 which is available at |
| * http://www.eclipse.org/legal/epl-2.0. |
| * |
| * SPDX-License-Identifier: EPL-2.0 |
| * |
| ********************************************************************************/ |
| |
| package org.eclipse.mdm.api.odsadapter.notification.peak; |
| |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.TimeUnit; |
| import java.util.stream.Collectors; |
| |
| import javax.ws.rs.client.Client; |
| import javax.ws.rs.client.ClientBuilder; |
| import javax.ws.rs.client.WebTarget; |
| import javax.ws.rs.core.MediaType; |
| import javax.ws.rs.core.Response; |
| import javax.ws.rs.core.Response.Status; |
| |
| import org.eclipse.mdm.api.base.adapter.EntityType; |
| import org.eclipse.mdm.api.base.model.User; |
| import org.eclipse.mdm.api.base.notification.NotificationException; |
| import org.eclipse.mdm.api.base.notification.NotificationFilter; |
| import org.eclipse.mdm.api.base.notification.NotificationListener; |
| import org.eclipse.mdm.api.base.notification.NotificationService; |
| import org.eclipse.mdm.api.base.query.QueryService; |
| import org.eclipse.mdm.api.odsadapter.lookup.config.EntityConfig.Key; |
| import org.eclipse.mdm.api.odsadapter.notification.NotificationEntityLoader; |
| import org.eclipse.mdm.api.odsadapter.query.ODSModelManager; |
| import org.eclipse.mdm.api.odsadapter.utils.ODSUtils; |
| import org.glassfish.jersey.media.sse.EventInput; |
| import org.glassfish.jersey.media.sse.SseFeature; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.peaksolution.ods.notification.protobuf.NotificationProtos.Notification; |
| |
| /** |
| * Notification manager for handling notifications from the Peak ODS Server |
| * Notification Plugin |
| * |
| * @since 1.0.0 |
| * @author Matthias Koller, Peak Solution GmbH |
| * |
| */ |
| public class PeakNotificationManager implements NotificationService { |
| |
| private static final Logger LOGGER = LoggerFactory.getLogger(PeakNotificationManager.class); |
| |
| private final Client client; |
| private final WebTarget endpoint; |
| |
| private final Map<String, EventProcessor> processors = new HashMap<>(); |
| |
| private final ExecutorService executor = Executors.newCachedThreadPool(); |
| |
| private final ODSModelManager modelManager; |
| |
| private final NotificationEntityLoader loader; |
| |
| /** |
| * @param modelManager |
| * @param url URL of the notification plugin |
| * @param loadContextDescribable if true, the corresponding context describable |
| * is loaded if a notification for a context root |
| * or context component is received. |
| * @throws NotificationException Thrown if the manager cannot connect to the |
| * notification server. |
| */ |
| public PeakNotificationManager(ODSModelManager modelManager, QueryService queryService, String url, |
| boolean loadContextDescribable) throws NotificationException { |
| this.modelManager = modelManager; |
| loader = new NotificationEntityLoader(modelManager, queryService, loadContextDescribable); |
| |
| try { |
| client = ClientBuilder.newBuilder().register(SseFeature.class).register(JsonMessageBodyProvider.class) |
| .build(); |
| |
| endpoint = client.target(url).path("events"); |
| |
| Executors.newScheduledThreadPool(1).scheduleAtFixedRate(() -> reconnect(), 10, 10, TimeUnit.SECONDS); |
| } catch (Exception e) { |
| throw new NotificationException("Could not create " + PeakNotificationManager.class.getName() + "!", e); |
| } |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see org.eclipse.mdm.api.base.notification.NotificationManager#register(java. |
| * lang.String, org.eclipse.mdm.api.base.notification.NotificationFilter, |
| * org.eclipse.mdm.api.base.notification.NotificationListener) |
| */ |
| @Override |
| public void register(String registration, NotificationFilter filter, NotificationListener listener) |
| throws NotificationException { |
| LOGGER.info("Starting registration for with name: {}", registration); |
| |
| Response response = endpoint.path(registration).request().post( |
| javax.ws.rs.client.Entity.entity(ProtobufConverter.from(filter), MediaType.APPLICATION_JSON_TYPE)); |
| |
| if (response.getStatusInfo().getStatusCode() == Status.CONFLICT.getStatusCode()) { |
| if (LOGGER.isInfoEnabled()) { |
| LOGGER.info("A registration with the name already exists: {}", response.readEntity(String.class)); |
| LOGGER.info("Trying to reregister..."); |
| } |
| deregister(registration); |
| LOGGER.info("Deregisteration successful."); |
| register(registration, filter, listener); |
| return; |
| } |
| |
| if (response.getStatusInfo().getStatusCode() != Status.OK.getStatusCode()) { |
| throw new NotificationException( |
| "Could not create registration at notification service: " + response.readEntity(String.class)); |
| } |
| |
| recreateEventStream(registration, listener); |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see |
| * org.eclipse.mdm.api.base.notification.NotificationManager#deregister(java |
| * .lang.String) |
| */ |
| @Override |
| public void deregister(String registration) { |
| if (processors.containsKey(registration)) { |
| close(registration); |
| } |
| |
| endpoint.path(registration).request().delete(); |
| } |
| |
| @Override |
| public void close(boolean isDeregisterAll) throws NotificationException { |
| LOGGER.info("Closing NotificationManager..."); |
| |
| for (String registration : processors.keySet()) { |
| if (isDeregisterAll) { |
| LOGGER.debug("Deregistering '{}'.", registration); |
| deregister(registration); |
| } else { |
| LOGGER.debug("Disconnecting '{}'.", registration); |
| close(registration); |
| } |
| } |
| } |
| |
| /** |
| * Recreates the {@link EventInput}, attaches an {@link EventProcessor} to it |
| * and stores the {@link EventProcessor} in the processors map. |
| * |
| * @param registration |
| * @param listener |
| * @throws NotificationException |
| */ |
| private void recreateEventStream(String registration, NotificationListener listener) throws NotificationException { |
| try { |
| LOGGER.info("Requesting event input for {}", registration); |
| EventInput eventInput = endpoint.path(registration).request(SseFeature.SERVER_SENT_EVENTS_TYPE) |
| .get(EventInput.class); |
| |
| LOGGER.info("Received event input, starting event processor."); |
| EventProcessor processor = new EventProcessor(eventInput, listener, this, MediaType.APPLICATION_JSON_TYPE); |
| |
| executor.submit(processor); |
| |
| processors.put(registration, processor); |
| LOGGER.info("Event processor started."); |
| } catch (Exception e) { |
| try { |
| deregister(registration); |
| } catch (Exception ex) { |
| LOGGER.error("Exception upon deregistering!"); |
| } |
| throw new NotificationException("Could not create event input stream!", e); |
| } |
| } |
| |
| /** |
| * Checks if any registered event processor got disconnected and tries to |
| * recreate the event stream, if necessary. |
| */ |
| private void reconnect() { |
| |
| for (Map.Entry<String, EventProcessor> entry : processors.entrySet()) { |
| if (entry.getValue().isDisconnected()) { |
| try { |
| LOGGER.trace("Registration '{}' was disconnected and will be recreated.", entry.getKey()); |
| recreateEventStream(entry.getKey(), entry.getValue().getListener()); |
| } catch (NotificationException e) { |
| LOGGER.warn("Cannot recreate event stream for registration " + entry.getKey(), e); |
| } |
| } |
| } |
| } |
| |
| private void close(String registration) { |
| if (processors.containsKey(registration)) { |
| EventProcessor processor = processors.get(registration); |
| processor.stop(); |
| processors.remove(registration); |
| } |
| } |
| |
| /** |
| * Handler for Exceptions during event processing. |
| * |
| * @param e Exception which occured during event processing. |
| */ |
| void processException(Exception e) { |
| LOGGER.error("Exception during notification processing!", e); |
| } |
| |
| /** |
| * Handler for notifications. |
| * |
| * @param n notification to process. |
| * @param notificationListener notification listener for handling the |
| * notification. |
| */ |
| void processNotification(Notification n, NotificationListener notificationListener) { |
| if (LOGGER.isDebugEnabled()) { |
| LOGGER.debug("Processing notification event: " + n); |
| } |
| |
| try { |
| User user = loader.load(new Key<>(User.class), Long.toString(n.getUserId())); |
| |
| EntityType entityType = modelManager.getEntityTypeById(Long.toString(n.getAid())); |
| |
| if (LOGGER.isTraceEnabled()) { |
| LOGGER.trace("Notification event with: entityType=" + entityType + ", user=" + user); |
| } |
| switch (n.getType()) { |
| case NEW: |
| notificationListener.instanceCreated(loader.loadEntities(entityType, n.getIidList().stream() |
| .map(id -> id.toString()).filter(ODSUtils::isValidID).collect(Collectors.toList())), user); |
| break; |
| case MODIFY: |
| notificationListener.instanceModified(loader.loadEntities(entityType, n.getIidList().stream() |
| .map(id -> id.toString()).filter(ODSUtils::isValidID).collect(Collectors.toList())), user); |
| break; |
| case DELETE: |
| notificationListener.instanceDeleted(entityType, n.getIidList().stream().map(id -> id.toString()) |
| .filter(ODSUtils::isValidID).collect(Collectors.toList()), user); |
| break; |
| case MODEL: |
| notificationListener.modelModified(entityType, user); |
| break; |
| case SECURITY: |
| notificationListener.securityModified(entityType, n.getIidList().stream().map(id -> id.toString()) |
| .filter(ODSUtils::isValidID).collect(Collectors.toList()), user); |
| break; |
| default: |
| processException(new NotificationException("Invalid notification type!")); |
| } |
| } catch (Exception e) { |
| processException(new NotificationException("Could not process notification!", e)); |
| } |
| } |
| |
| } |