1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.kuali.rice.ksb.messaging;
18
19 import static org.junit.Assert.assertEquals;
20 import static org.junit.Assert.assertNotNull;
21 import static org.junit.Assert.assertTrue;
22
23 import java.util.List;
24
25 import javax.xml.namespace.QName;
26
27 import org.junit.After;
28 import org.junit.Before;
29 import org.junit.Test;
30 import org.kuali.rice.core.api.config.property.ConfigContext;
31 import org.kuali.rice.ksb.api.KsbApiServiceLocator;
32 import org.kuali.rice.ksb.messaging.remotedservices.TestHarnessSharedTopic;
33 import org.kuali.rice.ksb.messaging.service.KSBJavaService;
34 import org.kuali.rice.ksb.service.KSBServiceLocator;
35 import org.kuali.rice.ksb.test.KSBTestCase;
36 import org.kuali.rice.ksb.util.KSBConstants;
37
38
39
40
41
42
43
44
45
46 public class MessageFetcherTest extends KSBTestCase {
47
48 @Before
49 @Override
50 public void setUp() throws Exception {
51 super.setUp();
52 ConfigContext.getCurrentContextConfig().putProperty(
53 KSBConstants.Config.MESSAGING_OFF, "true");
54 TestHarnessSharedTopic.CALL_COUNT = 0;
55 }
56
57 @After
58 @Override
59 public void tearDown() throws Exception {
60 TestHarnessSharedTopic.CALL_COUNT = 0;
61 }
62
63 @Test
64 public void testRequeueMessages() throws Exception {
65
66 List<PersistedMessageBO> messages = KSBServiceLocator.getMessageQueueService().getNextDocuments(null);
67 assertEquals("Should have no messages in the queue.", 0, messages.size());
68
69
70
71 TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD = 500;
72
73 for (int i = 0; i < TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD; i++) {
74 sendMessage();
75 }
76
77 messages = KSBServiceLocator.getMessageQueueService().getNextDocuments(null);
78 assertEquals("Should have 500 messages in the queue.", 500, messages.size());
79
80 turnOnMessaging();
81 new MessageFetcher((Integer) null).run();
82 synchronized (TestHarnessSharedTopic.LOCK) {
83 TestHarnessSharedTopic.LOCK.wait(5 * 60 * 1000);
84 }
85
86
87 Thread.sleep(500);
88
89 assertTrue(
90 "Service not called by message fetcher",
91 TestHarnessSharedTopic.CALL_COUNT == TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD);
92
93 messages = KSBServiceLocator.getMessageQueueService().getNextDocuments(null);
94 assertEquals("Should still have no messages in the queue.", 0, messages.size());
95 }
96
97 private void sendMessage() {
98 QName serviceName = QName.valueOf("{testAppsSharedTopic}sharedTopic");
99 KSBJavaService testJavaAsyncService = (KSBJavaService) KsbApiServiceLocator
100 .getMessageHelper().getServiceAsynchronously(serviceName);
101 testJavaAsyncService.invoke(new ClientAppServiceSharedPayloadObj(
102 "message content", false));
103 }
104
105 private void turnOnMessaging() {
106 ConfigContext.getCurrentContextConfig().putProperty(
107 KSBConstants.Config.MESSAGING_OFF, "false");
108 }
109
110 @Test
111 public void testRequeueSingleMessage() throws Exception {
112 sendMessage();
113 sendMessage();
114 PersistedMessageBO message = KSBServiceLocator.getMessageQueueService()
115 .getNextDocuments(null).get(0);
116 assertNotNull("message should have been persisted", message);
117 turnOnMessaging();
118 new MessageFetcher(message.getRouteQueueId()).run();
119 synchronized (TestHarnessSharedTopic.LOCK) {
120 TestHarnessSharedTopic.LOCK.wait(3 * 1000);
121 }
122
123
124 Thread.sleep(500);
125
126 assertTrue(
127 "Service not called by message fetcher corrent number of times",
128 1 == TestHarnessSharedTopic.CALL_COUNT);
129 for (int i = 0; i < 10; i++) {
130 if (KSBServiceLocator.getMessageQueueService().getNextDocuments(null)
131 .size() == 1) {
132 break;
133 }
134 Thread.sleep(1000);
135 }
136 assertEquals(
137 "Message Queue should have a single remaining message because only single message was resent",
138 1, KSBServiceLocator.getMessageQueueService().getNextDocuments(
139 null).size());
140 }
141
142 }