001 /**
002 * Copyright 2005-2012 The Kuali Foundation
003 *
004 * Licensed under the Educational Community License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.opensource.org/licenses/ecl2.php
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016 package org.kuali.rice.kcb.quartz;
017
018 import org.apache.log4j.Logger;
019 import org.apache.ojb.broker.OptimisticLockException;
020 import org.kuali.rice.core.api.util.RiceUtilities;
021 import org.kuali.rice.kcb.quartz.ProcessingResult.Failure;
022 import org.springframework.beans.factory.annotation.Required;
023 import org.springframework.dao.DataAccessException;
024 import org.springframework.transaction.PlatformTransactionManager;
025 import org.springframework.transaction.TransactionException;
026 import org.springframework.transaction.TransactionStatus;
027 import org.springframework.transaction.UnexpectedRollbackException;
028 import org.springframework.transaction.support.TransactionCallback;
029 import org.springframework.transaction.support.TransactionTemplate;
030
031 import java.sql.Timestamp;
032 import java.util.ArrayList;
033 import java.util.Collection;
034 import java.util.Iterator;
035 import java.util.List;
036 import java.util.concurrent.Callable;
037 import java.util.concurrent.ExecutorService;
038 import java.util.concurrent.Executors;
039 import java.util.concurrent.Future;
040 import java.util.concurrent.ThreadFactory;
041
042 /**
043 * Base class for jobs that must obtain a set of work items atomically
044 * @author Kuali Rice Team (rice.collab@kuali.org)
045 */
046 public abstract class ConcurrentJob<T> {
047 protected final Logger LOG = Logger.getLogger(getClass());
048
049 protected ExecutorService executor = Executors.newSingleThreadExecutor(new KCBThreadFactory());
050 protected PlatformTransactionManager txManager;
051
052 /**
053 * Sets the {@link ExecutorService} to use to process work items. Default is single-threaded.
054 * @param executor the {@link ExecutorService} to use to process work items. Default is single-threaded.
055 */
056 public void setExecutorService(ExecutorService executor) {
057 this.executor = executor;
058 }
059
060 /**
061 * Sets the {@link PlatformTransactionManager}
062 * @param txManager the {@link PlatformTransactionManager}
063 */
064 @Required
065 public void setTransactionManager(PlatformTransactionManager txManager) {
066 this.txManager = txManager;
067 }
068
069 /**
070 * Helper method for creating a TransactionTemplate initialized to create
071 * a new transaction
072 * @return a TransactionTemplate initialized to create a new transaction
073 */
074 protected TransactionTemplate createNewTransaction() {
075 TransactionTemplate tt = new TransactionTemplate(txManager);
076 tt.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
077 return tt;
078 }
079
080 /**
081 * Template method that subclasses should override to obtain a set of available work items
082 * and mark them as taken
083 * @return a collection of available work items that have been marked as taken
084 */
085 protected abstract Collection<T> takeAvailableWorkItems();
086
087 /**
088 * Template method that subclasses should override to group work items into units of work
089 * @param workItems list of work items to break into groups
090 * @param result ProcessingResult to modify if there are any failures...this is sort of a hack because previously
091 * failure to obtain a deliverer was considered a work item failure, and now this method has been factored out...
092 * but the tests still want to see the failure
093 * @return a collection of collection of work items
094 */
095 protected Collection<Collection<T>> groupWorkItems(Collection<T> workItems, ProcessingResult<T> result) {
096 Collection<Collection<T>> groupedWorkItems = new ArrayList<Collection<T>>(workItems.size());
097 for (T workItem: workItems) {
098 Collection<T> c = new ArrayList<T>(1);
099 c.add(workItem);
100 groupedWorkItems.add(c);
101 }
102 return groupedWorkItems;
103 }
104
105 /**
106 * Template method that subclasses should override to process a given work item and mark it
107 * as untaken afterwards
108 * @param item the work item
109 * @return a collection of success messages
110 */
111 protected abstract Collection<T> processWorkItems(Collection<T> items);
112
113 /**
114 * Template method that subclasses should override to unlock a given work item when procesing has failed.
115 * @param item the work item to unlock
116 */
117 protected abstract void unlockWorkItem(T item);
118
119 /**
120 * Main processing method which invokes subclass implementations of template methods
121 * to obtain available work items, and process them concurrently
122 * @return a ProcessingResult object containing the results of processing
123 */
124 public ProcessingResult<T> run() {
125 LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] STARTING RUN");
126
127 final ProcessingResult<T> result = new ProcessingResult<T>();
128
129 // retrieve list of available work items in a transaction
130 final Collection<T> items;
131 try {
132 items = executeInTransaction(new TransactionCallback() {
133 public Object doInTransaction(TransactionStatus txStatus) {
134 return takeAvailableWorkItems();
135 }
136 });
137 } catch (DataAccessException dae) {
138 // Spring does not detect OJB's org.apache.ojb.broker.OptimisticLockException and turn it into a
139 // org.springframework.dao.OptimisticLockingFailureException?
140 OptimisticLockException optimisticLockException = RiceUtilities.findExceptionInStack(dae, OptimisticLockException.class);
141 if (optimisticLockException != null) {
142 // anticipated in the case that another thread is trying to grab items
143 LOG.info("Contention while taking work items");
144 } else {
145 // in addition to logging a message, should we throw an exception or log a failure here?
146 LOG.error("Error taking work items", dae);
147 }
148 return result;
149 } catch (UnexpectedRollbackException ure) {
150 // occurs against Mckoi... :(
151 LOG.error("UnexpectedRollbackException - possibly due to Mckoi");
152 return result;
153 } catch (TransactionException te) {
154 LOG.error("Error occurred obtaining available work items", te);
155 result.addFailure(new Failure<T>(te));
156 return result;
157 }
158
159 Collection<Collection<T>> groupedWorkItems = groupWorkItems(items, result);
160
161 // now iterate over all work groups and process each
162 Iterator<Collection<T>> i = groupedWorkItems.iterator();
163 List<Future> futures = new ArrayList<Future>();
164 while(i.hasNext()) {
165 final Collection<T> workUnit= i.next();
166
167 LOG.info("Processing work unit: " + workUnit);
168 /* performed within transaction */
169 /* executor manages threads to run work items... */
170 futures.add(executor.submit(new Callable() {
171 public Object call() throws Exception {
172 ProcessingResult<T> result = new ProcessingResult<T>();
173 try {
174 Collection<T> successes = executeInTransaction(new TransactionCallback() {
175 public Object doInTransaction(TransactionStatus txStatus) {
176 return processWorkItems(workUnit);
177 }
178 });
179 result.addAllSuccesses(successes);
180 } catch (Exception e) {
181 LOG.error("Error occurred processing work unit " + workUnit, e);
182 for (final T workItem: workUnit) {
183 LOG.error("Error occurred processing work item " + workItem, e);
184 result.addFailure(new Failure<T>(workItem, e));
185 unlockWorkItemAtomically(workItem);
186 }
187 }
188 return result;
189 }
190 }));
191 }
192
193 // wait for workers to finish
194 for (Future f: futures) {
195 try {
196 ProcessingResult<T> workResult = (ProcessingResult<T>) f.get();
197 result.add(workResult);
198 } catch (Exception e) {
199 String message = "Error obtaining work result: " + e;
200 LOG.error(message, e);
201 result.addFailure(new Failure<T>(e, message));
202 }
203 }
204
205 finishProcessing(result);
206
207 LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] FINISHED RUN - " + result);
208
209 return result;
210 }
211
212 /**
213 * Template method called after processing of work items has completed
214 */
215 protected void finishProcessing(ProcessingResult<T> result) {}
216
217 protected void unlockWorkItemAtomically(final T workItem) {
218 try {
219 executeInTransaction(new TransactionCallback() {
220 public Object doInTransaction(TransactionStatus txStatus) {
221 LOG.info("Unlocking failed work item: " + workItem);
222 unlockWorkItem(workItem);
223 return null;
224 }
225 });
226 } catch (Exception e2) {
227 LOG.error("Error unlocking failed work item " + workItem, e2);
228 }
229 }
230
231 protected <X> X executeInTransaction(TransactionCallback callback) {
232 // haha just kidding
233 //return (X) callback.doInTransaction(null);
234 return (X) createNewTransaction().execute(callback);
235 }
236
237 private static class KCBThreadFactory implements ThreadFactory {
238
239 private ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
240
241 public Thread newThread(Runnable runnable) {
242 Thread thread = defaultThreadFactory.newThread(runnable);
243 thread.setName("KCB-job-" + thread.getName());
244 return thread;
245 }
246 }
247 }