| /* |
| * Copyright (c) 2016 Gigatronik Ingolstadt GmbH and others |
| * All rights reserved. This program and the accompanying materials |
| * are made available under the terms of the Eclipse Public License v1.0 |
| * which accompanies this distribution, and is available at |
| * http://www.eclipse.org/legal/epl-v10.html |
| */ |
| |
| package org.eclipse.mdm.api.odsadapter.filetransfer; |
| |
| import static java.util.stream.Collectors.groupingBy; |
| import static java.util.stream.Collectors.reducing; |
| |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.nio.file.Files; |
| import java.nio.file.Path; |
| import java.time.Duration; |
| import java.time.LocalTime; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Optional; |
| import java.util.UUID; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.stream.Collectors; |
| |
| import org.asam.ods.ElemId; |
| import org.eclipse.mdm.api.base.FileService; |
| import org.eclipse.mdm.api.base.model.Entity; |
| import org.eclipse.mdm.api.base.model.FileLink; |
| import org.eclipse.mdm.api.odsadapter.query.ODSEntityType; |
| import org.eclipse.mdm.api.odsadapter.query.ODSModelManager; |
| import org.eclipse.mdm.api.odsadapter.utils.ODSConverter; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * CORBA file service implementation of the {@link FileService} interface. |
| * |
| * @since 1.0.0 |
| * @author Viktor Stoehr, Gigatronik Ingolstadt GmbH |
| */ |
| public class CORBAFileService implements FileService { |
| |
| // ====================================================================== |
| // Class variables |
| // ====================================================================== |
| |
| private static final Logger LOGGER = LoggerFactory.getLogger(CORBAFileService.class); |
| |
| private static final int THREAD_POOL_SIZE = 5; |
| |
| // ====================================================================== |
| // Instance variables |
| // ====================================================================== |
| |
| private final CORBAFileServer fileServer; |
| private final ODSModelManager modelManager; |
| |
| // ====================================================================== |
| // Constructors |
| // ====================================================================== |
| |
| /** |
| * Constructor. |
| * |
| * @param modelManager |
| * Used for {@link Entity} to {@link ElemId} conversion. |
| * @param transfer |
| * The transfer type for up- and downloads. |
| */ |
| public CORBAFileService(ODSModelManager modelManager, Transfer transfer) { |
| this.modelManager = modelManager; |
| fileServer = new CORBAFileServer(modelManager, transfer); |
| } |
| |
| // ====================================================================== |
| // Public methods |
| // ====================================================================== |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public void downloadSequential(Entity entity, Path target, Collection<FileLink> fileLinks, |
| ProgressListener progressListener) throws IOException { |
| Map<String, List<FileLink>> groups = fileLinks.stream().filter(FileLink::isRemote) |
| .collect(Collectors.groupingBy(FileLink::getRemotePath)); |
| |
| long totalSize = calculateDownloadSize(entity, groups); |
| final AtomicLong transferred = new AtomicLong(); |
| LocalTime start = LocalTime.now(); |
| UUID id = UUID.randomUUID(); |
| LOGGER.debug("Sequential download of {} file(s) with id '{}' started.", groups.size(), id); |
| for (List<FileLink> group : groups.values()) { |
| FileLink fileLink = group.get(0); |
| |
| download(entity, target, fileLink, (b, p) -> { |
| double tranferredBytes = transferred.addAndGet(b); |
| if (progressListener != null) { |
| progressListener.progress(b, (float) (tranferredBytes / totalSize)); |
| } |
| }); |
| |
| for (FileLink other : group.subList(1, group.size())) { |
| other.setLocalPath(fileLink.getLocalPath()); |
| } |
| } |
| LOGGER.debug("Sequential download with id '{}' finished in {}.", id, Duration.between(start, LocalTime.now())); |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public void downloadParallel(Entity entity, Path target, Collection<FileLink> fileLinks, |
| ProgressListener progressListener) throws IOException { |
| Map<String, List<FileLink>> groups = fileLinks.stream().filter(FileLink::isRemote) |
| .collect(Collectors.groupingBy(FileLink::getRemotePath)); |
| |
| long totalSize = calculateDownloadSize(entity, groups); |
| final AtomicLong transferred = new AtomicLong(); |
| List<Callable<Void>> downloadTasks = new ArrayList<>(); |
| for (List<FileLink> group : groups.values()) { |
| downloadTasks.add(() -> { |
| FileLink fileLink = group.get(0); |
| |
| download(entity, target, fileLink, (b, p) -> { |
| double tranferredBytes = transferred.addAndGet(b); |
| if (progressListener != null) { |
| progressListener.progress(b, (float) (tranferredBytes / totalSize)); |
| } |
| }); |
| |
| for (FileLink other : group.subList(1, group.size())) { |
| other.setLocalPath(fileLink.getLocalPath()); |
| } |
| |
| return null; |
| }); |
| } |
| |
| ExecutorService executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE); |
| LocalTime start = LocalTime.now(); |
| UUID id = UUID.randomUUID(); |
| LOGGER.debug("Parallel download of {} file(s) with id '{}' started.", groups.size(), id); |
| try { |
| List<Throwable> errors = executorService.invokeAll(downloadTasks).stream().map(future -> { |
| try { |
| future.get(); |
| return null; |
| } catch (ExecutionException | InterruptedException e) { |
| LOGGER.error("Download of failed due to: " + e.getMessage(), e); |
| return e; |
| } |
| }).filter(Objects::nonNull).collect(Collectors.toList()); |
| |
| if (!errors.isEmpty()) { |
| throw new IOException("Download faild for '" + errors.size() + "' files."); |
| } |
| LOGGER.debug("Parallel download with id '{}' finished in {}.", id, |
| Duration.between(start, LocalTime.now())); |
| } catch (InterruptedException e) { |
| throw new IOException("Unable to download files due to: " + e.getMessage(), e); |
| } finally { |
| executorService.shutdown(); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public void download(Entity entity, Path target, FileLink fileLink, ProgressListener progressListener) |
| throws IOException { |
| if (Files.exists(target)) { |
| if (!Files.isDirectory(target)) { |
| throw new IllegalArgumentException("Target path is not a directory."); |
| } |
| } else { |
| Files.createDirectory(target); |
| } |
| |
| try (InputStream inputStream = openStream(entity, fileLink, progressListener)) { |
| fileLink.setLocalPath(target.resolve(fileLink.getFileName())); |
| Path absolutePath = fileLink.getLocalPath().toAbsolutePath(); |
| String remotePath = fileLink.getRemotePath(); |
| LOGGER.debug("Starting download of file '{}' to '{}'.", remotePath, absolutePath); |
| LocalTime start = LocalTime.now(); |
| Files.copy(inputStream, fileLink.getLocalPath()); |
| LOGGER.debug("File '{}' successfully downloaded in {} to '{}'.", remotePath, |
| Duration.between(start, LocalTime.now()), absolutePath); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public InputStream openStream(Entity entity, FileLink fileLink, ProgressListener progressListener) |
| throws IOException { |
| InputStream sourceStream; |
| if (fileLink.isLocal()) { |
| // file is locally available -> USE this shortcut! |
| sourceStream = Files.newInputStream(fileLink.getLocalPath()); |
| } else if (fileLink.isRemote()) { |
| sourceStream = fileServer.openStream(fileLink, toElemID(entity)); |
| } else { |
| throw new IllegalArgumentException("File link is neither in local nor remote state: " + fileLink); |
| } |
| |
| // NOTE: Access to immediate input stream is buffered. |
| if (progressListener != null) { |
| loadSize(entity, fileLink); |
| // NOTE: Progress updates immediately triggered by the stream |
| // consumer. |
| return new TracedInputStream(sourceStream, progressListener, fileLink.getSize()); |
| } |
| |
| return sourceStream; |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public void loadSize(Entity entity, FileLink fileLink) throws IOException { |
| if (fileLink.getSize() > -1) { |
| // file size is already known |
| return; |
| } else if (fileLink.isLocal()) { |
| fileLink.setFileSize(Files.size(fileLink.getLocalPath())); |
| } else if (fileLink.isRemote()) { |
| fileLink.setFileSize(fileServer.loadSize(fileLink, toElemID(entity))); |
| } else { |
| throw new IllegalArgumentException("File link is neither in local nor remote state: " + fileLink); |
| } |
| } |
| |
| /** |
| * Sequential upload of given {@link FileLink}s. Local {@link Path}s linked |
| * multiple times are uploaded only once. The upload progress may be traced |
| * with a progress listener. |
| * |
| * @param entity |
| * Used for security checks. |
| * @param fileLinks |
| * Collection of {@code FileLink}s to upload. |
| * @param progressListener |
| * The progress listener. |
| * @throws IOException |
| * Thrown if unable to upload files. |
| */ |
| public void uploadSequential(Entity entity, Collection<FileLink> fileLinks, ProgressListener progressListener) |
| throws IOException { |
| Map<Path, List<FileLink>> groups = fileLinks.stream().filter(FileLink::isLocal) |
| .collect(Collectors.groupingBy(FileLink::getLocalPath)); |
| |
| long totalSize = groups.values().stream().map(l -> l.get(0)).mapToLong(FileLink::getSize).sum(); |
| final AtomicLong transferred = new AtomicLong(); |
| LocalTime start = LocalTime.now(); |
| UUID id = UUID.randomUUID(); |
| LOGGER.debug("Sequential upload of {} file(s) with id '{}' started.", groups.size(), id); |
| for (List<FileLink> group : groups.values()) { |
| FileLink fileLink = group.get(0); |
| |
| upload(entity, fileLink, (b, p) -> { |
| double tranferredBytes = transferred.addAndGet(b); |
| if (progressListener != null) { |
| progressListener.progress(b, (float) (tranferredBytes / totalSize)); |
| } |
| }); |
| |
| for (FileLink other : group.subList(1, group.size())) { |
| other.setRemotePath(fileLink.getRemotePath()); |
| } |
| } |
| LOGGER.debug("Sequential upload with id '{}' finished in {}.", id, Duration.between(start, LocalTime.now())); |
| } |
| |
| /** |
| * Parallel upload of given {@link FileLink}s. Local {@link Path}s linked |
| * multiple times are uploaded only once. The upload progress may be traced |
| * with a progress listener. |
| * |
| * @param entity |
| * Used for security checks. |
| * @param fileLinks |
| * Collection of {@code FileLink}s to upload. |
| * @param progressListener |
| * The progress listener. |
| * @throws IOException |
| * Thrown if unable to upload files. |
| */ |
| public void uploadParallel(Entity entity, Collection<FileLink> fileLinks, ProgressListener progressListener) |
| throws IOException { |
| Map<Path, List<FileLink>> groups = fileLinks.stream().filter(FileLink::isLocal) |
| .collect(Collectors.groupingBy(FileLink::getLocalPath)); |
| |
| long totalSize = groups.values().stream().map(l -> l.get(0)).mapToLong(FileLink::getSize).sum(); |
| final AtomicLong transferred = new AtomicLong(); |
| List<Callable<Void>> downloadTasks = new ArrayList<>(); |
| for (List<FileLink> group : groups.values()) { |
| downloadTasks.add(() -> { |
| FileLink fileLink = group.get(0); |
| |
| upload(entity, fileLink, (b, p) -> { |
| double tranferredBytes = transferred.addAndGet(b); |
| if (progressListener != null) { |
| progressListener.progress(b, (float) (tranferredBytes / totalSize)); |
| } |
| }); |
| |
| for (FileLink other : group.subList(1, group.size())) { |
| other.setRemotePath(fileLink.getRemotePath()); |
| } |
| |
| return null; |
| }); |
| } |
| |
| ExecutorService executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE); |
| LocalTime start = LocalTime.now(); |
| UUID id = UUID.randomUUID(); |
| LOGGER.debug("Parallel upload of {} file(s) with id '{}' started.", groups.size(), id); |
| try { |
| List<Throwable> errors = executorService.invokeAll(downloadTasks).stream().map(future -> { |
| try { |
| future.get(); |
| return null; |
| } catch (ExecutionException | InterruptedException e) { |
| LOGGER.error("Upload of failed due to: " + e.getMessage(), e); |
| return e; |
| } |
| }).filter(Objects::nonNull).collect(Collectors.toList()); |
| |
| if (!errors.isEmpty()) { |
| throw new IOException("Upload faild for '" + errors.size() + "' files."); |
| } |
| LOGGER.debug("Parallel upload with id '{}' finished in {}.", id, Duration.between(start, LocalTime.now())); |
| } catch (InterruptedException e) { |
| throw new IOException("Unable to upload files due to: " + e.getMessage(), e); |
| } finally { |
| executorService.shutdown(); |
| } |
| } |
| |
| /** |
| * Deletes given {@link FileLink}s form the remote storage. |
| * |
| * @param entity |
| * Used for security checks. |
| * @param fileLinks |
| * Collection of {@code FileLink}s to delete. |
| */ |
| public void delete(Entity entity, Collection<FileLink> fileLinks) { |
| fileLinks.stream().filter(FileLink::isRemote) |
| .collect(groupingBy(FileLink::getRemotePath, reducing((fl1, fl2) -> fl1))).values().stream() |
| .filter(Optional::isPresent).map(Optional::get).forEach(fl -> delete(entity, fl)); |
| } |
| |
| /** |
| * Deletes given {@link FileLink} form the remote storage. |
| * |
| * @param entity |
| * Used for security checks. |
| * @param fileLink |
| * The {@code FileLink}s to delete. |
| */ |
| public void delete(Entity entity, FileLink fileLink) { |
| if (!fileLink.isRemote()) { |
| // nothing to do |
| return; |
| } |
| |
| try { |
| fileServer.delete(fileLink, toElemID(entity)); |
| LOGGER.debug("File '{}' sucessfully deleted.", fileLink.getRemotePath()); |
| } catch (IOException e) { |
| LOGGER.warn("Failed to delete remote file.", e); |
| } |
| } |
| |
| // ====================================================================== |
| // Private methods |
| // ====================================================================== |
| |
| /** |
| * Uploads given {@link FileLink}. The upload progress may be traced with a |
| * progress listener. |
| * |
| * @param entity |
| * Used for security checks. |
| * @param fileLink |
| * The {@code FileLink} to upload. |
| * @param progressListener |
| * The progress listener. |
| * @throws IOException |
| * Thrown if unable to upload file. |
| */ |
| private void upload(Entity entity, FileLink fileLink, ProgressListener progressListener) throws IOException { |
| if (fileLink.isRemote()) { |
| // nothing to do |
| return; |
| } else if (!fileLink.isLocal()) { |
| throw new IllegalArgumentException("File link does not have a local path."); |
| } |
| |
| InputStream sourceStream = Files.newInputStream(fileLink.getLocalPath()); |
| if (progressListener != null) { |
| sourceStream = new TracedInputStream(sourceStream, progressListener, fileLink.getSize()); |
| } |
| |
| Path absolutePath = fileLink.getLocalPath().toAbsolutePath(); |
| LOGGER.debug("Starting upload of file '{}'.", absolutePath); |
| LocalTime start = LocalTime.now(); |
| fileServer.uploadStream(sourceStream, fileLink, toElemID(entity)); |
| LOGGER.debug("File '{}' successfully uploaded in {} to '{}'.", absolutePath, |
| Duration.between(start, LocalTime.now()), fileLink.getRemotePath()); |
| } |
| |
| /** |
| * Creates an ODS entity identity {@link ElemId} object for given |
| * {@link Entity}. |
| * |
| * @param entity |
| * The {@code Entity}. |
| * @return The created {@code ElemId} is returned. |
| */ |
| private ElemId toElemID(Entity entity) { |
| return new ElemId(((ODSEntityType) modelManager.getEntityType(entity)).getODSID(), |
| ODSConverter.toODSID(entity.getID())); |
| } |
| |
| /** |
| * Calculates the total download size for given {@link FileLink} groups. |
| * |
| * @param entity |
| * Used for security checks. |
| * @param groups |
| * The {@code FileLink} groups. |
| * @return The total download size is returned. |
| * @throws IOException |
| * Thrown if unable to load the file size. |
| */ |
| private long calculateDownloadSize(Entity entity, Map<String, List<FileLink>> groups) throws IOException { |
| List<FileLink> links = groups.values().stream().map(l -> l.get(0)).collect(Collectors.toList()); |
| long totalSize = 0; |
| for (FileLink fileLink : links) { |
| loadSize(entity, fileLink); |
| // overflow may occur in case of total size exceeds 9223 PB! |
| totalSize = Math.addExact(totalSize, fileLink.getSize()); |
| } |
| |
| return totalSize; |
| } |
| |
| } |