001    /**
002     * Copyright 2005-2012 The Kuali Foundation
003     *
004     * Licensed under the Educational Community License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.opensource.org/licenses/ecl2.php
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    package org.kuali.rice.kcb.quartz;
017    
018    import org.apache.log4j.Logger;
019    import org.kuali.rice.core.framework.persistence.dao.GenericDao;
020    import org.kuali.rice.kcb.bo.Message;
021    import org.kuali.rice.kcb.bo.MessageDelivery;
022    import org.kuali.rice.kcb.bo.MessageDeliveryStatus;
023    import org.kuali.rice.kcb.deliverer.BulkMessageDeliverer;
024    import org.kuali.rice.kcb.deliverer.MessageDeliverer;
025    import org.kuali.rice.kcb.api.exception.MessageDeliveryProcessingException;
026    import org.kuali.rice.kcb.quartz.ProcessingResult.Failure;
027    import org.kuali.rice.kcb.service.GlobalKCBServiceLocator;
028    import org.kuali.rice.kcb.service.MessageDelivererRegistryService;
029    import org.kuali.rice.kcb.service.MessageDeliveryService;
030    import org.kuali.rice.kcb.service.MessageService;
031    import org.quartz.JobExecutionContext;
032    import org.quartz.JobExecutionException;
033    import org.quartz.StatefulJob;
034    import org.springframework.beans.factory.annotation.Required;
035    
036    import java.util.ArrayList;
037    import java.util.Collection;
038    import java.util.HashMap;
039    import java.util.HashSet;
040    import java.util.LinkedList;
041    import java.util.List;
042    import java.util.Map;
043    import java.util.Set;
044    
045    /**
046     * Job that delivers messages to endpoints.  This job is not really stateful,
047     * but should not be executed concurrently.
048     * 
049     * @author Kuali Rice Team (rice.collab@kuali.org)
050     */
051    public class MessageProcessingJob extends ConcurrentJob<MessageDelivery> implements StatefulJob {
052        public static final String NAME = "MessageProcessingJobDetail";
053        public static final String GROUP = "KCB-Delivery";
054    
055        public static enum Mode {
056            DELIVER, REMOVE
057        }
058    
059        private static final Logger LOG = Logger.getLogger(MessageProcessingJob.class);
060        
061        private GenericDao dao;
062        private MessageDelivererRegistryService registry;
063        private MessageDeliveryService messageDeliveryService;
064        private Long messageId;
065        private Mode mode = null;
066        private String user;
067        private String cause;
068    
069        public MessageProcessingJob(Long messageId, Mode mode, String user, String cause) {
070            this();
071            this.messageId = messageId;
072            this.mode = mode;
073            this.user = user;
074            this.cause = cause;
075        }
076    
077    
078        public MessageProcessingJob() {
079            dao = GlobalKCBServiceLocator.getInstance().getKcbGenericDao();
080            registry = GlobalKCBServiceLocator.getInstance().getMessageDelivererRegistryService();
081            messageDeliveryService = GlobalKCBServiceLocator.getInstance().getMessageDeliveryService();
082            txManager = GlobalKCBServiceLocator.getInstance().getTransactionManager();
083        }
084    
085        /**
086         * Sets the {@link GenericDao}
087         * @param dao the {@link GenericDao}
088         */
089        @Required
090        public void setGenericDao(GenericDao dao) {
091            this.dao = dao;
092        }
093    
094        /**
095         * Sets the {@link MessageDelivererRegistryService}
096         * @param registry the {@link MessageDelivererRegistryService}
097         */
098        @Required
099        public void setMessageDelivererRegistry(MessageDelivererRegistryService registry) {
100            this.registry = registry;
101        }
102        
103        /**
104         * Sets the {@link MessageDeliveryService}
105         * @param messageDeliveryService the {@link MessageDeliveryService}
106         */
107        @Required
108        public void setMessageDeliveryService(MessageDeliveryService messageDeliveryService) {
109            this.messageDeliveryService = messageDeliveryService;
110        }
111    
112        @Override
113        protected Collection<MessageDelivery> takeAvailableWorkItems() {
114            MessageDeliveryStatus[] statuses;
115            switch (mode) {
116                case DELIVER: {
117                    statuses = new MessageDeliveryStatus[] { MessageDeliveryStatus.UNDELIVERED };
118                    break;
119                }
120                case REMOVE: {
121                    if (messageId == null) {
122                        throw new IllegalStateException("Message id must be specified for message removal mode");
123                    }
124                    statuses = new MessageDeliveryStatus[] { MessageDeliveryStatus.DELIVERED, MessageDeliveryStatus.UNDELIVERED };
125                    break;
126                }
127                default:
128                    throw new RuntimeException("Invalid mode: " + mode);
129            }
130            for (MessageDeliveryStatus status: statuses) {
131                LOG.debug("Taking message deliveries with status: " + status);
132            }
133            Collection<MessageDelivery> ds = messageDeliveryService.lockAndTakeMessageDeliveries(messageId, statuses);
134            LOG.debug("Took " + ds.size() + " deliveries");
135            for (MessageDelivery md: ds) {
136                LOG.debug(md);
137                md.setProcessCount(md.getProcessCount().intValue() + 1);
138            }
139            return ds;
140        }
141    
142        @Override
143        protected void unlockWorkItem(MessageDelivery item) {
144            item.setLockedDate(null);
145            dao.save(item);
146        }
147    
148        /**
149         * Group work items by deliverer and notification, so that deliveries to bulk deliverers are grouped
150         * by notification
151         * @see org.kuali.rice.ken.service.impl.ConcurrentJob#groupWorkItems(java.util.Collection)
152         */
153        @Override
154        protected Collection<Collection<MessageDelivery>> groupWorkItems(Collection<MessageDelivery> workItems, ProcessingResult<MessageDelivery> result) {
155            Collection<Collection<MessageDelivery>> groupedWorkItems = new ArrayList<Collection<MessageDelivery>>(workItems.size());
156    
157            Map<String, Collection<MessageDelivery>> bulkWorkUnits = new HashMap<String, Collection<MessageDelivery>>();
158            for (MessageDelivery messageDelivery: workItems) {
159                
160                MessageDeliverer deliverer = registry.getDeliverer(messageDelivery);
161                if (deliverer == null) {
162                    LOG.error("Error obtaining message deliverer for message delivery: " + messageDelivery);
163                    result.addFailure(new Failure<MessageDelivery>(messageDelivery, "Error obtaining message deliverer for message delivery"));
164                    unlockWorkItemAtomically(messageDelivery);
165                    continue;
166                }
167    
168                if (deliverer instanceof BulkMessageDeliverer) {
169                    // group by bulk-deliverer+message combo
170                    String key = messageDelivery.getDelivererTypeName() + ":" + messageDelivery.getMessage().getId();
171                    Collection<MessageDelivery> workUnit = bulkWorkUnits.get(key);
172                    if (workUnit == null) {
173                        workUnit = new LinkedList<MessageDelivery>();
174                        bulkWorkUnits.put(key, workUnit);
175                    }
176                    workUnit.add(messageDelivery);
177                } else {
178                    ArrayList<MessageDelivery> l = new ArrayList<MessageDelivery>(1);
179                    l.add(messageDelivery);
180                    groupedWorkItems.add(l);
181                }
182            }
183    
184            return groupedWorkItems;
185        }
186        
187        
188        @Override
189        protected Collection<MessageDelivery> processWorkItems(Collection<MessageDelivery> messageDeliveries) {
190            MessageDelivery firstMessageDelivery = messageDeliveries.iterator().next();
191            // get our hands on the appropriate MessageDeliverer instance
192            MessageDeliverer messageDeliverer = registry.getDeliverer(firstMessageDelivery);
193            if (messageDeliverer == null) {
194                throw new RuntimeException("Message deliverer could not be obtained");
195            }
196        
197            if (messageDeliveries.size() > 1) {
198                // this is a bulk deliverer, so we need to batch the MessageDeliveries
199                if (!(messageDeliverer instanceof BulkMessageDeliverer)) {
200                    throw new RuntimeException("Discrepency in dispatch service: deliverer for list of message deliveries is not a BulkMessageDeliverer");
201                }
202                return bulkProcess((BulkMessageDeliverer) messageDeliverer, messageDeliveries, mode);
203            } else {
204                return process(messageDeliverer, firstMessageDelivery, mode);
205            }
206        }
207    
208        /**
209         * Implements delivery of a single MessageDelivery
210         * @param deliverer the deliverer
211         * @param messageDelivery the delivery
212         * @return collection of strings indicating successful deliveries
213         */
214        protected Collection<MessageDelivery> process(MessageDeliverer messageDeliverer, MessageDelivery messageDelivery, Mode mode) {
215            // we have our message deliverer, so tell it to deliver the message
216            try {
217                if (mode == Mode.DELIVER) {
218                    messageDeliverer.deliver(messageDelivery);
219                    // if processing was successful, set the count back to zero
220                    messageDelivery.setProcessCount(Integer.valueOf(0));
221                    // by definition we have succeeded at this point if no exception was thrown by the messageDeliverer
222                    // so update the status of the delivery message instance to DELIVERED (and unmark as taken)
223                    // and persist
224                    updateStatusAndUnlock(messageDelivery, mode == Mode.DELIVER ? MessageDeliveryStatus.DELIVERED : MessageDeliveryStatus.REMOVED);
225                } else {
226                    messageDeliverer.dismiss(messageDelivery, user, cause);
227                    // don't need to set the processing count down to zero because we are just deleting the record entirely
228                    messageDeliveryService.deleteMessageDelivery(messageDelivery);
229                }
230            } catch (MessageDeliveryProcessingException nmde) {
231                LOG.error("Error processing message delivery " + messageDelivery, nmde);
232                throw new RuntimeException(nmde);
233            }
234    
235            LOG.debug("Message delivery '" + messageDelivery.getId() + "' for message '" + messageDelivery.getMessage().getId() + "' was successfully processed.");
236            //PerformanceLog.logDuration("Time to dispatch notification delivery for notification " + messageDelivery.getMessage().getId(), System.currentTimeMillis() - messageDelivery.getNotification().getSendDateTime().getTime());
237    
238            List<MessageDelivery> success = new ArrayList<MessageDelivery>(1);
239            success.add(messageDelivery);
240            return success;
241        }
242    
243        /**
244         * Implements bulk delivery of a collection of {@link MessageDelivery}s
245         * @param deliverer the deliverer
246         * @param messageDeliveries the deliveries
247         * @return collection of strings indicating successful deliveries
248         */
249        protected Collection<MessageDelivery> bulkProcess(BulkMessageDeliverer messageDeliverer, Collection<MessageDelivery> messageDeliveries,  Mode mode) {
250            MessageDeliveryStatus targetStatus = (mode == Mode.DELIVER ? MessageDeliveryStatus.DELIVERED : MessageDeliveryStatus.REMOVED);
251            // we have our message deliverer, so tell it to deliver the message
252            try {
253                if (mode == Mode.DELIVER) {
254                    messageDeliverer.bulkDeliver(messageDeliveries);
255                } else {
256                    messageDeliverer.bulkDismiss(messageDeliveries);
257                }
258            } catch (MessageDeliveryProcessingException nmde) {
259                LOG.error("Error bulk-delivering messages " + messageDeliveries, nmde);
260                throw new RuntimeException(nmde);
261            }
262    
263            // by definition we have succeeded at this point if no exception was thrown by the messageDeliverer
264            // so update the status of the delivery message instance to DELIVERED (and unmark as taken)
265            // and persist
266            List<MessageDelivery> successes = new ArrayList<MessageDelivery>(messageDeliveries.size());
267            for (MessageDelivery nmd: messageDeliveries) {
268                successes.add(nmd);
269                LOG.debug("Message delivery '" + nmd.getId() + "' for notification '" + nmd.getMessage().getId() + "' was successfully delivered.");
270                //PerformanceLog.logDuration("Time to dispatch notification delivery for notification " + nmd.getMessage().getId(), System.currentTimeMillis() - nmd.getNotification().getSendDateTime().getTime());
271                if (mode == Mode.REMOVE) {
272                    messageDeliveryService.deleteMessageDelivery(nmd);
273                } else {
274                    nmd.setProcessCount(0);
275                    updateStatusAndUnlock(nmd, targetStatus);                
276                }
277            }
278            
279            return successes;
280        }
281    
282        @Override
283        protected void finishProcessing(ProcessingResult<MessageDelivery> result) {
284            LOG.debug("Message processing job: " + result.getSuccesses().size() + " processed, " + result.getFailures().size() + " failures");
285            Set<Long> messageIds = new HashSet<Long>(result.getSuccesses().size());
286            for (MessageDelivery md: result.getSuccesses()) {
287                messageIds.add(md.getMessage().getId());
288            }
289            MessageService ms = GlobalKCBServiceLocator.getInstance().getMessageService();
290            for (Long id: messageIds) {
291                LOG.debug("Finishing processing message " + id);
292                //if (Mode.REMOVE == mode) {
293                
294                Message m = ms.getMessage(id);
295                
296                Collection<MessageDelivery> c = messageDeliveryService.getMessageDeliveries(m);
297                if (c.size() == 0) {
298                    LOG.debug("Deleting message " + m);
299                    ms.deleteMessage(m);
300                } else {
301                    LOG.debug("Message " + m.getId() + " has " + c.size() + " deliveries");
302                    for (MessageDelivery md: c) {
303                        LOG.debug(md);
304                    }
305                }
306            }
307        }
308    
309        /**
310         * Marks a MessageDelivery as having been delivered, and unlocks it
311         * @param messageDelivery the messageDelivery instance to mark
312         */
313        protected void updateStatusAndUnlock(MessageDelivery messageDelivery, MessageDeliveryStatus status) {
314            messageDelivery.setDeliveryStatus(status);
315            // mark as unlocked
316            messageDelivery.setLockedDate(null);
317            dao.save(messageDelivery);
318        }
319    
320        @Override
321        public ProcessingResult<MessageDelivery> run() {
322            LOG.debug("MessageProcessingJob running in Thread " + Thread.currentThread() + ": " + mode + " " + user + " " + cause);
323            return super.run();
324        }
325    
326        public void execute(JobExecutionContext context) throws JobExecutionException {
327            String mode = context.getMergedJobDataMap().getString("mode");
328            if (mode != null) {
329                this.mode = Mode.valueOf(mode);
330            } else {
331                this.mode = Mode.DELIVER;
332            }
333            this.user = context.getMergedJobDataMap().getString("user");
334            this.cause = context.getMergedJobDataMap().getString("cause");
335            if (context.getMergedJobDataMap().containsKey("messageId")) {
336                this.messageId = context.getMergedJobDataMap().getLong("messageId");
337            }
338            LOG.debug("==== message processing job: " + this.mode + " message id: " + this.messageId + "====");
339            super.run();
340        }
341    }