View Javadoc
1   /**
2    * Copyright 2005-2014 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.rice.ksb.messaging;
17  
18  import static org.junit.Assert.assertEquals;
19  import static org.junit.Assert.assertNotNull;
20  import static org.junit.Assert.assertTrue;
21  
22  import java.util.List;
23  
24  import javax.xml.namespace.QName;
25  
26  import org.junit.After;
27  import org.junit.Before;
28  import org.junit.Test;
29  import org.kuali.rice.core.api.config.property.ConfigContext;
30  import org.kuali.rice.ksb.api.KsbApiServiceLocator;
31  import org.kuali.rice.ksb.messaging.remotedservices.TestHarnessSharedTopic;
32  import org.kuali.rice.ksb.messaging.service.KSBJavaService;
33  import org.kuali.rice.ksb.service.KSBServiceLocator;
34  import org.kuali.rice.ksb.test.KSBTestCase;
35  import org.kuali.rice.ksb.util.KSBConstants;
36  
37  /**
38   * Tests {@link MessageFetcher}. Turn messaging off but leave persistence on.
39   * this will result in messages being persisted to db but not delivered. from
40   * there we start up the {@link MessageFetcher} and make sure he does his job.
41   * 
42   * @author Kuali Rice Team (rice.collab@kuali.org)
43   * 
44   */
45  public class MessageFetcherTest extends KSBTestCase {
46  
47      @Before
48      @Override
49      public void setUp() throws Exception {
50          super.setUp();
51          ConfigContext.getCurrentContextConfig().putProperty(KSBConstants.Config.MESSAGING_OFF, "true");
52          TestHarnessSharedTopic.CALL_COUNT = 0;
53          TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD = 0;
54      }
55  
56      @After
57      @Override
58      public void tearDown() throws Exception {
59          TestHarnessSharedTopic.CALL_COUNT = 0;
60          super.tearDown();
61      }
62  
63      @Test
64      public void testRequeueMessages() throws Exception {
65  
66          List<PersistedMessageBO> messages = KSBServiceLocator.getMessageQueueService().getNextDocuments(null);
67          assertEquals("Should have no messages in the queue.", 0, messages.size());
68  
69          // this number is way over the top but we're going to see if it works in
70          // an overworked CI env.
71          TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD = 500;
72  
73          for (int i = 0; i < TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD; i++) {
74              sendMessage();
75          }
76  
77          // make sure all async calls land in the db
78          Thread.sleep(5000);
79  
80          messages = KSBServiceLocator.getMessageQueueService().getNextDocuments(null);
81          assertEquals("Should have 500 messages in the queue.", 500, messages.size());
82      
83          turnOnMessaging();
84          new MessageFetcher((Integer) null).run();
85          synchronized (TestHarnessSharedTopic.LOCK) {
86              TestHarnessSharedTopic.LOCK.wait(5 * 60 * 1000);
87          }
88          // sleep here for half a second because the notify above is executed from inside the database transaction in TestHarnessSharedTopic,
89          // we need to give that transaction time to be fully committed.
90          Thread.sleep(500);
91  
92          assertEquals("Service not called by message fetcher", TestHarnessSharedTopic.CALL_COUNT, TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD);
93  
94          messages = KSBServiceLocator.getMessageQueueService().getNextDocuments(null);
95          assertEquals("Should still have no messages in the queue.", 0, messages.size());
96      }
97  
98      private void sendMessage() {
99          QName serviceName = QName.valueOf("{testAppsSharedTopic}sharedTopic");
100         KSBJavaService testJavaAsyncService = (KSBJavaService) KsbApiServiceLocator.getMessageHelper().getServiceAsynchronously(serviceName);
101         testJavaAsyncService.invoke(new ClientAppServiceSharedPayloadObj("message content", false));
102     }
103 
104     private void turnOnMessaging() {
105         ConfigContext.getCurrentContextConfig().putProperty(KSBConstants.Config.MESSAGING_OFF, "false");
106     }
107 
108     @Test
109     public void testRequeueSingleMessage() throws Exception {
110         TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD = 1;
111         // record two messages
112         sendMessage();
113         sendMessage();
114         List<PersistedMessageBO> messages = KSBServiceLocator.getMessageQueueService().getNextDocuments(null);
115         assertEquals(2, messages.size());
116         assertNotNull("message should have been persisted", messages.get(0));
117         turnOnMessaging();
118 
119         // fetch and deliver one message
120         new MessageFetcher(messages.get(0).getRouteQueueId()).run();
121         synchronized (TestHarnessSharedTopic.LOCK) {
122             TestHarnessSharedTopic.LOCK.wait(3 * 1000);
123         }
124 
125         // sleep here for half a second because the notify above is executed from inside the database transaction in TestHarnessSharedTopic,
126         // we need to give that transaction time to be fully committed.
127         Thread.sleep(500);
128 
129         assertEquals("Service not called by message fetcher correct number of times", 1, TestHarnessSharedTopic.CALL_COUNT);
130         for (int i = 0; i < 10; i++) {
131             if (KSBServiceLocator.getMessageQueueService().getNextDocuments(null).size() == 1) {
132                 break;
133             }
134             Thread.sleep(1000);
135         }
136         assertEquals("Message Queue should have a single remaining message because only single message was resent",
137                      1, KSBServiceLocator.getMessageQueueService().getNextDocuments(null).size());
138     }
139 
140 }