001    /**
002     * Copyright 2005-2014 The Kuali Foundation
003     *
004     * Licensed under the Educational Community License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.opensource.org/licenses/ecl2.php
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    package org.kuali.rice.kcb.quartz;
017    
018    import org.apache.log4j.Logger;
019    import org.kuali.rice.core.framework.persistence.dao.GenericDao;
020    import org.kuali.rice.kcb.bo.Message;
021    import org.kuali.rice.kcb.bo.MessageDelivery;
022    import org.kuali.rice.kcb.bo.MessageDeliveryStatus;
023    import org.kuali.rice.kcb.deliverer.BulkMessageDeliverer;
024    import org.kuali.rice.kcb.deliverer.MessageDeliverer;
025    import org.kuali.rice.kcb.api.exception.MessageDeliveryProcessingException;
026    import org.kuali.rice.kcb.quartz.ProcessingResult.Failure;
027    import org.kuali.rice.kcb.service.GlobalKCBServiceLocator;
028    import org.kuali.rice.kcb.service.MessageDelivererRegistryService;
029    import org.kuali.rice.kcb.service.MessageDeliveryService;
030    import org.kuali.rice.kcb.service.MessageService;
031    import org.kuali.rice.krad.data.DataObjectService;
032    import org.kuali.rice.krad.service.KRADServiceLocator;
033    import org.quartz.JobExecutionContext;
034    import org.quartz.JobExecutionException;
035    import org.quartz.StatefulJob;
036    import org.springframework.beans.factory.annotation.Required;
037    
038    import java.util.ArrayList;
039    import java.util.Collection;
040    import java.util.HashMap;
041    import java.util.HashSet;
042    import java.util.LinkedList;
043    import java.util.List;
044    import java.util.Map;
045    import java.util.Set;
046    
047    /**
048     * Job that delivers messages to endpoints.  This job is not really stateful,
049     * but should not be executed concurrently.
050     * 
051     * @author Kuali Rice Team (rice.collab@kuali.org)
052     */
053    public class MessageProcessingJob extends ConcurrentJob<MessageDelivery> implements StatefulJob {
054        public static final String NAME = "MessageProcessingJobDetail";
055        public static final String GROUP = "KCB-Delivery";
056    
057        public static enum Mode {
058            DELIVER, REMOVE
059        }
060    
061        private static final Logger LOG = Logger.getLogger(MessageProcessingJob.class);
062        
063        private DataObjectService dataObjectService;
064        private MessageDelivererRegistryService registry;
065        private MessageDeliveryService messageDeliveryService;
066        private Long messageId;
067        private Mode mode = null;
068        private String user;
069        private String cause;
070    
071        public MessageProcessingJob(Long messageId, Mode mode, String user, String cause) {
072            this();
073            this.messageId = messageId;
074            this.mode = mode;
075            this.user = user;
076            this.cause = cause;
077        }
078    
079    
080        public MessageProcessingJob() {
081            registry = GlobalKCBServiceLocator.getInstance().getMessageDelivererRegistryService();
082            messageDeliveryService = GlobalKCBServiceLocator.getInstance().getMessageDeliveryService();
083            txManager = GlobalKCBServiceLocator.getInstance().getTransactionManager();
084            dataObjectService = KRADServiceLocator.getDataObjectService();
085        }
086    
087        /**
088         * Sets the {@link DataObjectService}
089         * @param dataObjectService the {@link DataObjectService}
090         */
091        public void setDataObjectService(DataObjectService dataObjectService) {
092            this.dataObjectService = dataObjectService;
093        }
094    
095        /**
096         * Sets the {@link MessageDelivererRegistryService}
097         * @param registry the {@link MessageDelivererRegistryService}
098         */
099        @Required
100        public void setMessageDelivererRegistry(MessageDelivererRegistryService registry) {
101            this.registry = registry;
102        }
103        
104        /**
105         * Sets the {@link MessageDeliveryService}
106         * @param messageDeliveryService the {@link MessageDeliveryService}
107         */
108        @Required
109        public void setMessageDeliveryService(MessageDeliveryService messageDeliveryService) {
110            this.messageDeliveryService = messageDeliveryService;
111        }
112    
113        @Override
114        protected Collection<MessageDelivery> takeAvailableWorkItems() {
115            MessageDeliveryStatus[] statuses;
116            switch (mode) {
117                case DELIVER: {
118                    statuses = new MessageDeliveryStatus[] { MessageDeliveryStatus.UNDELIVERED };
119                    break;
120                }
121                case REMOVE: {
122                    if (messageId == null) {
123                        throw new IllegalStateException("Message id must be specified for message removal mode");
124                    }
125                    statuses = new MessageDeliveryStatus[] { MessageDeliveryStatus.DELIVERED, MessageDeliveryStatus.UNDELIVERED };
126                    break;
127                }
128                default:
129                    throw new RuntimeException("Invalid mode: " + mode);
130            }
131            for (MessageDeliveryStatus status: statuses) {
132                LOG.debug("Taking message deliveries with status: " + status);
133            }
134            Collection<MessageDelivery> ds = messageDeliveryService.lockAndTakeMessageDeliveries(messageId, statuses);
135            LOG.debug("Took " + ds.size() + " deliveries");
136            for (MessageDelivery md: ds) {
137                LOG.debug(md);
138                md.setProcessCount(md.getProcessCount().intValue() + 1);
139            }
140            return ds;
141        }
142    
143        @Override
144        protected void unlockWorkItem(MessageDelivery item) {
145            item.setLockedDate(null);
146            dataObjectService.save(item);
147        }
148    
149        /**
150         * Group work items by deliverer and notification, so that deliveries to bulk deliverers are grouped
151         * by notification
152         * @see org.kuali.rice.ken.service.impl.ConcurrentJob#groupWorkItems(java.util.Collection)
153         */
154        @Override
155        protected Collection<Collection<MessageDelivery>> groupWorkItems(Collection<MessageDelivery> workItems, ProcessingResult<MessageDelivery> result) {
156            Collection<Collection<MessageDelivery>> groupedWorkItems = new ArrayList<Collection<MessageDelivery>>(workItems.size());
157    
158            Map<String, Collection<MessageDelivery>> bulkWorkUnits = new HashMap<String, Collection<MessageDelivery>>();
159            for (MessageDelivery messageDelivery: workItems) {
160                
161                MessageDeliverer deliverer = registry.getDeliverer(messageDelivery);
162                if (deliverer == null) {
163                    LOG.error("Error obtaining message deliverer for message delivery: " + messageDelivery);
164                    result.addFailure(new Failure<MessageDelivery>(messageDelivery, "Error obtaining message deliverer for message delivery"));
165                    unlockWorkItemAtomically(messageDelivery);
166                    continue;
167                }
168    
169                if (deliverer instanceof BulkMessageDeliverer) {
170                    // group by bulk-deliverer+message combo
171                    String key = messageDelivery.getDelivererTypeName() + ":" + messageDelivery.getMessage().getId();
172                    Collection<MessageDelivery> workUnit = bulkWorkUnits.get(key);
173                    if (workUnit == null) {
174                        workUnit = new LinkedList<MessageDelivery>();
175                        bulkWorkUnits.put(key, workUnit);
176                    }
177                    workUnit.add(messageDelivery);
178                } else {
179                    ArrayList<MessageDelivery> l = new ArrayList<MessageDelivery>(1);
180                    l.add(messageDelivery);
181                    groupedWorkItems.add(l);
182                }
183            }
184    
185            return groupedWorkItems;
186        }
187        
188        
189        @Override
190        protected Collection<MessageDelivery> processWorkItems(Collection<MessageDelivery> messageDeliveries) {
191            MessageDelivery firstMessageDelivery = messageDeliveries.iterator().next();
192            // get our hands on the appropriate MessageDeliverer instance
193            MessageDeliverer messageDeliverer = registry.getDeliverer(firstMessageDelivery);
194            if (messageDeliverer == null) {
195                throw new RuntimeException("Message deliverer could not be obtained");
196            }
197        
198            if (messageDeliveries.size() > 1) {
199                // this is a bulk deliverer, so we need to batch the MessageDeliveries
200                if (!(messageDeliverer instanceof BulkMessageDeliverer)) {
201                    throw new RuntimeException("Discrepency in dispatch service: deliverer for list of message deliveries is not a BulkMessageDeliverer");
202                }
203                return bulkProcess((BulkMessageDeliverer) messageDeliverer, messageDeliveries, mode);
204            } else {
205                return process(messageDeliverer, firstMessageDelivery, mode);
206            }
207        }
208    
209        /**
210         * Implements delivery of a single MessageDelivery
211         * @param messageDeliverer the deliverer
212         * @param messageDelivery the delivery
213         * @return collection of strings indicating successful deliveries
214         */
215        protected Collection<MessageDelivery> process(MessageDeliverer messageDeliverer, MessageDelivery messageDelivery, Mode mode) {
216            // we have our message deliverer, so tell it to deliver the message
217            try {
218                if (mode == Mode.DELIVER) {
219                    messageDeliverer.deliver(messageDelivery);
220                    // if processing was successful, set the count back to zero
221                    messageDelivery.setProcessCount(Integer.valueOf(0));
222                    // by definition we have succeeded at this point if no exception was thrown by the messageDeliverer
223                    // so update the status of the delivery message instance to DELIVERED (and unmark as taken)
224                    // and persist
225                    updateStatusAndUnlock(messageDelivery, mode == Mode.DELIVER ? MessageDeliveryStatus.DELIVERED : MessageDeliveryStatus.REMOVED);
226                } else {
227                    messageDeliverer.dismiss(messageDelivery, user, cause);
228                    // don't need to set the processing count down to zero because we are just deleting the record entirely
229                    messageDeliveryService.deleteMessageDelivery(messageDelivery);
230                }
231            } catch (MessageDeliveryProcessingException nmde) {
232                LOG.error("Error processing message delivery " + messageDelivery, nmde);
233                throw new RuntimeException(nmde);
234            }
235    
236            LOG.debug("Message delivery '" + messageDelivery.getId() + "' for message '" + messageDelivery.getMessage().getId() + "' was successfully processed.");
237            //PerformanceLog.logDuration("Time to dispatch notification delivery for notification " + messageDelivery.getMessage().getId(), System.currentTimeMillis() - messageDelivery.getNotification().getSendDateTime().getTime());
238    
239            List<MessageDelivery> success = new ArrayList<MessageDelivery>(1);
240            success.add(messageDelivery);
241            return success;
242        }
243    
244        /**
245         * Implements bulk delivery of a collection of {@link MessageDelivery}s
246         * @param messageDeliverer the deliverer
247         * @param messageDeliveries the deliveries
248         * @return collection of strings indicating successful deliveries
249         */
250        protected Collection<MessageDelivery> bulkProcess(BulkMessageDeliverer messageDeliverer, Collection<MessageDelivery> messageDeliveries,  Mode mode) {
251            MessageDeliveryStatus targetStatus = (mode == Mode.DELIVER ? MessageDeliveryStatus.DELIVERED : MessageDeliveryStatus.REMOVED);
252            // we have our message deliverer, so tell it to deliver the message
253            try {
254                if (mode == Mode.DELIVER) {
255                    messageDeliverer.bulkDeliver(messageDeliveries);
256                } else {
257                    messageDeliverer.bulkDismiss(messageDeliveries);
258                }
259            } catch (MessageDeliveryProcessingException nmde) {
260                LOG.error("Error bulk-delivering messages " + messageDeliveries, nmde);
261                throw new RuntimeException(nmde);
262            }
263    
264            // by definition we have succeeded at this point if no exception was thrown by the messageDeliverer
265            // so update the status of the delivery message instance to DELIVERED (and unmark as taken)
266            // and persist
267            List<MessageDelivery> successes = new ArrayList<MessageDelivery>(messageDeliveries.size());
268            for (MessageDelivery nmd: messageDeliveries) {
269                successes.add(nmd);
270                LOG.debug("Message delivery '" + nmd.getId() + "' for notification '" + nmd.getMessage().getId() + "' was successfully delivered.");
271                //PerformanceLog.logDuration("Time to dispatch notification delivery for notification " + nmd.getMessage().getId(), System.currentTimeMillis() - nmd.getNotification().getSendDateTime().getTime());
272                if (mode == Mode.REMOVE) {
273                    messageDeliveryService.deleteMessageDelivery(nmd);
274                } else {
275                    nmd.setProcessCount(0);
276                    updateStatusAndUnlock(nmd, targetStatus);                
277                }
278            }
279            
280            return successes;
281        }
282    
283        @Override
284        protected void finishProcessing(ProcessingResult<MessageDelivery> result) {
285            LOG.debug("Message processing job: " + result.getSuccesses().size() + " processed, " + result.getFailures().size() + " failures");
286            Set<Long> messageIds = new HashSet<Long>(result.getSuccesses().size());
287            for (MessageDelivery md: result.getSuccesses()) {
288                messageIds.add(md.getMessage().getId());
289            }
290            MessageService ms = GlobalKCBServiceLocator.getInstance().getMessageService();
291            for (Long id: messageIds) {
292                LOG.debug("Finishing processing message " + id);
293                //if (Mode.REMOVE == mode) {
294                
295                Message m = ms.getMessage(id);
296                
297                Collection<MessageDelivery> c = messageDeliveryService.getMessageDeliveries(m);
298                if (c.size() == 0) {
299                    LOG.debug("Deleting message " + m);
300                    ms.deleteMessage(m);
301                } else {
302                    LOG.debug("Message " + m.getId() + " has " + c.size() + " deliveries");
303                    for (MessageDelivery md: c) {
304                        LOG.debug(md);
305                    }
306                }
307            }
308        }
309    
310        /**
311         * Marks a MessageDelivery as having been delivered, and unlocks it
312         * @param messageDelivery the messageDelivery instance to mark
313         */
314        protected void updateStatusAndUnlock(MessageDelivery messageDelivery, MessageDeliveryStatus status) {
315            messageDelivery.setDeliveryStatus(status);
316            // mark as unlocked
317            messageDelivery.setLockedDate(null);
318            dataObjectService.save(messageDelivery);
319        }
320    
321        @Override
322        public ProcessingResult<MessageDelivery> run() {
323            LOG.debug("MessageProcessingJob running in Thread " + Thread.currentThread() + ": " + mode + " " + user + " " + cause);
324            return super.run();
325        }
326    
327        public void execute(JobExecutionContext context) throws JobExecutionException {
328            String mode = context.getMergedJobDataMap().getString("mode");
329            if (mode != null) {
330                this.mode = Mode.valueOf(mode);
331            } else {
332                this.mode = Mode.DELIVER;
333            }
334            this.user = context.getMergedJobDataMap().getString("user");
335            this.cause = context.getMergedJobDataMap().getString("cause");
336            if (context.getMergedJobDataMap().containsKey("messageId")) {
337                this.messageId = context.getMergedJobDataMap().getLong("messageId");
338            }
339            LOG.debug("==== message processing job: " + this.mode + " message id: " + this.messageId + "====");
340            super.run();
341        }
342    }