Coverage Report - org.kuali.rice.kcb.quartz.MessageProcessingJob
 
Classes in this File Line Coverage Branch Coverage Complexity
MessageProcessingJob
0%
0/136
0%
0/47
3.2
MessageProcessingJob$1
0%
0/1
N/A
3.2
MessageProcessingJob$Mode
0%
0/2
N/A
3.2
 
 1  
 /**
 2  
  * Copyright 2005-2011 The Kuali Foundation
 3  
  *
 4  
  * Licensed under the Educational Community License, Version 2.0 (the "License");
 5  
  * you may not use this file except in compliance with the License.
 6  
  * You may obtain a copy of the License at
 7  
  *
 8  
  * http://www.opensource.org/licenses/ecl2.php
 9  
  *
 10  
  * Unless required by applicable law or agreed to in writing, software
 11  
  * distributed under the License is distributed on an "AS IS" BASIS,
 12  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  
  * See the License for the specific language governing permissions and
 14  
  * limitations under the License.
 15  
  */
 16  
 package org.kuali.rice.kcb.quartz;
 17  
 
 18  
 import org.apache.log4j.Logger;
 19  
 import org.kuali.rice.core.framework.persistence.dao.GenericDao;
 20  
 import org.kuali.rice.kcb.bo.Message;
 21  
 import org.kuali.rice.kcb.bo.MessageDelivery;
 22  
 import org.kuali.rice.kcb.bo.MessageDeliveryStatus;
 23  
 import org.kuali.rice.kcb.deliverer.BulkMessageDeliverer;
 24  
 import org.kuali.rice.kcb.deliverer.MessageDeliverer;
 25  
 import org.kuali.rice.kcb.api.exception.MessageDeliveryProcessingException;
 26  
 import org.kuali.rice.kcb.quartz.ProcessingResult.Failure;
 27  
 import org.kuali.rice.kcb.service.GlobalKCBServiceLocator;
 28  
 import org.kuali.rice.kcb.service.MessageDelivererRegistryService;
 29  
 import org.kuali.rice.kcb.service.MessageDeliveryService;
 30  
 import org.kuali.rice.kcb.service.MessageService;
 31  
 import org.quartz.JobExecutionContext;
 32  
 import org.quartz.JobExecutionException;
 33  
 import org.quartz.StatefulJob;
 34  
 import org.springframework.beans.factory.annotation.Required;
 35  
 
 36  
 import java.util.ArrayList;
 37  
 import java.util.Collection;
 38  
 import java.util.HashMap;
 39  
 import java.util.HashSet;
 40  
 import java.util.LinkedList;
 41  
 import java.util.List;
 42  
 import java.util.Map;
 43  
 import java.util.Set;
 44  
 
 45  
 /**
 46  
  * Job that delivers messages to endpoints.  This job is not really stateful,
 47  
  * but should not be executed concurrently.
 48  
  * 
 49  
  * @author Kuali Rice Team (rice.collab@kuali.org)
 50  
  */
 51  0
 public class MessageProcessingJob extends ConcurrentJob<MessageDelivery> implements StatefulJob {
 52  
     public static final String NAME = "MessageProcessingJobDetail";
 53  
     public static final String GROUP = "KCB-Delivery";
 54  
 
 55  0
     public static enum Mode {
 56  0
         DELIVER, REMOVE
 57  
     }
 58  
 
 59  0
     private static final Logger LOG = Logger.getLogger(MessageProcessingJob.class);
 60  
     
 61  
     private GenericDao dao;
 62  
     private MessageDelivererRegistryService registry;
 63  
     private MessageDeliveryService messageDeliveryService;
 64  
     private Long messageId;
 65  0
     private Mode mode = null;
 66  
     private String user;
 67  
     private String cause;
 68  
 
 69  
     public MessageProcessingJob(Long messageId, Mode mode, String user, String cause) {
 70  0
         this();
 71  0
         this.messageId = messageId;
 72  0
         this.mode = mode;
 73  0
         this.user = user;
 74  0
         this.cause = cause;
 75  0
     }
 76  
 
 77  
 
 78  0
     public MessageProcessingJob() {
 79  0
         dao = GlobalKCBServiceLocator.getInstance().getKcbGenericDao();
 80  0
         registry = GlobalKCBServiceLocator.getInstance().getMessageDelivererRegistryService();
 81  0
         messageDeliveryService = GlobalKCBServiceLocator.getInstance().getMessageDeliveryService();
 82  0
         txManager = GlobalKCBServiceLocator.getInstance().getTransactionManager();
 83  0
     }
 84  
 
 85  
     /**
 86  
      * Sets the {@link GenericDao}
 87  
      * @param dao the {@link GenericDao}
 88  
      */
 89  
     @Required
 90  
     public void setGenericDao(GenericDao dao) {
 91  0
         this.dao = dao;
 92  0
     }
 93  
 
 94  
     /**
 95  
      * Sets the {@link MessageDelivererRegistryService}
 96  
      * @param registry the {@link MessageDelivererRegistryService}
 97  
      */
 98  
     @Required
 99  
     public void setMessageDelivererRegistry(MessageDelivererRegistryService registry) {
 100  0
         this.registry = registry;
 101  0
     }
 102  
     
 103  
     /**
 104  
      * Sets the {@link MessageDeliveryService}
 105  
      * @param messageDeliveryService the {@link MessageDeliveryService}
 106  
      */
 107  
     @Required
 108  
     public void setMessageDeliveryService(MessageDeliveryService messageDeliveryService) {
 109  0
         this.messageDeliveryService = messageDeliveryService;
 110  0
     }
 111  
 
 112  
     @Override
 113  
     protected Collection<MessageDelivery> takeAvailableWorkItems() {
 114  
         MessageDeliveryStatus[] statuses;
 115  0
         switch (mode) {
 116  
             case DELIVER: {
 117  0
                 statuses = new MessageDeliveryStatus[] { MessageDeliveryStatus.UNDELIVERED };
 118  0
                 break;
 119  
             }
 120  
             case REMOVE: {
 121  0
                 if (messageId == null) {
 122  0
                     throw new IllegalStateException("Message id must be specified for message removal mode");
 123  
                 }
 124  0
                 statuses = new MessageDeliveryStatus[] { MessageDeliveryStatus.DELIVERED, MessageDeliveryStatus.UNDELIVERED };
 125  0
                 break;
 126  
             }
 127  
             default:
 128  0
                 throw new RuntimeException("Invalid mode: " + mode);
 129  
         }
 130  0
         for (MessageDeliveryStatus status: statuses) {
 131  0
             LOG.debug("Taking message deliveries with status: " + status);
 132  
         }
 133  0
         Collection<MessageDelivery> ds = messageDeliveryService.lockAndTakeMessageDeliveries(messageId, statuses);
 134  0
         LOG.debug("Took " + ds.size() + " deliveries");
 135  0
         for (MessageDelivery md: ds) {
 136  0
             LOG.debug(md);
 137  0
             md.setProcessCount(md.getProcessCount().intValue() + 1);
 138  
         }
 139  0
         return ds;
 140  
     }
 141  
 
 142  
     @Override
 143  
     protected void unlockWorkItem(MessageDelivery item) {
 144  0
         item.setLockedDate(null);
 145  0
         dao.save(item);
 146  0
     }
 147  
 
 148  
     /**
 149  
      * Group work items by deliverer and notification, so that deliveries to bulk deliverers are grouped
 150  
      * by notification
 151  
      * @see org.kuali.rice.ken.service.impl.ConcurrentJob#groupWorkItems(java.util.Collection)
 152  
      */
 153  
     @Override
 154  
     protected Collection<Collection<MessageDelivery>> groupWorkItems(Collection<MessageDelivery> workItems, ProcessingResult<MessageDelivery> result) {
 155  0
         Collection<Collection<MessageDelivery>> groupedWorkItems = new ArrayList<Collection<MessageDelivery>>(workItems.size());
 156  
 
 157  0
         Map<String, Collection<MessageDelivery>> bulkWorkUnits = new HashMap<String, Collection<MessageDelivery>>();
 158  0
         for (MessageDelivery messageDelivery: workItems) {
 159  
             
 160  0
             MessageDeliverer deliverer = registry.getDeliverer(messageDelivery);
 161  0
             if (deliverer == null) {
 162  0
                 LOG.error("Error obtaining message deliverer for message delivery: " + messageDelivery);
 163  0
                 result.addFailure(new Failure<MessageDelivery>(messageDelivery, "Error obtaining message deliverer for message delivery"));
 164  0
                 unlockWorkItemAtomically(messageDelivery);
 165  0
                 continue;
 166  
             }
 167  
 
 168  0
             if (deliverer instanceof BulkMessageDeliverer) {
 169  
                 // group by bulk-deliverer+message combo
 170  0
                 String key = messageDelivery.getDelivererTypeName() + ":" + messageDelivery.getMessage().getId();
 171  0
                 Collection<MessageDelivery> workUnit = bulkWorkUnits.get(key);
 172  0
                 if (workUnit == null) {
 173  0
                     workUnit = new LinkedList<MessageDelivery>();
 174  0
                     bulkWorkUnits.put(key, workUnit);
 175  
                 }
 176  0
                 workUnit.add(messageDelivery);
 177  0
             } else {
 178  0
                 ArrayList<MessageDelivery> l = new ArrayList<MessageDelivery>(1);
 179  0
                 l.add(messageDelivery);
 180  0
                 groupedWorkItems.add(l);
 181  
             }
 182  0
         }
 183  
 
 184  0
         return groupedWorkItems;
 185  
     }
 186  
     
 187  
     
 188  
     @Override
 189  
     protected Collection<MessageDelivery> processWorkItems(Collection<MessageDelivery> messageDeliveries) {
 190  0
         MessageDelivery firstMessageDelivery = messageDeliveries.iterator().next();
 191  
         // get our hands on the appropriate MessageDeliverer instance
 192  0
         MessageDeliverer messageDeliverer = registry.getDeliverer(firstMessageDelivery);
 193  0
         if (messageDeliverer == null) {
 194  0
             throw new RuntimeException("Message deliverer could not be obtained");
 195  
         }
 196  
     
 197  0
         if (messageDeliveries.size() > 1) {
 198  
             // this is a bulk deliverer, so we need to batch the MessageDeliveries
 199  0
             if (!(messageDeliverer instanceof BulkMessageDeliverer)) {
 200  0
                 throw new RuntimeException("Discrepency in dispatch service: deliverer for list of message deliveries is not a BulkMessageDeliverer");
 201  
             }
 202  0
             return bulkProcess((BulkMessageDeliverer) messageDeliverer, messageDeliveries, mode);
 203  
         } else {
 204  0
             return process(messageDeliverer, firstMessageDelivery, mode);
 205  
         }
 206  
     }
 207  
 
 208  
     /**
 209  
      * Implements delivery of a single MessageDelivery
 210  
      * @param deliverer the deliverer
 211  
      * @param messageDelivery the delivery
 212  
      * @return collection of strings indicating successful deliveries
 213  
      */
 214  
     protected Collection<MessageDelivery> process(MessageDeliverer messageDeliverer, MessageDelivery messageDelivery, Mode mode) {
 215  
         // we have our message deliverer, so tell it to deliver the message
 216  
         try {
 217  0
             if (mode == Mode.DELIVER) {
 218  0
                 messageDeliverer.deliver(messageDelivery);
 219  
                 // if processing was successful, set the count back to zero
 220  0
                 messageDelivery.setProcessCount(Integer.valueOf(0));
 221  
                 // by definition we have succeeded at this point if no exception was thrown by the messageDeliverer
 222  
                 // so update the status of the delivery message instance to DELIVERED (and unmark as taken)
 223  
                 // and persist
 224  0
                 updateStatusAndUnlock(messageDelivery, mode == Mode.DELIVER ? MessageDeliveryStatus.DELIVERED : MessageDeliveryStatus.REMOVED);
 225  
             } else {
 226  0
                 messageDeliverer.dismiss(messageDelivery, user, cause);
 227  
                 // don't need to set the processing count down to zero because we are just deleting the record entirely
 228  0
                 messageDeliveryService.deleteMessageDelivery(messageDelivery);
 229  
             }
 230  0
         } catch (MessageDeliveryProcessingException nmde) {
 231  0
             LOG.error("Error processing message delivery " + messageDelivery, nmde);
 232  0
             throw new RuntimeException(nmde);
 233  0
         }
 234  
 
 235  0
         LOG.debug("Message delivery '" + messageDelivery.getId() + "' for message '" + messageDelivery.getMessage().getId() + "' was successfully processed.");
 236  
         //PerformanceLog.logDuration("Time to dispatch notification delivery for notification " + messageDelivery.getMessage().getId(), System.currentTimeMillis() - messageDelivery.getNotification().getSendDateTime().getTime());
 237  
 
 238  0
         List<MessageDelivery> success = new ArrayList<MessageDelivery>(1);
 239  0
         success.add(messageDelivery);
 240  0
         return success;
 241  
     }
 242  
 
 243  
     /**
 244  
      * Implements bulk delivery of a collection of {@link MessageDelivery}s
 245  
      * @param deliverer the deliverer
 246  
      * @param messageDeliveries the deliveries
 247  
      * @return collection of strings indicating successful deliveries
 248  
      */
 249  
     protected Collection<MessageDelivery> bulkProcess(BulkMessageDeliverer messageDeliverer, Collection<MessageDelivery> messageDeliveries,  Mode mode) {
 250  0
         MessageDeliveryStatus targetStatus = (mode == Mode.DELIVER ? MessageDeliveryStatus.DELIVERED : MessageDeliveryStatus.REMOVED);
 251  
         // we have our message deliverer, so tell it to deliver the message
 252  
         try {
 253  0
             if (mode == Mode.DELIVER) {
 254  0
                 messageDeliverer.bulkDeliver(messageDeliveries);
 255  
             } else {
 256  0
                 messageDeliverer.bulkDismiss(messageDeliveries);
 257  
             }
 258  0
         } catch (MessageDeliveryProcessingException nmde) {
 259  0
             LOG.error("Error bulk-delivering messages " + messageDeliveries, nmde);
 260  0
             throw new RuntimeException(nmde);
 261  0
         }
 262  
 
 263  
         // by definition we have succeeded at this point if no exception was thrown by the messageDeliverer
 264  
         // so update the status of the delivery message instance to DELIVERED (and unmark as taken)
 265  
         // and persist
 266  0
         List<MessageDelivery> successes = new ArrayList<MessageDelivery>(messageDeliveries.size());
 267  0
         for (MessageDelivery nmd: messageDeliveries) {
 268  0
             successes.add(nmd);
 269  0
             LOG.debug("Message delivery '" + nmd.getId() + "' for notification '" + nmd.getMessage().getId() + "' was successfully delivered.");
 270  
             //PerformanceLog.logDuration("Time to dispatch notification delivery for notification " + nmd.getMessage().getId(), System.currentTimeMillis() - nmd.getNotification().getSendDateTime().getTime());
 271  0
             if (mode == Mode.REMOVE) {
 272  0
                 messageDeliveryService.deleteMessageDelivery(nmd);
 273  
             } else {
 274  0
                 nmd.setProcessCount(0);
 275  0
                 updateStatusAndUnlock(nmd, targetStatus);                
 276  
             }
 277  
         }
 278  
         
 279  0
         return successes;
 280  
     }
 281  
 
 282  
     @Override
 283  
     protected void finishProcessing(ProcessingResult<MessageDelivery> result) {
 284  0
         LOG.debug("Message processing job: " + result.getSuccesses().size() + " processed, " + result.getFailures().size() + " failures");
 285  0
         Set<Long> messageIds = new HashSet<Long>(result.getSuccesses().size());
 286  0
         for (MessageDelivery md: result.getSuccesses()) {
 287  0
             messageIds.add(md.getMessage().getId());
 288  
         }
 289  0
         MessageService ms = GlobalKCBServiceLocator.getInstance().getMessageService();
 290  0
         for (Long id: messageIds) {
 291  0
             LOG.debug("Finishing processing message " + id);
 292  
             //if (Mode.REMOVE == mode) {
 293  
             
 294  0
             Message m = ms.getMessage(id);
 295  
             
 296  0
             Collection<MessageDelivery> c = messageDeliveryService.getMessageDeliveries(m);
 297  0
             if (c.size() == 0) {
 298  0
                 LOG.debug("Deleting message " + m);
 299  0
                 ms.deleteMessage(m);
 300  
             } else {
 301  0
                 LOG.debug("Message " + m.getId() + " has " + c.size() + " deliveries");
 302  0
                 for (MessageDelivery md: c) {
 303  0
                     LOG.debug(md);
 304  
                 }
 305  
             }
 306  0
         }
 307  0
     }
 308  
 
 309  
     /**
 310  
      * Marks a MessageDelivery as having been delivered, and unlocks it
 311  
      * @param messageDelivery the messageDelivery instance to mark
 312  
      */
 313  
     protected void updateStatusAndUnlock(MessageDelivery messageDelivery, MessageDeliveryStatus status) {
 314  0
         messageDelivery.setDeliveryStatus(status);
 315  
         // mark as unlocked
 316  0
         messageDelivery.setLockedDate(null);
 317  0
         dao.save(messageDelivery);
 318  0
     }
 319  
 
 320  
     @Override
 321  
     public ProcessingResult<MessageDelivery> run() {
 322  0
         LOG.debug("MessageProcessingJob running in Thread " + Thread.currentThread() + ": " + mode + " " + user + " " + cause);
 323  0
         return super.run();
 324  
     }
 325  
 
 326  
     public void execute(JobExecutionContext context) throws JobExecutionException {
 327  0
         String mode = context.getMergedJobDataMap().getString("mode");
 328  0
         if (mode != null) {
 329  0
             this.mode = Mode.valueOf(mode);
 330  
         } else {
 331  0
             this.mode = Mode.DELIVER;
 332  
         }
 333  0
         this.user = context.getMergedJobDataMap().getString("user");
 334  0
         this.cause = context.getMergedJobDataMap().getString("cause");
 335  0
         if (context.getMergedJobDataMap().containsKey("messageId")) {
 336  0
             this.messageId = context.getMergedJobDataMap().getLong("messageId");
 337  
         }
 338  0
         LOG.debug("==== message processing job: " + this.mode + " message id: " + this.messageId + "====");
 339  0
         super.run();
 340  0
     }
 341  
 }