1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.kcb.service.impl;
17
18 import java.util.Collection;
19 import java.util.HashSet;
20 import java.util.Set;
21
22 import org.apache.log4j.Logger;
23 import org.kuali.rice.core.exception.RiceRuntimeException;
24 import org.kuali.rice.kcb.bo.Message;
25 import org.kuali.rice.kcb.bo.MessageDelivery;
26 import org.kuali.rice.kcb.bo.RecipientDelivererConfig;
27 import org.kuali.rice.kcb.dto.MessageDTO;
28 import org.kuali.rice.kcb.exception.MessageDeliveryException;
29 import org.kuali.rice.kcb.exception.MessageDismissalException;
30 import org.kuali.rice.kcb.quartz.MessageProcessingJob;
31 import org.kuali.rice.kcb.service.MessageDeliveryService;
32 import org.kuali.rice.kcb.service.MessageService;
33 import org.kuali.rice.kcb.service.MessagingService;
34 import org.kuali.rice.kcb.service.RecipientPreferenceService;
35 import org.kuali.rice.ksb.service.KSBServiceLocator;
36 import org.quartz.JobDataMap;
37 import org.quartz.Scheduler;
38 import org.quartz.SchedulerException;
39 import org.quartz.SimpleTrigger;
40 import org.springframework.beans.factory.annotation.Required;
41 import org.springframework.transaction.support.TransactionSynchronizationAdapter;
42 import org.springframework.transaction.support.TransactionSynchronizationManager;
43
44
45
46
47
48
49 public class MessagingServiceImpl implements MessagingService {
50 private static final Logger LOG = Logger.getLogger(MessagingServiceImpl.class);
51
52 private MessageService messageService;
53 private MessageDeliveryService messageDeliveryService;
54 private RecipientPreferenceService recipientPrefs;
55 private String jobName;
56 private String jobGroup;
57
58
59
60
61 private boolean synchronous;
62
63
64
65
66
67 public void setJobName(String jobName) {
68 this.jobName = jobName;
69 }
70
71
72
73
74
75 public void setJobGroup(String jobGroup) {
76 this.jobGroup = jobGroup;
77 }
78
79
80
81
82
83 @Required
84 public void setMessageService(MessageService messageService) {
85 this.messageService = messageService;
86 }
87
88
89
90
91
92 @Required
93 public void setMessageDeliveryService(MessageDeliveryService messageDeliveryService) {
94 this.messageDeliveryService = messageDeliveryService;
95 }
96
97
98
99
100
101 public void setSynchronous(boolean sync) {
102 LOG.debug("Setting synchronous messaging to: " + sync);
103 this.synchronous = sync;
104 }
105
106
107
108
109
110 @Required
111 public void setRecipientPreferenceService(RecipientPreferenceService prefs) {
112 this.recipientPrefs = prefs;
113 }
114
115
116
117
118 public Long deliver(MessageDTO message) throws MessageDeliveryException {
119 Collection<String> delivererTypes = getDelivererTypesForUserAndChannel(message.getRecipient(), message.getChannel());
120 LOG.debug("Deliverer types for " + message.getRecipient() + "/" + message.getChannel() + ": " + delivererTypes.size());
121
122 if (delivererTypes.size() == 0) {
123
124 LOG.debug("No deliverers are configured for " + message.getRecipient() + "/" + message.getChannel());
125 return null;
126 }
127
128 final Message m = new Message();
129 m.setTitle(message.getTitle());
130 m.setDeliveryType(message.getDeliveryType());
131 m.setChannel(message.getChannel());
132 m.setRecipient(message.getRecipient());
133 m.setContentType(message.getContentType());
134 m.setUrl(message.getUrl());
135 m.setContent(message.getContent());
136 m.setOriginId(message.getOriginId());
137
138 LOG.debug("saving message: " +m);
139 messageService.saveMessage(m);
140
141 for (String type: delivererTypes) {
142
143 MessageDelivery delivery = new MessageDelivery();
144 delivery.setDelivererTypeName(type);
145 delivery.setMessage(m);
146
147
148
149
150
151
152 LOG.debug("saving messagedelivery: " +delivery);
153 messageDeliveryService.saveMessageDelivery(delivery);
154 }
155
156 LOG.debug("queuing job");
157 queueJob(MessageProcessingJob.Mode.DELIVER, m.getId(), null, null);
158
159 LOG.debug("returning");
160 return m.getId();
161 }
162
163
164
165
166 public void remove(long messageId, String user, String cause) throws MessageDismissalException {
167 Message m = messageService.getMessage(Long.valueOf(messageId));
168 if (m == null) {
169 throw new MessageDismissalException("No such message: " + messageId);
170 }
171
172 remove (m, user, cause);
173 }
174
175
176
177
178 public Long removeByOriginId(String originId, String user, String cause) throws MessageDismissalException {
179 Message m = messageService.getMessageByOriginId(originId);
180 if (m == null) {
181 return null;
182
183 }
184 remove(m, user, cause);
185 return m.getId();
186 }
187
188 private void remove(Message message, String user, String cause) {
189 queueJob(MessageProcessingJob.Mode.REMOVE, message.getId(), user, cause);
190 }
191
192
193
194
195
196
197 private Collection<String> getDelivererTypesForUserAndChannel(String userRecipientId, String channel) {
198 Set<String> deliveryTypes = new HashSet<String>(1);
199
200
201
202
203
204 Collection<RecipientDelivererConfig> deliverers = recipientPrefs.getDeliverersForRecipientAndChannel(userRecipientId, channel);
205
206 for (RecipientDelivererConfig cfg: deliverers) {
207 deliveryTypes.add(cfg.getDelivererName());
208 }
209
210
211 return deliveryTypes;
212 }
213
214 private void queueJob(MessageProcessingJob.Mode mode, long messageId, String user, String cause) {
215
216 LOG.debug("registering synchronization");
217
218 if (!TransactionSynchronizationManager.isSynchronizationActive()) {
219 throw new RiceRuntimeException("transaction syncronization is not active " +
220 "(!TransactionSynchronizationManager.isSynchronizationActive())");
221 } else if (!TransactionSynchronizationManager.isActualTransactionActive()) {
222 throw new RiceRuntimeException("actual transaction is not active " +
223 "(!TransactionSynchronizationManager.isActualTransactionActive())");
224 }
225
226 TransactionSynchronizationManager.registerSynchronization(new QueueProcessingJobSynchronization(
227 jobName,
228 jobGroup,
229 mode,
230 messageId,
231 user,
232 cause,
233 synchronous
234 ));
235 }
236
237 public static class QueueProcessingJobSynchronization extends TransactionSynchronizationAdapter {
238 private static final Logger LOG = Logger.getLogger(QueueProcessingJobSynchronization.class);
239 private final String jobName;
240 private final String jobGroup;
241 private final MessageProcessingJob.Mode mode;
242 private final long messageId;
243 private final String user;
244 private final String cause;
245 private final boolean synchronous;
246
247 private QueueProcessingJobSynchronization(String jobName, String jobGroup, MessageProcessingJob.Mode mode, long messageId, String user, String cause, boolean synchronous) {
248 this.jobName = jobName;
249 this.jobGroup = jobGroup;
250 this.mode = mode;
251 this.messageId = messageId;
252 this.user = user;
253 this.cause = cause;
254 this.synchronous = synchronous;
255 }
256
257
258
259
260
261
262
263 @Override
264 public void afterCommit() {
265 scheduleJob();
266 }
267
268
269
270
271
272
273
274
275
276 private void scheduleJob() {
277 LOG.debug("Queueing processing job");
278 try {
279 Scheduler scheduler = KSBServiceLocator.getScheduler();
280 if (synchronous) {
281 LOG.debug("Invoking job synchronously in Thread " + Thread.currentThread());
282 MessageProcessingJob job = new MessageProcessingJob(messageId, mode, user, cause);
283 job.run();
284 } else {
285 String uniqueTriggerName = jobName + "-Trigger-" + System.currentTimeMillis() + Math.random();
286 SimpleTrigger trigger = new SimpleTrigger(uniqueTriggerName, jobGroup + "-Trigger");
287 LOG.debug("Scheduling trigger: " + trigger);
288
289 JobDataMap data = new JobDataMap();
290 data.put("mode", mode.name());
291 data.put("user", user);
292 data.put("cause", cause);
293 data.put("messageId", messageId);
294
295 trigger.setJobName(jobName);
296 trigger.setJobGroup(jobGroup);
297 trigger.setJobDataMap(data);
298 scheduler.scheduleJob(trigger);
299 }
300 } catch (SchedulerException se) {
301 throw new RuntimeException(se);
302 }
303 }
304 }
305 }