1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.ken.service.impl;
17
18 import java.sql.SQLException;
19 import java.sql.Timestamp;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.concurrent.Callable;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Future;
27
28 import javax.persistence.OptimisticLockException;
29
30 import org.apache.commons.lang.StringUtils;
31 import org.apache.log4j.Logger;
32 import org.kuali.rice.ken.service.ProcessingResult;
33 import org.springframework.dao.DataAccessException;
34 import org.springframework.dao.OptimisticLockingFailureException;
35 import org.springframework.transaction.PlatformTransactionManager;
36 import org.springframework.transaction.TransactionException;
37 import org.springframework.transaction.TransactionStatus;
38 import org.springframework.transaction.UnexpectedRollbackException;
39 import org.springframework.transaction.support.TransactionCallback;
40 import org.springframework.transaction.support.TransactionTemplate;
41
42
43
44
45
46 public abstract class ConcurrentJob<T> {
47
48
49
50 private static final int ORACLE_00054 = 54;
51
52
53
54 private static final int ORACLE_00060 = 60;
55
56
57 protected final Logger LOG = Logger.getLogger(getClass());
58
59 protected ExecutorService executor;
60 protected PlatformTransactionManager txManager;
61
62
63
64
65
66
67 public ConcurrentJob(PlatformTransactionManager txManager, ExecutorService executor) {
68 this.txManager = txManager;
69 this.executor = executor;
70 }
71
72
73
74
75
76
77 protected TransactionTemplate createNewTransaction() {
78 TransactionTemplate tt = new TransactionTemplate(txManager);
79 tt.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
80 return tt;
81 }
82
83
84
85
86
87
88 protected abstract Collection<T> takeAvailableWorkItems();
89
90
91
92
93
94
95
96
97
98 protected Collection<Collection<T>> groupWorkItems(Collection<T> workItems, ProcessingResult result) {
99 Collection<Collection<T>> groupedWorkItems = new ArrayList<Collection<T>>();
100
101 if (workItems != null) {
102 for (T workItem: workItems) {
103 Collection<T> c = new ArrayList<T>(1);
104 c.add(workItem);
105 groupedWorkItems.add(c);
106 }
107 }
108 return groupedWorkItems;
109 }
110
111
112
113
114
115
116
117 protected abstract Collection<?> processWorkItems(Collection<T> items);
118
119
120
121
122
123 protected abstract void unlockWorkItem(T item);
124
125
126
127
128
129
130 @SuppressWarnings("unchecked")
131 public ProcessingResult run() {
132 if ( LOG.isDebugEnabled() ) {
133 LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] STARTING RUN");
134 }
135
136 final ProcessingResult result = new ProcessingResult();
137
138
139 Collection<T> items = null;
140 try {
141 items = (Collection<T>)
142 createNewTransaction().execute(new TransactionCallback() {
143 public Object doInTransaction(TransactionStatus txStatus) {
144 return takeAvailableWorkItems();
145 }
146 });
147 } catch (DataAccessException dae) {
148 if ( dae instanceof OptimisticLockingFailureException || dae.contains(OptimisticLockingFailureException.class) || dae.contains(OptimisticLockException.class) ) {
149
150 LOG.info("Contention while taking work items: " + dae.getMessage() );
151 } else {
152
153 LOG.error("Error taking work items", dae);
154 Throwable t = dae.getMostSpecificCause();
155 if (t != null && t instanceof SQLException) {
156 SQLException sqle = (SQLException) t;
157 if (sqle.getErrorCode() == ORACLE_00054 && StringUtils.contains(sqle.getMessage(), "resource busy")) {
158
159 LOG.warn("Select for update lock contention encountered: " + sqle.getMessage() );
160 } else if (sqle.getErrorCode() == ORACLE_00060 && StringUtils.contains(sqle.getMessage(), "deadlock detected")) {
161
162
163 LOG.error("Select for update deadlock encountered! " + sqle.getMessage() );
164 }
165 }
166 }
167 return result;
168 } catch (UnexpectedRollbackException ure) {
169 LOG.error("UnexpectedRollbackException", ure);
170 return result;
171 } catch (TransactionException te) {
172 LOG.error("Error occurred obtaining available work items", te);
173 result.addFailure("Error occurred obtaining available work items: " + te);
174 return result;
175 }
176
177 Collection<Collection<T>> groupedWorkItems = groupWorkItems(items, result);
178
179
180 Iterator<Collection<T>> i = groupedWorkItems.iterator();
181 List<Future> futures = new ArrayList<Future>();
182 while(i.hasNext()) {
183 final Collection<T> workUnit= i.next();
184
185 LOG.info("Processing work unit: " + workUnit);
186
187
188 futures.add(executor.submit(new Callable() {
189 public Object call() throws Exception {
190 ProcessingResult result = new ProcessingResult();
191 try {
192 Collection<?> successes = (Collection<Object>)
193 createNewTransaction().execute(new TransactionCallback() {
194 public Object doInTransaction(TransactionStatus txStatus) {
195 return processWorkItems(workUnit);
196 }
197 });
198 result.addAllSuccesses(successes);
199 } catch (Exception e) {
200 LOG.error("Error occurred processing work unit " + workUnit, e);
201 for (final T workItem: workUnit) {
202 LOG.error("Error occurred processing work item " + workItem, e);
203 result.addFailure("Error occurred processing work item " + workItem + ": " + e);
204 unlockWorkItemAtomically(workItem);
205 }
206 }
207 return result;
208 }
209 }));
210 }
211
212
213 for (Future f: futures) {
214 try {
215 ProcessingResult workResult = (ProcessingResult) f.get();
216 result.add(workResult);
217 } catch (Exception e) {
218 String message = "Error obtaining work result: " + e;
219 LOG.error(message, e);
220 result.addFailure(message);
221 }
222 }
223
224 if ( LOG.isDebugEnabled() ) {
225 LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] FINISHED RUN - " + result);
226 }
227
228 return result;
229 }
230
231 protected void unlockWorkItemAtomically(final T workItem) {
232 try {
233 createNewTransaction().execute(new TransactionCallback() {
234 public Object doInTransaction(TransactionStatus txStatus) {
235 LOG.info("Unlocking failed work item: " + workItem);
236 unlockWorkItem(workItem);
237 return null;
238 }
239 });
240 } catch (Exception e2) {
241 LOG.error("Error unlocking failed work item " + workItem, e2);
242 }
243 }
244 }