View Javadoc
1   /**
2    * Copyright 2005-2014 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.rice.kcb.quartz;
17  
18  import org.apache.log4j.Logger;
19  import org.apache.ojb.broker.OptimisticLockException;
20  import org.kuali.rice.core.api.util.RiceUtilities;
21  import org.kuali.rice.kcb.quartz.ProcessingResult.Failure;
22  import org.springframework.beans.factory.annotation.Required;
23  import org.springframework.dao.DataAccessException;
24  import org.springframework.transaction.PlatformTransactionManager;
25  import org.springframework.transaction.TransactionException;
26  import org.springframework.transaction.TransactionStatus;
27  import org.springframework.transaction.UnexpectedRollbackException;
28  import org.springframework.transaction.support.TransactionCallback;
29  import org.springframework.transaction.support.TransactionTemplate;
30  
31  import java.sql.Timestamp;
32  import java.util.ArrayList;
33  import java.util.Collection;
34  import java.util.Iterator;
35  import java.util.List;
36  import java.util.concurrent.Callable;
37  import java.util.concurrent.ExecutorService;
38  import java.util.concurrent.Executors;
39  import java.util.concurrent.Future;
40  import java.util.concurrent.ThreadFactory;
41  
42  /**
43   * Base class for jobs that must obtain a set of work items atomically
44   * @author Kuali Rice Team (rice.collab@kuali.org)
45   */
46  public abstract class ConcurrentJob<T> {
47      protected final Logger LOG = Logger.getLogger(getClass());
48  
49      protected ExecutorService executor = Executors.newSingleThreadExecutor(new KCBThreadFactory());
50      protected PlatformTransactionManager txManager;
51  
52      /**
53       * Sets the {@link ExecutorService} to use to process work items.  Default is single-threaded.
54       * @param executor the {@link ExecutorService} to use to process work items.  Default is single-threaded.
55       */
56      public void setExecutorService(ExecutorService executor) {
57          this.executor = executor;
58      }
59  
60      /**
61       * Sets the {@link PlatformTransactionManager}
62       * @param txManager the {@link PlatformTransactionManager} 
63       */
64      @Required
65      public void setTransactionManager(PlatformTransactionManager txManager) {
66          this.txManager = txManager;
67      }
68  
69      /**
70       * Helper method for creating a TransactionTemplate initialized to create
71       * a new transaction
72       * @return a TransactionTemplate initialized to create a new transaction
73       */
74      protected TransactionTemplate createNewTransaction() {
75          TransactionTemplate tt = new TransactionTemplate(txManager);
76          tt.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
77          return tt;
78      }
79  
80      /**
81       * Template method that subclasses should override to obtain a set of available work items
82       * and mark them as taken
83       * @return a collection of available work items that have been marked as taken
84       */
85      protected abstract Collection<T> takeAvailableWorkItems();
86  
87      /**
88       * Template method that subclasses should override to group work items into units of work
89       * @param workItems list of work items to break into groups
90       * @param result ProcessingResult to modify if there are any failures...this is sort of a hack because previously
91       * failure to obtain a deliverer was considered a work item failure, and now this method has been factored out...
92       * but the tests still want to see the failure
93       * @return a collection of collection of work items
94       */
95      protected Collection<Collection<T>> groupWorkItems(Collection<T> workItems, ProcessingResult<T> result) {
96          Collection<Collection<T>> groupedWorkItems = new ArrayList<Collection<T>>(workItems.size());
97          for (T workItem: workItems) {
98              Collection<T> c = new ArrayList<T>(1);
99              c.add(workItem);
100             groupedWorkItems.add(c);
101         }
102         return groupedWorkItems;
103     }
104 
105     /**
106      * Template method that subclasses should override to process a given work item and mark it
107      * as untaken afterwards
108      * @param item the work item
109      * @return a collection of success messages
110      */
111     protected abstract Collection<T> processWorkItems(Collection<T> items);
112 
113     /**
114      * Template method that subclasses should override to unlock a given work item when procesing has failed.
115      * @param item the work item to unlock
116      */
117     protected abstract void unlockWorkItem(T item);
118 
119     /**
120      * Main processing method which invokes subclass implementations of template methods
121      * to obtain available work items, and process them concurrently
122      * @return a ProcessingResult object containing the results of processing
123      */
124     public ProcessingResult<T> run() {
125         LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] STARTING RUN");
126 
127         final ProcessingResult<T> result = new ProcessingResult<T>();
128 
129         // retrieve list of available work items in a transaction
130         final Collection<T> items;
131         try {
132             items = executeInTransaction(new TransactionCallback() {
133                 public Object doInTransaction(TransactionStatus txStatus) {
134                     return takeAvailableWorkItems();
135                 }
136             });
137         } catch (DataAccessException dae) {
138             // Spring does not detect OJB's org.apache.ojb.broker.OptimisticLockException and turn it into a
139             // org.springframework.dao.OptimisticLockingFailureException?
140             OptimisticLockException optimisticLockException = RiceUtilities.findExceptionInStack(dae, OptimisticLockException.class);
141             if (optimisticLockException != null) {
142                 // anticipated in the case that another thread is trying to grab items
143                 LOG.info("Contention while taking work items");
144             } else {
145                 // in addition to logging a message, should we throw an exception or log a failure here?
146                 LOG.error("Error taking work items", dae);
147             }
148             return result;
149         } catch (UnexpectedRollbackException ure) {
150             LOG.error("UnexpectedRollbackException", ure);
151             return result;
152         } catch (TransactionException te) {
153             LOG.error("Error occurred obtaining available work items", te);
154             result.addFailure(new Failure<T>(te));
155             return result;
156         }
157 
158         Collection<Collection<T>> groupedWorkItems = groupWorkItems(items, result);
159 
160         // now iterate over all work groups and process each
161         Iterator<Collection<T>> i = groupedWorkItems.iterator();
162         List<Future> futures = new ArrayList<Future>();
163         while(i.hasNext()) {
164             final Collection<T> workUnit= i.next();
165 
166             LOG.info("Processing work unit: " + workUnit);
167             /* performed within transaction */
168             /* executor manages threads to run work items... */
169             futures.add(executor.submit(new Callable() {
170                 public Object call() throws Exception {
171                     ProcessingResult<T> result = new ProcessingResult<T>();
172                     try {
173                         Collection<T> successes = executeInTransaction(new TransactionCallback() {
174                             public Object doInTransaction(TransactionStatus txStatus) {
175                                 return processWorkItems(workUnit);
176                             }
177                         });
178                         result.addAllSuccesses(successes);
179                     } catch (Exception e) {
180                         LOG.error("Error occurred processing work unit " + workUnit, e);
181                         for (final T workItem: workUnit) {
182                             LOG.error("Error occurred processing work item " + workItem, e);
183                             result.addFailure(new Failure<T>(workItem, e));
184                             unlockWorkItemAtomically(workItem);
185                         }
186                     }
187                     return result;
188                 }
189             }));
190         }
191 
192         // wait for workers to finish
193         for (Future f: futures) {
194             try {
195                 ProcessingResult<T> workResult = (ProcessingResult<T>) f.get();
196                 result.add(workResult);
197             } catch (Exception e) {
198                 String message = "Error obtaining work result: " + e;
199                 LOG.error(message, e);
200                 result.addFailure(new Failure<T>(e, message));
201             }
202         }
203 
204         finishProcessing(result);
205 
206         LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] FINISHED RUN - " + result);
207 
208         return result;
209     }
210 
211     /**
212      * Template method called after processing of work items has completed
213      */
214     protected void finishProcessing(ProcessingResult<T> result) {}
215 
216     protected void unlockWorkItemAtomically(final T workItem) {
217         try {
218             executeInTransaction(new TransactionCallback() {
219                 public Object doInTransaction(TransactionStatus txStatus) {
220                     LOG.info("Unlocking failed work item: " + workItem);
221                     unlockWorkItem(workItem);
222                     return null;
223                 }
224             });
225         } catch (Exception e2) {
226             LOG.error("Error unlocking failed work item " + workItem, e2);
227         }
228     }
229     
230     protected <X> X executeInTransaction(TransactionCallback callback) {
231         // haha just kidding
232         //return (X) callback.doInTransaction(null);
233         return (X) createNewTransaction().execute(callback);
234     }
235     
236     private static class KCBThreadFactory implements ThreadFactory {
237 		
238 		private ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
239 		
240 		public Thread newThread(Runnable runnable) {
241 			Thread thread = defaultThreadFactory.newThread(runnable);
242 			thread.setName("KCB-job-" + thread.getName());
243 			return thread;
244 	    }
245 	}
246 }