001    /*
002     * Copyright 2007 The Kuali Foundation
003     * 
004     * Licensed under the Educational Community License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     * 
008     * http://www.opensource.org/licenses/ecl2.php
009     * 
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    package org.kuali.rice.ken.service.impl;
017    
018    import java.sql.SQLException;
019    import java.sql.Timestamp;
020    import java.util.ArrayList;
021    import java.util.Collection;
022    import java.util.Iterator;
023    import java.util.List;
024    import java.util.concurrent.Callable;
025    import java.util.concurrent.ExecutorService;
026    import java.util.concurrent.Future;
027    
028    import org.apache.commons.lang.StringUtils;
029    import org.apache.log4j.Logger;
030    import org.apache.ojb.broker.OptimisticLockException;
031    import org.kuali.rice.ken.service.ProcessingResult;
032    import org.kuali.rice.ken.util.Util;
033    import org.springframework.dao.DataAccessException;
034    import org.springframework.transaction.PlatformTransactionManager;
035    import org.springframework.transaction.TransactionException;
036    import org.springframework.transaction.TransactionStatus;
037    import org.springframework.transaction.UnexpectedRollbackException;
038    import org.springframework.transaction.support.TransactionCallback;
039    import org.springframework.transaction.support.TransactionTemplate;
040    
041    /**
042     * Base class for jobs that must obtain a set of work items atomically
043     * @author Kuali Rice Team (rice.collab@kuali.org)
044     */
045    public abstract class ConcurrentJob<T> {
046        /**
047         * Oracle's "ORA-00054: resource busy and acquire with NOWAIT specified"
048         */
049        private static final int ORACLE_00054 = 54;
050        /**
051         * Oracle's "ORA-00060 deadlock detected while waiting for resource"
052         */
053        private static final int ORACLE_00060 = 60;
054        
055    
056        protected final Logger LOG = Logger.getLogger(getClass());
057    
058        protected ExecutorService executor;
059        protected PlatformTransactionManager txManager;
060        
061        /**
062         * Constructs a ConcurrentJob instance.
063         * @param txManager PlatformTransactionManager to use for transactions
064         * @param executor the ExecutorService to use to process work items
065         */
066        public ConcurrentJob(PlatformTransactionManager txManager, ExecutorService executor) {
067            this.txManager = txManager;
068            this.executor = executor;
069        }
070    
071        /**
072         * Helper method for creating a TransactionTemplate initialized to create
073         * a new transaction
074         * @return a TransactionTemplate initialized to create a new transaction
075         */
076        protected TransactionTemplate createNewTransaction() {
077            TransactionTemplate tt = new TransactionTemplate(txManager);
078            tt.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
079            return tt;
080        }
081    
082        /**
083         * Template method that subclasses should override to obtain a set of available work items
084         * and mark them as taken
085         * @return a collection of available work items that have been marked as taken
086         */
087        protected abstract Collection<T> takeAvailableWorkItems();
088    
089        /**
090         * Template method that subclasses should override to group work items into units of work
091         * @param workItems list of work items to break into groups
092         * @param result ProcessingResult to modify if there are any failures...this is sort of a hack because previously
093         * failure to obtain a deliverer was considered a work item failure, and now this method has been factored out...
094         * but the tests still want to see the failure
095         * @return a collection of collection of work items
096         */
097        protected Collection<Collection<T>> groupWorkItems(Collection<T> workItems, ProcessingResult result) {
098            Collection<Collection<T>> groupedWorkItems = new ArrayList<Collection<T>>(workItems.size());
099            for (T workItem: workItems) {
100                Collection<T> c = new ArrayList<T>(1);
101                c.add(workItem);
102                groupedWorkItems.add(c);
103            }
104            return groupedWorkItems;
105        }
106    
107        /**
108         * Template method that subclasses should override to process a given work item and mark it
109         * as untaken afterwards
110         * @param item the work item
111         * @return a collection of success messages
112         */
113        protected abstract Collection<?> processWorkItems(Collection<T> items);
114    
115        /**
116         * Template method that subclasses should override to unlock a given work item when procesing has failed.
117         * @param item the work item to unlock
118         */
119        protected abstract void unlockWorkItem(T item);
120    
121        /**
122         * Main processing method which invokes subclass implementations of template methods
123         * to obtain available work items, and process them concurrently
124         * @return a ProcessingResult object containing the results of processing
125         */
126        public ProcessingResult run() {
127            LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] STARTING RUN");
128    
129            final ProcessingResult result = new ProcessingResult();
130    
131            // retrieve list of available work items in a transaction
132            Collection<T> items = null;
133            try {
134                items = (Collection<T>)
135                    createNewTransaction().execute(new TransactionCallback() {
136                        public Object doInTransaction(TransactionStatus txStatus) {
137                            return takeAvailableWorkItems();
138                        }
139                    });
140            } catch (DataAccessException dae) {
141                // Spring does not detect OJB's org.apache.ojb.broker.OptimisticLockException and turn it into a
142                // org.springframework.dao.OptimisticLockingFailureException?
143                OptimisticLockException optimisticLockException = Util.findExceptionInStack(dae, OptimisticLockException.class);
144                if (optimisticLockException != null) {
145                    // anticipated in the case that another thread is trying to grab items
146                    LOG.info("Contention while taking work items");
147                } else {
148                    // in addition to logging a message, should we throw an exception or log a failure here?
149                    LOG.error("Error taking work items", dae);
150                    Throwable t = dae.getMostSpecificCause();
151                    if (t != null && t instanceof SQLException) {
152                        SQLException sqle = (SQLException) t;
153                        if (sqle.getErrorCode() == ORACLE_00054 && StringUtils.contains(sqle.getMessage(), "resource busy")) {
154                            // this is expected and non-fatal given that these jobs will run again
155                            LOG.warn("Select for update lock contention encountered");
156                        } else if (sqle.getErrorCode() == ORACLE_00060 && StringUtils.contains(sqle.getMessage(), "deadlock detected")) {
157                            // this is bad...two parties are waiting forever somewhere...
158                            // database is probably wedged now :(
159                            LOG.error("Select for update deadlock encountered!");
160                        }
161                    }
162                }
163                return result;
164            } catch (UnexpectedRollbackException ure) {
165                // occurs against Mckoi... :(
166                LOG.error("UnexpectedRollbackException - possibly due to Mckoi");
167                return result;
168            } catch (TransactionException te) {
169                LOG.error("Error occurred obtaining available work items", te);
170                result.addFailure("Error occurred obtaining available work items: " + te);
171                return result;
172            }
173    
174            Collection<Collection<T>> groupedWorkItems = groupWorkItems(items, result);
175    
176            // now iterate over all work groups and process each
177            Iterator<Collection<T>> i = groupedWorkItems.iterator();
178            List<Future> futures = new ArrayList<Future>();
179            while(i.hasNext()) {
180                final Collection<T> workUnit= i.next();
181    
182                LOG.info("Processing work unit: " + workUnit);
183                /* performed within transaction */
184                /* executor manages threads to run work items... */
185                futures.add(executor.submit(new Callable() {
186                    public Object call() throws Exception {
187                        ProcessingResult result = new ProcessingResult();
188                        try {
189                            Collection<?> successes = (Collection<Object>)
190                                createNewTransaction().execute(new TransactionCallback() {
191                                    public Object doInTransaction(TransactionStatus txStatus) {
192                                        return processWorkItems(workUnit);
193                                    }
194                                });
195                            result.addAllSuccesses(successes);
196                        } catch (Exception e) {
197                            LOG.error("Error occurred processing work unit " + workUnit, e);
198                            for (final T workItem: workUnit) {
199                                LOG.error("Error occurred processing work item " + workItem, e);
200                                result.addFailure("Error occurred processing work item " + workItem + ": " + e);
201                                unlockWorkItemAtomically(workItem);
202                            }
203                        }
204                        return result;
205                    }
206                }));
207            }
208    
209            // wait for workers to finish
210            for (Future f: futures) {
211                try {
212                    ProcessingResult workResult = (ProcessingResult) f.get();
213                    result.add(workResult);
214                } catch (Exception e) {
215                    String message = "Error obtaining work result: " + e;
216                    LOG.error(message, e);
217                    result.addFailure(message);
218                }
219            }
220    
221            LOG.debug("[" + new Timestamp(System.currentTimeMillis()).toString() + "] FINISHED RUN - " + result);
222    
223            return result;
224        }
225        
226        protected void unlockWorkItemAtomically(final T workItem) {
227            try {
228                createNewTransaction().execute(new TransactionCallback() {
229                    public Object doInTransaction(TransactionStatus txStatus) {
230                        LOG.info("Unlocking failed work item: " + workItem);
231                        unlockWorkItem(workItem);
232                        return null;
233                    }
234                });
235            } catch (Exception e2) {
236                LOG.error("Error unlocking failed work item " + workItem, e2);
237            }
238        }
239    }