001/** 002 * Copyright 2005-2014 The Kuali Foundation 003 * 004 * Licensed under the Educational Community License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.opensource.org/licenses/ecl2.php 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.kuali.rice.kcb.quartz; 017 018import org.apache.log4j.Logger; 019import org.apache.ojb.broker.OptimisticLockException; 020import org.kuali.rice.core.api.util.RiceUtilities; 021import org.kuali.rice.kcb.quartz.ProcessingResult.Failure; 022import org.springframework.beans.factory.annotation.Required; 023import org.springframework.dao.DataAccessException; 024import org.springframework.transaction.PlatformTransactionManager; 025import org.springframework.transaction.TransactionException; 026import org.springframework.transaction.TransactionStatus; 027import org.springframework.transaction.UnexpectedRollbackException; 028import org.springframework.transaction.support.TransactionCallback; 029import org.springframework.transaction.support.TransactionTemplate; 030 031import java.sql.Timestamp; 032import java.util.ArrayList; 033import java.util.Collection; 034import java.util.Iterator; 035import java.util.List; 036import java.util.concurrent.Callable; 037import java.util.concurrent.ExecutorService; 038import java.util.concurrent.Executors; 039import java.util.concurrent.Future; 040import java.util.concurrent.ThreadFactory; 041 042/** 043 * Base class for jobs that must obtain a set of work items atomically 044 * @author Kuali Rice Team (rice.collab@kuali.org) 045 */ 046public abstract class ConcurrentJob<T> { 047 protected final Logger LOG = Logger.getLogger(getClass()); 048 049 protected ExecutorService executor = Executors.newSingleThreadExecutor(new KCBThreadFactory()); 050 protected PlatformTransactionManager txManager; 051 052 /** 053 * Sets the {@link ExecutorService} to use to process work items. Default is single-threaded. 054 * @param executor the {@link ExecutorService} to use to process work items. Default is single-threaded. 055 */ 056 public void setExecutorService(ExecutorService executor) { 057 this.executor = executor; 058 } 059 060 /** 061 * Sets the {@link PlatformTransactionManager} 062 * @param txManager the {@link PlatformTransactionManager} 063 */ 064 @Required 065 public void setTransactionManager(PlatformTransactionManager txManager) { 066 this.txManager = txManager; 067 } 068 069 /** 070 * Helper method for creating a TransactionTemplate initialized to create 071 * a new transaction 072 * @return a TransactionTemplate initialized to create a new transaction 073 */ 074 protected TransactionTemplate createNewTransaction() { 075 TransactionTemplate tt = new TransactionTemplate(txManager); 076 tt.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW); 077 return tt; 078 } 079 080 /** 081 * Template method that subclasses should override to obtain a set of available work items 082 * and mark them as taken 083 * @return a collection of available work items that have been marked as taken 084 */ 085 protected abstract Collection<T> takeAvailableWorkItems(); 086 087 /** 088 * Template method that subclasses should override to group work items into units of work 089 * @param workItems list of work items to break into groups 090 * @param result ProcessingResult to modify if there are any failures...this is sort of a hack because previously 091 * failure to obtain a deliverer was considered a work item failure, and now this method has been factored out... 092 * but the tests still want to see the failure 093 * @return a collection of collection of work items 094 */ 095 protected Collection<Collection<T>> groupWorkItems(Collection<T> workItems, ProcessingResult<T> result) { 096 Collection<Collection<T>> groupedWorkItems = new ArrayList<Collection<T>>(workItems.size()); 097 for (T workItem: workItems) { 098 Collection<T> c = new ArrayList<T>(1); 099 c.add(workItem); 100 groupedWorkItems.add(c); 101 } 102 return groupedWorkItems; 103 } 104 105 /** 106 * Template method that subclasses should override to process a given work item and mark it 107 * as untaken afterwards 108 * @param item the work item 109 * @return a collection of success messages 110 */ 111 protected abstract Collection<T> processWorkItems(Collection<T> items); 112 113 /** 114 * Template method that subclasses should override to unlock a given work item when procesing has failed. 115 * @param item the work item to unlock 116 */ 117 protected abstract void unlockWorkItem(T item); 118 119 /** 120 * Main processing method which invokes subclass implementations of template methods 121 * to obtain available work items, and process them concurrently 122 * @return a ProcessingResult object containing the results of processing 123 */ 124 public ProcessingResult<T> run() { 125 LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] STARTING RUN"); 126 127 final ProcessingResult<T> result = new ProcessingResult<T>(); 128 129 // retrieve list of available work items in a transaction 130 final Collection<T> items; 131 try { 132 items = executeInTransaction(new TransactionCallback() { 133 public Object doInTransaction(TransactionStatus txStatus) { 134 return takeAvailableWorkItems(); 135 } 136 }); 137 } catch (DataAccessException dae) { 138 // Spring does not detect OJB's org.apache.ojb.broker.OptimisticLockException and turn it into a 139 // org.springframework.dao.OptimisticLockingFailureException? 140 OptimisticLockException optimisticLockException = RiceUtilities.findExceptionInStack(dae, OptimisticLockException.class); 141 if (optimisticLockException != null) { 142 // anticipated in the case that another thread is trying to grab items 143 LOG.info("Contention while taking work items"); 144 } else { 145 // in addition to logging a message, should we throw an exception or log a failure here? 146 LOG.error("Error taking work items", dae); 147 } 148 return result; 149 } catch (UnexpectedRollbackException ure) { 150 LOG.error("UnexpectedRollbackException", ure); 151 return result; 152 } catch (TransactionException te) { 153 LOG.error("Error occurred obtaining available work items", te); 154 result.addFailure(new Failure<T>(te)); 155 return result; 156 } 157 158 Collection<Collection<T>> groupedWorkItems = groupWorkItems(items, result); 159 160 // now iterate over all work groups and process each 161 Iterator<Collection<T>> i = groupedWorkItems.iterator(); 162 List<Future> futures = new ArrayList<Future>(); 163 while(i.hasNext()) { 164 final Collection<T> workUnit= i.next(); 165 166 LOG.info("Processing work unit: " + workUnit); 167 /* performed within transaction */ 168 /* executor manages threads to run work items... */ 169 futures.add(executor.submit(new Callable() { 170 public Object call() throws Exception { 171 ProcessingResult<T> result = new ProcessingResult<T>(); 172 try { 173 Collection<T> successes = executeInTransaction(new TransactionCallback() { 174 public Object doInTransaction(TransactionStatus txStatus) { 175 return processWorkItems(workUnit); 176 } 177 }); 178 result.addAllSuccesses(successes); 179 } catch (Exception e) { 180 LOG.error("Error occurred processing work unit " + workUnit, e); 181 for (final T workItem: workUnit) { 182 LOG.error("Error occurred processing work item " + workItem, e); 183 result.addFailure(new Failure<T>(workItem, e)); 184 unlockWorkItemAtomically(workItem); 185 } 186 } 187 return result; 188 } 189 })); 190 } 191 192 // wait for workers to finish 193 for (Future f: futures) { 194 try { 195 ProcessingResult<T> workResult = (ProcessingResult<T>) f.get(); 196 result.add(workResult); 197 } catch (Exception e) { 198 String message = "Error obtaining work result: " + e; 199 LOG.error(message, e); 200 result.addFailure(new Failure<T>(e, message)); 201 } 202 } 203 204 finishProcessing(result); 205 206 LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] FINISHED RUN - " + result); 207 208 return result; 209 } 210 211 /** 212 * Template method called after processing of work items has completed 213 */ 214 protected void finishProcessing(ProcessingResult<T> result) {} 215 216 protected void unlockWorkItemAtomically(final T workItem) { 217 try { 218 executeInTransaction(new TransactionCallback() { 219 public Object doInTransaction(TransactionStatus txStatus) { 220 LOG.info("Unlocking failed work item: " + workItem); 221 unlockWorkItem(workItem); 222 return null; 223 } 224 }); 225 } catch (Exception e2) { 226 LOG.error("Error unlocking failed work item " + workItem, e2); 227 } 228 } 229 230 protected <X> X executeInTransaction(TransactionCallback callback) { 231 // haha just kidding 232 //return (X) callback.doInTransaction(null); 233 return (X) createNewTransaction().execute(callback); 234 } 235 236 private static class KCBThreadFactory implements ThreadFactory { 237 238 private ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory(); 239 240 public Thread newThread(Runnable runnable) { 241 Thread thread = defaultThreadFactory.newThread(runnable); 242 thread.setName("KCB-job-" + thread.getName()); 243 return thread; 244 } 245 } 246}