001 /**
002 * Copyright 2005-2014 The Kuali Foundation
003 *
004 * Licensed under the Educational Community License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.opensource.org/licenses/ecl2.php
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016 package org.kuali.rice.kcb.quartz;
017
018 import org.apache.log4j.Logger;
019 import org.kuali.rice.core.framework.persistence.dao.GenericDao;
020 import org.kuali.rice.kcb.bo.Message;
021 import org.kuali.rice.kcb.bo.MessageDelivery;
022 import org.kuali.rice.kcb.bo.MessageDeliveryStatus;
023 import org.kuali.rice.kcb.deliverer.BulkMessageDeliverer;
024 import org.kuali.rice.kcb.deliverer.MessageDeliverer;
025 import org.kuali.rice.kcb.api.exception.MessageDeliveryProcessingException;
026 import org.kuali.rice.kcb.quartz.ProcessingResult.Failure;
027 import org.kuali.rice.kcb.service.GlobalKCBServiceLocator;
028 import org.kuali.rice.kcb.service.MessageDelivererRegistryService;
029 import org.kuali.rice.kcb.service.MessageDeliveryService;
030 import org.kuali.rice.kcb.service.MessageService;
031 import org.kuali.rice.krad.data.DataObjectService;
032 import org.kuali.rice.krad.service.KRADServiceLocator;
033 import org.quartz.JobExecutionContext;
034 import org.quartz.JobExecutionException;
035 import org.quartz.StatefulJob;
036 import org.springframework.beans.factory.annotation.Required;
037
038 import java.util.ArrayList;
039 import java.util.Collection;
040 import java.util.HashMap;
041 import java.util.HashSet;
042 import java.util.LinkedList;
043 import java.util.List;
044 import java.util.Map;
045 import java.util.Set;
046
047 /**
048 * Job that delivers messages to endpoints. This job is not really stateful,
049 * but should not be executed concurrently.
050 *
051 * @author Kuali Rice Team (rice.collab@kuali.org)
052 */
053 public class MessageProcessingJob extends ConcurrentJob<MessageDelivery> implements StatefulJob {
054 public static final String NAME = "MessageProcessingJobDetail";
055 public static final String GROUP = "KCB-Delivery";
056
057 public static enum Mode {
058 DELIVER, REMOVE
059 }
060
061 private static final Logger LOG = Logger.getLogger(MessageProcessingJob.class);
062
063 private DataObjectService dataObjectService;
064 private MessageDelivererRegistryService registry;
065 private MessageDeliveryService messageDeliveryService;
066 private Long messageId;
067 private Mode mode = null;
068 private String user;
069 private String cause;
070
071 public MessageProcessingJob(Long messageId, Mode mode, String user, String cause) {
072 this();
073 this.messageId = messageId;
074 this.mode = mode;
075 this.user = user;
076 this.cause = cause;
077 }
078
079
080 public MessageProcessingJob() {
081 registry = GlobalKCBServiceLocator.getInstance().getMessageDelivererRegistryService();
082 messageDeliveryService = GlobalKCBServiceLocator.getInstance().getMessageDeliveryService();
083 txManager = GlobalKCBServiceLocator.getInstance().getTransactionManager();
084 dataObjectService = KRADServiceLocator.getDataObjectService();
085 }
086
087 /**
088 * Sets the {@link DataObjectService}
089 * @param dataObjectService the {@link DataObjectService}
090 */
091 public void setDataObjectService(DataObjectService dataObjectService) {
092 this.dataObjectService = dataObjectService;
093 }
094
095 /**
096 * Sets the {@link MessageDelivererRegistryService}
097 * @param registry the {@link MessageDelivererRegistryService}
098 */
099 @Required
100 public void setMessageDelivererRegistry(MessageDelivererRegistryService registry) {
101 this.registry = registry;
102 }
103
104 /**
105 * Sets the {@link MessageDeliveryService}
106 * @param messageDeliveryService the {@link MessageDeliveryService}
107 */
108 @Required
109 public void setMessageDeliveryService(MessageDeliveryService messageDeliveryService) {
110 this.messageDeliveryService = messageDeliveryService;
111 }
112
113 @Override
114 protected Collection<MessageDelivery> takeAvailableWorkItems() {
115 MessageDeliveryStatus[] statuses;
116 switch (mode) {
117 case DELIVER: {
118 statuses = new MessageDeliveryStatus[] { MessageDeliveryStatus.UNDELIVERED };
119 break;
120 }
121 case REMOVE: {
122 if (messageId == null) {
123 throw new IllegalStateException("Message id must be specified for message removal mode");
124 }
125 statuses = new MessageDeliveryStatus[] { MessageDeliveryStatus.DELIVERED, MessageDeliveryStatus.UNDELIVERED };
126 break;
127 }
128 default:
129 throw new RuntimeException("Invalid mode: " + mode);
130 }
131 for (MessageDeliveryStatus status: statuses) {
132 LOG.debug("Taking message deliveries with status: " + status);
133 }
134 Collection<MessageDelivery> ds = messageDeliveryService.lockAndTakeMessageDeliveries(messageId, statuses);
135 LOG.debug("Took " + ds.size() + " deliveries");
136 for (MessageDelivery md: ds) {
137 LOG.debug(md);
138 md.setProcessCount(md.getProcessCount().intValue() + 1);
139 }
140 return ds;
141 }
142
143 @Override
144 protected void unlockWorkItem(MessageDelivery item) {
145 item.setLockedDate(null);
146 dataObjectService.save(item);
147 }
148
149 /**
150 * Group work items by deliverer and notification, so that deliveries to bulk deliverers are grouped
151 * by notification
152 * @see org.kuali.rice.ken.service.impl.ConcurrentJob#groupWorkItems(java.util.Collection)
153 */
154 @Override
155 protected Collection<Collection<MessageDelivery>> groupWorkItems(Collection<MessageDelivery> workItems, ProcessingResult<MessageDelivery> result) {
156 Collection<Collection<MessageDelivery>> groupedWorkItems = new ArrayList<Collection<MessageDelivery>>(workItems.size());
157
158 Map<String, Collection<MessageDelivery>> bulkWorkUnits = new HashMap<String, Collection<MessageDelivery>>();
159 for (MessageDelivery messageDelivery: workItems) {
160
161 MessageDeliverer deliverer = registry.getDeliverer(messageDelivery);
162 if (deliverer == null) {
163 LOG.error("Error obtaining message deliverer for message delivery: " + messageDelivery);
164 result.addFailure(new Failure<MessageDelivery>(messageDelivery, "Error obtaining message deliverer for message delivery"));
165 unlockWorkItemAtomically(messageDelivery);
166 continue;
167 }
168
169 if (deliverer instanceof BulkMessageDeliverer) {
170 // group by bulk-deliverer+message combo
171 String key = messageDelivery.getDelivererTypeName() + ":" + messageDelivery.getMessage().getId();
172 Collection<MessageDelivery> workUnit = bulkWorkUnits.get(key);
173 if (workUnit == null) {
174 workUnit = new LinkedList<MessageDelivery>();
175 bulkWorkUnits.put(key, workUnit);
176 }
177 workUnit.add(messageDelivery);
178 } else {
179 ArrayList<MessageDelivery> l = new ArrayList<MessageDelivery>(1);
180 l.add(messageDelivery);
181 groupedWorkItems.add(l);
182 }
183 }
184
185 return groupedWorkItems;
186 }
187
188
189 @Override
190 protected Collection<MessageDelivery> processWorkItems(Collection<MessageDelivery> messageDeliveries) {
191 MessageDelivery firstMessageDelivery = messageDeliveries.iterator().next();
192 // get our hands on the appropriate MessageDeliverer instance
193 MessageDeliverer messageDeliverer = registry.getDeliverer(firstMessageDelivery);
194 if (messageDeliverer == null) {
195 throw new RuntimeException("Message deliverer could not be obtained");
196 }
197
198 if (messageDeliveries.size() > 1) {
199 // this is a bulk deliverer, so we need to batch the MessageDeliveries
200 if (!(messageDeliverer instanceof BulkMessageDeliverer)) {
201 throw new RuntimeException("Discrepency in dispatch service: deliverer for list of message deliveries is not a BulkMessageDeliverer");
202 }
203 return bulkProcess((BulkMessageDeliverer) messageDeliverer, messageDeliveries, mode);
204 } else {
205 return process(messageDeliverer, firstMessageDelivery, mode);
206 }
207 }
208
209 /**
210 * Implements delivery of a single MessageDelivery
211 * @param messageDeliverer the deliverer
212 * @param messageDelivery the delivery
213 * @return collection of strings indicating successful deliveries
214 */
215 protected Collection<MessageDelivery> process(MessageDeliverer messageDeliverer, MessageDelivery messageDelivery, Mode mode) {
216 // we have our message deliverer, so tell it to deliver the message
217 try {
218 if (mode == Mode.DELIVER) {
219 messageDeliverer.deliver(messageDelivery);
220 // if processing was successful, set the count back to zero
221 messageDelivery.setProcessCount(Integer.valueOf(0));
222 // by definition we have succeeded at this point if no exception was thrown by the messageDeliverer
223 // so update the status of the delivery message instance to DELIVERED (and unmark as taken)
224 // and persist
225 updateStatusAndUnlock(messageDelivery, mode == Mode.DELIVER ? MessageDeliveryStatus.DELIVERED : MessageDeliveryStatus.REMOVED);
226 } else {
227 messageDeliverer.dismiss(messageDelivery, user, cause);
228 // don't need to set the processing count down to zero because we are just deleting the record entirely
229 messageDeliveryService.deleteMessageDelivery(messageDelivery);
230 }
231 } catch (MessageDeliveryProcessingException nmde) {
232 LOG.error("Error processing message delivery " + messageDelivery, nmde);
233 throw new RuntimeException(nmde);
234 }
235
236 LOG.debug("Message delivery '" + messageDelivery.getId() + "' for message '" + messageDelivery.getMessage().getId() + "' was successfully processed.");
237 //PerformanceLog.logDuration("Time to dispatch notification delivery for notification " + messageDelivery.getMessage().getId(), System.currentTimeMillis() - messageDelivery.getNotification().getSendDateTime().getTime());
238
239 List<MessageDelivery> success = new ArrayList<MessageDelivery>(1);
240 success.add(messageDelivery);
241 return success;
242 }
243
244 /**
245 * Implements bulk delivery of a collection of {@link MessageDelivery}s
246 * @param messageDeliverer the deliverer
247 * @param messageDeliveries the deliveries
248 * @return collection of strings indicating successful deliveries
249 */
250 protected Collection<MessageDelivery> bulkProcess(BulkMessageDeliverer messageDeliverer, Collection<MessageDelivery> messageDeliveries, Mode mode) {
251 MessageDeliveryStatus targetStatus = (mode == Mode.DELIVER ? MessageDeliveryStatus.DELIVERED : MessageDeliveryStatus.REMOVED);
252 // we have our message deliverer, so tell it to deliver the message
253 try {
254 if (mode == Mode.DELIVER) {
255 messageDeliverer.bulkDeliver(messageDeliveries);
256 } else {
257 messageDeliverer.bulkDismiss(messageDeliveries);
258 }
259 } catch (MessageDeliveryProcessingException nmde) {
260 LOG.error("Error bulk-delivering messages " + messageDeliveries, nmde);
261 throw new RuntimeException(nmde);
262 }
263
264 // by definition we have succeeded at this point if no exception was thrown by the messageDeliverer
265 // so update the status of the delivery message instance to DELIVERED (and unmark as taken)
266 // and persist
267 List<MessageDelivery> successes = new ArrayList<MessageDelivery>(messageDeliveries.size());
268 for (MessageDelivery nmd: messageDeliveries) {
269 successes.add(nmd);
270 LOG.debug("Message delivery '" + nmd.getId() + "' for notification '" + nmd.getMessage().getId() + "' was successfully delivered.");
271 //PerformanceLog.logDuration("Time to dispatch notification delivery for notification " + nmd.getMessage().getId(), System.currentTimeMillis() - nmd.getNotification().getSendDateTime().getTime());
272 if (mode == Mode.REMOVE) {
273 messageDeliveryService.deleteMessageDelivery(nmd);
274 } else {
275 nmd.setProcessCount(0);
276 updateStatusAndUnlock(nmd, targetStatus);
277 }
278 }
279
280 return successes;
281 }
282
283 @Override
284 protected void finishProcessing(ProcessingResult<MessageDelivery> result) {
285 LOG.debug("Message processing job: " + result.getSuccesses().size() + " processed, " + result.getFailures().size() + " failures");
286 Set<Long> messageIds = new HashSet<Long>(result.getSuccesses().size());
287 for (MessageDelivery md: result.getSuccesses()) {
288 messageIds.add(md.getMessage().getId());
289 }
290 MessageService ms = GlobalKCBServiceLocator.getInstance().getMessageService();
291 for (Long id: messageIds) {
292 LOG.debug("Finishing processing message " + id);
293 //if (Mode.REMOVE == mode) {
294
295 Message m = ms.getMessage(id);
296
297 Collection<MessageDelivery> c = messageDeliveryService.getMessageDeliveries(m);
298 if (c.size() == 0) {
299 LOG.debug("Deleting message " + m);
300 ms.deleteMessage(m);
301 } else {
302 LOG.debug("Message " + m.getId() + " has " + c.size() + " deliveries");
303 for (MessageDelivery md: c) {
304 LOG.debug(md);
305 }
306 }
307 }
308 }
309
310 /**
311 * Marks a MessageDelivery as having been delivered, and unlocks it
312 * @param messageDelivery the messageDelivery instance to mark
313 */
314 protected void updateStatusAndUnlock(MessageDelivery messageDelivery, MessageDeliveryStatus status) {
315 messageDelivery.setDeliveryStatus(status);
316 // mark as unlocked
317 messageDelivery.setLockedDate(null);
318 dataObjectService.save(messageDelivery);
319 }
320
321 @Override
322 public ProcessingResult<MessageDelivery> run() {
323 LOG.debug("MessageProcessingJob running in Thread " + Thread.currentThread() + ": " + mode + " " + user + " " + cause);
324 return super.run();
325 }
326
327 public void execute(JobExecutionContext context) throws JobExecutionException {
328 String mode = context.getMergedJobDataMap().getString("mode");
329 if (mode != null) {
330 this.mode = Mode.valueOf(mode);
331 } else {
332 this.mode = Mode.DELIVER;
333 }
334 this.user = context.getMergedJobDataMap().getString("user");
335 this.cause = context.getMergedJobDataMap().getString("cause");
336 if (context.getMergedJobDataMap().containsKey("messageId")) {
337 this.messageId = context.getMergedJobDataMap().getLong("messageId");
338 }
339 LOG.debug("==== message processing job: " + this.mode + " message id: " + this.messageId + "====");
340 super.run();
341 }
342 }