blob: 6836220a606104212376118e84de2904eb2070d0 [file] [log] [blame]
/**
*
* Copyright (c) 2011, 2018 - Loetz GmbH&Co.KG (69115 Heidelberg, Germany)
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License 2.0
* which accompanies this distribution, and is available at
* https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Christophe Loetz (Loetz GmbH&Co.KG) - initial implementation
*
*/
package org.eclipse.osbp.xtext.signal.common;
import static java.nio.file.StandardWatchEventKinds.OVERFLOW;
import static org.eclipse.osbp.xtext.signal.common.SignalConstants.CSV_EXTENSION;
import static org.eclipse.osbp.xtext.signal.common.SignalConstants.EDI_EXTENSION;
import static org.eclipse.osbp.xtext.signal.common.SignalConstants.XML_EXTENSION;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.FileSystems;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchEvent.Kind;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.PatternSyntaxException;
public class WatcherImpl extends SignalCommonData {
/**
 * The queue containing all watcher jobs waiting to be executed. It is static
 * and therefore shared by all instances; {@link #addWatcherJob(WatcherJobImpl)}
 * appends to it while holding the queue's monitor.
 */
public static ArrayDeque<WatcherJobImpl> watcherjobs = new ArrayDeque<WatcherJobImpl>();
/**
 * The jobs of the watcher group currently being processed. The jobs handler
 * polls them for completion before it starts the jobs of another watcher.
 */
public static ArrayList<WatcherJobImpl> temp = new ArrayList<>();
/** Whether the queue is allowed to be handled or not; also used as lock object for starting/stopping the handler. */
private static AtomicBoolean activeJobshandler = new AtomicBoolean(false);
/** The jobs handler thread, created by {@link #startWatcherJobsHandling()}. */
private static Thread jobshandler;
/** The watcher service, created by {@link #initWatcher()}. */
private WatchService watcher;
/** All created watch keys mapped to the directory they were registered for. */
private Map<WatchKey, Path> keys = null;
/** Watcher name, used as prefix of the log outputs. */
private String identifier = null;
/** The executor service for the processing of all tasks; set by {@link #startWatcherJobsHandling()}. */
private static ExecutorService executorService;
/**
 * Id of the watcher whose group of jobs is currently being processed;
 * {@code null} until the first job has been taken from the queue.
 */
private String lastwatcherid = null;
/**
 * Creates a watcher with the given name, increments {@code signalcount}
 * and initializes the underlying watch service via {@link #initWatcher()}.
 *
 * @param identifier
 *            the watcher name, used as prefix of the log outputs
 * @throws Exception
 *             if the watch service could not be initialized
 */
public WatcherImpl(String identifier) throws Exception {
    this.identifier = identifier;
    signalcount++;
    initWatcher();
}
/**
 * Creates a new watch service on the default file system and starts with an
 * empty key/directory registry.
 *
 * @throws IOException
 *             if the watch service could not be created
 */
public void initWatcher() throws IOException {
    final WatchService service = FileSystems.getDefault().newWatchService();
    this.watcher = service;
    this.keys = new HashMap<WatchKey, Path>();
}
/**
 * Processes the events of all keys queued to the watcher.
 * <p>
 * The method loops until the calling thread is interrupted or until none of
 * the registered directories is accessible any more. For every
 * {@code ENTRY_CREATE} event that does not refer to a directory,
 * {@link #handleEvent(Path)} is invoked; exceptions thrown by the handler
 * are logged and do not stop the watcher.
 * <p>
 * When the thread is interrupted, its interrupt status is restored before
 * the method returns.
 */
public void processEvents() {
    for (;;) {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException interrupted) {
            // an interrupt is the stop request: do not lose it by blocking
            // in take() afterwards
            Thread.currentThread().interrupt();
            return;
        }
        // wait for key to be signaled
        WatchKey key;
        try {
            key = watcher.take();
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            return;
        }
        Path dir = keys.get(key);
        if (dir == null) {
            log.error(identifier + " - WatchKey not recognized!!");
            continue;
        }
        for (WatchEvent<?> event : key.pollEvents()) {
            Kind<?> kind = event.kind();
            // OVERFLOW carries no usable context, other kinds are not handled
            if (kind == OVERFLOW || !isHandlingAllowed(kind)) {
                continue;
            }
            // context for a directory entry event is the file name of the entry
            WatchEvent<Path> ev = cast(event);
            Path child = dir.resolve(ev.context());
            // only handle the creation of files, ignore directories
            if (child.toFile().isDirectory()) {
                continue;
            }
            log.debug(String.format("%s - %s: %s\n", identifier, event.kind().name(), child));
            try {
                handleEvent(child);
            } catch (Exception e) {
                log.error(identifier + " - WatcherService shortly interrupted ...", e);
                log.info(identifier + " - WatcherService resumed.");
            }
        }
        // Reset the key exactly once after ALL its events were processed;
        // a key that is not reset never gets signaled again. Remove it from
        // the registry if its directory is no longer accessible.
        boolean valid = key.reset();
        if (!valid) {
            keys.remove(key);
            // all directories are inaccessible: nothing left to watch
            if (keys.isEmpty()) {
                break;
            }
        }
    }
}
/**
 * Tells whether events of the given kind have to be handled. For now only
 * creation events (kind name {@code ENTRY_CREATE}) are handled.
 *
 * @param event
 *            the kind of the event
 * @return true if the kind has to be handled, false if not
 */
