View Javadoc

1   /*
2    * Copyright 2007-2008 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.rice.kcb.service.impl;
17  
18  import java.util.Collection;
19  import java.util.HashSet;
20  import java.util.Set;
21  
22  import org.apache.log4j.Logger;
23  import org.kuali.rice.core.exception.RiceRuntimeException;
24  import org.kuali.rice.kcb.bo.Message;
25  import org.kuali.rice.kcb.bo.MessageDelivery;
26  import org.kuali.rice.kcb.bo.RecipientDelivererConfig;
27  import org.kuali.rice.kcb.dto.MessageDTO;
28  import org.kuali.rice.kcb.exception.MessageDeliveryException;
29  import org.kuali.rice.kcb.exception.MessageDismissalException;
30  import org.kuali.rice.kcb.quartz.MessageProcessingJob;
31  import org.kuali.rice.kcb.service.MessageDeliveryService;
32  import org.kuali.rice.kcb.service.MessageService;
33  import org.kuali.rice.kcb.service.MessagingService;
34  import org.kuali.rice.kcb.service.RecipientPreferenceService;
35  import org.kuali.rice.ksb.service.KSBServiceLocator;
36  import org.quartz.JobDataMap;
37  import org.quartz.Scheduler;
38  import org.quartz.SchedulerException;
39  import org.quartz.SimpleTrigger;
40  import org.springframework.beans.factory.annotation.Required;
41  import org.springframework.transaction.support.TransactionSynchronizationAdapter;
42  import org.springframework.transaction.support.TransactionSynchronizationManager;
43  
44  /**
45   * MessagingService implementation 
46   * 
47   * @author Kuali Rice Team (rice.collab@kuali.org)
48   */
49  public class MessagingServiceImpl implements MessagingService {
50      private static final Logger LOG = Logger.getLogger(MessagingServiceImpl.class);
51  
52      private MessageService messageService;
53      private MessageDeliveryService messageDeliveryService;
54      private RecipientPreferenceService recipientPrefs;
55      private String jobName;
56      private String jobGroup;
57  
58      /**
59       * Whether to perform the processing  synchronously
60       */
61      private boolean synchronous;
62      
63      /**
64       * Sets the name of the target job to run to process messages
65       * @param jobName the name of the target job to run to process messages
66       */
67      public void setJobName(String jobName) {
68          this.jobName = jobName;
69      }
70  
71      /**
72       * Sets the group of the target job to run to process messages
73       * @param jobGroup Sets the group of the target job to run to process messages
74       */
75      public void setJobGroup(String jobGroup) {
76          this.jobGroup = jobGroup;
77      }
78  
79      /**
80       * Sets the MessageService
81       * @param messageService the MessageService
82       */
83      @Required
84      public void setMessageService(MessageService messageService) {
85          this.messageService = messageService;
86      }
87  
88      /**
89       * Sets the MessageDeliveryService
90       * @param messageDeliveryService the MessageDeliveryService
91       */
92      @Required
93      public void setMessageDeliveryService(MessageDeliveryService messageDeliveryService) {
94          this.messageDeliveryService = messageDeliveryService;
95      }
96  
97      /**
98       * Sets whether to perform the processing synchronously
99       * @param sync whether to perform the processing synchronously
100      */
101     public void setSynchronous(boolean sync) {
102         LOG.debug("Setting synchronous messaging to: " + sync);
103         this.synchronous = sync;
104     }
105 
106     /**
107      * Sets the RecipientPreferencesService
108      * @param prefs the RecipientPreferenceService
109      */
110     @Required
111     public void setRecipientPreferenceService(RecipientPreferenceService prefs) {
112         this.recipientPrefs = prefs;
113     }
114 
115     /**
116      * @see org.kuali.rice.kcb.service.MessagingService#deliver(org.kuali.rice.kcb.dto.MessageDTO)
117      */
118     public Long deliver(MessageDTO message) throws MessageDeliveryException {
119         Collection<String> delivererTypes = getDelivererTypesForUserAndChannel(message.getRecipient(), message.getChannel());
120         LOG.debug("Deliverer types for " + message.getRecipient() + "/" + message.getChannel() + ": " + delivererTypes.size());
121 
122         if (delivererTypes.size() == 0) {
123             // no deliverers configured? just skipp it
124             LOG.debug("No deliverers are configured for " + message.getRecipient() + "/" + message.getChannel());
125             return null;
126         }
127 
128         final Message m = new Message();
129         m.setTitle(message.getTitle());
130         m.setDeliveryType(message.getDeliveryType());
131         m.setChannel(message.getChannel());
132         m.setRecipient(message.getRecipient());
133         m.setContentType(message.getContentType());
134         m.setUrl(message.getUrl());
135         m.setContent(message.getContent());
136         m.setOriginId(message.getOriginId());
137 
138         LOG.debug("saving message: " +m);
139         messageService.saveMessage(m);
140 
141         for (String type: delivererTypes) {
142             
143             MessageDelivery delivery = new MessageDelivery();
144             delivery.setDelivererTypeName(type);
145             delivery.setMessage(m);
146 
147 //            MessageDeliverer deliverer = delivererRegistry.getDeliverer(delivery);
148 //            if (deliverer != null) {
149 //                deliverer.deliverMessage(delivery);
150 //            }
151         
152             LOG.debug("saving messagedelivery: " +delivery);
153             messageDeliveryService.saveMessageDelivery(delivery);
154         }
155 
156         LOG.debug("queuing job");
157         queueJob(MessageProcessingJob.Mode.DELIVER, m.getId(), null, null);
158 
159         LOG.debug("returning");
160         return m.getId();
161     }
162 
163     /**
164      * @see org.kuali.rice.kcb.service.MessagingService#remove(long, java.lang.String, java.lang.String)
165      */
166     public void remove(long messageId, String user, String cause) throws MessageDismissalException {
167         Message m = messageService.getMessage(Long.valueOf(messageId));
168         if (m == null) {
169             throw new MessageDismissalException("No such message: " + messageId);
170         }
171 
172         remove (m, user, cause);
173     }
174 
175     /**
176      * @see org.kuali.rice.kcb.service.MessagingService#removeByOriginId(java.lang.String, java.lang.String, java.lang.String)
177      */
178     public Long removeByOriginId(String originId, String user, String cause) throws MessageDismissalException {
179         Message m = messageService.getMessageByOriginId(originId);
180         if (m == null) {
181             return null; 
182             //throw new MessageDismissalException("No such message with origin id: " + originId);
183         }
184         remove(m, user, cause);
185         return m.getId();
186     }
187 
188     private void remove(Message message, String user, String cause) {
189         queueJob(MessageProcessingJob.Mode.REMOVE, message.getId(), user, cause);
190     }
191 
192     /**
193      * Determines what delivery endpoints the user has configured
194      * @param userRecipientId the user
195      * @return a Set of NotificationConstants.MESSAGE_DELIVERY_TYPES
196      */
197     private Collection<String> getDelivererTypesForUserAndChannel(String userRecipientId, String channel) {
198         Set<String> deliveryTypes = new HashSet<String>(1);
199         
200         // manually add the default one since they don't have an option on this one
201         //deliveryTypes.add(NotificationConstants.MESSAGE_DELIVERY_TYPES.DEFAULT_MESSAGE_DELIVERY_TYPE);
202         
203         //now look for what they've configured for themselves
204         Collection<RecipientDelivererConfig> deliverers = recipientPrefs.getDeliverersForRecipientAndChannel(userRecipientId, channel);
205         
206         for (RecipientDelivererConfig cfg: deliverers) {
207             deliveryTypes.add(cfg.getDelivererName());
208         }
209         //return GlobalNotificationServiceLocator.getInstance().getKENAPIService().getDeliverersForRecipientAndChannel(userRecipientId, channel);
210 
211         return deliveryTypes;
212     }
213 
214     private void queueJob(MessageProcessingJob.Mode mode, long messageId, String user, String cause) {
215         // queue up the processing job after the transaction has committed
216         LOG.debug("registering synchronization");
217 
218         if (!TransactionSynchronizationManager.isSynchronizationActive()) {
219         	throw new RiceRuntimeException("transaction syncronization is not active " + 
220         			"(!TransactionSynchronizationManager.isSynchronizationActive())");
221         } else if (!TransactionSynchronizationManager.isActualTransactionActive()) {
222         	throw new RiceRuntimeException("actual transaction is not active " +
223         			"(!TransactionSynchronizationManager.isActualTransactionActive())");
224         }
225 
226         TransactionSynchronizationManager.registerSynchronization(new QueueProcessingJobSynchronization(
227             jobName,
228             jobGroup,
229             mode,
230             messageId,
231             user,
232             cause,
233             synchronous
234         ));
235     }
236     
237     public static class QueueProcessingJobSynchronization extends TransactionSynchronizationAdapter {
238         private static final Logger LOG = Logger.getLogger(QueueProcessingJobSynchronization.class);
239         private final String jobName;
240         private final String jobGroup;
241         private final MessageProcessingJob.Mode mode;
242         private final long messageId;
243         private final String user;
244         private final String cause;
245         private final boolean synchronous;
246 
247         private QueueProcessingJobSynchronization(String jobName, String jobGroup, MessageProcessingJob.Mode mode, long messageId, String user, String cause, boolean synchronous) {
248             this.jobName = jobName;
249             this.jobGroup = jobGroup;
250             this.mode = mode;
251             this.messageId = messageId;
252             this.user = user;
253             this.cause = cause;
254             this.synchronous = synchronous;
255         }
256 
257         /*
258         @Override
259         public void beforeCommit(boolean readOnly) {
260             super.beforeCommit(readOnly);
261         }*/
262 
263         @Override
264         public void afterCommit() {
265             scheduleJob();
266         }
267         /*@Override
268         public void afterCompletion(int status) {
269             if (STATUS_COMMITTED == status) {
270                 scheduleJob();
271             } else {
272                 LOG.error("Status is not committed.  Not scheduling message processing job.");
273             }
274         }*/
275 
276         private void scheduleJob() {
277             LOG.debug("Queueing processing job");
278             try {
279                 Scheduler scheduler = KSBServiceLocator.getScheduler();
280                 if (synchronous) {
281                     LOG.debug("Invoking job synchronously in Thread " + Thread.currentThread());
282                     MessageProcessingJob job = new MessageProcessingJob(messageId, mode, user, cause);
283                     job.run();
284                 } else {
285                     String uniqueTriggerName = jobName + "-Trigger-" + System.currentTimeMillis() + Math.random();
286                     SimpleTrigger trigger = new SimpleTrigger(uniqueTriggerName, jobGroup + "-Trigger");
287                     LOG.debug("Scheduling trigger: " + trigger);
288 
289                     JobDataMap data = new JobDataMap();
290                     data.put("mode", mode.name());
291                     data.put("user", user);
292                     data.put("cause", cause);
293                     data.put("messageId", messageId);
294 
295                     trigger.setJobName(jobName);
296                     trigger.setJobGroup(jobGroup);
297                     trigger.setJobDataMap(data);
298                     scheduler.scheduleJob(trigger);
299                 }
300             } catch (SchedulerException se) {
301                 throw new RuntimeException(se);
302             }
303         }
304     }
305 }