001    /**
002     * Copyright 2005-2013 The Kuali Foundation
003     *
004     * Licensed under the Educational Community License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.opensource.org/licenses/ecl2.php
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    package org.kuali.rice.kcb.service.impl;
017    
018    import java.util.Collection;
019    import java.util.HashSet;
020    import java.util.Set;
021    
022    import org.apache.commons.lang.StringUtils;
023    import org.apache.log4j.Logger;
024    import org.kuali.rice.core.api.exception.RiceIllegalArgumentException;
025    import org.kuali.rice.core.api.exception.RiceRuntimeException;
026    import org.kuali.rice.kcb.bo.Message;
027    import org.kuali.rice.kcb.bo.MessageDelivery;
028    import org.kuali.rice.kcb.bo.RecipientDelivererConfig;
029    import org.kuali.rice.kcb.api.message.MessageDTO;
030    import org.kuali.rice.kcb.api.exception.MessageDeliveryException;
031    import org.kuali.rice.kcb.api.exception.MessageDismissalException;
032    import org.kuali.rice.kcb.quartz.MessageProcessingJob;
033    import org.kuali.rice.kcb.service.MessageDeliveryService;
034    import org.kuali.rice.kcb.service.MessageService;
035    import org.kuali.rice.kcb.api.service.MessagingService;
036    import org.kuali.rice.kcb.service.RecipientPreferenceService;
037    import org.kuali.rice.ksb.service.KSBServiceLocator;
038    import org.quartz.JobDataMap;
039    import org.quartz.Scheduler;
040    import org.quartz.SchedulerException;
041    import org.quartz.SimpleTrigger;
042    import org.springframework.beans.factory.annotation.Required;
043    import org.springframework.transaction.support.TransactionSynchronizationAdapter;
044    import org.springframework.transaction.support.TransactionSynchronizationManager;
045    
046    /**
047     * MessagingService implementation 
048     * 
049     * @author Kuali Rice Team (rice.collab@kuali.org)
050     */
051    public class MessagingServiceImpl implements MessagingService {
052        private static final Logger LOG = Logger.getLogger(MessagingServiceImpl.class);
053    
054        private MessageService messageService;
055        private MessageDeliveryService messageDeliveryService;
056        private RecipientPreferenceService recipientPrefs;
057        private String jobName;
058        private String jobGroup;
059    
060        /**
061         * Whether to perform the processing  synchronously
062         */
063        private boolean synchronous;
064        
065        /**
066         * Sets the name of the target job to run to process messages
067         * @param jobName the name of the target job to run to process messages
068         */
069        public void setJobName(String jobName) {
070            this.jobName = jobName;
071        }
072    
073        /**
074         * Sets the group of the target job to run to process messages
075         * @param jobGroup Sets the group of the target job to run to process messages
076         */
077        public void setJobGroup(String jobGroup) {
078            this.jobGroup = jobGroup;
079        }
080    
081        /**
082         * Sets the MessageService
083         * @param messageService the MessageService
084         */
085        @Required
086        public void setMessageService(MessageService messageService) {
087            this.messageService = messageService;
088        }
089    
090        /**
091         * Sets the MessageDeliveryService
092         * @param messageDeliveryService the MessageDeliveryService
093         */
094        @Required
095        public void setMessageDeliveryService(MessageDeliveryService messageDeliveryService) {
096            this.messageDeliveryService = messageDeliveryService;
097        }
098    
099        /**
100         * Sets whether to perform the processing synchronously
101         * @param sync whether to perform the processing synchronously
102         */
103        public void setSynchronous(boolean sync) {
104            LOG.debug("Setting synchronous messaging to: " + sync);
105            this.synchronous = sync;
106        }
107    
108        /**
109         * Sets the RecipientPreferencesService
110         * @param prefs the RecipientPreferenceService
111         */
112        @Required
113        public void setRecipientPreferenceService(RecipientPreferenceService prefs) {
114            this.recipientPrefs = prefs;
115        }
116    
117        /**
118         * @see org.kuali.rice.kcb.service.MessagingService#deliver(org.kuali.rice.kcb.dto.MessageDTO)
119         */
120        @Override
121        public Long deliver(MessageDTO message) throws MessageDeliveryException {
122            if (message == null) {
123                throw new RiceIllegalArgumentException("message is null");
124            }
125    
126            Collection<String> delivererTypes = getDelivererTypesForUserAndChannel(message.getRecipient(), message.getChannel());
127            LOG.debug("Deliverer types for " + message.getRecipient() + "/" + message.getChannel() + ": " + delivererTypes.size());
128    
129            if (delivererTypes.isEmpty()) {
130                // no deliverers configured? just skipp it
131                LOG.debug("No deliverers are configured for " + message.getRecipient() + "/" + message.getChannel());
132                return null;
133            }
134    
135            final Message m = new Message();
136            m.setTitle(message.getTitle());
137            m.setDeliveryType(message.getDeliveryType());
138            m.setChannel(message.getChannel());
139            m.setRecipient(message.getRecipient());
140            m.setContentType(message.getContentType());
141            m.setUrl(message.getUrl());
142            m.setContent(message.getContent());
143            m.setOriginId(message.getOriginId());
144    
145            LOG.debug("saving message: " +m);
146            messageService.saveMessage(m);
147    
148            for (String type: delivererTypes) {
149                
150                MessageDelivery delivery = new MessageDelivery();
151                delivery.setDelivererTypeName(type);
152                delivery.setMessage(m);
153    
154    //            MessageDeliverer deliverer = delivererRegistry.getDeliverer(delivery);
155    //            if (deliverer != null) {
156    //                deliverer.deliverMessage(delivery);
157    //            }
158            
159                LOG.debug("saving messagedelivery: " +delivery);
160                messageDeliveryService.saveMessageDelivery(delivery);
161            }
162    
163            LOG.debug("queuing job");
164            queueJob(MessageProcessingJob.Mode.DELIVER, m.getId(), null, null);
165    
166            LOG.debug("returning");
167            return m.getId();
168        }
169    
170        /**
171         * @see org.kuali.rice.kcb.service.MessagingService#remove(long, java.lang.String, java.lang.String)
172         */
173        @Override
174        public void remove(long messageId, String user, String cause) throws MessageDismissalException {
175            /*if (StringUtils.isBlank(messageId)) {
176                throw new RiceIllegalArgumentException("message is null");
177            } if we switch to String id*/
178    
179            if (StringUtils.isBlank(user)) {
180                throw new RiceIllegalArgumentException("user is null");
181            }
182    
183            if (StringUtils.isBlank(cause)) {
184                throw new RiceIllegalArgumentException("cause is null");
185            }
186    
187            Message m = messageService.getMessage(Long.valueOf(messageId));
188            if (m == null) {
189                throw new MessageDismissalException("No such message: " + messageId);
190            }
191    
192            remove (m, user, cause);
193        }
194    
195        /**
196         * @see org.kuali.rice.kcb.service.MessagingService#removeByOriginId(java.lang.String, java.lang.String, java.lang.String)
197         */
198        @Override
199        public Long removeByOriginId(String originId, String user, String cause) throws MessageDismissalException {
200            if (StringUtils.isBlank(originId)) {
201                throw new RiceIllegalArgumentException("originId is null");
202            }
203    
204            Message m = messageService.getMessageByOriginId(originId);
205            if (m == null) {
206                return null; 
207                //throw new MessageDismissalException("No such message with origin id: " + originId);
208            }
209            remove(m, user, cause);
210            return m.getId();
211        }
212    
213        private void remove(Message message, String user, String cause) {
214            queueJob(MessageProcessingJob.Mode.REMOVE, message.getId(), user, cause);
215        }
216    
217        /**
218         * Determines what delivery endpoints the user has configured
219         * @param userRecipientId the user
220         * @return a Set of NotificationConstants.MESSAGE_DELIVERY_TYPES
221         */
222        private Collection<String> getDelivererTypesForUserAndChannel(String userRecipientId, String channel) {
223            Set<String> deliveryTypes = new HashSet<String>(1);
224            
225            // manually add the default one since they don't have an option on this one
226            //deliveryTypes.add(NotificationConstants.MESSAGE_DELIVERY_TYPES.DEFAULT_MESSAGE_DELIVERY_TYPE);
227            
228            //now look for what they've configured for themselves
229            Collection<RecipientDelivererConfig> deliverers = recipientPrefs.getDeliverersForRecipientAndChannel(userRecipientId, channel);
230            
231            for (RecipientDelivererConfig cfg: deliverers) {
232                deliveryTypes.add(cfg.getDelivererName());
233            }
234            //return GlobalNotificationServiceLocator.getInstance().getKENAPIService().getDeliverersForRecipientAndChannel(userRecipientId, channel);
235    
236            return deliveryTypes;
237        }
238    
239        private void queueJob(MessageProcessingJob.Mode mode, long messageId, String user, String cause) {
240            // queue up the processing job after the transaction has committed
241            LOG.debug("registering synchronization");
242    
243            if (!TransactionSynchronizationManager.isSynchronizationActive()) {
244                    throw new RiceRuntimeException("transaction syncronization is not active " +
245                                    "(!TransactionSynchronizationManager.isSynchronizationActive())");
246            } else if (!TransactionSynchronizationManager.isActualTransactionActive()) {
247                    throw new RiceRuntimeException("actual transaction is not active " +
248                                    "(!TransactionSynchronizationManager.isActualTransactionActive())");
249            }
250    
251            TransactionSynchronizationManager.registerSynchronization(new QueueProcessingJobSynchronization(
252                jobName,
253                jobGroup,
254                mode,
255                messageId,
256                user,
257                cause,
258                synchronous
259            ));
260        }
261        
262        public static class QueueProcessingJobSynchronization extends TransactionSynchronizationAdapter {
263            private static final Logger LOG = Logger.getLogger(QueueProcessingJobSynchronization.class);
264            private final String jobName;
265            private final String jobGroup;
266            private final MessageProcessingJob.Mode mode;
267            private final long messageId;
268            private final String user;
269            private final String cause;
270            private final boolean synchronous;
271    
272            private QueueProcessingJobSynchronization(String jobName, String jobGroup, MessageProcessingJob.Mode mode, long messageId, String user, String cause, boolean synchronous) {
273                this.jobName = jobName;
274                this.jobGroup = jobGroup;
275                this.mode = mode;
276                this.messageId = messageId;
277                this.user = user;
278                this.cause = cause;
279                this.synchronous = synchronous;
280            }
281    
282            /*
283            @Override
284            public void beforeCommit(boolean readOnly) {
285                super.beforeCommit(readOnly);
286            }*/
287    
288            @Override
289            public void afterCommit() {
290                scheduleJob();
291            }
292            /*@Override
293            public void afterCompletion(int status) {
294                if (STATUS_COMMITTED == status) {
295                    scheduleJob();
296                } else {
297                    LOG.error("Status is not committed.  Not scheduling message processing job.");
298                }
299            }*/
300    
301            private void scheduleJob() {
302                LOG.debug("Queueing processing job");
303                try {
304                    Scheduler scheduler = KSBServiceLocator.getScheduler();
305                    if (synchronous) {
306                        LOG.debug("Invoking job synchronously in Thread " + Thread.currentThread());
307                        MessageProcessingJob job = new MessageProcessingJob(messageId, mode, user, cause);
308                        job.run();
309                    } else {
310                        String uniqueTriggerName = jobName + "-Trigger-" + System.currentTimeMillis() + Math.random();
311                        SimpleTrigger trigger = new SimpleTrigger(uniqueTriggerName, jobGroup + "-Trigger");
312                        LOG.debug("Scheduling trigger: " + trigger);
313    
314                        JobDataMap data = new JobDataMap();
315                        data.put("mode", mode.name());
316                        data.put("user", user);
317                        data.put("cause", cause);
318                        data.put("messageId", messageId);
319    
320                        trigger.setJobName(jobName);
321                        trigger.setJobGroup(jobGroup);
322                        trigger.setJobDataMap(data);
323                        scheduler.scheduleJob(trigger);
324                    }
325                } catch (SchedulerException se) {
326                    throw new RuntimeException(se);
327                }
328            }
329        }
330    }