| /******************************************************************************** |
| * Copyright (c) 2015-2020 Contributors to the Eclipse Foundation |
| * |
| * See the NOTICE file(s) distributed with this work for additional |
| * information regarding copyright ownership. |
| * |
| * This program and the accompanying materials are made available under the |
| * terms of the Eclipse Public License v. 2.0 which is available at |
| * http://www.eclipse.org/legal/epl-2.0. |
| * |
| * SPDX-License-Identifier: EPL-2.0 |
| * |
| ********************************************************************************/ |
| |
| package org.eclipse.mdm.freetextindexer.control; |
| |
| import java.net.InetAddress; |
| import java.net.UnknownHostException; |
| import java.util.List; |
| import java.util.UUID; |
| |
| import javax.ejb.Stateful; |
| import javax.persistence.EntityManager; |
| import javax.persistence.PersistenceContext; |
| import javax.persistence.Query; |
| |
| import org.eclipse.mdm.freetextindexer.entities.SystemProcess; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * <p> |
| * Connection handler class which manages a connection from the JDBC pool of the |
| * application server. Additional it locks a database table entry on the |
| * 'system_process' table. |
| * </p> |
| * <p> |
| * If the database table 'system_process' does not exist it will automatically |
| * create it. |
| * </p> |
| * <p> |
| * If the locking entry does not exist within the table it will automatically |
| * create it. |
| * </p> |
| * |
| * @author Juergen Kleck, Peak Solution GmbH |
| */ |
| @Stateful |
| public class DatabaseLockHandler { |
| |
| private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseLockHandler.class); |
| |
| // Health check timeout of 3 minutes after which the process can takeover |
| private static final int HEALTH_CHECK_TIMEOUT = 180000; |
| |
| private boolean lockAcquired = false; |
| private boolean initialized = false; |
| private String lockingEntry; |
| private String processId; |
| private LockListener listener; |
| |
| @PersistenceContext(unitName = "openMDM") |
| private EntityManager em; |
| |
| /** |
| * Define the locking entry and the process id |
| * |
| * @param lockingEntry The key of the system_process table which should be |
| * locked |
| */ |
| public void init(String lockingEntry, LockListener listener) { |
| try { |
| this.processId = InetAddress.getLocalHost().getHostName() + "_" + UUID.randomUUID().toString(); |
| } catch (UnknownHostException e) { |
| this.processId = UUID.randomUUID().toString(); |
| } |
| |
| this.lockingEntry = lockingEntry; |
| this.listener = listener; |
| this.initialized = false; |
| } |
| |
| public void requestLock() { |
| // check if we have the lock, otherwise try to get it |
| if (!hasDatabaseLock()) { |
| acquireDatabaseLock(); |
| } |
| if (hasDatabaseLock() && doHealthCheck()) { |
| } else { |
| LOGGER.debug("No database lock could be acquired for this process."); |
| } |
| } |
| |
| /** |
| * Instantiate the database setup |
| */ |
| public void setupDB() { |
| if (!this.initialized) { |
| boolean success = true; |
| |
| // check if the process key exists or create it |
| String query = "SELECT sp FROM SystemProcess sp WHERE sp.processKey = '" + lockingEntry + "'"; |
| List<SystemProcess> processes = em.createQuery(query, SystemProcess.class).getResultList(); |
| if (processes.isEmpty()) { |
| success = insertTableRow(); |
| } |
| this.initialized = success; |
| if (success) { |
| LOGGER.debug("Started database lock handler with process id '{}'", processId); |
| } |
| } |
| } |
| |
| /** |
| * This method updates the last connection time of the system process as |
| * database locks are only active for the transaction lifecycle which end when |
| * leaving the method |
| * |
| * @return true if the database lock is still active |
| */ |
| public boolean doHealthCheck() { |
| try { |
| LOGGER.debug("Health check for lock handler with process id {}", processId); |
| SystemProcess entity = em |
| .createQuery("SELECT sp FROM SystemProcess sp WHERE sp.processKey = '" + lockingEntry + "'", |
| SystemProcess.class) |
| .getSingleResult(); |
| em.refresh(entity); |
| |
| if (this.processId.equals(entity.getProcessId())) { |
| if (!em.isJoinedToTransaction()) { |
| em.joinTransaction(); |
| } |
| entity.setTime(System.currentTimeMillis()); |
| em.merge(entity); |
| em.flush(); |
| LOGGER.trace("Renewed lock for process id {}", processId); |
| } else if (hasDatabaseLock()) { |
| // process id in database has changed, but we still think we have the lock -> |
| // release lock |
| LOGGER.trace("Another process id '{}' has acquired the lock. Health check failed for process id '{}'", |
| entity.getProcessId(), processId); |
| setLockAcquired(false); |
| } |
| } catch (Exception ignored) { |
| LOGGER.error("Exception occurred: {}", ignored); |
| } |
| return hasDatabaseLock(); |
| } |
| |
| /** |
| * This method will try get a logical lock on the database table system_process. |
| * It will check by the process id and the last locked time. The lock will |
| * remain until the application disconnects from the database. In case of a |
| * ungraceful disconnect the locking timeout will apply. |
| * |
| * @return true if the lock has been acquired |
| */ |
| public boolean acquireDatabaseLock() { |
| setupDB(); |
| try { |
| LOGGER.trace( |
| "Trying to acquire the lock for '{}' for process id '{}'.", lockingEntry, processId); |
| |
| SystemProcess entity = em |
| .createQuery("SELECT sp FROM SystemProcess sp WHERE sp.processKey = '" + lockingEntry + "'", |
| SystemProcess.class) |
| .getSingleResult(); |
| em.refresh(entity); |
| |
| // check if we can acquire this entity by: |
| // - different process id |
| // - last health check time is older than defined |
| if (!this.processId.equals(entity.getProcessId()) && System |
| .currentTimeMillis() - HEALTH_CHECK_TIMEOUT > entity.getTime()) { |
| LOGGER.trace( |
| "Another process id '{}' not renewed the lock for {} seconds. Replacing lock with own process id '{}'", |
| entity.getProcessId(), HEALTH_CHECK_TIMEOUT / 1000, processId); |
| if (!em.isJoinedToTransaction()) { |
| em.joinTransaction(); |
| } |
| entity.setProcessId(this.processId); |
| em.merge(entity); |
| em.flush(); |
| |
| em.refresh(entity); |
| entity = em.find(SystemProcess.class, entity.getId()); |
| if (this.processId.equals(entity.getProcessId())) { |
| setLockAcquired(true); |
| } |
| } |
| } catch (Exception e) { |
| LOGGER.error("Exception occurred: ", e); |
| } |
| LOGGER.trace( |
| "Acquired the lock for '{}' for process id '{}': ", lockingEntry, processId, hasDatabaseLock()); |
| return hasDatabaseLock(); |
| } |
| |
| private void setLockAcquired(boolean acquired) { |
| if (acquired) { |
| LOGGER.debug( |
| "Successfully acquired the database lock for this process on the table 'system_process' for the process_key '{}'.", |
| lockingEntry); |
| lockAcquired = true; |
| listener.onLockAcquired(); |
| } else { |
| LOGGER.debug("This process is no longer responsible for the database updates. Deactivating..."); |
| lockAcquired = false; |
| listener.onLockLost(); |
| } |
| } |
| |
| /** |
| * This method will release the database lock on the system_process table for |
| * this locking entry |
| */ |
| public void releaseDatabaseLock() { |
| try { |
| SystemProcess entity = em |
| .createQuery("SELECT sp FROM SystemProcess sp WHERE sp.processKey = '" + lockingEntry + "'", |
| SystemProcess.class) |
| .getSingleResult(); |
| em.refresh(entity); |
| |
| // we can only release ourselves |
| if (this.processId.equals(entity.getProcessId())) { |
| if (!em.isJoinedToTransaction()) { |
| em.joinTransaction(); |
| } |
| entity.setProcessId("-released-"); |
| entity.setTime(0L); |
| em.merge(entity); |
| em.flush(); |
| LOGGER.debug( |
| "Successfully released the database lock for this process on the table 'system_process' for the process_key '{}'.", |
| lockingEntry); |
| } |
| lockAcquired = false; |
| } catch (Exception ignored) { |
| LOGGER.error("Exception occurred: ", ignored); |
| } |
| } |
| |
| /** |
| * Check if the database lock has been applied |
| * |
| * @return true if the lock is applied |
| */ |
| public boolean hasDatabaseLock() { |
| return lockAcquired; |
| } |
| |
| /** |
| * Insert the value into the table if it does not exist yet. |
| * |
| * @return true if the initial table entry has been committed |
| */ |
| private boolean insertTableRow() { |
| try { |
| SystemProcess entity = new SystemProcess(); |
| entity.setId(getHighestId() + 1L); |
| entity.setProcessKey(lockingEntry); |
| entity.setProcessId("-initialized-"); |
| entity.setTime(0L); |
| em.persist(entity); |
| em.flush(); |
| em.refresh(entity); |
| return true; |
| } catch (Throwable e) { |
| LOGGER.error("Failed to insert the data entry.", e); |
| } |
| return false; |
| } |
| |
| /** |
| * Get the highest id of the system process table |
| * |
| * @return the highest id or 0 |
| */ |
| private long getHighestId() { |
| long id = 0L; |
| Query query = em.createQuery("SELECT sp FROM SystemProcess sp ORDER BY sp.id DESC", SystemProcess.class); |
| List<SystemProcess> result = query.getResultList(); |
| if (!result.isEmpty()) { |
| id = result.get(0).getId(); |
| } |
| return id; |
| } |
| |
| } |