// blob: fd604e4607689d941dbec1f70298925a88c1ec14 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2010 BSI Business Systems Integration AG.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* BSI Business Systems Integration AG - initial API and implementation
******************************************************************************/
package org.eclipse.scout.rt.server.scheduler;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import org.eclipse.scout.commons.StoppableThread;
import org.eclipse.scout.commons.exception.ProcessingException;
import org.eclipse.scout.commons.logger.IScoutLogger;
import org.eclipse.scout.commons.logger.ScoutLogManager;
import org.eclipse.scout.rt.server.admin.diagnostic.DiagnosticFactory;
import org.eclipse.scout.rt.server.admin.diagnostic.IDiagnostic;
public abstract class AbstractScheduler implements IScheduler, IDiagnostic {
/** Shared logger; it is obtained for {@code Scheduler.class}, not for this abstract class. */
protected static final IScoutLogger LOG = ScoutLogManager.getLogger(Scheduler.class);
/** Dispatcher thread; non-null between {@link #start()} and {@link #stop()}. Accessed under {@link #m_queueLock}. */
private P_Dispatcher m_dispatcher;
/** Monitor that protects {@link #m_dispatcher}, {@link #m_availableJobs} and {@link #m_runningJobs}. */
private final Object m_queueLock;
/** Jobs currently registered with the scheduler. */
private final HashSet<ISchedulerJob> m_availableJobs;
/** Jobs for which a runner thread has been launched and has not finished yet. */
private final HashSet<ISchedulerJob> m_runningJobs;
/** Source of the tick signals that the jobs are tested against. */
private final Ticker m_ticker;
/**
 * Whether the dispatcher evaluates ticks. Written by callers of {@link #setActive(boolean)} and read by the
 * dispatcher thread without holding a lock, hence volatile so that a change becomes visible to the dispatcher.
 */
private volatile boolean m_active = true;
/**
 * Creates a scheduler driven by a {@code new Ticker(Calendar.MINUTE)}.
 *
 * @throws ProcessingException
 *           declared by the delegated constructor
 */
public AbstractScheduler() throws ProcessingException {
  this(new Ticker(Calendar.MINUTE));
}

/**
 * Creates a scheduler driven by the given ticker and registers the new instance as a diagnostic status
 * provider with {@link DiagnosticFactory}.
 *
 * @param ticker
 *          ticker that supplies the tick signals; it is stored as is
 * @throws ProcessingException
 *           declared for callers; nothing in this constructor body throws it explicitly
 */
public AbstractScheduler(Ticker ticker) throws ProcessingException {
  m_ticker = ticker;
  m_queueLock = new Object();
  m_runningJobs = new HashSet<ISchedulerJob>();
  m_availableJobs = new HashSet<ISchedulerJob>();
  // registration happens last, once all fields are assigned
  DiagnosticFactory.addDiagnosticStatusProvider(this);
}
/**
 * Enables or suspends tick evaluation by the dispatcher.
 *
 * @param b
 *          {@code true} to let the dispatcher visit jobs on every tick, {@code false} to suspend it
 */
@Override
public void setActive(boolean b) {
  this.m_active = b;
}
/**
 * @return the flag last set through {@link #setActive(boolean)}; initially {@code true}
 */
@Override
public boolean isActive() {
  return this.m_active;
}
/**
 * @return the ticker that was passed to (or created by) the constructor
 */
@Override
public Ticker getTicker() {
  return this.m_ticker;
}
/**
 * Creates and starts the dispatcher thread. Does nothing when a dispatcher already exists.
 */
@Override
public void start() {
  synchronized (m_queueLock) {
    if (m_dispatcher != null) {
      // already started
      return;
    }
    m_dispatcher = new P_Dispatcher();
    m_dispatcher.start();
  }
}
/**
 * Signals the dispatcher thread to stop, forgets it, and flags every running job as interrupted.
 * Does nothing when no dispatcher exists. A failure while flagging one job is logged and does not
 * prevent the remaining jobs from being flagged.
 */
@Override
public void stop() {
  synchronized (m_queueLock) {
    if (m_dispatcher == null) {
      // not started
      return;
    }
    m_dispatcher.setStopSignal();
    m_dispatcher = null;
    for (ISchedulerJob runningJob : m_runningJobs) {
      try {
        runningJob.setInterrupted(true);
      }
      catch (Throwable failure) {
        LOG.error("" + runningJob, failure);
      }
    }
  }
}
/*
 * Job Queue
 */

/**
 * Tests a job against a group id / job id filter.
 *
 * @param job
 *          job to test
 * @param groupId
 *          required group id, or null to accept any group
 * @param jobId
 *          required job id, or null to accept any job id
 * @return true when the job passes both filters
 */
private boolean matches(ISchedulerJob job, String groupId, String jobId) {
  if (groupId != null && !groupId.equals(job.getGroupId())) {
    return false;
  }
  if (jobId != null && !jobId.equals(job.getJobId())) {
    return false;
  }
  return true;
}
/**
 * Registers a job, replacing every queued job that matches the new job's group id and job id.
 * <p>
 * Replaced jobs are flagged as disposed and removed from the queue. Because the ids are used as a filter,
 * a null group id or job id on the new job acts as a wildcard for the replacement. Unless a matching job
 * is currently running, the new job is immediately tested against the ticker's current tick.
 *
 * @param newJob
 *          job to register
 * @throws IllegalArgumentException
 *           when newJob is null
 */
@Override
public void addJob(ISchedulerJob newJob) {
  if (newJob == null) {
    throw new IllegalArgumentException("job must not be null");
  }
  synchronized (m_queueLock) {
    String groupId = newJob.getGroupId();
    String jobId = newJob.getJobId();
    ArrayList<ISchedulerJob> oldJobs = new ArrayList<ISchedulerJob>();
    for (ISchedulerJob job : m_availableJobs) {
      if (matches(job, groupId, jobId)) {
        job.setDisposed(true);
        oldJobs.add(job);
      }
    }
    m_availableJobs.removeAll(oldJobs);
    // Clear the flag only after the replacement loop: when newJob is an instance that was already
    // queued, the loop above has just flagged it as disposed, and it would otherwise be dropped
    // again on the next visit.
    newJob.setDisposed(false);
    m_availableJobs.add(newJob);
    // check if job should already be run
    boolean oldJobsRunning = false;
    for (ISchedulerJob job : m_runningJobs) {
      if (matches(job, groupId, jobId)) {
        oldJobsRunning = true;
        break;
      }
    }
    if (!oldJobsRunning) {
      TickSignal tick = m_ticker.getCurrentTick();
      visitJobWithoutLocking(newJob, tick);
    }
  }
}
/**
 * Removes every queued job; shorthand for {@code removeJobs(null, null)}.
 */
@Override
public void removeAllJobs() {
  final String anyGroup = null;
  final String anyJob = null;
  removeJobs(anyGroup, anyJob);
}
/**
 * Flags all queued jobs that pass the filter as disposed and takes them out of the queue.
 *
 * @param groupId
 *          filter value or null as wildcard
 * @param jobId
 *          filter value or null as wildcard
 * @return the list of removed jobs
 */
@Override
public Collection<ISchedulerJob> removeJobs(String groupId, String jobId) {
  synchronized (m_queueLock) {
    ArrayList<ISchedulerJob> disposedJobs = new ArrayList<ISchedulerJob>();
    for (ISchedulerJob candidate : m_availableJobs) {
      if (!matches(candidate, groupId, jobId)) {
        continue;
      }
      candidate.setDisposed(true);
      disposedJobs.add(candidate);
    }
    // removal is deferred so the queue is not modified while it is being iterated
    m_availableJobs.removeAll(disposedJobs);
    return disposedJobs;
  }
}
/**
 * Interrupts without filtering; shorthand for {@code interruptJobs(null, null)}.
 */
@Override
public void interruptAllJobs() {
  final String anyGroup = null;
  final String anyJob = null;
  interruptJobs(anyGroup, anyJob);
}
/**
 * Flags every running job that passes the filter as interrupted.
 * <p>
 * The running set is scanned directly, so a job that is still executing is reached even when it has
 * already been removed from, or replaced in, the queue of available jobs.
 *
 * @param groupId
 *          filter value or null as wildcard
 * @param jobId
 *          filter value or null as wildcard
 * @return the list of interrupted jobs
 */
@Override
public Collection<ISchedulerJob> interruptJobs(String groupId, String jobId) {
  synchronized (m_queueLock) {
    ArrayList<ISchedulerJob> intJobs = new ArrayList<ISchedulerJob>();
    for (ISchedulerJob job : m_runningJobs) {
      if (matches(job, groupId, jobId)) {
        job.setInterrupted(true);
        intJobs.add(job);
      }
    }
    return intJobs;
  }
}
/**
 * @return the number of jobs currently in the queue of available jobs
 */
@Override
public int getJobCount() {
  final int queued;
  synchronized (m_queueLock) {
    queued = m_availableJobs.size();
  }
  return queued;
}
/**
 * @return the number of jobs currently in the running set
 */
@Override
public int getRunningJobCount() {
  final int running;
  synchronized (m_queueLock) {
    running = m_runningJobs.size();
  }
  return running;
}
/**
 * Looks up a job by id in any group; shorthand for {@code getJobs(null, jobId)}.
 * <p>
 * When several jobs share the id, the first one delivered by the result collection is returned.
 *
 * @param jobId
 *          job id to look for, or null to accept any job
 * @return a matching job, or null when there is none
 */
@Override
public ISchedulerJob getJob(String jobId) {
  Collection<ISchedulerJob> candidates = getJobs(null, jobId);
  return candidates.isEmpty() ? null : candidates.iterator().next();
}
/**
 * @return all queued jobs; shorthand for {@code getJobs(null, null)}
 */
@Override
public Collection<ISchedulerJob> getAllJobs() {
  final String anyGroup = null;
  final String anyJob = null;
  return getJobs(anyGroup, anyJob);
}
/**
 * Collects the queued jobs that pass the filter into a new list.
 *
 * @param groupId
 *          filter value or null as wildcard
 * @param jobId
 *          filter value or null as wildcard
 * @return a new list with the matching jobs; empty when nothing matches
 */
@Override
public Collection<ISchedulerJob> getJobs(String groupId, String jobId) {
  synchronized (m_queueLock) {
    ArrayList<ISchedulerJob> hits = new ArrayList<ISchedulerJob>();
    for (ISchedulerJob candidate : m_availableJobs) {
      if (!matches(candidate, groupId, jobId)) {
        continue;
      }
      hits.add(candidate);
    }
    return hits;
  }
}
/**
 * @return all running jobs; shorthand for {@code getRunningJobs(null, null)}
 */
@Override
public Collection<ISchedulerJob> getAllRunningJobs() {
  final String anyGroup = null;
  final String anyJob = null;
  return getRunningJobs(anyGroup, anyJob);
}
/**
 * Collects the running jobs that pass the filter into a new list.
 *
 * @param groupId
 *          filter value or null as wildcard
 * @param jobId
 *          filter value or null as wildcard
 * @return a new list with the matching running jobs; empty when nothing matches
 */
@Override
public Collection<ISchedulerJob> getRunningJobs(String groupId, String jobId) {
  synchronized (m_queueLock) {
    ArrayList<ISchedulerJob> hits = new ArrayList<ISchedulerJob>();
    for (ISchedulerJob candidate : m_runningJobs) {
      if (!matches(candidate, groupId, jobId)) {
        continue;
      }
      hits.add(candidate);
    }
    return hits;
  }
}
/**
 * Visits every queued job for the given tick while holding the queue lock.
 *
 * @param tick
 *          tick signal the jobs are tested against
 */
protected void visitAllJobs(TickSignal tick) {
synchronized (m_queueLock) {
visitAllJobsWithoutLocking(tick);
}
}
/**
 * Visits every queued job for the given tick without acquiring the queue lock itself.
 * <p>
 * The loop runs over a copy of the queue because visiting a job may remove it from the queue.
 *
 * @param tick
 *          tick signal the jobs are tested against
 */
protected void visitAllJobsWithoutLocking(TickSignal tick) {
  ArrayList<ISchedulerJob> snapshot = new ArrayList<ISchedulerJob>(m_availableJobs);
  for (ISchedulerJob queuedJob : snapshot) {
    visitJobWithoutLocking(queuedJob, tick);
  }
}
/**
 * Visits a single job for the given tick while holding the queue lock.
 *
 * @param job
 *          job to test and possibly launch
 * @param tick
 *          tick signal the job is tested against
 */
protected void visitJob(ISchedulerJob job, TickSignal tick) {
synchronized (m_queueLock) {
visitJobWithoutLocking(job, tick);
}
}
/**
 * Tests one job against a tick and launches it on a new daemon thread when it accepts the tick.
 * <p>
 * This method does not acquire the queue lock although it reads and modifies the job sets.
 * <ul>
 * <li>A job that is still running is never launched a second time; an accepted tick is only logged.</li>
 * <li>An idle job that is flagged as disposed is removed from the queue.</li>
 * <li>An idle job that accepts the tick is added to the running set and started.</li>
 * </ul>
 * Any failure is logged and not propagated. When the runner thread cannot be created or started, the
 * job is taken out of the running set again so that it can be triggered by a later tick.
 *
 * @param job
 *          job to test
 * @param tick
 *          tick signal the job is tested against
 */
protected void visitJobWithoutLocking(ISchedulerJob job, TickSignal tick) {
  try {
    if (m_runningJobs.contains(job)) {
      // still running
      if (LOG.isInfoEnabled()) {
        if (job.acceptTick(tick)) {
          LOG.info("job " + job + " is still running at " + tick);
        }
      }
    }
    else {
      // idle
      if (job.isDisposed()) {
        m_availableJobs.remove(job);
      }
      else if (job.acceptTick(tick)) {
        m_runningJobs.add(job);
        boolean launched = false;
        try {
          if (LOG.isInfoEnabled()) {
            LOG.info("job " + job + " triggered at " + tick);
          }
          P_JobRunner runner = new P_JobRunner(job, tick);
          Thread launcher = new Thread(runner, "Scheduler.JobLauncher." + job.getGroupId() + "." + job.getJobId());
          launcher.setDaemon(true);
          launcher.start();
          launched = true;
        }
        finally {
          if (!launched) {
            // Only the runner removes a job from the running set. Without a started runner the job
            // would stay marked as running forever, so undo the registration.
            m_runningJobs.remove(job);
          }
        }
      }
    }
  }
  catch (Throwable t) {
    LOG.error("" + job, t);
  }
}
/**
* Every job trigger is launched using this private class
*/
class P_JobRunner implements Runnable {
private ISchedulerJob m_job;
private TickSignal m_signal;
public P_JobRunner(ISchedulerJob job, TickSignal signal) {
m_job = job;
m_signal = signal;
}
public ISchedulerJob getJob() {
return m_job;
}
public TickSignal getTickSignal() {
return m_signal;
}
@Override
public void run() {
try {
m_job.setInterrupted(false);
handleJobExecution(m_job, m_signal);
}
catch (Throwable t) {
LOG.error("uncaught exception", t);
}
finally {
// remove job from running queue
synchronized (m_queueLock) {
m_runningJobs.remove(m_job);
if (m_job.isDisposed()) {
m_availableJobs.remove(m_job);
}
}
}
}
}// end private class
/**
 * Daemon thread that obtains tick signals from the ticker and visits all jobs on every tick until the
 * stop signal is set.
 */
class P_Dispatcher extends StoppableThread {
  public P_Dispatcher() {
    setName("Scheduler.Dispatcher");
    setDaemon(true);
  }

