View Javadoc
1   package org.kuali.ole.batch.impl;
2   
3   import org.apache.commons.lang.StringUtils;
4   import org.apache.log4j.Logger;
5   import org.kuali.ole.OLEConstants;
6   import org.kuali.ole.batch.bo.OLEBatchProcessScheduleBo;
7   import org.kuali.ole.batch.service.OLEBatchSchedulerService;
8   import org.kuali.ole.sys.batch.Job;
9   import org.kuali.ole.sys.batch.JobDescriptor;
10  import org.kuali.ole.sys.batch.Step;
11  import org.kuali.ole.sys.batch.service.impl.SchedulerServiceImpl;
12  import org.kuali.rice.krad.service.KRADServiceLocator;
13  import org.quartz.CronExpression;
14  import org.quartz.JobDetail;
15  import org.quartz.SchedulerException;
16  import org.quartz.UnableToInterruptJobException;
17  import org.quartz.JobExecutionContext;
18  import org.springframework.scheduling.quartz.CronTriggerBean;
19  
20  import java.text.ParseException;
21  import java.util.*;
22  
23  /**
24   * Created with IntelliJ IDEA.
25   * User: adityas
26   * Date: 7/23/13
27   * Time: 2:47 PM
28   * To change this template use File | Settings | File Templates.
29   */
30  public class OLEBatchSchedulerServiceImpl extends SchedulerServiceImpl implements OLEBatchSchedulerService {
31      private static final Logger LOG = Logger.getLogger(OLEBatchSchedulerServiceImpl.class);
32      private static volatile Set<String> initJobs = new HashSet<String>();
33      private static final String TRIGGER_SFX = "_Trigger";
34  
35      @Override
36      public void initialize() {
37          //read from scheduler
38          jobListener.setSchedulerService(this);
39          Collection<OLEBatchProcessScheduleBo> oleBatchProcessScheduleBoList = KRADServiceLocator.getBusinessObjectService().findAll(OLEBatchProcessScheduleBo.class);
40          Step step = new OLEBatchProcessStep();
41          for (OLEBatchProcessScheduleBo scheduleBo : oleBatchProcessScheduleBoList) {
42              try {
43                  CronExpression exp = new CronExpression(scheduleBo.getCronExpression());
44                  Date date = exp.getNextValidTimeAfter(new Date());
45                  if (date == null)
46                      continue;
47              } catch (Exception e) {
48                  LOG.error("Error while validating cron exp::" + scheduleBo.getCronExpression(), e);
49  
50              }
51              scheduleBo.getCronExpression();
52              initializeJobs(OLEConstants.OLEBatchProcess.BATCH_JOB + scheduleBo.getScheduleId(), step);
53              try {
54                  initializeTriggersForModule(scheduleBo.getScheduleId(), scheduleBo.getCronExpression());
55              } catch (Exception e) {
56                  throw new RuntimeException(e);
57              }
58              initJobs.add(OLEConstants.OLEBatchProcess.BATCH_JOB + scheduleBo.getScheduleId());
59          }
60      }
61  
62      @Override
63      public void initializeJob(String jobName, Job job) {
64          if (initJobs.contains(jobName)) {
65              job.setSchedulerService(this);
66              job.setParameterService(parameterService);
67              List<Step> steps = new ArrayList<Step>();
68              steps.add(new OLEBatchProcessStep());
69              steps.add(new OLEBatchProcessAdhocStep());
70              steps.add(new OLEBatchProcessEmailStep());
71              job.setSteps(steps);
72              job.setDateTimeService(dateTimeService);
73          } else {
74              super.initializeJob(jobName, job);
75          }
76      }
77  
78      /**
79       * This method loads the jobs with the given jobName - Job name here is the schedule id
80       * the name will always be suffixed with "BATCH_JOB_"
81       *
82       * @param jobName
83       */
84      public void initializeJobsForModule(String jobName) {
85          jobListener.setSchedulerService(this);
86          if (initJobs.contains(OLEConstants.OLEBatchProcess.BATCH_JOB + jobName)) return;
87          initJobs.add(OLEConstants.OLEBatchProcess.BATCH_JOB + jobName);
88          initializeJobs(OLEConstants.OLEBatchProcess.BATCH_JOB + jobName, new OLEBatchProcessStep());
89      }
90  
91      /**
92       * method initializes triggers for the given jobName - Job name here is the schedule id
93       * the name will always be suffixed with "BATCH_JOB_"
94       *
95       * @param jobName
96       * @param cronExpression
97       */
98      public void initializeTriggersForModule(String jobName, String cronExpression) throws Exception {
99          try {
100             CronExpression exp = new CronExpression(cronExpression);
101             Date date = exp.getNextValidTimeAfter(new Date());
102             if (date == null) {
103                 throw new RuntimeException("given cron expression already past its valid time::" + cronExpression);
104             } else {
105                 LOG.info("Next valid run time is:: " + date.toString() + " for the schedule job :: " + jobName);
106                 addTrigger(getCronTriggerBean(jobName, cronExpression));
107             }
108         } catch (Exception e) {
109             LOG.info("given cron expression already past its valid time::" + cronExpression, e);
110             throw e;
111         }
112 
113     }
114 
115     private CronTriggerBean getCronTriggerBean(String jobName, String cronExpression) {
116         CronTriggerBean cronTriggerBean = new CronTriggerBean();
117         cronTriggerBean.setName(jobName + TRIGGER_SFX);
118         try {
119             cronTriggerBean.setCronExpression(cronExpression);
120         } catch (ParseException e) {
121             LOG.error("Error while parsing cron expression :: " + cronExpression, e);
122             throw new RuntimeException("Error while parsing cron expression", e);
123         }
124         cronTriggerBean.setJobName(OLEConstants.OLEBatchProcess.BATCH_JOB + jobName);
125         cronTriggerBean.setJobGroup(SCHEDULED_GROUP);
126         return cronTriggerBean;
127     }
128 
129     private void initializeJobs(String jobName, Step step) {
130         JobDescriptor jobDescriptor = new JobDescriptor();
131         jobDescriptor.setBeanName(jobName);
132         jobDescriptor.setGroup(SCHEDULED_GROUP);
133         JobDetail jobDetail = jobDescriptor.getJobDetail();
134         jobDescriptor.getSteps().add(step);
135         loadJob(jobDescriptor);
136     }
137 
138     private void initializeAdhocJobs(String jobName, Step step) {
139         JobDescriptor jobDescriptor = new JobDescriptor();
140         jobDescriptor.setBeanName(jobName);
141         jobDescriptor.setGroup(UNSCHEDULED_GROUP);
142         JobDetail jobDetail = jobDescriptor.getJobDetail();
143         jobDescriptor.getSteps().add(step);
144         loadJob(jobDescriptor);
145     }
146 
147     public void startJob(String jobName) throws Exception {
148         try {
149             jobListener.setSchedulerService(this);
150             if (initJobs.contains(OLEConstants.OLEBatchProcess.ADHOC_BATCH_JOB + jobName)) return;
151             initJobs.add(OLEConstants.OLEBatchProcess.ADHOC_BATCH_JOB + jobName);
152             initializeAdhocJobs(OLEConstants.OLEBatchProcess.ADHOC_BATCH_JOB + jobName, new OLEBatchProcessAdhocStep());
153             scheduler.triggerJob(OLEConstants.OLEBatchProcess.ADHOC_BATCH_JOB + jobName, UNSCHEDULED_GROUP);
154         } catch (SchedulerException e) {
155             LOG.error("Error while starting job :: " + OLEConstants.OLEBatchProcess.ADHOC_BATCH_JOB + jobName, e);
156             throw e;
157         }
158     }
159 
160     public void pauseJob(String jobName) throws Exception {
161         try {
162             scheduler.pauseJob(OLEConstants.OLEBatchProcess.ADHOC_BATCH_JOB + jobName, SCHEDULED_GROUP);
163         } catch (SchedulerException e) {
164             LOG.error("Error while pausing job :: " + OLEConstants.OLEBatchProcess.ADHOC_BATCH_JOB + jobName, e);
165             throw e;
166         }
167     }
168 
169     public void resumeJob(String jobName) throws Exception {
170         try {
171             scheduler.resumeJob(OLEConstants.OLEBatchProcess.ADHOC_BATCH_JOB + jobName, SCHEDULED_GROUP);
172         } catch (SchedulerException e) {
173             LOG.error("Error while resuming job :: " + OLEConstants.OLEBatchProcess.ADHOC_BATCH_JOB + jobName, e);
174             throw e;
175         }
176     }
177 
178     @Override
179     public void deleteJob(String jobName) throws Exception {
180         removeScheduled(OLEConstants.OLEBatchProcess.BATCH_JOB + jobName);
181     }
182 
183     @Override
184     public void rescheduleJob(String jobName, String cronExp) throws Exception {
185         scheduler.rescheduleJob(jobName + TRIGGER_SFX, OLEConstants.OLEBatchProcess.BATCH_JOB + jobName, getCronTriggerBean(jobName, cronExp));
186     }
187 
188     public void unScheduleOneTimeJob(String jobName, String jobGroup) {
189         Map map = new HashMap();
190         map.put("scheduleId", StringUtils.substringAfter(jobName, "BATCH_JOB_"));
191         OLEBatchProcessScheduleBo scheduleBo = KRADServiceLocator.getBusinessObjectService().findByPrimaryKey(OLEBatchProcessScheduleBo.class, map);
192         if (scheduleBo != null && StringUtils.isNotBlank(scheduleBo.getOneTimeOrRecurring())) {
193             if (scheduleBo.getOneTimeOrRecurring().equalsIgnoreCase("onetime")) {
194                 removeScheduled(jobName);
195                 LOG.info("Removed" + jobName + "from Scheduler");
196             }
197         }
198     }
199 }