public boolean isHandlingAllowed(Kind<?> event) {
    final String kindName = event.name();
    return kindName.equals("ENTRY_CREATE");
}
/**
 * Reacts on a newly created file. {@link #processEvents()} calls this method
 * for every created entry of a watched directory that is not a directory
 * itself.
 * <p>
 * This implementation does nothing; presumably subclasses override it to
 * check whether the file is a valid trigger file and to schedule the
 * corresponding tasks (to be confirmed against the subclasses).
 *
 * @param file
 *            the newly created file, to react on
 */
public void handleEvent(Path file) {
// intentionally empty in this class
}
/**
 * Re-types a wildcard watch event to the expected context type. The cast is
 * unchecked; the caller is responsible for choosing the right type.
 */
static <T> WatchEvent<T> cast(WatchEvent<?> event) {
    @SuppressWarnings("unchecked")
    final WatchEvent<T> typed = (WatchEvent<T>) event;
    return typed;
}
/**
 * Registers the given directory and all of its sub-directories with the
 * watch service.
 *
 * @param rootpath
 *            the root directory of the tree to register
 * @throws IOException
 *             if walking the tree or registering a directory fails
 */
public void registerAll(Path rootpath) throws IOException {
    // every directory is registered right before its content is visited
    final SimpleFileVisitor<Path> registrar = new SimpleFileVisitor<Path>() {
        @Override
        public FileVisitResult preVisitDirectory(Path directory, BasicFileAttributes attributes)
                throws IOException {
            registerPathToWatch(directory);
            return FileVisitResult.CONTINUE;
        }
    };
    Files.walkFileTree(rootpath, registrar);
}
/**
 * Registers the given directory with the watch service for creation events
 * and remembers the resulting key together with the directory.
 *
 * @param directory
 *            the directory to watch
 * @throws IOException
 *             if the directory could not be registered
 */
public void registerPathToWatch(Path directory) throws IOException {
    final Map<WatchKey, Path> registry = getRegisteredKeysAndPaths();
    final WatchKey key = directory.register(getWatcher(), StandardWatchEventKinds.ENTRY_CREATE);
    registry.put(key, directory);
    log.info(String.format("%s - Directory to watch: [%s] registered!", identifier, directory));
}
/**
 * Checks if the given {@link WatchKey} is valid.
 *
 * @param key
 *            the key to check, may be null
 * @return true if the key is not null and valid, false otherwise
 */
