View Javadoc

1   /**
2    * Copyright 2005-2012 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.rice.kcb.service.impl;
17  
18  import java.util.Collection;
19  import java.util.HashSet;
20  import java.util.Set;
21  
22  import org.apache.commons.lang.StringUtils;
23  import org.apache.log4j.Logger;
24  import org.kuali.rice.core.api.exception.RiceIllegalArgumentException;
25  import org.kuali.rice.core.api.exception.RiceRuntimeException;
26  import org.kuali.rice.kcb.bo.Message;
27  import org.kuali.rice.kcb.bo.MessageDelivery;
28  import org.kuali.rice.kcb.bo.RecipientDelivererConfig;
29  import org.kuali.rice.kcb.api.message.MessageDTO;
30  import org.kuali.rice.kcb.api.exception.MessageDeliveryException;
31  import org.kuali.rice.kcb.api.exception.MessageDismissalException;
32  import org.kuali.rice.kcb.quartz.MessageProcessingJob;
33  import org.kuali.rice.kcb.service.MessageDeliveryService;
34  import org.kuali.rice.kcb.service.MessageService;
35  import org.kuali.rice.kcb.api.service.MessagingService;
36  import org.kuali.rice.kcb.service.RecipientPreferenceService;
37  import org.kuali.rice.ksb.service.KSBServiceLocator;
38  import org.quartz.JobDataMap;
39  import org.quartz.Scheduler;
40  import org.quartz.SchedulerException;
41  import org.quartz.SimpleTrigger;
42  import org.springframework.beans.factory.annotation.Required;
43  import org.springframework.transaction.support.TransactionSynchronizationAdapter;
44  import org.springframework.transaction.support.TransactionSynchronizationManager;
45  
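/**
 * {@link MessagingService} implementation that saves each message together with its
 * per-deliverer deliveries, then queues a {@link MessageProcessingJob} that is run or
 * scheduled once the current transaction has committed.
 *
 * @author Kuali Rice Team (rice.collab@kuali.org)
 */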
51  public class MessagingServiceImpl implements MessagingService {
52      private static final Logger LOG = Logger.getLogger(MessagingServiceImpl.class);
53  
54      private MessageService messageService;
55      private MessageDeliveryService messageDeliveryService;
56      private RecipientPreferenceService recipientPrefs;
57      private String jobName;
58      private String jobGroup;
59  
60      /**
61       * Whether to perform the processing  synchronously
62       */
63      private boolean synchronous;
64      
65      /**
66       * Sets the name of the target job to run to process messages
67       * @param jobName the name of the target job to run to process messages
68       */
69      public void setJobName(String jobName) {
70          this.jobName = jobName;
71      }
72  
73      /**
74       * Sets the group of the target job to run to process messages
75       * @param jobGroup Sets the group of the target job to run to process messages
76       */
77      public void setJobGroup(String jobGroup) {
78          this.jobGroup = jobGroup;
79      }
80  
81      /**
82       * Sets the MessageService
83       * @param messageService the MessageService
84       */
85      @Required
86      public void setMessageService(MessageService messageService) {
87          this.messageService = messageService;
88      }
89  
90      /**
91       * Sets the MessageDeliveryService
92       * @param messageDeliveryService the MessageDeliveryService
93       */
94      @Required
95      public void setMessageDeliveryService(MessageDeliveryService messageDeliveryService) {
96          this.messageDeliveryService = messageDeliveryService;
97      }
98  
99      /**
100      * Sets whether to perform the processing synchronously
101      * @param sync whether to perform the processing synchronously
102      */
103     public void setSynchronous(boolean sync) {
104         LOG.debug("Setting synchronous messaging to: " + sync);
105         this.synchronous = sync;
106     }
107 
108     /**
109      * Sets the RecipientPreferencesService
110      * @param prefs the RecipientPreferenceService
111      */
112     @Required
113     public void setRecipientPreferenceService(RecipientPreferenceService prefs) {
114         this.recipientPrefs = prefs;
115     }
116 
117     /**
118      * @see org.kuali.rice.kcb.service.MessagingService#deliver(org.kuali.rice.kcb.dto.MessageDTO)
119      */
120     @Override
121     public Long deliver(MessageDTO message) throws MessageDeliveryException {
122         if (message == null) {
123             throw new RiceIllegalArgumentException("message is null");
124         }
125 
126         Collection<String> delivererTypes = getDelivererTypesForUserAndChannel(message.getRecipient(), message.getChannel());
127         LOG.debug("Deliverer types for " + message.getRecipient() + "/" + message.getChannel() + ": " + delivererTypes.size());
128 
129         if (delivererTypes.isEmpty()) {
130             // no deliverers configured? just skipp it
131             LOG.debug("No deliverers are configured for " + message.getRecipient() + "/" + message.getChannel());
132             return null;
133         }
134 
135         final Message m = new Message();
136         m.setTitle(message.getTitle());
137         m.setDeliveryType(message.getDeliveryType());
138         m.setChannel(message.getChannel());
139         m.setRecipient(message.getRecipient());
140         m.setContentType(message.getContentType());
141         m.setUrl(message.getUrl());
142         m.setContent(message.getContent());
143         m.setOriginId(message.getOriginId());
144 
145         LOG.debug("saving message: " +m);
146         messageService.saveMessage(m);
147 
148         for (String type: delivererTypes) {
149             
150             MessageDelivery delivery = new MessageDelivery();
151             delivery.setDelivererTypeName(type);
152             delivery.setMessage(m);
153 
154 //            MessageDeliverer deliverer = delivererRegistry.getDeliverer(delivery);
155 //            if (deliverer != null) {
156 //                deliverer.deliverMessage(delivery);
157 //            }
158         
159             LOG.debug("saving messagedelivery: " +delivery);
160             messageDeliveryService.saveMessageDelivery(delivery);
161         }
162 
163         LOG.debug("queuing job");
164         queueJob(MessageProcessingJob.Mode.DELIVER, m.getId(), null, null);
165 
166         LOG.debug("returning");
167         return m.getId();
168     }
169 
170     /**
171      * @see org.kuali.rice.kcb.service.MessagingService#remove(long, java.lang.String, java.lang.String)
172      */
173     @Override
174     public void remove(long messageId, String user, String cause) throws MessageDismissalException {
175         /*if (StringUtils.isBlank(messageId)) {
176             throw new RiceIllegalArgumentException("message is null");
177         } if we switch to String id*/
178 
179         if (StringUtils.isBlank(user)) {
180             throw new RiceIllegalArgumentException("user is null");
181         }
182 
183         if (StringUtils.isBlank(cause)) {
184             throw new RiceIllegalArgumentException("cause is null");
185         }
186 
187         Message m = messageService.getMessage(Long.valueOf(messageId));
188         if (m == null) {
189             throw new MessageDismissalException("No such message: " + messageId);
190         }
191 
192         remove (m, user, cause);
193     }
194 
195     /**
196      * @see org.kuali.rice.kcb.service.MessagingService#removeByOriginId(java.lang.String, java.lang.String, java.lang.String)
197      */
198     @Override
199     public Long removeByOriginId(String originId, String user, String cause) throws MessageDismissalException {
200         if (StringUtils.isBlank(originId)) {
201             throw new RiceIllegalArgumentException("originId is null");
202         }
203 
204         Message m = messageService.getMessageByOriginId(originId);
205         if (m == null) {
206             return null; 
207             //throw new MessageDismissalException("No such message with origin id: " + originId);
208         }
209         remove(m, user, cause);
210         return m.getId();
211     }
212 
213     private void remove(Message message, String user, String cause) {
214         queueJob(MessageProcessingJob.Mode.REMOVE, message.getId(), user, cause);
215     }
216 
217     /**
218      * Determines what delivery endpoints the user has configured
219      * @param userRecipientId the user
220      * @return a Set of NotificationConstants.MESSAGE_DELIVERY_TYPES
221      */
222     private Collection<String> getDelivererTypesForUserAndChannel(String userRecipientId, String channel) {
223         Set<String> deliveryTypes = new HashSet<String>(1);
224         
225         // manually add the default one since they don't have an option on this one
226         //deliveryTypes.add(NotificationConstants.MESSAGE_DELIVERY_TYPES.DEFAULT_MESSAGE_DELIVERY_TYPE);
227         
228         //now look for what they've configured for themselves
229         Collection<RecipientDelivererConfig> deliverers = recipientPrefs.getDeliverersForRecipientAndChannel(userRecipientId, channel);
230         
231         for (RecipientDelivererConfig cfg: deliverers) {
232             deliveryTypes.add(cfg.getDelivererName());
233         }
234         //return GlobalNotificationServiceLocator.getInstance().getKENAPIService().getDeliverersForRecipientAndChannel(userRecipientId, channel);
235 
236         return deliveryTypes;
237     }
238 
239     private void queueJob(MessageProcessingJob.Mode mode, long messageId, String user, String cause) {
240         // queue up the processing job after the transaction has committed
241         LOG.debug("registering synchronization");
242 
243         if (!TransactionSynchronizationManager.isSynchronizationActive()) {
244         	throw new RiceRuntimeException("transaction syncronization is not active " +
245         			"(!TransactionSynchronizationManager.isSynchronizationActive())");
246         } else if (!TransactionSynchronizationManager.isActualTransactionActive()) {
247         	throw new RiceRuntimeException("actual transaction is not active " +
248         			"(!TransactionSynchronizationManager.isActualTransactionActive())");
249         }
250 
251         TransactionSynchronizationManager.registerSynchronization(new QueueProcessingJobSynchronization(
252             jobName,
253             jobGroup,
254             mode,
255             messageId,
256             user,
257             cause,
258             synchronous
259         ));
260     }
261     
262     public static class QueueProcessingJobSynchronization extends TransactionSynchronizationAdapter {
263         private static final Logger LOG = Logger.getLogger(QueueProcessingJobSynchronization.class);
264         private final String jobName;
265         private final String jobGroup;
266         private final MessageProcessingJob.Mode mode;
267         private final long messageId;
268         private final String user;
269         private final String cause;
270         private final boolean synchronous;
271 
272         private QueueProcessingJobSynchronization(String jobName, String jobGroup, MessageProcessingJob.Mode mode, long messageId, String user, String cause, boolean synchronous) {
273             this.jobName = jobName;
274             this.jobGroup = jobGroup;
275             this.mode = mode;
276             this.messageId = messageId;
277             this.user = user;
278             this.cause = cause;
279             this.synchronous = synchronous;
280         }
281 
282         /*
283         @Override
284         public void beforeCommit(boolean readOnly) {
285             super.beforeCommit(readOnly);
286         }*/
287 
288         @Override
289         public void afterCommit() {
290             scheduleJob();
291         }
292         /*@Override
293         public void afterCompletion(int status) {
294             if (STATUS_COMMITTED == status) {
295                 scheduleJob();
296             } else {
297                 LOG.error("Status is not committed.  Not scheduling message processing job.");
298             }
299         }*/
300 
301         private void scheduleJob() {
302             LOG.debug("Queueing processing job");
303             try {
304                 Scheduler scheduler = KSBServiceLocator.getScheduler();
305                 if (synchronous) {
306                     LOG.debug("Invoking job synchronously in Thread " + Thread.currentThread());
307                     MessageProcessingJob job = new MessageProcessingJob(messageId, mode, user, cause);
308                     job.run();
309                 } else {
310                     String uniqueTriggerName = jobName + "-Trigger-" + System.currentTimeMillis() + Math.random();
311                     SimpleTrigger trigger = new SimpleTrigger(uniqueTriggerName, jobGroup + "-Trigger");
312                     LOG.debug("Scheduling trigger: " + trigger);
313 
314                     JobDataMap data = new JobDataMap();
315                     data.put("mode", mode.name());
316                     data.put("user", user);
317                     data.put("cause", cause);
318                     data.put("messageId", messageId);
319 
320                     trigger.setJobName(jobName);
321                     trigger.setJobGroup(jobGroup);
322                     trigger.setJobDataMap(data);
323                     scheduler.scheduleJob(trigger);
324                 }
325             } catch (SchedulerException se) {
326                 throw new RuntimeException(se);
327             }
328         }
329     }
330 }