001 /** 002 * Copyright 2005-2012 The Kuali Foundation 003 * 004 * Licensed under the Educational Community License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.opensource.org/licenses/ecl2.php 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 package org.kuali.rice.kcb.quartz; 017 018 import org.apache.log4j.Logger; 019 import org.apache.ojb.broker.OptimisticLockException; 020 import org.kuali.rice.core.api.util.RiceUtilities; 021 import org.kuali.rice.kcb.quartz.ProcessingResult.Failure; 022 import org.springframework.beans.factory.annotation.Required; 023 import org.springframework.dao.DataAccessException; 024 import org.springframework.transaction.PlatformTransactionManager; 025 import org.springframework.transaction.TransactionException; 026 import org.springframework.transaction.TransactionStatus; 027 import org.springframework.transaction.UnexpectedRollbackException; 028 import org.springframework.transaction.support.TransactionCallback; 029 import org.springframework.transaction.support.TransactionTemplate; 030 031 import java.sql.Timestamp; 032 import java.util.ArrayList; 033 import java.util.Collection; 034 import java.util.Iterator; 035 import java.util.List; 036 import java.util.concurrent.Callable; 037 import java.util.concurrent.ExecutorService; 038 import java.util.concurrent.Executors; 039 import java.util.concurrent.Future; 040 import java.util.concurrent.ThreadFactory; 041 042 /** 043 * Base class for jobs that must obtain a set of work items atomically 044 * @author Kuali Rice Team (rice.collab@kuali.org) 045 */ 046 public abstract class ConcurrentJob<T> { 047 protected final Logger LOG = Logger.getLogger(getClass()); 048 049 protected ExecutorService executor = Executors.newSingleThreadExecutor(new KCBThreadFactory()); 050 protected PlatformTransactionManager txManager; 051 052 /** 053 * Sets the {@link ExecutorService} to use to process work items. Default is single-threaded. 054 * @param executor the {@link ExecutorService} to use to process work items. Default is single-threaded. 055 */ 056 public void setExecutorService(ExecutorService executor) { 057 this.executor = executor; 058 } 059 060 /** 061 * Sets the {@link PlatformTransactionManager} 062 * @param txManager the {@link PlatformTransactionManager} 063 */ 064 @Required 065 public void setTransactionManager(PlatformTransactionManager txManager) { 066 this.txManager = txManager; 067 } 068 069 /** 070 * Helper method for creating a TransactionTemplate initialized to create 071 * a new transaction 072 * @return a TransactionTemplate initialized to create a new transaction 073 */ 074 protected TransactionTemplate createNewTransaction() { 075 TransactionTemplate tt = new TransactionTemplate(txManager); 076 tt.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW); 077 return tt; 078 } 079 080 /** 081 * Template method that subclasses should override to obtain a set of available work items 082 * and mark them as taken 083 * @return a collection of available work items that have been marked as taken 084 */ 085 protected abstract Collection<T> takeAvailableWorkItems(); 086 087 /** 088 * Template method that subclasses should override to group work items into units of work 089 * @param workItems list of work items to break into groups 090 * @param result ProcessingResult to modify if there are any failures...this is sort of a hack because previously 091 * failure to obtain a deliverer was considered a work item failure, and now this method has been factored out... 092 * but the tests still want to see the failure 093 * @return a collection of collection of work items 094 */ 095 protected Collection<Collection<T>> groupWorkItems(Collection<T> workItems, ProcessingResult<T> result) { 096 Collection<Collection<T>> groupedWorkItems = new ArrayList<Collection<T>>(workItems.size()); 097 for (T workItem: workItems) { 098 Collection<T> c = new ArrayList<T>(1); 099 c.add(workItem); 100 groupedWorkItems.add(c); 101 } 102 return groupedWorkItems; 103 } 104 105 /** 106 * Template method that subclasses should override to process a given work item and mark it 107 * as untaken afterwards 108 * @param item the work item 109 * @return a collection of success messages 110 */ 111 protected abstract Collection<T> processWorkItems(Collection<T> items); 112 113 /** 114 * Template method that subclasses should override to unlock a given work item when procesing has failed. 115 * @param item the work item to unlock 116 */ 117 protected abstract void unlockWorkItem(T item); 118 119 /** 120 * Main processing method which invokes subclass implementations of template methods 121 * to obtain available work items, and process them concurrently 122 * @return a ProcessingResult object containing the results of processing 123 */ 124 public ProcessingResult<T> run() { 125 LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] STARTING RUN"); 126 127 final ProcessingResult<T> result = new ProcessingResult<T>(); 128 129 // retrieve list of available work items in a transaction 130 final Collection<T> items; 131 try { 132 items = executeInTransaction(new TransactionCallback() { 133 public Object doInTransaction(TransactionStatus txStatus) { 134 return takeAvailableWorkItems(); 135 } 136 }); 137 } catch (DataAccessException dae) { 138 // Spring does not detect OJB's org.apache.ojb.broker.OptimisticLockException and turn it into a 139 // org.springframework.dao.OptimisticLockingFailureException? 140 OptimisticLockException optimisticLockException = RiceUtilities.findExceptionInStack(dae, OptimisticLockException.class); 141 if (optimisticLockException != null) { 142 // anticipated in the case that another thread is trying to grab items 143 LOG.info("Contention while taking work items"); 144 } else { 145 // in addition to logging a message, should we throw an exception or log a failure here? 146 LOG.error("Error taking work items", dae); 147 } 148 return result; 149 } catch (UnexpectedRollbackException ure) { 150 // occurs against Mckoi... :( 151 LOG.error("UnexpectedRollbackException - possibly due to Mckoi"); 152 return result; 153 } catch (TransactionException te) { 154 LOG.error("Error occurred obtaining available work items", te); 155 result.addFailure(new Failure<T>(te)); 156 return result; 157 } 158 159 Collection<Collection<T>> groupedWorkItems = groupWorkItems(items, result); 160 161 // now iterate over all work groups and process each 162 Iterator<Collection<T>> i = groupedWorkItems.iterator(); 163 List<Future> futures = new ArrayList<Future>(); 164 while(i.hasNext()) { 165 final Collection<T> workUnit= i.next(); 166 167 LOG.info("Processing work unit: " + workUnit); 168 /* performed within transaction */ 169 /* executor manages threads to run work items... */ 170 futures.add(executor.submit(new Callable() { 171 public Object call() throws Exception { 172 ProcessingResult<T> result = new ProcessingResult<T>(); 173 try { 174 Collection<T> successes = executeInTransaction(new TransactionCallback() { 175 public Object doInTransaction(TransactionStatus txStatus) { 176 return processWorkItems(workUnit); 177 } 178 }); 179 result.addAllSuccesses(successes); 180 } catch (Exception e) { 181 LOG.error("Error occurred processing work unit " + workUnit, e); 182 for (final T workItem: workUnit) { 183 LOG.error("Error occurred processing work item " + workItem, e); 184 result.addFailure(new Failure<T>(workItem, e)); 185 unlockWorkItemAtomically(workItem); 186 } 187 } 188 return result; 189 } 190 })); 191 } 192 193 // wait for workers to finish 194 for (Future f: futures) { 195 try { 196 ProcessingResult<T> workResult = (ProcessingResult<T>) f.get(); 197 result.add(workResult); 198 } catch (Exception e) { 199 String message = "Error obtaining work result: " + e; 200 LOG.error(message, e); 201 result.addFailure(new Failure<T>(e, message)); 202 } 203 } 204 205 finishProcessing(result); 206 207 LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] FINISHED RUN - " + result); 208 209 return result; 210 } 211 212 /** 213 * Template method called after processing of work items has completed 214 */ 215 protected void finishProcessing(ProcessingResult<T> result) {} 216 217 protected void unlockWorkItemAtomically(final T workItem) { 218 try { 219 executeInTransaction(new TransactionCallback() { 220 public Object doInTransaction(TransactionStatus txStatus) { 221 LOG.info("Unlocking failed work item: " + workItem); 222 unlockWorkItem(workItem); 223 return null; 224 } 225 }); 226 } catch (Exception e2) { 227 LOG.error("Error unlocking failed work item " + workItem, e2); 228 } 229 } 230 231 protected <X> X executeInTransaction(TransactionCallback callback) { 232 // haha just kidding 233 //return (X) callback.doInTransaction(null); 234 return (X) createNewTransaction().execute(callback); 235 } 236 237 private static class KCBThreadFactory implements ThreadFactory { 238 239 private ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory(); 240 241 public Thread newThread(Runnable runnable) { 242 Thread thread = defaultThreadFactory.newThread(runnable); 243 thread.setName("KCB-job-" + thread.getName()); 244 return thread; 245 } 246 } 247 }