001    /**
002     * Copyright 2005-2012 The Kuali Foundation
003     *
004     * Licensed under the Educational Community License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.opensource.org/licenses/ecl2.php
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    package org.kuali.rice.kcb.quartz;
017    
018    import org.apache.log4j.Logger;
019    import org.apache.ojb.broker.OptimisticLockException;
020    import org.kuali.rice.core.api.util.RiceUtilities;
021    import org.kuali.rice.kcb.quartz.ProcessingResult.Failure;
022    import org.springframework.beans.factory.annotation.Required;
023    import org.springframework.dao.DataAccessException;
024    import org.springframework.transaction.PlatformTransactionManager;
025    import org.springframework.transaction.TransactionException;
026    import org.springframework.transaction.TransactionStatus;
027    import org.springframework.transaction.UnexpectedRollbackException;
028    import org.springframework.transaction.support.TransactionCallback;
029    import org.springframework.transaction.support.TransactionTemplate;
030    
031    import java.sql.Timestamp;
032    import java.util.ArrayList;
033    import java.util.Collection;
034    import java.util.Iterator;
035    import java.util.List;
036    import java.util.concurrent.Callable;
037    import java.util.concurrent.ExecutorService;
038    import java.util.concurrent.Executors;
039    import java.util.concurrent.Future;
040    import java.util.concurrent.ThreadFactory;
041    
042    /**
043     * Base class for jobs that must obtain a set of work items atomically
044     * @author Kuali Rice Team (rice.collab@kuali.org)
045     */
046    public abstract class ConcurrentJob<T> {
047        protected final Logger LOG = Logger.getLogger(getClass());
048    
049        protected ExecutorService executor = Executors.newSingleThreadExecutor(new KCBThreadFactory());
050        protected PlatformTransactionManager txManager;
051    
052        /**
053         * Sets the {@link ExecutorService} to use to process work items.  Default is single-threaded.
054         * @param executor the {@link ExecutorService} to use to process work items.  Default is single-threaded.
055         */
056        public void setExecutorService(ExecutorService executor) {
057            this.executor = executor;
058        }
059    
060        /**
061         * Sets the {@link PlatformTransactionManager}
062         * @param txManager the {@link PlatformTransactionManager} 
063         */
064        @Required
065        public void setTransactionManager(PlatformTransactionManager txManager) {
066            this.txManager = txManager;
067        }
068    
069        /**
070         * Helper method for creating a TransactionTemplate initialized to create
071         * a new transaction
072         * @return a TransactionTemplate initialized to create a new transaction
073         */
074        protected TransactionTemplate createNewTransaction() {
075            TransactionTemplate tt = new TransactionTemplate(txManager);
076            tt.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
077            return tt;
078        }
079    
080        /**
081         * Template method that subclasses should override to obtain a set of available work items
082         * and mark them as taken
083         * @return a collection of available work items that have been marked as taken
084         */
085        protected abstract Collection<T> takeAvailableWorkItems();
086    
087        /**
088         * Template method that subclasses should override to group work items into units of work
089         * @param workItems list of work items to break into groups
090         * @param result ProcessingResult to modify if there are any failures...this is sort of a hack because previously
091         * failure to obtain a deliverer was considered a work item failure, and now this method has been factored out...
092         * but the tests still want to see the failure
093         * @return a collection of collection of work items
094         */
095        protected Collection<Collection<T>> groupWorkItems(Collection<T> workItems, ProcessingResult<T> result) {
096            Collection<Collection<T>> groupedWorkItems = new ArrayList<Collection<T>>(workItems.size());
097            for (T workItem: workItems) {
098                Collection<T> c = new ArrayList<T>(1);
099                c.add(workItem);
100                groupedWorkItems.add(c);
101            }
102            return groupedWorkItems;
103        }
104    
105        /**
106         * Template method that subclasses should override to process a given work item and mark it
107         * as untaken afterwards
108         * @param item the work item
109         * @return a collection of success messages
110         */
111        protected abstract Collection<T> processWorkItems(Collection<T> items);
112    
113        /**
114         * Template method that subclasses should override to unlock a given work item when procesing has failed.
115         * @param item the work item to unlock
116         */
117        protected abstract void unlockWorkItem(T item);
118    
119        /**
120         * Main processing method which invokes subclass implementations of template methods
121         * to obtain available work items, and process them concurrently
122         * @return a ProcessingResult object containing the results of processing
123         */
124        public ProcessingResult<T> run() {
125            LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] STARTING RUN");
126    
127            final ProcessingResult<T> result = new ProcessingResult<T>();
128    
129            // retrieve list of available work items in a transaction
130            final Collection<T> items;
131            try {
132                items = executeInTransaction(new TransactionCallback() {
133                    public Object doInTransaction(TransactionStatus txStatus) {
134                        return takeAvailableWorkItems();
135                    }
136                });
137            } catch (DataAccessException dae) {
138                // Spring does not detect OJB's org.apache.ojb.broker.OptimisticLockException and turn it into a
139                // org.springframework.dao.OptimisticLockingFailureException?
140                OptimisticLockException optimisticLockException = RiceUtilities.findExceptionInStack(dae, OptimisticLockException.class);
141                if (optimisticLockException != null) {
142                    // anticipated in the case that another thread is trying to grab items
143                    LOG.info("Contention while taking work items");
144                } else {
145                    // in addition to logging a message, should we throw an exception or log a failure here?
146                    LOG.error("Error taking work items", dae);
147                }
148                return result;
149            } catch (UnexpectedRollbackException ure) {
150                // occurs against Mckoi... :(
151                LOG.error("UnexpectedRollbackException - possibly due to Mckoi");
152                return result;
153            } catch (TransactionException te) {
154                LOG.error("Error occurred obtaining available work items", te);
155                result.addFailure(new Failure<T>(te));
156                return result;
157            }
158    
159            Collection<Collection<T>> groupedWorkItems = groupWorkItems(items, result);
160    
161            // now iterate over all work groups and process each
162            Iterator<Collection<T>> i = groupedWorkItems.iterator();
163            List<Future> futures = new ArrayList<Future>();
164            while(i.hasNext()) {
165                final Collection<T> workUnit= i.next();
166    
167                LOG.info("Processing work unit: " + workUnit);
168                /* performed within transaction */
169                /* executor manages threads to run work items... */
170                futures.add(executor.submit(new Callable() {
171                    public Object call() throws Exception {
172                        ProcessingResult<T> result = new ProcessingResult<T>();
173                        try {
174                            Collection<T> successes = executeInTransaction(new TransactionCallback() {
175                                public Object doInTransaction(TransactionStatus txStatus) {
176                                    return processWorkItems(workUnit);
177                                }
178                            });
179                            result.addAllSuccesses(successes);
180                        } catch (Exception e) {
181                            LOG.error("Error occurred processing work unit " + workUnit, e);
182                            for (final T workItem: workUnit) {
183                                LOG.error("Error occurred processing work item " + workItem, e);
184                                result.addFailure(new Failure<T>(workItem, e));
185                                unlockWorkItemAtomically(workItem);
186                            }
187                        }
188                        return result;
189                    }
190                }));
191            }
192    
193            // wait for workers to finish
194            for (Future f: futures) {
195                try {
196                    ProcessingResult<T> workResult = (ProcessingResult<T>) f.get();
197                    result.add(workResult);
198                } catch (Exception e) {
199                    String message = "Error obtaining work result: " + e;
200                    LOG.error(message, e);
201                    result.addFailure(new Failure<T>(e, message));
202                }
203            }
204    
205            finishProcessing(result);
206    
207            LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] FINISHED RUN - " + result);
208    
209            return result;
210        }
211    
212        /**
213         * Template method called after processing of work items has completed
214         */
215        protected void finishProcessing(ProcessingResult<T> result) {}
216    
217        protected void unlockWorkItemAtomically(final T workItem) {
218            try {
219                executeInTransaction(new TransactionCallback() {
220                    public Object doInTransaction(TransactionStatus txStatus) {
221                        LOG.info("Unlocking failed work item: " + workItem);
222                        unlockWorkItem(workItem);
223                        return null;
224                    }
225                });
226            } catch (Exception e2) {
227                LOG.error("Error unlocking failed work item " + workItem, e2);
228            }
229        }
230        
231        protected <X> X executeInTransaction(TransactionCallback callback) {
232            // haha just kidding
233            //return (X) callback.doInTransaction(null);
234            return (X) createNewTransaction().execute(callback);
235        }
236        
237        private static class KCBThreadFactory implements ThreadFactory {
238                    
239                    private ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
240                    
241                    public Thread newThread(Runnable runnable) {
242                            Thread thread = defaultThreadFactory.newThread(runnable);
243                            thread.setName("KCB-job-" + thread.getName());
244                            return thread;
245                }
246            }
247    }