Coverage Report - org.kuali.rice.kcb.quartz.ConcurrentJob
 
Classes in this File Line Coverage Branch Coverage Complexity
ConcurrentJob
0%
0/61
0%
0/8
1.875
ConcurrentJob$1
0%
0/2
N/A
1.875
ConcurrentJob$2
0%
0/12
0%
0/2
1.875
ConcurrentJob$2$1
0%
0/2
N/A
1.875
ConcurrentJob$3
0%
0/4
N/A
1.875
ConcurrentJob$KCBThreadFactory
0%
0/5
N/A
1.875
 
 1  
 /*
 2  
  * Copyright 2007-2008 The Kuali Foundation
 3  
  * 
 4  
  * Licensed under the Educational Community License, Version 2.0 (the "License");
 5  
  * you may not use this file except in compliance with the License.
 6  
  * You may obtain a copy of the License at
 7  
  * 
 8  
  * http://www.opensource.org/licenses/ecl2.php
 9  
  * 
 10  
  * Unless required by applicable law or agreed to in writing, software
 11  
  * distributed under the License is distributed on an "AS IS" BASIS,
 12  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  
  * See the License for the specific language governing permissions and
 14  
  * limitations under the License.
 15  
  */
 16  
 package org.kuali.rice.kcb.quartz;
 17  
 
 18  
 import java.sql.Timestamp;
 19  
 import java.util.ArrayList;
 20  
 import java.util.Collection;
 21  
 import java.util.Iterator;
 22  
 import java.util.List;
 23  
 import java.util.concurrent.Callable;
 24  
 import java.util.concurrent.ExecutorService;
 25  
 import java.util.concurrent.Executors;
 26  
 import java.util.concurrent.Future;
 27  
 import java.util.concurrent.ThreadFactory;
 28  
 
 29  
 import org.apache.log4j.Logger;
 30  
 import org.apache.ojb.broker.OptimisticLockException;
 31  
 import org.kuali.rice.core.util.RiceUtilities;
 32  
 import org.kuali.rice.kcb.quartz.ProcessingResult.Failure;
 33  
 import org.springframework.beans.factory.annotation.Required;
 34  
 import org.springframework.dao.DataAccessException;
 35  
 import org.springframework.transaction.PlatformTransactionManager;
 36  
 import org.springframework.transaction.TransactionException;
 37  
 import org.springframework.transaction.TransactionStatus;
 38  
 import org.springframework.transaction.UnexpectedRollbackException;
 39  
 import org.springframework.transaction.support.TransactionCallback;
 40  
 import org.springframework.transaction.support.TransactionTemplate;
 41  
 
 42  
 /**
 43  
  * Base class for jobs that must obtain a set of work items atomically
 44  
  * @author Kuali Rice Team (rice.collab@kuali.org)
 45  
  */
 46  0
 public abstract class ConcurrentJob<T> {
 47  0
     protected final Logger LOG = Logger.getLogger(getClass());
 48  
 
 49  0
     protected ExecutorService executor = Executors.newSingleThreadExecutor(new KCBThreadFactory());
 50  
     protected PlatformTransactionManager txManager;
 51  
 
 52  
     /**
 53  
      * Sets the {@link ExecutorService} to use to process work items.  Default is single-threaded.
 54  
      * @param executor the {@link ExecutorService} to use to process work items.  Default is single-threaded.
 55  
      */
 56  
     public void setExecutorService(ExecutorService executor) {
 57  0
         this.executor = executor;
 58  0
     }
 59  
 
 60  
     /**
 61  
      * Sets the {@link PlatformTransactionManager}
 62  
      * @param txManager the {@link PlatformTransactionManager} 
 63  
      */
 64  
     @Required
 65  
     public void setTransactionManager(PlatformTransactionManager txManager) {
 66  0
         this.txManager = txManager;
 67  0
     }
 68  
 
 69  
     /**
 70  
      * Helper method for creating a TransactionTemplate initialized to create
 71  
      * a new transaction
 72  
      * @return a TransactionTemplate initialized to create a new transaction
 73  
      */
 74  
     protected TransactionTemplate createNewTransaction() {
 75  0
         TransactionTemplate tt = new TransactionTemplate(txManager);
 76  0
         tt.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
 77  0
         return tt;
 78  
     }
 79  
 
 80  
     /**
 81  
      * Template method that subclasses should override to obtain a set of available work items
 82  
      * and mark them as taken
 83  
      * @return a collection of available work items that have been marked as taken
 84  
      */
 85  
     protected abstract Collection<T> takeAvailableWorkItems();
 86  
 
 87  
     /**
 88  
      * Template method that subclasses should override to group work items into units of work
 89  
      * @param workItems list of work items to break into groups
 90  
      * @param result ProcessingResult to modify if there are any failures...this is sort of a hack because previously
 91  
      * failure to obtain a deliverer was considered a work item failure, and now this method has been factored out...
 92  
      * but the tests still want to see the failure
 93  
      * @return a collection of collections of work items
 94  
      */
 95  
     protected Collection<Collection<T>> groupWorkItems(Collection<T> workItems, ProcessingResult<T> result) {
 96  0
         Collection<Collection<T>> groupedWorkItems = new ArrayList<Collection<T>>(workItems.size());
 97  0
         for (T workItem: workItems) {
 98  0
             Collection<T> c = new ArrayList<T>(1);
 99  0
             c.add(workItem);
 100  0
             groupedWorkItems.add(c);
 101  0
         }
 102  0
         return groupedWorkItems;
 103  
     }
 104  
 
 105  
     /**
 106  
      * Template method that subclasses should override to process a given collection of work items and mark them
 107  
      * as untaken afterwards
 108  
      * @param items the work items to process
 109  
      * @return a collection of the work items that were processed successfully
 110  
      */
 111  
     protected abstract Collection<T> processWorkItems(Collection<T> items);
 112  
 
 113  
     /**
 114  
      * Template method that subclasses should override to unlock a given work item when processing has failed.
 115  
      * @param item the work item to unlock
 116  
      */
 117  
     protected abstract void unlockWorkItem(T item);
 118  
 
 119  
     /**
 120  
      * Main processing method which invokes subclass implementations of template methods
 121  
      * to obtain available work items, and process them concurrently
 122  
      * @return a ProcessingResult object containing the results of processing
 123  
      */
 124  
     public ProcessingResult<T> run() {
 125  0
         LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] STARTING RUN");
 126  
 
 127  0
         final ProcessingResult<T> result = new ProcessingResult<T>();
 128  
 
 129  
         // retrieve list of available work items in a transaction
 130  
         final Collection<T> items;
 131  
         try {
 132  0
             items = executeInTransaction(new TransactionCallback() {
 133  
                 public Object doInTransaction(TransactionStatus txStatus) {
 134  0
                     return takeAvailableWorkItems();
 135  
                 }
 136  
             });
 137  0
         } catch (DataAccessException dae) {
 138  
             // Spring does not detect OJB's org.apache.ojb.broker.OptimisticLockException and turn it into a
 139  
             // org.springframework.dao.OptimisticLockingFailureException?
 140  0
             OptimisticLockException optimisticLockException = RiceUtilities.findExceptionInStack(dae, OptimisticLockException.class);
 141  0
             if (optimisticLockException != null) {
 142  
                 // anticipated in the case that another thread is trying to grab items
 143  0
                 LOG.info("Contention while taking work items");
 144  
             } else {
 145  
                 // in addition to logging a message, should we throw an exception or log a failure here?
 146  0
                 LOG.error("Error taking work items", dae);
 147  
             }
 148  0
             return result;
 149  0
         } catch (UnexpectedRollbackException ure) {
 150  
             // occurs against Mckoi... :(
 151  0
             LOG.error("UnexpectedRollbackException - possibly due to Mckoi");
 152  0
             return result;
 153  0
         } catch (TransactionException te) {
 154  0
             LOG.error("Error occurred obtaining available work items", te);
 155  0
             result.addFailure(new Failure<T>(te));
 156  0
             return result;
 157  0
         }
 158  
 
 159  0
         Collection<Collection<T>> groupedWorkItems = groupWorkItems(items, result);
 160  
 
 161  
         // now iterate over all work groups and process each
 162  0
         Iterator<Collection<T>> i = groupedWorkItems.iterator();
 163  0
         List<Future> futures = new ArrayList<Future>();
 164  0
         while(i.hasNext()) {
 165  0
             final Collection<T> workUnit= i.next();
 166  
 
 167  0
             LOG.info("Processing work unit: " + workUnit);
 168  
             /* performed within transaction */
 169  
             /* executor manages threads to run work items... */
 170  0
             futures.add(executor.submit(new Callable() {
 171  
                 public Object call() throws Exception {
 172  0
                     ProcessingResult<T> result = new ProcessingResult<T>();
 173  
                     try {
 174  0
                         Collection<T> successes = executeInTransaction(new TransactionCallback() {
 175  
                             public Object doInTransaction(TransactionStatus txStatus) {
 176  0
                                 return processWorkItems(workUnit);
 177  
                             }
 178  
                         });
 179  0
                         result.addAllSuccesses(successes);
 180  0
                     } catch (Exception e) {
 181  0
                         LOG.error("Error occurred processing work unit " + workUnit, e);
 182  0
                         for (final T workItem: workUnit) {
 183  0
                             LOG.error("Error occurred processing work item " + workItem, e);
 184  0
                             result.addFailure(new Failure<T>(workItem, e));
 185  0
                             unlockWorkItemAtomically(workItem);
 186  
                         }
 187  0
                     }
 188  0
                     return result;
 189  
                 }
 190  
             }));
 191  0
         }
 192  
 
 193  
         // wait for workers to finish
 194  0
         for (Future f: futures) {
 195  
             try {
 196  0
                 ProcessingResult<T> workResult = (ProcessingResult<T>) f.get();
 197  0
                 result.add(workResult);
 198  0
             } catch (Exception e) {
 199  0
                 String message = "Error obtaining work result: " + e;
 200  0
                 LOG.error(message, e);
 201  0
                 result.addFailure(new Failure<T>(e, message));
 202  0
             }
 203  
         }
 204  
 
 205  0
         finishProcessing(result);
 206  
 
 207  0
         LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] FINISHED RUN - " + result);
 208  
 
 209  0
         return result;
 210  
     }
 211  
 
 212  
     /**
 213  
      * Template method called after processing of work items has completed
 214  
      */
 215  0
     protected void finishProcessing(ProcessingResult<T> result) {}
 216  
 
 217  
     protected void unlockWorkItemAtomically(final T workItem) {
 218  
         try {
 219  0
             executeInTransaction(new TransactionCallback() {
 220  
                 public Object doInTransaction(TransactionStatus txStatus) {
 221  0
                     LOG.info("Unlocking failed work item: " + workItem);
 222  0
                     unlockWorkItem(workItem);
 223  0
                     return null;
 224  
                 }
 225  
             });
 226  0
         } catch (Exception e2) {
 227  0
             LOG.error("Error unlocking failed work item " + workItem, e2);
 228  0
         }
 229  0
     }
 230  
     
 231  
     protected <X> X executeInTransaction(TransactionCallback callback) {
 232  
         // haha just kidding
 233  
         //return (X) callback.doInTransaction(null);
 234  0
         return (X) createNewTransaction().execute(callback);
 235  
     }
 236  
     
 237  0
     private static class KCBThreadFactory implements ThreadFactory {
 238  
                 
 239  0
                 private ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
 240  
                 
 241  
                 public Thread newThread(Runnable runnable) {
 242  0
                         Thread thread = defaultThreadFactory.newThread(runnable);
 243  0
                         thread.setName("KCB-job-" + thread.getName());
 244  0
                         return thread;
 245  
             }
 246  
         }
 247  
 }