blob: 2e84f2ec2877b4ab3740bc53b50530aa2811bd89 [file] [log] [blame]
/********************************************************************************
* Copyright (c) 2015-2020 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* SPDX-License-Identifier: EPL-2.0
*
********************************************************************************/
package org.eclipse.mdm.api.odsadapter.filetransfer;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.reducing;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.asam.ods.ElemId;
import org.eclipse.mdm.api.base.ServiceNotProvidedException;
import org.eclipse.mdm.api.base.adapter.ModelManager;
import org.eclipse.mdm.api.base.file.FileService;
import org.eclipse.mdm.api.base.model.Entity;
import org.eclipse.mdm.api.base.model.FileLink;
import org.eclipse.mdm.api.odsadapter.ODSContext;
import org.eclipse.mdm.api.odsadapter.query.ODSEntityType;
import org.eclipse.mdm.api.odsadapter.utils.ODSConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* CORBA file service implementation of the {@link FileService} interface.
*
* @since 1.0.0
* @author Viktor Stoehr, Gigatronik Ingolstadt GmbH
*/
public class CORBAFileService implements FileService {
// ======================================================================
// Class variables
// ======================================================================
private static final Logger LOGGER = LoggerFactory.getLogger(CORBAFileService.class);
private static final int THREAD_POOL_SIZE = 5;
// ======================================================================
// Instance variables
// ======================================================================
private final CORBAFileServer fileServer;
private final ModelManager modelManager;
// ======================================================================
// Constructors
// ======================================================================
/**
* Constructor.
*
* @param context Used for {@link Entity} to {@link ElemId} conversion.
* @param transfer The transfer type for up- and downloads.
*/
public CORBAFileService(ODSContext context, Transfer transfer) {
this.modelManager = context.getModelManager()
.orElseThrow(() -> new ServiceNotProvidedException(ModelManager.class));
fileServer = new CORBAFileServer(context, transfer);
}
// ======================================================================
// Public methods
// ======================================================================
/**
* {@inheritDoc}
*/
@Override
public void downloadSequential(Entity entity, Path target, Collection<FileLink> fileLinks,
ProgressListener progressListener) throws IOException {
downloadSequential(toElemID(entity), target, fileLinks, progressListener);
}
private void downloadSequential(ElemId elemId, Path target, Collection<FileLink> fileLinks,
ProgressListener progressListener) throws IOException {
Map<String, List<FileLink>> groups = fileLinks.stream().filter(FileLink::isRemote)
.collect(Collectors.groupingBy(FileLink::getRemotePath));
long totalSize = calculateDownloadSize(elemId, groups);
final AtomicLong transferred = new AtomicLong();
LocalTime start = LocalTime.now();
UUID id = UUID.randomUUID();
LOGGER.debug("Sequential download of {} file(s) with id '{}' started.", groups.size(), id);
for (List<FileLink> group : groups.values()) {
FileLink fileLink = group.get(0);
download(elemId, target, fileLink, (b, p) -> {
double tranferredBytes = transferred.addAndGet(b);
if (progressListener != null) {
progressListener.progress(b, (float) (tranferredBytes / totalSize));
}
});
for (FileLink other : group.subList(1, group.size())) {
other.setLocalStream(fileLink.getLocalStream());
}
}
LOGGER.debug("Sequential download with id '{}' finished in {}.", id, Duration.between(start, LocalTime.now()));
}
/**
* {@inheritDoc}
*/
@Override
public void downloadParallel(Entity entity, Path target, Collection<FileLink> fileLinks,
ProgressListener progressListener) throws IOException {
downloadParallel(toElemID(entity), target, fileLinks, progressListener);
}
private void downloadParallel(ElemId elemId, Path target, Collection<FileLink> fileLinks,
ProgressListener progressListener) throws IOException {
Map<String, List<FileLink>> groups = fileLinks.stream().filter(FileLink::isRemote)
.collect(Collectors.groupingBy(FileLink::getRemotePath));
long totalSize = calculateDownloadSize(elemId, groups);
final AtomicLong transferred = new AtomicLong();
List<Callable<Void>> downloadTasks = new ArrayList<>();
for (List<FileLink> group : groups.values()) {
downloadTasks.add(() -> {
FileLink fileLink = group.get(0);
download(elemId, target, fileLink, (b, p) -> {
double tranferredBytes = transferred.addAndGet(b);
if (progressListener != null) {
progressListener.progress(b, (float) (tranferredBytes / totalSize));
}
});
for (FileLink other : group.subList(1, group.size())) {
other.setLocalStream(fileLink.getLocalStream());
}
return null;
});
}
ExecutorService executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
LocalTime start = LocalTime.now();
UUID id = UUID.randomUUID();
LOGGER.debug("Parallel download of {} file(s) with id '{}' started.", groups.size(), id);
try {
List<Throwable> errors = executorService.invokeAll(downloadTasks).stream().map(future -> {
try {
future.get();
return null;
} catch (ExecutionException | InterruptedException e) {
LOGGER.error("Download of failed due to: " + e.getMessage(), e);
return e;
}
}).filter(Objects::nonNull).collect(Collectors.toList());
if (!errors.isEmpty()) {
throw new IOException("Download faild for '" + errors.size() + "' files.");
}
LOGGER.debug("Parallel download with id '{}' finished in {}.", id,
Duration.between(start, LocalTime.now()));
} catch (InterruptedException e) {
throw new IOException("Unable to download files due to: " + e.getMessage(), e);
} finally {
executorService.shutdown();
}
}
/**
* {@inheritDoc}
*/
@Override
public void download(Entity entity, Path target, FileLink fileLink, ProgressListener progressListener)
throws IOException {
download(toElemID(entity), target, fileLink, progressListener);
}
private void download(ElemId elemId, Path target, FileLink fileLink, ProgressListener progressListener)
throws IOException {
if (Files.exists(target)) {
if (!Files.isDirectory(target)) {
throw new IllegalArgumentException("Target path is not a directory.");
}
} else {
Files.createDirectory(target);
}
try (InputStream inputStream = openStream(elemId, fileLink, progressListener)) {
Path absolutePath = target.resolve(fileLink.getFileName()).toAbsolutePath();
String remotePath = fileLink.getRemotePath();
LOGGER.debug("Starting download of file '{}' to '{}'.", remotePath, absolutePath);
LocalTime start = LocalTime.now();
Files.copy(inputStream, absolutePath);
LOGGER.debug("File '{}' successfully downloaded in {} to '{}'.", remotePath,
Duration.between(start, LocalTime.now()), absolutePath);
}
}
/**
* {@inheritDoc}
*/
@Override
public InputStream openStream(Entity entity, FileLink fileLink, ProgressListener progressListener)
throws IOException {
return openStream(toElemID(entity), fileLink, progressListener);
}
private InputStream openStream(ElemId elemId, FileLink fileLink, ProgressListener progressListener)
throws IOException {
InputStream sourceStream;
if (fileLink.isLocal()) {
// file is locally available -> USE this shortcut!
sourceStream = fileLink.getLocalStream();
} else if (fileLink.isRemote()) {
sourceStream = fileServer.openStream(fileLink, elemId);
} else {
throw new IllegalArgumentException("File link is neither in local nor remote state: " + fileLink);
}
// NOTE: Access to immediate input stream is buffered.
if (progressListener != null) {
loadSize(elemId, fileLink);
// NOTE: Progress updates immediately triggered by the stream
// consumer.
return new TracedInputStream(sourceStream, progressListener, fileLink.getSize());
}
return sourceStream;
}
/**
* {@inheritDoc}
*/
@Override
public void loadSize(Entity entity, FileLink fileLink) throws IOException {
loadSize(toElemID(entity), fileLink);
}
private void loadSize(ElemId elemId, FileLink fileLink) throws IOException {
if (fileLink.getSize() > -1) {
// file size is already known
return;
} else if (fileLink.isRemote()) {
fileLink.setFileSize(fileServer.loadSize(fileLink, elemId));
} else {
throw new IllegalArgumentException("File link is neither in local nor remote state: " + fileLink);
}
}
/**
* {@inheritDoc}
*/
@Override
public void uploadSequential(Entity entity, Collection<FileLink> fileLinks, ProgressListener progressListener)
throws IOException {
uploadSequential(toElemID(entity), fileLinks, progressListener);
}
private void uploadSequential(ElemId elemId, Collection<FileLink> fileLinks, ProgressListener progressListener)
throws IOException {
Map<InputStream, List<FileLink>> groups = fileLinks.stream().filter(FileLink::isLocal)
.collect(Collectors.groupingBy(FileLink::getLocalStream));
long totalSize = groups.values().stream().map(l -> l.get(0)).mapToLong(FileLink::getSize).sum();
final AtomicLong transferred = new AtomicLong();
LocalTime start = LocalTime.now();
UUID id = UUID.randomUUID();
LOGGER.debug("Sequential upload of {} file(s) with id '{}' started.", groups.size(), id);
for (List<FileLink> group : groups.values()) {
FileLink fileLink = group.get(0);
upload(elemId, fileLink, (b, p) -> {
double tranferredBytes = transferred.addAndGet(b);
if (progressListener != null) {
progressListener.progress(b, (float) (tranferredBytes / totalSize));
}
});
for (FileLink other : group.subList(1, group.size())) {
other.setRemotePath(fileLink.getRemotePath());
}
}
LOGGER.debug("Sequential upload with id '{}' finished in {}.", id, Duration.between(start, LocalTime.now()));
}
/**
* {@inheritDoc}
*/
@Override
public void uploadParallel(Entity entity, Collection<FileLink> fileLinks, ProgressListener progressListener)
throws IOException {
uploadParallel(toElemID(entity), fileLinks, progressListener);
}
private void uploadParallel(ElemId elemId, Collection<FileLink> fileLinks, ProgressListener progressListener)
throws IOException {
Map<InputStream, List<FileLink>> groups = fileLinks.stream().filter(FileLink::isLocal)
.collect(Collectors.groupingBy(FileLink::getLocalStream));
long totalSize = groups.values().stream().map(l -> l.get(0)).mapToLong(FileLink::getSize).sum();
final AtomicLong transferred = new AtomicLong();
List<Callable<Void>> downloadTasks = new ArrayList<>();
for (List<FileLink> group : groups.values()) {
downloadTasks.add(() -> {
FileLink fileLink = group.get(0);
upload(elemId, fileLink, (b, p) -> {
double tranferredBytes = transferred.addAndGet(b);
if (progressListener != null) {
progressListener.progress(b, (float) (tranferredBytes / totalSize));
}
});
for (FileLink other : group.subList(1, group.size())) {
other.setRemotePath(fileLink.getRemotePath());
}
return null;
});
}
ExecutorService executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
LocalTime start = LocalTime.now();
UUID id = UUID.randomUUID();
LOGGER.debug("Parallel upload of {} file(s) with id '{}' started.", groups.size(), id);
try {
List<Throwable> errors = executorService.invokeAll(downloadTasks).stream().map(future -> {
try {
future.get();
return null;
} catch (ExecutionException | InterruptedException e) {
LOGGER.error("Upload of failed due to: " + e.getMessage(), e);
return e;
}
}).filter(Objects::nonNull).collect(Collectors.toList());
if (!errors.isEmpty()) {
throw new IOException("Upload faild for '" + errors.size() + "' files.");
}
LOGGER.debug("Parallel upload with id '{}' finished in {}.", id, Duration.between(start, LocalTime.now()));
} catch (InterruptedException e) {
throw new IOException("Unable to upload files due to: " + e.getMessage(), e);
} finally {
executorService.shutdown();
}
}
/**
* {@inheritDoc}
*/
@Override
public void delete(Entity entity, Collection<FileLink> fileLinks) {
delete(toElemID(entity), fileLinks);
}
private void delete(ElemId elemId, Collection<FileLink> fileLinks) {
fileLinks.stream().filter(FileLink::isRemote)
.collect(groupingBy(FileLink::getRemotePath, reducing((fl1, fl2) -> fl1))).values().stream()
.filter(Optional::isPresent).map(Optional::get).forEach(fl -> delete(elemId, fl));
}
/**
* {@inheritDoc}
*/
@Override
public void delete(Entity entity, FileLink fileLink) {
delete(toElemID(entity), fileLink);
}
private void delete(ElemId elemId, FileLink fileLink) {
if (!fileLink.isRemote()) {
// nothing to do
return;
}
try {
fileServer.delete(fileLink, elemId);
LOGGER.debug("File '{}' sucessfully deleted.", fileLink.getRemotePath());
} catch (IOException e) {
LOGGER.warn("Failed to delete remote file.", e);
}
}
// ======================================================================
// Private methods
// ======================================================================
/**
* Uploads given {@link FileLink}. The upload progress may be traced with a
* progress listener.
*
* @param elemId Used for security checks.
* @param fileLink The {@code FileLink} to upload.
* @param progressListener The progress listener.
* @throws IOException Thrown if unable to upload file.
*/
private void upload(ElemId elemId, FileLink fileLink, ProgressListener progressListener) throws IOException {
if (fileLink.isRemote()) {
// nothing to do
return;
} else if (!fileLink.isLocal()) {
throw new IllegalArgumentException("File link does not have a local path.");
}
InputStream sourceStream = fileLink.getLocalStream();
if (progressListener != null) {
sourceStream = new TracedInputStream(sourceStream, progressListener, fileLink.getSize());
}
LOGGER.debug("Starting upload of file '{}'.", fileLink.getFileName());
LocalTime start = LocalTime.now();
fileServer.uploadStream(sourceStream, fileLink, elemId);
LOGGER.debug("File '{}' successfully uploaded in {} to '{}'.", fileLink.getFileName(),
Duration.between(start, LocalTime.now()), fileLink.getRemotePath());
}
/**
* Creates an ODS entity identity {@link ElemId} object for given
* {@link Entity}.
*
* @param entity The {@code Entity}.
* @return The created {@code ElemId} is returned.
*/
private ElemId toElemID(Entity entity) {
return new ElemId(((ODSEntityType) modelManager.getEntityType(entity)).getODSID(),
ODSConverter.toODSID(entity.getID()));
}
/**
* Calculates the total download size for given {@link FileLink} groups.
*
* @param entity Used for security checks.
* @param groups The {@code FileLink} groups.
* @return The total download size is returned.
* @throws IOException Thrown if unable to load the file size.
*/
private long calculateDownloadSize(ElemId elemId, Map<String, List<FileLink>> groups) throws IOException {
List<FileLink> links = groups.values().stream().map(l -> l.get(0)).collect(Collectors.toList());
long totalSize = 0;
for (FileLink fileLink : links) {
loadSize(elemId, fileLink);
// overflow may occur in case of total size exceeds 9223 PB!
totalSize = Math.addExact(totalSize, fileLink.getSize());
}
return totalSize;
}
}