  /**
   * Dispatch loop.
   * <p>
   * While the scheduler is active, each iteration obtains the next tick from the ticker and visits all
   * jobs with it. The stop signal and the active flag are checked again after the tick was obtained, so a
   * tick that arrives after a stop or a suspension is not dispatched. While the scheduler is inactive, the
   * loop polls the active flag once per second. A fresh tick is always obtained after a suspension; no
   * tick captured before the suspension is dispatched late.
   * <p>
   * Unexpected failures are logged and the loop continues.
   */
  @Override
  public void run() {
    if (LOG.isInfoEnabled()) {
      LOG.info("scheduler started");
    }
    while (!isStopSignal()) {
      try {
        if (isActive()) {
          // presumably blocks until the ticker produces its next tick; Ticker is not visible here
          TickSignal signal = m_ticker.waitForNextTick();
          if (LOG.isDebugEnabled()) {
            LOG.debug("tick " + signal);
          }
          // the scheduler may have been stopped or suspended while the tick was being obtained
          if (!isStopSignal() && isActive()) {
            visitAllJobs(signal);
          }
        }
        else {
          if (LOG.isDebugEnabled()) {
            LOG.debug("ticking suspended");
          }
          try {
            sleep(1000);
          }
          catch (InterruptedException ignored) {
            // Deliberately ignored: termination is requested through the stop signal, which the loop
            // condition re-checks. The interrupt status is not restored because that would make every
            // following sleep return immediately and turn the suspended state into a busy loop.
          }
        }
      }
      catch (Throwable t) {
        // the logger records the stack trace; no additional dump to stderr
        LOG.error("unexpected error: ", t);
      }
    }
    if (LOG.isInfoEnabled()) {
      LOG.info("scheduler stopped");
    }
  }
}
/*
 * Diagnostics
 */

/**
 * Appends the scheduler's status to the diagnostic result: always one "Scheduler" item with the
 * active/inactive status and, while the scheduler is active, one "Scheduler Jobs" item with the queued
 * and running job counts.
 *
 * @param result
 *          diagnostic list the items are appended to
 */
@Override
public void addDiagnosticItemToList(List<List<String>> result) {
  DiagnosticFactory.addDiagnosticItemToList(result, "Scheduler", "", isActive() ? DiagnosticFactory.STATUS_ACTIVE : DiagnosticFactory.STATUS_INACTIVE);
  if (!isActive()) {
    return;
  }
  int totalJobs = getJobCount();
  int runningJobs = getRunningJobCount();
  String summary = "Total jobs: " + totalJobs + ", Running: " + runningJobs;
  DiagnosticFactory.addDiagnosticItemToList(result, "Scheduler Jobs", summary, DiagnosticFactory.STATUS_INFO);
}
/**
 * @return always null; this scheduler offers no diagnostic actions
 */
@Override
public String[] getPossibleActions() {
return null;
}
/**
 * Adds nothing to the result; this scheduler contributes no submit buttons.
 *
 * @param result
 *          left untouched
 */
@Override
public void addSubmitButtonsHTML(List<List<String>> result) {
// intentionally empty (no operation)
}
/**
 * Ignores the request; this scheduler implements no diagnostic actions.
 *
 * @param action
 *          ignored
 * @param value
 *          ignored
 */
@Override
public void call(String action, Object[] value) {
// intentionally empty (no operation)
}
}