/**
 * Copyright 2005-2016 The Kuali Foundation
 *
 * Licensed under the Educational Community License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.opensource.org/licenses/ecl2.php
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.kuali.rice.kcb.service.impl;

import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.kuali.rice.core.api.exception.RiceIllegalArgumentException;
import org.kuali.rice.core.api.exception.RiceRuntimeException;
import org.kuali.rice.kcb.bo.Message;
import org.kuali.rice.kcb.bo.MessageDelivery;
import org.kuali.rice.kcb.bo.RecipientDelivererConfig;
import org.kuali.rice.kcb.api.message.MessageDTO;
import org.kuali.rice.kcb.api.exception.MessageDeliveryException;
import org.kuali.rice.kcb.api.exception.MessageDismissalException;
import org.kuali.rice.kcb.quartz.MessageProcessingJob;
import org.kuali.rice.kcb.service.MessageDeliveryService;
import org.kuali.rice.kcb.service.MessageService;
import org.kuali.rice.kcb.api.service.MessagingService;
import org.kuali.rice.kcb.service.RecipientPreferenceService;
import org.kuali.rice.ksb.service.KSBServiceLocator;
import org.quartz.JobDataMap;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SimpleTrigger;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;

/**
 * MessagingService implementation.
 *
 * <p>Delivering a message persists a {@link Message} plus one {@link MessageDelivery} per
 * configured deliverer type; removing a message only looks the message up. In both cases the
 * actual processing is handed to a {@link MessageProcessingJob}, which is queued through a
 * transaction synchronization so that it only starts after the surrounding transaction has
 * committed.</p>
 *
 * @author Kuali Rice Team (rice.collab@kuali.org)
 */
public class MessagingServiceImpl implements MessagingService {
    private static final Logger LOG = Logger.getLogger(MessagingServiceImpl.class);

    private MessageService messageService;
    private MessageDeliveryService messageDeliveryService;
    private RecipientPreferenceService recipientPrefs;
    private String jobName;
    private String jobGroup;

    /**
     * Whether to perform the processing synchronously
     */
    private boolean synchronous;

    /**
     * Sets the name of the target job to run to process messages
     * @param jobName the name of the target job to run to process messages
     */
    public void setJobName(String jobName) {
        this.jobName = jobName;
    }

    /**
     * Sets the group of the target job to run to process messages
     * @param jobGroup the group of the target job to run to process messages
     */
    public void setJobGroup(String jobGroup) {
        this.jobGroup = jobGroup;
    }

    /**
     * Sets the MessageService
     * @param messageService the MessageService
     */
    @Required
    public void setMessageService(MessageService messageService) {
        this.messageService = messageService;
    }

    /**
     * Sets the MessageDeliveryService
     * @param messageDeliveryService the MessageDeliveryService
     */
    @Required
    public void setMessageDeliveryService(MessageDeliveryService messageDeliveryService) {
        this.messageDeliveryService = messageDeliveryService;
    }

    /**
     * Sets whether to perform the processing synchronously
     * @param sync whether to perform the processing synchronously
     */
    public void setSynchronous(boolean sync) {
        LOG.debug("Setting synchronous messaging to: " + sync);
        this.synchronous = sync;
    }

    /**
     * Sets the RecipientPreferenceService
     * @param prefs the RecipientPreferenceService
     */
    @Required
    public void setRecipientPreferenceService(RecipientPreferenceService prefs) {
        this.recipientPrefs = prefs;
    }

