blob: c4191a4c9a485cd391ce63b0b66f89b5dccfccc2 [file] [log] [blame]
/*
*******************************************************************************
* Copyright (c) 2018 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* SPDX-License-Identifier: EPL-2.0
*******************************************************************************
*/
package org.eclipse.openk.core.bpmn.base.tasks;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.apache.log4j.Logger;
import org.eclipse.openk.PlannedGridMeasuresConfiguration;
import org.eclipse.openk.core.bpmn.base.ProcessException;
import org.eclipse.openk.core.bpmn.base.ProcessSubject;
import org.eclipse.openk.core.controller.BackendConfig;
import org.eclipse.openk.core.exceptions.HttpStatusException;
import org.eclipse.openk.core.messagebroker.Producer;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
 * Base class for process steps that need no external trigger: entering the
 * step logs the step description and immediately leaves the step again.
 *
 * <p>Additionally offers helpers for creating a message-broker
 * {@link Producer} and for declaring/binding a RabbitMQ queue based on the
 * backend configuration.
 *
 * @param <T> the type of process subject handled by this task
 */
public abstract class ServiceTask <T extends ProcessSubject> extends BaseTask<T> {
    private static final Logger logger = Logger.getLogger(ServiceTask.class.getName());

    /**
     * Creates a service task.
     *
     * @param description description of the task, passed on to the base class
     */
    protected ServiceTask(String description) {
        super(description);
    }

    /**
     * Logs that the step has been entered and directly leaves it again.
     *
     * @param model the process subject the step operates on
     * @throws ProcessException    propagated from {@code leaveStep}
     * @throws HttpStatusException propagated from {@code leaveStep}
     */
    @Override
    public void enterStep(ProcessSubject model) throws ProcessException, HttpStatusException {
        logger.debug("Enter: \""+getDescription()+"\"");
        this.leaveStep( model );
    }

    /**
     * Intentionally empty: a service task performs no work of its own when
     * the step is entered.
     *
     * @param model the process subject (unused)
     */
    @Override
    protected void onEnterStep(ProcessSubject model) throws ProcessException {
        // intentionally left empty
    }

    /**
     * Recovers the task by simply entering the step again.
     *
     * @param model the process subject to recover with
     * @throws ProcessException    propagated from {@link #enterStep(ProcessSubject)}
     * @throws HttpStatusException propagated from {@link #enterStep(ProcessSubject)}
     */
    @Override
    protected void onRecover(T model) throws ProcessException, HttpStatusException {
        enterStep( model );
    }

    /**
     * Creates a new message-queue producer.
     *
     * @return a freshly constructed {@link Producer}
     * @throws ProcessException if the producer cannot be created
     */
    public Producer createMessageQueueProducer() throws ProcessException {
        return new Producer();
    }

    /**
     * Declares a durable, non-exclusive, non-auto-delete queue in lazy mode
     * and binds it to the exchange configured in the backend configuration.
     * The routing key is the second dash-separated segment of the queue name.
     *
     * <p>Connection problems ({@link IOException}, {@link TimeoutException})
     * are logged and not propagated.
     *
     * @param queueName name of the queue; must contain at least two
     *                  dash-separated segments (e.g. {@code "prefix-key"})
     * @throws IllegalArgumentException if {@code queueName} is {@code null} or
     *                                  does not contain a second segment
     */
    public void createMessageQueue(String queueName) {
        logger.debug("createMessageQueue called");

        // Validate the name before touching the broker so that a malformed
        // name cannot leave a declared-but-unbound queue behind.
        String routingKey = extractRoutingKey(queueName);

        PlannedGridMeasuresConfiguration.RabbitmqConfiguration rabbitmqConfiguration = BackendConfig.getInstance().getRabbitmqConfiguration();
        Map<String, Object> args = new HashMap<>();
        args.put("x-queue-mode", "lazy");
        ConnectionFactory factory = createRabbitFactory(rabbitmqConfiguration);

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(queueName, true, false, false, args);
            channel.queueBind(queueName, rabbitmqConfiguration.getExchangeName(), routingKey);
        } catch (TimeoutException | IOException e) {
            logger.error("Error in createMessageQueue", e);
        }
    }

    /**
     * Returns the second dash-separated segment of the given queue name.
     *
     * @param queueName the queue name to inspect
     * @return the segment following the first dash
     * @throws IllegalArgumentException if the name is {@code null} or has no
     *                                  second segment
     */
    private static String extractRoutingKey(String queueName) {
        if (queueName == null) {
            throw new IllegalArgumentException("queueName must not be null");
        }
        String[] segments = queueName.split("-");
        if (segments.length < 2) {
            throw new IllegalArgumentException(
                    "Cannot derive routing key from queue name \"" + queueName
                            + "\": expected at least two dash-separated segments");
        }
        return segments[1];
    }

    /**
     * Builds a RabbitMQ connection factory from the given configuration.
     *
     * @param rabbitmqConfiguration source of host, credentials and port
     * @return the configured connection factory
     * @throws NumberFormatException if the configured port is not an integer
     */
    private static ConnectionFactory createRabbitFactory(PlannedGridMeasuresConfiguration.RabbitmqConfiguration rabbitmqConfiguration) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(rabbitmqConfiguration.getHost());
        factory.setPassword(rabbitmqConfiguration.getPassword());
        factory.setUsername(rabbitmqConfiguration.getUser());
        factory.setPort(Integer.parseInt(rabbitmqConfiguration.getPort()));
        return factory;
    }
}