1
2
3
4
5
6
7
8
9
10
11
12
13 package org.kuali.rice.ksb.messaging;
14
15 import java.util.List;
16
17 import javax.xml.namespace.QName;
18
19 import org.junit.Test;
20 import org.kuali.rice.core.config.ConfigContext;
21 import org.kuali.rice.ksb.messaging.remotedservices.TestHarnessSharedTopic;
22 import org.kuali.rice.ksb.messaging.service.KSBJavaService;
23 import org.kuali.rice.ksb.service.KSBServiceLocator;
24 import org.kuali.rice.ksb.test.KSBTestCase;
25 import org.kuali.rice.ksb.util.KSBConstants;
26
27
28
29
30
31
32
33
34
35 public class MessageFetcherTest extends KSBTestCase {
36
37 @Override
38 public void setUp() throws Exception {
39 super.setUp();
40 ConfigContext.getCurrentContextConfig().putProperty(
41 KSBConstants.MESSAGING_OFF, "true");
42 TestHarnessSharedTopic.CALL_COUNT = 0;
43 }
44
45 @Override
46 public void tearDown() throws Exception {
47 TestHarnessSharedTopic.CALL_COUNT = 0;
48 }
49
50 @Test
51 public void testRequeueMessages() throws Exception {
52
53 List<PersistedMessage> messages = KSBServiceLocator.getRouteQueueService().getNextDocuments(null);
54 assertEquals("Should have no messages in the queue.", 0, messages.size());
55
56
57
58 TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD = 500;
59
60 for (int i = 0; i < TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD; i++) {
61 sendMessage();
62 }
63
64 messages = KSBServiceLocator.getRouteQueueService().getNextDocuments(null);
65 assertEquals("Should have 500 messages in the queue.", 500, messages.size());
66
67 turnOnMessaging();
68 new MessageFetcher((Integer) null).run();
69 synchronized (TestHarnessSharedTopic.LOCK) {
70 TestHarnessSharedTopic.LOCK.wait(5 * 60 * 1000);
71 }
72
73
74 Thread.sleep(500);
75
76 assertTrue(
77 "Service not called by message fetcher",
78 TestHarnessSharedTopic.CALL_COUNT == TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD);
79
80 messages = KSBServiceLocator.getRouteQueueService().getNextDocuments(null);
81 assertEquals("Should still have no messages in the queue.", 0, messages.size());
82 }
83
84 private void sendMessage() {
85 QName serviceName = QName.valueOf("{testAppsSharedTopic}sharedTopic");
86 KSBJavaService testJavaAsyncService = (KSBJavaService) KSBServiceLocator
87 .getMessageHelper().getServiceAsynchronously(serviceName);
88 testJavaAsyncService.invoke(new ClientAppServiceSharedPayloadObj(
89 "message content", false));
90 }
91
92 private void turnOnMessaging() {
93 ConfigContext.getCurrentContextConfig().putProperty(
94 KSBConstants.MESSAGING_OFF, "false");
95 }
96
97 @Test
98 public void testRequeueSingleMessage() throws Exception {
99 sendMessage();
100 sendMessage();
101 PersistedMessage message = KSBServiceLocator.getRouteQueueService()
102 .getNextDocuments(null).get(0);
103 assertNotNull("message should have been persisted", message);
104 turnOnMessaging();
105 new MessageFetcher(message.getRouteQueueId()).run();
106 synchronized (TestHarnessSharedTopic.LOCK) {
107 TestHarnessSharedTopic.LOCK.wait(3 * 1000);
108 }
109
110
111 Thread.sleep(500);
112
113 assertTrue(
114 "Service not called by message fetcher corrent number of times",
115 1 == TestHarnessSharedTopic.CALL_COUNT);
116 for (int i = 0; i < 10; i++) {
117 if (KSBServiceLocator.getRouteQueueService().getNextDocuments(null)
118 .size() == 1) {
119 break;
120 }
121 Thread.sleep(1000);
122 }
123 assertEquals(
124 "Message Queue should have a single remaining message because only single message was resent",
125 1, KSBServiceLocator.getRouteQueueService().getNextDocuments(
126 null).size());
127 }
128
129 }