View Javadoc
1   /**
2    * Copyright 2005-2016 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.rice.ken.service.impl;
17  
18  import java.sql.SQLException;
19  import java.sql.Timestamp;
20  import java.util.ArrayList;
21  import java.util.Collection;
22  import java.util.Iterator;
23  import java.util.List;
24  import java.util.concurrent.Callable;
25  import java.util.concurrent.ExecutorService;
26  import java.util.concurrent.Future;
27  
28  import javax.persistence.OptimisticLockException;
29  
30  import org.apache.commons.lang.StringUtils;
31  import org.apache.log4j.Logger;
32  import org.kuali.rice.ken.service.ProcessingResult;
33  import org.springframework.dao.DataAccessException;
34  import org.springframework.dao.OptimisticLockingFailureException;
35  import org.springframework.transaction.PlatformTransactionManager;
36  import org.springframework.transaction.TransactionException;
37  import org.springframework.transaction.TransactionStatus;
38  import org.springframework.transaction.UnexpectedRollbackException;
39  import org.springframework.transaction.support.TransactionCallback;
40  import org.springframework.transaction.support.TransactionTemplate;
41  
42  /**
43   * Base class for jobs that must obtain a set of work items atomically
44   * @author Kuali Rice Team (rice.collab@kuali.org)
45   */
46  public abstract class ConcurrentJob<T> {
47      /**
48       * Oracle's "ORA-00054: resource busy and acquire with NOWAIT specified"
49       */
50      private static final int ORACLE_00054 = 54;
51      /**
52       * Oracle's "ORA-00060 deadlock detected while waiting for resource"
53       */
54      private static final int ORACLE_00060 = 60;
55      
56  
57      protected final Logger LOG = Logger.getLogger(getClass());
58  
59      protected ExecutorService executor;
60      protected PlatformTransactionManager txManager;
61      
62      /**
63       * Constructs a ConcurrentJob instance.
64       * @param txManager PlatformTransactionManager to use for transactions
65       * @param executor the ExecutorService to use to process work items
66       */
67      public ConcurrentJob(PlatformTransactionManager txManager, ExecutorService executor) {
68          this.txManager = txManager;
69          this.executor = executor;
70      }
71  
72      /**
73       * Helper method for creating a TransactionTemplate initialized to create
74       * a new transaction
75       * @return a TransactionTemplate initialized to create a new transaction
76       */
77      protected TransactionTemplate createNewTransaction() {
78          TransactionTemplate tt = new TransactionTemplate(txManager);
79          tt.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
80          return tt;
81      }
82  
83      /**
84       * Template method that subclasses should override to obtain a set of available work items
85       * and mark them as taken
86       * @return a collection of available work items that have been marked as taken
87       */
88      protected abstract Collection<T> takeAvailableWorkItems();
89  
90      /**
91       * Template method that subclasses should override to group work items into units of work
92       * @param workItems list of work items to break into groups
93       * @param result ProcessingResult to modify if there are any failures...this is sort of a hack because previously
94       * failure to obtain a deliverer was considered a work item failure, and now this method has been factored out...
95       * but the tests still want to see the failure
96       * @return a collection of collection of work items
97       */
98      protected Collection<Collection<T>> groupWorkItems(Collection<T> workItems, ProcessingResult result) {
99          Collection<Collection<T>> groupedWorkItems = new ArrayList<Collection<T>>();
100         
101         if (workItems != null) {
102 	        for (T workItem: workItems) {
103 	            Collection<T> c = new ArrayList<T>(1);
104 	            c.add(workItem);
105 	            groupedWorkItems.add(c);
106 	        }
107         }
108         return groupedWorkItems;
109     }
110 
111     /**
112      * Template method that subclasses should override to process a given work item and mark it
113      * as untaken afterwards
114      * @param items the work item
115      * @return a collection of success messages
116      */
117     protected abstract Collection<?> processWorkItems(Collection<T> items);
118 
119     /**
120      * Template method that subclasses should override to unlock a given work item when procesing has failed.
121      * @param item the work item to unlock
122      */
123     protected abstract void unlockWorkItem(T item);
124 
125     /**
126      * Main processing method which invokes subclass implementations of template methods
127      * to obtain available work items, and process them concurrently
128      * @return a ProcessingResult object containing the results of processing
129      */
130     @SuppressWarnings("unchecked")
131     public ProcessingResult run() {
132         if ( LOG.isDebugEnabled() ) {
133             LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] STARTING RUN");
134         }
135 
136         final ProcessingResult result = new ProcessingResult();
137 
138         // retrieve list of available work items in a transaction
139         Collection<T> items = null;
140         try {
141             items = (Collection<T>)
142                 createNewTransaction().execute(new TransactionCallback() {
143                     public Object doInTransaction(TransactionStatus txStatus) {
144                         return takeAvailableWorkItems();
145                     }
146                 });
147         } catch (DataAccessException dae) {
148             if ( dae instanceof OptimisticLockingFailureException || dae.contains(OptimisticLockingFailureException.class) || dae.contains(OptimisticLockException.class) ) {
149                 // anticipated in the case that another thread is trying to grab items
150                 LOG.info("Contention while taking work items: " + dae.getMessage() );
151             } else {
152                 // in addition to logging a message, should we throw an exception or log a failure here?
153                 LOG.error("Error taking work items", dae);
154                 Throwable t = dae.getMostSpecificCause();
155                 if (t != null && t instanceof SQLException) {
156                     SQLException sqle = (SQLException) t;
157                     if (sqle.getErrorCode() == ORACLE_00054 && StringUtils.contains(sqle.getMessage(), "resource busy")) {
158                         // this is expected and non-fatal given that these jobs will run again
159                         LOG.warn("Select for update lock contention encountered: " + sqle.getMessage() );
160                     } else if (sqle.getErrorCode() == ORACLE_00060 && StringUtils.contains(sqle.getMessage(), "deadlock detected")) {
161                         // this is bad...two parties are waiting forever somewhere...
162                         // database is probably wedged now :(
163                         LOG.error("Select for update deadlock encountered! " + sqle.getMessage() );
164                     }
165                 }
166             }
167             return result;
168         } catch (UnexpectedRollbackException ure) {
169             LOG.error("UnexpectedRollbackException", ure);
170             return result;
171         } catch (TransactionException te) {
172             LOG.error("Error occurred obtaining available work items", te);
173             result.addFailure("Error occurred obtaining available work items: " + te);
174             return result;
175         }
176 
177         Collection<Collection<T>> groupedWorkItems = groupWorkItems(items, result);
178 
179         // now iterate over all work groups and process each
180         Iterator<Collection<T>> i = groupedWorkItems.iterator();
181         List<Future> futures = new ArrayList<Future>();
182         while(i.hasNext()) {
183             final Collection<T> workUnit= i.next();
184 
185             LOG.info("Processing work unit: " + workUnit);
186             /* performed within transaction */
187             /* executor manages threads to run work items... */
188             futures.add(executor.submit(new Callable() {
189                 public Object call() throws Exception {
190                     ProcessingResult result = new ProcessingResult();
191                     try {
192                         Collection<?> successes = (Collection<Object>)
193                             createNewTransaction().execute(new TransactionCallback() {
194                                 public Object doInTransaction(TransactionStatus txStatus) {
195                                     return processWorkItems(workUnit);
196                                 }
197                             });
198                         result.addAllSuccesses(successes);
199                     } catch (Exception e) {
200                         LOG.error("Error occurred processing work unit " + workUnit, e);
201                         for (final T workItem: workUnit) {
202                             LOG.error("Error occurred processing work item " + workItem, e);
203                             result.addFailure("Error occurred processing work item " + workItem + ": " + e);
204                             unlockWorkItemAtomically(workItem);
205                         }
206                     }
207                     return result;
208                 }
209             }));
210         }
211 
212         // wait for workers to finish
213         for (Future f: futures) {
214             try {
215                 ProcessingResult workResult = (ProcessingResult) f.get();
216                 result.add(workResult);
217             } catch (Exception e) {
218                 String message = "Error obtaining work result: " + e;
219                 LOG.error(message, e);
220                 result.addFailure(message);
221             }
222         }
223 
224         if ( LOG.isDebugEnabled() ) {
225             LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] FINISHED RUN - " + result);
226         }
227 
228         return result;
229     }
230     
231     protected void unlockWorkItemAtomically(final T workItem) {
232         try {
233             createNewTransaction().execute(new TransactionCallback() {
234                 public Object doInTransaction(TransactionStatus txStatus) {
235                     LOG.info("Unlocking failed work item: " + workItem);
236                     unlockWorkItem(workItem);
237                     return null;
238                 }
239             });
240         } catch (Exception e2) {
241             LOG.error("Error unlocking failed work item " + workItem, e2);
242         }
243     }
244 }