View Javadoc

1   /*
2    * Copyright 2007 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License"); you may not use this file except in
5    * compliance with the License. You may obtain a copy of the License at
6    *
7    * http://www.opensource.org/licenses/ecl2.php
8    *
9    * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS
10   * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific
11   * language governing permissions and limitations under the License.
12   */
13  package org.kuali.rice.ksb.messaging;
14  
15  import java.util.List;
16  
17  import javax.xml.namespace.QName;
18  
19  import org.junit.Test;
20  import org.kuali.rice.core.config.ConfigContext;
21  import org.kuali.rice.ksb.messaging.remotedservices.TestHarnessSharedTopic;
22  import org.kuali.rice.ksb.messaging.service.KSBJavaService;
23  import org.kuali.rice.ksb.service.KSBServiceLocator;
24  import org.kuali.rice.ksb.test.KSBTestCase;
25  import org.kuali.rice.ksb.util.KSBConstants;
26  
27  /**
28   * Tests {@link MessageFetcher}. Turn messaging off but leave persistence on.
29   * this will result in messages being persisted to db but not delivered. from
30   * there we start up the {@link MessageFetcher} and make sure he does his job.
31   * 
32   * @author Kuali Rice Team (rice.collab@kuali.org)
33   * 
34   */
35  public class MessageFetcherTest extends KSBTestCase {
36  
37  	@Override
38  	public void setUp() throws Exception {
39  		super.setUp();
40  		ConfigContext.getCurrentContextConfig().putProperty(
41  				KSBConstants.MESSAGING_OFF, "true");
42  		TestHarnessSharedTopic.CALL_COUNT = 0;
43  	}
44  
45  	@Override
46  	public void tearDown() throws Exception {
47  		TestHarnessSharedTopic.CALL_COUNT = 0;
48  	}
49  
50  	@Test
51  	public void testRequeueMessages() throws Exception {
52  
53  		List<PersistedMessage> messages = KSBServiceLocator.getRouteQueueService().getNextDocuments(null);
54  		assertEquals("Should have no messages in the queue.", 0, messages.size());
55  		
56  		// this number is way over the top but we're going to see if it works in
57  		// an overworked CI env.
58  		TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD = 500;
59  
60  		for (int i = 0; i < TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD; i++) {
61  			sendMessage();
62  		}
63  
64  		messages = KSBServiceLocator.getRouteQueueService().getNextDocuments(null);
65  		assertEquals("Should have 500 messages in the queue.", 500, messages.size());
66  		
67  		turnOnMessaging();
68  		new MessageFetcher((Integer) null).run();
69  		synchronized (TestHarnessSharedTopic.LOCK) {
70  			TestHarnessSharedTopic.LOCK.wait(5 * 60 * 1000);
71  		}
72  		// sleep here for half a second because the notify above is executed from inside the database transaction in TestHarnessSharedTopic,
73  		// we need to give that transaction time to be fully committed.
74  		Thread.sleep(500);
75  
76  		assertTrue(
77  				"Service not called by message fetcher",
78  				TestHarnessSharedTopic.CALL_COUNT == TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD);
79  		
80  		messages = KSBServiceLocator.getRouteQueueService().getNextDocuments(null);
81  		assertEquals("Should still have no messages in the queue.", 0, messages.size());
82  	}
83  
84  	private void sendMessage() {
85  		QName serviceName = QName.valueOf("{testAppsSharedTopic}sharedTopic");
86  		KSBJavaService testJavaAsyncService = (KSBJavaService) KSBServiceLocator
87  				.getMessageHelper().getServiceAsynchronously(serviceName);
88  		testJavaAsyncService.invoke(new ClientAppServiceSharedPayloadObj(
89  				"message content", false));
90  	}
91  
92  	private void turnOnMessaging() {
93  		ConfigContext.getCurrentContextConfig().putProperty(
94  				KSBConstants.MESSAGING_OFF, "false");
95  	}
96  
97  	@Test
98  	public void testRequeueSingleMessage() throws Exception {
99  		sendMessage();
100 		sendMessage();
101 		PersistedMessage message = KSBServiceLocator.getRouteQueueService()
102 				.getNextDocuments(null).get(0);
103 		assertNotNull("message should have been persisted", message);
104 		turnOnMessaging();
105 		new MessageFetcher(message.getRouteQueueId()).run();
106 		synchronized (TestHarnessSharedTopic.LOCK) {
107 			TestHarnessSharedTopic.LOCK.wait(3 * 1000);
108 		}
109 		// sleep here for half a second because the notify above is executed from inside the database transaction in TestHarnessSharedTopic,
110 		// we need to give that transaction time to be fully committed.
111 		Thread.sleep(500);
112 
113 		assertTrue(
114 				"Service not called by message fetcher corrent number of times",
115 				1 == TestHarnessSharedTopic.CALL_COUNT);
116 		for (int i = 0; i < 10; i++) {
117 			if (KSBServiceLocator.getRouteQueueService().getNextDocuments(null)
118 					.size() == 1) {
119 				break;
120 			}
121 			Thread.sleep(1000);
122 		}
123 		assertEquals(
124 				"Message Queue should have a single remaining message because only single message was resent",
125 				1, KSBServiceLocator.getRouteQueueService().getNextDocuments(
126 						null).size());
127 	}
128 
129 }