1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.ken.services.impl;
17
18 import org.junit.Test;
19 import org.kuali.rice.core.framework.persistence.dao.GenericDao;
20 import org.kuali.rice.kcb.service.GlobalKCBServiceLocator;
21 import org.kuali.rice.kcb.service.MessageService;
22 import org.kuali.rice.ken.bo.Notification;
23 import org.kuali.rice.ken.bo.NotificationMessageDelivery;
24 import org.kuali.rice.ken.service.NotificationMessageDeliveryResolverService;
25 import org.kuali.rice.ken.service.NotificationRecipientService;
26 import org.kuali.rice.ken.service.NotificationService;
27 import org.kuali.rice.ken.service.ProcessingResult;
28 import org.kuali.rice.ken.service.UserPreferenceService;
29 import org.kuali.rice.ken.service.impl.NotificationMessageDeliveryResolverServiceImpl;
30 import org.kuali.rice.ken.test.KENTestCase;
31 import org.kuali.rice.ken.util.NotificationConstants;
32 import org.kuali.rice.test.data.PerTestUnitTestData;
33 import org.kuali.rice.test.data.UnitTestData;
34 import org.kuali.rice.test.data.UnitTestSql;
35 import org.springframework.transaction.PlatformTransactionManager;
36
37 import java.util.Collection;
38 import java.util.HashMap;
39 import java.util.concurrent.ExecutorService;
40 import java.util.concurrent.Executors;
41
42 import static org.junit.Assert.*;
43
44
45
46
47
48
49
50
51
52 @PerTestUnitTestData(
53 @UnitTestData(
54 order = { UnitTestData.Type.SQL_STATEMENTS },
55 sqlStatements = {
56 @UnitTestSql("insert into KREN_RECIP_DELIV_T (RECIP_DELIV_ID, RECIP_ID, CHNL, NM, VER_NBR) values (1, 'testuser6', 'KEW', 'mock', 0)"),
57 @UnitTestSql("insert into KREN_RECIP_DELIV_T (RECIP_DELIV_ID, RECIP_ID, CHNL, NM, VER_NBR) values (2, 'testuser1', 'KEW', 'mock', 0)"),
58 @UnitTestSql("insert into KREN_RECIP_DELIV_T (RECIP_DELIV_ID, RECIP_ID, CHNL, NM, VER_NBR) values (3, 'testuser2', 'KEW', 'mock', 0)"),
59 @UnitTestSql("insert into KREN_RECIP_DELIV_T (RECIP_DELIV_ID, RECIP_ID, CHNL, NM, VER_NBR) values (4, 'quickstart', 'KEW', 'mock', 0)"),
60 @UnitTestSql("insert into KREN_RECIP_DELIV_T (RECIP_DELIV_ID, RECIP_ID, CHNL, NM, VER_NBR) values (5, 'testuser5', 'KEW', 'mock', 0)"),
61 @UnitTestSql("insert into KREN_RECIP_DELIV_T (RECIP_DELIV_ID, RECIP_ID, CHNL, NM, VER_NBR) values (6, 'testuser4', 'KEW', 'mock', 0)")
62 }
63 )
64 )
65 public class NotificationMessageDeliveryResolverServiceImplTest extends KENTestCase {
66
67
68 private static final int EXPECTED_SUCCESSES = 6;
69
70
71
72
73 private static final long BAD_NOTIFICATION_ID = 3L;
74
75 private static class TestNotificationMessageDeliveryResolverService extends NotificationMessageDeliveryResolverServiceImpl {
76 public TestNotificationMessageDeliveryResolverService(NotificationService notificationService, NotificationRecipientService notificationRecipientService,
77 GenericDao businessObjectDao, PlatformTransactionManager txManager, ExecutorService executor, UserPreferenceService userPreferenceService) {
78 super(notificationService, notificationRecipientService, businessObjectDao, txManager, executor, userPreferenceService);
79 }
80
81 @Override
82 protected Collection<Object> processWorkItems(Collection<Notification> notifications) {
83 for (Notification notification: notifications) {
84 if (notification.getId().longValue() == BAD_NOTIFICATION_ID) {
85 throw new RuntimeException("Intentional heinous exception");
86 }
87 }
88 return super.processWorkItems(notifications);
89 }
90 }
91
92 protected TestNotificationMessageDeliveryResolverService getResolverService() {
93 return new TestNotificationMessageDeliveryResolverService(services.getNotificationService(), services.getNotificationRecipientService(), services.getGenericDao(), transactionManager,
94 Executors.newFixedThreadPool(5), services.getUserPreferenceService());
95 }
96
97
98 protected void assertProcessResults() {
99
100 Collection<NotificationMessageDelivery> lockedDeliveries = services.getNotificationMessegDeliveryDao().getLockedDeliveries(Notification.class, services.getGenericDao());
101
102 assertEquals(0, lockedDeliveries.size());
103
104
105 HashMap<String, String> queryCriteria = new HashMap<String, String>();
106 queryCriteria.put(NotificationConstants.BO_PROPERTY_NAMES.PROCESSING_FLAG, NotificationConstants.PROCESSING_FLAGS.UNRESOLVED);
107 Collection<Notification> unprocessedDeliveries = services.getGenericDao().findMatching(Notification.class, queryCriteria);
108 assertEquals(1, unprocessedDeliveries.size());
109 Notification n = unprocessedDeliveries.iterator().next();
110
111 assertEquals(BAD_NOTIFICATION_ID, n.getId().longValue());
112 }
113
114
115
116
117
118
119
120
121 @Test
122 public void testResolveNotificationMessageDeliveries() throws Exception {
123 NotificationMessageDeliveryResolverService nSvc = getResolverService();
124
125 ProcessingResult result = nSvc.resolveNotificationMessageDeliveries();
126
127 Thread.sleep(20000);
128
129 assertEquals(EXPECTED_SUCCESSES, result.getSuccesses().size());
130
131 MessageService ms = (MessageService) GlobalKCBServiceLocator.getInstance().getMessageService();
132 assertEquals(result.getSuccesses().size(), ms.getAllMessages().size());
133
134 assertProcessResults();
135 }
136
137
138
139
140
141 @Test
142 public void testResolverConcurrency() throws InterruptedException {
143 final NotificationMessageDeliveryResolverService nSvc = getResolverService();
144
145 final ProcessingResult[] results = new ProcessingResult[2];
146 Thread t1 = new Thread(new Runnable() {
147 public void run() {
148 try {
149 results[0] = nSvc.resolveNotificationMessageDeliveries();
150 } catch (Exception e) {
151 System.err.println("Error resolving notification message deliveries");
152 e.printStackTrace();
153 }
154 }
155 });
156 Thread t2 = new Thread(new Runnable() {
157 public void run() {
158 try {
159 results[1] = nSvc.resolveNotificationMessageDeliveries();
160 } catch (Exception e) {
161 System.err.println("Error resolving notification message deliveries");
162 e.printStackTrace();
163 }
164 }
165 });
166
167 t1.start();
168 t2.start();
169
170 t1.join();
171 t2.join();
172
173
174 LOG.info("Results of thread #1: " + results[0]);
175 LOG.info("Results of thread #2: " + results[1]);
176 assertNotNull(results[0]);
177 assertNotNull(results[1]);
178 assertTrue((results[0].getSuccesses().size() == EXPECTED_SUCCESSES && results[0].getFailures().size() == 1 && results[1].getSuccesses().size() == 0 && results[1].getFailures().size() == 0) ||
179 (results[1].getSuccesses().size() == EXPECTED_SUCCESSES && results[1].getFailures().size() == 1 && results[0].getSuccesses().size() == 0 && results[0].getFailures().size() == 0));
180
181 assertProcessResults();
182 }
183 }