1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  package org.kuali.rice.kcb.quartz;
17  
18  import java.sql.Timestamp;
19  import java.util.ArrayList;
20  import java.util.Collection;
21  import java.util.Iterator;
22  import java.util.List;
23  import java.util.concurrent.Callable;
24  import java.util.concurrent.ExecutorService;
25  import java.util.concurrent.Executors;
26  import java.util.concurrent.Future;
27  import java.util.concurrent.ThreadFactory;
28  
29  import javax.persistence.OptimisticLockException;
30  
31  import org.apache.log4j.Logger;
32  import org.kuali.rice.core.api.util.RiceUtilities;
33  import org.kuali.rice.kcb.quartz.ProcessingResult.Failure;
34  import org.springframework.beans.factory.annotation.Required;
35  import org.springframework.dao.DataAccessException;
36  import org.springframework.dao.OptimisticLockingFailureException;
37  import org.springframework.transaction.PlatformTransactionManager;
38  import org.springframework.transaction.TransactionException;
39  import org.springframework.transaction.TransactionStatus;
40  import org.springframework.transaction.UnexpectedRollbackException;
41  import org.springframework.transaction.support.TransactionCallback;
42  import org.springframework.transaction.support.TransactionTemplate;
43  
44  
45  
46  
47  
48  public abstract class ConcurrentJob<T> {
49      protected final Logger LOG = Logger.getLogger(getClass());
50  
51      protected ExecutorService executor = Executors.newSingleThreadExecutor(new KCBThreadFactory());
52      protected PlatformTransactionManager txManager;
53  
54      
55  
56  
57  
58      public void setExecutorService(ExecutorService executor) {
59          this.executor = executor;
60      }
61  
62      
63  
64  
65  
66      @Required
67      public void setTransactionManager(PlatformTransactionManager txManager) {
68          this.txManager = txManager;
69      }
70  
71      
72  
73  
74  
75  
76      protected TransactionTemplate createNewTransaction() {
77          TransactionTemplate tt = new TransactionTemplate(txManager);
78          tt.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
79          return tt;
80      }
81  
82      
83  
84  
85  
86  
87      protected abstract Collection<T> takeAvailableWorkItems();
88  
89      
90  
91  
92  
93  
94  
95  
96  
97      protected Collection<Collection<T>> groupWorkItems(Collection<T> workItems, ProcessingResult<T> result) {
98          Collection<Collection<T>> groupedWorkItems = new ArrayList<Collection<T>>(workItems.size());
99          for (T workItem: workItems) {
100             Collection<T> c = new ArrayList<T>(1);
101             c.add(workItem);
102             groupedWorkItems.add(c);
103         }
104         return groupedWorkItems;
105     }
106 
107     
108 
109 
110 
111 
112 
113     protected abstract Collection<T> processWorkItems(Collection<T> items);
114 
115     
116 
117 
118 
119     protected abstract void unlockWorkItem(T item);
120 
121     
122 
123 
124 
125 
126     public ProcessingResult<T> run() {
127         if ( LOG.isDebugEnabled() ) {
128             LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] STARTING RUN");
129         }
130 
131         final ProcessingResult<T> result = new ProcessingResult<T>();
132 
133         
134         final Collection<T> items;
135         try {
136             items = executeInTransaction(new TransactionCallback() {
137                 public Object doInTransaction(TransactionStatus txStatus) {
138                     return takeAvailableWorkItems();
139                 }
140             });
141         } catch (DataAccessException dae) {
142             Exception ole = RiceUtilities.findExceptionInStack(dae, OptimisticLockingFailureException.class);
143             if ( ole == null ) {
144                 ole = RiceUtilities.findExceptionInStack(dae, OptimisticLockException.class);
145             }
146             
147             if (ole != null) {
148                 
149                 LOG.info("Contention while taking work items: " + ole.getMessage() );
150             } else {
151                 
152                 LOG.error("Error taking work items", dae);
153             }
154             return result;
155         } catch (UnexpectedRollbackException ure) {
156             LOG.error("UnexpectedRollbackException", ure);
157             return result;
158         } catch (TransactionException te) {
159             LOG.error("Error occurred obtaining available work items", te);
160             result.addFailure(new Failure<T>(te));
161             return result;
162         }
163 
164         Collection<Collection<T>> groupedWorkItems = groupWorkItems(items, result);
165 
166         
167         Iterator<Collection<T>> i = groupedWorkItems.iterator();
168         List<Future> futures = new ArrayList<Future>();
169         while(i.hasNext()) {
170             final Collection<T> workUnit= i.next();
171 
172             LOG.info("Processing work unit: " + workUnit);
173             
174             
175             futures.add(executor.submit(new Callable() {
176                 public Object call() throws Exception {
177                     ProcessingResult<T> result = new ProcessingResult<T>();
178                     try {
179                         Collection<T> successes = executeInTransaction(new TransactionCallback() {
180                             public Object doInTransaction(TransactionStatus txStatus) {
181                                 return processWorkItems(workUnit);
182                             }
183                         });
184                         result.addAllSuccesses(successes);
185                     } catch (Exception e) {
186                         LOG.error("Error occurred processing work unit " + workUnit, e);
187                         for (final T workItem: workUnit) {
188                             LOG.error("Error occurred processing work item " + workItem, e);
189                             result.addFailure(new Failure<T>(workItem, e));
190                             unlockWorkItemAtomically(workItem);
191                         }
192                     }
193                     return result;
194                 }
195             }));
196         }
197 
198         
199         for (Future f: futures) {
200             try {
201                 ProcessingResult<T> workResult = (ProcessingResult<T>) f.get();
202                 result.add(workResult);
203             } catch (Exception e) {
204                 String message = "Error obtaining work result: " + e;
205                 LOG.error(message, e);
206                 result.addFailure(new Failure<T>(e, message));
207             }
208         }
209 
210         finishProcessing(result);
211 
212         if ( LOG.isDebugEnabled() ) {
213             LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] FINISHED RUN - " + result);
214         }
215 
216         return result;
217     }
218 
219     
220 
221 
222     protected void finishProcessing(ProcessingResult<T> result) {}
223 
224     protected void unlockWorkItemAtomically(final T workItem) {
225         try {
226             executeInTransaction(new TransactionCallback() {
227                 public Object doInTransaction(TransactionStatus txStatus) {
228                     LOG.info("Unlocking failed work item: " + workItem);
229                     unlockWorkItem(workItem);
230                     return null;
231                 }
232             });
233         } catch (Exception e2) {
234             LOG.error("Error unlocking failed work item " + workItem, e2);
235         }
236     }
237     
238     protected <X> X executeInTransaction(TransactionCallback callback) {
239         
240         
241         return (X) createNewTransaction().execute(callback);
242     }
243     
244     private static class KCBThreadFactory implements ThreadFactory {
245 		
246 		private ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
247 		
248 		public Thread newThread(Runnable runnable) {
249 			Thread thread = defaultThreadFactory.newThread(runnable);
250 			thread.setName("KCB-job-" + thread.getName());
251 			return thread;
252 	    }
253 	}
254 }