    /**
     * Persists the message together with one delivery per deliverer type configured for the
     * message's recipient and channel, then queues a {@code DELIVER} processing job.
     *
     * @param message the message to deliver
     * @return the id of the saved message, or {@code null} when no deliverers are configured for
     *         the recipient/channel (in which case nothing is saved or queued)
     * @throws RiceIllegalArgumentException if {@code message} is null
     * @throws RiceRuntimeException if no transaction (synchronization) is active when the job is queued
     * @see MessagingService#deliver(MessageDTO)
     */
    @Override
    public Long deliver(MessageDTO message) throws MessageDeliveryException {
        if (message == null) {
            throw new RiceIllegalArgumentException("message is null");
        }

        Collection<String> delivererTypes = getDelivererTypesForUserAndChannel(message.getRecipient(), message.getChannel());
        LOG.debug("Deliverer types for " + message.getRecipient() + "/" + message.getChannel() + ": " + delivererTypes.size());

        if (delivererTypes.isEmpty()) {
            // no deliverers configured? just skip it
            LOG.debug("No deliverers are configured for " + message.getRecipient() + "/" + message.getChannel());
            return null;
        }

        Message m = new Message();
        m.setTitle(message.getTitle());
        m.setDeliveryType(message.getDeliveryType());
        m.setChannel(message.getChannel());
        m.setRecipient(message.getRecipient());
        m.setContentType(message.getContentType());
        m.setUrl(message.getUrl());
        m.setContent(message.getContent());
        m.setOriginId(message.getOriginId());

        LOG.debug("saving message: " + m);
        m = messageService.saveMessage(m);

        // Only the delivery records are written here; nothing is handed to a deliverer in this
        // method. Processing is left to the job queued below.
        for (String type : delivererTypes) {
            MessageDelivery delivery = new MessageDelivery();
            delivery.setDelivererTypeName(type);
            delivery.setMessage(m);

            LOG.debug("saving messagedelivery: " + delivery);
            messageDeliveryService.saveMessageDelivery(delivery);
        }

        LOG.debug("queuing job");
        queueJob(MessageProcessingJob.Mode.DELIVER, m.getId(), null, null);

        LOG.debug("returning");
        return m.getId();
    }

    /**
     * Looks up the message by id and queues a {@code REMOVE} processing job for it.
     *
     * @param messageId the id of the message to remove
     * @param user the user on whose behalf the message is removed
     * @param cause the reason for the removal
     * @throws RiceIllegalArgumentException if {@code user} or {@code cause} is blank
     * @throws MessageDismissalException if no message with the given id exists
     * @see MessagingService#remove(long, String, String)
     */
    @Override
    public void remove(long messageId, String user, String cause) throws MessageDismissalException {
        // messageId is a primitive long, so there is nothing to null/blank-check for it; a check
        // would only become necessary if the id type were switched to String.

        if (StringUtils.isBlank(user)) {
            throw new RiceIllegalArgumentException("user is null");
        }

        if (StringUtils.isBlank(cause)) {
            throw new RiceIllegalArgumentException("cause is null");
        }

        Message m = messageService.getMessage(Long.valueOf(messageId));
        if (m == null) {
            throw new MessageDismissalException("No such message: " + messageId);
        }

        remove(m, user, cause);
    }

    /**
     * Looks up the message by its origin id and queues a {@code REMOVE} processing job for it.
     *
     * <p>Unlike {@link #remove(long, String, String)}, {@code user} and {@code cause} are passed
     * through to the job without validation, and an unknown origin id is not an error.</p>
     *
     * @param originId the origin id of the message to remove
     * @param user the user on whose behalf the message is removed
     * @param cause the reason for the removal
     * @return the id of the message that was queued for removal, or {@code null} if no message
     *         with the given origin id exists
     * @throws RiceIllegalArgumentException if {@code originId} is blank
     * @see MessagingService#removeByOriginId(String, String, String)
     */
    @Override
    public Long removeByOriginId(String originId, String user, String cause) throws MessageDismissalException {
        if (StringUtils.isBlank(originId)) {
            throw new RiceIllegalArgumentException("originId is null");
        }

        Message m = messageService.getMessageByOriginId(originId);
        if (m == null) {
            // a missing message is deliberately treated as a no-op rather than a
            // MessageDismissalException
            return null;
        }
        remove(m, user, cause);
        return m.getId();
    }

    /**
     * Queues a {@code REMOVE} processing job for the given message.
     *
     * @param message the message to remove
     * @param user the user on whose behalf the message is removed
     * @param cause the reason for the removal
     */
    private void remove(Message message, String user, String cause) {
        queueJob(MessageProcessingJob.Mode.REMOVE, message.getId(), user, cause);
    }

    /**
     * Determines what delivery endpoints the user has configured for a channel
     *
     * @param userRecipientId the user
     * @param channel the channel
     * @return the distinct deliverer names configured for the recipient and channel; empty if
     *         none are configured
     */
    private Collection<String> getDelivererTypesForUserAndChannel(String userRecipientId, String channel) {
        // NOTE: no default delivery type is added implicitly; only what the recipient has
        // configured is returned.
        Set<String> deliveryTypes = new HashSet<String>(1);

        Collection<RecipientDelivererConfig> deliverers = recipientPrefs.getDeliverersForRecipientAndChannel(userRecipientId, channel);

        for (RecipientDelivererConfig cfg : deliverers) {
            deliveryTypes.add(cfg.getDelivererName());
        }

        return deliveryTypes;
    }

