001 /**
002 * Copyright 2005-2013 The Kuali Foundation
003 *
004 * Licensed under the Educational Community License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.opensource.org/licenses/ecl2.php
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016 package org.kuali.rice.kcb.service.impl;
017
018 import java.util.Collection;
019 import java.util.HashSet;
020 import java.util.Set;
021
022 import org.apache.commons.lang.StringUtils;
023 import org.apache.log4j.Logger;
024 import org.kuali.rice.core.api.exception.RiceIllegalArgumentException;
025 import org.kuali.rice.core.api.exception.RiceRuntimeException;
026 import org.kuali.rice.kcb.bo.Message;
027 import org.kuali.rice.kcb.bo.MessageDelivery;
028 import org.kuali.rice.kcb.bo.RecipientDelivererConfig;
029 import org.kuali.rice.kcb.api.message.MessageDTO;
030 import org.kuali.rice.kcb.api.exception.MessageDeliveryException;
031 import org.kuali.rice.kcb.api.exception.MessageDismissalException;
032 import org.kuali.rice.kcb.quartz.MessageProcessingJob;
033 import org.kuali.rice.kcb.service.MessageDeliveryService;
034 import org.kuali.rice.kcb.service.MessageService;
035 import org.kuali.rice.kcb.api.service.MessagingService;
036 import org.kuali.rice.kcb.service.RecipientPreferenceService;
037 import org.kuali.rice.ksb.service.KSBServiceLocator;
038 import org.quartz.JobDataMap;
039 import org.quartz.Scheduler;
040 import org.quartz.SchedulerException;
041 import org.quartz.SimpleTrigger;
042 import org.springframework.beans.factory.annotation.Required;
043 import org.springframework.transaction.support.TransactionSynchronizationAdapter;
044 import org.springframework.transaction.support.TransactionSynchronizationManager;
045
046 /**
047 * MessagingService implementation
048 *
049 * @author Kuali Rice Team (rice.collab@kuali.org)
050 */
051 public class MessagingServiceImpl implements MessagingService {
052 private static final Logger LOG = Logger.getLogger(MessagingServiceImpl.class);
053
054 private MessageService messageService;
055 private MessageDeliveryService messageDeliveryService;
056 private RecipientPreferenceService recipientPrefs;
057 private String jobName;
058 private String jobGroup;
059
060 /**
061 * Whether to perform the processing synchronously
062 */
063 private boolean synchronous;
064
065 /**
066 * Sets the name of the target job to run to process messages
067 * @param jobName the name of the target job to run to process messages
068 */
069 public void setJobName(String jobName) {
070 this.jobName = jobName;
071 }
072
073 /**
074 * Sets the group of the target job to run to process messages
075 * @param jobGroup Sets the group of the target job to run to process messages
076 */
077 public void setJobGroup(String jobGroup) {
078 this.jobGroup = jobGroup;
079 }
080
081 /**
082 * Sets the MessageService
083 * @param messageService the MessageService
084 */
085 @Required
086 public void setMessageService(MessageService messageService) {
087 this.messageService = messageService;
088 }
089
090 /**
091 * Sets the MessageDeliveryService
092 * @param messageDeliveryService the MessageDeliveryService
093 */
094 @Required
095 public void setMessageDeliveryService(MessageDeliveryService messageDeliveryService) {
096 this.messageDeliveryService = messageDeliveryService;
097 }
098
099 /**
100 * Sets whether to perform the processing synchronously
101 * @param sync whether to perform the processing synchronously
102 */
103 public void setSynchronous(boolean sync) {
104 LOG.debug("Setting synchronous messaging to: " + sync);
105 this.synchronous = sync;
106 }
107
108 /**
109 * Sets the RecipientPreferencesService
110 * @param prefs the RecipientPreferenceService
111 */
112 @Required
113 public void setRecipientPreferenceService(RecipientPreferenceService prefs) {
114 this.recipientPrefs = prefs;
115 }
116
117 /**
118 * @see org.kuali.rice.kcb.service.MessagingService#deliver(org.kuali.rice.kcb.dto.MessageDTO)
119 */
120 @Override
121 public Long deliver(MessageDTO message) throws MessageDeliveryException {
122 if (message == null) {
123 throw new RiceIllegalArgumentException("message is null");
124 }
125
126 Collection<String> delivererTypes = getDelivererTypesForUserAndChannel(message.getRecipient(), message.getChannel());
127 LOG.debug("Deliverer types for " + message.getRecipient() + "/" + message.getChannel() + ": " + delivererTypes.size());
128
129 if (delivererTypes.isEmpty()) {
130 // no deliverers configured? just skipp it
131 LOG.debug("No deliverers are configured for " + message.getRecipient() + "/" + message.getChannel());
132 return null;
133 }
134
135 final Message m = new Message();
136 m.setTitle(message.getTitle());
137 m.setDeliveryType(message.getDeliveryType());
138 m.setChannel(message.getChannel());
139 m.setRecipient(message.getRecipient());
140 m.setContentType(message.getContentType());
141 m.setUrl(message.getUrl());
142 m.setContent(message.getContent());
143 m.setOriginId(message.getOriginId());
144
145 LOG.debug("saving message: " +m);
146 messageService.saveMessage(m);
147
148 for (String type: delivererTypes) {
149
150 MessageDelivery delivery = new MessageDelivery();
151 delivery.setDelivererTypeName(type);
152 delivery.setMessage(m);
153
154 // MessageDeliverer deliverer = delivererRegistry.getDeliverer(delivery);
155 // if (deliverer != null) {
156 // deliverer.deliverMessage(delivery);
157 // }
158
159 LOG.debug("saving messagedelivery: " +delivery);
160 messageDeliveryService.saveMessageDelivery(delivery);
161 }
162
163 LOG.debug("queuing job");
164 queueJob(MessageProcessingJob.Mode.DELIVER, m.getId(), null, null);
165
166 LOG.debug("returning");
167 return m.getId();
168 }
169
170 /**
171 * @see org.kuali.rice.kcb.service.MessagingService#remove(long, java.lang.String, java.lang.String)
172 */
173 @Override
174 public void remove(long messageId, String user, String cause) throws MessageDismissalException {
175 /*if (StringUtils.isBlank(messageId)) {
176 throw new RiceIllegalArgumentException("message is null");
177 } if we switch to String id*/
178
179 if (StringUtils.isBlank(user)) {
180 throw new RiceIllegalArgumentException("user is null");
181 }
182
183 if (StringUtils.isBlank(cause)) {
184 throw new RiceIllegalArgumentException("cause is null");
185 }
186
187 Message m = messageService.getMessage(Long.valueOf(messageId));
188 if (m == null) {
189 throw new MessageDismissalException("No such message: " + messageId);
190 }
191
192 remove (m, user, cause);
193 }
194
195 /**
196 * @see org.kuali.rice.kcb.service.MessagingService#removeByOriginId(java.lang.String, java.lang.String, java.lang.String)
197 */
198 @Override
199 public Long removeByOriginId(String originId, String user, String cause) throws MessageDismissalException {
200 if (StringUtils.isBlank(originId)) {
201 throw new RiceIllegalArgumentException("originId is null");
202 }
203
204 Message m = messageService.getMessageByOriginId(originId);
205 if (m == null) {
206 return null;
207 //throw new MessageDismissalException("No such message with origin id: " + originId);
208 }
209 remove(m, user, cause);
210 return m.getId();
211 }
212
213 private void remove(Message message, String user, String cause) {
214 queueJob(MessageProcessingJob.Mode.REMOVE, message.getId(), user, cause);
215 }
216
217 /**
218 * Determines what delivery endpoints the user has configured
219 * @param userRecipientId the user
220 * @return a Set of NotificationConstants.MESSAGE_DELIVERY_TYPES
221 */
222 private Collection<String> getDelivererTypesForUserAndChannel(String userRecipientId, String channel) {
223 Set<String> deliveryTypes = new HashSet<String>(1);
224
225 // manually add the default one since they don't have an option on this one
226 //deliveryTypes.add(NotificationConstants.MESSAGE_DELIVERY_TYPES.DEFAULT_MESSAGE_DELIVERY_TYPE);
227
228 //now look for what they've configured for themselves
229 Collection<RecipientDelivererConfig> deliverers = recipientPrefs.getDeliverersForRecipientAndChannel(userRecipientId, channel);
230
231 for (RecipientDelivererConfig cfg: deliverers) {
232 deliveryTypes.add(cfg.getDelivererName());
233 }
234 //return GlobalNotificationServiceLocator.getInstance().getKENAPIService().getDeliverersForRecipientAndChannel(userRecipientId, channel);
235
236 return deliveryTypes;
237 }
238
239 private void queueJob(MessageProcessingJob.Mode mode, long messageId, String user, String cause) {
240 // queue up the processing job after the transaction has committed
241 LOG.debug("registering synchronization");
242
243 if (!TransactionSynchronizationManager.isSynchronizationActive()) {
244 throw new RiceRuntimeException("transaction syncronization is not active " +
245 "(!TransactionSynchronizationManager.isSynchronizationActive())");
246 } else if (!TransactionSynchronizationManager.isActualTransactionActive()) {
247 throw new RiceRuntimeException("actual transaction is not active " +
248 "(!TransactionSynchronizationManager.isActualTransactionActive())");
249 }
250
251 TransactionSynchronizationManager.registerSynchronization(new QueueProcessingJobSynchronization(
252 jobName,
253 jobGroup,
254 mode,
255 messageId,
256 user,
257 cause,
258 synchronous
259 ));
260 }
261
262 public static class QueueProcessingJobSynchronization extends TransactionSynchronizationAdapter {
263 private static final Logger LOG = Logger.getLogger(QueueProcessingJobSynchronization.class);
264 private final String jobName;
265 private final String jobGroup;
266 private final MessageProcessingJob.Mode mode;
267 private final long messageId;
268 private final String user;
269 private final String cause;
270 private final boolean synchronous;
271
272 private QueueProcessingJobSynchronization(String jobName, String jobGroup, MessageProcessingJob.Mode mode, long messageId, String user, String cause, boolean synchronous) {
273 this.jobName = jobName;
274 this.jobGroup = jobGroup;
275 this.mode = mode;
276 this.messageId = messageId;
277 this.user = user;
278 this.cause = cause;
279 this.synchronous = synchronous;
280 }
281
282 /*
283 @Override
284 public void beforeCommit(boolean readOnly) {
285 super.beforeCommit(readOnly);
286 }*/
287
288 @Override
289 public void afterCommit() {
290 scheduleJob();
291 }
292 /*@Override
293 public void afterCompletion(int status) {
294 if (STATUS_COMMITTED == status) {
295 scheduleJob();
296 } else {
297 LOG.error("Status is not committed. Not scheduling message processing job.");
298 }
299 }*/
300
301 private void scheduleJob() {
302 LOG.debug("Queueing processing job");
303 try {
304 Scheduler scheduler = KSBServiceLocator.getScheduler();
305 if (synchronous) {
306 LOG.debug("Invoking job synchronously in Thread " + Thread.currentThread());
307 MessageProcessingJob job = new MessageProcessingJob(messageId, mode, user, cause);
308 job.run();
309 } else {
310 String uniqueTriggerName = jobName + "-Trigger-" + System.currentTimeMillis() + Math.random();
311 SimpleTrigger trigger = new SimpleTrigger(uniqueTriggerName, jobGroup + "-Trigger");
312 LOG.debug("Scheduling trigger: " + trigger);
313
314 JobDataMap data = new JobDataMap();
315 data.put("mode", mode.name());
316 data.put("user", user);
317 data.put("cause", cause);
318 data.put("messageId", messageId);
319
320 trigger.setJobName(jobName);
321 trigger.setJobGroup(jobGroup);
322 trigger.setJobDataMap(data);
323 scheduler.scheduleJob(trigger);
324 }
325 } catch (SchedulerException se) {
326 throw new RuntimeException(se);
327 }
328 }
329 }
330 }