View Javadoc
1   /**
2    * Copyright 2005-2015 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.rice.kcb.quartz;
17  
18  import java.sql.Timestamp;
19  import java.util.ArrayList;
20  import java.util.Collection;
21  import java.util.Iterator;
22  import java.util.List;
23  import java.util.concurrent.Callable;
24  import java.util.concurrent.ExecutorService;
25  import java.util.concurrent.Executors;
26  import java.util.concurrent.Future;
27  import java.util.concurrent.ThreadFactory;
28  
29  import javax.persistence.OptimisticLockException;
30  
31  import org.apache.log4j.Logger;
32  import org.kuali.rice.core.api.util.RiceUtilities;
33  import org.kuali.rice.kcb.quartz.ProcessingResult.Failure;
34  import org.springframework.beans.factory.annotation.Required;
35  import org.springframework.dao.DataAccessException;
36  import org.springframework.dao.OptimisticLockingFailureException;
37  import org.springframework.transaction.PlatformTransactionManager;
38  import org.springframework.transaction.TransactionException;
39  import org.springframework.transaction.TransactionStatus;
40  import org.springframework.transaction.UnexpectedRollbackException;
41  import org.springframework.transaction.support.TransactionCallback;
42  import org.springframework.transaction.support.TransactionTemplate;
43  
44  /**
45   * Base class for jobs that must obtain a set of work items atomically
46   * @author Kuali Rice Team (rice.collab@kuali.org)
47   */
48  public abstract class ConcurrentJob<T> {
49      protected final Logger LOG = Logger.getLogger(getClass());
50  
51      protected ExecutorService executor = Executors.newSingleThreadExecutor(new KCBThreadFactory());
52      protected PlatformTransactionManager txManager;
53  
54      /**
55       * Sets the {@link ExecutorService} to use to process work items.  Default is single-threaded.
56       * @param executor the {@link ExecutorService} to use to process work items.  Default is single-threaded.
57       */
58      public void setExecutorService(ExecutorService executor) {
59          this.executor = executor;
60      }
61  
62      /**
63       * Sets the {@link PlatformTransactionManager}
64       * @param txManager the {@link PlatformTransactionManager} 
65       */
66      @Required
67      public void setTransactionManager(PlatformTransactionManager txManager) {
68          this.txManager = txManager;
69      }
70  
71      /**
72       * Helper method for creating a TransactionTemplate initialized to create
73       * a new transaction
74       * @return a TransactionTemplate initialized to create a new transaction
75       */
76      protected TransactionTemplate createNewTransaction() {
77          TransactionTemplate tt = new TransactionTemplate(txManager);
78          tt.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
79          return tt;
80      }
81  
82      /**
83       * Template method that subclasses should override to obtain a set of available work items
84       * and mark them as taken
85       * @return a collection of available work items that have been marked as taken
86       */
87      protected abstract Collection<T> takeAvailableWorkItems();
88  
89      /**
90       * Template method that subclasses should override to group work items into units of work
91       * @param workItems list of work items to break into groups
92       * @param result ProcessingResult to modify if there are any failures...this is sort of a hack because previously
93       * failure to obtain a deliverer was considered a work item failure, and now this method has been factored out...
94       * but the tests still want to see the failure
95       * @return a collection of collection of work items
96       */
97      protected Collection<Collection<T>> groupWorkItems(Collection<T> workItems, ProcessingResult<T> result) {
98          Collection<Collection<T>> groupedWorkItems = new ArrayList<Collection<T>>(workItems.size());
99          for (T workItem: workItems) {
100             Collection<T> c = new ArrayList<T>(1);
101             c.add(workItem);
102             groupedWorkItems.add(c);
103         }
104         return groupedWorkItems;
105     }
106 
107     /**
108      * Template method that subclasses should override to process a given work item and mark it
109      * as untaken afterwards
110      * @param item the work item
111      * @return a collection of success messages
112      */
113     protected abstract Collection<T> processWorkItems(Collection<T> items);
114 
115     /**
116      * Template method that subclasses should override to unlock a given work item when procesing has failed.
117      * @param item the work item to unlock
118      */
119     protected abstract void unlockWorkItem(T item);
120 
121     /**
122      * Main processing method which invokes subclass implementations of template methods
123      * to obtain available work items, and process them concurrently
124      * @return a ProcessingResult object containing the results of processing
125      */
126     public ProcessingResult<T> run() {
127         if ( LOG.isDebugEnabled() ) {
128             LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] STARTING RUN");
129         }
130 
131         final ProcessingResult<T> result = new ProcessingResult<T>();
132 
133         // retrieve list of available work items in a transaction
134         final Collection<T> items;
135         try {
136             items = executeInTransaction(new TransactionCallback() {
137                 public Object doInTransaction(TransactionStatus txStatus) {
138                     return takeAvailableWorkItems();
139                 }
140             });
141         } catch (DataAccessException dae) {
142             Exception ole = RiceUtilities.findExceptionInStack(dae, OptimisticLockingFailureException.class);
143             if ( ole == null ) {
144                 ole = RiceUtilities.findExceptionInStack(dae, OptimisticLockException.class);
145             }
146             
147             if (ole != null) {
148                 // anticipated in the case that another thread is trying to grab items
149                 LOG.info("Contention while taking work items: " + ole.getMessage() );
150             } else {
151                 // in addition to logging a message, should we throw an exception or log a failure here?
152                 LOG.error("Error taking work items", dae);
153             }
154             return result;
155         } catch (UnexpectedRollbackException ure) {
156             LOG.error("UnexpectedRollbackException", ure);
157             return result;
158         } catch (TransactionException te) {
159             LOG.error("Error occurred obtaining available work items", te);
160             result.addFailure(new Failure<T>(te));
161             return result;
162         }
163 
164         Collection<Collection<T>> groupedWorkItems = groupWorkItems(items, result);
165 
166         // now iterate over all work groups and process each
167         Iterator<Collection<T>> i = groupedWorkItems.iterator();
168         List<Future> futures = new ArrayList<Future>();
169         while(i.hasNext()) {
170             final Collection<T> workUnit= i.next();
171 
172             LOG.info("Processing work unit: " + workUnit);
173             /* performed within transaction */
174             /* executor manages threads to run work items... */
175             futures.add(executor.submit(new Callable() {
176                 public Object call() throws Exception {
177                     ProcessingResult<T> result = new ProcessingResult<T>();
178                     try {
179                         Collection<T> successes = executeInTransaction(new TransactionCallback() {
180                             public Object doInTransaction(TransactionStatus txStatus) {
181                                 return processWorkItems(workUnit);
182                             }
183                         });
184                         result.addAllSuccesses(successes);
185                     } catch (Exception e) {
186                         LOG.error("Error occurred processing work unit " + workUnit, e);
187                         for (final T workItem: workUnit) {
188                             LOG.error("Error occurred processing work item " + workItem, e);
189                             result.addFailure(new Failure<T>(workItem, e));
190                             unlockWorkItemAtomically(workItem);
191                         }
192                     }
193                     return result;
194                 }
195             }));
196         }
197 
198         // wait for workers to finish
199         for (Future f: futures) {
200             try {
201                 ProcessingResult<T> workResult = (ProcessingResult<T>) f.get();
202                 result.add(workResult);
203             } catch (Exception e) {
204                 String message = "Error obtaining work result: " + e;
205                 LOG.error(message, e);
206                 result.addFailure(new Failure<T>(e, message));
207             }
208         }
209 
210         finishProcessing(result);
211 
212         if ( LOG.isDebugEnabled() ) {
213             LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] FINISHED RUN - " + result);
214         }
215 
216         return result;
217     }
218 
219     /**
220      * Template method called after processing of work items has completed
221      */
222     protected void finishProcessing(ProcessingResult<T> result) {}
223 
224     protected void unlockWorkItemAtomically(final T workItem) {
225         try {
226             executeInTransaction(new TransactionCallback() {
227                 public Object doInTransaction(TransactionStatus txStatus) {
228                     LOG.info("Unlocking failed work item: " + workItem);
229                     unlockWorkItem(workItem);
230                     return null;
231                 }
232             });
233         } catch (Exception e2) {
234             LOG.error("Error unlocking failed work item " + workItem, e2);
235         }
236     }
237     
238     protected <X> X executeInTransaction(TransactionCallback callback) {
239         // haha just kidding
240         //return (X) callback.doInTransaction(null);
241         return (X) createNewTransaction().execute(callback);
242     }
243     
244     private static class KCBThreadFactory implements ThreadFactory {
245 		
246 		private ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
247 		
248 		public Thread newThread(Runnable runnable) {
249 			Thread thread = defaultThreadFactory.newThread(runnable);
250 			thread.setName("KCB-job-" + thread.getName());
251 			return thread;
252 	    }
253 	}
254 }