001/**
002 * Copyright 2005-2014 The Kuali Foundation
003 *
004 * Licensed under the Educational Community License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.opensource.org/licenses/ecl2.php
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.kuali.rice.kcb.quartz;
017
018import org.apache.log4j.Logger;
019import org.apache.ojb.broker.OptimisticLockException;
020import org.kuali.rice.core.api.util.RiceUtilities;
021import org.kuali.rice.kcb.quartz.ProcessingResult.Failure;
022import org.springframework.beans.factory.annotation.Required;
023import org.springframework.dao.DataAccessException;
024import org.springframework.transaction.PlatformTransactionManager;
025import org.springframework.transaction.TransactionException;
026import org.springframework.transaction.TransactionStatus;
027import org.springframework.transaction.UnexpectedRollbackException;
028import org.springframework.transaction.support.TransactionCallback;
029import org.springframework.transaction.support.TransactionTemplate;
030
031import java.sql.Timestamp;
032import java.util.ArrayList;
033import java.util.Collection;
034import java.util.Iterator;
035import java.util.List;
036import java.util.concurrent.Callable;
037import java.util.concurrent.ExecutorService;
038import java.util.concurrent.Executors;
039import java.util.concurrent.Future;
040import java.util.concurrent.ThreadFactory;
041
042/**
043 * Base class for jobs that must obtain a set of work items atomically
044 * @author Kuali Rice Team (rice.collab@kuali.org)
045 */
046public abstract class ConcurrentJob<T> {
047    protected final Logger LOG = Logger.getLogger(getClass());
048
049    protected ExecutorService executor = Executors.newSingleThreadExecutor(new KCBThreadFactory());
050    protected PlatformTransactionManager txManager;
051
052    /**
053     * Sets the {@link ExecutorService} to use to process work items.  Default is single-threaded.
054     * @param executor the {@link ExecutorService} to use to process work items.  Default is single-threaded.
055     */
056    public void setExecutorService(ExecutorService executor) {
057        this.executor = executor;
058    }
059
060    /**
061     * Sets the {@link PlatformTransactionManager}
062     * @param txManager the {@link PlatformTransactionManager} 
063     */
064    @Required
065    public void setTransactionManager(PlatformTransactionManager txManager) {
066        this.txManager = txManager;
067    }
068
069    /**
070     * Helper method for creating a TransactionTemplate initialized to create
071     * a new transaction
072     * @return a TransactionTemplate initialized to create a new transaction
073     */
074    protected TransactionTemplate createNewTransaction() {
075        TransactionTemplate tt = new TransactionTemplate(txManager);
076        tt.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
077        return tt;
078    }
079
080    /**
081     * Template method that subclasses should override to obtain a set of available work items
082     * and mark them as taken
083     * @return a collection of available work items that have been marked as taken
084     */
085    protected abstract Collection<T> takeAvailableWorkItems();
086
087    /**
088     * Template method that subclasses should override to group work items into units of work
089     * @param workItems list of work items to break into groups
090     * @param result ProcessingResult to modify if there are any failures...this is sort of a hack because previously
091     * failure to obtain a deliverer was considered a work item failure, and now this method has been factored out...
092     * but the tests still want to see the failure
093     * @return a collection of collection of work items
094     */
095    protected Collection<Collection<T>> groupWorkItems(Collection<T> workItems, ProcessingResult<T> result) {
096        Collection<Collection<T>> groupedWorkItems = new ArrayList<Collection<T>>(workItems.size());
097        for (T workItem: workItems) {
098            Collection<T> c = new ArrayList<T>(1);
099            c.add(workItem);
100            groupedWorkItems.add(c);
101        }
102        return groupedWorkItems;
103    }
104
105    /**
106     * Template method that subclasses should override to process a given work item and mark it
107     * as untaken afterwards
108     * @param item the work item
109     * @return a collection of success messages
110     */
111    protected abstract Collection<T> processWorkItems(Collection<T> items);
112
113    /**
114     * Template method that subclasses should override to unlock a given work item when procesing has failed.
115     * @param item the work item to unlock
116     */
117    protected abstract void unlockWorkItem(T item);
118
119    /**
120     * Main processing method which invokes subclass implementations of template methods
121     * to obtain available work items, and process them concurrently
122     * @return a ProcessingResult object containing the results of processing
123     */
124    public ProcessingResult<T> run() {
125        LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] STARTING RUN");
126
127        final ProcessingResult<T> result = new ProcessingResult<T>();
128
129        // retrieve list of available work items in a transaction
130        final Collection<T> items;
131        try {
132            items = executeInTransaction(new TransactionCallback() {
133                public Object doInTransaction(TransactionStatus txStatus) {
134                    return takeAvailableWorkItems();
135                }
136            });
137        } catch (DataAccessException dae) {
138            // Spring does not detect OJB's org.apache.ojb.broker.OptimisticLockException and turn it into a
139            // org.springframework.dao.OptimisticLockingFailureException?
140            OptimisticLockException optimisticLockException = RiceUtilities.findExceptionInStack(dae, OptimisticLockException.class);
141            if (optimisticLockException != null) {
142                // anticipated in the case that another thread is trying to grab items
143                LOG.info("Contention while taking work items");
144            } else {
145                // in addition to logging a message, should we throw an exception or log a failure here?
146                LOG.error("Error taking work items", dae);
147            }
148            return result;
149        } catch (UnexpectedRollbackException ure) {
150            LOG.error("UnexpectedRollbackException", ure);
151            return result;
152        } catch (TransactionException te) {
153            LOG.error("Error occurred obtaining available work items", te);
154            result.addFailure(new Failure<T>(te));
155            return result;
156        }
157
158        Collection<Collection<T>> groupedWorkItems = groupWorkItems(items, result);
159
160        // now iterate over all work groups and process each
161        Iterator<Collection<T>> i = groupedWorkItems.iterator();
162        List<Future> futures = new ArrayList<Future>();
163        while(i.hasNext()) {
164            final Collection<T> workUnit= i.next();
165
166            LOG.info("Processing work unit: " + workUnit);
167            /* performed within transaction */
168            /* executor manages threads to run work items... */
169            futures.add(executor.submit(new Callable() {
170                public Object call() throws Exception {
171                    ProcessingResult<T> result = new ProcessingResult<T>();
172                    try {
173                        Collection<T> successes = executeInTransaction(new TransactionCallback() {
174                            public Object doInTransaction(TransactionStatus txStatus) {
175                                return processWorkItems(workUnit);
176                            }
177                        });
178                        result.addAllSuccesses(successes);
179                    } catch (Exception e) {
180                        LOG.error("Error occurred processing work unit " + workUnit, e);
181                        for (final T workItem: workUnit) {
182                            LOG.error("Error occurred processing work item " + workItem, e);
183                            result.addFailure(new Failure<T>(workItem, e));
184                            unlockWorkItemAtomically(workItem);
185                        }
186                    }
187                    return result;
188                }
189            }));
190        }
191
192        // wait for workers to finish
193        for (Future f: futures) {
194            try {
195                ProcessingResult<T> workResult = (ProcessingResult<T>) f.get();
196                result.add(workResult);
197            } catch (Exception e) {
198                String message = "Error obtaining work result: " + e;
199                LOG.error(message, e);
200                result.addFailure(new Failure<T>(e, message));
201            }
202        }
203
204        finishProcessing(result);
205
206        LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] FINISHED RUN - " + result);
207
208        return result;
209    }
210
211    /**
212     * Template method called after processing of work items has completed
213     */
214    protected void finishProcessing(ProcessingResult<T> result) {}
215
216    protected void unlockWorkItemAtomically(final T workItem) {
217        try {
218            executeInTransaction(new TransactionCallback() {
219                public Object doInTransaction(TransactionStatus txStatus) {
220                    LOG.info("Unlocking failed work item: " + workItem);
221                    unlockWorkItem(workItem);
222                    return null;
223                }
224            });
225        } catch (Exception e2) {
226            LOG.error("Error unlocking failed work item " + workItem, e2);
227        }
228    }
229    
230    protected <X> X executeInTransaction(TransactionCallback callback) {
231        // haha just kidding
232        //return (X) callback.doInTransaction(null);
233        return (X) createNewTransaction().execute(callback);
234    }
235    
236    private static class KCBThreadFactory implements ThreadFactory {
237                
238                private ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
239                
240                public Thread newThread(Runnable runnable) {
241                        Thread thread = defaultThreadFactory.newThread(runnable);
242                        thread.setName("KCB-job-" + thread.getName());
243                        return thread;
244            }
245        }
246}