001 /**
002 * Copyright 2005-2011 The Kuali Foundation
003 *
004 * Licensed under the Educational Community License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.opensource.org/licenses/ecl2.php
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016 package org.kuali.rice.ksb.messaging;
017
018 import static org.junit.Assert.assertEquals;
019 import static org.junit.Assert.assertNotNull;
020 import static org.junit.Assert.assertTrue;
021
022 import java.util.List;
023
024 import javax.xml.namespace.QName;
025
026 import org.junit.After;
027 import org.junit.Before;
028 import org.junit.Test;
029 import org.kuali.rice.core.api.config.property.ConfigContext;
030 import org.kuali.rice.ksb.api.KsbApiServiceLocator;
031 import org.kuali.rice.ksb.messaging.remotedservices.TestHarnessSharedTopic;
032 import org.kuali.rice.ksb.messaging.service.KSBJavaService;
033 import org.kuali.rice.ksb.service.KSBServiceLocator;
034 import org.kuali.rice.ksb.test.KSBTestCase;
035 import org.kuali.rice.ksb.util.KSBConstants;
036
037 /**
038 * Tests {@link MessageFetcher}. Turn messaging off but leave persistence on.
039 * this will result in messages being persisted to db but not delivered. from
040 * there we start up the {@link MessageFetcher} and make sure he does his job.
041 *
042 * @author Kuali Rice Team (rice.collab@kuali.org)
043 *
044 */
045 public class MessageFetcherTest extends KSBTestCase {
046
047 @Before
048 @Override
049 public void setUp() throws Exception {
050 super.setUp();
051 ConfigContext.getCurrentContextConfig().putProperty(KSBConstants.Config.MESSAGING_OFF, "true");
052 TestHarnessSharedTopic.CALL_COUNT = 0;
053 TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD = 0;
054 }
055
056 @After
057 @Override
058 public void tearDown() throws Exception {
059 TestHarnessSharedTopic.CALL_COUNT = 0;
060 }
061
062 @Test
063 public void testRequeueMessages() throws Exception {
064
065 List<PersistedMessageBO> messages = KSBServiceLocator.getMessageQueueService().getNextDocuments(null);
066 assertEquals("Should have no messages in the queue.", 0, messages.size());
067
068 // this number is way over the top but we're going to see if it works in
069 // an overworked CI env.
070 TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD = 500;
071
072 for (int i = 0; i < TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD; i++) {
073 sendMessage();
074 }
075
076 // make sure all async calls land in the db
077 Thread.sleep(5000);
078
079 messages = KSBServiceLocator.getMessageQueueService().getNextDocuments(null);
080 assertEquals("Should have 500 messages in the queue.", 500, messages.size());
081
082 turnOnMessaging();
083 new MessageFetcher((Integer) null).run();
084 synchronized (TestHarnessSharedTopic.LOCK) {
085 TestHarnessSharedTopic.LOCK.wait(5 * 60 * 1000);
086 }
087 // sleep here for half a second because the notify above is executed from inside the database transaction in TestHarnessSharedTopic,
088 // we need to give that transaction time to be fully committed.
089 Thread.sleep(500);
090
091 assertEquals("Service not called by message fetcher", TestHarnessSharedTopic.CALL_COUNT, TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD);
092
093 messages = KSBServiceLocator.getMessageQueueService().getNextDocuments(null);
094 assertEquals("Should still have no messages in the queue.", 0, messages.size());
095 }
096
097 private void sendMessage() {
098 QName serviceName = QName.valueOf("{testAppsSharedTopic}sharedTopic");
099 KSBJavaService testJavaAsyncService = (KSBJavaService) KsbApiServiceLocator.getMessageHelper().getServiceAsynchronously(serviceName);
100 testJavaAsyncService.invoke(new ClientAppServiceSharedPayloadObj("message content", false));
101 }
102
103 private void turnOnMessaging() {
104 ConfigContext.getCurrentContextConfig().putProperty(KSBConstants.Config.MESSAGING_OFF, "false");
105 }
106
107 @Test
108 public void testRequeueSingleMessage() throws Exception {
109 TestHarnessSharedTopic.CALL_COUNT_NOTIFICATION_THRESHOLD = 1;
110 // record two messages
111 sendMessage();
112 sendMessage();
113 List<PersistedMessageBO> messages = KSBServiceLocator.getMessageQueueService().getNextDocuments(null);
114 assertEquals(2, messages.size());
115 assertNotNull("message should have been persisted", messages.get(0));
116 turnOnMessaging();
117
118 // fetch and deliver one message
119 new MessageFetcher(messages.get(0).getRouteQueueId()).run();
120 synchronized (TestHarnessSharedTopic.LOCK) {
121 TestHarnessSharedTopic.LOCK.wait(3 * 1000);
122 }
123
124 // sleep here for half a second because the notify above is executed from inside the database transaction in TestHarnessSharedTopic,
125 // we need to give that transaction time to be fully committed.
126 Thread.sleep(500);
127
128 assertEquals("Service not called by message fetcher correct number of times", 1, TestHarnessSharedTopic.CALL_COUNT);
129 for (int i = 0; i < 10; i++) {
130 if (KSBServiceLocator.getMessageQueueService().getNextDocuments(null).size() == 1) {
131 break;
132 }
133 Thread.sleep(1000);
134 }
135 assertEquals("Message Queue should have a single remaining message because only single message was resent",
136 1, KSBServiceLocator.getMessageQueueService().getNextDocuments(null).size());
137 }
138
139 }