Coverage Report - org.kuali.rice.kcb.quartz.MessageProcessingJob
 
Classes in this File Line Coverage Branch Coverage Complexity
MessageProcessingJob
0%
0/136
0%
0/47
3.2
MessageProcessingJob$1
0%
0/1
N/A
3.2
MessageProcessingJob$Mode
0%
0/2
N/A
3.2
 
 1  
 /*
 2  
  * Copyright 2006-2011 The Kuali Foundation
 3  
  *
 4  
  * Licensed under the Educational Community License, Version 2.0 (the "License");
 5  
  * you may not use this file except in compliance with the License.
 6  
  * You may obtain a copy of the License at
 7  
  *
 8  
  * http://www.opensource.org/licenses/ecl2.php
 9  
  *
 10  
  * Unless required by applicable law or agreed to in writing, software
 11  
  * distributed under the License is distributed on an "AS IS" BASIS,
 12  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  
  * See the License for the specific language governing permissions and
 14  
  * limitations under the License.
 15  
  */
 16  
 
 17  
 package org.kuali.rice.kcb.quartz;
 18  
 
 19  
 import org.apache.log4j.Logger;
 20  
 import org.kuali.rice.core.framework.persistence.dao.GenericDao;
 21  
 import org.kuali.rice.kcb.bo.Message;
 22  
 import org.kuali.rice.kcb.bo.MessageDelivery;
 23  
 import org.kuali.rice.kcb.bo.MessageDeliveryStatus;
 24  
 import org.kuali.rice.kcb.deliverer.BulkMessageDeliverer;
 25  
 import org.kuali.rice.kcb.deliverer.MessageDeliverer;
 26  
 import org.kuali.rice.kcb.exception.MessageDeliveryProcessingException;
 27  
 import org.kuali.rice.kcb.quartz.ProcessingResult.Failure;
 28  
 import org.kuali.rice.kcb.service.GlobalKCBServiceLocator;
 29  
 import org.kuali.rice.kcb.service.MessageDelivererRegistryService;
 30  
 import org.kuali.rice.kcb.service.MessageDeliveryService;
 31  
 import org.kuali.rice.kcb.service.MessageService;
 32  
 import org.quartz.JobExecutionContext;
 33  
 import org.quartz.JobExecutionException;
 34  
 import org.quartz.StatefulJob;
 35  
 import org.springframework.beans.factory.annotation.Required;
 36  
 
 37  
 import java.util.ArrayList;
 38  
 import java.util.Collection;
 39  
 import java.util.HashMap;
 40  
 import java.util.HashSet;
 41  
 import java.util.LinkedList;
 42  
 import java.util.List;
 43  
 import java.util.Map;
 44  
 import java.util.Set;
 45  
 
 46  
 /**
 47  
  * Job that delivers messages to endpoints.  This job is not really stateful,
 48  
  * but should not be executed concurrently.
 49  
  * 
 50  
  * @author Kuali Rice Team (rice.collab@kuali.org)
 51  
  */
 52  0
 public class MessageProcessingJob extends ConcurrentJob<MessageDelivery> implements StatefulJob {
 53  
     public static final String NAME = "MessageProcessingJobDetail";
 54  
     public static final String GROUP = "KCB-Delivery";
 55  
 
 56  0
     public static enum Mode {
 57  0
         DELIVER, REMOVE
 58  
     }
 59  
 
 60  0
     private static final Logger LOG = Logger.getLogger(MessageProcessingJob.class);
 61  
     
 62  
     private GenericDao dao;
 63  
     private MessageDelivererRegistryService registry;
 64  
     private MessageDeliveryService messageDeliveryService;
 65  
     private Long messageId;
 66  0
     private Mode mode = null;
 67  
     private String user;
 68  
     private String cause;
 69  
 
 70  
     public MessageProcessingJob(Long messageId, Mode mode, String user, String cause) {
 71  0
         this();
 72  0
         this.messageId = messageId;
 73  0
         this.mode = mode;
 74  0
         this.user = user;
 75  0
         this.cause = cause;
 76  0
     }
 77  
 
 78  
 
 79  0
     public MessageProcessingJob() {
 80  0
         dao = GlobalKCBServiceLocator.getInstance().getKcbGenericDao();
 81  0
         registry = GlobalKCBServiceLocator.getInstance().getMessageDelivererRegistryService();
 82  0
         messageDeliveryService = GlobalKCBServiceLocator.getInstance().getMessageDeliveryService();
 83  0
         txManager = GlobalKCBServiceLocator.getInstance().getTransactionManager();
 84  0
     }
 85  
 
 86  
     /**
 87  
      * Sets the {@link GenericDao}
 88  
      * @param dao the {@link GenericDao}
 89  
      */
 90  
     @Required
 91  
     public void setGenericDao(GenericDao dao) {
 92  0
         this.dao = dao;
 93  0
     }
 94  
 
 95  
     /**
 96  
      * Sets the {@link MessageDelivererRegistryService}
 97  
      * @param registry the {@link MessageDelivererRegistryService}
 98  
      */
 99  
     @Required
 100  
     public void setMessageDelivererRegistry(MessageDelivererRegistryService registry) {
 101  0
         this.registry = registry;
 102  0
     }
 103  
     
 104  
     /**
 105  
      * Sets the {@link MessageDeliveryService}
 106  
      * @param messageDeliveryService the {@link MessageDeliveryService}
 107  
      */
 108  
     @Required
 109  
     public void setMessageDeliveryService(MessageDeliveryService messageDeliveryService) {
 110  0
         this.messageDeliveryService = messageDeliveryService;
 111  0
     }
 112  
 
 113  
     @Override
 114  
     protected Collection<MessageDelivery> takeAvailableWorkItems() {
 115  
         MessageDeliveryStatus[] statuses;
 116  0
         switch (mode) {
 117  
             case DELIVER: {
 118  0
                 statuses = new MessageDeliveryStatus[] { MessageDeliveryStatus.UNDELIVERED };
 119  0
                 break;
 120  
             }
 121  
             case REMOVE: {
 122  0
                 if (messageId == null) {
 123  0
                     throw new IllegalStateException("Message id must be specified for message removal mode");
 124  
                 }
 125  0
                 statuses = new MessageDeliveryStatus[] { MessageDeliveryStatus.DELIVERED, MessageDeliveryStatus.UNDELIVERED };
 126  0
                 break;
 127  
             }
 128  
             default:
 129  0
                 throw new RuntimeException("Invalid mode: " + mode);
 130  
         }
 131  0
         for (MessageDeliveryStatus status: statuses) {
 132  0
             LOG.debug("Taking message deliveries with status: " + status);
 133  
         }
 134  0
         Collection<MessageDelivery> ds = messageDeliveryService.lockAndTakeMessageDeliveries(messageId, statuses);
 135  0
         LOG.debug("Took " + ds.size() + " deliveries");
 136  0
         for (MessageDelivery md: ds) {
 137  0
             LOG.debug(md);
 138  0
             md.setProcessCount(md.getProcessCount().intValue() + 1);
 139  
         }
 140  0
         return ds;
 141  
     }
 142  
 
 143  
     @Override
 144  
     protected void unlockWorkItem(MessageDelivery item) {
 145  0
         item.setLockedDate(null);
 146  0
         dao.save(item);
 147  0
     }
 148  
 
 149  
     /**
 150  
      * Group work items by deliverer and notification, so that deliveries to bulk deliverers are grouped
 151  
      * by notification
 152  
      * @see org.kuali.rice.ken.service.impl.ConcurrentJob#groupWorkItems(java.util.Collection)
 153  
      */
 154  
     @Override
 155  
     protected Collection<Collection<MessageDelivery>> groupWorkItems(Collection<MessageDelivery> workItems, ProcessingResult<MessageDelivery> result) {
 156  0
         Collection<Collection<MessageDelivery>> groupedWorkItems = new ArrayList<Collection<MessageDelivery>>(workItems.size());
 157  
 
 158  0
         Map<String, Collection<MessageDelivery>> bulkWorkUnits = new HashMap<String, Collection<MessageDelivery>>();
 159  0
         for (MessageDelivery messageDelivery: workItems) {
 160  
             
 161  0
             MessageDeliverer deliverer = registry.getDeliverer(messageDelivery);
 162  0
             if (deliverer == null) {
 163  0
                 LOG.error("Error obtaining message deliverer for message delivery: " + messageDelivery);
 164  0
                 result.addFailure(new Failure<MessageDelivery>(messageDelivery, "Error obtaining message deliverer for message delivery"));
 165  0
                 unlockWorkItemAtomically(messageDelivery);
 166  0
                 continue;
 167  
             }
 168  
 
 169  0
             if (deliverer instanceof BulkMessageDeliverer) {
 170  
                 // group by bulk-deliverer+message combo
 171  0
                 String key = messageDelivery.getDelivererTypeName() + ":" + messageDelivery.getMessage().getId();
 172  0
                 Collection<MessageDelivery> workUnit = bulkWorkUnits.get(key);
 173  0
                 if (workUnit == null) {
 174  0
                     workUnit = new LinkedList<MessageDelivery>();
 175  0
                     bulkWorkUnits.put(key, workUnit);
 176  
                 }
 177  0
                 workUnit.add(messageDelivery);
 178  0
             } else {
 179  0
                 ArrayList<MessageDelivery> l = new ArrayList<MessageDelivery>(1);
 180  0
                 l.add(messageDelivery);
 181  0
                 groupedWorkItems.add(l);
 182  
             }
 183  0
         }
 184  
 
 185  0
         return groupedWorkItems;
 186  
     }
 187  
     
 188  
     
 189  
     @Override
 190  
     protected Collection<MessageDelivery> processWorkItems(Collection<MessageDelivery> messageDeliveries) {
 191  0
         MessageDelivery firstMessageDelivery = messageDeliveries.iterator().next();
 192  
         // get our hands on the appropriate MessageDeliverer instance
 193  0
         MessageDeliverer messageDeliverer = registry.getDeliverer(firstMessageDelivery);
 194  0
         if (messageDeliverer == null) {
 195  0
             throw new RuntimeException("Message deliverer could not be obtained");
 196  
         }
 197  
     
 198  0
         if (messageDeliveries.size() > 1) {
 199  
             // this is a bulk deliverer, so we need to batch the MessageDeliveries
 200  0
             if (!(messageDeliverer instanceof BulkMessageDeliverer)) {
 201  0
                 throw new RuntimeException("Discrepency in dispatch service: deliverer for list of message deliveries is not a BulkMessageDeliverer");
 202  
             }
 203  0
             return bulkProcess((BulkMessageDeliverer) messageDeliverer, messageDeliveries, mode);
 204  
         } else {
 205  0
             return process(messageDeliverer, firstMessageDelivery, mode);
 206  
         }
 207  
     }
 208  
 
 209  
     /**
 210  
      * Implements delivery of a single MessageDelivery
 211  
      * @param deliverer the deliverer
 212  
      * @param messageDelivery the delivery
 213  
      * @return collection of strings indicating successful deliveries
 214  
      */
 215  
     protected Collection<MessageDelivery> process(MessageDeliverer messageDeliverer, MessageDelivery messageDelivery, Mode mode) {
 216  
         // we have our message deliverer, so tell it to deliver the message
 217  
         try {
 218  0
             if (mode == Mode.DELIVER) {
 219  0
                 messageDeliverer.deliver(messageDelivery);
 220  
                 // if processing was successful, set the count back to zero
 221  0
                 messageDelivery.setProcessCount(Integer.valueOf(0));
 222  
                 // by definition we have succeeded at this point if no exception was thrown by the messageDeliverer
 223  
                 // so update the status of the delivery message instance to DELIVERED (and unmark as taken)
 224  
                 // and persist
 225  0
                 updateStatusAndUnlock(messageDelivery, mode == Mode.DELIVER ? MessageDeliveryStatus.DELIVERED : MessageDeliveryStatus.REMOVED);
 226  
             } else {
 227  0
                 messageDeliverer.dismiss(messageDelivery, user, cause);
 228  
                 // don't need to set the processing count down to zero because we are just deleting the record entirely
 229  0
                 messageDeliveryService.deleteMessageDelivery(messageDelivery);
 230  
             }
 231  0
         } catch (MessageDeliveryProcessingException nmde) {
 232  0
             LOG.error("Error processing message delivery " + messageDelivery, nmde);
 233  0
             throw new RuntimeException(nmde);
 234  0
         }
 235  
 
 236  0
         LOG.debug("Message delivery '" + messageDelivery.getId() + "' for message '" + messageDelivery.getMessage().getId() + "' was successfully processed.");
 237  
         //PerformanceLog.logDuration("Time to dispatch notification delivery for notification " + messageDelivery.getMessage().getId(), System.currentTimeMillis() - messageDelivery.getNotification().getSendDateTime().getTime());
 238  
 
 239  0
         List<MessageDelivery> success = new ArrayList<MessageDelivery>(1);
 240  0
         success.add(messageDelivery);
 241  0
         return success;
 242  
     }
 243  
 
 244  
     /**
 245  
      * Implements bulk delivery of a collection of {@link MessageDelivery}s
 246  
      * @param deliverer the deliverer
 247  
      * @param messageDeliveries the deliveries
 248  
      * @return collection of strings indicating successful deliveries
 249  
      */
 250  
     protected Collection<MessageDelivery> bulkProcess(BulkMessageDeliverer messageDeliverer, Collection<MessageDelivery> messageDeliveries,  Mode mode) {
 251  0
         MessageDeliveryStatus targetStatus = (mode == Mode.DELIVER ? MessageDeliveryStatus.DELIVERED : MessageDeliveryStatus.REMOVED);
 252  
         // we have our message deliverer, so tell it to deliver the message
 253  
         try {
 254  0
             if (mode == Mode.DELIVER) {
 255  0
                 messageDeliverer.bulkDeliver(messageDeliveries);
 256  
             } else {
 257  0
                 messageDeliverer.bulkDismiss(messageDeliveries);
 258  
             }
 259  0
         } catch (MessageDeliveryProcessingException nmde) {
 260  0
             LOG.error("Error bulk-delivering messages " + messageDeliveries, nmde);
 261  0
             throw new RuntimeException(nmde);
 262  0
         }
 263  
 
 264  
         // by definition we have succeeded at this point if no exception was thrown by the messageDeliverer
 265  
         // so update the status of the delivery message instance to DELIVERED (and unmark as taken)
 266  
         // and persist
 267  0
         List<MessageDelivery> successes = new ArrayList<MessageDelivery>(messageDeliveries.size());
 268  0
         for (MessageDelivery nmd: messageDeliveries) {
 269  0
             successes.add(nmd);
 270  0
             LOG.debug("Message delivery '" + nmd.getId() + "' for notification '" + nmd.getMessage().getId() + "' was successfully delivered.");
 271  
             //PerformanceLog.logDuration("Time to dispatch notification delivery for notification " + nmd.getMessage().getId(), System.currentTimeMillis() - nmd.getNotification().getSendDateTime().getTime());
 272  0
             if (mode == Mode.REMOVE) {
 273  0
                 messageDeliveryService.deleteMessageDelivery(nmd);
 274  
             } else {
 275  0
                 nmd.setProcessCount(0);
 276  0
                 updateStatusAndUnlock(nmd, targetStatus);                
 277  
             }
 278  
         }
 279  
         
 280  0
         return successes;
 281  
     }
 282  
 
 283  
     @Override
 284  
     protected void finishProcessing(ProcessingResult<MessageDelivery> result) {
 285  0
         LOG.debug("Message processing job: " + result.getSuccesses().size() + " processed, " + result.getFailures().size() + " failures");
 286  0
         Set<Long> messageIds = new HashSet<Long>(result.getSuccesses().size());
 287  0
         for (MessageDelivery md: result.getSuccesses()) {
 288  0
             messageIds.add(md.getMessage().getId());
 289  
         }
 290  0
         MessageService ms = GlobalKCBServiceLocator.getInstance().getMessageService();
 291  0
         for (Long id: messageIds) {
 292  0
             LOG.debug("Finishing processing message " + id);
 293  
             //if (Mode.REMOVE == mode) {
 294  
             
 295  0
             Message m = ms.getMessage(id);
 296  
             
 297  0
             Collection<MessageDelivery> c = messageDeliveryService.getMessageDeliveries(m);
 298  0
             if (c.size() == 0) {
 299  0
                 LOG.debug("Deleting message " + m);
 300  0
                 ms.deleteMessage(m);
 301  
             } else {
 302  0
                 LOG.debug("Message " + m.getId() + " has " + c.size() + " deliveries");
 303  0
                 for (MessageDelivery md: c) {
 304  0
                     LOG.debug(md);
 305  
                 }
 306  
             }
 307  0
         }
 308  0
     }
 309  
 
 310  
     /**
 311  
      * Marks a MessageDelivery as having been delivered, and unlocks it
 312  
      * @param messageDelivery the messageDelivery instance to mark
 313  
      */
 314  
     protected void updateStatusAndUnlock(MessageDelivery messageDelivery, MessageDeliveryStatus status) {
 315  0
         messageDelivery.setDeliveryStatus(status);
 316  
         // mark as unlocked
 317  0
         messageDelivery.setLockedDate(null);
 318  0
         dao.save(messageDelivery);
 319  0
     }
 320  
 
 321  
     @Override
 322  
     public ProcessingResult<MessageDelivery> run() {
 323  0
         LOG.debug("MessageProcessingJob running in Thread " + Thread.currentThread() + ": " + mode + " " + user + " " + cause);
 324  0
         return super.run();
 325  
     }
 326  
 
 327  
     public void execute(JobExecutionContext context) throws JobExecutionException {
 328  0
         String mode = context.getMergedJobDataMap().getString("mode");
 329  0
         if (mode != null) {
 330  0
             this.mode = Mode.valueOf(mode);
 331  
         } else {
 332  0
             this.mode = Mode.DELIVER;
 333  
         }
 334  0
         this.user = context.getMergedJobDataMap().getString("user");
 335  0
         this.cause = context.getMergedJobDataMap().getString("cause");
 336  0
         if (context.getMergedJobDataMap().containsKey("messageId")) {
 337  0
             this.messageId = context.getMergedJobDataMap().getLong("messageId");
 338  
         }
 339  0
         LOG.debug("==== message processing job: " + this.mode + " message id: " + this.messageId + "====");
 340  0
         super.run();
 341  0
     }
 342  
 }