001 /** 002 * Copyright 2005-2011 The Kuali Foundation 003 * 004 * Licensed under the Educational Community License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.opensource.org/licenses/ecl2.php 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 package org.kuali.rice.ksb.messaging; 017 018 import static org.junit.Assert.assertEquals; 019 import static org.junit.Assert.assertNotNull; 020 import static org.junit.Assert.assertTrue; 021 022 import java.util.List; 023 024 import javax.xml.namespace.QName; 025 026 import org.junit.After; 027 import org.junit.Before; 028 import org.junit.Test; 029 import org.kuali.rice.core.api.config.property.ConfigContext; 030 import org.kuali.rice.ksb.api.KsbApiServiceLocator; 031 import org.kuali.rice.ksb.messaging.remotedservices.TestHarnessSharedTopic; 032 import org.kuali.rice.ksb.messaging.service.KSBJavaService; 033 import org.kuali.rice.ksb.service.KSBServiceLocator; 034 import org.kuali.rice.ksb.test.KSBTestCase; 035 import org.kuali.rice.ksb.util.KSBConstants; 036 037 /** 038 * Tests {@link MessageFetcher}. Turn messaging off but leave persistence on. 039 * this will result in messages being persisted to db but not delivered. from 040 * there we start up the {@link MessageFetcher} and make sure he does his job. 041 * 042 * @author Kuali Rice Team (rice.collab@kuali.org) 043 * 044 */ 045 public class MessageFetcherTest extends KSBTestCase { 046 047 @Before 048 @Override 049 public void setUp() throws Exception { 050 super.setUp(); 051 ConfigContext.getCurrentContextConfig().putProperty(KSBConstants.Config.MESSAGING_OFF, "true"); 052 TestHarnessSharedTopic.CALL_COUNT = 0; 053 TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD = 0; 054 } 055 056 @After 057 @Override 058 public void tearDown() throws Exception { 059 TestHarnessSharedTopic.CALL_COUNT = 0; 060 } 061 062 @Test 063 public void testRequeueMessages() throws Exception { 064 065 List<PersistedMessageBO> messages = KSBServiceLocator.getMessageQueueService().getNextDocuments(null); 066 assertEquals("Should have no messages in the queue.", 0, messages.size()); 067 068 // this number is way over the top but we're going to see if it works in 069 // an overworked CI env. 070 TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD = 500; 071 072 for (int i = 0; i < TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD; i++) { 073 sendMessage(); 074 } 075 076 // make sure all async calls land in the db 077 Thread.sleep(5000); 078 079 messages = KSBServiceLocator.getMessageQueueService().getNextDocuments(null); 080 assertEquals("Should have 500 messages in the queue.", 500, messages.size()); 081 082 turnOnMessaging(); 083 new MessageFetcher((Integer) null).run(); 084 synchronized (TestHarnessSharedTopic.LOCK) { 085 TestHarnessSharedTopic.LOCK.wait(5 * 60 * 1000); 086 } 087 // sleep here for half a second because the notify above is executed from inside the database transaction in TestHarnessSharedTopic, 088 // we need to give that transaction time to be fully committed. 089 Thread.sleep(500); 090 091 assertEquals("Service not called by message fetcher", TestHarnessSharedTopic.CALL_COUNT, TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD); 092 093 messages = KSBServiceLocator.getMessageQueueService().getNextDocuments(null); 094 assertEquals("Should still have no messages in the queue.", 0, messages.size()); 095 } 096 097 private void sendMessage() { 098 QName serviceName = QName.valueOf("{testAppsSharedTopic}sharedTopic"); 099 KSBJavaService testJavaAsyncService = (KSBJavaService) KsbApiServiceLocator.getMessageHelper().getServiceAsynchronously(serviceName); 100 testJavaAsyncService.invoke(new ClientAppServiceSharedPayloadObj("message content", false)); 101 } 102 103 private void turnOnMessaging() { 104 ConfigContext.getCurrentContextConfig().putProperty(KSBConstants.Config.MESSAGING_OFF, "false"); 105 } 106 107 @Test 108 public void testRequeueSingleMessage() throws Exception { 109 TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD = 1; 110 // record two messages 111 sendMessage(); 112 sendMessage(); 113 List<PersistedMessageBO> messages = KSBServiceLocator.getMessageQueueService().getNextDocuments(null); 114 assertEquals(2, messages.size()); 115 assertNotNull("message should have been persisted", messages.get(0)); 116 turnOnMessaging(); 117 118 // fetch and deliver one message 119 new MessageFetcher(messages.get(0).getRouteQueueId()).run(); 120 synchronized (TestHarnessSharedTopic.LOCK) { 121 TestHarnessSharedTopic.LOCK.wait(3 * 1000); 122 } 123 124 // sleep here for half a second because the notify above is executed from inside the database transaction in TestHarnessSharedTopic, 125 // we need to give that transaction time to be fully committed. 126 Thread.sleep(500); 127 128 assertEquals("Service not called by message fetcher correct number of times", 1, TestHarnessSharedTopic.CALL_COUNT); 129 for (int i = 0; i < 10; i++) { 130 if (KSBServiceLocator.getMessageQueueService().getNextDocuments(null).size() == 1) { 131 break; 132 } 133 Thread.sleep(1000); 134 } 135 assertEquals("Message Queue should have a single remaining message because only single message was resent", 136 1, KSBServiceLocator.getMessageQueueService().getNextDocuments(null).size()); 137 } 138 139 }