001 /*
002 * Copyright 2007 The Kuali Foundation
003 *
004 * Licensed under the Educational Community License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.opensource.org/licenses/ecl2.php
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016 package org.kuali.rice.ken.service.impl;
017
018 import java.sql.SQLException;
019 import java.sql.Timestamp;
020 import java.util.ArrayList;
021 import java.util.Collection;
022 import java.util.Iterator;
023 import java.util.List;
024 import java.util.concurrent.Callable;
025 import java.util.concurrent.ExecutorService;
026 import java.util.concurrent.Future;
027
028 import org.apache.commons.lang.StringUtils;
029 import org.apache.log4j.Logger;
030 import org.apache.ojb.broker.OptimisticLockException;
031 import org.kuali.rice.ken.service.ProcessingResult;
032 import org.kuali.rice.ken.util.Util;
033 import org.springframework.dao.DataAccessException;
034 import org.springframework.transaction.PlatformTransactionManager;
035 import org.springframework.transaction.TransactionException;
036 import org.springframework.transaction.TransactionStatus;
037 import org.springframework.transaction.UnexpectedRollbackException;
038 import org.springframework.transaction.support.TransactionCallback;
039 import org.springframework.transaction.support.TransactionTemplate;
040
041 /**
042 * Base class for jobs that must obtain a set of work items atomically
043 * @author Kuali Rice Team (rice.collab@kuali.org)
044 */
045 public abstract class ConcurrentJob<T> {
046 /**
047 * Oracle's "ORA-00054: resource busy and acquire with NOWAIT specified"
048 */
049 private static final int ORACLE_00054 = 54;
050 /**
051 * Oracle's "ORA-00060 deadlock detected while waiting for resource"
052 */
053 private static final int ORACLE_00060 = 60;
054
055
056 protected final Logger LOG = Logger.getLogger(getClass());
057
058 protected ExecutorService executor;
059 protected PlatformTransactionManager txManager;
060
061 /**
062 * Constructs a ConcurrentJob instance.
063 * @param txManager PlatformTransactionManager to use for transactions
064 * @param executor the ExecutorService to use to process work items
065 */
066 public ConcurrentJob(PlatformTransactionManager txManager, ExecutorService executor) {
067 this.txManager = txManager;
068 this.executor = executor;
069 }
070
071 /**
072 * Helper method for creating a TransactionTemplate initialized to create
073 * a new transaction
074 * @return a TransactionTemplate initialized to create a new transaction
075 */
076 protected TransactionTemplate createNewTransaction() {
077 TransactionTemplate tt = new TransactionTemplate(txManager);
078 tt.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
079 return tt;
080 }
081
082 /**
083 * Template method that subclasses should override to obtain a set of available work items
084 * and mark them as taken
085 * @return a collection of available work items that have been marked as taken
086 */
087 protected abstract Collection<T> takeAvailableWorkItems();
088
089 /**
090 * Template method that subclasses should override to group work items into units of work
091 * @param workItems list of work items to break into groups
092 * @param result ProcessingResult to modify if there are any failures...this is sort of a hack because previously
093 * failure to obtain a deliverer was considered a work item failure, and now this method has been factored out...
094 * but the tests still want to see the failure
095 * @return a collection of collection of work items
096 */
097 protected Collection<Collection<T>> groupWorkItems(Collection<T> workItems, ProcessingResult result) {
098 Collection<Collection<T>> groupedWorkItems = new ArrayList<Collection<T>>(workItems.size());
099 for (T workItem: workItems) {
100 Collection<T> c = new ArrayList<T>(1);
101 c.add(workItem);
102 groupedWorkItems.add(c);
103 }
104 return groupedWorkItems;
105 }
106
107 /**
108 * Template method that subclasses should override to process a given work item and mark it
109 * as untaken afterwards
110 * @param item the work item
111 * @return a collection of success messages
112 */
113 protected abstract Collection<?> processWorkItems(Collection<T> items);
114
115 /**
116 * Template method that subclasses should override to unlock a given work item when procesing has failed.
117 * @param item the work item to unlock
118 */
119 protected abstract void unlockWorkItem(T item);
120
121 /**
122 * Main processing method which invokes subclass implementations of template methods
123 * to obtain available work items, and process them concurrently
124 * @return a ProcessingResult object containing the results of processing
125 */
126 public ProcessingResult run() {
127 LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] STARTING RUN");
128
129 final ProcessingResult result = new ProcessingResult();
130
131 // retrieve list of available work items in a transaction
132 Collection<T> items = null;
133 try {
134 items = (Collection<T>)
135 createNewTransaction().execute(new TransactionCallback() {
136 public Object doInTransaction(TransactionStatus txStatus) {
137 return takeAvailableWorkItems();
138 }
139 });
140 } catch (DataAccessException dae) {
141 // Spring does not detect OJB's org.apache.ojb.broker.OptimisticLockException and turn it into a
142 // org.springframework.dao.OptimisticLockingFailureException?
143 OptimisticLockException optimisticLockException = Util.findExceptionInStack(dae, OptimisticLockException.class);
144 if (optimisticLockException != null) {
145 // anticipated in the case that another thread is trying to grab items
146 LOG.info("Contention while taking work items");
147 } else {
148 // in addition to logging a message, should we throw an exception or log a failure here?
149 LOG.error("Error taking work items", dae);
150 Throwable t = dae.getMostSpecificCause();
151 if (t != null && t instanceof SQLException) {
152 SQLException sqle = (SQLException) t;
153 if (sqle.getErrorCode() == ORACLE_00054 && StringUtils.contains(sqle.getMessage(), "resource busy")) {
154 // this is expected and non-fatal given that these jobs will run again
155 LOG.warn("Select for update lock contention encountered");
156 } else if (sqle.getErrorCode() == ORACLE_00060 && StringUtils.contains(sqle.getMessage(), "deadlock detected")) {
157 // this is bad...two parties are waiting forever somewhere...
158 // database is probably wedged now :(
159 LOG.error("Select for update deadlock encountered!");
160 }
161 }
162 }
163 return result;
164 } catch (UnexpectedRollbackException ure) {
165 // occurs against Mckoi... :(
166 LOG.error("UnexpectedRollbackException - possibly due to Mckoi");
167 return result;
168 } catch (TransactionException te) {
169 LOG.error("Error occurred obtaining available work items", te);
170 result.addFailure("Error occurred obtaining available work items: " + te);
171 return result;
172 }
173
174 Collection<Collection<T>> groupedWorkItems = groupWorkItems(items, result);
175
176 // now iterate over all work groups and process each
177 Iterator<Collection<T>> i = groupedWorkItems.iterator();
178 List<Future> futures = new ArrayList<Future>();
179 while(i.hasNext()) {
180 final Collection<T> workUnit= i.next();
181
182 LOG.info("Processing work unit: " + workUnit);
183 /* performed within transaction */
184 /* executor manages threads to run work items... */
185 futures.add(executor.submit(new Callable() {
186 public Object call() throws Exception {
187 ProcessingResult result = new ProcessingResult();
188 try {
189 Collection<?> successes = (Collection<Object>)
190 createNewTransaction().execute(new TransactionCallback() {
191 public Object doInTransaction(TransactionStatus txStatus) {
192 return processWorkItems(workUnit);
193 }
194 });
195 result.addAllSuccesses(successes);
196 } catch (Exception e) {
197 LOG.error("Error occurred processing work unit " + workUnit, e);
198 for (final T workItem: workUnit) {
199 LOG.error("Error occurred processing work item " + workItem, e);
200 result.addFailure("Error occurred processing work item " + workItem + ": " + e);
201 unlockWorkItemAtomically(workItem);
202 }
203 }
204 return result;
205 }
206 }));
207 }
208
209 // wait for workers to finish
210 for (Future f: futures) {
211 try {
212 ProcessingResult workResult = (ProcessingResult) f.get();
213 result.add(workResult);
214 } catch (Exception e) {
215 String message = "Error obtaining work result: " + e;
216 LOG.error(message, e);
217 result.addFailure(message);
218 }
219 }
220
221 LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] FINISHED RUN - " + result);
222
223 return result;
224 }
225
226 protected void unlockWorkItemAtomically(final T workItem) {
227 try {
228 createNewTransaction().execute(new TransactionCallback() {
229 public Object doInTransaction(TransactionStatus txStatus) {
230 LOG.info("Unlocking failed work item: " + workItem);
231 unlockWorkItem(workItem);
232 return null;
233 }
234 });
235 } catch (Exception e2) {
236 LOG.error("Error unlocking failed work item " + workItem, e2);
237 }
238 }
239 }