| /* |
| ******************************************************************************* |
| * Copyright (c) 2018 Contributors to the Eclipse Foundation |
| * |
| * See the NOTICE file(s) distributed with this work for additional |
| * information regarding copyright ownership. |
| * |
| * This program and the accompanying materials are made available under the |
| * terms of the Eclipse Public License v. 2.0 which is available at |
| * http://www.eclipse.org/legal/epl-2.0. |
| * |
| * SPDX-License-Identifier: EPL-2.0 |
| ******************************************************************************* |
| */ |
| |
| package org.eclipse.openk.core.bpmn.base.tasks; |
| |
| import com.rabbitmq.client.Channel; |
| import com.rabbitmq.client.Connection; |
| import com.rabbitmq.client.ConnectionFactory; |
| import org.apache.log4j.Logger; |
| import org.eclipse.openk.PlannedGridMeasuresConfiguration; |
| import org.eclipse.openk.core.bpmn.base.ProcessException; |
| import org.eclipse.openk.core.bpmn.base.ProcessSubject; |
| import org.eclipse.openk.core.controller.BackendConfig; |
| import org.eclipse.openk.core.exceptions.HttpStatusException; |
| import org.eclipse.openk.core.messagebroker.Producer; |
| |
| import java.io.IOException; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.concurrent.TimeoutException; |
| |
| public abstract class ServiceTask <T extends ProcessSubject> extends BaseTask<T> { |
| private static final Logger logger = Logger.getLogger(ServiceTask.class.getName()); |
| protected ServiceTask(String description) { |
| super(description); |
| } |
| |
| @Override |
| public void enterStep(ProcessSubject model) throws ProcessException, HttpStatusException { |
| logger.debug("Enter: \""+getDescription()+"\""); |
| this.leaveStep( model ); |
| } |
| |
| @Override |
| protected void onEnterStep(ProcessSubject model) throws ProcessException { |
| // implement empty |
| } |
| |
| @Override |
| protected void onRecover(T model) throws ProcessException, HttpStatusException { |
| enterStep( model ); |
| } |
| |
| public Producer createMessageQueueProducer() throws ProcessException { |
| return new Producer(); |
| } |
| |
| public void createMessageQueue(String queueName) { |
| |
| logger.debug("createMessageQueue called"); |
| PlannedGridMeasuresConfiguration.RabbitmqConfiguration rabbitmqConfiguration = BackendConfig.getInstance().getRabbitmqConfiguration(); |
| Map<String, Object> args = new HashMap<>(); |
| args.put("x-queue-mode", "lazy"); |
| |
| ConnectionFactory factory = createRabbitFactory(rabbitmqConfiguration); |
| try (Connection connection = factory.newConnection(); |
| Channel channel = connection.createChannel()) { |
| channel.queueDeclare(queueName, true, false, false, args); |
| String routingKey = queueName.split("-")[1]; |
| channel.queueBind(queueName, rabbitmqConfiguration.getExchangeName(), routingKey); |
| } catch (TimeoutException | IOException e) { |
| logger.error("Error in createMessageQueue", e); |
| } |
| } |
| |
| |
| private static ConnectionFactory createRabbitFactory(PlannedGridMeasuresConfiguration.RabbitmqConfiguration rabbitmqConfiguration) { |
| ConnectionFactory factory = new ConnectionFactory(); |
| factory.setHost(rabbitmqConfiguration.getHost()); |
| factory.setPassword(rabbitmqConfiguration.getPassword()); |
| factory.setUsername(rabbitmqConfiguration.getUser()); |
| factory.setPort(Integer.parseInt(rabbitmqConfiguration.getPort())); |
| return factory; |
| } |
| } |