| /******************************************************************************** |
| * Copyright (c) 2015-2018 Contributors to the Eclipse Foundation |
| * |
| * See the NOTICE file(s) distributed with this work for additional |
| * information regarding copyright ownership. |
| * |
| * This program and the accompanying materials are made available under the |
| * terms of the Eclipse Public License v. 2.0 which is available at |
| * http://www.eclipse.org/legal/epl-2.0. |
| * |
| * SPDX-License-Identifier: EPL-2.0 |
| * |
| ********************************************************************************/ |
| |
| package org.eclipse.mdm.api.atfxadapter.transaction; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.function.Function; |
| import java.util.stream.Collectors; |
| |
| import org.asam.ods.AoException; |
| import org.eclipse.mdm.api.atfxadapter.ATFXContext; |
| import org.eclipse.mdm.api.base.Transaction; |
| import org.eclipse.mdm.api.base.adapter.Core; |
| import org.eclipse.mdm.api.base.adapter.EntityType; |
| import org.eclipse.mdm.api.base.massdata.WriteRequest; |
| import org.eclipse.mdm.api.base.model.Channel; |
| import org.eclipse.mdm.api.base.model.ContextRoot; |
| import org.eclipse.mdm.api.base.model.Deletable; |
| import org.eclipse.mdm.api.base.model.Entity; |
| import org.eclipse.mdm.api.base.model.Measurement; |
| import org.eclipse.mdm.api.base.model.ScalarType; |
| import org.eclipse.mdm.api.base.model.TestStep; |
| import org.eclipse.mdm.api.base.model.Value; |
| import org.eclipse.mdm.api.base.query.DataAccessException; |
| import org.eclipse.mdm.api.dflt.model.CatalogAttribute; |
| import org.eclipse.mdm.api.dflt.model.CatalogComponent; |
| import org.eclipse.mdm.api.dflt.model.CatalogSensor; |
| import org.eclipse.mdm.api.odsadapter.ODSContext; |
| import org.eclipse.mdm.api.odsadapter.query.ODSModelManager; |
| import org.eclipse.mdm.api.odsadapter.utils.ODSUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * ODS implementation of the {@link Transaction} interface. |
| * |
| * @see org.eclipse.mdm.api.odsadapter.transaction.ODSTransaction |
| */ |
| public final class ATFXTransaction implements Transaction { |
| |
| // TODO: it should be possible to a attach a progress listener |
| // -> progress notification updates while uploading files |
| // -> any other useful informations?! |
| // -> splitting of tasks into subtasks may be required... |
| |
| private static final Logger LOGGER = LoggerFactory.getLogger(ATFXTransaction.class); |
| |
| // this one is used to access the application model and execute queries |
| // instance is decoupled from its parent |
| private final ATFXContext context; |
| |
| // only for logging |
| private final String id = UUID.randomUUID().toString(); |
| |
| // need to write version == instanceID -> update after create |
| private final List<ContextRoot> contextRoots = new ArrayList<>(); |
| |
| // reset instance IDs on abort |
| private final List<Core> created = new ArrayList<>(); |
| |
| // apply changes |
| private final List<Core> modified = new ArrayList<>(); |
| |
| private CatalogManager catalogManager; |
| |
| /** |
| * Constructor. |
| * |
| * @param parentModelManager Used to access the persistence. |
| * @param entity Used for security checks |
| * @param transfer The file transfer type. |
| * @throws AoException Thrown if unable to start a co-session. |
| */ |
| public ATFXTransaction(ATFXContext context) throws AoException { |
| this.context = context; |
| this.context.getAoSession().startTransaction(); |
| LOGGER.debug("Transaction '{}' started.", id); |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| @SuppressWarnings("unchecked") |
| public <T extends Entity> void create(Collection<T> entities) throws DataAccessException { |
| if (entities.isEmpty()) { |
| return; |
| } else if (entities.stream().filter(e -> ODSUtils.isValidID(e.getID())).findAny().isPresent()) { |
| throw new IllegalArgumentException("At least one given entity is already persisted."); |
| } |
| |
| try { |
| Map<Class<?>, List<T>> entitiesByClassType = entities.stream() |
| .collect(Collectors.groupingBy(e -> e.getClass())); |
| |
| List<CatalogComponent> catalogComponents = (List<CatalogComponent>) entitiesByClassType |
| .get(CatalogComponent.class); |
| if (catalogComponents != null) { |
| getCatalogManager().createCatalogComponents(catalogComponents); |
| } |
| |
| List<CatalogSensor> catalogSensors = (List<CatalogSensor>) entitiesByClassType.get(CatalogSensor.class); |
| if (catalogSensors != null) { |
| // TODO anehmer on 2017-11-16: avalon 4.3b throws an exception in |
| // AoSession.commitTransaction() if multiple |
| // catalog sensors have been deleted and leaves the application |
| // model in a broken state. This is also stated in the documentation. This |
| // comment should be removed later. |
| getCatalogManager().createCatalogSensors(catalogSensors); |
| } |
| |
| List<CatalogAttribute> catalogAttributes = (List<CatalogAttribute>) entitiesByClassType |
| .get(CatalogAttribute.class); |
| if (catalogAttributes != null) { |
| getCatalogManager().createCatalogAttributes(catalogAttributes); |
| } |
| |
| List<TestStep> testSteps = (List<TestStep>) entitiesByClassType.get(TestStep.class); |
| if (testSteps != null) { |
| create(testSteps.stream().map(ContextRoot::of).collect(ArrayList::new, List::addAll, List::addAll)); |
| } |
| |
| List<Measurement> measurements = (List<Measurement>) entitiesByClassType.get(Measurement.class); |
| if (measurements != null) { |
| // Use set here, since measurement siblings point to the same |
| // context roots. Only create those ContextRoots that haven't been created yet: |
| create(measurements.stream().map(ContextRoot::of) |
| .collect(HashSet<ContextRoot>::new, Set<ContextRoot>::addAll, Set<ContextRoot>::addAll).stream() |
| .filter(cr -> !ODSUtils.isValidID(cr.getID())).collect(Collectors.toSet())); |
| } |
| |
| executeStatements(et -> new InsertStatement(this, et), entities); |
| |
| List<ContextRoot> roots = (List<ContextRoot>) entitiesByClassType.get(ContextRoot.class); |
| if (roots != null) { |
| roots.forEach(contextRoot -> { |
| contextRoot.setVersion(contextRoot.getID().toString()); |
| }); |
| |
| // this will restore the ASAM path of each context root |
| executeStatements(et -> new UpdateStatement(this, et, true), roots); |
| contextRoots.addAll(roots); |
| } |
| } catch (AoException e) { |
| throw new DataAccessException("Unable to write new entities due to: " + e.reason, e); |
| } catch (IOException e) { |
| throw new DataAccessException("Unable to write new entities due to: " + e.getMessage(), e); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| @SuppressWarnings("unchecked") |
| public <T extends Entity> void update(Collection<T> entities) throws DataAccessException { |
| if (entities.isEmpty()) { |
| return; |
| } else if (entities.stream().filter(e -> !ODSUtils.isValidID(e.getID())).findAny().isPresent()) { |
| throw new IllegalArgumentException("At least one given entity is not yet persisted."); |
| } |
| |
| try { |
| Map<Class<?>, List<T>> entitiesByClassType = entities.stream() |
| .collect(Collectors.groupingBy(e -> e.getClass())); |
| List<CatalogAttribute> catalogAttributes = (List<CatalogAttribute>) entitiesByClassType |
| .get(CatalogAttribute.class); |
| if (catalogAttributes != null) { |
| getCatalogManager().updateCatalogAttributes(catalogAttributes); |
| } |
| |
| executeStatements(et -> new UpdateStatement(this, et, false), entities); |
| } catch (AoException e) { |
| throw new DataAccessException("Unable to update entities due to: " + e.reason, e); |
| } catch (IOException e) { |
| throw new DataAccessException("Unable to update entities due to: " + e.getMessage(), e); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| @SuppressWarnings("unchecked") |
| public <T extends Deletable> void delete(Collection<T> entities) throws DataAccessException { |
| if (entities.isEmpty()) { |
| return; |
| } |
| |
| List<T> filteredEntities = entities.stream().filter(e -> ODSUtils.isValidID(e.getID())) |
| .collect(Collectors.toList()); |
| |
| try { |
| Map<Class<?>, List<T>> entitiesByClassType = filteredEntities.stream() |
| .collect(Collectors.groupingBy(e -> e.getClass())); |
| |
| List<CatalogComponent> catalogComponents = (List<CatalogComponent>) entitiesByClassType |
| .get(CatalogComponent.class); |
| if (catalogComponents != null) { |
| getCatalogManager().deleteCatalogComponents(catalogComponents); |
| } |
| |
| List<CatalogSensor> catalogSensors = (List<CatalogSensor>) entitiesByClassType.get(CatalogSensor.class); |
| if (catalogSensors != null) { |
| // TODO anehmer on 2017-11-16: avalon 4.3b throws an exception in |
| // AoSession.commitTransaction() if multiple |
| // catalog sensors have been deleted and leaves the application |
| // model in a broken state. This is also stated in the documentation. This |
| // comment should be removed later. |
| getCatalogManager().deleteCatalogSensors(catalogSensors); |
| } |
| |
| List<CatalogAttribute> catalogAttributes = (List<CatalogAttribute>) entitiesByClassType |
| .get(CatalogAttribute.class); |
| if (catalogAttributes != null) { |
| getCatalogManager().deleteCatalogAttributes(catalogAttributes); |
| } |
| |
| /* |
| * TODO: for any template that has to be deleted it is required to ensure there |
| * are no links to it... |
| */ |
| |
| executeStatements(et -> new DeleteStatement(this, et, true), filteredEntities); |
| } catch (AoException e) { |
| throw new DataAccessException("Unable to delete entities due to: " + e.reason, e); |
| } catch (IOException e) { |
| throw new DataAccessException("Unable to delete entities due to: " + e.getMessage(), e); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public void writeMeasuredValues(Collection<WriteRequest> writeRequests) throws DataAccessException { |
| if (writeRequests.isEmpty()) { |
| return; |
| } |
| |
| try { |
| Map<ScalarType, List<WriteRequest>> writeRequestsByRawType = writeRequests.stream() |
| .collect(Collectors.groupingBy(WriteRequest::getRawScalarType)); |
| |
| for (List<WriteRequest> writeRequestGroup : writeRequestsByRawType.values()) { |
| WriteRequestHandler writeRequestHandler = new WriteRequestHandler(this); |
| List<Channel> channels = new ArrayList<>(); |
| |
| for (WriteRequest writeRequest : writeRequestGroup) { |
| Channel channel = writeRequest.getChannel(); |
| channel.setScalarType(writeRequest.getCalculatedScalarType()); |
| // TODO it might be required to change relation to another |
| // unit?!?? |
| channels.add(channel); |
| writeRequestHandler.addRequest(writeRequest); |
| } |
| |
| update(channels); |
| writeRequestHandler.execute(); |
| } |
| } catch (AoException e) { |
| throw new DataAccessException("Unable to write measured values due to: " + e.reason, e); |
| } catch (IOException e) { |
| throw new DataAccessException("Unable to write measured values due to: " + e.getMessage(), e); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public void commit() throws DataAccessException { |
| try { |
| context.getAoSession().commitTransaction(); |
| |
| // commit succeed -> apply changes in entity cores |
| modified.forEach(Core::apply); |
| |
| if (catalogManager != null) { |
| // application model has been modified -> reload |
| context.getODSModelManager().reloadApplicationModel(); |
| } |
| |
| LOGGER.debug("Transaction '{}' committed.", id); |
| } catch (AoException e) { |
| throw new DataAccessException("Unable to commit transaction '" + id + "' due to: " + e.reason, e); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public void abort() { |
| try { |
| // reset version, since creation failed or was aborted |
| contextRoots.forEach(cr -> cr.setVersion(null)); |
| |
| // reset instance IDs |
| String virtualID = "0"; |
| created.forEach(c -> c.setID(virtualID)); |
| |
| context.getAoSession().abortTransaction(); |
| |
| LOGGER.debug("Transaction '{}' aborted.", id); |
| } catch (AoException e) { |
| LOGGER.error("Unable to abort transaction '" + id + "' due to: " + e.reason, e); |
| } |
| } |
| |
| /** |
| * Once {@link #abort()} is called instance ID of given {@link Core} will be |
| * reset to {@code 0} which indicates a virtual {@link Entity}, not yet |
| * persisted, entity. |
| * |
| * @param core The {@code Core} of a newly written {@code Entity}. |
| */ |
| void addCreated(Core core) { |
| created.add(core); |
| } |
| |
| /** |
| * Once {@link #commit()} is {@link Core#apply()} will be called to apply |
| * modified {@link Value} contents and removed related entities. |
| * |
| * @param core The {@code Core} of an updated {@code Entity}. |
| */ |
| void addModified(Core core) { |
| modified.add(core); |
| } |
| |
| /** |
| * Returns the {@link ODSContext}. |
| * |
| * @return The {@code ODSContext} is returned. |
| */ |
| ATFXContext getContext() { |
| return context; |
| } |
| |
| /** |
| * Returns the {@link ODSModelManager}. |
| * |
| * @return The {@code ODSModelManager} is returned. |
| */ |
| ODSModelManager getModelManager() { |
| return context.getODSModelManager(); |
| } |
| |
| /** |
| * Returns the {@link CatalogManager}. |
| * |
| * @return The {@code CatalogManager} is returned. |
| */ |
| private CatalogManager getCatalogManager() { |
| if (catalogManager == null) { |
| catalogManager = new CatalogManager(this); |
| } |
| |
| return catalogManager; |
| } |
| |
| /** |
| * Executes statements for given entities by using given statement factory. |
| * |
| * @param <T> The entity type. |
| * @param statementFactory Used to create a new statement for a given |
| * {@link EntityType}. |
| * @param entities The processed {@code Entity}s. |
| * @throws AoException Thrown if the execution fails. |
| * @throws DataAccessException Thrown if the execution fails. |
| * @throws IOException Thrown if a file transfer operation fails. |
| */ |
| private <T extends Entity> void executeStatements(Function<EntityType, BaseStatement> statementFactory, |
| Collection<T> entities) throws AoException, DataAccessException, IOException { |
| Map<EntityType, List<Entity>> entitiesByType = entities.stream() |
| .collect(Collectors.groupingBy(context.getODSModelManager()::getEntityType)); |
| for (Entry<EntityType, List<Entity>> entry : entitiesByType.entrySet()) { |
| statementFactory.apply(entry.getKey()).execute(entry.getValue()); |
| } |
| } |
| } |