blob: 079de5d47c16e05279cff00738d7e38e7ac4ec3e [file] [log] [blame]
/*
* Copyright (c) 2020 Kentyou.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Kentyou - initial API and implementation
*/
package org.eclipse.sensinact.gateway.mqtt.inst.test;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.eclipse.sensinact.gateway.util.IOUtils;
import org.json.JSONObject;
import org.junit.Test;
/**
* @author <a href="mailto:christophe.munilla@cea.fr">Christophe Munilla</a>
*/
public class MQTTAgentTest {
//********************************************************************//
// NESTED DECLARATIONS //
//********************************************************************//
//********************************************************************//
// ABSTRACT DECLARATIONS //
//********************************************************************//
//********************************************************************//
// STATIC DECLARATIONS //
//********************************************************************//
private static final int INSTANCES_COUNT = 4;
//********************************************************************//
// INSTANCE DECLARATIONS //
//********************************************************************//
private final List<MidOSGiTestExtended> instances = new ArrayList<MidOSGiTestExtended>();
private final Set<String> events = new HashSet<String>();
/**
* @throws Exception
*/
@Test
public void mqttAgentsTest() throws Throwable {
events.add("[/slider/cursor/position][1]0");
events.add("[/slider/cursor/position][2]0");
events.add("[/slider/cursor/position][3]0");
events.add("[/slider/cursor/position][1]45");
events.add("[/slider/cursor/position][2]45");
events.add("[/slider/cursor/position][3]45");
for (int n = 1; n <= INSTANCES_COUNT; n++) {
MidOSGiTestExtended t = null;
if(n == INSTANCES_COUNT) {
t = new MidOSGiTestExtended(n);
for (int j = n-1; j > 0; j--) {
FileInputStream input = new FileInputStream(new File(String.format("src/test/resources/mqtt.agent.broker-ag%s.cfg", j)));
byte[] content = IOUtils.read(input);
byte[] contentPlus = new byte[content.length + 1];
System.arraycopy(content, 0, contentPlus, 0, content.length);
contentPlus[content.length] = '\n';
FileOutputStream output = new FileOutputStream(new File(String.format("target/felix/conf%s/mqtt.agent.broker-ag%s.cfg", n, j)));
IOUtils.write(contentPlus, output);
output.close();
}
} else {
t = new MidOSGiTestExtendedMQTTBroker(n);
FileInputStream input = new FileInputStream(new File(String.format("src/test/resources/mqtt.server.%s.config", n)));
byte[] content = IOUtils.read(input);
byte[] contentPlus = new byte[content.length + 1];
System.arraycopy(content, 0, contentPlus, 0, content.length);
contentPlus[content.length] = '\n';
FileOutputStream output = new FileOutputStream(new File(String.format("target/felix/conf%s/mqtt.server.cfg", n)));
IOUtils.write(contentPlus, output);
output.close();
}
instances.add(t);
t.init();
Thread.sleep(2 * 1000);
}
Thread.sleep(10 * 1000);
for (int n = 1; n < INSTANCES_COUNT; n++) {
final int C = n;
MqttClient client = null;
if (client!=null && client.isConnected()) {
client.disconnect();
}
client = null;
final String brokerUrl = String.format("%s://%s:%d", "tcp", "127.0.0.1", (1883+n));
client = new MqttClient(brokerUrl, UUID.randomUUID().toString(), new MemoryPersistence());
client.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
cause.printStackTrace();
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String m = ("["+topic+"]["+C+"]" + message);
events.remove(m);
System.out.println(m);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {}
});
client.connect();
client.subscribe("/slider/cursor/position");
}
instances.get(3).moveSlider(0);
String s = instances.get(3).get("slider", "cursor", "position");
JSONObject j = new JSONObject(s);
assertEquals(0, j.getJSONObject("response").getInt("value"));
Thread.sleep(5000);
instances.get(3).moveSlider(45);
s = instances.get(3).get("slider", "cursor", "position");
j = new JSONObject(s);
assertEquals(45, j.getJSONObject("response").getInt("value"));
Thread.sleep(5000);
assertEquals(0, events.size());
}
}