View Javadoc

1   /*
2    * Copyright 2006-2011 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.kuali.rice.ksb.messaging;
18  
19  import static org.junit.Assert.assertEquals;
20  import static org.junit.Assert.assertNotNull;
21  import static org.junit.Assert.assertTrue;
22  
23  import java.util.List;
24  
25  import javax.xml.namespace.QName;
26  
27  import org.junit.After;
28  import org.junit.Before;
29  import org.junit.Test;
30  import org.kuali.rice.core.api.config.property.ConfigContext;
31  import org.kuali.rice.ksb.api.KsbApiServiceLocator;
32  import org.kuali.rice.ksb.messaging.remotedservices.TestHarnessSharedTopic;
33  import org.kuali.rice.ksb.messaging.service.KSBJavaService;
34  import org.kuali.rice.ksb.service.KSBServiceLocator;
35  import org.kuali.rice.ksb.test.KSBTestCase;
36  import org.kuali.rice.ksb.util.KSBConstants;
37  
38  /**
39   * Tests {@link MessageFetcher}. Turn messaging off but leave persistence on.
40   * this will result in messages being persisted to db but not delivered. from
41   * there we start up the {@link MessageFetcher} and make sure he does his job.
42   * 
43   * @author Kuali Rice Team (rice.collab@kuali.org)
44   * 
45   */
46  public class MessageFetcherTest extends KSBTestCase {
47  
48      @Before
49  	@Override
50  	public void setUp() throws Exception {
51  		super.setUp();
52  		ConfigContext.getCurrentContextConfig().putProperty(
53  				KSBConstants.Config.MESSAGING_OFF, "true");
54  		TestHarnessSharedTopic.CALL_COUNT = 0;
55  	}
56  
57      @After
58  	@Override
59  	public void tearDown() throws Exception {
60  		TestHarnessSharedTopic.CALL_COUNT = 0;
61  	}
62  
63  	@Test
64  	public void testRequeueMessages() throws Exception {
65  
66  		List<PersistedMessageBO> messages = KSBServiceLocator.getMessageQueueService().getNextDocuments(null);
67  		assertEquals("Should have no messages in the queue.", 0, messages.size());
68  		
69  		// this number is way over the top but we're going to see if it works in
70  		// an overworked CI env.
71  		TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD = 500;
72  
73  		for (int i = 0; i < TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD; i++) {
74  			sendMessage();
75  		}
76  
77  		messages = KSBServiceLocator.getMessageQueueService().getNextDocuments(null);
78  		assertEquals("Should have 500 messages in the queue.", 500, messages.size());
79  		
80  		turnOnMessaging();
81  		new MessageFetcher((Integer) null).run();
82  		synchronized (TestHarnessSharedTopic.LOCK) {
83  			TestHarnessSharedTopic.LOCK.wait(5 * 60 * 1000);
84  		}
85  		// sleep here for half a second because the notify above is executed from inside the database transaction in TestHarnessSharedTopic,
86  		// we need to give that transaction time to be fully committed.
87  		Thread.sleep(500);
88  
89  		assertTrue(
90  				"Service not called by message fetcher",
91  				TestHarnessSharedTopic.CALL_COUNT == TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD);
92  		
93  		messages = KSBServiceLocator.getMessageQueueService().getNextDocuments(null);
94  		assertEquals("Should still have no messages in the queue.", 0, messages.size());
95  	}
96  
97  	private void sendMessage() {
98  		QName serviceName = QName.valueOf("{testAppsSharedTopic}sharedTopic");
99  		KSBJavaService testJavaAsyncService = (KSBJavaService) KsbApiServiceLocator
100 				.getMessageHelper().getServiceAsynchronously(serviceName);
101 		testJavaAsyncService.invoke(new ClientAppServiceSharedPayloadObj(
102 				"message content", false));
103 	}
104 
105 	private void turnOnMessaging() {
106 		ConfigContext.getCurrentContextConfig().putProperty(
107 				KSBConstants.Config.MESSAGING_OFF, "false");
108 	}
109 
110 	@Test
111 	public void testRequeueSingleMessage() throws Exception {
112 		sendMessage();
113 		sendMessage();
114 		PersistedMessageBO message = KSBServiceLocator.getMessageQueueService()
115 				.getNextDocuments(null).get(0);
116 		assertNotNull("message should have been persisted", message);
117 		turnOnMessaging();
118 		new MessageFetcher(message.getRouteQueueId()).run();
119 		synchronized (TestHarnessSharedTopic.LOCK) {
120 			TestHarnessSharedTopic.LOCK.wait(3 * 1000);
121 		}
122 		// sleep here for half a second because the notify above is executed from inside the database transaction in TestHarnessSharedTopic,
123 		// we need to give that transaction time to be fully committed.
124 		Thread.sleep(500);
125 
126 		assertTrue(
127 				"Service not called by message fetcher corrent number of times",
128 				1 == TestHarnessSharedTopic.CALL_COUNT);
129 		for (int i = 0; i < 10; i++) {
130 			if (KSBServiceLocator.getMessageQueueService().getNextDocuments(null)
131 					.size() == 1) {
132 				break;
133 			}
134 			Thread.sleep(1000);
135 		}
136 		assertEquals(
137 				"Message Queue should have a single remaining message because only single message was resent",
138 				1, KSBServiceLocator.getMessageQueueService().getNextDocuments(
139 						null).size());
140 	}
141 
142 }