public boolean isKeyValid(WatchKey key) {
    if (key == null) {
        return false;
    }
    return key.isValid();
}
/**
 * Closes the watcher service, if there is one. A failure to close is logged
 * (with the watcher name) and not propagated.
 */
public void closeWatcher() {
    if (watcher == null) {
        return;
    }
    try {
        watcher.close();
    } catch (IOException e) {
        // report through the logger like every other failure of this class
        log.error(identifier + " - WatcherService could not be closed!", e);
    }
}
/**
 * Gets the registered directories.
 *
 * @return the directories of all registered keys, as a view of the internal
 *         key/directory registry
 */
public Collection<Path> getRegisteredDirectories() {
    final Collection<Path> directories = this.keys.values();
    return directories;
}
/**
 * Gets the registered keys.
 *
 * @return all registered keys, as a view of the internal key/directory
 *         registry
 */
public Set<WatchKey> getRegisteredKeys() {
    final Set<WatchKey> registered = this.keys.keySet();
    return registered;
}
/**
 * Gets the registered keys together with their directories.
 *
 * @return the internal key/directory registry itself (not a copy)
 */
public Map<WatchKey, Path> getRegisteredKeysAndPaths() {
    return this.keys;
}
/**
 * Gets the watcher service.
 *
 * @return the {@link WatchService} of this watcher
 */
public WatchService getWatcher() {
    return this.watcher;
}
/**
 * Checks if the given file name matches the given file mask. The mask is
 * interpreted as a glob pattern of the default file system.
 *
 * @param filename
 *            the file name to be checked
 * @param filemask
 *            the file mask to be matched with
 * @return true if it's a match, false if not or if one of the arguments is
 *         null or empty
 */
public boolean isFileNameValidToFileMask(String filename, String filemask) {
    if (filename == null || filename.isEmpty()) {
        return false;
    }
    if (filemask == null || filemask.isEmpty()) {
        return false;
    }
    final String pattern = "glob:" + filemask;
    return FileSystems.getDefault().getPathMatcher(pattern).matches(Paths.get(filename));
}
/**
 * Checks if the given file name equals the expected one.
 *
 * @param filename
 *            the file name to be checked
 * @param basename
 *            the base file name to be compared with
 * @return true if both are non-null and equal, false if not
 */
public boolean isFileNameValidToBaseFileName(String filename, String basename) {
    return filename != null && basename != null && basename.equals(filename);
}
/**
 * Normalizes a file mask: a null or empty mask is reported as null. Further
 * validation can be implemented here.
 *
 * @param filemask
 *            the file mask
 * @return the file mask, or null if it is null or empty
 */
public String getFilemaskValue(String filemask) {
    final boolean missing = filemask == null || filemask.isEmpty();
    return missing ? null : filemask;
}
/**
 * Normalizes a file name: a null or empty name is reported as null. Further
 * validation can be implemented here.
 *
 * @param filename
 *            the file name
 * @return the file name, or null if it is null or empty
 */
public String getFileNameValue(String filename) {
    final boolean missing = filename == null || filename.isEmpty();
    return missing ? null : filename;
}
/**
 * Convenience method to register a watcher directory.
 * <p>
 * The directory is the part of {@code url} in front of its last backslash
 * or, if it has no backslash, in front of its last slash. If {@code url} is
 * null, empty or has no separator at all, {@code modelurl} is used instead.
 * The directory is only registered if it resolves to a file based url and
 * exists.
 *
 * @param url
 *            a file path url
 * @param modelurl
 *            a model path url, used as fallback
 */
