1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.ken.services.impl;
17
18 import java.util.Collection;
19 import java.util.HashMap;
20
21 import org.apache.ojb.broker.query.Criteria;
22 import org.junit.Test;
23 import org.kuali.rice.core.dao.GenericDao;
24 import org.kuali.rice.kcb.service.GlobalKCBServiceLocator;
25 import org.kuali.rice.kcb.service.MessageService;
26 import org.kuali.rice.ken.bo.Notification;
27 import org.kuali.rice.ken.bo.NotificationMessageDelivery;
28 import org.kuali.rice.ken.service.NotificationMessageDeliveryResolverService;
29 import org.kuali.rice.ken.service.NotificationRecipientService;
30 import org.kuali.rice.ken.service.NotificationService;
31 import org.kuali.rice.ken.service.ProcessingResult;
32 import org.kuali.rice.ken.service.UserPreferenceService;
33 import org.kuali.rice.ken.service.impl.NotificationMessageDeliveryResolverServiceImpl;
34 import org.kuali.rice.ken.test.KENTestCase;
35 import org.kuali.rice.ken.util.NotificationConstants;
36 import org.kuali.rice.test.data.PerTestUnitTestData;
37 import org.kuali.rice.test.data.UnitTestData;
38 import org.kuali.rice.test.data.UnitTestSql;
39 import org.springframework.transaction.PlatformTransactionManager;
40
41 import java.util.concurrent.ExecutorService;
42 import java.util.concurrent.Executors;
43
44
45
46
47
48
49
50 @PerTestUnitTestData(
51 @UnitTestData(
52 order = { UnitTestData.Type.SQL_STATEMENTS },
53 sqlStatements = {
54 @UnitTestSql("insert into KREN_RECIP_DELIV_T (RECIP_DELIV_ID, RECIP_ID, CHNL, NM, VER_NBR) values (1, 'testuser6', 'KEW', 'mock', 0)"),
55 @UnitTestSql("insert into KREN_RECIP_DELIV_T (RECIP_DELIV_ID, RECIP_ID, CHNL, NM, VER_NBR) values (2, 'testuser1', 'KEW', 'mock', 0)"),
56 @UnitTestSql("insert into KREN_RECIP_DELIV_T (RECIP_DELIV_ID, RECIP_ID, CHNL, NM, VER_NBR) values (3, 'testuser2', 'KEW', 'mock', 0)"),
57 @UnitTestSql("insert into KREN_RECIP_DELIV_T (RECIP_DELIV_ID, RECIP_ID, CHNL, NM, VER_NBR) values (4, 'quickstart', 'KEW', 'mock', 0)"),
58 @UnitTestSql("insert into KREN_RECIP_DELIV_T (RECIP_DELIV_ID, RECIP_ID, CHNL, NM, VER_NBR) values (5, 'testuser5', 'KEW', 'mock', 0)"),
59 @UnitTestSql("insert into KREN_RECIP_DELIV_T (RECIP_DELIV_ID, RECIP_ID, CHNL, NM, VER_NBR) values (6, 'testuser4', 'KEW', 'mock', 0)")
60 }
61 )
62 )
63 public class NotificationMessageDeliveryResolverServiceImplTest extends KENTestCase {
64
65
66 private static final int EXPECTED_SUCCESSES = 6;
67
68
69
70
71 private static final long BAD_NOTIFICATION_ID = 3L;
72
73 private static class TestNotificationMessageDeliveryResolverService extends NotificationMessageDeliveryResolverServiceImpl {
74 public TestNotificationMessageDeliveryResolverService(NotificationService notificationService, NotificationRecipientService notificationRecipientService,
75 GenericDao businessObjectDao, PlatformTransactionManager txManager, ExecutorService executor, UserPreferenceService userPreferenceService) {
76 super(notificationService, notificationRecipientService, businessObjectDao, txManager, executor, userPreferenceService);
77 }
78
79 @Override
80 protected Collection<Object> processWorkItems(Collection<Notification> notifications) {
81 for (Notification notification: notifications) {
82 if (notification.getId().longValue() == BAD_NOTIFICATION_ID) {
83 throw new RuntimeException("Intentional heinous exception");
84 }
85 }
86 return super.processWorkItems(notifications);
87 }
88 }
89
90 protected TestNotificationMessageDeliveryResolverService getResolverService() {
91 return new TestNotificationMessageDeliveryResolverService(services.getNotificationService(), services.getNotificationRecipientService(), services.getGenericDao(), transactionManager,
92 Executors.newFixedThreadPool(5), services.getUserPreferenceService());
93 }
94
95 protected void assertProcessResults() {
96
97 Criteria criteria = new Criteria();
98 criteria.addNotNull(NotificationConstants.BO_PROPERTY_NAMES.LOCKED_DATE);
99 Collection<NotificationMessageDelivery> lockedDeliveries = services.getGenericDao().findMatching(Notification.class, criteria);
100 assertEquals(0, lockedDeliveries.size());
101
102
103 HashMap<String, String> queryCriteria = new HashMap<String, String>();
104 queryCriteria.put(NotificationConstants.BO_PROPERTY_NAMES.PROCESSING_FLAG, NotificationConstants.PROCESSING_FLAGS.UNRESOLVED);
105 Collection<Notification> unprocessedDeliveries = services.getGenericDao().findMatching(Notification.class, queryCriteria);
106 assertEquals(1, unprocessedDeliveries.size());
107 Notification n = unprocessedDeliveries.iterator().next();
108
109 assertEquals(BAD_NOTIFICATION_ID, n.getId().longValue());
110 }
111
112
113
114
115
116
117
118
119 @Test
120 public void testResolveNotificationMessageDeliveries() throws Exception {
121 NotificationMessageDeliveryResolverService nSvc = getResolverService();
122
123 ProcessingResult result = nSvc.resolveNotificationMessageDeliveries();
124
125 Thread.sleep(20000);
126
127 assertEquals(EXPECTED_SUCCESSES, result.getSuccesses().size());
128
129 MessageService ms = (MessageService) GlobalKCBServiceLocator.getInstance().getMessageService();
130 assertEquals(result.getSuccesses().size(), ms.getAllMessages().size());
131
132 assertProcessResults();
133 }
134
135
136
137
138
139 @Test
140 public void testResolverConcurrency() throws InterruptedException {
141 final NotificationMessageDeliveryResolverService nSvc = getResolverService();
142
143 final ProcessingResult[] results = new ProcessingResult[2];
144 Thread t1 = new Thread(new Runnable() {
145 public void run() {
146 try {
147 results[0] = nSvc.resolveNotificationMessageDeliveries();
148 } catch (Exception e) {
149 System.err.println("Error resolving notification message deliveries");
150 e.printStackTrace();
151 }
152 }
153 });
154 Thread t2 = new Thread(new Runnable() {
155 public void run() {
156 try {
157 results[1] = nSvc.resolveNotificationMessageDeliveries();
158 } catch (Exception e) {
159 System.err.println("Error resolving notification message deliveries");
160 e.printStackTrace();
161 }
162 }
163 });
164
165 t1.start();
166 t2.start();
167
168 t1.join();
169 t2.join();
170
171
172 LOG.info("Results of thread #1: " + results[0]);
173 LOG.info("Results of thread #2: " + results[1]);
174 assertNotNull(results[0]);
175 assertNotNull(results[1]);
176 assertTrue((results[0].getSuccesses().size() == EXPECTED_SUCCESSES && results[0].getFailures().size() == 1 && results[1].getSuccesses().size() == 0 && results[1].getFailures().size() == 0) ||
177 (results[1].getSuccesses().size() == EXPECTED_SUCCESSES && results[1].getFailures().size() == 1 && results[0].getSuccesses().size() == 0 && results[0].getFailures().size() == 0));
178
179 assertProcessResults();
180 }
181 }