001    /*
002     * Copyright 2007-2008 The Kuali Foundation
003     *
004     * Licensed under the Educational Community License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.opensource.org/licenses/ecl2.php
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    package org.kuali.rice.kcb.service.impl;
017    
018    import org.apache.commons.lang.RandomStringUtils;
019    import org.junit.Assert;
020    import org.junit.Test;
021    import org.kuali.rice.kcb.bo.MessageDelivery;
022    import org.kuali.rice.kcb.bo.MessageDeliveryStatus;
023    import org.kuali.rice.kcb.dto.MessageDTO;
024    import org.kuali.rice.kcb.quartz.MessageProcessingJob;
025    import org.kuali.rice.kcb.test.KCBTestCase;
026    import org.kuali.rice.ksb.service.KSBServiceLocator;
027    import org.kuali.rice.test.BaselineTestCase.BaselineMode;
028    import org.kuali.rice.test.BaselineTestCase.Mode;
029    import org.quartz.JobExecutionContext;
030    import org.quartz.JobExecutionException;
031    import org.quartz.SchedulerException;
032    import org.quartz.listeners.JobListenerSupport;
033    import org.springframework.transaction.support.TransactionSynchronizationManager;
034    
035    import java.util.Collection;
036    import java.util.concurrent.CountDownLatch;
037    import java.util.concurrent.TimeUnit;
038    
039    import static org.junit.Assert.*;
040    
041    /**
042     * Tests MessagingService 
043     * 
044     * @author Kuali Rice Team (rice.collab@kuali.org)
045     */
046    /*
047    @PerTestUnitTestData({
048        @UnitTestData(filename = "file:ken/src/main/config/sql/KENBootstrap.sql", delimiter = "/"),
049        @UnitTestData(filename = "classpath:org/kuali/rice/kcb/test/KENTestData.sql")
050    }
051    )*/
052    @BaselineMode(Mode.CLEAR_DB)
053    public class MessagingServiceTest extends KCBTestCase {
054        private CountDownLatch signal = new CountDownLatch(1);
055    
056        @Override
057        public void setUp() throws Exception {
058            super.setUp();
059        
060            services.getRecipientPreferenceService().saveRecipientDelivererConfig("testuser5", "mock", new String[] { "Test Channel #1" });
061            services.getRecipientPreferenceService().saveRecipientDelivererConfig("testuser5", "sms", new String[] { "Test Channel #1" });
062            services.getRecipientPreferenceService().saveRecipientDelivererConfig("testuser5", "broken", new String[] { "Test Channel #1" }); // this one throws exceptions
063            services.getRecipientPreferenceService().saveRecipientDelivererConfig("testuser5", "bogus", new String[] { "Test Channel #1" }); // this one doesn't exist
064            
065            assertEquals(4, services.getRecipientPreferenceService().getDeliverersForRecipientAndChannel("testuser5", "Test Channel #1").size());
066        }
067    
068        protected long deliver() throws Exception {
069            MessageDTO message = new MessageDTO();
070            message.setContent("test content 1");
071            message.setChannel("Test Channel #1");
072            message.setContentType("test content type 1");
073            message.setDeliveryType("test delivery type 1");
074            message.setRecipient("testuser5");
075            message.setTitle("test title 1");
076            message.setOriginId("origin id");
077    
078            registerJobListener();
079    
080            long id = services.getMessagingService().deliver(message);
081    
082            waitForNextJobCompletion();
083    
084            Collection<MessageDelivery> deliveries = services.getMessageDeliveryService().getAllMessageDeliveries();
085            assertNotNull(deliveries);
086            int delivCount = services.getRecipientPreferenceService().getDeliverersForRecipientAndChannel("testuser5", "Test Channel #1").size();
087            assertEquals(delivCount, deliveries.size());
088            assertTrue(deliveries.size() > 0);
089            int failed = 0;
090            for (MessageDelivery delivery: deliveries) {
091                if ("broken".equals(delivery.getDelivererTypeName()) || "bogus".equals(delivery.getDelivererTypeName())) {
092                    assertEquals(MessageDeliveryStatus.UNDELIVERED.name(), delivery.getDeliveryStatus());
093                    assertEquals(1, delivery.getProcessCount().intValue());
094                    failed++;
095                } else {
096                    assertEquals(MessageDeliveryStatus.DELIVERED.name(), delivery.getDeliveryStatus());
097                }
098            }
099    
100            assertEquals(2, failed);
101            
102            // try up till max attempts, which for now is 3
103            
104            waitForNextJobCompletion();
105    
106            failed = 0;
107            deliveries = services.getMessageDeliveryService().getAllMessageDeliveries();
108            for (MessageDelivery delivery: deliveries) {
109                if ("broken".equals(delivery.getDelivererTypeName()) || "bogus".equals(delivery.getDelivererTypeName())) {
110                    assertEquals(MessageDeliveryStatus.UNDELIVERED.name(), delivery.getDeliveryStatus());
111                    assertEquals(2, delivery.getProcessCount().intValue());
112                    failed++;
113                } else {
114                    assertEquals(MessageDeliveryStatus.DELIVERED.name(), delivery.getDeliveryStatus());
115                }
116            }
117    
118            assertEquals(2, failed);
119            
120            waitForNextJobCompletion();
121    
122            failed = 0;
123            deliveries = services.getMessageDeliveryService().getAllMessageDeliveries();
124            for (MessageDelivery delivery: deliveries) {
125                if ("broken".equals(delivery.getDelivererTypeName()) || "bogus".equals(delivery.getDelivererTypeName())) {
126                    assertEquals(MessageDeliveryStatus.UNDELIVERED.name(), delivery.getDeliveryStatus());
127                    assertEquals(3, delivery.getProcessCount().intValue());
128                    failed++;
129                } else {
130                    assertEquals(MessageDeliveryStatus.DELIVERED.name(), delivery.getDeliveryStatus());
131                }
132            }
133    
134            assertEquals(2, failed);
135            
136            // finally the last attempt, nothing should have changed
137            
138            waitForNextJobCompletion();
139    
140            failed = 0;
141            deliveries = services.getMessageDeliveryService().getAllMessageDeliveries();
142            for (MessageDelivery delivery: deliveries) {
143                if ("broken".equals(delivery.getDelivererTypeName()) || "bogus".equals(delivery.getDelivererTypeName())) {
144                    assertEquals(MessageDeliveryStatus.UNDELIVERED.name(), delivery.getDeliveryStatus());
145                    assertEquals(3, delivery.getProcessCount().intValue());
146                    failed++;
147                } else {
148                    assertEquals(MessageDeliveryStatus.DELIVERED.name(), delivery.getDeliveryStatus());
149                }
150            }
151    
152            assertEquals(2, failed);
153    
154            return id;
155        }
156    
157        @Test
158        public void testDeliver() throws Exception {
159            Assert.assertFalse(TransactionSynchronizationManager.isActualTransactionActive());
160    
161            deliver();
162        }
163        
164        @Test
165        public void testDismiss() throws Exception {
166            Assert.assertFalse(TransactionSynchronizationManager.isActualTransactionActive());
167    
168            long id = deliver();
169    
170            registerJobListener();
171    
172            services.getMessagingService().remove(id, "a user", "a cause");
173            
174            waitForNextJobCompletion();
175    
176            Collection<MessageDelivery> deliveries = services.getMessageDeliveryService().getAllMessageDeliveries();
177            assertNotNull(deliveries);
178            // should be all gone except for the 2 bad deliveries
179            assertEquals(2, deliveries.size());
180            for (MessageDelivery d: deliveries) {
181                assertTrue("broken".equals(d.getDelivererTypeName()) || "bogus".equals(d.getDelivererTypeName()));
182            }
183        }
184        
185        @Test
186        public void testDismissByOriginId() throws Exception {
187            Assert.assertFalse(TransactionSynchronizationManager.isActualTransactionActive());
188    
189            long id = deliver();
190    
191            registerJobListener();
192    
193            services.getMessagingService().removeByOriginId("origin id", "a user", "a cause");
194            
195            waitForNextJobCompletion();
196    
197            Collection<MessageDelivery> deliveries = services.getMessageDeliveryService().getAllMessageDeliveries();
198            assertNotNull(deliveries);
199            // should be all gone except for the 2 bad deliveries
200            assertEquals(2, deliveries.size());
201            
202            // should be all gone except for the 2 bad deliveries
203            assertEquals(2, deliveries.size());
204            for (MessageDelivery d: deliveries) {
205                assertTrue("broken".equals(d.getDelivererTypeName()) || "bogus".equals(d.getDelivererTypeName()));
206            }
207        }
208        
209        protected void registerJobListener() throws SchedulerException {
210            KSBServiceLocator.getScheduler().addGlobalJobListener(new JobListenerSupport() {
211                @Override
212                public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {
213                    log.info("Job was executed: " + context);
214                    if (MessageProcessingJob.NAME.equals(context.getJobDetail().getName())) {
215                        signal.countDown();
216                    }
217                }
218                public String getName() {
219                    return System.currentTimeMillis() + RandomStringUtils.randomAlphanumeric(10);
220                }
221            });
222        }
223    
224        protected void waitForNextJobCompletion() throws InterruptedException {
225            log.info("Waiting for job to complete...");
226            signal.await(100, TimeUnit.SECONDS); // time limit so as not to hang tests if something goes wrong
227            signal = new CountDownLatch(1);
228            log.info("Job completed...");
229        }
230    }