1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.ksb.messaging;
17
18 import static org.junit.Assert.assertEquals;
19 import static org.junit.Assert.assertNotNull;
20 import static org.junit.Assert.assertTrue;
21
22 import java.util.List;
23
24 import javax.xml.namespace.QName;
25
26 import org.junit.After;
27 import org.junit.Before;
28 import org.junit.Test;
29 import org.kuali.rice.core.api.config.property.ConfigContext;
30 import org.kuali.rice.ksb.api.KsbApiServiceLocator;
31 import org.kuali.rice.ksb.messaging.remotedservices.TestHarnessSharedTopic;
32 import org.kuali.rice.ksb.messaging.service.KSBJavaService;
33 import org.kuali.rice.ksb.service.KSBServiceLocator;
34 import org.kuali.rice.ksb.test.KSBTestCase;
35 import org.kuali.rice.ksb.util.KSBConstants;
36
37
38
39
40
41
42
43
44
45 public class MessageFetcherTest extends KSBTestCase {
46
47 @Before
48 @Override
49 public void setUp() throws Exception {
50 super.setUp();
51 ConfigContext.getCurrentContextConfig().putProperty(KSBConstants.Config.MESSAGING_OFF, "true");
52 TestHarnessSharedTopic.CALL_COUNT = 0;
53 TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD = 0;
54 }
55
56 @After
57 @Override
58 public void tearDown() throws Exception {
59 TestHarnessSharedTopic.CALL_COUNT = 0;
60 }
61
62 @Test
63 public void testRequeueMessages() throws Exception {
64
65 List<PersistedMessageBO> messages = KSBServiceLocator.getMessageQueueService().getNextDocuments(null);
66 assertEquals("Should have no messages in the queue.", 0, messages.size());
67
68
69
70 TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD = 500;
71
72 for (int i = 0; i < TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD; i++) {
73 sendMessage();
74 }
75
76
77 Thread.sleep(5000);
78
79 messages = KSBServiceLocator.getMessageQueueService().getNextDocuments(null);
80 assertEquals("Should have 500 messages in the queue.", 500, messages.size());
81
82 turnOnMessaging();
83 new MessageFetcher((Integer) null).run();
84 synchronized (TestHarnessSharedTopic.LOCK) {
85 TestHarnessSharedTopic.LOCK.wait(5 * 60 * 1000);
86 }
87
88
89 Thread.sleep(500);
90
91 assertEquals("Service not called by message fetcher", TestHarnessSharedTopic.CALL_COUNT, TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD);
92
93 messages = KSBServiceLocator.getMessageQueueService().getNextDocuments(null);
94 assertEquals("Should still have no messages in the queue.", 0, messages.size());
95 }
96
97 private void sendMessage() {
98 QName serviceName = QName.valueOf("{testAppsSharedTopic}sharedTopic");
99 KSBJavaService testJavaAsyncService = (KSBJavaService) KsbApiServiceLocator.getMessageHelper().getServiceAsynchronously(serviceName);
100 testJavaAsyncService.invoke(new ClientAppServiceSharedPayloadObj("message content", false));
101 }
102
103 private void turnOnMessaging() {
104 ConfigContext.getCurrentContextConfig().putProperty(KSBConstants.Config.MESSAGING_OFF, "false");
105 }
106
107 @Test
108 public void testRequeueSingleMessage() throws Exception {
109 TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD = 1;
110
111 sendMessage();
112 sendMessage();
113 List<PersistedMessageBO> messages = KSBServiceLocator.getMessageQueueService().getNextDocuments(null);
114 assertEquals(2, messages.size());
115 assertNotNull("message should have been persisted", messages.get(0));
116 turnOnMessaging();
117
118
119 new MessageFetcher(messages.get(0).getRouteQueueId()).run();
120 synchronized (TestHarnessSharedTopic.LOCK) {
121 TestHarnessSharedTopic.LOCK.wait(3 * 1000);
122 }
123
124
125
126 Thread.sleep(500);
127
128 assertEquals("Service not called by message fetcher correct number of times", 1, TestHarnessSharedTopic.CALL_COUNT);
129 for (int i = 0; i < 10; i++) {
130 if (KSBServiceLocator.getMessageQueueService().getNextDocuments(null).size() == 1) {
131 break;
132 }
133 Thread.sleep(1000);
134 }
135 assertEquals("Message Queue should have a single remaining message because only single message was resent",
136 1, KSBServiceLocator.getMessageQueueService().getNextDocuments(null).size());
137 }
138
139 }