1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.kcb.service.impl;
17
18 import java.util.Collection;
19 import java.util.HashSet;
20 import java.util.Set;
21
22 import org.apache.commons.lang.StringUtils;
23 import org.apache.log4j.Logger;
24 import org.kuali.rice.core.api.exception.RiceIllegalArgumentException;
25 import org.kuali.rice.core.api.exception.RiceRuntimeException;
26 import org.kuali.rice.kcb.bo.Message;
27 import org.kuali.rice.kcb.bo.MessageDelivery;
28 import org.kuali.rice.kcb.bo.RecipientDelivererConfig;
29 import org.kuali.rice.kcb.api.message.MessageDTO;
30 import org.kuali.rice.kcb.api.exception.MessageDeliveryException;
31 import org.kuali.rice.kcb.api.exception.MessageDismissalException;
32 import org.kuali.rice.kcb.quartz.MessageProcessingJob;
33 import org.kuali.rice.kcb.service.MessageDeliveryService;
34 import org.kuali.rice.kcb.service.MessageService;
35 import org.kuali.rice.kcb.api.service.MessagingService;
36 import org.kuali.rice.kcb.service.RecipientPreferenceService;
37 import org.kuali.rice.ksb.service.KSBServiceLocator;
38 import org.quartz.JobDataMap;
39 import org.quartz.Scheduler;
40 import org.quartz.SchedulerException;
41 import org.quartz.SimpleTrigger;
42 import org.springframework.beans.factory.annotation.Required;
43 import org.springframework.transaction.support.TransactionSynchronizationAdapter;
44 import org.springframework.transaction.support.TransactionSynchronizationManager;
45
46
47
48
49
50
51 public class MessagingServiceImpl implements MessagingService {
52 private static final Logger LOG = Logger.getLogger(MessagingServiceImpl.class);
53
54 private MessageService messageService;
55 private MessageDeliveryService messageDeliveryService;
56 private RecipientPreferenceService recipientPrefs;
57 private String jobName;
58 private String jobGroup;
59
60
61
62
63 private boolean synchronous;
64
65
66
67
68
69 public void setJobName(String jobName) {
70 this.jobName = jobName;
71 }
72
73
74
75
76
77 public void setJobGroup(String jobGroup) {
78 this.jobGroup = jobGroup;
79 }
80
81
82
83
84
85 @Required
86 public void setMessageService(MessageService messageService) {
87 this.messageService = messageService;
88 }
89
90
91
92
93
94 @Required
95 public void setMessageDeliveryService(MessageDeliveryService messageDeliveryService) {
96 this.messageDeliveryService = messageDeliveryService;
97 }
98
99
100
101
102
103 public void setSynchronous(boolean sync) {
104 LOG.debug("Setting synchronous messaging to: " + sync);
105 this.synchronous = sync;
106 }
107
108
109
110
111
112 @Required
113 public void setRecipientPreferenceService(RecipientPreferenceService prefs) {
114 this.recipientPrefs = prefs;
115 }
116
117
118
119
120 @Override
121 public Long deliver(MessageDTO message) throws MessageDeliveryException {
122 if (message == null) {
123 throw new RiceIllegalArgumentException("message is null");
124 }
125
126 Collection<String> delivererTypes = getDelivererTypesForUserAndChannel(message.getRecipient(), message.getChannel());
127 LOG.debug("Deliverer types for " + message.getRecipient() + "/" + message.getChannel() + ": " + delivererTypes.size());
128
129 if (delivererTypes.isEmpty()) {
130
131 LOG.debug("No deliverers are configured for " + message.getRecipient() + "/" + message.getChannel());
132 return null;
133 }
134
135 Message m = new Message();
136 m.setTitle(message.getTitle());
137 m.setDeliveryType(message.getDeliveryType());
138 m.setChannel(message.getChannel());
139 m.setRecipient(message.getRecipient());
140 m.setContentType(message.getContentType());
141 m.setUrl(message.getUrl());
142 m.setContent(message.getContent());
143 m.setOriginId(message.getOriginId());
144
145 LOG.debug("saving message: " +m);
146 m = messageService.saveMessage(m);
147
148 for (String type: delivererTypes) {
149
150 MessageDelivery delivery = new MessageDelivery();
151 delivery.setDelivererTypeName(type);
152 delivery.setMessage(m);
153
154
155
156
157
158
159 LOG.debug("saving messagedelivery: " +delivery);
160 messageDeliveryService.saveMessageDelivery(delivery);
161 }
162
163 LOG.debug("queuing job");
164 queueJob(MessageProcessingJob.Mode.DELIVER, m.getId(), null, null);
165
166 LOG.debug("returning");
167 return m.getId();
168 }
169
170
171
172
173 @Override
174 public void remove(long messageId, String user, String cause) throws MessageDismissalException {
175
176
177
178
179 if (StringUtils.isBlank(user)) {
180 throw new RiceIllegalArgumentException("user is null");
181 }
182
183 if (StringUtils.isBlank(cause)) {
184 throw new RiceIllegalArgumentException("cause is null");
185 }
186
187 Message m = messageService.getMessage(Long.valueOf(messageId));
188 if (m == null) {
189 throw new MessageDismissalException("No such message: " + messageId);
190 }
191
192 remove (m, user, cause);
193 }
194
195
196
197
198 @Override
199 public Long removeByOriginId(String originId, String user, String cause) throws MessageDismissalException {
200 if (StringUtils.isBlank(originId)) {
201 throw new RiceIllegalArgumentException("originId is null");
202 }
203
204 Message m = messageService.getMessageByOriginId(originId);
205 if (m == null) {
206 return null;
207
208 }
209 remove(m, user, cause);
210 return m.getId();
211 }
212
213 private void remove(Message message, String user, String cause) {
214 queueJob(MessageProcessingJob.Mode.REMOVE, message.getId(), user, cause);
215 }
216
217
218
219
220
221
222 private Collection<String> getDelivererTypesForUserAndChannel(String userRecipientId, String channel) {
223 Set<String> deliveryTypes = new HashSet<String>(1);
224
225
226
227
228
229 Collection<RecipientDelivererConfig> deliverers = recipientPrefs.getDeliverersForRecipientAndChannel(userRecipientId, channel);
230
231 for (RecipientDelivererConfig cfg: deliverers) {
232 deliveryTypes.add(cfg.getDelivererName());
233 }
234
235
236 return deliveryTypes;
237 }
238
239 private void queueJob(MessageProcessingJob.Mode mode, long messageId, String user, String cause) {
240
241 LOG.debug("registering synchronization");
242
243 if (!TransactionSynchronizationManager.isSynchronizationActive()) {
244 throw new RiceRuntimeException("transaction syncronization is not active " +
245 "(!TransactionSynchronizationManager.isSynchronizationActive())");
246 } else if (!TransactionSynchronizationManager.isActualTransactionActive()) {
247 throw new RiceRuntimeException("actual transaction is not active " +
248 "(!TransactionSynchronizationManager.isActualTransactionActive())");
249 }
250
251 TransactionSynchronizationManager.registerSynchronization(new QueueProcessingJobSynchronization(
252 jobName,
253 jobGroup,
254 mode,
255 messageId,
256 user,
257 cause,
258 synchronous
259 ));
260 }
261
262 public static class QueueProcessingJobSynchronization extends TransactionSynchronizationAdapter {
263 private static final Logger LOG = Logger.getLogger(QueueProcessingJobSynchronization.class);
264 private final String jobName;
265 private final String jobGroup;
266 private final MessageProcessingJob.Mode mode;
267 private final long messageId;
268 private final String user;
269 private final String cause;
270 private final boolean synchronous;
271
272 private QueueProcessingJobSynchronization(String jobName, String jobGroup, MessageProcessingJob.Mode mode, long messageId, String user, String cause, boolean synchronous) {
273 this.jobName = jobName;
274 this.jobGroup = jobGroup;
275 this.mode = mode;
276 this.messageId = messageId;
277 this.user = user;
278 this.cause = cause;
279 this.synchronous = synchronous;
280 }
281
282
283
284
285
286
287
288 @Override
289 public void afterCommit() {
290 scheduleJob();
291 }
292
293
294
295
296
297
298
299
300
301 private void scheduleJob() {
302 LOG.debug("Queueing processing job");
303 try {
304 Scheduler scheduler = KSBServiceLocator.getScheduler();
305 if (synchronous) {
306 LOG.debug("Invoking job synchronously in Thread " + Thread.currentThread());
307 MessageProcessingJob job = new MessageProcessingJob(messageId, mode, user, cause);
308 job.run();
309 } else {
310 String uniqueTriggerName = jobName + "-Trigger-" + System.currentTimeMillis() + Math.random();
311 SimpleTrigger trigger = new SimpleTrigger(uniqueTriggerName, jobGroup + "-Trigger");
312 LOG.debug("Scheduling trigger: " + trigger);
313
314 JobDataMap data = new JobDataMap();
315 data.put("mode", mode.name());
316 data.put("user", user);
317 data.put("cause", cause);
318 data.put("messageId", messageId);
319
320 trigger.setJobName(jobName);
321 trigger.setJobGroup(jobGroup);
322 trigger.setJobDataMap(data);
323 scheduler.scheduleJob(trigger);
324 }
325 } catch (SchedulerException se) {
326 throw new RuntimeException(se);
327 }
328 }
329 }
330 }