1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.kcb.quartz;
17
18 import java.sql.Timestamp;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.Iterator;
22 import java.util.List;
23 import java.util.concurrent.Callable;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.Future;
27 import java.util.concurrent.ThreadFactory;
28
29 import javax.persistence.OptimisticLockException;
30
31 import org.apache.log4j.Logger;
32 import org.kuali.rice.core.api.util.RiceUtilities;
33 import org.kuali.rice.kcb.quartz.ProcessingResult.Failure;
34 import org.springframework.beans.factory.annotation.Required;
35 import org.springframework.dao.DataAccessException;
36 import org.springframework.dao.OptimisticLockingFailureException;
37 import org.springframework.transaction.PlatformTransactionManager;
38 import org.springframework.transaction.TransactionException;
39 import org.springframework.transaction.TransactionStatus;
40 import org.springframework.transaction.UnexpectedRollbackException;
41 import org.springframework.transaction.support.TransactionCallback;
42 import org.springframework.transaction.support.TransactionTemplate;
43
44
45
46
47
48 public abstract class ConcurrentJob<T> {
49 protected final Logger LOG = Logger.getLogger(getClass());
50
51 protected ExecutorService executor = Executors.newSingleThreadExecutor(new KCBThreadFactory());
52 protected PlatformTransactionManager txManager;
53
54
55
56
57
58 public void setExecutorService(ExecutorService executor) {
59 this.executor = executor;
60 }
61
62
63
64
65
66 @Required
67 public void setTransactionManager(PlatformTransactionManager txManager) {
68 this.txManager = txManager;
69 }
70
71
72
73
74
75
76 protected TransactionTemplate createNewTransaction() {
77 TransactionTemplate tt = new TransactionTemplate(txManager);
78 tt.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
79 return tt;
80 }
81
82
83
84
85
86
87 protected abstract Collection<T> takeAvailableWorkItems();
88
89
90
91
92
93
94
95
96
97 protected Collection<Collection<T>> groupWorkItems(Collection<T> workItems, ProcessingResult<T> result) {
98 Collection<Collection<T>> groupedWorkItems = new ArrayList<Collection<T>>(workItems.size());
99 for (T workItem: workItems) {
100 Collection<T> c = new ArrayList<T>(1);
101 c.add(workItem);
102 groupedWorkItems.add(c);
103 }
104 return groupedWorkItems;
105 }
106
107
108
109
110
111
112
113 protected abstract Collection<T> processWorkItems(Collection<T> items);
114
115
116
117
118
119 protected abstract void unlockWorkItem(T item);
120
121
122
123
124
125
126 public ProcessingResult<T> run() {
127 if ( LOG.isDebugEnabled() ) {
128 LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] STARTING RUN");
129 }
130
131 final ProcessingResult<T> result = new ProcessingResult<T>();
132
133
134 final Collection<T> items;
135 try {
136 items = executeInTransaction(new TransactionCallback() {
137 public Object doInTransaction(TransactionStatus txStatus) {
138 return takeAvailableWorkItems();
139 }
140 });
141 } catch (DataAccessException dae) {
142 Exception ole = RiceUtilities.findExceptionInStack(dae, OptimisticLockingFailureException.class);
143 if ( ole == null ) {
144 ole = RiceUtilities.findExceptionInStack(dae, OptimisticLockException.class);
145 }
146
147 if (ole != null) {
148
149 LOG.info("Contention while taking work items: " + ole.getMessage() );
150 } else {
151
152 LOG.error("Error taking work items", dae);
153 }
154 return result;
155 } catch (UnexpectedRollbackException ure) {
156 LOG.error("UnexpectedRollbackException", ure);
157 return result;
158 } catch (TransactionException te) {
159 LOG.error("Error occurred obtaining available work items", te);
160 result.addFailure(new Failure<T>(te));
161 return result;
162 }
163
164 Collection<Collection<T>> groupedWorkItems = groupWorkItems(items, result);
165
166
167 Iterator<Collection<T>> i = groupedWorkItems.iterator();
168 List<Future> futures = new ArrayList<Future>();
169 while(i.hasNext()) {
170 final Collection<T> workUnit= i.next();
171
172 LOG.info("Processing work unit: " + workUnit);
173
174
175 futures.add(executor.submit(new Callable() {
176 public Object call() throws Exception {
177 ProcessingResult<T> result = new ProcessingResult<T>();
178 try {
179 Collection<T> successes = executeInTransaction(new TransactionCallback() {
180 public Object doInTransaction(TransactionStatus txStatus) {
181 return processWorkItems(workUnit);
182 }
183 });
184 result.addAllSuccesses(successes);
185 } catch (Exception e) {
186 LOG.error("Error occurred processing work unit " + workUnit, e);
187 for (final T workItem: workUnit) {
188 LOG.error("Error occurred processing work item " + workItem, e);
189 result.addFailure(new Failure<T>(workItem, e));
190 unlockWorkItemAtomically(workItem);
191 }
192 }
193 return result;
194 }
195 }));
196 }
197
198
199 for (Future f: futures) {
200 try {
201 ProcessingResult<T> workResult = (ProcessingResult<T>) f.get();
202 result.add(workResult);
203 } catch (Exception e) {
204 String message = "Error obtaining work result: " + e;
205 LOG.error(message, e);
206 result.addFailure(new Failure<T>(e, message));
207 }
208 }
209
210 finishProcessing(result);
211
212 if ( LOG.isDebugEnabled() ) {
213 LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] FINISHED RUN - " + result);
214 }
215
216 return result;
217 }
218
219
220
221
222 protected void finishProcessing(ProcessingResult<T> result) {}
223
224 protected void unlockWorkItemAtomically(final T workItem) {
225 try {
226 executeInTransaction(new TransactionCallback() {
227 public Object doInTransaction(TransactionStatus txStatus) {
228 LOG.info("Unlocking failed work item: " + workItem);
229 unlockWorkItem(workItem);
230 return null;
231 }
232 });
233 } catch (Exception e2) {
234 LOG.error("Error unlocking failed work item " + workItem, e2);
235 }
236 }
237
238 protected <X> X executeInTransaction(TransactionCallback callback) {
239
240
241 return (X) createNewTransaction().execute(callback);
242 }
243
244 private static class KCBThreadFactory implements ThreadFactory {
245
246 private ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
247
248 public Thread newThread(Runnable runnable) {
249 Thread thread = defaultThreadFactory.newThread(runnable);
250 thread.setName("KCB-job-" + thread.getName());
251 return thread;
252 }
253 }
254 }