1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.kuali.rice.ken.services.impl;
18
19 import org.junit.Test;
20 import org.kuali.rice.core.framework.persistence.dao.GenericDao;
21 import org.kuali.rice.kcb.service.GlobalKCBServiceLocator;
22 import org.kuali.rice.kcb.service.MessageService;
23 import org.kuali.rice.ken.bo.Notification;
24 import org.kuali.rice.ken.bo.NotificationMessageDelivery;
25 import org.kuali.rice.ken.service.NotificationMessageDeliveryResolverService;
26 import org.kuali.rice.ken.service.NotificationRecipientService;
27 import org.kuali.rice.ken.service.NotificationService;
28 import org.kuali.rice.ken.service.ProcessingResult;
29 import org.kuali.rice.ken.service.UserPreferenceService;
30 import org.kuali.rice.ken.service.impl.NotificationMessageDeliveryResolverServiceImpl;
31 import org.kuali.rice.ken.test.KENTestCase;
32 import org.kuali.rice.ken.util.NotificationConstants;
33 import org.kuali.rice.test.data.PerTestUnitTestData;
34 import org.kuali.rice.test.data.UnitTestData;
35 import org.kuali.rice.test.data.UnitTestSql;
36 import org.springframework.transaction.PlatformTransactionManager;
37
38 import java.util.Collection;
39 import java.util.HashMap;
40 import java.util.concurrent.ExecutorService;
41 import java.util.concurrent.Executors;
42
43 import static org.junit.Assert.*;
44
45
46
47
48
49
50
51
52
53 @PerTestUnitTestData(
54 @UnitTestData(
55 order = { UnitTestData.Type.SQL_STATEMENTS },
56 sqlStatements = {
57 @UnitTestSql("insert into KREN_RECIP_DELIV_T (RECIP_DELIV_ID, RECIP_ID, CHNL, NM, VER_NBR) values (1, 'testuser6', 'KEW', 'mock', 0)"),
58 @UnitTestSql("insert into KREN_RECIP_DELIV_T (RECIP_DELIV_ID, RECIP_ID, CHNL, NM, VER_NBR) values (2, 'testuser1', 'KEW', 'mock', 0)"),
59 @UnitTestSql("insert into KREN_RECIP_DELIV_T (RECIP_DELIV_ID, RECIP_ID, CHNL, NM, VER_NBR) values (3, 'testuser2', 'KEW', 'mock', 0)"),
60 @UnitTestSql("insert into KREN_RECIP_DELIV_T (RECIP_DELIV_ID, RECIP_ID, CHNL, NM, VER_NBR) values (4, 'quickstart', 'KEW', 'mock', 0)"),
61 @UnitTestSql("insert into KREN_RECIP_DELIV_T (RECIP_DELIV_ID, RECIP_ID, CHNL, NM, VER_NBR) values (5, 'testuser5', 'KEW', 'mock', 0)"),
62 @UnitTestSql("insert into KREN_RECIP_DELIV_T (RECIP_DELIV_ID, RECIP_ID, CHNL, NM, VER_NBR) values (6, 'testuser4', 'KEW', 'mock', 0)")
63 }
64 )
65 )
66 public class NotificationMessageDeliveryResolverServiceImplTest extends KENTestCase {
67
68
69 private static final int EXPECTED_SUCCESSES = 6;
70
71
72
73
74 private static final long BAD_NOTIFICATION_ID = 3L;
75
76 private static class TestNotificationMessageDeliveryResolverService extends NotificationMessageDeliveryResolverServiceImpl {
77 public TestNotificationMessageDeliveryResolverService(NotificationService notificationService, NotificationRecipientService notificationRecipientService,
78 GenericDao businessObjectDao, PlatformTransactionManager txManager, ExecutorService executor, UserPreferenceService userPreferenceService) {
79 super(notificationService, notificationRecipientService, businessObjectDao, txManager, executor, userPreferenceService);
80 }
81
82 @Override
83 protected Collection<Object> processWorkItems(Collection<Notification> notifications) {
84 for (Notification notification: notifications) {
85 if (notification.getId().longValue() == BAD_NOTIFICATION_ID) {
86 throw new RuntimeException("Intentional heinous exception");
87 }
88 }
89 return super.processWorkItems(notifications);
90 }
91 }
92
93 protected TestNotificationMessageDeliveryResolverService getResolverService() {
94 return new TestNotificationMessageDeliveryResolverService(services.getNotificationService(), services.getNotificationRecipientService(), services.getGenericDao(), transactionManager,
95 Executors.newFixedThreadPool(5), services.getUserPreferenceService());
96 }
97
98
99 protected void assertProcessResults() {
100
101 Collection<NotificationMessageDelivery> lockedDeliveries = services.getNotificationMessegDeliveryDao().getLockedDeliveries(Notification.class, services.getGenericDao());
102
103 assertEquals(0, lockedDeliveries.size());
104
105
106 HashMap<String, String> queryCriteria = new HashMap<String, String>();
107 queryCriteria.put(NotificationConstants.BO_PROPERTY_NAMES.PROCESSING_FLAG, NotificationConstants.PROCESSING_FLAGS.UNRESOLVED);
108 Collection<Notification> unprocessedDeliveries = services.getGenericDao().findMatching(Notification.class, queryCriteria);
109 assertEquals(1, unprocessedDeliveries.size());
110 Notification n = unprocessedDeliveries.iterator().next();
111
112 assertEquals(BAD_NOTIFICATION_ID, n.getId().longValue());
113 }
114
115
116
117
118
119
120
121
122 @Test
123 public void testResolveNotificationMessageDeliveries() throws Exception {
124 NotificationMessageDeliveryResolverService nSvc = getResolverService();
125
126 ProcessingResult result = nSvc.resolveNotificationMessageDeliveries();
127
128 Thread.sleep(20000);
129
130 assertEquals(EXPECTED_SUCCESSES, result.getSuccesses().size());
131
132 MessageService ms = (MessageService) GlobalKCBServiceLocator.getInstance().getMessageService();
133 assertEquals(result.getSuccesses().size(), ms.getAllMessages().size());
134
135 assertProcessResults();
136 }
137
138
139
140
141
142 @Test
143 public void testResolverConcurrency() throws InterruptedException {
144 final NotificationMessageDeliveryResolverService nSvc = getResolverService();
145
146 final ProcessingResult[] results = new ProcessingResult[2];
147 Thread t1 = new Thread(new Runnable() {
148 public void run() {
149 try {
150 results[0] = nSvc.resolveNotificationMessageDeliveries();
151 } catch (Exception e) {
152 System.err.println("Error resolving notification message deliveries");
153 e.printStackTrace();
154 }
155 }
156 });
157 Thread t2 = new Thread(new Runnable() {
158 public void run() {
159 try {
160 results[1] = nSvc.resolveNotificationMessageDeliveries();
161 } catch (Exception e) {
162 System.err.println("Error resolving notification message deliveries");
163 e.printStackTrace();
164 }
165 }
166 });
167
168 t1.start();
169 t2.start();
170
171 t1.join();
172 t2.join();
173
174
175 LOG.info("Results of thread #1: " + results[0]);
176 LOG.info("Results of thread #2: " + results[1]);
177 assertNotNull(results[0]);
178 assertNotNull(results[1]);
179 assertTrue((results[0].getSuccesses().size() == EXPECTED_SUCCESSES && results[0].getFailures().size() == 1 && results[1].getSuccesses().size() == 0 && results[1].getFailures().size() == 0) ||
180 (results[1].getSuccesses().size() == EXPECTED_SUCCESSES && results[1].getFailures().size() == 1 && results[0].getSuccesses().size() == 0 && results[0].getFailures().size() == 0));
181
182 assertProcessResults();
183 }
184 }