Coverage Report - org.kuali.rice.kcb.service.impl.MessagingServiceImpl
 
Classes in this File Line Coverage Branch Coverage Complexity
MessagingServiceImpl
0%
0/75
0%
0/22
2.533
MessagingServiceImpl$1
N/A
N/A
2.533
MessagingServiceImpl$QueueProcessingJobSynchronization
0%
0/36
0%
0/2
2.533
 
 1  
 /**
 2  
  * Copyright 2005-2011 The Kuali Foundation
 3  
  *
 4  
  * Licensed under the Educational Community License, Version 2.0 (the "License");
 5  
  * you may not use this file except in compliance with the License.
 6  
  * You may obtain a copy of the License at
 7  
  *
 8  
  * http://www.opensource.org/licenses/ecl2.php
 9  
  *
 10  
  * Unless required by applicable law or agreed to in writing, software
 11  
  * distributed under the License is distributed on an "AS IS" BASIS,
 12  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  
  * See the License for the specific language governing permissions and
 14  
  * limitations under the License.
 15  
  */
 16  
 package org.kuali.rice.kcb.service.impl;
 17  
 
 18  
 import java.util.Collection;
 19  
 import java.util.HashSet;
 20  
 import java.util.Set;
 21  
 
 22  
 import org.apache.commons.lang.StringUtils;
 23  
 import org.apache.log4j.Logger;
 24  
 import org.kuali.rice.core.api.exception.RiceIllegalArgumentException;
 25  
 import org.kuali.rice.core.api.exception.RiceRuntimeException;
 26  
 import org.kuali.rice.kcb.bo.Message;
 27  
 import org.kuali.rice.kcb.bo.MessageDelivery;
 28  
 import org.kuali.rice.kcb.bo.RecipientDelivererConfig;
 29  
 import org.kuali.rice.kcb.api.message.MessageDTO;
 30  
 import org.kuali.rice.kcb.api.exception.MessageDeliveryException;
 31  
 import org.kuali.rice.kcb.api.exception.MessageDismissalException;
 32  
 import org.kuali.rice.kcb.quartz.MessageProcessingJob;
 33  
 import org.kuali.rice.kcb.service.MessageDeliveryService;
 34  
 import org.kuali.rice.kcb.service.MessageService;
 35  
 import org.kuali.rice.kcb.api.service.MessagingService;
 36  
 import org.kuali.rice.kcb.service.RecipientPreferenceService;
 37  
 import org.kuali.rice.ksb.service.KSBServiceLocator;
 38  
 import org.quartz.JobDataMap;
 39  
 import org.quartz.Scheduler;
 40  
 import org.quartz.SchedulerException;
 41  
 import org.quartz.SimpleTrigger;
 42  
 import org.springframework.beans.factory.annotation.Required;
 43  
 import org.springframework.transaction.support.TransactionSynchronizationAdapter;
 44  
 import org.springframework.transaction.support.TransactionSynchronizationManager;
 45  
 
 46  
 /**
 47  
  * MessagingService implementation 
 48  
  * 
 49  
  * @author Kuali Rice Team (rice.collab@kuali.org)
 50  
  */
 51  0
 public class MessagingServiceImpl implements MessagingService {
 52  0
     private static final Logger LOG = Logger.getLogger(MessagingServiceImpl.class);
 53  
 
 54  
     // Used to save messages and to look them up by id or by origin id
     private MessageService messageService;
 55  
     // Used to save one MessageDelivery per deliverer type in deliver()
     private MessageDeliveryService messageDeliveryService;
 56  
     // Source of the deliverers configured for a recipient and channel
     private RecipientPreferenceService recipientPrefs;
 57  
     // Name of the job that scheduled triggers are pointed at
     private String jobName;
 58  
     // Group of the job that scheduled triggers are pointed at
     private String jobGroup;
 59  
 
 60  
     /**
 61  
      * Whether to perform the processing synchronously, i.e. run the processing
      * job directly after commit instead of scheduling a trigger for it
 62  
      */
 63  
     private boolean synchronous;
 64  
     
 65  
     /**
 66  
      * Sets the name of the target job to run to process messages.
      * The value is handed to each QueueProcessingJobSynchronization, which sets it as
      * the job name on the trigger it schedules and uses it as the prefix of the trigger name.
 67  
      * @param jobName the name of the target job to run to process messages
 68  
      */
 69  
     public void setJobName(String jobName) {
 70  0
         this.jobName = jobName;
 71  0
     }
 72  
 
 73  
     /**
 74  
      * Sets the group of the target job to run to process messages.
      * The value is handed to each QueueProcessingJobSynchronization, which sets it as
      * the job group on the trigger it schedules and derives the trigger group from it.
 75  
      * @param jobGroup the group of the target job to run to process messages
 76  
      */
 77  
     public void setJobGroup(String jobGroup) {
 78  0
         this.jobGroup = jobGroup;
 79  0
     }
 80  
 
 81  
     /**
 82  
      * Sets the MessageService used to save messages and to look them up
      * by id or by origin id
 83  
      * @param messageService the MessageService
 84  
      */
 85  
     @Required
 86  
     public void setMessageService(MessageService messageService) {
 87  0
         this.messageService = messageService;
 88  0
     }
 89  
 
 90  
     /**
 91  
      * Sets the MessageDeliveryService used to save the MessageDelivery
      * records created while delivering a message
 92  
      * @param messageDeliveryService the MessageDeliveryService
 93  
      */
 94  
     @Required
 95  
     public void setMessageDeliveryService(MessageDeliveryService messageDeliveryService) {
 96  0
         this.messageDeliveryService = messageDeliveryService;
 97  0
     }
 98  
 
 99  
     /**
 100  
      * Sets whether to perform the processing synchronously.
      * When true, the processing job is constructed and run directly from the
      * after-commit callback instead of being scheduled through a trigger.
      * The new value is logged at debug level.
 101  
      * @param sync whether to perform the processing synchronously
 102  
      */
 103  
     public void setSynchronous(boolean sync) {
 104  0
         LOG.debug("Setting synchronous messaging to: " + sync);
 105  0
         this.synchronous = sync;
 106  0
     }
 107  
 
 108  
     /**
 109  
      * Sets the RecipientPreferenceService that is queried for the deliverers
      * a recipient has configured for a channel
 110  
      * @param prefs the RecipientPreferenceService
 111  
      */
 112  
     @Required
 113  
     public void setRecipientPreferenceService(RecipientPreferenceService prefs) {
 114  0
         this.recipientPrefs = prefs;
 115  0
     }
 116  
 
 117  
     /**
 118  
      * Saves a Message copied from the given DTO, saves one MessageDelivery per
      * deliverer type configured for the recipient and channel, and queues a
      * DELIVER processing job for the saved message.
      *
      * @param message the message to deliver
      * @return the id of the saved message, or null when no deliverers are configured
      *         for the recipient/channel (nothing is saved or queued in that case)
      * @throws RiceIllegalArgumentException if message is null
      * @throws RiceRuntimeException if no transaction synchronization or no actual
      *         transaction is active when the job is queued
      * @see org.kuali.rice.kcb.api.service.MessagingService#deliver(org.kuali.rice.kcb.api.message.MessageDTO)
 119  
      */
 120  
     @Override
 121  
     public Long deliver(MessageDTO message) throws MessageDeliveryException {
 122  0
         if (message == null) {
 123  0
             throw new RiceIllegalArgumentException("message is null");
 124  
         }
 125  
 
 126  0
         Collection<String> delivererTypes = getDelivererTypesForUserAndChannel(message.getRecipient(), message.getChannel());
 127  0
         LOG.debug("Deliverer types for " + message.getRecipient() + "/" + message.getChannel() + ": " + delivererTypes.size());
 128  
 
 129  0
         if (delivererTypes.isEmpty()) {
 130  
             // no deliverers configured? just skip it
 131  0
             LOG.debug("No deliverers are configured for " + message.getRecipient() + "/" + message.getChannel());
 132  0
             return null;
 133  
         }
 134  
 
 135  0
         // copy the DTO fields onto a new Message to be saved
         final Message m = new Message();
 136  0
         m.setTitle(message.getTitle());
 137  0
         m.setDeliveryType(message.getDeliveryType());
 138  0
         m.setChannel(message.getChannel());
 139  0
         m.setRecipient(message.getRecipient());
 140  0
         m.setContentType(message.getContentType());
 141  0
         m.setUrl(message.getUrl());
 142  0
         m.setContent(message.getContent());
 143  0
         m.setOriginId(message.getOriginId());
 144  
 
 145  0
         LOG.debug("saving message: " +m);
 146  0
         messageService.saveMessage(m);
 147  
 
 148  0
         // one MessageDelivery record per configured deliverer type, all pointing at the saved message
         for (String type: delivererTypes) {
 149  
             
 150  0
             MessageDelivery delivery = new MessageDelivery();
 151  0
             delivery.setDelivererTypeName(type);
 152  0
             delivery.setMessage(m);
 153  
 
 154  
 //            MessageDeliverer deliverer = delivererRegistry.getDeliverer(delivery);
 155  
 //            if (deliverer != null) {
 156  
 //                deliverer.deliverMessage(delivery);
 157  
 //            }
 158  
         
 159  0
             LOG.debug("saving messagedelivery: " +delivery);
 160  0
             messageDeliveryService.saveMessageDelivery(delivery);
 161  0
         }
 162  
 
 163  0
         LOG.debug("queuing job");
 164  0
         // NOTE(review): queueJob takes a primitive long; presumably saveMessage has assigned the id by now - confirm
         queueJob(MessageProcessingJob.Mode.DELIVER, m.getId(), null, null);
 165  
 
 166  0
         LOG.debug("returning");
 167  0
         return m.getId();
 168  
     }
 169  
 
 170  
     /**
 171  
      * Looks up the message with the given id and queues a REMOVE processing job for it.
      *
      * @param messageId the id of the message to remove
      * @param user the user on whose behalf the message is removed; must not be blank
      * @param cause the cause of the removal; must not be blank
      * @throws RiceIllegalArgumentException if user or cause is blank (null, empty or
      *         whitespace only - the exception text says "is null" in every case)
      * @throws MessageDismissalException if no message exists with the given id
      * @see org.kuali.rice.kcb.api.service.MessagingService#remove(long, java.lang.String, java.lang.String)
 172  
      */
 173  
     @Override
 174  
     public void remove(long messageId, String user, String cause) throws MessageDismissalException {
 175  
         /*if (StringUtils.isBlank(messageId)) {
 176  
             throw new RiceIllegalArgumentException("message is null");
 177  
         } if we switch to String id*/
 178  
 
 179  0
         if (StringUtils.isBlank(user)) {
 180  0
             throw new RiceIllegalArgumentException("user is null");
 181  
         }
 182  
 
 183  0
         if (StringUtils.isBlank(cause)) {
 184  0
             throw new RiceIllegalArgumentException("cause is null");
 185  
         }
 186  
 
 187  0
         Message m = messageService.getMessage(Long.valueOf(messageId));
 188  0
         if (m == null) {
 189  0
             throw new MessageDismissalException("No such message: " + messageId);
 190  
         }
 191  
 
 192  0
         // delegate to the private overload that queues the REMOVE job
         remove (m, user, cause);
 193  0
     }
 194  
 
 195  
     /**
 196  
      * Looks up the message with the given origin id and, if one exists, queues a
      * REMOVE processing job for it. Unlike remove(long, String, String), this method
      * does not validate user or cause, and a missing message is not an error.
      *
      * @param originId the origin id of the message to remove; must not be blank
      * @param user the user on whose behalf the message is removed
      * @param cause the cause of the removal
      * @return the id of the message a job was queued for, or null if no message
      *         has the given origin id
      * @throws RiceIllegalArgumentException if originId is blank
      * @see org.kuali.rice.kcb.api.service.MessagingService#removeByOriginId(java.lang.String, java.lang.String, java.lang.String)
 197  
      */
 198  
     @Override
 199  
     public Long removeByOriginId(String originId, String user, String cause) throws MessageDismissalException {
 200  0
         if (StringUtils.isBlank(originId)) {
 201  0
             throw new RiceIllegalArgumentException("originId is null");
 202  
         }
 203  
 
 204  0
         Message m = messageService.getMessageByOriginId(originId);
 205  0
         if (m == null) {
 206  0
             // nothing to remove: report that with a null id rather than an exception
             return null; 
 207  
             //throw new MessageDismissalException("No such message with origin id: " + originId);
 208  
         }
 209  0
         remove(m, user, cause);
 210  0
         return m.getId();
 211  
     }
 212  
 
 213  
     /**
      * Queues a REMOVE processing job for the given message
      * @param message the message to remove; its id is passed to the job
      * @param user the user passed through to the job
      * @param cause the cause passed through to the job
      */
     private void remove(Message message, String user, String cause) {
 214  0
         queueJob(MessageProcessingJob.Mode.REMOVE, message.getId(), user, cause);
 215  0
     }
 216  
 
 217  
     /**
 218  
      * Determines what delivery endpoints the user has configured for a channel
 219  
      * @param userRecipientId the user
      * @param channel the channel
 220  
      * @return the distinct deliverer names reported by the RecipientPreferenceService
      *         for the user and channel; empty if there are none
 221  
      */
 222  
     private Collection<String> getDelivererTypesForUserAndChannel(String userRecipientId, String channel) {
 223  0
         Set<String> deliveryTypes = new HashSet<String>(1);
 224  
         
 225  
         // manually add the default one since they don't have an option on this one
 226  
         //deliveryTypes.add(NotificationConstants.MESSAGE_DELIVERY_TYPES.DEFAULT_MESSAGE_DELIVERY_TYPE);
 227  
         
 228  
         //now look for what they've configured for themselves
 229  0
         // NOTE(review): the result is iterated without a null check - presumably the service never returns null; confirm
         Collection<RecipientDelivererConfig> deliverers = recipientPrefs.getDeliverersForRecipientAndChannel(userRecipientId, channel);
 230  
         
 231  0
         for (RecipientDelivererConfig cfg: deliverers) {
 232  0
             deliveryTypes.add(cfg.getDelivererName());
 233  
         }
 234  
         //return GlobalNotificationServiceLocator.getInstance().getKENAPIService().getDeliverersForRecipientAndChannel(userRecipientId, channel);
 235  
 
 236  0
         return deliveryTypes;
 237  
     }
 238  
 
 239  
     /**
      * Registers a QueueProcessingJobSynchronization with the current transaction so
      * that the processing job is run or scheduled from its afterCommit callback.
      * @param mode the processing mode passed to the job
      * @param messageId the id of the message to process
      * @param user the user passed to the job (deliver() passes null)
      * @param cause the cause passed to the job (deliver() passes null)
      * @throws RiceRuntimeException if transaction synchronization is not active or
      *         no actual transaction is active
      */
     private void queueJob(MessageProcessingJob.Mode mode, long messageId, String user, String cause) {
 240  
         // queue up the processing job after the transaction has committed
 241  0
         LOG.debug("registering synchronization");
 242  
 
 243  0
         // fail fast: without an active synchronization and transaction there is no commit to hook into
         if (!TransactionSynchronizationManager.isSynchronizationActive()) {
 244  0
                 throw new RiceRuntimeException("transaction syncronization is not active " +
 245  
                                 "(!TransactionSynchronizationManager.isSynchronizationActive())");
 246  0
         } else if (!TransactionSynchronizationManager.isActualTransactionActive()) {
 247  0
                 throw new RiceRuntimeException("actual transaction is not active " +
 248  
                                 "(!TransactionSynchronizationManager.isActualTransactionActive())");
 249  
         }
 250  
 
 251  0
         // the current jobName, jobGroup and synchronous settings are captured in the synchronization object
         TransactionSynchronizationManager.registerSynchronization(new QueueProcessingJobSynchronization(
 252  
             jobName,
 253  
             jobGroup,
 254  
             mode,
 255  
             messageId,
 256  
             user,
 257  
             cause,
 258  
             synchronous
 259  
         ));
 260  0
     }
 261  
     
 262  0
     /**
      * Transaction synchronization that runs or schedules a MessageProcessingJob
      * for one message once the transaction has committed. All state is captured
      * in final fields at construction time; the constructor is private, so
      * instances are created only by the enclosing class.
      */
     public static class QueueProcessingJobSynchronization extends TransactionSynchronizationAdapter {
 263  0
         private static final Logger LOG = Logger.getLogger(QueueProcessingJobSynchronization.class);
 264  
         private final String jobName;
 265  
         private final String jobGroup;
 266  
         private final MessageProcessingJob.Mode mode;
 267  
         private final long messageId;
 268  
         private final String user;
 269  
         private final String cause;
 270  
         private final boolean synchronous;
 271  
 
 272  0
         /**
          * @param jobName name of the job the scheduled trigger points at
          * @param jobGroup group of the job the scheduled trigger points at
          * @param mode the processing mode passed to the job
          * @param messageId the id of the message to process
          * @param user the user passed to the job
          * @param cause the cause passed to the job
          * @param synchronous true to run the job directly instead of scheduling a trigger
          */
         private QueueProcessingJobSynchronization(String jobName, String jobGroup, MessageProcessingJob.Mode mode, long messageId, String user, String cause, boolean synchronous) {
 273  0
             this.jobName = jobName;
 274  0
             this.jobGroup = jobGroup;
 275  0
             this.mode = mode;
 276  0
             this.messageId = messageId;
 277  0
             this.user = user;
 278  0
             this.cause = cause;
 279  0
             this.synchronous = synchronous;
 280  0
         }
 281  
 
 282  
         /*
 283  
         @Override
 284  
         public void beforeCommit(boolean readOnly) {
 285  
             super.beforeCommit(readOnly);
 286  
         }*/
 287  
 
 288  
         /**
          * Runs or schedules the processing job after the transaction has committed
          */
         @Override
 289  
         public void afterCommit() {
 290  0
             scheduleJob();
 291  0
         }
 292  
         /*@Override
 293  
         public void afterCompletion(int status) {
 294  
             if (STATUS_COMMITTED == status) {
 295  
                 scheduleJob();
 296  
             } else {
 297  
                 LOG.error("Status is not committed.  Not scheduling message processing job.");
 298  
             }
 299  
         }*/
 300  
 
 301  
         /**
          * In synchronous mode, constructs a MessageProcessingJob and runs it in the
          * calling thread. Otherwise builds a uniquely named SimpleTrigger that carries
          * mode, user, cause and messageId in its JobDataMap, points it at
          * jobName/jobGroup and hands it to the scheduler.
          * A SchedulerException is rethrown wrapped in a RuntimeException.
          */
         private void scheduleJob() {
 302  0
             LOG.debug("Queueing processing job");
 303  
             try {
 304  0
                 // note: the scheduler is looked up on both paths but is only used on the asynchronous one
                 Scheduler scheduler = KSBServiceLocator.getScheduler();
 305  0
                 if (synchronous) {
 306  0
                     LOG.debug("Invoking job synchronously in Thread " + Thread.currentThread());
 307  0
                     MessageProcessingJob job = new MessageProcessingJob(messageId, mode, user, cause);
 308  0
                     job.run();
 309  0
                 } else {
 310  0
                     // current time plus a random number keeps trigger names distinct across calls
                     String uniqueTriggerName = jobName + "-Trigger-" + System.currentTimeMillis() + Math.random();
 311  0
                     // NOTE(review): arguments are presumably (trigger name, trigger group) - confirm against the Quartz version in use
                     SimpleTrigger trigger = new SimpleTrigger(uniqueTriggerName, jobGroup + "-Trigger");
 312  0
                     LOG.debug("Scheduling trigger: " + trigger);
 313  
 
 314  0
                     // job parameters travel with the trigger; mode is stored by its enum name
                     JobDataMap data = new JobDataMap();
 315  0
                     data.put("mode", mode.name());
 316  0
                     data.put("user", user);
 317  0
                     data.put("cause", cause);
 318  0
                     data.put("messageId", messageId);
 319  
 
 320  0
                     trigger.setJobName(jobName);
 321  0
                     trigger.setJobGroup(jobGroup);
 322  0
                     trigger.setJobDataMap(data);
 323  0
                     scheduler.scheduleJob(trigger);
 324  
                 }
 325  0
             } catch (SchedulerException se) {
 326  0
                 throw new RuntimeException(se);
 327  0
             }
 328  0
         }
 329  
     }
 330  
 }