public void registerUrl(String url, String modelurl) {
    String directory = modelurl;
    if (url != null && !url.isEmpty()) {
        int cut = url.lastIndexOf("\\");
        if (cut < 0) {
            cut = url.lastIndexOf("/");
        }
        if (cut >= 0) {
            directory = url.substring(0, cut);
        }
    }
    final URL resolved = isDirectoryValidURL(directory);
    if (resolved == null || !resolved.getProtocol().equals("file")) {
        return;
    }
    if (!existsDirectory(directory)) {
        log.error(String.format("%s - Path [%s] not existing! Directory not registered!", identifier, directory));
        return;
    }
    try {
        registerPathToWatch(Paths.get(directory));
    } catch (IOException e) {
        log.error(String.format("%s - Path [%s] could not be registered!", identifier, directory), e);
    }
}
/**
 * Converts the given string into a url. A string that is no url because it
 * has no or an unknown protocol (e.g. a plain file system path) is
 * interpreted as path and converted into a file url.
 *
 * @param url
 *            the url or path string, may be null
 * @return the resulting {@link URL}, or null if the string is null or is
 *         neither a valid url nor a valid path
 */
public URL isDirectoryValidURL(String url) {
    if (url == null) {
        return null;
    }
    try {
        return new URL(url);
    } catch (MalformedURLException e1) {
        // the exception message may be null, so test it null-safe
        String reason = e1.getMessage();
        boolean maybePath = reason != null
                && (reason.startsWith("unknown protocol") || reason.startsWith("no protocol"));
        if (!maybePath) {
            return null;
        }
    }
    try {
        return Paths.get(url).toUri().toURL();
    } catch (MalformedURLException e2) {
        StringWriter sw = new StringWriter();
        e2.printStackTrace(new PrintWriter(sw));
        log.error(identifier + " - {}", sw.toString());
    } catch (IllegalArgumentException e3) {
        // InvalidPathException: the string cannot be converted into a path
        log.error(String.format("%s - [%s] is neither a valid url nor a valid path!", identifier, url), e3);
    }
    return null;
}
/**
 * Checks if the given path points to an existing directory. Any exception
 * raised while checking is logged and reported as "not existing".
 *
 * @param path
 *            the path
 * @return true if the path is an existing directory, false if not
 */
public boolean existsDirectory(String path) {
    boolean directoryFound = false;
    try {
        directoryFound = Files.isDirectory(Paths.get(path));
    } catch (Exception e) {
        log.error(identifier + " - existsDirectory - error ", e);
    }
    return directoryFound;
}
/**
 * Appends a watcher job to the end of the shared job queue and logs that
 * its execution is planned. The queue is locked while doing so.
 *
 * @param task
 *            the watcher job
 */
public void addWatcherJob(WatcherJobImpl task) {
    synchronized (watcherjobs) {
        watcherjobs.addLast(task);
        final String prefix = identifier + " - WatcherJobsHandler - Execution of [";
        log.debug(prefix + task.getClass().getSimpleName() + "] of ["
                + task.getWatcherId() + "] for [" + task.getTriggerfile() + "] planed!");
    }
}
/**
 * Starts the watcher jobs handler, which is responsible for handling the
 * queued jobs in {@link #watcherjobs}.
 * <p>
 * The handler is only started if it is not already active. Before it is
 * started, the method waits until {@code getMaxParallelThreadsCount()}
 * reports a value other than -1 and creates the executor service with that
 * thread count. The whole method body runs while holding the lock on
 * {@link #activeJobshandler}.
 * <p>
 * NOTE(review): the handler thread calls {@code checkAndHandleJobs()} of the
 * instance this method was called on, although the thread and the queue are
 * static - confirm that this is intended.
 */
