View Javadoc

1   /**
2    * Copyright 2005-2011 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.rice.kcb.quartz;
17  
18  import org.apache.log4j.Logger;
19  import org.kuali.rice.core.framework.persistence.dao.GenericDao;
20  import org.kuali.rice.kcb.bo.Message;
21  import org.kuali.rice.kcb.bo.MessageDelivery;
22  import org.kuali.rice.kcb.bo.MessageDeliveryStatus;
23  import org.kuali.rice.kcb.deliverer.BulkMessageDeliverer;
24  import org.kuali.rice.kcb.deliverer.MessageDeliverer;
25  import org.kuali.rice.kcb.api.exception.MessageDeliveryProcessingException;
26  import org.kuali.rice.kcb.quartz.ProcessingResult.Failure;
27  import org.kuali.rice.kcb.service.GlobalKCBServiceLocator;
28  import org.kuali.rice.kcb.service.MessageDelivererRegistryService;
29  import org.kuali.rice.kcb.service.MessageDeliveryService;
30  import org.kuali.rice.kcb.service.MessageService;
31  import org.quartz.JobExecutionContext;
32  import org.quartz.JobExecutionException;
33  import org.quartz.StatefulJob;
34  import org.springframework.beans.factory.annotation.Required;
35  
36  import java.util.ArrayList;
37  import java.util.Collection;
38  import java.util.HashMap;
39  import java.util.HashSet;
40  import java.util.LinkedList;
41  import java.util.List;
42  import java.util.Map;
43  import java.util.Set;
44  
45  /**
46   * Job that delivers messages to endpoints.  This job is not really stateful,
47   * but should not be executed concurrently.
48   * 
49   * @author Kuali Rice Team (rice.collab@kuali.org)
50   */
51  public class MessageProcessingJob extends ConcurrentJob<MessageDelivery> implements StatefulJob {
52      public static final String NAME = "MessageProcessingJobDetail";
53      public static final String GROUP = "KCB-Delivery";
54  
55      public static enum Mode {
56          DELIVER, REMOVE
57      }
58  
59      private static final Logger LOG = Logger.getLogger(MessageProcessingJob.class);
60      
61      private GenericDao dao;
62      private MessageDelivererRegistryService registry;
63      private MessageDeliveryService messageDeliveryService;
64      private Long messageId;
65      private Mode mode = null;
66      private String user;
67      private String cause;
68  
69      public MessageProcessingJob(Long messageId, Mode mode, String user, String cause) {
70          this();
71          this.messageId = messageId;
72          this.mode = mode;
73          this.user = user;
74          this.cause = cause;
75      }
76  
77  
78      public MessageProcessingJob() {
79          dao = GlobalKCBServiceLocator.getInstance().getKcbGenericDao();
80          registry = GlobalKCBServiceLocator.getInstance().getMessageDelivererRegistryService();
81          messageDeliveryService = GlobalKCBServiceLocator.getInstance().getMessageDeliveryService();
82          txManager = GlobalKCBServiceLocator.getInstance().getTransactionManager();
83      }
84  
85      /**
86       * Sets the {@link GenericDao}
87       * @param dao the {@link GenericDao}
88       */
89      @Required
90      public void setGenericDao(GenericDao dao) {
91          this.dao = dao;
92      }
93  
94      /**
95       * Sets the {@link MessageDelivererRegistryService}
96       * @param registry the {@link MessageDelivererRegistryService}
97       */
98      @Required
99      public void setMessageDelivererRegistry(MessageDelivererRegistryService registry) {
100         this.registry = registry;
101     }
102     
103     /**
104      * Sets the {@link MessageDeliveryService}
105      * @param messageDeliveryService the {@link MessageDeliveryService}
106      */
107     @Required
108     public void setMessageDeliveryService(MessageDeliveryService messageDeliveryService) {
109         this.messageDeliveryService = messageDeliveryService;
110     }
111 
112     @Override
113     protected Collection<MessageDelivery> takeAvailableWorkItems() {
114         MessageDeliveryStatus[] statuses;
115         switch (mode) {
116             case DELIVER: {
117                 statuses = new MessageDeliveryStatus[] { MessageDeliveryStatus.UNDELIVERED };
118                 break;
119             }
120             case REMOVE: {
121                 if (messageId == null) {
122                     throw new IllegalStateException("Message id must be specified for message removal mode");
123                 }
124                 statuses = new MessageDeliveryStatus[] { MessageDeliveryStatus.DELIVERED, MessageDeliveryStatus.UNDELIVERED };
125                 break;
126             }
127             default:
128                 throw new RuntimeException("Invalid mode: " + mode);
129         }
130         for (MessageDeliveryStatus status: statuses) {
131             LOG.debug("Taking message deliveries with status: " + status);
132         }
133         Collection<MessageDelivery> ds = messageDeliveryService.lockAndTakeMessageDeliveries(messageId, statuses);
134         LOG.debug("Took " + ds.size() + " deliveries");
135         for (MessageDelivery md: ds) {
136             LOG.debug(md);
137             md.setProcessCount(md.getProcessCount().intValue() + 1);
138         }
139         return ds;
140     }
141 
142     @Override
143     protected void unlockWorkItem(MessageDelivery item) {
144         item.setLockedDate(null);
145         dao.save(item);
146     }
147 
148     /**
149      * Group work items by deliverer and notification, so that deliveries to bulk deliverers are grouped
150      * by notification
151      * @see org.kuali.rice.ken.service.impl.ConcurrentJob#groupWorkItems(java.util.Collection)
152      */
153     @Override
154     protected Collection<Collection<MessageDelivery>> groupWorkItems(Collection<MessageDelivery> workItems, ProcessingResult<MessageDelivery> result) {
155         Collection<Collection<MessageDelivery>> groupedWorkItems = new ArrayList<Collection<MessageDelivery>>(workItems.size());
156 
157         Map<String, Collection<MessageDelivery>> bulkWorkUnits = new HashMap<String, Collection<MessageDelivery>>();
158         for (MessageDelivery messageDelivery: workItems) {
159             
160             MessageDeliverer deliverer = registry.getDeliverer(messageDelivery);
161             if (deliverer == null) {
162                 LOG.error("Error obtaining message deliverer for message delivery: " + messageDelivery);
163                 result.addFailure(new Failure<MessageDelivery>(messageDelivery, "Error obtaining message deliverer for message delivery"));
164                 unlockWorkItemAtomically(messageDelivery);
165                 continue;
166             }
167 
168             if (deliverer instanceof BulkMessageDeliverer) {
169                 // group by bulk-deliverer+message combo
170                 String key = messageDelivery.getDelivererTypeName() + ":" + messageDelivery.getMessage().getId();
171                 Collection<MessageDelivery> workUnit = bulkWorkUnits.get(key);
172                 if (workUnit == null) {
173                     workUnit = new LinkedList<MessageDelivery>();
174                     bulkWorkUnits.put(key, workUnit);
175                 }
176                 workUnit.add(messageDelivery);
177             } else {
178                 ArrayList<MessageDelivery> l = new ArrayList<MessageDelivery>(1);
179                 l.add(messageDelivery);
180                 groupedWorkItems.add(l);
181             }
182         }
183 
184         return groupedWorkItems;
185     }
186     
187     
188     @Override
189     protected Collection<MessageDelivery> processWorkItems(Collection<MessageDelivery> messageDeliveries) {
190         MessageDelivery firstMessageDelivery = messageDeliveries.iterator().next();
191         // get our hands on the appropriate MessageDeliverer instance
192         MessageDeliverer messageDeliverer = registry.getDeliverer(firstMessageDelivery);
193         if (messageDeliverer == null) {
194             throw new RuntimeException("Message deliverer could not be obtained");
195         }
196     
197         if (messageDeliveries.size() > 1) {
198             // this is a bulk deliverer, so we need to batch the MessageDeliveries
199             if (!(messageDeliverer instanceof BulkMessageDeliverer)) {
200                 throw new RuntimeException("Discrepency in dispatch service: deliverer for list of message deliveries is not a BulkMessageDeliverer");
201             }
202             return bulkProcess((BulkMessageDeliverer) messageDeliverer, messageDeliveries, mode);
203         } else {
204             return process(messageDeliverer, firstMessageDelivery, mode);
205         }
206     }
207 
208     /**
209      * Implements delivery of a single MessageDelivery
210      * @param deliverer the deliverer
211      * @param messageDelivery the delivery
212      * @return collection of strings indicating successful deliveries
213      */
214     protected Collection<MessageDelivery> process(MessageDeliverer messageDeliverer, MessageDelivery messageDelivery, Mode mode) {
215         // we have our message deliverer, so tell it to deliver the message
216         try {
217             if (mode == Mode.DELIVER) {
218                 messageDeliverer.deliver(messageDelivery);
219                 // if processing was successful, set the count back to zero
220                 messageDelivery.setProcessCount(Integer.valueOf(0));
221                 // by definition we have succeeded at this point if no exception was thrown by the messageDeliverer
222                 // so update the status of the delivery message instance to DELIVERED (and unmark as taken)
223                 // and persist
224                 updateStatusAndUnlock(messageDelivery, mode == Mode.DELIVER ? MessageDeliveryStatus.DELIVERED : MessageDeliveryStatus.REMOVED);
225             } else {
226                 messageDeliverer.dismiss(messageDelivery, user, cause);
227                 // don't need to set the processing count down to zero because we are just deleting the record entirely
228                 messageDeliveryService.deleteMessageDelivery(messageDelivery);
229             }
230         } catch (MessageDeliveryProcessingException nmde) {
231             LOG.error("Error processing message delivery " + messageDelivery, nmde);
232             throw new RuntimeException(nmde);
233         }
234 
235         LOG.debug("Message delivery '" + messageDelivery.getId() + "' for message '" + messageDelivery.getMessage().getId() + "' was successfully processed.");
236         //PerformanceLog.logDuration("Time to dispatch notification delivery for notification " + messageDelivery.getMessage().getId(), System.currentTimeMillis() - messageDelivery.getNotification().getSendDateTime().getTime());
237 
238         List<MessageDelivery> success = new ArrayList<MessageDelivery>(1);
239         success.add(messageDelivery);
240         return success;
241     }
242 
243     /**
244      * Implements bulk delivery of a collection of {@link MessageDelivery}s
245      * @param deliverer the deliverer
246      * @param messageDeliveries the deliveries
247      * @return collection of strings indicating successful deliveries
248      */
249     protected Collection<MessageDelivery> bulkProcess(BulkMessageDeliverer messageDeliverer, Collection<MessageDelivery> messageDeliveries,  Mode mode) {
250         MessageDeliveryStatus targetStatus = (mode == Mode.DELIVER ? MessageDeliveryStatus.DELIVERED : MessageDeliveryStatus.REMOVED);
251         // we have our message deliverer, so tell it to deliver the message
252         try {
253             if (mode == Mode.DELIVER) {
254                 messageDeliverer.bulkDeliver(messageDeliveries);
255             } else {
256                 messageDeliverer.bulkDismiss(messageDeliveries);
257             }
258         } catch (MessageDeliveryProcessingException nmde) {
259             LOG.error("Error bulk-delivering messages " + messageDeliveries, nmde);
260             throw new RuntimeException(nmde);
261         }
262 
263         // by definition we have succeeded at this point if no exception was thrown by the messageDeliverer
264         // so update the status of the delivery message instance to DELIVERED (and unmark as taken)
265         // and persist
266         List<MessageDelivery> successes = new ArrayList<MessageDelivery>(messageDeliveries.size());
267         for (MessageDelivery nmd: messageDeliveries) {
268             successes.add(nmd);
269             LOG.debug("Message delivery '" + nmd.getId() + "' for notification '" + nmd.getMessage().getId() + "' was successfully delivered.");
270             //PerformanceLog.logDuration("Time to dispatch notification delivery for notification " + nmd.getMessage().getId(), System.currentTimeMillis() - nmd.getNotification().getSendDateTime().getTime());
271             if (mode == Mode.REMOVE) {
272                 messageDeliveryService.deleteMessageDelivery(nmd);
273             } else {
274                 nmd.setProcessCount(0);
275                 updateStatusAndUnlock(nmd, targetStatus);                
276             }
277         }
278         
279         return successes;
280     }
281 
282     @Override
283     protected void finishProcessing(ProcessingResult<MessageDelivery> result) {
284         LOG.debug("Message processing job: " + result.getSuccesses().size() + " processed, " + result.getFailures().size() + " failures");
285         Set<Long> messageIds = new HashSet<Long>(result.getSuccesses().size());
286         for (MessageDelivery md: result.getSuccesses()) {
287             messageIds.add(md.getMessage().getId());
288         }
289         MessageService ms = GlobalKCBServiceLocator.getInstance().getMessageService();
290         for (Long id: messageIds) {
291             LOG.debug("Finishing processing message " + id);
292             //if (Mode.REMOVE == mode) {
293             
294             Message m = ms.getMessage(id);
295             
296             Collection<MessageDelivery> c = messageDeliveryService.getMessageDeliveries(m);
297             if (c.size() == 0) {
298                 LOG.debug("Deleting message " + m);
299                 ms.deleteMessage(m);
300             } else {
301                 LOG.debug("Message " + m.getId() + " has " + c.size() + " deliveries");
302                 for (MessageDelivery md: c) {
303                     LOG.debug(md);
304                 }
305             }
306         }
307     }
308 
309     /**
310      * Marks a MessageDelivery as having been delivered, and unlocks it
311      * @param messageDelivery the messageDelivery instance to mark
312      */
313     protected void updateStatusAndUnlock(MessageDelivery messageDelivery, MessageDeliveryStatus status) {
314         messageDelivery.setDeliveryStatus(status);
315         // mark as unlocked
316         messageDelivery.setLockedDate(null);
317         dao.save(messageDelivery);
318     }
319 
320     @Override
321     public ProcessingResult<MessageDelivery> run() {
322         LOG.debug("MessageProcessingJob running in Thread " + Thread.currentThread() + ": " + mode + " " + user + " " + cause);
323         return super.run();
324     }
325 
326     public void execute(JobExecutionContext context) throws JobExecutionException {
327         String mode = context.getMergedJobDataMap().getString("mode");
328         if (mode != null) {
329             this.mode = Mode.valueOf(mode);
330         } else {
331             this.mode = Mode.DELIVER;
332         }
333         this.user = context.getMergedJobDataMap().getString("user");
334         this.cause = context.getMergedJobDataMap().getString("cause");
335         if (context.getMergedJobDataMap().containsKey("messageId")) {
336             this.messageId = context.getMergedJobDataMap().getLong("messageId");
337         }
338         LOG.debug("==== message processing job: " + this.mode + " message id: " + this.messageId + "====");
339         super.run();
340     }
341 }