View Javadoc

1   /**
2    * Copyright 2005-2011 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.rice.kcb.quartz;
17  
18  import org.apache.log4j.Logger;
19  import org.apache.ojb.broker.OptimisticLockException;
20  import org.kuali.rice.core.api.util.RiceUtilities;
21  import org.kuali.rice.kcb.quartz.ProcessingResult.Failure;
22  import org.springframework.beans.factory.annotation.Required;
23  import org.springframework.dao.DataAccessException;
24  import org.springframework.transaction.PlatformTransactionManager;
25  import org.springframework.transaction.TransactionException;
26  import org.springframework.transaction.TransactionStatus;
27  import org.springframework.transaction.UnexpectedRollbackException;
28  import org.springframework.transaction.support.TransactionCallback;
29  import org.springframework.transaction.support.TransactionTemplate;
30  
31  import java.sql.Timestamp;
32  import java.util.ArrayList;
33  import java.util.Collection;
34  import java.util.Iterator;
35  import java.util.List;
36  import java.util.concurrent.Callable;
37  import java.util.concurrent.ExecutorService;
38  import java.util.concurrent.Executors;
39  import java.util.concurrent.Future;
40  import java.util.concurrent.ThreadFactory;
41  
42  /**
43   * Base class for jobs that must obtain a set of work items atomically
44   * @author Kuali Rice Team (rice.collab@kuali.org)
45   */
46  public abstract class ConcurrentJob<T> {
47      protected final Logger LOG = Logger.getLogger(getClass());
48  
49      protected ExecutorService executor = Executors.newSingleThreadExecutor(new KCBThreadFactory());
50      protected PlatformTransactionManager txManager;
51  
52      /**
53       * Sets the {@link ExecutorService} to use to process work items.  Default is single-threaded.
54       * @param executor the {@link ExecutorService} to use to process work items.  Default is single-threaded.
55       */
56      public void setExecutorService(ExecutorService executor) {
57          this.executor = executor;
58      }
59  
60      /**
61       * Sets the {@link PlatformTransactionManager}
62       * @param txManager the {@link PlatformTransactionManager} 
63       */
64      @Required
65      public void setTransactionManager(PlatformTransactionManager txManager) {
66          this.txManager = txManager;
67      }
68  
69      /**
70       * Helper method for creating a TransactionTemplate initialized to create
71       * a new transaction
72       * @return a TransactionTemplate initialized to create a new transaction
73       */
74      protected TransactionTemplate createNewTransaction() {
75          TransactionTemplate tt = new TransactionTemplate(txManager);
76          tt.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
77          return tt;
78      }
79  
80      /**
81       * Template method that subclasses should override to obtain a set of available work items
82       * and mark them as taken
83       * @return a collection of available work items that have been marked as taken
84       */
85      protected abstract Collection<T> takeAvailableWorkItems();
86  
87      /**
88       * Template method that subclasses should override to group work items into units of work
89       * @param workItems list of work items to break into groups
90       * @param result ProcessingResult to modify if there are any failures...this is sort of a hack because previously
91       * failure to obtain a deliverer was considered a work item failure, and now this method has been factored out...
92       * but the tests still want to see the failure
93       * @return a collection of collection of work items
94       */
95      protected Collection<Collection<T>> groupWorkItems(Collection<T> workItems, ProcessingResult<T> result) {
96          Collection<Collection<T>> groupedWorkItems = new ArrayList<Collection<T>>(workItems.size());
97          for (T workItem: workItems) {
98              Collection<T> c = new ArrayList<T>(1);
99              c.add(workItem);
100             groupedWorkItems.add(c);
101         }
102         return groupedWorkItems;
103     }
104 
105     /**
106      * Template method that subclasses should override to process a given work item and mark it
107      * as untaken afterwards
108      * @param item the work item
109      * @return a collection of success messages
110      */
111     protected abstract Collection<T> processWorkItems(Collection<T> items);
112 
113     /**
114      * Template method that subclasses should override to unlock a given work item when procesing has failed.
115      * @param item the work item to unlock
116      */
117     protected abstract void unlockWorkItem(T item);
118 
119     /**
120      * Main processing method which invokes subclass implementations of template methods
121      * to obtain available work items, and process them concurrently
122      * @return a ProcessingResult object containing the results of processing
123      */
124     public ProcessingResult<T> run() {
125         LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] STARTING RUN");
126 
127         final ProcessingResult<T> result = new ProcessingResult<T>();
128 
129         // retrieve list of available work items in a transaction
130         final Collection<T> items;
131         try {
132             items = executeInTransaction(new TransactionCallback() {
133                 public Object doInTransaction(TransactionStatus txStatus) {
134                     return takeAvailableWorkItems();
135                 }
136             });
137         } catch (DataAccessException dae) {
138             // Spring does not detect OJB's org.apache.ojb.broker.OptimisticLockException and turn it into a
139             // org.springframework.dao.OptimisticLockingFailureException?
140             OptimisticLockException optimisticLockException = RiceUtilities.findExceptionInStack(dae, OptimisticLockException.class);
141             if (optimisticLockException != null) {
142                 // anticipated in the case that another thread is trying to grab items
143                 LOG.info("Contention while taking work items");
144             } else {
145                 // in addition to logging a message, should we throw an exception or log a failure here?
146                 LOG.error("Error taking work items", dae);
147             }
148             return result;
149         } catch (UnexpectedRollbackException ure) {
150             // occurs against Mckoi... :(
151             LOG.error("UnexpectedRollbackException - possibly due to Mckoi");
152             return result;
153         } catch (TransactionException te) {
154             LOG.error("Error occurred obtaining available work items", te);
155             result.addFailure(new Failure<T>(te));
156             return result;
157         }
158 
159         Collection<Collection<T>> groupedWorkItems = groupWorkItems(items, result);
160 
161         // now iterate over all work groups and process each
162         Iterator<Collection<T>> i = groupedWorkItems.iterator();
163         List<Future> futures = new ArrayList<Future>();
164         while(i.hasNext()) {
165             final Collection<T> workUnit= i.next();
166 
167             LOG.info("Processing work unit: " + workUnit);
168             /* performed within transaction */
169             /* executor manages threads to run work items... */
170             futures.add(executor.submit(new Callable() {
171                 public Object call() throws Exception {
172                     ProcessingResult<T> result = new ProcessingResult<T>();
173                     try {
174                         Collection<T> successes = executeInTransaction(new TransactionCallback() {
175                             public Object doInTransaction(TransactionStatus txStatus) {
176                                 return processWorkItems(workUnit);
177                             }
178                         });
179                         result.addAllSuccesses(successes);
180                     } catch (Exception e) {
181                         LOG.error("Error occurred processing work unit " + workUnit, e);
182                         for (final T workItem: workUnit) {
183                             LOG.error("Error occurred processing work item " + workItem, e);
184                             result.addFailure(new Failure<T>(workItem, e));
185                             unlockWorkItemAtomically(workItem);
186                         }
187                     }
188                     return result;
189                 }
190             }));
191         }
192 
193         // wait for workers to finish
194         for (Future f: futures) {
195             try {
196                 ProcessingResult<T> workResult = (ProcessingResult<T>) f.get();
197                 result.add(workResult);
198             } catch (Exception e) {
199                 String message = "Error obtaining work result: " + e;
200                 LOG.error(message, e);
201                 result.addFailure(new Failure<T>(e, message));
202             }
203         }
204 
205         finishProcessing(result);
206 
207         LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] FINISHED RUN - " + result);
208 
209         return result;
210     }
211 
212     /**
213      * Template method called after processing of work items has completed
214      */
215     protected void finishProcessing(ProcessingResult<T> result) {}
216 
217     protected void unlockWorkItemAtomically(final T workItem) {
218         try {
219             executeInTransaction(new TransactionCallback() {
220                 public Object doInTransaction(TransactionStatus txStatus) {
221                     LOG.info("Unlocking failed work item: " + workItem);
222                     unlockWorkItem(workItem);
223                     return null;
224                 }
225             });
226         } catch (Exception e2) {
227             LOG.error("Error unlocking failed work item " + workItem, e2);
228         }
229     }
230     
231     protected <X> X executeInTransaction(TransactionCallback callback) {
232         // haha just kidding
233         //return (X) callback.doInTransaction(null);
234         return (X) createNewTransaction().execute(callback);
235     }
236     
237     private static class KCBThreadFactory implements ThreadFactory {
238 		
239 		private ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
240 		
241 		public Thread newThread(Runnable runnable) {
242 			Thread thread = defaultThreadFactory.newThread(runnable);
243 			thread.setName("KCB-job-" + thread.getName());
244 			return thread;
245 	    }
246 	}
247 }