001/**
002 * Copyright 2005-2016 The Kuali Foundation
003 *
004 * Licensed under the Educational Community License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.opensource.org/licenses/ecl2.php
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.kuali.rice.kcb.quartz;
017
018import org.apache.log4j.Logger;
019import org.kuali.rice.core.framework.persistence.dao.GenericDao;
020import org.kuali.rice.kcb.bo.Message;
021import org.kuali.rice.kcb.bo.MessageDelivery;
022import org.kuali.rice.kcb.bo.MessageDeliveryStatus;
023import org.kuali.rice.kcb.deliverer.BulkMessageDeliverer;
024import org.kuali.rice.kcb.deliverer.MessageDeliverer;
025import org.kuali.rice.kcb.api.exception.MessageDeliveryProcessingException;
026import org.kuali.rice.kcb.quartz.ProcessingResult.Failure;
027import org.kuali.rice.kcb.service.GlobalKCBServiceLocator;
028import org.kuali.rice.kcb.service.MessageDelivererRegistryService;
029import org.kuali.rice.kcb.service.MessageDeliveryService;
030import org.kuali.rice.kcb.service.MessageService;
031import org.quartz.JobExecutionContext;
032import org.quartz.JobExecutionException;
033import org.quartz.StatefulJob;
034import org.springframework.beans.factory.annotation.Required;
035
036import java.util.ArrayList;
037import java.util.Collection;
038import java.util.HashMap;
039import java.util.HashSet;
040import java.util.LinkedList;
041import java.util.List;
042import java.util.Map;
043import java.util.Set;
044
045/**
046 * Job that delivers messages to endpoints.  This job is not really stateful,
047 * but should not be executed concurrently.
048 * 
049 * @author Kuali Rice Team (rice.collab@kuali.org)
050 */
051public class MessageProcessingJob extends ConcurrentJob<MessageDelivery> implements StatefulJob {
052    public static final String NAME = "MessageProcessingJobDetail";
053    public static final String GROUP = "KCB-Delivery";
054
055    public static enum Mode {
056        DELIVER, REMOVE
057    }
058
059    private static final Logger LOG = Logger.getLogger(MessageProcessingJob.class);
060    
061    private GenericDao dao;
062    private MessageDelivererRegistryService registry;
063    private MessageDeliveryService messageDeliveryService;
064    private Long messageId;
065    private Mode mode = null;
066    private String user;
067    private String cause;
068
069    public MessageProcessingJob(Long messageId, Mode mode, String user, String cause) {
070        this();
071        this.messageId = messageId;
072        this.mode = mode;
073        this.user = user;
074        this.cause = cause;
075    }
076
077
078    public MessageProcessingJob() {
079        dao = GlobalKCBServiceLocator.getInstance().getKcbGenericDao();
080        registry = GlobalKCBServiceLocator.getInstance().getMessageDelivererRegistryService();
081        messageDeliveryService = GlobalKCBServiceLocator.getInstance().getMessageDeliveryService();
082        txManager = GlobalKCBServiceLocator.getInstance().getTransactionManager();
083    }
084
085    /**
086     * Sets the {@link GenericDao}
087     * @param dao the {@link GenericDao}
088     */
089    @Required
090    public void setGenericDao(GenericDao dao) {
091        this.dao = dao;
092    }
093
094    /**
095     * Sets the {@link MessageDelivererRegistryService}
096     * @param registry the {@link MessageDelivererRegistryService}
097     */
098    @Required
099    public void setMessageDelivererRegistry(MessageDelivererRegistryService registry) {
100        this.registry = registry;
101    }
102    
103    /**
104     * Sets the {@link MessageDeliveryService}
105     * @param messageDeliveryService the {@link MessageDeliveryService}
106     */
107    @Required
108    public void setMessageDeliveryService(MessageDeliveryService messageDeliveryService) {
109        this.messageDeliveryService = messageDeliveryService;
110    }
111
112    @Override
113    protected Collection<MessageDelivery> takeAvailableWorkItems() {
114        MessageDeliveryStatus[] statuses;
115        switch (mode) {
116            case DELIVER: {
117                statuses = new MessageDeliveryStatus[] { MessageDeliveryStatus.UNDELIVERED };
118                break;
119            }
120            case REMOVE: {
121                if (messageId == null) {
122                    throw new IllegalStateException("Message id must be specified for message removal mode");
123                }
124                statuses = new MessageDeliveryStatus[] { MessageDeliveryStatus.DELIVERED, MessageDeliveryStatus.UNDELIVERED };
125                break;
126            }
127            default:
128                throw new RuntimeException("Invalid mode: " + mode);
129        }
130        for (MessageDeliveryStatus status: statuses) {
131            LOG.debug("Taking message deliveries with status: " + status);
132        }
133        Collection<MessageDelivery> ds = messageDeliveryService.lockAndTakeMessageDeliveries(messageId, statuses);
134        LOG.debug("Took " + ds.size() + " deliveries");
135        for (MessageDelivery md: ds) {
136            LOG.debug(md);
137            md.setProcessCount(md.getProcessCount().intValue() + 1);
138        }
139        return ds;
140    }
141
142    @Override
143    protected void unlockWorkItem(MessageDelivery item) {
144        item.setLockedDate(null);
145        dao.save(item);
146    }
147
148    /**
149     * Group work items by deliverer and notification, so that deliveries to bulk deliverers are grouped
150     * by notification
151     * @see org.kuali.rice.ken.service.impl.ConcurrentJob#groupWorkItems(java.util.Collection)
152     */
153    @Override
154    protected Collection<Collection<MessageDelivery>> groupWorkItems(Collection<MessageDelivery> workItems, ProcessingResult<MessageDelivery> result) {
155        Collection<Collection<MessageDelivery>> groupedWorkItems = new ArrayList<Collection<MessageDelivery>>(workItems.size());
156
157        Map<String, Collection<MessageDelivery>> bulkWorkUnits = new HashMap<String, Collection<MessageDelivery>>();
158        for (MessageDelivery messageDelivery: workItems) {
159            
160            MessageDeliverer deliverer = registry.getDeliverer(messageDelivery);
161            if (deliverer == null) {
162                LOG.error("Error obtaining message deliverer for message delivery: " + messageDelivery);
163                result.addFailure(new Failure<MessageDelivery>(messageDelivery, "Error obtaining message deliverer for message delivery"));
164                unlockWorkItemAtomically(messageDelivery);
165                continue;
166            }
167
168            if (deliverer instanceof BulkMessageDeliverer) {
169                // group by bulk-deliverer+message combo
170                String key = messageDelivery.getDelivererTypeName() + ":" + messageDelivery.getMessage().getId();
171                Collection<MessageDelivery> workUnit = bulkWorkUnits.get(key);
172                if (workUnit == null) {
173                    workUnit = new LinkedList<MessageDelivery>();
174                    bulkWorkUnits.put(key, workUnit);
175                }
176                workUnit.add(messageDelivery);
177            } else {
178                ArrayList<MessageDelivery> l = new ArrayList<MessageDelivery>(1);
179                l.add(messageDelivery);
180                groupedWorkItems.add(l);
181            }
182        }
183
184        return groupedWorkItems;
185    }
186    
187    
188    @Override
189    protected Collection<MessageDelivery> processWorkItems(Collection<MessageDelivery> messageDeliveries) {
190        MessageDelivery firstMessageDelivery = messageDeliveries.iterator().next();
191        // get our hands on the appropriate MessageDeliverer instance
192        MessageDeliverer messageDeliverer = registry.getDeliverer(firstMessageDelivery);
193        if (messageDeliverer == null) {
194            throw new RuntimeException("Message deliverer could not be obtained");
195        }
196    
197        if (messageDeliveries.size() > 1) {
198            // this is a bulk deliverer, so we need to batch the MessageDeliveries
199            if (!(messageDeliverer instanceof BulkMessageDeliverer)) {
200                throw new RuntimeException("Discrepency in dispatch service: deliverer for list of message deliveries is not a BulkMessageDeliverer");
201            }
202            return bulkProcess((BulkMessageDeliverer) messageDeliverer, messageDeliveries, mode);
203        } else {
204            return process(messageDeliverer, firstMessageDelivery, mode);
205        }
206    }
207
208    /**
209     * Implements delivery of a single MessageDelivery
210     * @param deliverer the deliverer
211     * @param messageDelivery the delivery
212     * @return collection of strings indicating successful deliveries
213     */
214    protected Collection<MessageDelivery> process(MessageDeliverer messageDeliverer, MessageDelivery messageDelivery, Mode mode) {
215        // we have our message deliverer, so tell it to deliver the message
216        try {
217            if (mode == Mode.DELIVER) {
218                messageDeliverer.deliver(messageDelivery);
219                // if processing was successful, set the count back to zero
220                messageDelivery.setProcessCount(Integer.valueOf(0));
221                // by definition we have succeeded at this point if no exception was thrown by the messageDeliverer
222                // so update the status of the delivery message instance to DELIVERED (and unmark as taken)
223                // and persist
224                updateStatusAndUnlock(messageDelivery, mode == Mode.DELIVER ? MessageDeliveryStatus.DELIVERED : MessageDeliveryStatus.REMOVED);
225            } else {
226                messageDeliverer.dismiss(messageDelivery, user, cause);
227                // don't need to set the processing count down to zero because we are just deleting the record entirely
228                messageDeliveryService.deleteMessageDelivery(messageDelivery);
229            }
230        } catch (MessageDeliveryProcessingException nmde) {
231            LOG.error("Error processing message delivery " + messageDelivery, nmde);
232            throw new RuntimeException(nmde);
233        }
234
235        LOG.debug("Message delivery '" + messageDelivery.getId() + "' for message '" + messageDelivery.getMessage().getId() + "' was successfully processed.");
236        //PerformanceLog.logDuration("Time to dispatch notification delivery for notification " + messageDelivery.getMessage().getId(), System.currentTimeMillis() - messageDelivery.getNotification().getSendDateTime().getTime());
237
238        List<MessageDelivery> success = new ArrayList<MessageDelivery>(1);
239        success.add(messageDelivery);
240        return success;
241    }
242
243    /**
244     * Implements bulk delivery of a collection of {@link MessageDelivery}s
245     * @param deliverer the deliverer
246     * @param messageDeliveries the deliveries
247     * @return collection of strings indicating successful deliveries
248     */
249    protected Collection<MessageDelivery> bulkProcess(BulkMessageDeliverer messageDeliverer, Collection<MessageDelivery> messageDeliveries,  Mode mode) {
250        MessageDeliveryStatus targetStatus = (mode == Mode.DELIVER ? MessageDeliveryStatus.DELIVERED : MessageDeliveryStatus.REMOVED);
251        // we have our message deliverer, so tell it to deliver the message
252        try {
253            if (mode == Mode.DELIVER) {
254                messageDeliverer.bulkDeliver(messageDeliveries);
255            } else {
256                messageDeliverer.bulkDismiss(messageDeliveries);
257            }
258        } catch (MessageDeliveryProcessingException nmde) {
259            LOG.error("Error bulk-delivering messages " + messageDeliveries, nmde);
260            throw new RuntimeException(nmde);
261        }
262
263        // by definition we have succeeded at this point if no exception was thrown by the messageDeliverer
264        // so update the status of the delivery message instance to DELIVERED (and unmark as taken)
265        // and persist
266        List<MessageDelivery> successes = new ArrayList<MessageDelivery>(messageDeliveries.size());
267        for (MessageDelivery nmd: messageDeliveries) {
268            successes.add(nmd);
269            LOG.debug("Message delivery '" + nmd.getId() + "' for notification '" + nmd.getMessage().getId() + "' was successfully delivered.");
270            //PerformanceLog.logDuration("Time to dispatch notification delivery for notification " + nmd.getMessage().getId(), System.currentTimeMillis() - nmd.getNotification().getSendDateTime().getTime());
271            if (mode == Mode.REMOVE) {
272                messageDeliveryService.deleteMessageDelivery(nmd);
273            } else {
274                nmd.setProcessCount(0);
275                updateStatusAndUnlock(nmd, targetStatus);                
276            }
277        }
278        
279        return successes;
280    }
281
282    @Override
283    protected void finishProcessing(ProcessingResult<MessageDelivery> result) {
284        LOG.debug("Message processing job: " + result.getSuccesses().size() + " processed, " + result.getFailures().size() + " failures");
285        Set<Long> messageIds = new HashSet<Long>(result.getSuccesses().size());
286        for (MessageDelivery md: result.getSuccesses()) {
287            messageIds.add(md.getMessage().getId());
288        }
289        MessageService ms = GlobalKCBServiceLocator.getInstance().getMessageService();
290        for (Long id: messageIds) {
291            LOG.debug("Finishing processing message " + id);
292            //if (Mode.REMOVE == mode) {
293            
294            Message m = ms.getMessage(id);
295            
296            Collection<MessageDelivery> c = messageDeliveryService.getMessageDeliveries(m);
297            if (c.size() == 0) {
298                LOG.debug("Deleting message " + m);
299                ms.deleteMessage(m);
300            } else {
301                LOG.debug("Message " + m.getId() + " has " + c.size() + " deliveries");
302                for (MessageDelivery md: c) {
303                    LOG.debug(md);
304                }
305            }
306        }
307    }
308
309    /**
310     * Marks a MessageDelivery as having been delivered, and unlocks it
311     * @param messageDelivery the messageDelivery instance to mark
312     */
313    protected void updateStatusAndUnlock(MessageDelivery messageDelivery, MessageDeliveryStatus status) {
314        messageDelivery.setDeliveryStatus(status);
315        // mark as unlocked
316        messageDelivery.setLockedDate(null);
317        dao.save(messageDelivery);
318    }
319
320    @Override
321    public ProcessingResult<MessageDelivery> run() {
322        LOG.debug("MessageProcessingJob running in Thread " + Thread.currentThread() + ": " + mode + " " + user + " " + cause);
323        return super.run();
324    }
325
326    public void execute(JobExecutionContext context) throws JobExecutionException {
327        String mode = context.getMergedJobDataMap().getString("mode");
328        if (mode != null) {
329            this.mode = Mode.valueOf(mode);
330        } else {
331            this.mode = Mode.DELIVER;
332        }
333        this.user = context.getMergedJobDataMap().getString("user");
334        this.cause = context.getMergedJobDataMap().getString("cause");
335        if (context.getMergedJobDataMap().containsKey("messageId")) {
336            this.messageId = context.getMergedJobDataMap().getLong("messageId");
337        }
338        LOG.debug("==== message processing job: " + this.mode + " message id: " + this.messageId + "====");
339        super.run();
340    }
341}