blob: acc78654bdf477ef8283c67617305e7690df7d6d [file] [log] [blame]
/********************************************************************************
* Copyright (c) 2015-2019 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* SPDX-License-Identifier: EPL-2.0
*
********************************************************************************/
package org.eclipse.mdm.api.odsadapter.filetransfer;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.asam.ods.AoSession;
import org.asam.ods.ElemId;
import org.eclipse.mdm.api.base.ServiceNotProvidedException;
import org.eclipse.mdm.api.base.adapter.ModelManager;
import org.eclipse.mdm.api.base.model.FileLink;
import org.eclipse.mdm.api.odsadapter.ODSContext;
import org.eclipse.mdm.api.odsadapter.query.ODSModelManager;
import org.omg.CORBA.ORB;
import org.omg.CORBA.ORBPackage.InvalidName;
import org.omg.PortableServer.POA;
import org.omg.PortableServer.POAHelper;
import org.omg.PortableServer.POAManagerPackage.AdapterInactive;
import org.omg.PortableServer.POAPackage.ObjectNotActive;
import org.omg.PortableServer.POAPackage.ServantAlreadyActive;
import org.omg.PortableServer.POAPackage.WrongPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.highqsoft.corbafileserver.generated.CORBAFileServerException;
import com.highqsoft.corbafileserver.generated.CORBAFileServerIF;
import com.highqsoft.corbafileserver.generated.DS_BYTEHolder;
import com.highqsoft.corbafileserver.generated.ErrorCode;
import com.highqsoft.corbafileserver.generated.InputStreamIF;
import com.highqsoft.corbafileserver.generated.InputStreamIFPOA;
import com.highqsoft.corbafileserver.generated.SeverityFlag;
/**
* Service provides access to the low level {@link CORBAFileServerIF} file
* service API.
*
* @since 1.0.0
* @author Viktor Stoehr, Gigatronik Ingolstadt GmbH
*/
final class CORBAFileServer {
private static final Logger LOGGER = LoggerFactory.getLogger(CORBAFileServer.class);
private static final int DEFAULT_BUFFER_SIZE = 100_000;
private static final int SOCKET_TIMEOUT = 5_000;
private static final String INTERFACE_NAME_PROPERTY = "org.eclipse.mdm.api.odsadapter.filetransfer.interfaceName";
private static final String HOST_NAME_PROPERTY = "org.eclipse.mdm.api.odsadapter.filetransfer.hostname";
private final CORBAFileServerIF fileServer;
private final AoSession aoSession;
private final ORB orb;
private final Transfer transfer;
private final int bufferSize;
/**
* Constructor.
*
* @param context Used for setup.
* @param transfer The transfer type for up- and downloads.
*/
CORBAFileServer(ODSContext context, Transfer transfer) {
ModelManager mm = context.getModelManager()
.orElseThrow(() -> new ServiceNotProvidedException(ModelManager.class));
if (!(mm instanceof ODSModelManager)) {
throw new IllegalArgumentException("The supplied ModelManager must be an ODSModelManager!");
}
fileServer = context.getFileServer();
aoSession = context.getAoSession();
orb = context.getORB();
this.transfer = transfer;
bufferSize = getBufferSize();
}
/**
* Opens a consumable download {@link InputStream} for given {@link FileLink}.
*
* @param fileLink Used to access the remote path.
* @param elemId Used for security checks.
* @return The consumable {@code InputStream} is returned.
* @throws IOException Thrown if unable to open an the {@code InputStream}.
*/
public InputStream openStream(FileLink fileLink, ElemId elemId) throws IOException {
InputStream inputStream;
if (transfer.isStream()) {
inputStream = new InputStreamAdapter(openSimpleStream(fileLink, elemId));
} else if (transfer.isSocket()) {
inputStream = openSocketStream(fileLink, elemId);
} else {
throw new IllegalStateException("Transfer state '" + transfer + "' is not supported.");
}
return new BufferedInputStream(inputStream, bufferSize);
}
/**
* Uploads given {@link InputStream} for given {@link FileLink}.
*
* @param inputStream The {@code InputStream} to be uploaded.
* @param fileLink The associated {@code FileLink}.
* @param elemId Used for security checks.
* @throws IOException Thrown if unable to upload given {@code InputStream}.
*/
public void uploadStream(InputStream inputStream, FileLink fileLink, ElemId elemId) throws IOException {
String remotePath;
if (transfer.isStream()) {
remotePath = uploadVIAStream(inputStream, fileLink, elemId);
} else if (transfer.isSocket()) {
remotePath = uploadVIASocket(inputStream, fileLink, elemId);
} else {
throw new IllegalStateException("Transfer state '" + transfer + "' is not supported.");
}
fileLink.setRemotePath(remotePath);
}
/**
* Loads the file size for given {@link FileLink}.
*
* @param fileLink The {@code FileLink} whose file size will be loaded.
* @param elemId Used for security checks.
* @return The file size is returned.
* @throws IOException Thrown if unable to load the file size.
*/
public long loadSize(FileLink fileLink, ElemId elemId) throws IOException {
try {
return fileServer.getSizeForInstance(aoSession, fileLink.getRemotePath(), elemId.aid, elemId.iid);
} catch (CORBAFileServerException e) {
throw new IOException("Unable to query file size due to: " + e.reason, e);
}
}
/**
* Deletes the {@link FileLink} from the remote storage.
*
* @param fileLink Will be deleted from the remote storage.
* @param elemId Used for security checks.
* @throws IOException Thrown if unable to delete given {@code FileLink}.
*/
public void delete(FileLink fileLink, ElemId elemId) throws IOException {
try {
fileServer.deleteForInstance(aoSession, fileLink.getRemotePath(), elemId.aid, elemId.iid);
} catch (CORBAFileServerException e) {
throw new IOException("Unable to delete remote file due to: " + e.reason, e);
}
}
/**
* Opens a simple {@link InputStreamIF} using the {@link CORBAFileServerIF}.
*
* @param fileLink Used to access the remote path.
* @param elemId Used for security checks.
* @return The {@code InputStreamIF} is returned.
* @throws IOException Thrown if unable to open the {@code InputStreamIF}.
*/
private InputStreamIF openSimpleStream(FileLink fileLink, ElemId elemId) throws IOException {
try {
return fileServer.readForInstance(aoSession, fileLink.getRemotePath(), elemId.aid, elemId.iid);
} catch (CORBAFileServerException e) {
throw new IOException("Unable to open stream for file transfer due to: " + e.reason, e);
}
}
/**
* Opens a socket {@link InputStream} using the {@link CORBAFileServerIF}.
*
* @param fileLink Used to access the remote path.
* @param elemId Used for security checks.
* @return The {@code InputStream} is returned.
* @throws IOException Thrown if unable to open the socket {@code
* InputStream}.
*/
private InputStream openSocketStream(FileLink fileLink, ElemId elemId) throws IOException {
// auto assigned port with awaiting exactly ONE incoming connection
try (ServerSocket serverSocket = new ServerSocket(0, 1, getInterfaceAddress())) {
serverSocket.setSoTimeout(SOCKET_TIMEOUT * 6);
new Thread(() -> {
try {
/*
* NOTE: Since a socket file transfer registration may block until this server
* socket's accept method is called, the registration is done asynchronously!
*/
fileServer.getForInstanceBySocket(aoSession, fileLink.getRemotePath(), elemId.aid, elemId.iid,
serverSocket.getInetAddress().getHostAddress(), serverSocket.getLocalPort());
} catch (CORBAFileServerException e) {
LOGGER.error("Unable to initialize socket stream, awaiting socket timeout.", e);
}
}).start();
Socket client = serverSocket.accept();
client.setSoTimeout(SOCKET_TIMEOUT);
return client.getInputStream();
}
}
private InetAddress getInterfaceAddress() throws SocketException {
String hostnameProperty = System.getProperty(HOST_NAME_PROPERTY);
if (!StringUtils.isEmpty(hostnameProperty)) {
LOGGER.debug("Using host name '" + hostnameProperty + "' for file transfer.");
try {
return InetAddress.getByName(hostnameProperty);
} catch (UnknownHostException e) {
LOGGER.warn("Specified host name '" + hostnameProperty + "' cannot be used for file transfer.", e);
// continue with lookup by interface name
}
}
String property = System.getProperty(INTERFACE_NAME_PROPERTY);
if (StringUtils.isEmpty(property)) {
LOGGER.debug("Using no specified interface for file transfer, property not set.");
return null;
}
List<NetworkInterface> interfaces = getInterfaceList();
List<NetworkInterface> filteredInterfaces = getFilteredInterfaces(interfaces);
for (NetworkInterface filteredInterface : filteredInterfaces) {
if (filteredInterface.getName().equals(property)
&& filteredInterface.getInetAddresses().hasMoreElements()) {
InetAddress inetAddress = filteredInterface.getInetAddresses().nextElement();
LOGGER.debug("Using interface {} with address {} for file transfer.", filteredInterface.getName(),
inetAddress);
return inetAddress;
}
}
return getFallback(filteredInterfaces);
}
private InetAddress getFallback(List<NetworkInterface> filteredInterfaces) {
if (filteredInterfaces.isEmpty()) {
LOGGER.debug("Using no specified interface for file transfer, property set but no running interface.");
return null;
}
NetworkInterface networkInterface = filteredInterfaces.get(0);
Enumeration<InetAddress> inetAddresses = networkInterface.getInetAddresses();
if (inetAddresses.hasMoreElements()) {
InetAddress address = inetAddresses.nextElement();
LOGGER.debug("Using interface {} with address {} for file transfer.", networkInterface.getName(), address);
return address;
}
return null;
}
private List<NetworkInterface> getFilteredInterfaces(List<NetworkInterface> interfaces) throws SocketException {
List<NetworkInterface> filteredInterfaces = new ArrayList<>();
for (NetworkInterface anInterface : interfaces) {
if (anInterface.isUp() && !anInterface.isLoopback() && !anInterface.isVirtual()) {
filteredInterfaces.add(anInterface);
}
}
return filteredInterfaces;
}
private List<NetworkInterface> getInterfaceList() throws SocketException {
List<NetworkInterface> result = new ArrayList<>();
Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces();
while (networkInterfaces.hasMoreElements()) {
result.add(networkInterfaces.nextElement());
}
return result;
}
/**
* Uploads given {@link InputStream} for given {@link FileLink} using the
* {@link CORBAFileServerIF}.
*
* @param inputStream The {@code InputStream} to be uploaded.
* @param fileLink The associated {@code FileLink}.
* @param elemId Used for security checks.
* @return The remote path of the uploaded {@code InputStream} is returned.
* @throws IOException Thrown if unable to upload given {@code InputStream}.
*/
private String uploadVIAStream(InputStream inputStream, FileLink fileLink, ElemId elemId) throws IOException {
try (CORBAInputStreamAdapter stream = new CORBAInputStreamAdapter(orb, inputStream, fileLink.getSize())) {
return fileServer.saveForInstance(aoSession, fileLink.getFileName(), "", elemId.aid, elemId.iid,
stream._this());
} catch (CORBAFileServerException e) {
throw new IOException("Unable to upload file via stream due to: " + e.reason, e);
}
}
/**
* Uploads given {@link InputStream} for given {@link FileLink} via socket
* upload using the {@link CORBAFileServerIF}.
*
* @param inputStream The {@code InputStream} to be uploaded.
* @param fileLink The associated {@code FileLink}.
* @param elemId Used for security checks.
* @return The remote path of the uploaded {@code InputStream} is returned.
* @throws IOException Thrown if unable to upload given {@code InputStream}.
*/
private String uploadVIASocket(InputStream inputStream, FileLink fileLink, ElemId elemId) throws IOException {
// auto assigned port with awaiting exactly ONE incoming connection
try (ServerSocket serverSocket = new ServerSocket(0, 1, getInterfaceAddress())) {
serverSocket.setSoTimeout(SOCKET_TIMEOUT * 6);
new Thread(() -> {
try (Socket client = serverSocket.accept(); OutputStream outputStream = client.getOutputStream()) {
byte[] buffer = new byte[bufferSize];
int length;
while ((length = inputStream.read(buffer)) > -1) {
outputStream.write(buffer, 0, length);
}
} catch (IOException e) {
LOGGER.error("Unable to initialize socket stream, awaiting socket timeout.", e);
}
}).start();
int localPort = serverSocket.getLocalPort();
String localHostName = serverSocket.getInetAddress().getHostAddress();
try {
return fileServer.saveForInstanceBySocket(aoSession, fileLink.getFileName(), "", elemId.aid, elemId.iid,
localHostName, localPort);
} catch (CORBAFileServerException e) {
String message = String.format("Unable to upload file via socket to %s:%d due to: %s", localHostName,
localPort, e.reason);
throw new IOException(message, e);
}
}
}
/**
* Tries to load the buffer size used by the {@link CORBAFileServerIF} to reach
* best performance. In case of errors a default buffer size of of
* {@value #DEFAULT_BUFFER_SIZE} is used.
*
* @return The buffer size is returned.
*/
private int getBufferSize() {
try {
// try to use the same buffer size as the corba file server for best
// performance
return Integer.parseInt(fileServer.getContext(aoSession, "CORBAFileServer.BufferSize"));
} catch (NumberFormatException | CORBAFileServerException e) {
return DEFAULT_BUFFER_SIZE;
}
}
/**
* A simple {@link InputStream} adapter implementation for an
* {@link InputStreamIF}.
*/
private static final class InputStreamAdapter extends InputStream {
private final InputStreamIF inputStream;
/**
* Constructor.
*
* @param inputStream The wrapped {@link InputStreamIF}.
*/
private InputStreamAdapter(InputStreamIF inputStream) {
this.inputStream = inputStream;
}
/**
* {@inheritDoc}
*/
@Override
public int read() throws IOException {
byte[] b = new byte[1];
return read(b) == -1 ? -1 : b[0];
}
/**
* {@inheritDoc}
*/
@Override
public int read(byte[] buffer, int offset, int length) throws IOException {
try {
DS_BYTEHolder byteHolder = new DS_BYTEHolder();
int receivedBytes = inputStream.read(byteHolder, offset, length);
if (receivedBytes > 0) {
System.arraycopy(byteHolder.value, 0, buffer, 0, receivedBytes);
}
return receivedBytes;
} catch (CORBAFileServerException e) {
throw new IOException("Failed to retrieve bytes from CORBA input stream due to: " + e.reason, e);
}
}
/**
* {@inheritDoc}
*/
@Override
public void close() throws IOException {
try {
inputStream.close();
} catch (CORBAFileServerException e) {
throw new IOException("Unable to close CORBA input stream due to: " + e.reason, e);
} finally {
inputStream._release();
}
}
}
// remotely consumable local input stream
/**
* A simple {@link InputStreamIF} adapter implementation for an
* {@link InputStream}.
*/
private static final class CORBAInputStreamAdapter extends InputStreamIFPOA implements AutoCloseable {
private final InputStream inputStream;
private final long length;
private final byte[] objectID;
private final POA poa;
/**
* Constructor.
*
* @param orb Used to access the root {@link POA} to activate this CORBA
* service object.
* @param inputStream The wrapped {@link InputStream}.
* @param length The length of the wrapped {@code InputStream}.
* @throws IOException Thrown on errors.
*/
private CORBAInputStreamAdapter(ORB orb, InputStream inputStream, long length) throws IOException {
this.inputStream = inputStream;
this.length = length;
try {
poa = POAHelper.narrow(orb.resolve_initial_references("RootPOA"));
poa.the_POAManager().activate();
objectID = poa.activate_object(this);
} catch (AdapterInactive | InvalidName | ServantAlreadyActive | WrongPolicy e) {
throw new IOException("Unable to create CORBA input stream.", e);
}
}
/**
* {@inheritDoc}
*/
@Override
public int read(DS_BYTEHolder b, int off, int len) throws CORBAFileServerException {
b.value = new byte[len];
try {
return inputStream.read(b.value, off, len);
} catch (IOException e) {
throw new CORBAFileServerException(ErrorCode.FILESERVER_IO_EXCEPTION, SeverityFlag.ERROR,
e.getMessage());
}
}
/**
* {@inheritDoc}
*/
@Override
public void close() throws CORBAFileServerException {
try {
inputStream.close();
} catch (IOException e) {
throw new CORBAFileServerException(ErrorCode.FILESERVER_IO_EXCEPTION, SeverityFlag.ERROR,
e.getMessage());
} finally {
try {
poa.deactivate_object(objectID);
} catch (ObjectNotActive | WrongPolicy e) {
LOGGER.warn("Unable to deactive CORBA input stream", e);
}
}
}
/**
* {@inheritDoc}
*/
@Override
public int length() throws CORBAFileServerException {
/*
* NOTE: A file length is of type long and therefore, for very large files (>
* 2.14 GB), the exact length is lost due to narrowing conversion!
*/
return (int) length;
}
/**
* {@inheritDoc}
*/
@Override
public void reset() throws CORBAFileServerException {
try {
inputStream.reset();
} catch (IOException e) {
throw new CORBAFileServerException(ErrorCode.FILESERVER_IO_EXCEPTION, SeverityFlag.ERROR,
e.getMessage());
}
}
}
}