blob: 0ab1058af881513e2fbf0e6ec7fd7b147f0a0a6a [file] [log] [blame]
/*
* Copyright (c) 2016 Gigatronik Ingolstadt GmbH and others
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*/
package org.eclipse.mdm.api.odsadapter.filetransfer;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.reducing;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.asam.ods.ElemId;
import org.eclipse.mdm.api.base.FileService;
import org.eclipse.mdm.api.base.model.Entity;
import org.eclipse.mdm.api.base.model.FileLink;
import org.eclipse.mdm.api.odsadapter.query.ODSEntityType;
import org.eclipse.mdm.api.odsadapter.query.ODSModelManager;
import org.eclipse.mdm.api.odsadapter.utils.ODSConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* CORBA file service implementation of the {@link FileService} interface.
*
* @since 1.0.0
* @author Viktor Stoehr, Gigatronik Ingolstadt GmbH
*/
public class CORBAFileService implements FileService {
// ======================================================================
// Class variables
// ======================================================================
private static final Logger LOGGER = LoggerFactory.getLogger(CORBAFileService.class);
private static final int THREAD_POOL_SIZE = 5;
// ======================================================================
// Instance variables
// ======================================================================
private final CORBAFileServer fileServer;
private final ODSModelManager modelManager;
// ======================================================================
// Constructors
// ======================================================================
/**
* Constructor.
*
* @param modelManager
* Used for {@link Entity} to {@link ElemId} conversion.
* @param transfer
* The transfer type for up- and downloads.
*/
public CORBAFileService(ODSModelManager modelManager, Transfer transfer) {
this.modelManager = modelManager;
fileServer = new CORBAFileServer(modelManager, transfer);
}
// ======================================================================
// Public methods
// ======================================================================
/**
* {@inheritDoc}
*/
@Override
public void downloadSequential(Entity entity, Path target, Collection<FileLink> fileLinks,
ProgressListener progressListener) throws IOException {
Map<String, List<FileLink>> groups = fileLinks.stream().filter(FileLink::isRemote)
.collect(Collectors.groupingBy(FileLink::getRemotePath));
long totalSize = calculateDownloadSize(entity, groups);
final AtomicLong transferred = new AtomicLong();
LocalTime start = LocalTime.now();
UUID id = UUID.randomUUID();
LOGGER.debug("Sequential download of {} file(s) with id '{}' started.", groups.size(), id);
for (List<FileLink> group : groups.values()) {
FileLink fileLink = group.get(0);
download(entity, target, fileLink, (b, p) -> {
double tranferredBytes = transferred.addAndGet(b);
if (progressListener != null) {
progressListener.progress(b, (float) (tranferredBytes / totalSize));
}
});
for (FileLink other : group.subList(1, group.size())) {
other.setLocalPath(fileLink.getLocalPath());
}
}
LOGGER.debug("Sequential download with id '{}' finished in {}.", id, Duration.between(start, LocalTime.now()));
}
/**
* {@inheritDoc}
*/
@Override
public void downloadParallel(Entity entity, Path target, Collection<FileLink> fileLinks,
ProgressListener progressListener) throws IOException {
Map<String, List<FileLink>> groups = fileLinks.stream().filter(FileLink::isRemote)
.collect(Collectors.groupingBy(FileLink::getRemotePath));
long totalSize = calculateDownloadSize(entity, groups);
final AtomicLong transferred = new AtomicLong();
List<Callable<Void>> downloadTasks = new ArrayList<>();
for (List<FileLink> group : groups.values()) {
downloadTasks.add(() -> {
FileLink fileLink = group.get(0);
download(entity, target, fileLink, (b, p) -> {
double tranferredBytes = transferred.addAndGet(b);
if (progressListener != null) {
progressListener.progress(b, (float) (tranferredBytes / totalSize));
}
});
for (FileLink other : group.subList(1, group.size())) {
other.setLocalPath(fileLink.getLocalPath());
}
return null;
});
}
ExecutorService executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
LocalTime start = LocalTime.now();
UUID id = UUID.randomUUID();
LOGGER.debug("Parallel download of {} file(s) with id '{}' started.", groups.size(), id);
try {
List<Throwable> errors = executorService.invokeAll(downloadTasks).stream().map(future -> {
try {
future.get();
return null;
} catch (ExecutionException | InterruptedException e) {
LOGGER.error("Download of failed due to: " + e.getMessage(), e);
return e;
}
}).filter(Objects::nonNull).collect(Collectors.toList());
if (!errors.isEmpty()) {
throw new IOException("Download faild for '" + errors.size() + "' files.");
}
LOGGER.debug("Parallel download with id '{}' finished in {}.", id,
Duration.between(start, LocalTime.now()));
} catch (InterruptedException e) {
throw new IOException("Unable to download files due to: " + e.getMessage(), e);
} finally {
executorService.shutdown();
}
}
/**
* {@inheritDoc}
*/
@Override
public void download(Entity entity, Path target, FileLink fileLink, ProgressListener progressListener)
throws IOException {
if (Files.exists(target)) {
if (!Files.isDirectory(target)) {
throw new IllegalArgumentException("Target path is not a directory.");
}
} else {
Files.createDirectory(target);
}
try (InputStream inputStream = openStream(entity, fileLink, progressListener)) {
fileLink.setLocalPath(target.resolve(fileLink.getFileName()));
Path absolutePath = fileLink.getLocalPath().toAbsolutePath();
String remotePath = fileLink.getRemotePath();
LOGGER.debug("Starting download of file '{}' to '{}'.", remotePath, absolutePath);
LocalTime start = LocalTime.now();
Files.copy(inputStream, fileLink.getLocalPath());
LOGGER.debug("File '{}' successfully downloaded in {} to '{}'.", remotePath,
Duration.between(start, LocalTime.now()), absolutePath);
}
}
/**
* {@inheritDoc}
*/
@Override
public InputStream openStream(Entity entity, FileLink fileLink, ProgressListener progressListener)
throws IOException {
InputStream sourceStream;
if (fileLink.isLocal()) {
// file is locally available -> USE this shortcut!
sourceStream = Files.newInputStream(fileLink.getLocalPath());
} else if (fileLink.isRemote()) {
sourceStream = fileServer.openStream(fileLink, toElemID(entity));
} else {
throw new IllegalArgumentException("File link is neither in local nor remote state: " + fileLink);
}
// NOTE: Access to immediate input stream is buffered.
if (progressListener != null) {
loadSize(entity, fileLink);
// NOTE: Progress updates immediately triggered by the stream
// consumer.
return new TracedInputStream(sourceStream, progressListener, fileLink.getSize());
}
return sourceStream;
}
/**
* {@inheritDoc}
*/
@Override
public void loadSize(Entity entity, FileLink fileLink) throws IOException {
if (fileLink.getSize() > -1) {
// file size is already known
return;
} else if (fileLink.isLocal()) {
fileLink.setFileSize(Files.size(fileLink.getLocalPath()));
} else if (fileLink.isRemote()) {
fileLink.setFileSize(fileServer.loadSize(fileLink, toElemID(entity)));
} else {
throw new IllegalArgumentException("File link is neither in local nor remote state: " + fileLink);
}
}
/**
* Sequential upload of given {@link FileLink}s. Local {@link Path}s linked
* multiple times are uploaded only once. The upload progress may be traced
* with a progress listener.
*
* @param entity
* Used for security checks.
* @param fileLinks
* Collection of {@code FileLink}s to upload.
* @param progressListener
* The progress listener.
* @throws IOException
* Thrown if unable to upload files.
*/
public void uploadSequential(Entity entity, Collection<FileLink> fileLinks, ProgressListener progressListener)
throws IOException {
Map<Path, List<FileLink>> groups = fileLinks.stream().filter(FileLink::isLocal)
.collect(Collectors.groupingBy(FileLink::getLocalPath));
long totalSize = groups.values().stream().map(l -> l.get(0)).mapToLong(FileLink::getSize).sum();
final AtomicLong transferred = new AtomicLong();
LocalTime start = LocalTime.now();
UUID id = UUID.randomUUID();
LOGGER.debug("Sequential upload of {} file(s) with id '{}' started.", groups.size(), id);
for (List<FileLink> group : groups.values()) {
FileLink fileLink = group.get(0);
upload(entity, fileLink, (b, p) -> {
double tranferredBytes = transferred.addAndGet(b);
if (progressListener != null) {
progressListener.progress(b, (float) (tranferredBytes / totalSize));
}
});
for (FileLink other : group.subList(1, group.size())) {
other.setRemotePath(fileLink.getRemotePath());
}
}
LOGGER.debug("Sequential upload with id '{}' finished in {}.", id, Duration.between(start, LocalTime.now()));
}
/**
* Parallel upload of given {@link FileLink}s. Local {@link Path}s linked
* multiple times are uploaded only once. The upload progress may be traced
* with a progress listener.
*
* @param entity
* Used for security checks.
* @param fileLinks
* Collection of {@code FileLink}s to upload.
* @param progressListener
* The progress listener.
* @throws IOException
* Thrown if unable to upload files.
*/
public void uploadParallel(Entity entity, Collection<FileLink> fileLinks, ProgressListener progressListener)
throws IOException {
Map<Path, List<FileLink>> groups = fileLinks.stream().filter(FileLink::isLocal)
.collect(Collectors.groupingBy(FileLink::getLocalPath));
long totalSize = groups.values().stream().map(l -> l.get(0)).mapToLong(FileLink::getSize).sum();
final AtomicLong transferred = new AtomicLong();
List<Callable<Void>> downloadTasks = new ArrayList<>();
for (List<FileLink> group : groups.values()) {
downloadTasks.add(() -> {
FileLink fileLink = group.get(0);
upload(entity, fileLink, (b, p) -> {
double tranferredBytes = transferred.addAndGet(b);
if (progressListener != null) {
progressListener.progress(b, (float) (tranferredBytes / totalSize));
}
});
for (FileLink other : group.subList(1, group.size())) {
other.setRemotePath(fileLink.getRemotePath());
}
return null;
});
}
ExecutorService executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
LocalTime start = LocalTime.now();
UUID id = UUID.randomUUID();
LOGGER.debug("Parallel upload of {} file(s) with id '{}' started.", groups.size(), id);
try {
List<Throwable> errors = executorService.invokeAll(downloadTasks).stream().map(future -> {
try {
future.get();
return null;
} catch (ExecutionException | InterruptedException e) {
LOGGER.error("Upload of failed due to: " + e.getMessage(), e);
return e;
}
}).filter(Objects::nonNull).collect(Collectors.toList());
if (!errors.isEmpty()) {
throw new IOException("Upload faild for '" + errors.size() + "' files.");
}
LOGGER.debug("Parallel upload with id '{}' finished in {}.", id, Duration.between(start, LocalTime.now()));
} catch (InterruptedException e) {
throw new IOException("Unable to upload files due to: " + e.getMessage(), e);
} finally {
executorService.shutdown();
}
}
/**
* Deletes given {@link FileLink}s form the remote storage.
*
* @param entity
* Used for security checks.
* @param fileLinks
* Collection of {@code FileLink}s to delete.
*/
public void delete(Entity entity, Collection<FileLink> fileLinks) {
fileLinks.stream().filter(FileLink::isRemote)
.collect(groupingBy(FileLink::getRemotePath, reducing((fl1, fl2) -> fl1))).values().stream()
.filter(Optional::isPresent).map(Optional::get).forEach(fl -> delete(entity, fl));
}
/**
* Deletes given {@link FileLink} form the remote storage.
*
* @param entity
* Used for security checks.
* @param fileLink
* The {@code FileLink}s to delete.
*/
public void delete(Entity entity, FileLink fileLink) {
if (!fileLink.isRemote()) {
// nothing to do
return;
}
try {
fileServer.delete(fileLink, toElemID(entity));
LOGGER.debug("File '{}' sucessfully deleted.", fileLink.getRemotePath());
} catch (IOException e) {
LOGGER.warn("Failed to delete remote file.", e);
}
}
// ======================================================================
// Private methods
// ======================================================================
/**
* Uploads given {@link FileLink}. The upload progress may be traced with a
* progress listener.
*
* @param entity
* Used for security checks.
* @param fileLink
* The {@code FileLink} to upload.
* @param progressListener
* The progress listener.
* @throws IOException
* Thrown if unable to upload file.
*/
private void upload(Entity entity, FileLink fileLink, ProgressListener progressListener) throws IOException {
if (fileLink.isRemote()) {
// nothing to do
return;
} else if (!fileLink.isLocal()) {
throw new IllegalArgumentException("File link does not have a local path.");
}
InputStream sourceStream = Files.newInputStream(fileLink.getLocalPath());
if (progressListener != null) {
sourceStream = new TracedInputStream(sourceStream, progressListener, fileLink.getSize());
}
Path absolutePath = fileLink.getLocalPath().toAbsolutePath();
LOGGER.debug("Starting upload of file '{}'.", absolutePath);
LocalTime start = LocalTime.now();
fileServer.uploadStream(sourceStream, fileLink, toElemID(entity));
LOGGER.debug("File '{}' successfully uploaded in {} to '{}'.", absolutePath,
Duration.between(start, LocalTime.now()), fileLink.getRemotePath());
}
/**
* Creates an ODS entity identity {@link ElemId} object for given
* {@link Entity}.
*
* @param entity
* The {@code Entity}.
* @return The created {@code ElemId} is returned.
*/
private ElemId toElemID(Entity entity) {
return new ElemId(((ODSEntityType) modelManager.getEntityType(entity)).getODSID(),
ODSConverter.toODSID(entity.getID()));
}
/**
* Calculates the total download size for given {@link FileLink} groups.
*
* @param entity
* Used for security checks.
* @param groups
* The {@code FileLink} groups.
* @return The total download size is returned.
* @throws IOException
* Thrown if unable to load the file size.
*/
private long calculateDownloadSize(Entity entity, Map<String, List<FileLink>> groups) throws IOException {
List<FileLink> links = groups.values().stream().map(l -> l.get(0)).collect(Collectors.toList());
long totalSize = 0;
for (FileLink fileLink : links) {
loadSize(entity, fileLink);
// overflow may occur in case of total size exceeds 9223 PB!
totalSize = Math.addExact(totalSize, fileLink.getSize());
}
return totalSize;
}
}