public void startWatcherJobsHandling() {
synchronized (activeJobshandler) {
if (!activeJobshandler.get()) {
int result = -1;
// poll until the maximum parallel threads count is available (not -1)
while (true) {
if (getMaxParallelThreadsCount() == -1) {
try {
Thread.sleep(SignalConstants.WAITINGTIME_FOR_WATCHER_INIT);
} catch (InterruptedException e) {
// NOSONAR
// NOTE(review): the interrupt is dropped here and polling goes on
}
} else {
result = getMaxParallelThreadsCount();
break;
}
}
setExecutorService(createCustomExecutorService("WatcherJobHandler", result, 0L));
log.debug("WatcherJobsHandler - ExecutorService with maximum parallel threads count set to [" + result
+ "].");
// can be started only once
activeJobshandler.set(true);
// start the watcher job handler
jobshandler = new Thread("WatcherJobsHandler") {
@Override
public void run() {
// the queue execution: check for a job, then sleep, until the handler is deactivated
while (activeJobshandler.get()) {
try {
Thread.sleep(SignalConstants.WAITINGTIME_FOR_NEWJOB_CHECK);
} catch (InterruptedException e) {
// NOSONAR
// an interrupt only shortens the waiting time; the loop ends when activeJobshandler is false
}
checkAndHandleJobs();
}
log.info("WatcherJobsHandler - WatcherJobsHandling stopped ...");
log.info("WatcherJobsHandler - Remaining unhandled jobs count[" + watcherjobs.size() + "] !");
}
// overridden only to log the interruption and the number of jobs still queued
@Override
public void interrupt() {
log.info("WatcherJobsHandler - WatcherJobsHandling interrupted ...");
log.info("WatcherJobsHandler - Remaining unhandled jobs count[" + watcherjobs.size() + "] !");
super.interrupt();
}
};
jobshandler.start();
log.info("WatcherJobsHandler - WatcherJobsHandling started ...");
}
}
}
/**
 * Takes the next job (if any) from the queue and triggers its execution.
 * <p>
 * Jobs are processed in groups of the same watcher id:
 * <ul>
 * <li>the very first job starts the first group;</li>
 * <li>a job that allows parallel execution and belongs to the watcher of the
 * current group is triggered right away, in parallel to the jobs already
 * running;</li>
 * <li>any other job first waits until all jobs of the current group (see
 * {@link #temp}) are done and then starts a new group.</li>
 * </ul>
 * The job is removed from the queue while holding the queue's lock, the same
 * lock {@link #addWatcherJob(WatcherJobImpl)} uses for adding jobs.
 */
private void checkAndHandleJobs() {
    final WatcherJobImpl job;
    synchronized (watcherjobs) {
        // null if the queue is empty
        job = watcherjobs.poll();
    }
    if (job == null) {
        return;
    }
    final boolean joinsCurrentGroup = lastwatcherid != null && job.isParallelJobExecutionAllowed()
            && job.getWatcherId().equals(lastwatcherid);
    if (lastwatcherid == null) {
        // first job ever: mark its watcher as the current group
        lastwatcherid = job.getWatcherId();
        log.debug("WatcherJobHandler - START PROCESSING GROUP: " + lastwatcherid.toUpperCase());
    } else if (!joinsCurrentGroup) {
        // job of another watcher, or a job that must not run in parallel:
        // wait until all jobs of the current group are completed
        while (!areAllJobsInTempDone()) {
            try {
                Thread.sleep(SignalConstants.WAITINGTIME_FOR_JOBCOMPLETION_CHECK);
            } catch (InterruptedException e) {
                log.error("WatcherJobsHandler - ", e);
            }
        }
        log.debug("WatcherJobHandler - END PROCESSING GROUP: " + lastwatcherid.toUpperCase());
        // the job's watcher becomes the new current group
        lastwatcherid = job.getWatcherId();
        log.debug("WatcherJobHandler - START PROCESSING GROUP: " + lastwatcherid.toUpperCase());
        resetTemp();
    }
    // remember the job to be able to look for its completion, then trigger it
    temp.add(job);
    job.executeListOfTasks();
}
/**
 * Replaces the list of jobs of the current group ({@link #temp}) by a new,
 * empty list.
 */
public void resetTemp() {
    WatcherImpl.temp = new ArrayList<WatcherJobImpl>();
}
/**
 * Checks if all jobs of the current group ({@link #temp}) are done.
 *
 * @return true if every job reports to be done (also if there is no job at
 *         all), false as soon as one job is not done
 */
