blob: 2bea986bc37e1739bb272279106b22630e134051 [file] [log] [blame]
/**
*
* Copyright (c) 2011, 2018 - Loetz GmbH&Co.KG (69115 Heidelberg, Germany)
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License 2.0
* which accompanies this distribution, and is available at
* https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Christophe Loetz (Loetz GmbH&Co.KG) - initial implementation
*
*/
package org.eclipse.osbp.xtext.signal.common;
import static java.nio.file.LinkOption.NOFOLLOW_LINKS;
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.OVERFLOW;
import static org.quartz.CronScheduleBuilder.cronSchedule;
import static org.quartz.CronScheduleBuilder.dailyAtHourAndMinute;
import static org.quartz.CronScheduleBuilder.monthlyOnDayAndHourAndMinute;
import static org.quartz.CronScheduleBuilder.weeklyOnDayAndHourAndMinute;
import static org.quartz.JobBuilder.newJob;
import static org.quartz.TriggerBuilder.newTrigger;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchEvent.Kind;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.regex.PatternSyntaxException;
import org.eclipse.osbp.xtext.datainterchange.common.WorkerThreadRunnable;
import org.quartz.CronExpression;
import org.quartz.CronTrigger;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerKey;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class OSBPSignalWatcher extends OSBPSignalCommonData {
/** the log */
public Logger log = LoggerFactory.getLogger("watcher");
/** the scheduler */
private Scheduler scheduler;
/** the watcher service */
private WatchService watcher;
/** list of all created keys and paths */
private Map<WatchKey, Path> keys = null;
/**
* determines if the all tree of a directory has to be watched for changes
*/
private boolean recursive = false;
/** determine if the path has already been registered or not */
private boolean pathregistered = false;
/** the list of handler names and their file masks */
private static HashMap<String, String> masks = new HashMap<>();
/** the executor service for the processing of all tasks */
private static ExecutorService executorService = Executors.newCachedThreadPool();
/**
* Constructor.
*
* @throws IOException
*/
public OSBPSignalWatcher() throws Exception {
initWatcher();
}
/**
* Initializes the {@link #watcher}.
*
* @throws IOException
* possible thrown exception
*/
public void initWatcher() throws IOException {
this.watcher = FileSystems.getDefault().newWatchService();
this.keys = new HashMap<>();
}
/**
* Process all events for keys queued to the watcher.
*/
public void processEvents() {
for (;;) {
// wait for key to be signaled
WatchKey key;
try {
key = watcher.take();
} catch (InterruptedException x) {
return;
}
Path dir = keys.get(key);
if (dir == null) {
log.error("WatchKey not recognized!!");
continue;
}
for (WatchEvent<?> event : key.pollEvents()) {
Kind<?> kind = event.kind();
// handle the OVERFLOW event
if (kind == OVERFLOW) {
continue;
}
if (isHandlingAllowed(kind)) {
// Context for directory entry event is the file name of
// entry
WatchEvent<Path> ev = cast(event);
Path child = dir.resolve(ev.context());
log.debug(String.format("%s: %s\n", event.kind().name(), child));
// only allowed event can be handle
try {
handleEvent(child);
} catch (Exception e) {
log.error("WatcherService shortly interrupted due to: " + e.toString());
log.info("WatcherService resumed.");
}
// if directory is created, and watching recursively, then
// register it and its sub-directories
if (recursive && (kind == ENTRY_CREATE)) {
try {
if (Files.isDirectory(child, NOFOLLOW_LINKS)) {
registerAll(child);
}
} catch (IOException x) {
// ignore to keep sample readbale
}
}
}
// reset key and remove from set if directory no longer
// accessible
boolean valid = key.reset();
if (!valid) {
keys.remove(key);
// all directories are inaccessible
if (keys.isEmpty()) {
break;
}
}
}
}
}
/**
* Checks if an event type has to be handle or not. Only create event are
* allowed for now on.
*
* @param event
* {@link WatchEvent<Path>} the event
* @return true if yes, false if not
*/
public boolean isHandlingAllowed(Kind<?> event) {
if (event.name().equals("ENTRY_CREATE")) {
return true;
} else {
return false;
}
}
/**
* Handling operation to execute after the given file is created in the
* watched directory.
*
* @param file
* the newly created file, to react on
*/
public void handleEvent(Path file) {
}
@SuppressWarnings("unchecked")
static <T> WatchEvent<T> cast(WatchEvent<?> event) {
return (WatchEvent<T>) event;
}
/**
* Register the given directory, and all its sub-directories, with the
* WatchService.
*
* @param rootpath
* {@link Path} the root path.
* @param signals
* @throws IOException
*/
public void registerAll(Path rootpath) throws IOException {
// register directory and sub-directories
Files.walkFileTree(rootpath, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
registerPathToWatcher(dir);
return FileVisitResult.CONTINUE;
}
});
}
/**
* Register the given directory with the WatchService
*
* @param directory
* @param combined
* true to create only one {@link WatchKey} for all event types,
* false to create separate key for each event type.
* @throws IOException
*/
public void registerPathToWatcher(Path directory) throws IOException {
log.info("Directory of path to watch: [" + directory + "] registered!");
getRegisteredKeysAndPaths().put(directory.register(getWatcher(), StandardWatchEventKinds.ENTRY_CREATE),
directory);
}
/**
* Checks if the given {@link WatchKey} is valid.
*
* @return true if yes, false if not
*/
public boolean isKeyValid(WatchKey key) {
return key != null ? key.isValid() : false;
}
/**
* Closes the watcher service
*
* @return true if yes or non existing, false if not
*/
public boolean closeWatcher() {
try {
if (watcher != null) {
watcher.close();
}
return true;
} catch (IOException e) {
e.printStackTrace();
}
return false;
}
/**
* Get the list of registered directories.
*
* @return {@link Collection<Path>} {@link #directories}
*/
public Collection<Path> getRegisteredDirectories() {
return keys.values();
}
/**
* Gets the list of registered keys.
*
* @return {@link Set<WatchKey>}
*/
public Set<WatchKey> getRegisteredKeys() {
return keys.keySet();
}
/**
* Gets the list of registered keys and their paths.
*
* @return {@link Map<WatchKey,Path>}
*/
public Map<WatchKey, Path> getRegisteredKeysAndPaths() {
return keys;
}
/**
* Gets the watcher.
*
* @return {@link WatchService}
*/
public WatchService getWatcher() {
return watcher;
}
/**
* Checks if the given filename corresponds to one of the file masks
* eventually defined in one handler of the watcher
*
* @param filename
* the filename to be checked
* @return the corresponding handler name or null
*/
public String isFileNameValid(String filename) {
if (filename != null && !filename.isEmpty() && !masks.isEmpty()) {
for (String handlername : masks.keySet()) {
String filemask = masks.get(handlername);
try {
if (filename.toLowerCase().equals(filemask.toLowerCase()) || filename.matches(filemask)) {
return handlername;
}
} catch (PatternSyntaxException e) {
log.error("The defined pattern [" + filemask + "] is invalid.");
}
}
}
return null; // in the case of no filename matches
}
/**
* Checks if the given directory ends with / or \, whatever is correct.
*
* @param dirPath
* the path
* @return true if yes, false if not
*/
public String checkPath(final String dirPath) {
String cs = FileSystems.getDefault().getSeparator();
if (!dirPath.contains( cs )) {
cs = "/";
if (!dirPath.contains( cs )) {
return dirPath;
}
}
if (!dirPath.endsWith(cs)) {
return dirPath.concat(cs);
}
return dirPath;
}
/**
* Gives the value of {@link #pathregistered}
*
* @return true or false
*/
public boolean isPathregistered() {
return pathregistered;
}
/**
* Set the value of {@link #pathregistered}
*
* @param pathregistered
* value of {@link #pathregistered}
*/
public void setPathregistered(boolean pathregistered) {
this.pathregistered = pathregistered;
}
/**
* Executes the given list of tasks sequentially.
*
* @param handlername
* handler name, having the list of tasks to be executed
* @param triggerfile
* the trigger file that has to be deleted at the end
*/
public void executeTasksSequentially(String handlername, final Path triggerfile) {
log.info("Start - Sequential execution of list of tasks from handler [" + handlername + "] ...");
for (WorkerThreadRunnable interchange : getListOfTasks().get(handlername)) {
// Making sure that the current task is done before continuing
try {
executorService.submit(interchange).get();
} catch (Exception e) {
log.error("Execution - " + interchange.getDirection() + " for " + interchange.getName()
+ " interupted!\n" + e.getMessage());
}
}
deleteFile(triggerfile); // always delete the triggerfile
log.info("End - Sequential execution of list of tasks from handler [" + handlername + "] ...");
}
/**
* Executes the given list of tasks parallel.
*
* @param handlername
* handler name, having the list of tasks to be executed
* @param triggerfile
* the trigger file that has to be deleted at the end
*/
@SuppressWarnings("unchecked")
public void executeTasksParallel(String handlername, final Path triggerfile) {
log.info("Start - Parallel execution of list of tasks from handler [" + handlername + "] ...");
boolean allTasksDone = false;
ArrayList<FutureTask<WorkerThreadRunnable>> tasks = new ArrayList<>();
for (WorkerThreadRunnable dataint : getListOfTasks().get(handlername)) {
tasks.add((FutureTask<WorkerThreadRunnable>) executorService.submit(dataint));
}
while (!allTasksDone) {
allTasksDone = allTasksDone(tasks);
}
if (allTasksDone) {
log.info("All tasks done!");
deleteFile(triggerfile);
}
log.info("End - Parallel execution of list of tasks from handler [" + handlername + "] ...");
}
/**
* Checks, if the given list of tasks has been fully processed.
*
* @param tasks
* the list of tasks
* {@link ArrayList<FutureTask<WorkerThreadRunnable>>}
* @return true if yes, false if not
*/
@Override
public boolean allTasksDone(ArrayList<FutureTask<WorkerThreadRunnable>> tasks) {
for (FutureTask<WorkerThreadRunnable> task : tasks) {
if (task != null && !task.isDone()) {
return false;
}
}
return true;
}
/**
* Gives the list of file back.
*
* @return {@link HashMap<String, String>} the list of file masks
*/
public HashMap<String, String> getMasks() {
return masks;
}
/**
* Adds a new <handler name - file mask> pair to the {@link #masks}.
*
* @param handlername
* the handler name
* @param filemask
* the file mask
*/
public void addFileMasks(String handlername, String filemask) {
masks.put(handlername, filemask);
}
//////////// SCHEDULER FUNCTIONALITIES //////////
/**
* Initializes the {@link #scheduler} field.
*
* @param schedname
* @param threadcount
* the maximum number of job that can be run simultaneously
* @throws SchedulerException
* possible thrown exception
*/
public void initScheduler(String schedname, String threadcount) throws SchedulerException {
log.info("Initializes " + schedname + " ...");
Properties properties = new Properties();
properties.put("version", "2.2.1");
properties.put(StdSchedulerFactory.PROP_SCHED_INSTANCE_ID, "AUTO");
properties.put(StdSchedulerFactory.PROP_SCHED_INSTANCE_NAME, schedname);
properties.put(StdSchedulerFactory.PROP_SCHED_NAME, schedname);
properties.put(StdSchedulerFactory.PROP_THREAD_POOL_PREFIX + ".threadCount", threadcount);
properties.put(StdSchedulerFactory.PROP_THREAD_POOL_CLASS, "org.quartz.simpl.SimpleThreadPool");
scheduler = new StdSchedulerFactory(properties).getScheduler();
log.info(schedname + " successfully initialized!");
}
/**
* Starts the {@link #scheduler}.
*
* @throws SchedulerException
*/
public void startScheduler() throws SchedulerException {
scheduler.start();
log.info(scheduler.getSchedulerName() + " Service started!");
}
/**
* Halts the Scheduler's firing of Triggers, and cleans up all resources
* associated with the Scheduler.
*
* @param waitForJobsToComplete
* whether or not the scheduler shuts down immediately or wait
* for jobs to finish.
* @throws SchedulerException
*/
public void shutdownScheduler(boolean waitForJobsToComplete) throws SchedulerException {
log.info("Shutdowns scheduler ...");
scheduler.shutdown(waitForJobsToComplete);
}
/**
* Halts the Scheduler's firing of Triggers, and cleans up all resources
* associated with the Scheduler.
*
* @throws SchedulerException
*/
public void shutdownScheduler() throws SchedulerException {
log.info("Shutdowns scheduler ...");
scheduler.shutdown();
}
/**
* Pauses the {@link #scheduler}.
*
* @throws SchedulerException
* possible thrown exception
*/
public void pauseScheduler() throws SchedulerException {
log.info("Pauses scheduler ...");
scheduler.pauseAll();
}
/**
* Resumes the {@link #scheduler}.
*
* @throws SchedulerException
* possible thrown exception
*/
public void resumeScheduler() throws SchedulerException {
log.info("Resumes scheduler ...");
scheduler.resumeAll();
}
/**
* Gives the {@link #scheduler} back.
*
* @return the {@link OSBPSignalScheduler}
*/
public Scheduler getScheduler() {
return scheduler;
}
/**
* Schedules a given job based on a given trigger.
*
* @param job
* the {@link JobDetail} to be scheduled
* @param trigger
* the corresponding trigger
* @return
*/
public Date scheduleJob(JobDetail job, Trigger trigger) {
try {
return scheduler.scheduleJob(job, trigger);
} catch (SchedulerException e) {
e.printStackTrace();
}
return null;
}
/**
* Unschedules a job.
*
* @param triggerkey
* the trigger key.
*/
public void unscheduleJob(TriggerKey triggerkey) {
try {
scheduler.unscheduleJob(triggerkey);
} catch (SchedulerException e1) {
e1.printStackTrace();
}
}
/**
* Creates a job based on the given watcher - handler name.
*
* @param name
* Job name
* @param groupname
* Group name
* @param description
* Description of the job
* @param issequential
* Execution type of the given lsit of tasks
* @param handlername
* name of watcher - handler for this specific job.
* @return {@link JobDetail} the newly created Job.
*/
public JobDetail createDataTransferJob(String name, String groupname, String description, boolean issequential,
String handlername) {
JobDetail job = newJob(OSBPSignalJob.class).usingJobData(OSBPSignalConstants.SEQUENTIAL, issequential)
.withDescription(description).withIdentity(name, groupname).build();
job.getJobDataMap().putIfAbsent(OSBPSignalConstants.TASKSLIST, handlername);
return job;
}
/**
* Creates an hourly based trigger.
*
* @param name
* Trigger name
* @param groupname
* Group name
* @param minute
* The exact minute, in which the trigger is hourly fired (0-59).
* @return {@link CronTrigger} the newly created trigger.
*/
public CronTrigger createHourlyTrigger(String name, String groupname, int minute) {
log.debug("Creating createHourlyTrigger [" + name + "] for group [" + groupname + "] running hourly at ["
+ minute + "] minutes ...");
CronTrigger trigger = newTrigger().withIdentity(name, groupname)
.withSchedule(cronSchedule(String.format("0 %s * * * ?", minute))).build();
log.debug("createHourlyTrigger " + trigger + " successfully created.");
return trigger;
}
/**
* Creates a daily based trigger.
*
* @param name
* Trigger name
* @param groupname
* Group name
* @param hour
* The exact hour, in which the trigger is daily fired (0-23).
* @param minute
* The exact minute, in which the trigger is hourly in a day
* fired (0-59).
* @return {@link CronTrigger} the newly created trigger.
*/
public CronTrigger createDailyTriggerAtHourandMins(String name, String groupname, int hour, int minute) {
log.debug("Creating DailyTrigger [" + name + "] for group [" + groupname + "] running at hour[" + hour
+ "] and [" + minute + "] minutes ...");
CronTrigger trigger = newTrigger().withIdentity(name, groupname)
.withSchedule(dailyAtHourAndMinute(hour, minute)).build();
log.debug("DailyTrigger " + trigger + " successfully created.");
return trigger;
}
/**
* Creates a weekly based trigger.
*
* @param name
* Trigger name
* @param groupname
* Group name
* @param dayOfWeek
* The day of the week on which the trigger has to be fired (0-6
* for Sunday-Saturday).
* @param hour
* The exact hour, in which the trigger is daily fired (0-23).
* @param minute
* The exact minute, in which the trigger is hourly in a day
* fired (0-59).
* @return {@link CronTrigger} the newly created trigger.
*/
public CronTrigger createWeeklyTriggerOnDayAndHourAndMinute(String name, String groupname, int dayOfWeek, int hour,
int minute) {
log.debug("Creating Trigger [" + name + "] for group [" + groupname + "] ...");
CronTrigger trigger = newTrigger().withIdentity(name, groupname)
.withSchedule(weeklyOnDayAndHourAndMinute(dayOfWeek, hour, minute)).build();
log.debug("Trigger " + trigger + " successfully created.");
return trigger;
}
/**
* Creates a monthly based trigger.
*
* @param name
* Trigger name
* @param groupname
* Group name
* @param dayOfMonth
* The day of the month on which the trigger has to be fired
* (1-31).
* @param hour
* The exact hour, in which the trigger is daily fired (0-23).
* @param minute
* The exact minute, in which the trigger is hourly in a day
* fired (0-59).
* @return {@link CronTrigger} the newly created trigger.
*/
public CronTrigger createMonthlyTriggerOnDayAndHourAndMinute(String name, String groupname, int dayOfMonth,
int hour, int minute) {
log.debug("Creating MonthlyTrigger[" + name + "] for group [" + groupname + "] on Day[" + dayOfMonth
+ "] and Hour[" + hour + "] and [" + minute + "]Minute...");
CronTrigger trigger = newTrigger().withIdentity(name, groupname)
.withSchedule(monthlyOnDayAndHourAndMinute(dayOfMonth, hour, minute)).build();
log.debug("MonthlyTrigger " + trigger + " successfully created.");
return trigger;
}
/**
* Creates a cron based trigger, depending on the given cron-expression.
*
* @param name
* Trigger name
* @param groupname
* Group name
* @param expression
* the {@link CronExpression} as string value
* @return {@link CronTrigger} the newly created cron-trigger.
*/
public CronTrigger createCronTrigger(String name, String groupname, String expression) {
log.debug("Creating CronTrigger [" + name + "] for group [" + groupname + "] with CronExpression [" + expression
+ "]...");
CronTrigger trigger = newTrigger().withIdentity(name, groupname).withSchedule(cronSchedule(expression)).build();
log.debug("CronTrigger " + trigger + " successfully created.");
return trigger;
}
}