1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.kcb.quartz;
17
18 import org.apache.log4j.Logger;
19 import org.apache.ojb.broker.OptimisticLockException;
20 import org.kuali.rice.core.api.util.RiceUtilities;
21 import org.kuali.rice.kcb.quartz.ProcessingResult.Failure;
22 import org.springframework.beans.factory.annotation.Required;
23 import org.springframework.dao.DataAccessException;
24 import org.springframework.transaction.PlatformTransactionManager;
25 import org.springframework.transaction.TransactionException;
26 import org.springframework.transaction.TransactionStatus;
27 import org.springframework.transaction.UnexpectedRollbackException;
28 import org.springframework.transaction.support.TransactionCallback;
29 import org.springframework.transaction.support.TransactionTemplate;
30
31 import java.sql.Timestamp;
32 import java.util.ArrayList;
33 import java.util.Collection;
34 import java.util.Iterator;
35 import java.util.List;
36 import java.util.concurrent.Callable;
37 import java.util.concurrent.ExecutorService;
38 import java.util.concurrent.Executors;
39 import java.util.concurrent.Future;
40 import java.util.concurrent.ThreadFactory;
41
42
43
44
45
46 public abstract class ConcurrentJob<T> {
47 protected final Logger LOG = Logger.getLogger(getClass());
48
49 protected ExecutorService executor = Executors.newSingleThreadExecutor(new KCBThreadFactory());
50 protected PlatformTransactionManager txManager;
51
52
53
54
55
56 public void setExecutorService(ExecutorService executor) {
57 this.executor = executor;
58 }
59
60
61
62
63
64 @Required
65 public void setTransactionManager(PlatformTransactionManager txManager) {
66 this.txManager = txManager;
67 }
68
69
70
71
72
73
74 protected TransactionTemplate createNewTransaction() {
75 TransactionTemplate tt = new TransactionTemplate(txManager);
76 tt.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
77 return tt;
78 }
79
80
81
82
83
84
85 protected abstract Collection<T> takeAvailableWorkItems();
86
87
88
89
90
91
92
93
94
95 protected Collection<Collection<T>> groupWorkItems(Collection<T> workItems, ProcessingResult<T> result) {
96 Collection<Collection<T>> groupedWorkItems = new ArrayList<Collection<T>>(workItems.size());
97 for (T workItem: workItems) {
98 Collection<T> c = new ArrayList<T>(1);
99 c.add(workItem);
100 groupedWorkItems.add(c);
101 }
102 return groupedWorkItems;
103 }
104
105
106
107
108
109
110
111 protected abstract Collection<T> processWorkItems(Collection<T> items);
112
113
114
115
116
117 protected abstract void unlockWorkItem(T item);
118
119
120
121
122
123
124 public ProcessingResult<T> run() {
125 LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] STARTING RUN");
126
127 final ProcessingResult<T> result = new ProcessingResult<T>();
128
129
130 final Collection<T> items;
131 try {
132 items = executeInTransaction(new TransactionCallback() {
133 public Object doInTransaction(TransactionStatus txStatus) {
134 return takeAvailableWorkItems();
135 }
136 });
137 } catch (DataAccessException dae) {
138
139
140 OptimisticLockException optimisticLockException = RiceUtilities.findExceptionInStack(dae, OptimisticLockException.class);
141 if (optimisticLockException != null) {
142
143 LOG.info("Contention while taking work items");
144 } else {
145
146 LOG.error("Error taking work items", dae);
147 }
148 return result;
149 } catch (UnexpectedRollbackException ure) {
150 LOG.error("UnexpectedRollbackException", ure);
151 return result;
152 } catch (TransactionException te) {
153 LOG.error("Error occurred obtaining available work items", te);
154 result.addFailure(new Failure<T>(te));
155 return result;
156 }
157
158 Collection<Collection<T>> groupedWorkItems = groupWorkItems(items, result);
159
160
161 Iterator<Collection<T>> i = groupedWorkItems.iterator();
162 List<Future> futures = new ArrayList<Future>();
163 while(i.hasNext()) {
164 final Collection<T> workUnit= i.next();
165
166 LOG.info("Processing work unit: " + workUnit);
167
168
169 futures.add(executor.submit(new Callable() {
170 public Object call() throws Exception {
171 ProcessingResult<T> result = new ProcessingResult<T>();
172 try {
173 Collection<T> successes = executeInTransaction(new TransactionCallback() {
174 public Object doInTransaction(TransactionStatus txStatus) {
175 return processWorkItems(workUnit);
176 }
177 });
178 result.addAllSuccesses(successes);
179 } catch (Exception e) {
180 LOG.error("Error occurred processing work unit " + workUnit, e);
181 for (final T workItem: workUnit) {
182 LOG.error("Error occurred processing work item " + workItem, e);
183 result.addFailure(new Failure<T>(workItem, e));
184 unlockWorkItemAtomically(workItem);
185 }
186 }
187 return result;
188 }
189 }));
190 }
191
192
193 for (Future f: futures) {
194 try {
195 ProcessingResult<T> workResult = (ProcessingResult<T>) f.get();
196 result.add(workResult);
197 } catch (Exception e) {
198 String message = "Error obtaining work result: " + e;
199 LOG.error(message, e);
200 result.addFailure(new Failure<T>(e, message));
201 }
202 }
203
204 finishProcessing(result);
205
206 LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] FINISHED RUN - " + result);
207
208 return result;
209 }
210
211
212
213
214 protected void finishProcessing(ProcessingResult<T> result) {}
215
216 protected void unlockWorkItemAtomically(final T workItem) {
217 try {
218 executeInTransaction(new TransactionCallback() {
219 public Object doInTransaction(TransactionStatus txStatus) {
220 LOG.info("Unlocking failed work item: " + workItem);
221 unlockWorkItem(workItem);
222 return null;
223 }
224 });
225 } catch (Exception e2) {
226 LOG.error("Error unlocking failed work item " + workItem, e2);
227 }
228 }
229
230 protected <X> X executeInTransaction(TransactionCallback callback) {
231
232
233 return (X) createNewTransaction().execute(callback);
234 }
235
236 private static class KCBThreadFactory implements ThreadFactory {
237
238 private ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
239
240 public Thread newThread(Runnable runnable) {
241 Thread thread = defaultThreadFactory.newThread(runnable);
242 thread.setName("KCB-job-" + thread.getName());
243 return thread;
244 }
245 }
246 }