1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.kcb.quartz;
17
18 import java.sql.Timestamp;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.Iterator;
22 import java.util.List;
23 import java.util.concurrent.Callable;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.Future;
27 import java.util.concurrent.ThreadFactory;
28
29 import org.apache.log4j.Logger;
30 import org.apache.ojb.broker.OptimisticLockException;
31 import org.kuali.rice.core.util.RiceUtilities;
32 import org.kuali.rice.kcb.quartz.ProcessingResult.Failure;
33 import org.springframework.beans.factory.annotation.Required;
34 import org.springframework.dao.DataAccessException;
35 import org.springframework.transaction.PlatformTransactionManager;
36 import org.springframework.transaction.TransactionException;
37 import org.springframework.transaction.TransactionStatus;
38 import org.springframework.transaction.UnexpectedRollbackException;
39 import org.springframework.transaction.support.TransactionCallback;
40 import org.springframework.transaction.support.TransactionTemplate;
41
42
43
44
45
46 public abstract class ConcurrentJob<T> {
47 protected final Logger LOG = Logger.getLogger(getClass());
48
49 protected ExecutorService executor = Executors.newSingleThreadExecutor(new KCBThreadFactory());
50 protected PlatformTransactionManager txManager;
51
52
53
54
55
56 public void setExecutorService(ExecutorService executor) {
57 this.executor = executor;
58 }
59
60
61
62
63
64 @Required
65 public void setTransactionManager(PlatformTransactionManager txManager) {
66 this.txManager = txManager;
67 }
68
69
70
71
72
73
74 protected TransactionTemplate createNewTransaction() {
75 TransactionTemplate tt = new TransactionTemplate(txManager);
76 tt.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
77 return tt;
78 }
79
80
81
82
83
84
85 protected abstract Collection<T> takeAvailableWorkItems();
86
87
88
89
90
91
92
93
94
95 protected Collection<Collection<T>> groupWorkItems(Collection<T> workItems, ProcessingResult<T> result) {
96 Collection<Collection<T>> groupedWorkItems = new ArrayList<Collection<T>>(workItems.size());
97 for (T workItem: workItems) {
98 Collection<T> c = new ArrayList<T>(1);
99 c.add(workItem);
100 groupedWorkItems.add(c);
101 }
102 return groupedWorkItems;
103 }
104
105
106
107
108
109
110
111 protected abstract Collection<T> processWorkItems(Collection<T> items);
112
113
114
115
116
117 protected abstract void unlockWorkItem(T item);
118
119
120
121
122
123
124 public ProcessingResult<T> run() {
125 LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] STARTING RUN");
126
127 final ProcessingResult<T> result = new ProcessingResult<T>();
128
129
130 final Collection<T> items;
131 try {
132 items = executeInTransaction(new TransactionCallback() {
133 public Object doInTransaction(TransactionStatus txStatus) {
134 return takeAvailableWorkItems();
135 }
136 });
137 } catch (DataAccessException dae) {
138
139
140 OptimisticLockException optimisticLockException = RiceUtilities.findExceptionInStack(dae, OptimisticLockException.class);
141 if (optimisticLockException != null) {
142
143 LOG.info("Contention while taking work items");
144 } else {
145
146 LOG.error("Error taking work items", dae);
147 }
148 return result;
149 } catch (UnexpectedRollbackException ure) {
150
151 LOG.error("UnexpectedRollbackException - possibly due to Mckoi");
152 return result;
153 } catch (TransactionException te) {
154 LOG.error("Error occurred obtaining available work items", te);
155 result.addFailure(new Failure<T>(te));
156 return result;
157 }
158
159 Collection<Collection<T>> groupedWorkItems = groupWorkItems(items, result);
160
161
162 Iterator<Collection<T>> i = groupedWorkItems.iterator();
163 List<Future> futures = new ArrayList<Future>();
164 while(i.hasNext()) {
165 final Collection<T> workUnit= i.next();
166
167 LOG.info("Processing work unit: " + workUnit);
168
169
170 futures.add(executor.submit(new Callable() {
171 public Object call() throws Exception {
172 ProcessingResult<T> result = new ProcessingResult<T>();
173 try {
174 Collection<T> successes = executeInTransaction(new TransactionCallback() {
175 public Object doInTransaction(TransactionStatus txStatus) {
176 return processWorkItems(workUnit);
177 }
178 });
179 result.addAllSuccesses(successes);
180 } catch (Exception e) {
181 LOG.error("Error occurred processing work unit " + workUnit, e);
182 for (final T workItem: workUnit) {
183 LOG.error("Error occurred processing work item " + workItem, e);
184 result.addFailure(new Failure<T>(workItem, e));
185 unlockWorkItemAtomically(workItem);
186 }
187 }
188 return result;
189 }
190 }));
191 }
192
193
194 for (Future f: futures) {
195 try {
196 ProcessingResult<T> workResult = (ProcessingResult<T>) f.get();
197 result.add(workResult);
198 } catch (Exception e) {
199 String message = "Error obtaining work result: " + e;
200 LOG.error(message, e);
201 result.addFailure(new Failure<T>(e, message));
202 }
203 }
204
205 finishProcessing(result);
206
207 LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] FINISHED RUN - " + result);
208
209 return result;
210 }
211
212
213
214
215 protected void finishProcessing(ProcessingResult<T> result) {}
216
217 protected void unlockWorkItemAtomically(final T workItem) {
218 try {
219 executeInTransaction(new TransactionCallback() {
220 public Object doInTransaction(TransactionStatus txStatus) {
221 LOG.info("Unlocking failed work item: " + workItem);
222 unlockWorkItem(workItem);
223 return null;
224 }
225 });
226 } catch (Exception e2) {
227 LOG.error("Error unlocking failed work item " + workItem, e2);
228 }
229 }
230
231 protected <X> X executeInTransaction(TransactionCallback callback) {
232
233
234 return (X) createNewTransaction().execute(callback);
235 }
236
237 private static class KCBThreadFactory implements ThreadFactory {
238
239 private ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
240
241 public Thread newThread(Runnable runnable) {
242 Thread thread = defaultThreadFactory.newThread(runnable);
243 thread.setName("KCB-job-" + thread.getName());
244 return thread;
245 }
246 }
247 }