1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.ken.services.impl;
17
18 import org.junit.Ignore;
19 import org.junit.Test;
20 import org.kuali.rice.core.api.criteria.QueryByCriteria;
21 import org.kuali.rice.kcb.service.GlobalKCBServiceLocator;
22 import org.kuali.rice.kcb.service.MessageService;
23 import org.kuali.rice.ken.bo.NotificationBo;
24 import org.kuali.rice.ken.bo.NotificationMessageDelivery;
25 import org.kuali.rice.ken.service.NotificationMessageDeliveryResolverService;
26 import org.kuali.rice.ken.service.NotificationRecipientService;
27 import org.kuali.rice.ken.service.NotificationService;
28 import org.kuali.rice.ken.service.ProcessingResult;
29 import org.kuali.rice.ken.service.UserPreferenceService;
30 import org.kuali.rice.ken.service.impl.NotificationMessageDeliveryResolverServiceImpl;
31 import org.kuali.rice.ken.test.KENTestCase;
32 import org.kuali.rice.ken.util.NotificationConstants;
33 import org.kuali.rice.krad.data.DataObjectService;
34 import org.kuali.rice.krad.service.KRADServiceLocator;
35 import org.kuali.rice.test.data.PerTestUnitTestData;
36 import org.kuali.rice.test.data.UnitTestData;
37 import org.kuali.rice.test.data.UnitTestSql;
38 import org.springframework.transaction.PlatformTransactionManager;
39
40 import java.util.Collection;
41 import java.util.concurrent.ExecutorService;
42 import java.util.concurrent.Executors;
43
44 import static org.junit.Assert.*;
45
46 import static org.kuali.rice.core.api.criteria.PredicateFactory.equal;
47
48
49
50
51
52
53
54
55 @PerTestUnitTestData(
56 @UnitTestData(
57 order = { UnitTestData.Type.SQL_STATEMENTS },
58 sqlStatements = {
59 @UnitTestSql("insert into KREN_RECIP_DELIV_T (RECIP_DELIV_ID, RECIP_ID, CHNL, NM, VER_NBR) values (1, 'testuser6', 'KEW', 'mock', 0)"),
60 @UnitTestSql("insert into KREN_RECIP_DELIV_T (RECIP_DELIV_ID, RECIP_ID, CHNL, NM, VER_NBR) values (2, 'testuser1', 'KEW', 'mock', 0)"),
61 @UnitTestSql("insert into KREN_RECIP_DELIV_T (RECIP_DELIV_ID, RECIP_ID, CHNL, NM, VER_NBR) values (3, 'testuser2', 'KEW', 'mock', 0)"),
62 @UnitTestSql("insert into KREN_RECIP_DELIV_T (RECIP_DELIV_ID, RECIP_ID, CHNL, NM, VER_NBR) values (4, 'quickstart', 'KEW', 'mock', 0)"),
63 @UnitTestSql("insert into KREN_RECIP_DELIV_T (RECIP_DELIV_ID, RECIP_ID, CHNL, NM, VER_NBR) values (5, 'testuser5', 'KEW', 'mock', 0)"),
64 @UnitTestSql("insert into KREN_RECIP_DELIV_T (RECIP_DELIV_ID, RECIP_ID, CHNL, NM, VER_NBR) values (6, 'testuser4', 'KEW', 'mock', 0)")
65 }
66 )
67 )
68 @Ignore
69 public class NotificationMessageDeliveryResolverServiceImplTest extends KENTestCase {
70
71
72 private static final int EXPECTED_SUCCESSES = 6;
73
74
75
76
77 private static final long BAD_NOTIFICATION_ID = 3L;
78
79 private static class TestNotificationMessageDeliveryResolverService extends NotificationMessageDeliveryResolverServiceImpl {
80 public TestNotificationMessageDeliveryResolverService(NotificationService notificationService, NotificationRecipientService notificationRecipientService,
81 DataObjectService dataObjectService, PlatformTransactionManager txManager, ExecutorService executor) {
82 super(notificationService, notificationRecipientService, dataObjectService, txManager, executor);
83 }
84
85 @Override
86 protected Collection<Object> processWorkItems(Collection<NotificationBo> notifications) {
87 for (NotificationBo notification: notifications) {
88 if (notification.getId().longValue() == BAD_NOTIFICATION_ID) {
89 throw new RuntimeException("Intentional heinous exception");
90 }
91 }
92 return super.processWorkItems(notifications);
93 }
94 }
95
96 protected TestNotificationMessageDeliveryResolverService getResolverService() {
97 return new TestNotificationMessageDeliveryResolverService(services.getNotificationService(), services.getNotificationRecipientService(),
98 KRADServiceLocator.getDataObjectService(), transactionManager,
99 Executors.newFixedThreadPool(5));
100 }
101
102
103 protected void assertProcessResults() {
104
105 Collection<NotificationMessageDelivery> lockedDeliveries = services.getNotificationMessegDeliveryDao().getLockedDeliveries(NotificationBo.class, KRADServiceLocator.getDataObjectService());
106 assertEquals(0, lockedDeliveries.size());
107
108
109 QueryByCriteria.Builder criteria = QueryByCriteria.Builder.create();
110 criteria.setPredicates(equal(NotificationConstants.BO_PROPERTY_NAMES.PROCESSING_FLAG, NotificationConstants.PROCESSING_FLAGS.UNRESOLVED));
111 Collection<NotificationBo> unprocessedDeliveries = KRADServiceLocator.getDataObjectService().findMatching(NotificationBo.class, criteria.build()).getResults();
112
113 assertEquals(1, unprocessedDeliveries.size());
114 NotificationBo n = unprocessedDeliveries.iterator().next();
115
116 assertEquals(BAD_NOTIFICATION_ID, n.getId().longValue());
117 }
118
119
120
121
122
123
124
125
126 @Test
127 public void testResolveNotificationMessageDeliveries() throws Exception {
128 NotificationMessageDeliveryResolverService nSvc = getResolverService();
129
130 ProcessingResult result = nSvc.resolveNotificationMessageDeliveries();
131
132 Thread.sleep(20000);
133
134 assertEquals(EXPECTED_SUCCESSES, result.getSuccesses().size());
135
136 MessageService ms = (MessageService) GlobalKCBServiceLocator.getInstance().getMessageService();
137 assertEquals(result.getSuccesses().size(), ms.getAllMessages().size());
138
139 assertProcessResults();
140 }
141
142
143
144
145
146 @Test
147 public void testResolverConcurrency() throws InterruptedException {
148 final NotificationMessageDeliveryResolverService nSvc = getResolverService();
149
150 final ProcessingResult[] results = new ProcessingResult[2];
151 Thread t1 = new Thread(new Runnable() {
152 public void run() {
153 try {
154 results[0] = nSvc.resolveNotificationMessageDeliveries();
155 } catch (Exception e) {
156 System.err.println("Error resolving notification message deliveries");
157 e.printStackTrace();
158 }
159 }
160 });
161 Thread t2 = new Thread(new Runnable() {
162 public void run() {
163 try {
164 results[1] = nSvc.resolveNotificationMessageDeliveries();
165 } catch (Exception e) {
166 System.err.println("Error resolving notification message deliveries");
167 e.printStackTrace();
168 }
169 }
170 });
171
172 t1.start();
173 t2.start();
174
175 t1.join();
176 t2.join();
177
178
179 LOG.info("Results of thread #1: " + results[0]);
180 LOG.info("Results of thread #2: " + results[1]);
181 assertNotNull(results[0]);
182 assertNotNull(results[1]);
183 assertTrue((results[0].getSuccesses().size() == EXPECTED_SUCCESSES && results[0].getFailures().size() == 1 && results[1].getSuccesses().size() == 0 && results[1].getFailures().size() == 0) ||
184 (results[1].getSuccesses().size() == EXPECTED_SUCCESSES && results[1].getFailures().size() == 1 && results[0].getSuccesses().size() == 0 && results[0].getFailures().size() == 0));
185
186 assertProcessResults();
187 }
188 }