    /**
     * Registers a transaction synchronization that starts the message processing job once the
     * current transaction has committed.
     *
     * @param mode the processing mode of the job
     * @param messageId the id of the message to process
     * @param user the user associated with the action; may be null (it is for deliveries)
     * @param cause the cause of the action; may be null (it is for deliveries)
     * @throws RiceRuntimeException if transaction synchronization or an actual transaction is not active
     */
    private void queueJob(MessageProcessingJob.Mode mode, long messageId, String user, String cause) {
        // queue up the processing job after the transaction has committed
        LOG.debug("registering synchronization");

        if (!TransactionSynchronizationManager.isSynchronizationActive()) {
            throw new RiceRuntimeException("transaction syncronization is not active " +
                    "(!TransactionSynchronizationManager.isSynchronizationActive())");
        } else if (!TransactionSynchronizationManager.isActualTransactionActive()) {
            throw new RiceRuntimeException("actual transaction is not active " +
                    "(!TransactionSynchronizationManager.isActualTransactionActive())");
        }

        TransactionSynchronizationManager.registerSynchronization(new QueueProcessingJobSynchronization(
                jobName,
                jobGroup,
                mode,
                messageId,
                user,
                cause,
                synchronous
        ));
    }

    /**
     * Transaction synchronization that runs or schedules the {@link MessageProcessingJob} from
     * {@link #afterCommit()}, i.e. only once the transaction it was registered with has committed.
     */
    public static class QueueProcessingJobSynchronization extends TransactionSynchronizationAdapter {
        private static final Logger LOG = Logger.getLogger(QueueProcessingJobSynchronization.class);
        private final String jobName;
        private final String jobGroup;
        private final MessageProcessingJob.Mode mode;
        private final long messageId;
        private final String user;
        private final String cause;
        private final boolean synchronous;

        private QueueProcessingJobSynchronization(String jobName, String jobGroup, MessageProcessingJob.Mode mode, long messageId, String user, String cause, boolean synchronous) {
            this.jobName = jobName;
            this.jobGroup = jobGroup;
            this.mode = mode;
            this.messageId = messageId;
            this.user = user;
            this.cause = cause;
            this.synchronous = synchronous;
        }

        /**
         * Starts the processing job now that the transaction has committed.
         */
        @Override
        public void afterCommit() {
            scheduleJob();
        }

        /**
         * Either runs the processing job directly in the current thread (synchronous mode) or
         * schedules it with a uniquely named one-off trigger against the configured job name
         * and group.
         *
         * @throws RuntimeException wrapping the {@link SchedulerException} if scheduling fails
         */
        private void scheduleJob() {
            LOG.debug("Queueing processing job");
            try {
                if (synchronous) {
                    LOG.debug("Invoking job synchronously in Thread " + Thread.currentThread());
                    MessageProcessingJob job = new MessageProcessingJob(messageId, mode, user, cause);
                    job.run();
                } else {
                    // The scheduler is only looked up when it is actually needed, so that
                    // synchronous mode does not depend on a scheduler being available.
                    Scheduler scheduler = KSBServiceLocator.getScheduler();

                    String uniqueTriggerName = jobName + "-Trigger-" + System.currentTimeMillis() + Math.random();
                    SimpleTrigger trigger = new SimpleTrigger(uniqueTriggerName, jobGroup + "-Trigger");
                    LOG.debug("Scheduling trigger: " + trigger);

                    JobDataMap data = new JobDataMap();
                    data.put("mode", mode.name());
                    data.put("user", user);
                    data.put("cause", cause);
                    data.put("messageId", messageId);

                    trigger.setJobName(jobName);
                    trigger.setJobGroup(jobGroup);
                    trigger.setJobDataMap(data);
                    scheduler.scheduleJob(trigger);
                }
            } catch (SchedulerException se) {
                // keep the original exception type, but say which job/message was affected
                throw new RuntimeException("Unable to schedule message processing job '" + jobName
                        + "' (group '" + jobGroup + "') in mode " + mode + " for message " + messageId, se);
            }
        }
    }
}