View Javadoc
1   /**
2    * Copyright 2005-2014 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.rice.kcb.quartz;
17  
18  import org.apache.log4j.Logger;
19  import org.kuali.rice.core.framework.persistence.dao.GenericDao;
20  import org.kuali.rice.kcb.bo.Message;
21  import org.kuali.rice.kcb.bo.MessageDelivery;
22  import org.kuali.rice.kcb.bo.MessageDeliveryStatus;
23  import org.kuali.rice.kcb.deliverer.BulkMessageDeliverer;
24  import org.kuali.rice.kcb.deliverer.MessageDeliverer;
25  import org.kuali.rice.kcb.api.exception.MessageDeliveryProcessingException;
26  import org.kuali.rice.kcb.quartz.ProcessingResult.Failure;
27  import org.kuali.rice.kcb.service.GlobalKCBServiceLocator;
28  import org.kuali.rice.kcb.service.MessageDelivererRegistryService;
29  import org.kuali.rice.kcb.service.MessageDeliveryService;
30  import org.kuali.rice.kcb.service.MessageService;
31  import org.kuali.rice.krad.data.DataObjectService;
32  import org.kuali.rice.krad.service.KRADServiceLocator;
33  import org.quartz.JobExecutionContext;
34  import org.quartz.JobExecutionException;
35  import org.quartz.StatefulJob;
36  import org.springframework.beans.factory.annotation.Required;
37  
38  import java.util.ArrayList;
39  import java.util.Collection;
40  import java.util.HashMap;
41  import java.util.HashSet;
42  import java.util.LinkedList;
43  import java.util.List;
44  import java.util.Map;
45  import java.util.Set;
46  
47  /**
48   * Job that delivers messages to endpoints.  This job is not really stateful,
49   * but should not be executed concurrently.
50   * 
51   * @author Kuali Rice Team (rice.collab@kuali.org)
52   */
53  public class MessageProcessingJob extends ConcurrentJob<MessageDelivery> implements StatefulJob {
54      public static final String NAME = "MessageProcessingJobDetail";
55      public static final String GROUP = "KCB-Delivery";
56  
57      public static enum Mode {
58          DELIVER, REMOVE
59      }
60  
61      private static final Logger LOG = Logger.getLogger(MessageProcessingJob.class);
62      
63      private DataObjectService dataObjectService;
64      private MessageDelivererRegistryService registry;
65      private MessageDeliveryService messageDeliveryService;
66      private Long messageId;
67      private Mode mode = null;
68      private String user;
69      private String cause;
70  
71      public MessageProcessingJob(Long messageId, Mode mode, String user, String cause) {
72          this();
73          this.messageId = messageId;
74          this.mode = mode;
75          this.user = user;
76          this.cause = cause;
77      }
78  
79  
80      public MessageProcessingJob() {
81          registry = GlobalKCBServiceLocator.getInstance().getMessageDelivererRegistryService();
82          messageDeliveryService = GlobalKCBServiceLocator.getInstance().getMessageDeliveryService();
83          txManager = GlobalKCBServiceLocator.getInstance().getTransactionManager();
84          dataObjectService = KRADServiceLocator.getDataObjectService();
85      }
86  
87      /**
88       * Sets the {@link DataObjectService}
89       * @param dataObjectService the {@link DataObjectService}
90       */
91      public void setDataObjectService(DataObjectService dataObjectService) {
92          this.dataObjectService = dataObjectService;
93      }
94  
95      /**
96       * Sets the {@link MessageDelivererRegistryService}
97       * @param registry the {@link MessageDelivererRegistryService}
98       */
99      @Required
100     public void setMessageDelivererRegistry(MessageDelivererRegistryService registry) {
101         this.registry = registry;
102     }
103     
104     /**
105      * Sets the {@link MessageDeliveryService}
106      * @param messageDeliveryService the {@link MessageDeliveryService}
107      */
108     @Required
109     public void setMessageDeliveryService(MessageDeliveryService messageDeliveryService) {
110         this.messageDeliveryService = messageDeliveryService;
111     }
112 
113     @Override
114     protected Collection<MessageDelivery> takeAvailableWorkItems() {
115         MessageDeliveryStatus[] statuses;
116         switch (mode) {
117             case DELIVER: {
118                 statuses = new MessageDeliveryStatus[] { MessageDeliveryStatus.UNDELIVERED };
119                 break;
120             }
121             case REMOVE: {
122                 if (messageId == null) {
123                     throw new IllegalStateException("Message id must be specified for message removal mode");
124                 }
125                 statuses = new MessageDeliveryStatus[] { MessageDeliveryStatus.DELIVERED, MessageDeliveryStatus.UNDELIVERED };
126                 break;
127             }
128             default:
129                 throw new RuntimeException("Invalid mode: " + mode);
130         }
131         for (MessageDeliveryStatus status: statuses) {
132             LOG.debug("Taking message deliveries with status: " + status);
133         }
134         Collection<MessageDelivery> ds = messageDeliveryService.lockAndTakeMessageDeliveries(messageId, statuses);
135         LOG.debug("Took " + ds.size() + " deliveries");
136         for (MessageDelivery md: ds) {
137             LOG.debug(md);
138             md.setProcessCount(md.getProcessCount().intValue() + 1);
139         }
140         return ds;
141     }
142 
143     @Override
144     protected void unlockWorkItem(MessageDelivery item) {
145         item.setLockedDate(null);
146         dataObjectService.save(item);
147     }
148 
149     /**
150      * Group work items by deliverer and notification, so that deliveries to bulk deliverers are grouped
151      * by notification
152      * @see org.kuali.rice.ken.service.impl.ConcurrentJob#groupWorkItems(java.util.Collection)
153      */
154     @Override
155     protected Collection<Collection<MessageDelivery>> groupWorkItems(Collection<MessageDelivery> workItems, ProcessingResult<MessageDelivery> result) {
156         Collection<Collection<MessageDelivery>> groupedWorkItems = new ArrayList<Collection<MessageDelivery>>(workItems.size());
157 
158         Map<String, Collection<MessageDelivery>> bulkWorkUnits = new HashMap<String, Collection<MessageDelivery>>();
159         for (MessageDelivery messageDelivery: workItems) {
160             
161             MessageDeliverer deliverer = registry.getDeliverer(messageDelivery);
162             if (deliverer == null) {
163                 LOG.error("Error obtaining message deliverer for message delivery: " + messageDelivery);
164                 result.addFailure(new Failure<MessageDelivery>(messageDelivery, "Error obtaining message deliverer for message delivery"));
165                 unlockWorkItemAtomically(messageDelivery);
166                 continue;
167             }
168 
169             if (deliverer instanceof BulkMessageDeliverer) {
170                 // group by bulk-deliverer+message combo
171                 String key = messageDelivery.getDelivererTypeName() + ":" + messageDelivery.getMessage().getId();
172                 Collection<MessageDelivery> workUnit = bulkWorkUnits.get(key);
173                 if (workUnit == null) {
174                     workUnit = new LinkedList<MessageDelivery>();
175                     bulkWorkUnits.put(key, workUnit);
176                 }
177                 workUnit.add(messageDelivery);
178             } else {
179                 ArrayList<MessageDelivery> l = new ArrayList<MessageDelivery>(1);
180                 l.add(messageDelivery);
181                 groupedWorkItems.add(l);
182             }
183         }
184 
185         return groupedWorkItems;
186     }
187     
188     
189     @Override
190     protected Collection<MessageDelivery> processWorkItems(Collection<MessageDelivery> messageDeliveries) {
191         MessageDelivery firstMessageDelivery = messageDeliveries.iterator().next();
192         // get our hands on the appropriate MessageDeliverer instance
193         MessageDeliverer messageDeliverer = registry.getDeliverer(firstMessageDelivery);
194         if (messageDeliverer == null) {
195             throw new RuntimeException("Message deliverer could not be obtained");
196         }
197     
198         if (messageDeliveries.size() > 1) {
199             // this is a bulk deliverer, so we need to batch the MessageDeliveries
200             if (!(messageDeliverer instanceof BulkMessageDeliverer)) {
201                 throw new RuntimeException("Discrepency in dispatch service: deliverer for list of message deliveries is not a BulkMessageDeliverer");
202             }
203             return bulkProcess((BulkMessageDeliverer) messageDeliverer, messageDeliveries, mode);
204         } else {
205             return process(messageDeliverer, firstMessageDelivery, mode);
206         }
207     }
208 
209     /**
210      * Implements delivery of a single MessageDelivery
211      * @param messageDeliverer the deliverer
212      * @param messageDelivery the delivery
213      * @return collection of strings indicating successful deliveries
214      */
215     protected Collection<MessageDelivery> process(MessageDeliverer messageDeliverer, MessageDelivery messageDelivery, Mode mode) {
216         // we have our message deliverer, so tell it to deliver the message
217         try {
218             if (mode == Mode.DELIVER) {
219                 messageDeliverer.deliver(messageDelivery);
220                 // if processing was successful, set the count back to zero
221                 messageDelivery.setProcessCount(Integer.valueOf(0));
222                 // by definition we have succeeded at this point if no exception was thrown by the messageDeliverer
223                 // so update the status of the delivery message instance to DELIVERED (and unmark as taken)
224                 // and persist
225                 updateStatusAndUnlock(messageDelivery, mode == Mode.DELIVER ? MessageDeliveryStatus.DELIVERED : MessageDeliveryStatus.REMOVED);
226             } else {
227                 messageDeliverer.dismiss(messageDelivery, user, cause);
228                 // don't need to set the processing count down to zero because we are just deleting the record entirely
229                 messageDeliveryService.deleteMessageDelivery(messageDelivery);
230             }
231         } catch (MessageDeliveryProcessingException nmde) {
232             LOG.error("Error processing message delivery " + messageDelivery, nmde);
233             throw new RuntimeException(nmde);
234         }
235 
236         LOG.debug("Message delivery '" + messageDelivery.getId() + "' for message '" + messageDelivery.getMessage().getId() + "' was successfully processed.");
237         //PerformanceLog.logDuration("Time to dispatch notification delivery for notification " + messageDelivery.getMessage().getId(), System.currentTimeMillis() - messageDelivery.getNotification().getSendDateTime().getTime());
238 
239         List<MessageDelivery> success = new ArrayList<MessageDelivery>(1);
240         success.add(messageDelivery);
241         return success;
242     }
243 
244     /**
245      * Implements bulk delivery of a collection of {@link MessageDelivery}s
246      * @param messageDeliverer the deliverer
247      * @param messageDeliveries the deliveries
248      * @return collection of strings indicating successful deliveries
249      */
250     protected Collection<MessageDelivery> bulkProcess(BulkMessageDeliverer messageDeliverer, Collection<MessageDelivery> messageDeliveries,  Mode mode) {
251         MessageDeliveryStatus targetStatus = (mode == Mode.DELIVER ? MessageDeliveryStatus.DELIVERED : MessageDeliveryStatus.REMOVED);
252         // we have our message deliverer, so tell it to deliver the message
253         try {
254             if (mode == Mode.DELIVER) {
255                 messageDeliverer.bulkDeliver(messageDeliveries);
256             } else {
257                 messageDeliverer.bulkDismiss(messageDeliveries);
258             }
259         } catch (MessageDeliveryProcessingException nmde) {
260             LOG.error("Error bulk-delivering messages " + messageDeliveries, nmde);
261             throw new RuntimeException(nmde);
262         }
263 
264         // by definition we have succeeded at this point if no exception was thrown by the messageDeliverer
265         // so update the status of the delivery message instance to DELIVERED (and unmark as taken)
266         // and persist
267         List<MessageDelivery> successes = new ArrayList<MessageDelivery>(messageDeliveries.size());
268         for (MessageDelivery nmd: messageDeliveries) {
269             successes.add(nmd);
270             LOG.debug("Message delivery '" + nmd.getId() + "' for notification '" + nmd.getMessage().getId() + "' was successfully delivered.");
271             //PerformanceLog.logDuration("Time to dispatch notification delivery for notification " + nmd.getMessage().getId(), System.currentTimeMillis() - nmd.getNotification().getSendDateTime().getTime());
272             if (mode == Mode.REMOVE) {
273                 messageDeliveryService.deleteMessageDelivery(nmd);
274             } else {
275                 nmd.setProcessCount(0);
276                 updateStatusAndUnlock(nmd, targetStatus);                
277             }
278         }
279         
280         return successes;
281     }
282 
283     @Override
284     protected void finishProcessing(ProcessingResult<MessageDelivery> result) {
285         LOG.debug("Message processing job: " + result.getSuccesses().size() + " processed, " + result.getFailures().size() + " failures");
286         Set<Long> messageIds = new HashSet<Long>(result.getSuccesses().size());
287         for (MessageDelivery md: result.getSuccesses()) {
288             messageIds.add(md.getMessage().getId());
289         }
290         MessageService ms = GlobalKCBServiceLocator.getInstance().getMessageService();
291         for (Long id: messageIds) {
292             LOG.debug("Finishing processing message " + id);
293             //if (Mode.REMOVE == mode) {
294             
295             Message m = ms.getMessage(id);
296             
297             Collection<MessageDelivery> c = messageDeliveryService.getMessageDeliveries(m);
298             if (c.size() == 0) {
299                 LOG.debug("Deleting message " + m);
300                 ms.deleteMessage(m);
301             } else {
302                 LOG.debug("Message " + m.getId() + " has " + c.size() + " deliveries");
303                 for (MessageDelivery md: c) {
304                     LOG.debug(md);
305                 }
306             }
307         }
308     }
309 
310     /**
311      * Marks a MessageDelivery as having been delivered, and unlocks it
312      * @param messageDelivery the messageDelivery instance to mark
313      */
314     protected void updateStatusAndUnlock(MessageDelivery messageDelivery, MessageDeliveryStatus status) {
315         messageDelivery.setDeliveryStatus(status);
316         // mark as unlocked
317         messageDelivery.setLockedDate(null);
318         dataObjectService.save(messageDelivery);
319     }
320 
321     @Override
322     public ProcessingResult<MessageDelivery> run() {
323         LOG.debug("MessageProcessingJob running in Thread " + Thread.currentThread() + ": " + mode + " " + user + " " + cause);
324         return super.run();
325     }
326 
327     public void execute(JobExecutionContext context) throws JobExecutionException {
328         String mode = context.getMergedJobDataMap().getString("mode");
329         if (mode != null) {
330             this.mode = Mode.valueOf(mode);
331         } else {
332             this.mode = Mode.DELIVER;
333         }
334         this.user = context.getMergedJobDataMap().getString("user");
335         this.cause = context.getMergedJobDataMap().getString("cause");
336         if (context.getMergedJobDataMap().containsKey("messageId")) {
337             this.messageId = context.getMergedJobDataMap().getLong("messageId");
338         }
339         LOG.debug("==== message processing job: " + this.mode + " message id: " + this.messageId + "====");
340         super.run();
341     }
342 }