/**
 *
 * Copyright (c) 2011, 2018 - Loetz GmbH&Co.KG (69115 Heidelberg, Germany)
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License 2.0
 * which accompanies this distribution, and is available at
 * https://www.eclipse.org/legal/epl-2.0/
 *
 * SPDX-License-Identifier: EPL-2.0
 *
 * Contributors:
 * Christophe Loetz (Loetz GmbH&Co.KG) - initial implementation
 *
 */
package org.eclipse.osbp.xtext.signal.common;

import static java.nio.file.LinkOption.NOFOLLOW_LINKS;
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.OVERFLOW;
import static org.quartz.CronScheduleBuilder.cronSchedule;
import static org.quartz.CronScheduleBuilder.dailyAtHourAndMinute;
import static org.quartz.CronScheduleBuilder.monthlyOnDayAndHourAndMinute;
import static org.quartz.CronScheduleBuilder.weeklyOnDayAndHourAndMinute;
import static org.quartz.JobBuilder.newJob;
import static org.quartz.TriggerBuilder.newTrigger;

import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchEvent.Kind;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.regex.PatternSyntaxException;

import org.eclipse.osbp.xtext.datainterchange.common.WorkerThreadRunnable;
import org.quartz.CronExpression;
import org.quartz.CronTrigger;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerKey;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

