001 /* 002 * Copyright 2007 The Kuali Foundation 003 * 004 * Licensed under the Educational Community License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.opensource.org/licenses/ecl2.php 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 package org.kuali.rice.ken.service.impl; 017 018 import java.sql.SQLException; 019 import java.sql.Timestamp; 020 import java.util.ArrayList; 021 import java.util.Collection; 022 import java.util.Iterator; 023 import java.util.List; 024 import java.util.concurrent.Callable; 025 import java.util.concurrent.ExecutorService; 026 import java.util.concurrent.Future; 027 028 import org.apache.commons.lang.StringUtils; 029 import org.apache.log4j.Logger; 030 import org.apache.ojb.broker.OptimisticLockException; 031 import org.kuali.rice.ken.service.ProcessingResult; 032 import org.kuali.rice.ken.util.Util; 033 import org.springframework.dao.DataAccessException; 034 import org.springframework.transaction.PlatformTransactionManager; 035 import org.springframework.transaction.TransactionException; 036 import org.springframework.transaction.TransactionStatus; 037 import org.springframework.transaction.UnexpectedRollbackException; 038 import org.springframework.transaction.support.TransactionCallback; 039 import org.springframework.transaction.support.TransactionTemplate; 040 041 /** 042 * Base class for jobs that must obtain a set of work items atomically 043 * @author Kuali Rice Team (rice.collab@kuali.org) 044 */ 045 public abstract class ConcurrentJob<T> { 046 /** 047 * Oracle's "ORA-00054: resource busy and acquire with NOWAIT specified" 048 */ 049 private static final int ORACLE_00054 = 54; 050 /** 051 * Oracle's "ORA-00060 deadlock detected while waiting for resource" 052 */ 053 private static final int ORACLE_00060 = 60; 054 055 056 protected final Logger LOG = Logger.getLogger(getClass()); 057 058 protected ExecutorService executor; 059 protected PlatformTransactionManager txManager; 060 061 /** 062 * Constructs a ConcurrentJob instance. 063 * @param txManager PlatformTransactionManager to use for transactions 064 * @param executor the ExecutorService to use to process work items 065 */ 066 public ConcurrentJob(PlatformTransactionManager txManager, ExecutorService executor) { 067 this.txManager = txManager; 068 this.executor = executor; 069 } 070 071 /** 072 * Helper method for creating a TransactionTemplate initialized to create 073 * a new transaction 074 * @return a TransactionTemplate initialized to create a new transaction 075 */ 076 protected TransactionTemplate createNewTransaction() { 077 TransactionTemplate tt = new TransactionTemplate(txManager); 078 tt.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW); 079 return tt; 080 } 081 082 /** 083 * Template method that subclasses should override to obtain a set of available work items 084 * and mark them as taken 085 * @return a collection of available work items that have been marked as taken 086 */ 087 protected abstract Collection<T> takeAvailableWorkItems(); 088 089 /** 090 * Template method that subclasses should override to group work items into units of work 091 * @param workItems list of work items to break into groups 092 * @param result ProcessingResult to modify if there are any failures...this is sort of a hack because previously 093 * failure to obtain a deliverer was considered a work item failure, and now this method has been factored out... 094 * but the tests still want to see the failure 095 * @return a collection of collection of work items 096 */ 097 protected Collection<Collection<T>> groupWorkItems(Collection<T> workItems, ProcessingResult result) { 098 Collection<Collection<T>> groupedWorkItems = new ArrayList<Collection<T>>(workItems.size()); 099 for (T workItem: workItems) { 100 Collection<T> c = new ArrayList<T>(1); 101 c.add(workItem); 102 groupedWorkItems.add(c); 103 } 104 return groupedWorkItems; 105 } 106 107 /** 108 * Template method that subclasses should override to process a given work item and mark it 109 * as untaken afterwards 110 * @param item the work item 111 * @return a collection of success messages 112 */ 113 protected abstract Collection<?> processWorkItems(Collection<T> items); 114 115 /** 116 * Template method that subclasses should override to unlock a given work item when procesing has failed. 117 * @param item the work item to unlock 118 */ 119 protected abstract void unlockWorkItem(T item); 120 121 /** 122 * Main processing method which invokes subclass implementations of template methods 123 * to obtain available work items, and process them concurrently 124 * @return a ProcessingResult object containing the results of processing 125 */ 126 public ProcessingResult run() { 127 LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] STARTING RUN"); 128 129 final ProcessingResult result = new ProcessingResult(); 130 131 // retrieve list of available work items in a transaction 132 Collection<T> items = null; 133 try { 134 items = (Collection<T>) 135 createNewTransaction().execute(new TransactionCallback() { 136 public Object doInTransaction(TransactionStatus txStatus) { 137 return takeAvailableWorkItems(); 138 } 139 }); 140 } catch (DataAccessException dae) { 141 // Spring does not detect OJB's org.apache.ojb.broker.OptimisticLockException and turn it into a 142 // org.springframework.dao.OptimisticLockingFailureException? 143 OptimisticLockException optimisticLockException = Util.findExceptionInStack(dae, OptimisticLockException.class); 144 if (optimisticLockException != null) { 145 // anticipated in the case that another thread is trying to grab items 146 LOG.info("Contention while taking work items"); 147 } else { 148 // in addition to logging a message, should we throw an exception or log a failure here? 149 LOG.error("Error taking work items", dae); 150 Throwable t = dae.getMostSpecificCause(); 151 if (t != null && t instanceof SQLException) { 152 SQLException sqle = (SQLException) t; 153 if (sqle.getErrorCode() == ORACLE_00054 && StringUtils.contains(sqle.getMessage(), "resource busy")) { 154 // this is expected and non-fatal given that these jobs will run again 155 LOG.warn("Select for update lock contention encountered"); 156 } else if (sqle.getErrorCode() == ORACLE_00060 && StringUtils.contains(sqle.getMessage(), "deadlock detected")) { 157 // this is bad...two parties are waiting forever somewhere... 158 // database is probably wedged now :( 159 LOG.error("Select for update deadlock encountered!"); 160 } 161 } 162 } 163 return result; 164 } catch (UnexpectedRollbackException ure) { 165 // occurs against Mckoi... :( 166 LOG.error("UnexpectedRollbackException - possibly due to Mckoi"); 167 return result; 168 } catch (TransactionException te) { 169 LOG.error("Error occurred obtaining available work items", te); 170 result.addFailure("Error occurred obtaining available work items: " + te); 171 return result; 172 } 173 174 Collection<Collection<T>> groupedWorkItems = groupWorkItems(items, result); 175 176 // now iterate over all work groups and process each 177 Iterator<Collection<T>> i = groupedWorkItems.iterator(); 178 List<Future> futures = new ArrayList<Future>(); 179 while(i.hasNext()) { 180 final Collection<T> workUnit= i.next(); 181 182 LOG.info("Processing work unit: " + workUnit); 183 /* performed within transaction */ 184 /* executor manages threads to run work items... */ 185 futures.add(executor.submit(new Callable() { 186 public Object call() throws Exception { 187 ProcessingResult result = new ProcessingResult(); 188 try { 189 Collection<?> successes = (Collection<Object>) 190 createNewTransaction().execute(new TransactionCallback() { 191 public Object doInTransaction(TransactionStatus txStatus) { 192 return processWorkItems(workUnit); 193 } 194 }); 195 result.addAllSuccesses(successes); 196 } catch (Exception e) { 197 LOG.error("Error occurred processing work unit " + workUnit, e); 198 for (final T workItem: workUnit) { 199 LOG.error("Error occurred processing work item " + workItem, e); 200 result.addFailure("Error occurred processing work item " + workItem + ": " + e); 201 unlockWorkItemAtomically(workItem); 202 } 203 } 204 return result; 205 } 206 })); 207 } 208 209 // wait for workers to finish 210 for (Future f: futures) { 211 try { 212 ProcessingResult workResult = (ProcessingResult) f.get(); 213 result.add(workResult); 214 } catch (Exception e) { 215 String message = "Error obtaining work result: " + e; 216 LOG.error(message, e); 217 result.addFailure(message); 218 } 219 } 220 221 LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] FINISHED RUN - " + result); 222 223 return result; 224 } 225 226 protected void unlockWorkItemAtomically(final T workItem) { 227 try { 228 createNewTransaction().execute(new TransactionCallback() { 229 public Object doInTransaction(TransactionStatus txStatus) { 230 LOG.info("Unlocking failed work item: " + workItem); 231 unlockWorkItem(workItem); 232 return null; 233 } 234 }); 235 } catch (Exception e2) { 236 LOG.error("Error unlocking failed work item " + workItem, e2); 237 } 238 } 239 }