1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.ken.service.impl;
17
18 import org.apache.commons.lang.StringUtils;
19 import org.apache.commons.lang.exception.ExceptionUtils;
20 import org.apache.log4j.Logger;
21 import org.apache.ojb.broker.OptimisticLockException;
22 import org.kuali.rice.ken.service.ProcessingResult;
23 import org.springframework.dao.DataAccessException;
24 import org.springframework.transaction.PlatformTransactionManager;
25 import org.springframework.transaction.TransactionException;
26 import org.springframework.transaction.TransactionStatus;
27 import org.springframework.transaction.UnexpectedRollbackException;
28 import org.springframework.transaction.support.TransactionCallback;
29 import org.springframework.transaction.support.TransactionTemplate;
30
31 import java.sql.SQLException;
32 import java.sql.Timestamp;
33 import java.util.ArrayList;
34 import java.util.Collection;
35 import java.util.Iterator;
36 import java.util.List;
37 import java.util.concurrent.Callable;
38 import java.util.concurrent.ExecutorService;
39 import java.util.concurrent.Future;
40
41
42
43
44
45 public abstract class ConcurrentJob<T> {
46
47
48
49 private static final int ORACLE_00054 = 54;
50
51
52
53 private static final int ORACLE_00060 = 60;
54
55
56 protected final Logger LOG = Logger.getLogger(getClass());
57
58 protected ExecutorService executor;
59 protected PlatformTransactionManager txManager;
60
61
62
63
64
65
66 public ConcurrentJob(PlatformTransactionManager txManager, ExecutorService executor) {
67 this.txManager = txManager;
68 this.executor = executor;
69 }
70
71
72
73
74
75
76 protected TransactionTemplate createNewTransaction() {
77 TransactionTemplate tt = new TransactionTemplate(txManager);
78 tt.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
79 return tt;
80 }
81
82
83
84
85
86
87 protected abstract Collection<T> takeAvailableWorkItems();
88
89
90
91
92
93
94
95
96
97 protected Collection<Collection<T>> groupWorkItems(Collection<T> workItems, ProcessingResult result) {
98 Collection<Collection<T>> groupedWorkItems = new ArrayList<Collection<T>>();
99
100 if (workItems != null) {
101 for (T workItem: workItems) {
102 Collection<T> c = new ArrayList<T>(1);
103 c.add(workItem);
104 groupedWorkItems.add(c);
105 }
106 }
107 return groupedWorkItems;
108 }
109
110
111
112
113
114
115
116 protected abstract Collection<?> processWorkItems(Collection<T> items);
117
118
119
120
121
122 protected abstract void unlockWorkItem(T item);
123
124
125
126
127
128
129 public ProcessingResult run() {
130 LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] STARTING RUN");
131
132 final ProcessingResult result = new ProcessingResult();
133
134
135 Collection<T> items = null;
136 try {
137 items = (Collection<T>)
138 createNewTransaction().execute(new TransactionCallback() {
139 public Object doInTransaction(TransactionStatus txStatus) {
140 return takeAvailableWorkItems();
141 }
142 });
143 } catch (DataAccessException dae) {
144
145
146 if (ExceptionUtils.indexOfType(dae, OptimisticLockException.class) != -1) {
147
148 LOG.info("Contention while taking work items");
149 } else {
150
151 LOG.error("Error taking work items", dae);
152 Throwable t = dae.getMostSpecificCause();
153 if (t != null && t instanceof SQLException) {
154 SQLException sqle = (SQLException) t;
155 if (sqle.getErrorCode() == ORACLE_00054 && StringUtils.contains(sqle.getMessage(), "resource busy")) {
156
157 LOG.warn("Select for update lock contention encountered");
158 } else if (sqle.getErrorCode() == ORACLE_00060 && StringUtils.contains(sqle.getMessage(), "deadlock detected")) {
159
160
161 LOG.error("Select for update deadlock encountered!");
162 }
163 }
164 }
165 return result;
166 } catch (UnexpectedRollbackException ure) {
167 LOG.error("UnexpectedRollbackException", ure);
168 return result;
169 } catch (TransactionException te) {
170 LOG.error("Error occurred obtaining available work items", te);
171 result.addFailure("Error occurred obtaining available work items: " + te);
172 return result;
173 }
174
175 Collection<Collection<T>> groupedWorkItems = groupWorkItems(items, result);
176
177
178 Iterator<Collection<T>> i = groupedWorkItems.iterator();
179 List<Future> futures = new ArrayList<Future>();
180 while(i.hasNext()) {
181 final Collection<T> workUnit= i.next();
182
183 LOG.info("Processing work unit: " + workUnit);
184
185
186 futures.add(executor.submit(new Callable() {
187 public Object call() throws Exception {
188 ProcessingResult result = new ProcessingResult();
189 try {
190 Collection<?> successes = (Collection<Object>)
191 createNewTransaction().execute(new TransactionCallback() {
192 public Object doInTransaction(TransactionStatus txStatus) {
193 return processWorkItems(workUnit);
194 }
195 });
196 result.addAllSuccesses(successes);
197 } catch (Exception e) {
198 LOG.error("Error occurred processing work unit " + workUnit, e);
199 for (final T workItem: workUnit) {
200 LOG.error("Error occurred processing work item " + workItem, e);
201 result.addFailure("Error occurred processing work item " + workItem + ": " + e);
202 unlockWorkItemAtomically(workItem);
203 }
204 }
205 return result;
206 }
207 }));
208 }
209
210
211 for (Future f: futures) {
212 try {
213 ProcessingResult workResult = (ProcessingResult) f.get();
214 result.add(workResult);
215 } catch (Exception e) {
216 String message = "Error obtaining work result: " + e;
217 LOG.error(message, e);
218 result.addFailure(message);
219 }
220 }
221
222 LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] FINISHED RUN - " + result);
223
224 return result;
225 }
226
227 protected void unlockWorkItemAtomically(final T workItem) {
228 try {
229 createNewTransaction().execute(new TransactionCallback() {
230 public Object doInTransaction(TransactionStatus txStatus) {
231 LOG.info("Unlocking failed work item: " + workItem);
232 unlockWorkItem(workItem);
233 return null;
234 }
235 });
236 } catch (Exception e2) {
237 LOG.error("Error unlocking failed work item " + workItem, e2);
238 }
239 }
240 }