| public abstract class OSBPSignalWatcher extends OSBPSignalCommonData { |
| /** the log */ |
| public Logger log = LoggerFactory.getLogger("watcher"); |
| /** the scheduler */ |
| private Scheduler scheduler; |
| /** the watcher service */ |
| private WatchService watcher; |
| /** list of all created keys and paths */ |
| private Map<WatchKey, Path> keys = null; |
| /** |
| * determines if the all tree of a directory has to be watched for changes |
| */ |
| private boolean recursive = false; |
| /** determine if the path has already been registered or not */ |
| private boolean pathregistered = false; |
| /** the list of handler names and their file masks */ |
| private static HashMap<String, String> masks = new HashMap<>(); |
| /** the executor service for the processing of all tasks */ |
| private static ExecutorService executorService = Executors.newCachedThreadPool(); |
| |
| /** |
| * Constructor. |
| * |
| * @throws IOException |
| */ |
| public OSBPSignalWatcher() throws Exception { |
| initWatcher(); |
| } |
| |
| /** |
| * Initializes the {@link #watcher}. |
| * |
| * @throws IOException |
| * possible thrown exception |
| */ |
| public void initWatcher() throws IOException { |
| this.watcher = FileSystems.getDefault().newWatchService(); |
| this.keys = new HashMap<>(); |
| } |
| |
| /** |
| * Process all events for keys queued to the watcher. |
| */ |
| public void processEvents() { |
| for (;;) { |
| // wait for key to be signaled |
| WatchKey key; |
| try { |
| key = watcher.take(); |
| } catch (InterruptedException x) { |
| return; |
| } |
| |
| Path dir = keys.get(key); |
| if (dir == null) { |
| log.error("WatchKey not recognized!!"); |
| continue; |
| } |
| |
| for (WatchEvent<?> event : key.pollEvents()) { |
| Kind<?> kind = event.kind(); |
| |
| // handle the OVERFLOW event |
| if (kind == OVERFLOW) { |
| continue; |
| } |
| |
| if (isHandlingAllowed(kind)) { |
| // Context for directory entry event is the file name of |
| // entry |
| WatchEvent<Path> ev = cast(event); |
| Path child = dir.resolve(ev.context()); |
| log.debug(String.format("%s: %s\n", event.kind().name(), child)); |
| |
| // only allowed event can be handle |
| try { |
| handleEvent(child); |
| } catch (Exception e) { |
| log.error("WatcherService shortly interrupted due to: " + e.toString()); |
| log.info("WatcherService resumed."); |
| } |
| |
| // if directory is created, and watching recursively, then |
| // register it and its sub-directories |
| if (recursive && (kind == ENTRY_CREATE)) { |
| try { |
| if (Files.isDirectory(child, NOFOLLOW_LINKS)) { |
| registerAll(child); |
| } |
| } catch (IOException x) { |
| // ignore to keep sample readbale |
| } |
| } |
| } |
| // reset key and remove from set if directory no longer |
| // accessible |
| boolean valid = key.reset(); |
| if (!valid) { |
| keys.remove(key); |
| |
| // all directories are inaccessible |
| if (keys.isEmpty()) { |
| break; |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * Checks if an event type has to be handle or not. Only create event are |
| * allowed for now on. |
| * |
| * @param event |
| * {@link WatchEvent<Path>} the event |
| * @return true if yes, false if not |
| */ |
| public boolean isHandlingAllowed(Kind<?> event) { |
| if (event.name().equals("ENTRY_CREATE")) { |
| return true; |
| } else { |
| return false; |
| } |
| } |
| |
| /** |
| * Handling operation to execute after the given file is created in the |
| * watched directory. |
| * |
| * @param file |
| * the newly created file, to react on |
| */ |
| public void handleEvent(Path file) { |
| } |
| |
| @SuppressWarnings("unchecked") |
| static <T> WatchEvent<T> cast(WatchEvent<?> event) { |
| return (WatchEvent<T>) event; |
| } |
| |
| /** |
| * Register the given directory, and all its sub-directories, with the |
| * WatchService. |
| * |
| * @param rootpath |
| * {@link Path} the root path. |
| * @param signals |
| * @throws IOException |
| */ |
| public void registerAll(Path rootpath) throws IOException { |
| // register directory and sub-directories |
| Files.walkFileTree(rootpath, new SimpleFileVisitor<Path>() { |
| @Override |
| public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException { |
| registerPathToWatcher(dir); |
| return FileVisitResult.CONTINUE; |
| } |
| }); |
| } |
| |
| /** |
| * Register the given directory with the WatchService |
| * |
| * @param directory |
| * @param combined |
| * true to create only one {@link WatchKey} for all event types, |
| * false to create separate key for each event type. |
| * @throws IOException |
| */ |
| public void registerPathToWatcher(Path directory) throws IOException { |
| log.info("Directory of path to watch: [" + directory + "] registered!"); |
| getRegisteredKeysAndPaths().put(directory.register(getWatcher(), StandardWatchEventKinds.ENTRY_CREATE), |
| directory); |
| } |
| |
| /** |
| * Checks if the given {@link WatchKey} is valid. |
| * |
| * @return true if yes, false if not |
| */ |
| public boolean isKeyValid(WatchKey key) { |
| return key != null ? key.isValid() : false; |
| } |
| |
| /** |
| * Closes the watcher service |
| * |
| * @return true if yes or non existing, false if not |
| */ |
| public boolean closeWatcher() { |
| try { |
| if (watcher != null) { |
| watcher.close(); |
| } |
| return true; |
| } catch (IOException e) { |
| e.printStackTrace(); |
| } |
| return false; |
| } |
| |
| /** |
| * Get the list of registered directories. |
| * |
| * @return {@link Collection<Path>} {@link #directories} |
| */ |
| public Collection<Path> getRegisteredDirectories() { |
| return keys.values(); |
| } |
| |
| /** |
| * Gets the list of registered keys. |
| * |
| * @return {@link Set<WatchKey>} |
| */ |
| public Set<WatchKey> getRegisteredKeys() { |
| return keys.keySet(); |
| } |
| |
| /** |
| * Gets the list of registered keys and their paths. |
| * |
| * @return {@link Map<WatchKey,Path>} |
| */ |
| public Map<WatchKey, Path> getRegisteredKeysAndPaths() { |
| return keys; |
| } |
| |
| /** |
| * Gets the watcher. |
| * |
| * @return {@link WatchService} |
| */ |
| public WatchService getWatcher() { |
| return watcher; |
| } |
| |
| /** |
| * Checks if the given filename corresponds to one of the file masks |
| * eventually defined in one handler of the watcher |
| * |
| * @param filename |
| * the filename to be checked |
| * @return the corresponding handler name or null |
| */ |
| public String isFileNameValid(String filename) { |
| if (filename != null && !filename.isEmpty() && !masks.isEmpty()) { |
| for (String handlername : masks.keySet()) { |
| String filemask = masks.get(handlername); |
| try { |
| if (filename.toLowerCase().equals(filemask.toLowerCase()) || filename.matches(filemask)) { |
| return handlername; |
| } |
| } catch (PatternSyntaxException e) { |
| log.error("The defined pattern [" + filemask + "] is invalid."); |
| } |
| } |
| } |
| return null; // in the case of no filename matches |
| } |
| |
| /** |
| * Checks if the given directory ends with / or \, whatever is correct. |
| * |
| * @param dirPath |
| * the path |
| * @return true if yes, false if not |
| */ |
| public String checkPath(final String dirPath) { |
| |
| String cs = FileSystems.getDefault().getSeparator(); |
| if (!dirPath.contains( cs )) { |
| cs = "/"; |
| if (!dirPath.contains( cs )) { |
| return dirPath; |
| } |
| } |
| |
| if (!dirPath.endsWith(cs)) { |
| return dirPath.concat(cs); |
| } |
| |
| return dirPath; |
| } |
| |
| /** |
| * Gives the value of {@link #pathregistered} |
| * |
| * @return true or false |
| */ |
| public boolean isPathregistered() { |
| return pathregistered; |
| } |
| |
| /** |
| * Set the value of {@link #pathregistered} |
| * |
| * @param pathregistered |
| * value of {@link #pathregistered} |
| */ |
| public void setPathregistered(boolean pathregistered) { |
| this.pathregistered = pathregistered; |
| } |
| |
| /** |
| * Executes the given list of tasks sequentially. |
| * |
| * @param handlername |
| * handler name, having the list of tasks to be executed |
| * @param triggerfile |
| * the trigger file that has to be deleted at the end |
| */ |
| public void executeTasksSequentially(String handlername, final Path triggerfile) { |
| log.info("Start - Sequential execution of list of tasks from handler [" + handlername + "] ..."); |
| for (WorkerThreadRunnable interchange : getListOfTasks().get(handlername)) { |
| // Making sure that the current task is done before continuing |
| try { |
| executorService.submit(interchange).get(); |
| } catch (Exception e) { |
| log.error("Execution - " + interchange.getDirection() + " for " + interchange.getName() |
| + " interupted!\n" + e.getMessage()); |
| } |
| } |
| deleteFile(triggerfile); // always delete the triggerfile |
| log.info("End - Sequential execution of list of tasks from handler [" + handlername + "] ..."); |
| } |
| |
| /** |
| * Executes the given list of tasks parallel. |
| * |
| * @param handlername |
| * handler name, having the list of tasks to be executed |
| * @param triggerfile |
| * the trigger file that has to be deleted at the end |
| */ |
| @SuppressWarnings("unchecked") |
| public void executeTasksParallel(String handlername, final Path triggerfile) { |
| log.info("Start - Parallel execution of list of tasks from handler [" + handlername + "] ..."); |
| boolean allTasksDone = false; |
| ArrayList<FutureTask<WorkerThreadRunnable>> tasks = new ArrayList<>(); |
| for (WorkerThreadRunnable dataint : getListOfTasks().get(handlername)) { |
| tasks.add((FutureTask<WorkerThreadRunnable>) executorService.submit(dataint)); |
| } |
| while (!allTasksDone) { |
| allTasksDone = allTasksDone(tasks); |
| } |
| if (allTasksDone) { |
| log.info("All tasks done!"); |
| deleteFile(triggerfile); |
| } |
| log.info("End - Parallel execution of list of tasks from handler [" + handlername + "] ..."); |
| } |
| |
| /** |
| * Checks, if the given list of tasks has been fully processed. |
| * |
| * @param tasks |
| * the list of tasks |
| * {@link ArrayList<FutureTask<WorkerThreadRunnable>>} |
| * @return true if yes, false if not |
| */ |
| @Override |
| public boolean allTasksDone(ArrayList<FutureTask<WorkerThreadRunnable>> tasks) { |
| for (FutureTask<WorkerThreadRunnable> task : tasks) { |
| if (task != null && !task.isDone()) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| /** |
| * Gives the list of file back. |
| * |
| * @return {@link HashMap<String, String>} the list of file masks |
| */ |
| public HashMap<String, String> getMasks() { |
| return masks; |
| } |
| |
| /** |
| * Adds a new <handler name - file mask> pair to the {@link #masks}. |
| * |
| * @param handlername |
| * the handler name |
| * @param filemask |
| * the file mask |
| */ |
| public void addFileMasks(String handlername, String filemask) { |
| masks.put(handlername, filemask); |
| } |
| |
| //////////// SCHEDULER FUNCTIONALITIES ////////// |
| |
| /** |
| * Initializes the {@link #scheduler} field. |
| * |
| * @param schedname |
| * @param threadcount |
| * the maximum number of job that can be run simultaneously |
| * @throws SchedulerException |
| * possible thrown exception |
| */ |
| public void initScheduler(String schedname, String threadcount) throws SchedulerException { |
| log.info("Initializes " + schedname + " ..."); |
| Properties properties = new Properties(); |
| properties.put("version", "2.2.1"); |
| properties.put(StdSchedulerFactory.PROP_SCHED_INSTANCE_ID, "AUTO"); |
| properties.put(StdSchedulerFactory.PROP_SCHED_INSTANCE_NAME, schedname); |
| properties.put(StdSchedulerFactory.PROP_SCHED_NAME, schedname); |
| properties.put(StdSchedulerFactory.PROP_THREAD_POOL_PREFIX + ".threadCount", threadcount); |
| properties.put(StdSchedulerFactory.PROP_THREAD_POOL_CLASS, "org.quartz.simpl.SimpleThreadPool"); |
| scheduler = new StdSchedulerFactory(properties).getScheduler(); |
| log.info(schedname + " successfully initialized!"); |
| } |
| |
| /** |
| * Starts the {@link #scheduler}. |
| * |
| * @throws SchedulerException |
| */ |
| public void startScheduler() throws SchedulerException { |
| scheduler.start(); |
| log.info(scheduler.getSchedulerName() + " Service started!"); |
| } |
| |
| /** |
| * Halts the Scheduler's firing of Triggers, and cleans up all resources |
| * associated with the Scheduler. |
| * |
| * @param waitForJobsToComplete |
| * whether or not the scheduler shuts down immediately or wait |
| * for jobs to finish. |
| * @throws SchedulerException |
| */ |
| public void shutdownScheduler(boolean waitForJobsToComplete) throws SchedulerException { |
| log.info("Shutdowns scheduler ..."); |
| scheduler.shutdown(waitForJobsToComplete); |
| } |
| |
| /** |
| * Halts the Scheduler's firing of Triggers, and cleans up all resources |
| * associated with the Scheduler. |
| * |
| * @throws SchedulerException |
| */ |
| public void shutdownScheduler() throws SchedulerException { |
| log.info("Shutdowns scheduler ..."); |
| scheduler.shutdown(); |
| } |
| |
| /** |
| * Pauses the {@link #scheduler}. |
| * |
| * @throws SchedulerException |
| * possible thrown exception |
| */ |
| public void pauseScheduler() throws SchedulerException { |
| log.info("Pauses scheduler ..."); |
| scheduler.pauseAll(); |
| } |
| |
| /** |
| * Resumes the {@link #scheduler}. |
| * |
| * @throws SchedulerException |
| * possible thrown exception |
| */ |
| public void resumeScheduler() throws SchedulerException { |
| log.info("Resumes scheduler ..."); |
| scheduler.resumeAll(); |
| } |
| |
| /** |
| * Gives the {@link #scheduler} back. |
| * |
| * @return the {@link OSBPSignalScheduler} |
| */ |
| public Scheduler getScheduler() { |
| return scheduler; |
| } |
| |
| /** |
| * Schedules a given job based on a given trigger. |
| * |
| * @param job |
| * the {@link JobDetail} to be scheduled |
| * @param trigger |
| * the corresponding trigger |
| * @return |
| */ |
| public Date scheduleJob(JobDetail job, Trigger trigger) { |
| try { |
| return scheduler.scheduleJob(job, trigger); |
| } catch (SchedulerException e) { |
| e.printStackTrace(); |
| } |
| return null; |
| } |
| |
| /** |
| * Unschedules a job. |
| * |
| * @param triggerkey |
| * the trigger key. |
| */ |
| public void unscheduleJob(TriggerKey triggerkey) { |
| try { |
| scheduler.unscheduleJob(triggerkey); |
| } catch (SchedulerException e1) { |
| e1.printStackTrace(); |
| } |
| } |
| |
| /** |
| * Creates a job based on the given watcher - handler name. |
| * |
| * @param name |
| * Job name |
| * @param groupname |
| * Group name |
| * @param description |
| * Description of the job |
| * @param issequential |
| * Execution type of the given lsit of tasks |
| * @param handlername |
| * name of watcher - handler for this specific job. |
| * @return {@link JobDetail} the newly created Job. |
| */ |
| public JobDetail createDataTransferJob(String name, String groupname, String description, boolean issequential, |
| String handlername) { |
| JobDetail job = newJob(OSBPSignalJob.class).usingJobData(OSBPSignalConstants.SEQUENTIAL, issequential) |
| .withDescription(description).withIdentity(name, groupname).build(); |
| job.getJobDataMap().putIfAbsent(OSBPSignalConstants.TASKSLIST, handlername); |
| return job; |
| } |
| |
| /** |
| * Creates an hourly based trigger. |
| * |
| * @param name |
| * Trigger name |
| * @param groupname |
| * Group name |
| * @param minute |
| * The exact minute, in which the trigger is hourly fired (0-59). |
| * @return {@link CronTrigger} the newly created trigger. |
| */ |
| public CronTrigger createHourlyTrigger(String name, String groupname, int minute) { |
| log.debug("Creating createHourlyTrigger [" + name + "] for group [" + groupname + "] running hourly at [" |
| + minute + "] minutes ..."); |
| |
| CronTrigger trigger = newTrigger().withIdentity(name, groupname) |
| .withSchedule(cronSchedule(String.format("0 %s * * * ?", minute))).build(); |
| |
| log.debug("createHourlyTrigger " + trigger + " successfully created."); |
| return trigger; |
| } |
| |
| /** |
| * Creates a daily based trigger. |
| * |
| * @param name |
| * Trigger name |
| * @param groupname |
| * Group name |
| * @param hour |
| * The exact hour, in which the trigger is daily fired (0-23). |
| * @param minute |
| * The exact minute, in which the trigger is hourly in a day |
| * fired (0-59). |
| * @return {@link CronTrigger} the newly created trigger. |
| */ |
| public CronTrigger createDailyTriggerAtHourandMins(String name, String groupname, int hour, int minute) { |
| log.debug("Creating DailyTrigger [" + name + "] for group [" + groupname + "] running at hour[" + hour |
| + "] and [" + minute + "] minutes ..."); |
| CronTrigger trigger = newTrigger().withIdentity(name, groupname) |
| .withSchedule(dailyAtHourAndMinute(hour, minute)).build(); |
| log.debug("DailyTrigger " + trigger + " successfully created."); |
| return trigger; |
| } |
| |
| /** |
| * Creates a weekly based trigger. |
| * |
| * @param name |
| * Trigger name |
| * @param groupname |
| * Group name |
| * @param dayOfWeek |
| * The day of the week on which the trigger has to be fired (0-6 |
| * for Sunday-Saturday). |
| * @param hour |
| * The exact hour, in which the trigger is daily fired (0-23). |
| * @param minute |
| * The exact minute, in which the trigger is hourly in a day |
| * fired (0-59). |
| * @return {@link CronTrigger} the newly created trigger. |
| */ |
| public CronTrigger createWeeklyTriggerOnDayAndHourAndMinute(String name, String groupname, int dayOfWeek, int hour, |
| int minute) { |
| log.debug("Creating Trigger [" + name + "] for group [" + groupname + "] ..."); |
| CronTrigger trigger = newTrigger().withIdentity(name, groupname) |
| .withSchedule(weeklyOnDayAndHourAndMinute(dayOfWeek, hour, minute)).build(); |
| log.debug("Trigger " + trigger + " successfully created."); |
| return trigger; |
| } |
| |
| /** |
| * Creates a monthly based trigger. |
| * |
| * @param name |
| * Trigger name |
| * @param groupname |
| * Group name |
| * @param dayOfMonth |
| * The day of the month on which the trigger has to be fired |
| * (1-31). |
| * @param hour |
| * The exact hour, in which the trigger is daily fired (0-23). |
| * @param minute |
| * The exact minute, in which the trigger is hourly in a day |
| * fired (0-59). |
| * @return {@link CronTrigger} the newly created trigger. |
| */ |
| public CronTrigger createMonthlyTriggerOnDayAndHourAndMinute(String name, String groupname, int dayOfMonth, |
| int hour, int minute) { |
| log.debug("Creating MonthlyTrigger[" + name + "] for group [" + groupname + "] on Day[" + dayOfMonth |
| + "] and Hour[" + hour + "] and [" + minute + "]Minute..."); |
| CronTrigger trigger = newTrigger().withIdentity(name, groupname) |
| .withSchedule(monthlyOnDayAndHourAndMinute(dayOfMonth, hour, minute)).build(); |
| log.debug("MonthlyTrigger " + trigger + " successfully created."); |
| return trigger; |
| } |
| |
| /** |
| * Creates a cron based trigger, depending on the given cron-expression. |
| * |
| * @param name |
| * Trigger name |
| * @param groupname |
| * Group name |
| * @param expression |
| * the {@link CronExpression} as string value |
| * @return {@link CronTrigger} the newly created cron-trigger. |
| */ |
| public CronTrigger createCronTrigger(String name, String groupname, String expression) { |
| log.debug("Creating CronTrigger [" + name + "] for group [" + groupname + "] with CronExpression [" + expression |
| + "]..."); |
| CronTrigger trigger = newTrigger().withIdentity(name, groupname).withSchedule(cronSchedule(expression)).build(); |
| log.debug("CronTrigger " + trigger + " successfully created."); |
| return trigger; |
| } |
| } |