| /* |
| ******************************************************************************* |
| * Copyright (c) 2018 Contributors to the Eclipse Foundation |
| * |
| * See the NOTICE file(s) distributed with this work for additional |
| * information regarding copyright ownership. |
| * |
| * This program and the accompanying materials are made available under the |
| * terms of the Eclipse Public License v. 2.0 which is available at |
| * http://www.eclipse.org/legal/epl-2.0. |
| * |
| * SPDX-License-Identifier: EPL-2.0 |
| ******************************************************************************* |
| */ |
| |
| |
| package org.eclipse.openk.core.messagebroker; |
| |
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.AMQP.BasicProperties.Builder;
import com.rabbitmq.client.BuiltinExchangeType;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.log4j.Logger;
import org.eclipse.openk.api.GridMeasure;
import org.eclipse.openk.common.JsonGeneratorBase;
import org.eclipse.openk.core.bpmn.base.ProcessException;
import org.eclipse.openk.core.controller.BackendConfig;
| |
| public class Producer extends MessageBroker { |
| |
| private static final Logger LOGGER = Logger.getLogger(Producer.class.getName()); |
| |
| private final String exchangeName; |
| |
| public Producer() throws ProcessException { |
| this.exchangeName = BackendConfig.getInstance().getRabbitmqConfiguration().getExchangeName(); |
| } |
| |
| public void sendMessageAsJson(GridMeasure gridMeasure, String routingKey) throws ProcessException { |
| BasicProperties basicProperties = new Builder().contentType("application/json").deliveryMode(2).priority(0).build(); |
| String jsonString = JsonGeneratorBase.getGson().toJson(createPayload(gridMeasure)); |
| try { |
| |
| createExchangeIfNotExists(); |
| createQueueIfNotExists(routingKey); |
| channel.basicPublish(exchangeName, routingKey, basicProperties, jsonString.getBytes("UTF-8")); |
| } catch (IOException e) { |
| LOGGER.error("Error in sendMessageAsJson (RabbitMQ)", e); |
| throw new ProcessException("Error in sendMessageAsJson (RabbitMQ)"); |
| } |
| } |
| |
| private void createExchangeIfNotExists() throws IOException { |
| String unroutedMessagesExchangeName = BackendConfig.getInstance().getRabbitmqConfiguration().getUnroutedMessagesExchangeName(); |
| Map<String, Object> exchangeArgs = new HashMap<String, Object>(); |
| exchangeArgs.put("alternate-exchange", unroutedMessagesExchangeName); |
| channel |
| .exchangeDeclare(BackendConfig.getInstance().getRabbitmqConfiguration().getExchangeName(), |
| BuiltinExchangeType.DIRECT, true, false, exchangeArgs); |
| |
| String unroutedMessagesQueueName = BackendConfig.getInstance().getRabbitmqConfiguration().getUnroutedMessagesQueueName(); |
| // durable = true |
| channel.exchangeDeclare(unroutedMessagesExchangeName, BuiltinExchangeType.FANOUT, true); |
| channel.queueDeclare(unroutedMessagesQueueName, true, false, false, null); |
| channel.queueBind(unroutedMessagesQueueName, unroutedMessagesExchangeName, ""); |
| } |
| |
| private void createQueueIfNotExists(String routingKey) throws IOException { |
| List<String> queueNamesAsList = BackendConfig.getInstance().getRabbitmqConfiguration().getQueueNamesAsList(); |
| Optional<String> optionalQueuename = queueNamesAsList.stream().filter(it -> it.contains("-" + routingKey)).findFirst(); |
| if (optionalQueuename.isPresent()) { |
| String queueName = optionalQueuename.get(); |
| Map<String, Object> args = new HashMap<>(); |
| args.put("x-queue-mode", "lazy"); |
| channel.queueDeclare(queueName, true, false, false, args); |
| channel.queueBind(queueName, exchangeName, routingKey); |
| } |
| } |
| |
| public void sendMessageAsString(String message, String routingKey) throws ProcessException { |
| BasicProperties basicProperties = new Builder().contentType("text/plain").deliveryMode(2).priority(0).build(); |
| try { |
| channel.basicPublish(exchangeName, routingKey, basicProperties, message.getBytes()); |
| } catch (IOException e) { |
| LOGGER.error("Error in sendMessageAsString (RabbitMQ)", e); |
| throw new ProcessException("Error in sendMessageAsString (RabbitMQ)"); |
| } |
| } |
| |
| private GridMeasure createPayload(GridMeasure originalGm) { |
| GridMeasure payload = new GridMeasure(); |
| payload.setTitle(originalGm.getTitle()); |
| payload.setId(originalGm.getId()); |
| payload.setPlannedStarttimeFirstSinglemeasure(originalGm.getPlannedStarttimeFirstSinglemeasure()); |
| payload.setSwitchingObject(originalGm.getSwitchingObject()); |
| payload.setAffectedResource(originalGm.getAffectedResource()); |
| return payload; |
| } |
| } |