blob: 5f0a0458e89873fdc5db6ce695edcc1e29ff72fd [file] [log] [blame]
/*
*******************************************************************************
* Copyright (c) 2018 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* SPDX-License-Identifier: EPL-2.0
*******************************************************************************
*/
package org.eclipse.openk.core.messagebroker;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.AMQP.BasicProperties.Builder;
import com.rabbitmq.client.BuiltinExchangeType;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.log4j.Logger;
import org.eclipse.openk.api.GridMeasure;
import org.eclipse.openk.common.JsonGeneratorBase;
import org.eclipse.openk.core.bpmn.base.ProcessException;
import org.eclipse.openk.core.controller.BackendConfig;
/**
 * Publishes messages to the RabbitMQ exchange named in the backend configuration.
 *
 * <p>The AMQP {@code channel} used by every method is not declared in this class; it is
 * presumably inherited from {@link MessageBroker} (TODO confirm).
 */
public class Producer extends MessageBroker {
    private static final Logger LOGGER = Logger.getLogger(Producer.class.getName());

    /** AMQP delivery mode 2 marks a message as persistent. */
    private static final int DELIVERY_MODE_PERSISTENT = 2;

    /** Priority set on every published message. */
    private static final int DEFAULT_PRIORITY = 0;

    /** Name of the exchange all messages are published to; read once from the configuration. */
    private final String exchangeName;

    /**
     * Creates a producer bound to the exchange name found in the RabbitMQ configuration.
     *
     * @throws ProcessException declared by this constructor; nothing in its visible body throws
     *     it, so it presumably originates from the superclass constructor (TODO confirm)
     */
    public Producer() throws ProcessException {
        this.exchangeName = BackendConfig.getInstance().getRabbitmqConfiguration().getExchangeName();
    }

    /**
     * Serializes a reduced copy of the given grid measure to JSON and publishes it.
     *
     * <p>Before publishing, the exchanges and the queue matching {@code routingKey} are declared
     * so that the message has somewhere to go.
     *
     * @param gridMeasure the grid measure to announce; only the fields copied by
     *     {@link #createPayload(GridMeasure)} are sent
     * @param routingKey the routing key to publish with
     * @throws ProcessException if declaring the topology or publishing fails with an
     *     {@link IOException}
     */
    public void sendMessageAsJson(GridMeasure gridMeasure, String routingKey) throws ProcessException {
        BasicProperties basicProperties = buildProperties("application/json");
        String jsonString = JsonGeneratorBase.getGson().toJson(createPayload(gridMeasure));
        try {
            createExchangeIfNotExists();
            createQueueIfNotExists(routingKey);
            channel.basicPublish(exchangeName, routingKey, basicProperties,
                    jsonString.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw publishFailure("Error in sendMessageAsJson (RabbitMQ)", e);
        }
    }

    /**
     * Declares the main exchange, the exchange for unrouted messages, and the queue that
     * collects those unrouted messages.
     *
     * @throws IOException if any declaration or the binding fails
     */
    private void createExchangeIfNotExists() throws IOException {
        String unroutedMessagesExchangeName =
                BackendConfig.getInstance().getRabbitmqConfiguration().getUnroutedMessagesExchangeName();
        String unroutedMessagesQueueName =
                BackendConfig.getInstance().getRabbitmqConfiguration().getUnroutedMessagesQueueName();

        Map<String, Object> exchangeArgs = new HashMap<>();
        exchangeArgs.put("alternate-exchange", unroutedMessagesExchangeName);

        // Declare the same exchange name that basicPublish and queueBind use, so the three can
        // never drift apart. Arguments: durable = true, autoDelete = false.
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, exchangeArgs);

        // durable = true
        channel.exchangeDeclare(unroutedMessagesExchangeName, BuiltinExchangeType.FANOUT, true);
        // durable = true, exclusive = false, autoDelete = false, no extra arguments
        channel.queueDeclare(unroutedMessagesQueueName, true, false, false, null);
        channel.queueBind(unroutedMessagesQueueName, unroutedMessagesExchangeName, "");
    }

    /**
     * Declares the configured queue belonging to {@code routingKey} and binds it to the main
     * exchange. Does nothing when no configured queue name matches.
     *
     * @param routingKey the routing key whose queue should exist
     * @throws IOException if the declaration or the binding fails
     */
    private void createQueueIfNotExists(String routingKey) throws IOException {
        List<String> queueNames = BackendConfig.getInstance().getRabbitmqConfiguration().getQueueNamesAsList();
        // NOTE(review): this is a substring match, so a routing key that is a prefix of another
        // one could select the wrong queue; the first match in configuration order wins.
        String marker = "-" + routingKey;
        Optional<String> match = queueNames.stream().filter(name -> name.contains(marker)).findFirst();
        if (!match.isPresent()) {
            return;
        }

        String queueName = match.get();
        Map<String, Object> queueArgs = new HashMap<>();
        queueArgs.put("x-queue-mode", "lazy");
        // durable = true, exclusive = false, autoDelete = false
        channel.queueDeclare(queueName, true, false, false, queueArgs);
        channel.queueBind(queueName, exchangeName, routingKey);
    }

    /**
     * Publishes a plain-text message, encoded as UTF-8, to the main exchange.
     *
     * <p>Unlike {@link #sendMessageAsJson(GridMeasure, String)} this method does not declare any
     * exchange or queue beforehand.
     *
     * @param message the text to send
     * @param routingKey the routing key to publish with
     * @throws ProcessException if publishing fails with an {@link IOException}
     */
    public void sendMessageAsString(String message, String routingKey) throws ProcessException {
        BasicProperties basicProperties = buildProperties("text/plain");
        try {
            // Explicit charset: the no-argument getBytes() depends on the JVM's default encoding.
            channel.basicPublish(exchangeName, routingKey, basicProperties,
                    message.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw publishFailure("Error in sendMessageAsString (RabbitMQ)", e);
        }
    }

    /**
     * Builds the message properties shared by all publishes: persistent delivery, priority 0.
     *
     * @param contentType the MIME type to announce for the message body
     * @return the assembled properties
     */
    private static BasicProperties buildProperties(String contentType) {
        return new Builder()
                .contentType(contentType)
                .deliveryMode(DELIVERY_MODE_PERSISTENT)
                .priority(DEFAULT_PRIORITY)
                .build();
    }

    /**
     * Logs a failed broker operation and wraps it in a {@link ProcessException}.
     *
     * @param message the text used for both the log entry and the exception
     * @param cause the I/O failure reported by the client
     * @return the exception for the caller to throw
     */
    private static ProcessException publishFailure(String message, IOException cause) {
        LOGGER.error(message, cause);
        ProcessException failure = new ProcessException(message);
        // Only the (String) constructor is known from this file, so the cause is attached
        // afterwards to keep the original stack trace available to callers.
        failure.initCause(cause);
        return failure;
    }

    /**
     * Copies the subset of fields that is sent over the broker into a fresh grid measure.
     *
     * @param originalGm the grid measure to copy from
     * @return a new grid measure holding only title, id, planned start time of the first single
     *     measure, switching object and affected resource
     */
    private GridMeasure createPayload(GridMeasure originalGm) {
        GridMeasure payload = new GridMeasure();
        payload.setTitle(originalGm.getTitle());
        payload.setId(originalGm.getId());
        payload.setPlannedStarttimeFirstSinglemeasure(originalGm.getPlannedStarttimeFirstSinglemeasure());
        payload.setSwitchingObject(originalGm.getSwitchingObject());
        payload.setAffectedResource(originalGm.getAffectedResource());
        return payload;
    }
}