001/**
002 * Copyright 2005-2016 The Kuali Foundation
003 *
004 * Licensed under the Educational Community License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.opensource.org/licenses/ecl2.php
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.kuali.rice.kcb.service.impl;
017
018import java.util.Collection;
019import java.util.HashSet;
020import java.util.Set;
021
022import org.apache.commons.lang.StringUtils;
023import org.apache.log4j.Logger;
024import org.kuali.rice.core.api.exception.RiceIllegalArgumentException;
025import org.kuali.rice.core.api.exception.RiceRuntimeException;
026import org.kuali.rice.kcb.bo.Message;
027import org.kuali.rice.kcb.bo.MessageDelivery;
028import org.kuali.rice.kcb.bo.RecipientDelivererConfig;
029import org.kuali.rice.kcb.api.message.MessageDTO;
030import org.kuali.rice.kcb.api.exception.MessageDeliveryException;
031import org.kuali.rice.kcb.api.exception.MessageDismissalException;
032import org.kuali.rice.kcb.quartz.MessageProcessingJob;
033import org.kuali.rice.kcb.service.MessageDeliveryService;
034import org.kuali.rice.kcb.service.MessageService;
035import org.kuali.rice.kcb.api.service.MessagingService;
036import org.kuali.rice.kcb.service.RecipientPreferenceService;
037import org.kuali.rice.ksb.service.KSBServiceLocator;
038import org.quartz.JobDataMap;
039import org.quartz.Scheduler;
040import org.quartz.SchedulerException;
041import org.quartz.SimpleTrigger;
042import org.springframework.beans.factory.annotation.Required;
043import org.springframework.transaction.support.TransactionSynchronizationAdapter;
044import org.springframework.transaction.support.TransactionSynchronizationManager;
045
046/**
047 * MessagingService implementation 
048 * 
049 * @author Kuali Rice Team (rice.collab@kuali.org)
050 */
051public class MessagingServiceImpl implements MessagingService {
052    private static final Logger LOG = Logger.getLogger(MessagingServiceImpl.class);
053
054    private MessageService messageService;
055    private MessageDeliveryService messageDeliveryService;
056    private RecipientPreferenceService recipientPrefs;
057    private String jobName;
058    private String jobGroup;
059
060    /**
061     * Whether to perform the processing  synchronously
062     */
063    private boolean synchronous;
064    
065    /**
066     * Sets the name of the target job to run to process messages
067     * @param jobName the name of the target job to run to process messages
068     */
069    public void setJobName(String jobName) {
070        this.jobName = jobName;
071    }
072
073    /**
074     * Sets the group of the target job to run to process messages
075     * @param jobGroup Sets the group of the target job to run to process messages
076     */
077    public void setJobGroup(String jobGroup) {
078        this.jobGroup = jobGroup;
079    }
080
081    /**
082     * Sets the MessageService
083     * @param messageService the MessageService
084     */
085    @Required
086    public void setMessageService(MessageService messageService) {
087        this.messageService = messageService;
088    }
089
090    /**
091     * Sets the MessageDeliveryService
092     * @param messageDeliveryService the MessageDeliveryService
093     */
094    @Required
095    public void setMessageDeliveryService(MessageDeliveryService messageDeliveryService) {
096        this.messageDeliveryService = messageDeliveryService;
097    }
098
099    /**
100     * Sets whether to perform the processing synchronously
101     * @param sync whether to perform the processing synchronously
102     */
103    public void setSynchronous(boolean sync) {
104        LOG.debug("Setting synchronous messaging to: " + sync);
105        this.synchronous = sync;
106    }
107
108    /**
109     * Sets the RecipientPreferencesService
110     * @param prefs the RecipientPreferenceService
111     */
112    @Required
113    public void setRecipientPreferenceService(RecipientPreferenceService prefs) {
114        this.recipientPrefs = prefs;
115    }
116
117    /**
118     * @see org.kuali.rice.kcb.service.MessagingService#deliver(org.kuali.rice.kcb.dto.MessageDTO)
119     */
120    @Override
121    public Long deliver(MessageDTO message) throws MessageDeliveryException {
122        if (message == null) {
123            throw new RiceIllegalArgumentException("message is null");
124        }
125
126        Collection<String> delivererTypes = getDelivererTypesForUserAndChannel(message.getRecipient(), message.getChannel());
127        LOG.debug("Deliverer types for " + message.getRecipient() + "/" + message.getChannel() + ": " + delivererTypes.size());
128
129        if (delivererTypes.isEmpty()) {
130            // no deliverers configured? just skipp it
131            LOG.debug("No deliverers are configured for " + message.getRecipient() + "/" + message.getChannel());
132            return null;
133        }
134
135        Message m = new Message();
136        m.setTitle(message.getTitle());
137        m.setDeliveryType(message.getDeliveryType());
138        m.setChannel(message.getChannel());
139        m.setRecipient(message.getRecipient());
140        m.setContentType(message.getContentType());
141        m.setUrl(message.getUrl());
142        m.setContent(message.getContent());
143        m.setOriginId(message.getOriginId());
144
145        LOG.debug("saving message: " +m);
146        m = messageService.saveMessage(m);
147
148        for (String type: delivererTypes) {
149            
150            MessageDelivery delivery = new MessageDelivery();
151            delivery.setDelivererTypeName(type);
152            delivery.setMessage(m);
153
154//            MessageDeliverer deliverer = delivererRegistry.getDeliverer(delivery);
155//            if (deliverer != null) {
156//                deliverer.deliverMessage(delivery);
157//            }
158        
159            LOG.debug("saving messagedelivery: " +delivery);
160            messageDeliveryService.saveMessageDelivery(delivery);
161        }
162
163        LOG.debug("queuing job");
164        queueJob(MessageProcessingJob.Mode.DELIVER, m.getId(), null, null);
165
166        LOG.debug("returning");
167        return m.getId();
168    }
169
170    /**
171     * @see org.kuali.rice.kcb.service.MessagingService#remove(long, java.lang.String, java.lang.String)
172     */
173    @Override
174    public void remove(long messageId, String user, String cause) throws MessageDismissalException {
175        /*if (StringUtils.isBlank(messageId)) {
176            throw new RiceIllegalArgumentException("message is null");
177        } if we switch to String id*/
178
179        if (StringUtils.isBlank(user)) {
180            throw new RiceIllegalArgumentException("user is null");
181        }
182
183        if (StringUtils.isBlank(cause)) {
184            throw new RiceIllegalArgumentException("cause is null");
185        }
186
187        Message m = messageService.getMessage(Long.valueOf(messageId));
188        if (m == null) {
189            throw new MessageDismissalException("No such message: " + messageId);
190        }
191
192        remove (m, user, cause);
193    }
194
195    /**
196     * @see org.kuali.rice.kcb.service.MessagingService#removeByOriginId(java.lang.String, java.lang.String, java.lang.String)
197     */
198    @Override
199    public Long removeByOriginId(String originId, String user, String cause) throws MessageDismissalException {
200        if (StringUtils.isBlank(originId)) {
201            throw new RiceIllegalArgumentException("originId is null");
202        }
203
204        Message m = messageService.getMessageByOriginId(originId);
205        if (m == null) {
206            return null; 
207            //throw new MessageDismissalException("No such message with origin id: " + originId);
208        }
209        remove(m, user, cause);
210        return m.getId();
211    }
212
213    private void remove(Message message, String user, String cause) {
214        queueJob(MessageProcessingJob.Mode.REMOVE, message.getId(), user, cause);
215    }
216
217    /**
218     * Determines what delivery endpoints the user has configured
219     * @param userRecipientId the user
220     * @return a Set of NotificationConstants.MESSAGE_DELIVERY_TYPES
221     */
222    private Collection<String> getDelivererTypesForUserAndChannel(String userRecipientId, String channel) {
223        Set<String> deliveryTypes = new HashSet<String>(1);
224        
225        // manually add the default one since they don't have an option on this one
226        //deliveryTypes.add(NotificationConstants.MESSAGE_DELIVERY_TYPES.DEFAULT_MESSAGE_DELIVERY_TYPE);
227        
228        //now look for what they've configured for themselves
229        Collection<RecipientDelivererConfig> deliverers = recipientPrefs.getDeliverersForRecipientAndChannel(userRecipientId, channel);
230        
231        for (RecipientDelivererConfig cfg: deliverers) {
232            deliveryTypes.add(cfg.getDelivererName());
233        }
234        //return GlobalNotificationServiceLocator.getInstance().getKENAPIService().getDeliverersForRecipientAndChannel(userRecipientId, channel);
235
236        return deliveryTypes;
237    }
238
239    private void queueJob(MessageProcessingJob.Mode mode, long messageId, String user, String cause) {
240        // queue up the processing job after the transaction has committed
241        LOG.debug("registering synchronization");
242
243        if (!TransactionSynchronizationManager.isSynchronizationActive()) {
244                throw new RiceRuntimeException("transaction syncronization is not active " +
245                                "(!TransactionSynchronizationManager.isSynchronizationActive())");
246        } else if (!TransactionSynchronizationManager.isActualTransactionActive()) {
247                throw new RiceRuntimeException("actual transaction is not active " +
248                                "(!TransactionSynchronizationManager.isActualTransactionActive())");
249        }
250
251        TransactionSynchronizationManager.registerSynchronization(new QueueProcessingJobSynchronization(
252            jobName,
253            jobGroup,
254            mode,
255            messageId,
256            user,
257            cause,
258            synchronous
259        ));
260    }
261    
262    public static class QueueProcessingJobSynchronization extends TransactionSynchronizationAdapter {
263        private static final Logger LOG = Logger.getLogger(QueueProcessingJobSynchronization.class);
264        private final String jobName;
265        private final String jobGroup;
266        private final MessageProcessingJob.Mode mode;
267        private final long messageId;
268        private final String user;
269        private final String cause;
270        private final boolean synchronous;
271
272        private QueueProcessingJobSynchronization(String jobName, String jobGroup, MessageProcessingJob.Mode mode, long messageId, String user, String cause, boolean synchronous) {
273            this.jobName = jobName;
274            this.jobGroup = jobGroup;
275            this.mode = mode;
276            this.messageId = messageId;
277            this.user = user;
278            this.cause = cause;
279            this.synchronous = synchronous;
280        }
281
282        /*
283        @Override
284        public void beforeCommit(boolean readOnly) {
285            super.beforeCommit(readOnly);
286        }*/
287
288        @Override
289        public void afterCommit() {
290            scheduleJob();
291        }
292        /*@Override
293        public void afterCompletion(int status) {
294            if (STATUS_COMMITTED == status) {
295                scheduleJob();
296            } else {
297                LOG.error("Status is not committed.  Not scheduling message processing job.");
298            }
299        }*/
300
301        private void scheduleJob() {
302            LOG.debug("Queueing processing job");
303            try {
304                Scheduler scheduler = KSBServiceLocator.getScheduler();
305                if (synchronous) {
306                    LOG.debug("Invoking job synchronously in Thread " + Thread.currentThread());
307                    MessageProcessingJob job = new MessageProcessingJob(messageId, mode, user, cause);
308                    job.run();
309                } else {
310                    String uniqueTriggerName = jobName + "-Trigger-" + System.currentTimeMillis() + Math.random();
311                    SimpleTrigger trigger = new SimpleTrigger(uniqueTriggerName, jobGroup + "-Trigger");
312                    LOG.debug("Scheduling trigger: " + trigger);
313
314                    JobDataMap data = new JobDataMap();
315                    data.put("mode", mode.name());
316                    data.put("user", user);
317                    data.put("cause", cause);
318                    data.put("messageId", messageId);
319
320                    trigger.setJobName(jobName);
321                    trigger.setJobGroup(jobGroup);
322                    trigger.setJobDataMap(data);
323                    scheduler.scheduleJob(trigger);
324                }
325            } catch (SchedulerException se) {
326                throw new RuntimeException(se);
327            }
328        }
329    }
330}