1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.ken.services.impl;
17
18 import org.junit.Test;
19 import org.kuali.rice.core.api.criteria.QueryByCriteria;
20 import org.kuali.rice.ken.bo.NotificationBo;
21 import org.kuali.rice.ken.bo.NotificationMessageDelivery;
22 import org.kuali.rice.ken.service.NotificationMessageDeliveryResolverService;
23 import org.kuali.rice.ken.service.NotificationRecipientService;
24 import org.kuali.rice.ken.service.NotificationService;
25 import org.kuali.rice.ken.service.ProcessingResult;
26 import org.kuali.rice.ken.service.impl.NotificationMessageDeliveryResolverServiceImpl;
27 import org.kuali.rice.ken.test.KENTestCase;
28 import org.kuali.rice.ken.util.NotificationConstants;
29 import org.kuali.rice.krad.data.DataObjectService;
30 import org.kuali.rice.krad.service.KRADServiceLocator;
31 import org.kuali.rice.test.BaselineTestCase;
32 import org.kuali.rice.test.data.PerTestUnitTestData;
33 import org.kuali.rice.test.data.UnitTestData;
34 import org.kuali.rice.test.data.UnitTestSql;
35 import org.springframework.transaction.PlatformTransactionManager;
36
37 import java.util.Collection;
38 import java.util.concurrent.ExecutorService;
39 import java.util.concurrent.Executors;
40
41 import static org.junit.Assert.assertEquals;
42 import static org.junit.Assert.assertNotNull;
43 import static org.junit.Assert.assertTrue;
44 import static org.kuali.rice.core.api.criteria.PredicateFactory.equal;
45
46
47
48
49
50
51 @PerTestUnitTestData(
52 @UnitTestData(
53 order = { UnitTestData.Type.SQL_STATEMENTS },
54 sqlStatements = {
55 @UnitTestSql("insert into KREN_RECIP_DELIV_T (RECIP_DELIV_ID, RECIP_ID, CHNL, NM, VER_NBR) values (1, 'testuser6', 'KEW', 'mock', 0)"),
56 @UnitTestSql("insert into KREN_RECIP_DELIV_T (RECIP_DELIV_ID, RECIP_ID, CHNL, NM, VER_NBR) values (2, 'testuser1', 'KEW', 'mock', 0)"),
57 @UnitTestSql("insert into KREN_RECIP_DELIV_T (RECIP_DELIV_ID, RECIP_ID, CHNL, NM, VER_NBR) values (3, 'testuser2', 'KEW', 'mock', 0)"),
58 @UnitTestSql("insert into KREN_RECIP_DELIV_T (RECIP_DELIV_ID, RECIP_ID, CHNL, NM, VER_NBR) values (4, 'quickstart', 'KEW', 'mock', 0)"),
59 @UnitTestSql("insert into KREN_RECIP_DELIV_T (RECIP_DELIV_ID, RECIP_ID, CHNL, NM, VER_NBR) values (5, 'testuser5', 'KEW', 'mock', 0)"),
60 @UnitTestSql("insert into KREN_RECIP_DELIV_T (RECIP_DELIV_ID, RECIP_ID, CHNL, NM, VER_NBR) values (6, 'testuser4', 'KEW', 'mock', 0)")
61 }
62 )
63 )
64 @BaselineTestCase.BaselineMode(BaselineTestCase.Mode.CLEAR_DB)
65 public class NotificationMessageDeliveryResolverServiceImplTest extends KENTestCase {
66
67
68 private static final int EXPECTED_SUCCESSES = 6;
69
70
71
72
73 private static final long BAD_NOTIFICATION_ID = 3L;
74
75 private static class TestNotificationMessageDeliveryResolverService extends NotificationMessageDeliveryResolverServiceImpl {
76 public TestNotificationMessageDeliveryResolverService(NotificationService notificationService, NotificationRecipientService notificationRecipientService,
77 DataObjectService dataObjectService, PlatformTransactionManager txManager, ExecutorService executor) {
78 super(notificationService, notificationRecipientService, dataObjectService, txManager, executor);
79 }
80
81 @Override
82 protected Collection<Object> processWorkItems(Collection<NotificationBo> notifications) {
83 for (NotificationBo notification: notifications) {
84 if (notification.getId().longValue() == BAD_NOTIFICATION_ID) {
85 throw new RuntimeException("Intentional heinous exception");
86 }
87 }
88 return super.processWorkItems(notifications);
89 }
90 }
91
92 protected TestNotificationMessageDeliveryResolverService getResolverService() {
93 return new TestNotificationMessageDeliveryResolverService(services.getNotificationService(), services.getNotificationRecipientService(),
94 KRADServiceLocator.getDataObjectService(), transactionManager,
95 Executors.newFixedThreadPool(5));
96 }
97
98
99 protected void assertProcessResults() {
100
101 Collection<NotificationMessageDelivery> lockedDeliveries = services.getNotificationMessegDeliveryDao().getLockedDeliveries(NotificationBo.class, KRADServiceLocator.getDataObjectService());
102 assertEquals(0, lockedDeliveries.size());
103
104
105 QueryByCriteria.Builder criteria = QueryByCriteria.Builder.create();
106 criteria.setPredicates(equal(NotificationConstants.BO_PROPERTY_NAMES.PROCESSING_FLAG, NotificationConstants.PROCESSING_FLAGS.UNRESOLVED));
107 Collection<NotificationBo> unprocessedDeliveries = KRADServiceLocator.getDataObjectService().findMatching(NotificationBo.class, criteria.build()).getResults();
108
109 assertEquals(1, unprocessedDeliveries.size());
110 NotificationBo n = unprocessedDeliveries.iterator().next();
111
112 assertEquals(BAD_NOTIFICATION_ID, n.getId().longValue());
113 }
114
115
116
117
118
119
120
121
122 @Test
123 public void testResolveNotificationMessageDeliveries() throws Exception {
124 NotificationMessageDeliveryResolverService nSvc = getResolverService();
125
126 ProcessingResult result = nSvc.resolveNotificationMessageDeliveries();
127
128 assertEquals(EXPECTED_SUCCESSES, result.getSuccesses().size());
129
130 assertProcessResults();
131 }
132
133
134
135
136
137 @Test
138 public void testResolverConcurrency() throws InterruptedException {
139 final NotificationMessageDeliveryResolverService nSvc = getResolverService();
140
141 final ProcessingResult[] results = new ProcessingResult[2];
142 Thread t1 = new Thread(new Runnable() {
143 public void run() {
144 try {
145 results[0] = nSvc.resolveNotificationMessageDeliveries();
146 } catch (Exception e) {
147 System.err.println("Error resolving notification message deliveries");
148 e.printStackTrace();
149 }
150 }
151 });
152 Thread t2 = new Thread(new Runnable() {
153 public void run() {
154 try {
155 results[1] = nSvc.resolveNotificationMessageDeliveries();
156 } catch (Exception e) {
157 System.err.println("Error resolving notification message deliveries");
158 e.printStackTrace();
159 }
160 }
161 });
162
163 t1.start();
164 t2.start();
165
166 t1.join();
167 t2.join();
168
169
170 LOG.info("Results of thread #1: " + results[0]);
171 LOG.info("Results of thread #2: " + results[1]);
172 assertNotNull(results[0]);
173 assertNotNull(results[1]);
174 assertTrue((results[0].getSuccesses().size() == EXPECTED_SUCCESSES && results[0].getFailures().size() == 1 && results[1].getSuccesses().size() == 0 && results[1].getFailures().size() == 0) ||
175 (results[1].getSuccesses().size() == EXPECTED_SUCCESSES && results[1].getFailures().size() == 1 && results[0].getSuccesses().size() == 0 && results[0].getFailures().size() == 0));
176
177 assertProcessResults();
178 }
179 }