1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.ken.service.impl;
17
18 import org.apache.commons.lang.StringUtils;
19 import org.apache.commons.lang.exception.ExceptionUtils;
20 import org.apache.log4j.Logger;
21 import org.apache.ojb.broker.OptimisticLockException;
22 import org.kuali.rice.ken.service.ProcessingResult;
23 import org.springframework.dao.DataAccessException;
24 import org.springframework.transaction.PlatformTransactionManager;
25 import org.springframework.transaction.TransactionException;
26 import org.springframework.transaction.TransactionStatus;
27 import org.springframework.transaction.UnexpectedRollbackException;
28 import org.springframework.transaction.support.TransactionCallback;
29 import org.springframework.transaction.support.TransactionTemplate;
30
31 import java.sql.SQLException;
32 import java.sql.Timestamp;
33 import java.util.ArrayList;
34 import java.util.Collection;
35 import java.util.Iterator;
36 import java.util.List;
37 import java.util.concurrent.Callable;
38 import java.util.concurrent.ExecutorService;
39 import java.util.concurrent.Future;
40
41
42
43
44
45 public abstract class ConcurrentJob<T> {
46
47
48
49 private static final int ORACLE_00054 = 54;
50
51
52
53 private static final int ORACLE_00060 = 60;
54
55
56 protected final Logger LOG = Logger.getLogger(getClass());
57
58 protected ExecutorService executor;
59 protected PlatformTransactionManager txManager;
60
61
62
63
64
65
66 public ConcurrentJob(PlatformTransactionManager txManager, ExecutorService executor) {
67 this.txManager = txManager;
68 this.executor = executor;
69 }
70
71
72
73
74
75
76 protected TransactionTemplate createNewTransaction() {
77 TransactionTemplate tt = new TransactionTemplate(txManager);
78 tt.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
79 return tt;
80 }
81
82
83
84
85
86
87 protected abstract Collection<T> takeAvailableWorkItems();
88
89
90
91
92
93
94
95
96
97 protected Collection<Collection<T>> groupWorkItems(Collection<T> workItems, ProcessingResult result) {
98 Collection<Collection<T>> groupedWorkItems = new ArrayList<Collection<T>>();
99
100 if (workItems != null) {
101 for (T workItem: workItems) {
102 Collection<T> c = new ArrayList<T>(1);
103 c.add(workItem);
104 groupedWorkItems.add(c);
105 }
106 }
107 return groupedWorkItems;
108 }
109
110
111
112
113
114
115
116 protected abstract Collection<?> processWorkItems(Collection<T> items);
117
118
119
120
121
122 protected abstract void unlockWorkItem(T item);
123
124
125
126
127
128
129 public ProcessingResult run() {
130 LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] STARTING RUN");
131
132 final ProcessingResult result = new ProcessingResult();
133
134
135 Collection<T> items = null;
136 try {
137 items = (Collection<T>)
138 createNewTransaction().execute(new TransactionCallback() {
139 public Object doInTransaction(TransactionStatus txStatus) {
140 return takeAvailableWorkItems();
141 }
142 });
143 } catch (DataAccessException dae) {
144
145
146 if (ExceptionUtils.indexOfType(dae, OptimisticLockException.class) != -1) {
147
148 LOG.info("Contention while taking work items");
149 } else {
150
151 LOG.error("Error taking work items", dae);
152 Throwable t = dae.getMostSpecificCause();
153 if (t != null && t instanceof SQLException) {
154 SQLException sqle = (SQLException) t;
155 if (sqle.getErrorCode() == ORACLE_00054 && StringUtils.contains(sqle.getMessage(), "resource busy")) {
156
157 LOG.warn("Select for update lock contention encountered");
158 } else if (sqle.getErrorCode() == ORACLE_00060 && StringUtils.contains(sqle.getMessage(), "deadlock detected")) {
159
160
161 LOG.error("Select for update deadlock encountered!");
162 }
163 }
164 }
165 return result;
166 } catch (UnexpectedRollbackException ure) {
167
168 LOG.error("UnexpectedRollbackException - possibly due to Mckoi");
169 return result;
170 } catch (TransactionException te) {
171 LOG.error("Error occurred obtaining available work items", te);
172 result.addFailure("Error occurred obtaining available work items: " + te);
173 return result;
174 }
175
176 Collection<Collection<T>> groupedWorkItems = groupWorkItems(items, result);
177
178
179 Iterator<Collection<T>> i = groupedWorkItems.iterator();
180 List<Future> futures = new ArrayList<Future>();
181 while(i.hasNext()) {
182 final Collection<T> workUnit= i.next();
183
184 LOG.info("Processing work unit: " + workUnit);
185
186
187 futures.add(executor.submit(new Callable() {
188 public Object call() throws Exception {
189 ProcessingResult result = new ProcessingResult();
190 try {
191 Collection<?> successes = (Collection<Object>)
192 createNewTransaction().execute(new TransactionCallback() {
193 public Object doInTransaction(TransactionStatus txStatus) {
194 return processWorkItems(workUnit);
195 }
196 });
197 result.addAllSuccesses(successes);
198 } catch (Exception e) {
199 LOG.error("Error occurred processing work unit " + workUnit, e);
200 for (final T workItem: workUnit) {
201 LOG.error("Error occurred processing work item " + workItem, e);
202 result.addFailure("Error occurred processing work item " + workItem + ": " + e);
203 unlockWorkItemAtomically(workItem);
204 }
205 }
206 return result;
207 }
208 }));
209 }
210
211
212 for (Future f: futures) {
213 try {
214 ProcessingResult workResult = (ProcessingResult) f.get();
215 result.add(workResult);
216 } catch (Exception e) {
217 String message = "Error obtaining work result: " + e;
218 LOG.error(message, e);
219 result.addFailure(message);
220 }
221 }
222
223 LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] FINISHED RUN - " + result);
224
225 return result;
226 }
227
228 protected void unlockWorkItemAtomically(final T workItem) {
229 try {
230 createNewTransaction().execute(new TransactionCallback() {
231 public Object doInTransaction(TransactionStatus txStatus) {
232 LOG.info("Unlocking failed work item: " + workItem);
233 unlockWorkItem(workItem);
234 return null;
235 }
236 });
237 } catch (Exception e2) {
238 LOG.error("Error unlocking failed work item " + workItem, e2);
239 }
240 }
241 }