blob: 82413bd3f3fd7511038635de4f36f0f2e2dd2761 [file] [log] [blame]
/********************************************************************************
* Copyright (c) 2015-2020 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* SPDX-License-Identifier: EPL-2.0
*
********************************************************************************/
package org.eclipse.mdm.freetextindexer.control;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.UUID;
import javax.ejb.Stateful;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.persistence.Query;
import org.eclipse.mdm.freetextindexer.entities.SystemProcess;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * <p>
 * Stateful bean which maintains a logical lock on a single row of the
 * 'system_process' table (mapped by {@link SystemProcess}) through the
 * injected {@link EntityManager} of the persistence unit 'openMDM'.
 * </p>
 * <p>
 * The lock is owned by the process whose generated process id is stored in
 * that row. The owner renews the stored timestamp via
 * {@link #doHealthCheck()}; another process may take the lock over once the
 * timestamp is older than {@link #HEALTH_CHECK_TIMEOUT} milliseconds.
 * </p>
 * <p>
 * If the locking entry does not exist within the table it will automatically
 * be created (see {@link #setupDB()}).
 * </p>
 * <p>
 * NOTE(review): earlier documentation stated that a missing 'system_process'
 * table is created automatically. This class contains no code doing so;
 * presumably this relies on the configuration of the persistence unit -
 * confirm.
 * </p>
 *
 * @author Juergen Kleck, Peak Solution GmbH
 */
@Stateful
public class DatabaseLockHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseLockHandler.class);

    // Health check timeout of 3 minutes after which another process can take over
    private static final int HEALTH_CHECK_TIMEOUT = 180000;

    // Name of the bind parameter carrying the locking entry
    private static final String PROCESS_KEY_PARAM = "processKey";

    // The locking entry is bound as a parameter instead of being concatenated
    // into the JPQL string, so keys containing quotes cannot alter the query.
    private static final String FIND_BY_KEY_QUERY = "SELECT sp FROM SystemProcess sp WHERE sp.processKey = :"
            + PROCESS_KEY_PARAM;

    private boolean lockAcquired = false;
    private boolean initialized = false;
    private String lockingEntry;
    private String processId;
    private LockListener listener;

    @PersistenceContext(unitName = "openMDM")
    private EntityManager em;

    /**
     * Define the locking entry and generate the process id of this handler.
     * The process id consists of the local host name and a random UUID; if
     * the host name cannot be resolved only the UUID is used.
     *
     * @param lockingEntry The key of the system_process table which should be
     *                     locked
     * @param listener     The listener which is notified when the lock has
     *                     been acquired or lost
     */
    public void init(String lockingEntry, LockListener listener) {
        try {
            this.processId = InetAddress.getLocalHost().getHostName() + "_" + UUID.randomUUID().toString();
        } catch (UnknownHostException e) {
            this.processId = UUID.randomUUID().toString();
        }
        this.lockingEntry = lockingEntry;
        this.listener = listener;
        // force setupDB() to verify the (possibly new) locking entry again
        this.initialized = false;
    }

    /**
     * Try to acquire the lock if this process does not hold it yet and renew
     * it via the health check if it does. Logs a debug message if the lock is
     * not held afterwards.
     */
    public void requestLock() {
        // check if we have the lock, otherwise try to get it
        if (!hasDatabaseLock()) {
            acquireDatabaseLock();
        }
        if (!(hasDatabaseLock() && doHealthCheck())) {
            LOGGER.debug("No database lock could be acquired for this process.");
        }
    }

    /**
     * Instantiate the database setup: makes sure a row for the locking entry
     * exists and inserts it otherwise. Does nothing once the setup succeeded.
     */
    public void setupDB() {
        if (this.initialized) {
            return;
        }
        // check if the process key exists or create it
        List<SystemProcess> processes = em.createQuery(FIND_BY_KEY_QUERY, SystemProcess.class)
                .setParameter(PROCESS_KEY_PARAM, lockingEntry)
                .getResultList();
        boolean success = !processes.isEmpty() || insertTableRow();
        this.initialized = success;
        if (success) {
            LOGGER.debug("Started database lock handler with process id '{}'", processId);
        }
    }

    /**
     * This method updates the last connection time of the system process if
     * this process is the owner stored in the database. If another process id
     * is stored while this handler still assumes to hold the lock, the lock is
     * marked as lost and the listener is notified. Exceptions are logged and
     * not propagated.
     *
     * @return true if the database lock is still active
     */
    public boolean doHealthCheck() {
        try {
            LOGGER.debug("Health check for lock handler with process id {}", processId);
            SystemProcess entity = findFreshLockEntry();
            if (this.processId.equals(entity.getProcessId())) {
                joinTransactionIfNecessary();
                entity.setTime(System.currentTimeMillis());
                em.merge(entity);
                em.flush();
                LOGGER.trace("Renewed lock for process id {}", processId);
            } else if (hasDatabaseLock()) {
                // process id in database has changed, but we still think we have the lock ->
                // release lock
                LOGGER.trace("Another process id '{}' has acquired the lock. Health check failed for process id '{}'",
                        entity.getProcessId(), processId);
                setLockAcquired(false);
            }
        } catch (Exception e) {
            // the throwable is passed as last argument so SLF4J logs the stack trace
            LOGGER.error("Exception occurred: ", e);
        }
        return hasDatabaseLock();
    }

    /**
     * This method will try to get a logical lock on the database table
     * system_process. The lock is taken over if the row is owned by a
     * different process id and its last health check time is older than
     * {@link #HEALTH_CHECK_TIMEOUT}. Exceptions during the attempt are logged
     * and not propagated.
     *
     * @return true if the lock has been acquired
     */
    public boolean acquireDatabaseLock() {
        setupDB();
        try {
            LOGGER.trace("Trying to acquire the lock for '{}' for process id '{}'.", lockingEntry, processId);
            SystemProcess entity = findFreshLockEntry();
            // check if we can acquire this entity by:
            // - different process id
            // - last health check time is older than defined
            if (!this.processId.equals(entity.getProcessId())
                    && System.currentTimeMillis() - HEALTH_CHECK_TIMEOUT > entity.getTime()) {
                LOGGER.trace(
                        "Another process id '{}' not renewed the lock for {} seconds. Replacing lock with own process id '{}'",
                        entity.getProcessId(), HEALTH_CHECK_TIMEOUT / 1000, processId);
                joinTransactionIfNecessary();
                entity.setProcessId(this.processId);
                // Stamp the takeover time as well; with the stale timestamp left in
                // place another process would still consider the lock expired and
                // could steal it before the first health check runs.
                entity.setTime(System.currentTimeMillis());
                em.merge(entity);
                em.flush();
                em.refresh(entity);
                // re-read to verify that our process id is the one stored
                entity = em.find(SystemProcess.class, entity.getId());
                if (this.processId.equals(entity.getProcessId())) {
                    setLockAcquired(true);
                }
            }
        } catch (Exception e) {
            LOGGER.error("Exception occurred: ", e);
        }
        LOGGER.trace("Acquired the lock for '{}' for process id '{}': {}", lockingEntry, processId,
                hasDatabaseLock());
        return hasDatabaseLock();
    }

    /**
     * Update the local lock state and notify the listener, if one was
     * registered via {@link #init(String, LockListener)}.
     *
     * @param acquired true if the lock has been acquired, false if it was lost
     */
    private void setLockAcquired(boolean acquired) {
        if (acquired) {
            LOGGER.debug(
                    "Successfully acquired the database lock for this process on the table 'system_process' for the process_key '{}'.",
                    lockingEntry);
            lockAcquired = true;
            if (listener != null) {
                listener.onLockAcquired();
            }
        } else {
            LOGGER.debug("This process is no longer responsible for the database updates. Deactivating...");
            lockAcquired = false;
            if (listener != null) {
                listener.onLockLost();
            }
        }
    }

    /**
     * This method will release the database lock on the system_process table for
     * this locking entry. The row is only modified if it is owned by this
     * process. The local lock state is cleared without notifying the listener.
     * Exceptions are logged and not propagated.
     */
    public void releaseDatabaseLock() {
        try {
            SystemProcess entity = findFreshLockEntry();
            // we can only release ourselves
            if (this.processId.equals(entity.getProcessId())) {
                joinTransactionIfNecessary();
                entity.setProcessId("-released-");
                // time 0 lets any other process take over immediately
                entity.setTime(0L);
                em.merge(entity);
                em.flush();
                LOGGER.debug(
                        "Successfully released the database lock for this process on the table 'system_process' for the process_key '{}'.",
                        lockingEntry);
            }
            lockAcquired = false;
        } catch (Exception e) {
            LOGGER.error("Exception occurred: ", e);
        }
    }

    /**
     * Check if the database lock has been applied
     *
     * @return true if the lock is applied
     */
    public boolean hasDatabaseLock() {
        return lockAcquired;
    }

    /**
     * Load the row of the locking entry and refresh it from the database.
     *
     * @return the refreshed entity; {@code getSingleResult()} throws if there
     *         is not exactly one matching row
     */
    private SystemProcess findFreshLockEntry() {
        SystemProcess entity = em.createQuery(FIND_BY_KEY_QUERY, SystemProcess.class)
                .setParameter(PROCESS_KEY_PARAM, lockingEntry)
                .getSingleResult();
        em.refresh(entity);
        return entity;
    }

    /**
     * Join the entity manager to the current transaction unless it already is
     * joined.
     */
    private void joinTransactionIfNecessary() {
        if (!em.isJoinedToTransaction()) {
            em.joinTransaction();
        }
    }

    /**
     * Insert the value into the table if it does not exist yet.
     *
     * @return true if the initial table entry has been persisted and flushed
     */
    private boolean insertTableRow() {
        try {
            SystemProcess entity = new SystemProcess();
            entity.setId(getHighestId() + 1L);
            entity.setProcessKey(lockingEntry);
            entity.setProcessId("-initialized-");
            // time 0 marks the entry as expired so the first process can acquire it
            entity.setTime(0L);
            em.persist(entity);
            em.flush();
            em.refresh(entity);
            return true;
        } catch (Exception e) {
            LOGGER.error("Failed to insert the data entry.", e);
        }
        return false;
    }

    /**
     * Get the highest id of the system process table
     *
     * @return the highest id or 0
     */
    private long getHighestId() {
        // only the first row of the descending order is needed
        List<SystemProcess> result = em
                .createQuery("SELECT sp FROM SystemProcess sp ORDER BY sp.id DESC", SystemProcess.class)
                .setMaxResults(1)
                .getResultList();
        long id = 0L;
        if (!result.isEmpty()) {
            id = result.get(0).getId();
        }
        return id;
    }
}