1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.ksb.messaging;
17
18 import static org.junit.Assert.assertEquals;
19 import static org.junit.Assert.assertNotNull;
20 import static org.junit.Assert.assertTrue;
21
22 import java.util.List;
23
24 import javax.xml.namespace.QName;
25
26 import org.junit.After;
27 import org.junit.Before;
28 import org.junit.Test;
29 import org.kuali.rice.core.api.config.property.ConfigContext;
30 import org.kuali.rice.ksb.api.KsbApiServiceLocator;
31 import org.kuali.rice.ksb.messaging.remotedservices.TestHarnessSharedTopic;
32 import org.kuali.rice.ksb.messaging.service.KSBJavaService;
33 import org.kuali.rice.ksb.service.KSBServiceLocator;
34 import org.kuali.rice.ksb.test.KSBTestCase;
35 import org.kuali.rice.ksb.util.KSBConstants;
36
37
38
39
40
41
42
43
44
45 public class MessageFetcherTest extends KSBTestCase {
46
47 @Before
48 @Override
49 public void setUp() throws Exception {
50 super.setUp();
51 ConfigContext.getCurrentContextConfig().putProperty(KSBConstants.Config.MESSAGING_OFF, "true");
52 TestHarnessSharedTopic.CALL_COUNT = 0;
53 TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD = 0;
54 }
55
56 @After
57 @Override
58 public void tearDown() throws Exception {
59 TestHarnessSharedTopic.CALL_COUNT = 0;
60 super.tearDown();
61 }
62
63 @Test
64 public void testRequeueMessages() throws Exception {
65
66 List<PersistedMessageBO> messages = KSBServiceLocator.getMessageQueueService().getNextDocuments(null);
67 assertEquals("Should have no messages in the queue.", 0, messages.size());
68
69
70
71 TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD = 500;
72
73 for (int i = 0; i < TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD; i++) {
74 sendMessage();
75 }
76
77
78 Thread.sleep(5000);
79
80 messages = KSBServiceLocator.getMessageQueueService().getNextDocuments(null);
81 assertEquals("Should have 500 messages in the queue.", 500, messages.size());
82
83 turnOnMessaging();
84 new MessageFetcher((Integer) null).run();
85 synchronized (TestHarnessSharedTopic.LOCK) {
86 TestHarnessSharedTopic.LOCK.wait(5 * 60 * 1000);
87 }
88
89
90 Thread.sleep(500);
91
92 assertEquals("Service not called by message fetcher", TestHarnessSharedTopic.CALL_COUNT, TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD);
93
94 messages = KSBServiceLocator.getMessageQueueService().getNextDocuments(null);
95 assertEquals("Should still have no messages in the queue.", 0, messages.size());
96 }
97
98 private void sendMessage() {
99 QName serviceName = QName.valueOf("{testAppsSharedTopic}sharedTopic");
100 KSBJavaService testJavaAsyncService = (KSBJavaService) KsbApiServiceLocator.getMessageHelper().getServiceAsynchronously(serviceName);
101 testJavaAsyncService.invoke(new ClientAppServiceSharedPayloadObj("message content", false));
102 }
103
104 private void turnOnMessaging() {
105 ConfigContext.getCurrentContextConfig().putProperty(KSBConstants.Config.MESSAGING_OFF, "false");
106 }
107
108 @Test
109 public void testRequeueSingleMessage() throws Exception {
110 TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD = 1;
111
112 sendMessage();
113 sendMessage();
114 List<PersistedMessageBO> messages = KSBServiceLocator.getMessageQueueService().getNextDocuments(null);
115 assertEquals(2, messages.size());
116 assertNotNull("message should have been persisted", messages.get(0));
117 turnOnMessaging();
118
119
120 new MessageFetcher(messages.get(0).getRouteQueueId()).run();
121 synchronized (TestHarnessSharedTopic.LOCK) {
122 TestHarnessSharedTopic.LOCK.wait(3 * 1000);
123 }
124
125
126
127 Thread.sleep(500);
128
129 assertEquals("Service not called by message fetcher correct number of times", 1, TestHarnessSharedTopic.CALL_COUNT);
130 for (int i = 0; i < 10; i++) {
131 if (KSBServiceLocator.getMessageQueueService().getNextDocuments(null).size() == 1) {
132 break;
133 }
134 Thread.sleep(1000);
135 }
136 assertEquals("Message Queue should have a single remaining message because only single message was resent",
137 1, KSBServiceLocator.getMessageQueueService().getNextDocuments(null).size());
138 }
139
140 }