Coverage Report - org.kuali.rice.kcb.service.impl.MessagingServiceImpl
 
Classes in this File Line Coverage Branch Coverage Complexity
MessagingServiceImpl
0%
0/67
0%
0/14
2
MessagingServiceImpl$1
N/A
N/A
2
MessagingServiceImpl$QueueProcessingJobSynchronization
0%
0/36
0%
0/2
2
 
 1  
 /*
 2  
  * Copyright 2007-2008 The Kuali Foundation
 3  
  *
 4  
  * Licensed under the Educational Community License, Version 2.0 (the "License");
 5  
  * you may not use this file except in compliance with the License.
 6  
  * You may obtain a copy of the License at
 7  
  *
 8  
  * http://www.opensource.org/licenses/ecl2.php
 9  
  *
 10  
  * Unless required by applicable law or agreed to in writing, software
 11  
  * distributed under the License is distributed on an "AS IS" BASIS,
 12  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  
  * See the License for the specific language governing permissions and
 14  
  * limitations under the License.
 15  
  */
 16  
 package org.kuali.rice.kcb.service.impl;
 17  
 
 18  
 import java.util.Collection;
 19  
 import java.util.HashSet;
 20  
 import java.util.Set;
 21  
 
 22  
 import org.apache.log4j.Logger;
 23  
 import org.kuali.rice.core.api.exception.RiceRuntimeException;
 24  
 import org.kuali.rice.kcb.bo.Message;
 25  
 import org.kuali.rice.kcb.bo.MessageDelivery;
 26  
 import org.kuali.rice.kcb.bo.RecipientDelivererConfig;
 27  
 import org.kuali.rice.kcb.dto.MessageDTO;
 28  
 import org.kuali.rice.kcb.exception.MessageDeliveryException;
 29  
 import org.kuali.rice.kcb.exception.MessageDismissalException;
 30  
 import org.kuali.rice.kcb.quartz.MessageProcessingJob;
 31  
 import org.kuali.rice.kcb.service.MessageDeliveryService;
 32  
 import org.kuali.rice.kcb.service.MessageService;
 33  
 import org.kuali.rice.kcb.service.MessagingService;
 34  
 import org.kuali.rice.kcb.service.RecipientPreferenceService;
 35  
 import org.kuali.rice.ksb.service.KSBServiceLocator;
 36  
 import org.quartz.JobDataMap;
 37  
 import org.quartz.Scheduler;
 38  
 import org.quartz.SchedulerException;
 39  
 import org.quartz.SimpleTrigger;
 40  
 import org.springframework.beans.factory.annotation.Required;
 41  
 import org.springframework.transaction.support.TransactionSynchronizationAdapter;
 42  
 import org.springframework.transaction.support.TransactionSynchronizationManager;
 43  
 
 44  
 /**
 45  
  * MessagingService implementation 
 46  
  * 
 47  
  * @author Kuali Rice Team (rice.collab@kuali.org)
 48  
  */
 49  0
 public class MessagingServiceImpl implements MessagingService {
 50  0
     private static final Logger LOG = Logger.getLogger(MessagingServiceImpl.class);
 51  
 
 52  
     private MessageService messageService;
 53  
     private MessageDeliveryService messageDeliveryService;
 54  
     private RecipientPreferenceService recipientPrefs;
 55  
     private String jobName;
 56  
     private String jobGroup;
 57  
 
 58  
     /**
 59  
      * Whether to perform the processing  synchronously
 60  
      */
 61  
     private boolean synchronous;
 62  
     
 63  
     /**
 64  
      * Sets the name of the target job to run to process messages
 65  
      * @param jobName the name of the target job to run to process messages
 66  
      */
 67  
     public void setJobName(String jobName) {
 68  0
         this.jobName = jobName;
 69  0
     }
 70  
 
 71  
     /**
 72  
      * Sets the group of the target job to run to process messages
 73  
      * @param jobGroup Sets the group of the target job to run to process messages
 74  
      */
 75  
     public void setJobGroup(String jobGroup) {
 76  0
         this.jobGroup = jobGroup;
 77  0
     }
 78  
 
 79  
     /**
 80  
      * Sets the MessageService
 81  
      * @param messageService the MessageService
 82  
      */
 83  
     @Required
 84  
     public void setMessageService(MessageService messageService) {
 85  0
         this.messageService = messageService;
 86  0
     }
 87  
 
 88  
     /**
 89  
      * Sets the MessageDeliveryService
 90  
      * @param messageDeliveryService the MessageDeliveryService
 91  
      */
 92  
     @Required
 93  
     public void setMessageDeliveryService(MessageDeliveryService messageDeliveryService) {
 94  0
         this.messageDeliveryService = messageDeliveryService;
 95  0
     }
 96  
 
 97  
     /**
 98  
      * Sets whether to perform the processing synchronously
 99  
      * @param sync whether to perform the processing synchronously
 100  
      */
 101  
     public void setSynchronous(boolean sync) {
 102  0
         LOG.debug("Setting synchronous messaging to: " + sync);
 103  0
         this.synchronous = sync;
 104  0
     }
 105  
 
 106  
     /**
 107  
      * Sets the RecipientPreferencesService
 108  
      * @param prefs the RecipientPreferenceService
 109  
      */
 110  
     @Required
 111  
     public void setRecipientPreferenceService(RecipientPreferenceService prefs) {
 112  0
         this.recipientPrefs = prefs;
 113  0
     }
 114  
 
 115  
     /**
 116  
      * @see org.kuali.rice.kcb.service.MessagingService#deliver(org.kuali.rice.kcb.dto.MessageDTO)
 117  
      */
 118  
     public Long deliver(MessageDTO message) throws MessageDeliveryException {
 119  0
         Collection<String> delivererTypes = getDelivererTypesForUserAndChannel(message.getRecipient(), message.getChannel());
 120  0
         LOG.debug("Deliverer types for " + message.getRecipient() + "/" + message.getChannel() + ": " + delivererTypes.size());
 121  
 
 122  0
         if (delivererTypes.size() == 0) {
 123  
             // no deliverers configured? just skipp it
 124  0
             LOG.debug("No deliverers are configured for " + message.getRecipient() + "/" + message.getChannel());
 125  0
             return null;
 126  
         }
 127  
 
 128  0
         final Message m = new Message();
 129  0
         m.setTitle(message.getTitle());
 130  0
         m.setDeliveryType(message.getDeliveryType());
 131  0
         m.setChannel(message.getChannel());
 132  0
         m.setRecipient(message.getRecipient());
 133  0
         m.setContentType(message.getContentType());
 134  0
         m.setUrl(message.getUrl());
 135  0
         m.setContent(message.getContent());
 136  0
         m.setOriginId(message.getOriginId());
 137  
 
 138  0
         LOG.debug("saving message: " +m);
 139  0
         messageService.saveMessage(m);
 140  
 
 141  0
         for (String type: delivererTypes) {
 142  
             
 143  0
             MessageDelivery delivery = new MessageDelivery();
 144  0
             delivery.setDelivererTypeName(type);
 145  0
             delivery.setMessage(m);
 146  
 
 147  
 //            MessageDeliverer deliverer = delivererRegistry.getDeliverer(delivery);
 148  
 //            if (deliverer != null) {
 149  
 //                deliverer.deliverMessage(delivery);
 150  
 //            }
 151  
         
 152  0
             LOG.debug("saving messagedelivery: " +delivery);
 153  0
             messageDeliveryService.saveMessageDelivery(delivery);
 154  0
         }
 155  
 
 156  0
         LOG.debug("queuing job");
 157  0
         queueJob(MessageProcessingJob.Mode.DELIVER, m.getId(), null, null);
 158  
 
 159  0
         LOG.debug("returning");
 160  0
         return m.getId();
 161  
     }
 162  
 
 163  
     /**
 164  
      * @see org.kuali.rice.kcb.service.MessagingService#remove(long, java.lang.String, java.lang.String)
 165  
      */
 166  
     public void remove(long messageId, String user, String cause) throws MessageDismissalException {
 167  0
         Message m = messageService.getMessage(Long.valueOf(messageId));
 168  0
         if (m == null) {
 169  0
             throw new MessageDismissalException("No such message: " + messageId);
 170  
         }
 171  
 
 172  0
         remove (m, user, cause);
 173  0
     }
 174  
 
 175  
     /**
 176  
      * @see org.kuali.rice.kcb.service.MessagingService#removeByOriginId(java.lang.String, java.lang.String, java.lang.String)
 177  
      */
 178  
     public Long removeByOriginId(String originId, String user, String cause) throws MessageDismissalException {
 179  0
         Message m = messageService.getMessageByOriginId(originId);
 180  0
         if (m == null) {
 181  0
             return null; 
 182  
             //throw new MessageDismissalException("No such message with origin id: " + originId);
 183  
         }
 184  0
         remove(m, user, cause);
 185  0
         return m.getId();
 186  
     }
 187  
 
 188  
     private void remove(Message message, String user, String cause) {
 189  0
         queueJob(MessageProcessingJob.Mode.REMOVE, message.getId(), user, cause);
 190  0
     }
 191  
 
 192  
     /**
 193  
      * Determines what delivery endpoints the user has configured
 194  
      * @param userRecipientId the user
 195  
      * @return a Set of NotificationConstants.MESSAGE_DELIVERY_TYPES
 196  
      */
 197  
     private Collection<String> getDelivererTypesForUserAndChannel(String userRecipientId, String channel) {
 198  0
         Set<String> deliveryTypes = new HashSet<String>(1);
 199  
         
 200  
         // manually add the default one since they don't have an option on this one
 201  
         //deliveryTypes.add(NotificationConstants.MESSAGE_DELIVERY_TYPES.DEFAULT_MESSAGE_DELIVERY_TYPE);
 202  
         
 203  
         //now look for what they've configured for themselves
 204  0
         Collection<RecipientDelivererConfig> deliverers = recipientPrefs.getDeliverersForRecipientAndChannel(userRecipientId, channel);
 205  
         
 206  0
         for (RecipientDelivererConfig cfg: deliverers) {
 207  0
             deliveryTypes.add(cfg.getDelivererName());
 208  
         }
 209  
         //return GlobalNotificationServiceLocator.getInstance().getKENAPIService().getDeliverersForRecipientAndChannel(userRecipientId, channel);
 210  
 
 211  0
         return deliveryTypes;
 212  
     }
 213  
 
 214  
     private void queueJob(MessageProcessingJob.Mode mode, long messageId, String user, String cause) {
 215  
         // queue up the processing job after the transaction has committed
 216  0
         LOG.debug("registering synchronization");
 217  
 
 218  0
         if (!TransactionSynchronizationManager.isSynchronizationActive()) {
 219  0
                 throw new RiceRuntimeException("transaction syncronization is not active " +
 220  
                                 "(!TransactionSynchronizationManager.isSynchronizationActive())");
 221  0
         } else if (!TransactionSynchronizationManager.isActualTransactionActive()) {
 222  0
                 throw new RiceRuntimeException("actual transaction is not active " +
 223  
                                 "(!TransactionSynchronizationManager.isActualTransactionActive())");
 224  
         }
 225  
 
 226  0
         TransactionSynchronizationManager.registerSynchronization(new QueueProcessingJobSynchronization(
 227  
             jobName,
 228  
             jobGroup,
 229  
             mode,
 230  
             messageId,
 231  
             user,
 232  
             cause,
 233  
             synchronous
 234  
         ));
 235  0
     }
 236  
     
 237  0
     public static class QueueProcessingJobSynchronization extends TransactionSynchronizationAdapter {
 238  0
         private static final Logger LOG = Logger.getLogger(QueueProcessingJobSynchronization.class);
 239  
         private final String jobName;
 240  
         private final String jobGroup;
 241  
         private final MessageProcessingJob.Mode mode;
 242  
         private final long messageId;
 243  
         private final String user;
 244  
         private final String cause;
 245  
         private final boolean synchronous;
 246  
 
 247  0
         private QueueProcessingJobSynchronization(String jobName, String jobGroup, MessageProcessingJob.Mode mode, long messageId, String user, String cause, boolean synchronous) {
 248  0
             this.jobName = jobName;
 249  0
             this.jobGroup = jobGroup;
 250  0
             this.mode = mode;
 251  0
             this.messageId = messageId;
 252  0
             this.user = user;
 253  0
             this.cause = cause;
 254  0
             this.synchronous = synchronous;
 255  0
         }
 256  
 
 257  
         /*
 258  
         @Override
 259  
         public void beforeCommit(boolean readOnly) {
 260  
             super.beforeCommit(readOnly);
 261  
         }*/
 262  
 
 263  
         @Override
 264  
         public void afterCommit() {
 265  0
             scheduleJob();
 266  0
         }
 267  
         /*@Override
 268  
         public void afterCompletion(int status) {
 269  
             if (STATUS_COMMITTED == status) {
 270  
                 scheduleJob();
 271  
             } else {
 272  
                 LOG.error("Status is not committed.  Not scheduling message processing job.");
 273  
             }
 274  
         }*/
 275  
 
 276  
         private void scheduleJob() {
 277  0
             LOG.debug("Queueing processing job");
 278  
             try {
 279  0
                 Scheduler scheduler = KSBServiceLocator.getScheduler();
 280  0
                 if (synchronous) {
 281  0
                     LOG.debug("Invoking job synchronously in Thread " + Thread.currentThread());
 282  0
                     MessageProcessingJob job = new MessageProcessingJob(messageId, mode, user, cause);
 283  0
                     job.run();
 284  0
                 } else {
 285  0
                     String uniqueTriggerName = jobName + "-Trigger-" + System.currentTimeMillis() + Math.random();
 286  0
                     SimpleTrigger trigger = new SimpleTrigger(uniqueTriggerName, jobGroup + "-Trigger");
 287  0
                     LOG.debug("Scheduling trigger: " + trigger);
 288  
 
 289  0
                     JobDataMap data = new JobDataMap();
 290  0
                     data.put("mode", mode.name());
 291  0
                     data.put("user", user);
 292  0
                     data.put("cause", cause);
 293  0
                     data.put("messageId", messageId);
 294  
 
 295  0
                     trigger.setJobName(jobName);
 296  0
                     trigger.setJobGroup(jobGroup);
 297  0
                     trigger.setJobDataMap(data);
 298  0
                     scheduler.scheduleJob(trigger);
 299  
                 }
 300  0
             } catch (SchedulerException se) {
 301  0
                 throw new RuntimeException(se);
 302  0
             }
 303  0
         }
 304  
     }
 305  
 }