public boolean areAllJobsInTempDone() {
    boolean allDone = true;
    for (WatcherJobImpl tracked : temp) {
        if (!tracked.isDone()) {
            allDone = false;
            break;
        }
    }
    return allDone;
}
/**
 * Stops the handling of all watcher jobs by deactivating the jobs handler.
 * If the handler is not active, this is only logged.
 *
 * @throws InterruptedException
 *             declared for callers; not thrown by this implementation
 */
public void stopWatcherJobsHandling() throws InterruptedException {
    synchronized (activeJobshandler) {
        final boolean wasActive = activeJobshandler.get();
        if (!wasActive) {
            log.info(identifier+" - WatcherJobsHandling already stopped.");
            return;
        }
        activeJobshandler.set(false);
    }
}
/**
 * Gets the executor service shared by all watchers.
 *
 * @return the {@link ExecutorService}, or null if none has been set yet
 */
public static ExecutorService getExecutorService() {
    return WatcherImpl.executorService;
}
/**
 * Sets the executor service shared by all watchers (the field is static).
 *
 * @param executorService
 *            the executor service
 */
public void setExecutorService(ExecutorService executorService) {
    final ExecutorService replacement = executorService;
    WatcherImpl.executorService = replacement;
}
/**
 * Creates a url based on the given directory path. A string that is no url
 * because it has no or an unknown protocol (e.g. a plain file system path)
 * is interpreted as path and converted into a file url.
 *
 * @param directory
 *            the directory path or url string, may be null
 * @return the created {@link URL}, or null if the string is null or is
 *         neither a valid url nor a valid path
 */
public URL getDirectoryURL(String directory) {
    if (directory == null) {
        return null;
    }
    try {
        return new URL(directory);
    } catch (MalformedURLException e1) {
        // the exception message may be null, so test it null-safe
        String reason = e1.getMessage();
        boolean maybePath = reason != null
                && (reason.startsWith("unknown protocol") || reason.startsWith("no protocol"));
        if (!maybePath) {
            return null;
        }
    }
    try {
        return Paths.get(directory).toUri().toURL();
    } catch (MalformedURLException e2) {
        return null;
    } catch (IllegalArgumentException e3) {
        // InvalidPathException: the string cannot be converted into a path
        return null;
    }
}
/**
 * Checks that the given path is not a url of a protocol other than
 * {@code file}. A path that cannot be converted into a url at all is
 * accepted as well.
 *
 * @param path
 *            the path to be checked
 * @return false (and an error is logged) if the path is a url that is not
 *         file based, true otherwise
 */
public boolean checkURLValidity(String path) {
    final URL url = getDirectoryURL(path);
    final boolean accepted = url == null || url.getProtocol().equals("file");
    if (!accepted) {
        log.error("The watched directory is not a file based url.");
    }
    return accepted;
}
/**
 * Checks if the given file mask is a valid glob pattern of the default file
 * system. The reason for an invalid mask is logged.
 *
 * @param filemask
 *            the file mask
 * @return true if the mask is valid, false if it is null, empty or not a
 *         valid pattern
 */
public boolean isFileMaskValid(String filemask) {
    if (filemask == null || filemask.isEmpty()) {
        return false;
    }
    try {
        FileSystems.getDefault().getPathMatcher("glob:" + filemask);
        return true;
    } catch (UnsupportedOperationException exception) {
        // the mask is passed as format ARGUMENT: it may contain '%'
        log.error(String.format("%s - The file mask pattern [%s] from the configuration file is not recognized. Error: ",
                identifier, filemask), exception);
    } catch (IllegalArgumentException exception) {
        // also covers PatternSyntaxException, which is a subclass
        log.error(String.format("%s - The file mask pattern [%s] from the configuration file is invalid. Error: ",
                identifier, filemask), exception);
    }
    return false;
}
/**
 * Checks if the given file name is valid for the given extension: if the
 * extension is one of the known interchange extensions (csv, edi, xml) the
 * file name has to end with it, any other extension has to be a supported
 * one. The reason for an invalid file name is logged.
 *
 * @param filename
 *            the file name
 * @param validextension
 *            the valid extension
 * @return true if the file name is valid, false if not or if one of the
 *         arguments is null or empty
 */
