blob: 056ee1d0924243e081d3df6f7b7d50203adcd57e [file] [log] [blame]
/*
* Copyright (c) 2017 CEA.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* CEA - initial API and implementation
*/
package org.eclipse.sensinact.gateway.agent.storage.internal;
import org.eclipse.sensinact.gateway.common.bundle.Mediator;
import org.eclipse.sensinact.gateway.protocol.http.client.ConnectionConfiguration;
import org.eclipse.sensinact.gateway.protocol.http.client.ConnectionConfigurationImpl;
import org.eclipse.sensinact.gateway.protocol.http.client.Request;
import org.eclipse.sensinact.gateway.protocol.http.client.Response;
import org.eclipse.sensinact.gateway.protocol.http.client.SimpleRequest;
import org.eclipse.sensinact.gateway.protocol.http.client.SimpleResponse;
import org.eclipse.sensinact.gateway.util.crypto.Base64;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
* HTTP Agent dedicated to storage service
*
* @author <a href="mailto:christophe.munilla@cea.fr">Christophe Munilla</a>
*/
class StorageConnection {
private static final Logger LOG = LoggerFactory.getLogger(StorageConnection.class);
private String authorization;
private String uri;
private Mediator mediator;
Stack stack;
private boolean running = false;
/**
* Constructor
*
* @param mediator the associated {@link Mediator}
* @param uri the string URI of the storage server
* @param login the user login
* @param password the user password
* @throws IOException Exception on connection problem
*/
StorageConnection(Mediator mediator, String uri, String login, String password) throws IOException {
this.mediator = mediator;
this.uri = uri;
this.authorization = Base64.encodeBytes((login + ":" + password).getBytes());
this.stack = new Stack();
this.running = true;
Runnable runner = new Runnable() {
@Override
public void run() {
LOG.info("POP thread started");
while (running) {
try {
JSONObject element = stack.pop();
if (element != null) {
sendRequest(element);
} else {
if (! shortSleep(200)) {
running = false;
}
}
} catch (Exception e) {
LOG.error("POP thread error", e);
}
}
LOG.info("POP thread terminated");
}
};
new Thread(runner).start();
}
/**
* Executes the HTTP request defined by the method, target, headers and entity
* arguments
*/
private void sendRequest(JSONObject object) {
ConnectionConfiguration<SimpleResponse, SimpleRequest> configuration = new ConnectionConfigurationImpl<>();
try {
configuration.setContentType("application/json");
configuration.setAccept("application/json");
configuration.setUri(this.uri);
configuration.setContent(object.toString());
configuration.setHttpMethod("POST");
configuration.addHeader("Authorization", "Basic " + authorization);
configuration.addHeader("User-Agent", "java/sensiNact-storage");
Request request = new SimpleRequest(configuration);
Response response = request.send();
if (mediator.isDebugLoggable()) {
this.mediator.debug(" >> response status code: " + response.getStatusCode());
}
Iterator<Map.Entry<String, List<String>>> iterator = response.getHeaders().entrySet().iterator();
if (!iterator.hasNext()) {
return;
}
Map.Entry<String, List<String>> entry = iterator.next();
for (; iterator.hasNext(); entry = iterator.next()) {
this.mediator.debug(entry.getKey() + " :: " + (entry.getValue() == null ? "null"
: Arrays.toString(entry.getValue().toArray(new String[0]))));
}
} catch (Exception e) {
LOG.error("Can't send request", e);
if (this.mediator.isErrorLoggable()) {
this.mediator.error(e.getMessage(), e);
}
}
}
void close() {
for (int i=0; i< 10_000 && !this.stack.isEmpty(); i++) {
if (! shortSleep(200)) {
return;
}
}
LOG.info("close operation ended");
this.running = false;
}
private boolean shortSleep(long millis) {
try {
Thread.sleep(millis);
return true;
} catch (InterruptedException e) {
LOG.error("Sleep operation error", e);
Thread.interrupted();
return false;
}
}
}