001    /**
002     * Copyright 2005-2011 The Kuali Foundation
003     *
004     * Licensed under the Educational Community License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.opensource.org/licenses/ecl2.php
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    package org.kuali.rice.ksb.messaging;
017    
018    import static org.junit.Assert.assertEquals;
019    import static org.junit.Assert.assertNotNull;
020    import static org.junit.Assert.assertTrue;
021    
022    import java.util.List;
023    
024    import javax.xml.namespace.QName;
025    
026    import org.junit.After;
027    import org.junit.Before;
028    import org.junit.Test;
029    import org.kuali.rice.core.api.config.property.ConfigContext;
030    import org.kuali.rice.ksb.api.KsbApiServiceLocator;
031    import org.kuali.rice.ksb.messaging.remotedservices.TestHarnessSharedTopic;
032    import org.kuali.rice.ksb.messaging.service.KSBJavaService;
033    import org.kuali.rice.ksb.service.KSBServiceLocator;
034    import org.kuali.rice.ksb.test.KSBTestCase;
035    import org.kuali.rice.ksb.util.KSBConstants;
036    
037    /**
038     * Tests {@link MessageFetcher}. Turn messaging off but leave persistence on.
039     * this will result in messages being persisted to db but not delivered. from
040     * there we start up the {@link MessageFetcher} and make sure he does his job.
041     * 
042     * @author Kuali Rice Team (rice.collab@kuali.org)
043     * 
044     */
045    public class MessageFetcherTest extends KSBTestCase {
046    
047        @Before
048        @Override
049        public void setUp() throws Exception {
050            super.setUp();
051            ConfigContext.getCurrentContextConfig().putProperty(KSBConstants.Config.MESSAGING_OFF, "true");
052            TestHarnessSharedTopic.CALL_COUNT = 0;
053            TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD = 0;
054        }
055    
056        @After
057        @Override
058        public void tearDown() throws Exception {
059            TestHarnessSharedTopic.CALL_COUNT = 0;
060        }
061    
062        @Test
063        public void testRequeueMessages() throws Exception {
064    
065            List<PersistedMessageBO> messages = KSBServiceLocator.getMessageQueueService().getNextDocuments(null);
066            assertEquals("Should have no messages in the queue.", 0, messages.size());
067    
068            // this number is way over the top but we're going to see if it works in
069            // an overworked CI env.
070            TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD = 500;
071    
072            for (int i = 0; i < TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD; i++) {
073                sendMessage();
074            }
075    
076            // make sure all async calls land in the db
077            Thread.sleep(5000);
078    
079            messages = KSBServiceLocator.getMessageQueueService().getNextDocuments(null);
080            assertEquals("Should have 500 messages in the queue.", 500, messages.size());
081        
082            turnOnMessaging();
083            new MessageFetcher((Integer) null).run();
084            synchronized (TestHarnessSharedTopic.LOCK) {
085                TestHarnessSharedTopic.LOCK.wait(5 * 60 * 1000);
086            }
087            // sleep here for half a second because the notify above is executed from inside the database transaction in TestHarnessSharedTopic,
088            // we need to give that transaction time to be fully committed.
089            Thread.sleep(500);
090    
091            assertEquals("Service not called by message fetcher", TestHarnessSharedTopic.CALL_COUNT, TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD);
092    
093            messages = KSBServiceLocator.getMessageQueueService().getNextDocuments(null);
094            assertEquals("Should still have no messages in the queue.", 0, messages.size());
095        }
096    
097        private void sendMessage() {
098            QName serviceName = QName.valueOf("{testAppsSharedTopic}sharedTopic");
099            KSBJavaService testJavaAsyncService = (KSBJavaService) KsbApiServiceLocator.getMessageHelper().getServiceAsynchronously(serviceName);
100            testJavaAsyncService.invoke(new ClientAppServiceSharedPayloadObj("message content", false));
101        }
102    
103        private void turnOnMessaging() {
104            ConfigContext.getCurrentContextConfig().putProperty(KSBConstants.Config.MESSAGING_OFF, "false");
105        }
106    
107        @Test
108        public void testRequeueSingleMessage() throws Exception {
109            TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD = 1;
110            // record two messages
111            sendMessage();
112            sendMessage();
113            List<PersistedMessageBO> messages = KSBServiceLocator.getMessageQueueService().getNextDocuments(null);
114            assertEquals(2, messages.size());
115            assertNotNull("message should have been persisted", messages.get(0));
116            turnOnMessaging();
117    
118            // fetch and deliver one message
119            new MessageFetcher(messages.get(0).getRouteQueueId()).run();
120            synchronized (TestHarnessSharedTopic.LOCK) {
121                TestHarnessSharedTopic.LOCK.wait(3 * 1000);
122            }
123    
124            // sleep here for half a second because the notify above is executed from inside the database transaction in TestHarnessSharedTopic,
125            // we need to give that transaction time to be fully committed.
126            Thread.sleep(500);
127    
128            assertEquals("Service not called by message fetcher correct number of times", 1, TestHarnessSharedTopic.CALL_COUNT);
129            for (int i = 0; i < 10; i++) {
130                if (KSBServiceLocator.getMessageQueueService().getNextDocuments(null).size() == 1) {
131                    break;
132                }
133                Thread.sleep(1000);
134            }
135            assertEquals("Message Queue should have a single remaining message because only single message was resent",
136                         1, KSBServiceLocator.getMessageQueueService().getNextDocuments(null).size());
137        }
138    
139    }