public boolean isFileNameValid(String filename, String validextension) {
    if (filename == null || filename.isEmpty() || validextension == null || validextension.isEmpty()) {
        return false;
    }
    final boolean interchangeExtension = validextension.equals(CSV_EXTENSION)
            || validextension.equals(EDI_EXTENSION) || validextension.equals(XML_EXTENSION);
    if (interchangeExtension && !isExtensionAllowedForInterchange(filename, validextension)) {
        log.error(String.format("%s - The file name [%s] you have entered is not valid. Only file names ending with the extension %s are allowed for this watcher!", identifier, filename, validextension));
        return false;
    }
    if (!isExtensionValid(validextension)) {
        log.error(String.format("%s - The file name extension you have entered is not currently supported. Only file names ending with either one of following extension are allowed: '.csv' '.edi' or '.xml' .", identifier));
        return false;
    }
    return true;
}
/**
 * Checks if the given file mask is valid for the given extension: if the
 * extension is one of the known interchange extensions (csv, edi, xml) the
 * file mask has to end with it, any other extension has to be a supported
 * one. The reason for an invalid file mask is logged, with the same wording
 * for all three interchange extensions.
 *
 * @param filemask
 *            the file mask
 * @param validextension
 *            the expected valid extension
 * @return true if the file mask is valid, false if not or if one of the
 *         arguments is null or empty
 */
public boolean isFileMaskValidWithExpectedFileExtension(String filemask, String validextension) {
    if (filemask == null || filemask.isEmpty() || validextension == null || validextension.isEmpty()) {
        return false;
    }
    final boolean interchangeExtension = validextension.equals(CSV_EXTENSION)
            || validextension.equals(EDI_EXTENSION) || validextension.equals(XML_EXTENSION);
    if (interchangeExtension && !isExtensionAllowedForInterchange(filemask, validextension)) {
        log.error(String.format("%s - The file mask [%s] from the configuration file is not valid. Only file masks ending with the extension %s are allowed for this watcher!", identifier, filemask, validextension));
        return false;
    }
    if (!isExtensionValid(validextension)) {
        log.error(String.format("%s - The file mask extension from the configuration file is not currently supported. Only file masks ending with either one of following extension are allowed: '.csv' '.edi' or '.xml' .", identifier));
        return false;
    }
    return true;
}
/**
 * Checks if the given file name ends with the given allowed extension,
 * either exactly as given or in its lower case form. The lower case form is
 * built independently of the default locale.
 *
 * @param filename
 *            the file name, may be null
 * @param allowedextension
 *            the allowed extension, may be null
 * @return true if the file name ends with the extension, false if not or if
 *         one of the arguments is null
 */
public boolean isExtensionAllowedForInterchange(String filename, String allowedextension) {
    if (filename == null || allowedextension == null) {
        return false;
    }
    // Locale.ROOT: with e.g. a Turkish default locale "EDI" would become "edı"
    return filename.endsWith(allowedextension)
            || filename.endsWith(allowedextension.toLowerCase(Locale.ROOT));
}
/**
 * Checks if the given file name ends with one of the supported extensions
 * (csv, edi or xml), either exactly as defined by the extension constants or
 * in their lower case form. The lower case form is built independently of
 * the default locale.
 *
 * @param filename
 *            the file name (or extension) to check, may be null
 * @return true if it ends with a supported extension, false if not or if it
 *         is null
 */
public boolean isExtensionValid(String filename) {
    if (filename == null) {
        return false;
    }
    final String[] supported = { CSV_EXTENSION, EDI_EXTENSION, XML_EXTENSION };
    for (String extension : supported) {
        // Locale.ROOT: with e.g. a Turkish default locale "EDI" would become "edı"
        if (filename.endsWith(extension) || filename.endsWith(extension.toLowerCase(Locale.ROOT))) {
            return true;
        }
    }
    return false;
}
}