View Javadoc

1   /**
2    * Copyright 2005-2013 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.rice.ken.service.impl;
17  
18  import org.apache.commons.lang.StringUtils;
19  import org.apache.commons.lang.exception.ExceptionUtils;
20  import org.apache.log4j.Logger;
21  import org.apache.ojb.broker.OptimisticLockException;
22  import org.kuali.rice.ken.service.ProcessingResult;
23  import org.springframework.dao.DataAccessException;
24  import org.springframework.transaction.PlatformTransactionManager;
25  import org.springframework.transaction.TransactionException;
26  import org.springframework.transaction.TransactionStatus;
27  import org.springframework.transaction.UnexpectedRollbackException;
28  import org.springframework.transaction.support.TransactionCallback;
29  import org.springframework.transaction.support.TransactionTemplate;
30  
31  import java.sql.SQLException;
32  import java.sql.Timestamp;
33  import java.util.ArrayList;
34  import java.util.Collection;
35  import java.util.Iterator;
36  import java.util.List;
37  import java.util.concurrent.Callable;
38  import java.util.concurrent.ExecutorService;
39  import java.util.concurrent.Future;
40  
41  /**
42   * Base class for jobs that must obtain a set of work items atomically
43   * @author Kuali Rice Team (rice.collab@kuali.org)
44   */
45  public abstract class ConcurrentJob<T> {
46      /**
47       * Oracle's "ORA-00054: resource busy and acquire with NOWAIT specified"
48       */
49      private static final int ORACLE_00054 = 54;
50      /**
51       * Oracle's "ORA-00060 deadlock detected while waiting for resource"
52       */
53      private static final int ORACLE_00060 = 60;
54      
55  
56      protected final Logger LOG = Logger.getLogger(getClass());
57  
58      protected ExecutorService executor;
59      protected PlatformTransactionManager txManager;
60      
61      /**
62       * Constructs a ConcurrentJob instance.
63       * @param txManager PlatformTransactionManager to use for transactions
64       * @param executor the ExecutorService to use to process work items
65       */
66      public ConcurrentJob(PlatformTransactionManager txManager, ExecutorService executor) {
67          this.txManager = txManager;
68          this.executor = executor;
69      }
70  
71      /**
72       * Helper method for creating a TransactionTemplate initialized to create
73       * a new transaction
74       * @return a TransactionTemplate initialized to create a new transaction
75       */
76      protected TransactionTemplate createNewTransaction() {
77          TransactionTemplate tt = new TransactionTemplate(txManager);
78          tt.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
79          return tt;
80      }
81  
82      /**
83       * Template method that subclasses should override to obtain a set of available work items
84       * and mark them as taken
85       * @return a collection of available work items that have been marked as taken
86       */
87      protected abstract Collection<T> takeAvailableWorkItems();
88  
89      /**
90       * Template method that subclasses should override to group work items into units of work
91       * @param workItems list of work items to break into groups
92       * @param result ProcessingResult to modify if there are any failures...this is sort of a hack because previously
93       * failure to obtain a deliverer was considered a work item failure, and now this method has been factored out...
94       * but the tests still want to see the failure
95       * @return a collection of collection of work items
96       */
97      protected Collection<Collection<T>> groupWorkItems(Collection<T> workItems, ProcessingResult result) {
98          Collection<Collection<T>> groupedWorkItems = new ArrayList<Collection<T>>();
99          
100         if (workItems != null) {
101 	        for (T workItem: workItems) {
102 	            Collection<T> c = new ArrayList<T>(1);
103 	            c.add(workItem);
104 	            groupedWorkItems.add(c);
105 	        }
106         }
107         return groupedWorkItems;
108     }
109 
110     /**
111      * Template method that subclasses should override to process a given work item and mark it
112      * as untaken afterwards
113      * @param items the work item
114      * @return a collection of success messages
115      */
116     protected abstract Collection<?> processWorkItems(Collection<T> items);
117 
118     /**
119      * Template method that subclasses should override to unlock a given work item when procesing has failed.
120      * @param item the work item to unlock
121      */
122     protected abstract void unlockWorkItem(T item);
123 
124     /**
125      * Main processing method which invokes subclass implementations of template methods
126      * to obtain available work items, and process them concurrently
127      * @return a ProcessingResult object containing the results of processing
128      */
129     public ProcessingResult run() {
130         LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] STARTING RUN");
131 
132         final ProcessingResult result = new ProcessingResult();
133 
134         // retrieve list of available work items in a transaction
135         Collection<T> items = null;
136         try {
137             items = (Collection<T>)
138                 createNewTransaction().execute(new TransactionCallback() {
139                     public Object doInTransaction(TransactionStatus txStatus) {
140                         return takeAvailableWorkItems();
141                     }
142                 });
143         } catch (DataAccessException dae) {
144             // Spring does not detect OJB's org.apache.ojb.broker.OptimisticLockException and turn it into a
145             // org.springframework.dao.OptimisticLockingFailureException?
146             if (ExceptionUtils.indexOfType(dae, OptimisticLockException.class) != -1) {
147                 // anticipated in the case that another thread is trying to grab items
148                 LOG.info("Contention while taking work items");
149             } else {
150                 // in addition to logging a message, should we throw an exception or log a failure here?
151                 LOG.error("Error taking work items", dae);
152                 Throwable t = dae.getMostSpecificCause();
153                 if (t != null && t instanceof SQLException) {
154                     SQLException sqle = (SQLException) t;
155                     if (sqle.getErrorCode() == ORACLE_00054 && StringUtils.contains(sqle.getMessage(), "resource busy")) {
156                         // this is expected and non-fatal given that these jobs will run again
157                         LOG.warn("Select for update lock contention encountered");
158                     } else if (sqle.getErrorCode() == ORACLE_00060 && StringUtils.contains(sqle.getMessage(), "deadlock detected")) {
159                         // this is bad...two parties are waiting forever somewhere...
160                         // database is probably wedged now :(
161                         LOG.error("Select for update deadlock encountered!");
162                     }
163                 }
164             }
165             return result;
166         } catch (UnexpectedRollbackException ure) {
167             // occurs against Mckoi... :(
168             LOG.error("UnexpectedRollbackException - possibly due to Mckoi");
169             return result;
170         } catch (TransactionException te) {
171             LOG.error("Error occurred obtaining available work items", te);
172             result.addFailure("Error occurred obtaining available work items: " + te);
173             return result;
174         }
175 
176         Collection<Collection<T>> groupedWorkItems = groupWorkItems(items, result);
177 
178         // now iterate over all work groups and process each
179         Iterator<Collection<T>> i = groupedWorkItems.iterator();
180         List<Future> futures = new ArrayList<Future>();
181         while(i.hasNext()) {
182             final Collection<T> workUnit= i.next();
183 
184             LOG.info("Processing work unit: " + workUnit);
185             /* performed within transaction */
186             /* executor manages threads to run work items... */
187             futures.add(executor.submit(new Callable() {
188                 public Object call() throws Exception {
189                     ProcessingResult result = new ProcessingResult();
190                     try {
191                         Collection<?> successes = (Collection<Object>)
192                             createNewTransaction().execute(new TransactionCallback() {
193                                 public Object doInTransaction(TransactionStatus txStatus) {
194                                     return processWorkItems(workUnit);
195                                 }
196                             });
197                         result.addAllSuccesses(successes);
198                     } catch (Exception e) {
199                         LOG.error("Error occurred processing work unit " + workUnit, e);
200                         for (final T workItem: workUnit) {
201                             LOG.error("Error occurred processing work item " + workItem, e);
202                             result.addFailure("Error occurred processing work item " + workItem + ": " + e);
203                             unlockWorkItemAtomically(workItem);
204                         }
205                     }
206                     return result;
207                 }
208             }));
209         }
210 
211         // wait for workers to finish
212         for (Future f: futures) {
213             try {
214                 ProcessingResult workResult = (ProcessingResult) f.get();
215                 result.add(workResult);
216             } catch (Exception e) {
217                 String message = "Error obtaining work result: " + e;
218                 LOG.error(message, e);
219                 result.addFailure(message);
220             }
221         }
222 
223         LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] FINISHED RUN - " + result);
224 
225         return result;
226     }
227     
228     protected void unlockWorkItemAtomically(final T workItem) {
229         try {
230             createNewTransaction().execute(new TransactionCallback() {
231                 public Object doInTransaction(TransactionStatus txStatus) {
232                     LOG.info("Unlocking failed work item: " + workItem);
233                     unlockWorkItem(workItem);
234                     return null;
235                 }
236             });
237         } catch (Exception e2) {
238             LOG.error("Error unlocking failed work item " + workItem, e2);
239         }
240     }
241 }