1 package org.kuali.ole.batch.impl;
2
3 import org.apache.commons.lang.StringUtils;
4 import org.apache.log4j.Logger;
5 import org.kuali.ole.OLEConstants;
6 import org.kuali.ole.batch.bo.OLEBatchProcessScheduleBo;
7 import org.kuali.ole.batch.service.OLEBatchSchedulerService;
8 import org.kuali.ole.sys.batch.Job;
9 import org.kuali.ole.sys.batch.JobDescriptor;
10 import org.kuali.ole.sys.batch.Step;
11 import org.kuali.ole.sys.batch.service.impl.SchedulerServiceImpl;
12 import org.kuali.rice.krad.service.KRADServiceLocator;
13 import org.quartz.CronExpression;
14 import org.quartz.JobDetail;
15 import org.quartz.SchedulerException;
16 import org.quartz.UnableToInterruptJobException;
17 import org.quartz.JobExecutionContext;
18 import org.springframework.scheduling.quartz.CronTriggerBean;
19
20 import java.text.ParseException;
21 import java.util.*;
22
23
24
25
26
27
28
29
30 public class OLEBatchSchedulerServiceImpl extends SchedulerServiceImpl implements OLEBatchSchedulerService {
31 private static final Logger LOG = Logger.getLogger(OLEBatchSchedulerServiceImpl.class);
32 private static volatile Set<String> initJobs = new HashSet<String>();
33 private static final String TRIGGER_SFX = "_Trigger";
34
35 @Override
36 public void initialize() {
37
38 jobListener.setSchedulerService(this);
39 Collection<OLEBatchProcessScheduleBo> oleBatchProcessScheduleBoList = KRADServiceLocator.getBusinessObjectService().findAll(OLEBatchProcessScheduleBo.class);
40 Step step = new OLEBatchProcessStep();
41 for (OLEBatchProcessScheduleBo scheduleBo : oleBatchProcessScheduleBoList) {
42 try {
43 CronExpression exp = new CronExpression(scheduleBo.getCronExpression());
44 Date date = exp.getNextValidTimeAfter(new Date());
45 if (date == null)
46 continue;
47 } catch (Exception e) {
48 LOG.error("Error while validating cron exp::" + scheduleBo.getCronExpression(), e);
49
50 }
51 scheduleBo.getCronExpression();
52 initializeJobs(OLEConstants.OLEBatchProcess.BATCH_JOB + scheduleBo.getScheduleId(), step);
53 try {
54 initializeTriggersForModule(scheduleBo.getScheduleId(), scheduleBo.getCronExpression());
55 } catch (Exception e) {
56 throw new RuntimeException(e);
57 }
58 initJobs.add(OLEConstants.OLEBatchProcess.BATCH_JOB + scheduleBo.getScheduleId());
59 }
60 }
61
62 @Override
63 public void initializeJob(String jobName, Job job) {
64 if (initJobs.contains(jobName)) {
65 job.setSchedulerService(this);
66 job.setParameterService(parameterService);
67 List<Step> steps = new ArrayList<Step>();
68 steps.add(new OLEBatchProcessStep());
69 steps.add(new OLEBatchProcessAdhocStep());
70 steps.add(new OLEBatchProcessEmailStep());
71 job.setSteps(steps);
72 job.setDateTimeService(dateTimeService);
73 } else {
74 super.initializeJob(jobName, job);
75 }
76 }
77
78
79
80
81
82
83
84 public void initializeJobsForModule(String jobName) {
85 jobListener.setSchedulerService(this);
86 if (initJobs.contains(OLEConstants.OLEBatchProcess.BATCH_JOB + jobName)) return;
87 initJobs.add(OLEConstants.OLEBatchProcess.BATCH_JOB + jobName);
88 initializeJobs(OLEConstants.OLEBatchProcess.BATCH_JOB + jobName, new OLEBatchProcessStep());
89 }
90
91
92
93
94
95
96
97
98 public void initializeTriggersForModule(String jobName, String cronExpression) throws Exception {
99 try {
100 CronExpression exp = new CronExpression(cronExpression);
101 Date date = exp.getNextValidTimeAfter(new Date());
102 if (date == null) {
103 throw new RuntimeException("given cron expression already past its valid time::" + cronExpression);
104 } else {
105 LOG.info("Next valid run time is:: " + date.toString() + " for the schedule job :: " + jobName);
106 addTrigger(getCronTriggerBean(jobName, cronExpression));
107 }
108 } catch (Exception e) {
109 LOG.info("given cron expression already past its valid time::" + cronExpression, e);
110 throw e;
111 }
112
113 }
114
115 private CronTriggerBean getCronTriggerBean(String jobName, String cronExpression) {
116 CronTriggerBean cronTriggerBean = new CronTriggerBean();
117 cronTriggerBean.setName(jobName + TRIGGER_SFX);
118 try {
119 cronTriggerBean.setCronExpression(cronExpression);
120 } catch (ParseException e) {
121 LOG.error("Error while parsing cron expression :: " + cronExpression, e);
122 throw new RuntimeException("Error while parsing cron expression", e);
123 }
124 cronTriggerBean.setJobName(OLEConstants.OLEBatchProcess.BATCH_JOB + jobName);
125 cronTriggerBean.setJobGroup(SCHEDULED_GROUP);
126 return cronTriggerBean;
127 }
128
129 private void initializeJobs(String jobName, Step step) {
130 JobDescriptor jobDescriptor = new JobDescriptor();
131 jobDescriptor.setBeanName(jobName);
132 jobDescriptor.setGroup(SCHEDULED_GROUP);
133 JobDetail jobDetail = jobDescriptor.getJobDetail();
134 jobDescriptor.getSteps().add(step);
135 loadJob(jobDescriptor);
136 }
137
138 private void initializeAdhocJobs(String jobName, Step step) {
139 JobDescriptor jobDescriptor = new JobDescriptor();
140 jobDescriptor.setBeanName(jobName);
141 jobDescriptor.setGroup(UNSCHEDULED_GROUP);
142 JobDetail jobDetail = jobDescriptor.getJobDetail();
143 jobDescriptor.getSteps().add(step);
144 loadJob(jobDescriptor);
145 }
146
147 public void startJob(String jobName) throws Exception {
148 try {
149 jobListener.setSchedulerService(this);
150 if (initJobs.contains(OLEConstants.OLEBatchProcess.ADHOC_BATCH_JOB + jobName)) return;
151 initJobs.add(OLEConstants.OLEBatchProcess.ADHOC_BATCH_JOB + jobName);
152 initializeAdhocJobs(OLEConstants.OLEBatchProcess.ADHOC_BATCH_JOB + jobName, new OLEBatchProcessAdhocStep());
153 scheduler.triggerJob(OLEConstants.OLEBatchProcess.ADHOC_BATCH_JOB + jobName, UNSCHEDULED_GROUP);
154 } catch (SchedulerException e) {
155 LOG.error("Error while starting job :: " + OLEConstants.OLEBatchProcess.ADHOC_BATCH_JOB + jobName, e);
156 throw e;
157 }
158 }
159
160 public void pauseJob(String jobName) throws Exception {
161 try {
162 scheduler.pauseJob(OLEConstants.OLEBatchProcess.ADHOC_BATCH_JOB + jobName, SCHEDULED_GROUP);
163 } catch (SchedulerException e) {
164 LOG.error("Error while pausing job :: " + OLEConstants.OLEBatchProcess.ADHOC_BATCH_JOB + jobName, e);
165 throw e;
166 }
167 }
168
169 public void resumeJob(String jobName) throws Exception {
170 try {
171 scheduler.resumeJob(OLEConstants.OLEBatchProcess.ADHOC_BATCH_JOB + jobName, SCHEDULED_GROUP);
172 } catch (SchedulerException e) {
173 LOG.error("Error while resuming job :: " + OLEConstants.OLEBatchProcess.ADHOC_BATCH_JOB + jobName, e);
174 throw e;
175 }
176 }
177
178 @Override
179 public void deleteJob(String jobName) throws Exception {
180 removeScheduled(OLEConstants.OLEBatchProcess.BATCH_JOB + jobName);
181 }
182
183 @Override
184 public void rescheduleJob(String jobName, String cronExp) throws Exception {
185 scheduler.rescheduleJob(jobName + TRIGGER_SFX, OLEConstants.OLEBatchProcess.BATCH_JOB + jobName, getCronTriggerBean(jobName, cronExp));
186 }
187
188 public void unScheduleOneTimeJob(String jobName, String jobGroup) {
189 Map map = new HashMap();
190 map.put("scheduleId", StringUtils.substringAfter(jobName, "BATCH_JOB_"));
191 OLEBatchProcessScheduleBo scheduleBo = KRADServiceLocator.getBusinessObjectService().findByPrimaryKey(OLEBatchProcessScheduleBo.class, map);
192 if (scheduleBo != null && StringUtils.isNotBlank(scheduleBo.getOneTimeOrRecurring())) {
193 if (scheduleBo.getOneTimeOrRecurring().equalsIgnoreCase("onetime")) {
194 removeScheduled(jobName);
195 LOG.info("Removed" + jobName + "from Scheduler");
196 }
197 }
198 }
199 }