View Javadoc
1   /**
2    * Copyright 2005-2014 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.rice.ksb.messaging.config;
17  
18  import org.kuali.rice.core.api.config.CoreConfigHelper;
19  import org.kuali.rice.core.api.config.module.RunMode;
20  import org.kuali.rice.core.api.config.property.Config;
21  import org.kuali.rice.core.api.config.property.ConfigContext;
22  import org.kuali.rice.core.api.exception.RiceRuntimeException;
23  import org.kuali.rice.core.api.lifecycle.BaseLifecycle;
24  import org.kuali.rice.core.api.lifecycle.Lifecycle;
25  import org.kuali.rice.core.api.resourceloader.ResourceLoader;
26  import org.kuali.rice.core.api.util.ClassLoaderUtils;
27  import org.kuali.rice.core.api.util.RiceConstants;
28  import org.kuali.rice.core.framework.config.module.ModuleConfigurer;
29  import org.kuali.rice.core.framework.config.module.WebModuleConfiguration;
30  import org.kuali.rice.core.framework.lifecycle.ServiceDelegatingLifecycle;
31  import org.kuali.rice.ksb.api.KsbApiConstants;
32  import org.kuali.rice.ksb.api.KsbApiServiceLocator;
33  import org.kuali.rice.ksb.api.bus.ServiceDefinition;
34  import org.kuali.rice.ksb.messaging.AlternateEndpoint;
35  import org.kuali.rice.ksb.messaging.AlternateEndpointLocation;
36  import org.kuali.rice.ksb.messaging.MessageFetcher;
37  import org.kuali.rice.ksb.messaging.resourceloader.KSBResourceLoaderFactory;
38  import org.kuali.rice.ksb.service.KSBServiceLocator;
39  import org.kuali.rice.ksb.util.KSBConstants;
40  import org.quartz.Scheduler;
41  import org.springframework.context.ApplicationEvent;
42  import org.springframework.context.event.ContextRefreshedEvent;
43  import org.springframework.context.event.ContextStoppedEvent;
44  import org.springframework.context.event.SmartApplicationListener;
45  import org.springframework.core.Ordered;
46  import org.springframework.transaction.PlatformTransactionManager;
47  
48  import javax.sql.DataSource;
49  import javax.xml.ws.WebServiceException;
50  import java.util.ArrayList;
51  import java.util.Arrays;
52  import java.util.Collection;
53  import java.util.Collections;
54  import java.util.LinkedList;
55  import java.util.List;
56  
57  /**
58   * Used to configure the embedded workflow. This could be used to configure
59   * embedded workflow programmatically but mostly this is a base class by which
60   * to hang specific configuration behavior off of through subclassing
61   *
62   * @author Kuali Rice Team (rice.collab@kuali.org)
63   */
64  public class KSBConfigurer extends ModuleConfigurer implements SmartApplicationListener {
65  	
66      private static final String SERVICE_BUS_CLIENT_SPRING = "classpath:org/kuali/rice/ksb/config/KsbServiceBusClientSpringBeans.xml";
67      private static final String MESSAGE_CLIENT_SPRING = "classpath:org/kuali/rice/ksb/config/KsbMessageClientSpringBeans.xml";
68      private static final String JPA_MESSAGE_CLIENT_SPRING = "classpath:org/kuali/rice/ksb/config/KsbJpaMessageClientSpringBeans.xml";
69      private static final String BAM_SPRING = "classpath:org/kuali/rice/ksb/config/KsbBamSpringBeans.xml";
70      private static final String REGISTRY_SERVER_SPRING = "classpath:org/kuali/rice/ksb/config/KsbRegistryServerSpringBeans.xml";
71      private static final String JPA_REGISTRY_SPRING = "classpath:org/kuali/rice/ksb/config/KsbJpaRegistrySpringBeans.xml";
72      private static final String WEB_SPRING = "classpath:org/kuali/rice/ksb/config/KsbWebSpringBeans.xml";
73  
74      private List<ServiceDefinition> services = new ArrayList<ServiceDefinition>();
75  
76      private List<AlternateEndpointLocation> alternateEndpointLocations = new ArrayList<AlternateEndpointLocation>();
77  
78      private List<AlternateEndpoint> alternateEndpoints = new ArrayList<AlternateEndpoint>();
79  
80      private DataSource registryDataSource;
81  
82      private DataSource messageDataSource;
83  
84      private DataSource nonTransactionalMessageDataSource;
85  
86      private DataSource bamDataSource;
87  
88      private Scheduler exceptionMessagingScheduler;
89  
90      private PlatformTransactionManager platformTransactionManager;
91  
92      private List<Lifecycle> internalLifecycles;
93  
94      public KSBConfigurer() {
95          super(KsbApiConstants.KSB_MODULE_NAME);
96          setValidRunModes(Arrays.asList(RunMode.THIN, RunMode.REMOTE, RunMode.LOCAL));
97          this.internalLifecycles = new ArrayList<Lifecycle>();
98      }
99  
100     @Override
101     public void addAdditonalToConfig() {
102         configureDataSource();
103         configureScheduler();
104         configurePlatformTransactionManager();
105         configureAlternateEndpoints();
106     }
107 
108     @Override
109     public List<String> getPrimarySpringFiles() {
110         LOG.info("KSBConfigurer:getPrimarySpringFiles: getRunMode => " + getRunMode());
111         final List<String> springFileLocations = new ArrayList<String>();
112 
113         springFileLocations.add(SERVICE_BUS_CLIENT_SPRING);
114 
115         if (getRunMode() != RunMode.THIN) {
116             // Loading these beans unconditionally now, see:
117             // KULRICE-6574: Some KSB beans not defined unless message persistence turned on
118             //
119             // if (isMessagePersistenceEnabled()) {
120             springFileLocations.add(MESSAGE_CLIENT_SPRING);
121             springFileLocations.add(JPA_MESSAGE_CLIENT_SPRING);
122             // }
123 
124             if (isBamEnabled()) {
125                 springFileLocations.add(BAM_SPRING);
126             }
127         }
128 
129         if (getRunMode().equals(RunMode.LOCAL)) {
130             springFileLocations.add(JPA_REGISTRY_SPRING);
131             springFileLocations.add(REGISTRY_SERVER_SPRING);
132         }
133 
134         return springFileLocations;
135     }
136 
137     @Override
138     public boolean hasWebInterface() {
139         return true;
140     }
141 
142     // See KULRICE-7093: KSB Module UI is not available on client applications
143     @Override
144     public boolean shouldRenderWebInterface() {
145         if (ConfigContext.getCurrentContextConfig().getBooleanProperty(KSBConstants.Config.WEB_FORCE_ENABLE)) {
146             return true;
147         }
148         return super.shouldRenderWebInterface();
149     }
150 
151     @Override
152     protected WebModuleConfiguration loadWebModule() {
153         WebModuleConfiguration configuration = super.loadWebModule();
154         configuration.getWebSpringFiles().add(WEB_SPRING);
155         return configuration;
156     }
157 
158     @Override
159     public Collection<ResourceLoader> getResourceLoadersToRegister() throws Exception {
160         ResourceLoader ksbRemoteResourceLoader = KSBResourceLoaderFactory.createRootKSBRemoteResourceLoader();
161         ksbRemoteResourceLoader.start();
162         return Collections.singletonList(ksbRemoteResourceLoader);
163     }
164 
165     @Override
166     public List<Lifecycle> loadLifecycles() throws Exception {
167         List<Lifecycle> lifecycles = new LinkedList<Lifecycle>();
168 
169         // this validation of our service list needs to happen after we've
170         // loaded our configs so it's a lifecycle
171         lifecycles.add(new BaseLifecycle() {
172             @Override
173             public void start() throws Exception {
174                 super.start();
175             }
176         });
177 
178         return lifecycles;
179     }
180 
181     protected void validateServices(List<ServiceDefinition> services) {
182         for (final ServiceDefinition serviceDef : this.services) {
183             serviceDef.validate();
184         }
185     }
186 
187     @Override
188     public void onApplicationEvent(ApplicationEvent applicationEvent) {
189         if (applicationEvent instanceof ContextRefreshedEvent) {
190             doAdditionalContextStartedLogic();
191         } else if (applicationEvent instanceof ContextStoppedEvent) {
192             doAdditionalContextStoppedLogic();
193         }
194     }
195 
196     protected void doAdditionalContextStartedLogic() {
197         validateServices(getServices());
198         ServicePublisher servicePublisher = new ServicePublisher(getServices());
199         Lifecycle serviceBus = new ServiceDelegatingLifecycle(KsbApiServiceLocator.SERVICE_BUS);
200         Lifecycle threadPool = new ServiceDelegatingLifecycle(KSBConstants.ServiceNames.THREAD_POOL_SERVICE);
201         Lifecycle scheduledThreadPool = new ServiceDelegatingLifecycle(KSBConstants.ServiceNames.SCHEDULED_THREAD_POOL_SERVICE);
202 
203         try {
204             servicePublisher.start();
205             internalLifecycles.add(servicePublisher);
206             serviceBus.start();
207             internalLifecycles.add(serviceBus);
208             threadPool.start();
209             internalLifecycles.add(threadPool);
210             scheduledThreadPool.start();
211             internalLifecycles.add(scheduledThreadPool);
212         } catch (Exception e) {
213             if (e instanceof RuntimeException) {
214                 throw (RuntimeException) e;
215             }
216             throw new RiceRuntimeException("Failed to initialize KSB on context startup");
217         }
218 
219         // Don't requeue messages if we are in thin client mode
220         if (getRunMode() != RunMode.THIN) {
221             requeueMessages();
222         }
223     }
224 
225     protected void doAdditionalContextStoppedLogic() {
226         cleanUpConfiguration();
227     }
228 
229     @Override
230     public boolean supportsEventType(Class<? extends ApplicationEvent> aClass) {
231         return true;
232     }
233 
234     @Override
235     public boolean supportsSourceType(Class<?> aClass) {
236         return true;
237     }
238 
239     @Override
240     public int getOrder() {
241         return Ordered.LOWEST_PRECEDENCE;
242     }
243 
244     @Override
245     protected void doAdditionalModuleStartLogic() throws Exception {
246         // this allows us to become aware of remote services, in case the application needs to use any of them during startup
247         /* Wrapped this call in a try/catch block to handle when the web
248 		 * service call fails so the KSB module will continue starting even
249 		 * if the remote service registry is not available.
250 		 */
251         try {
252             // this allows us to become aware of remote services, in case the application needs to use any of them during startup
253             LOG.info("Synchronizing remote services with service bus after KSB startup...");
254             long startTime = System.currentTimeMillis();
255             KsbApiServiceLocator.getServiceBus().synchronizeRemoteServices();
256             long endTime = System.currentTimeMillis();
257             LOG.info("...total time to synchronize remote services with service bus after KSB startup: " + (endTime - startTime));
258         } catch (WebServiceException e) {
259             LOG.error("Caught web service exception synchronizing service bus while configuring KSB", e);
260         }
261     }
262 
263     @Override
264     protected void doAdditionalModuleStopLogic() throws Exception {
265         for (int index = internalLifecycles.size() - 1; index >= 0; index--) {
266             try {
267                 internalLifecycles.get(index).stop();
268             } catch (Exception e) {
269                 LOG.error("Failed to properly execute shutdown logic.", e);
270             }
271         }
272     }
273 
274     /**
275      * Used to refresh the service registry after the Application Context is initialized.  This way any services that
276      * were exported on startup
277      * will be available in the service registry once startup is complete.
278      */
279     private void requeueMessages() {
280         LOG.info("Refreshing Service Registry to export services to the bus.");
281         try {
282             KsbApiServiceLocator.getServiceBus().synchronizeLocalServices();
283         }catch (WebServiceException e) {
284             LOG.error("Caught web service exception while starting the KSB Configurer", e);
285         }
286 
287         //automatically requeue documents sitting with status of 'R'
288         MessageFetcher messageFetcher = new MessageFetcher((Integer) null);
289         KSBServiceLocator.getThreadPool().execute(messageFetcher);
290     }
291 
292     protected boolean isMessagePersistenceEnabled() {
293         return ConfigContext.getCurrentContextConfig().getBooleanProperty(KSBConstants.Config.MESSAGE_PERSISTENCE, true);
294     }
295 
296     protected boolean isBamEnabled() {
297         return ConfigContext.getCurrentContextConfig().getBooleanProperty(Config.BAM_ENABLED, false);
298     }
299 
300     protected void configureScheduler() {
301         if (this.getExceptionMessagingScheduler() != null) {
302             LOG.info("Configuring injected exception messaging Scheduler");
303             ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.INJECTED_EXCEPTION_MESSAGE_SCHEDULER_KEY, this.getExceptionMessagingScheduler());
304         }
305     }
306 
307     protected void configureDataSource() {
308         if (isMessagePersistenceEnabled()) {
309             if (getMessageDataSource() != null) {
310                 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_MESSAGE_DATASOURCE, getMessageDataSource());
311             }
312             if (getNonTransactionalMessageDataSource() != null) {
313                 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_MESSAGE_NON_TRANSACTIONAL_DATASOURCE, getNonTransactionalMessageDataSource());
314             }
315         }
316         if (getRunMode().equals(RunMode.LOCAL)) {
317             if (getRegistryDataSource() != null) {
318                 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_REGISTRY_DATASOURCE, getRegistryDataSource());
319             }
320         }
321         if (isBamEnabled()) {
322             if (getBamDataSource() != null) {
323                 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_BAM_DATASOURCE, getBamDataSource());
324             }
325         }
326     }
327 
328     protected void configurePlatformTransactionManager() {
329         if (getPlatformTransactionManager() == null) {
330             return;
331         }
332         ConfigContext.getCurrentContextConfig().putObject(RiceConstants.SPRING_TRANSACTION_MANAGER, getPlatformTransactionManager());
333     }
334 
335     protected void configureAlternateEndpoints() {
336         ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_ALTERNATE_ENDPOINT_LOCATIONS, getAlternateEndpointLocations());
337         ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_ALTERNATE_ENDPOINTS, getAlternateEndpoints());
338     }
339 
340     /**
341      * Because our configuration is global, shutting down Rice does not get rid of objects stored there.  For that
342      * reason we need to manually clean these up.  This is most important in the case of the service bus because the
343      * configuration is used to store services to be exported.  If we don't clean this up then a shutdown/startup
344      * within the same class loading context causes the service list to be doubled and results in "multiple endpoint"
345      * error messages.
346      */
347     protected void cleanUpConfiguration() {
348         ConfigContext.getCurrentContextConfig().removeObject(KSBConstants.Config.KSB_ALTERNATE_ENDPOINTS);
349     }
350 
351     public List<ServiceDefinition> getServices() {
352         return this.services;
353     }
354 
355     public void setServices(List<ServiceDefinition> javaServices) {
356         this.services = javaServices;
357     }
358 
359     public DataSource getMessageDataSource() {
360         return this.messageDataSource;
361     }
362 
363     public void setMessageDataSource(DataSource messageDataSource) {
364         this.messageDataSource = messageDataSource;
365     }
366 
367     public DataSource getNonTransactionalMessageDataSource() {
368         return this.nonTransactionalMessageDataSource;
369     }
370 
371     public void setNonTransactionalMessageDataSource(DataSource nonTransactionalMessageDataSource) {
372         this.nonTransactionalMessageDataSource = nonTransactionalMessageDataSource;
373     }
374 
375     public DataSource getRegistryDataSource() {
376         return this.registryDataSource;
377     }
378 
379     public void setRegistryDataSource(DataSource registryDataSource) {
380         this.registryDataSource = registryDataSource;
381     }
382 
383     public DataSource getBamDataSource() {
384         return this.bamDataSource;
385     }
386 
387     public void setBamDataSource(DataSource bamDataSource) {
388         this.bamDataSource = bamDataSource;
389     }
390 
391     public Scheduler getExceptionMessagingScheduler() {
392         return this.exceptionMessagingScheduler;
393     }
394 
395     public void setExceptionMessagingScheduler(Scheduler exceptionMessagingScheduler) {
396         this.exceptionMessagingScheduler = exceptionMessagingScheduler;
397     }
398 
399     public PlatformTransactionManager getPlatformTransactionManager() {
400         return platformTransactionManager;
401     }
402 
403     public void setPlatformTransactionManager(PlatformTransactionManager springTransactionManager) {
404         this.platformTransactionManager = springTransactionManager;
405     }
406 
407     public List<AlternateEndpointLocation> getAlternateEndpointLocations() {
408         return this.alternateEndpointLocations;
409     }
410 
411     public void setAlternateEndpointLocations(List<AlternateEndpointLocation> alternateEndpointLocations) {
412         this.alternateEndpointLocations = alternateEndpointLocations;
413     }
414 
415     public List<AlternateEndpoint> getAlternateEndpoints() {
416         return this.alternateEndpoints;
417     }
418 
419     public void setAlternateEndpoints(List<AlternateEndpoint> alternateEndpoints) {
420         this.alternateEndpoints = alternateEndpoints;
421     }
422 
423     private final class ServicePublisher extends BaseLifecycle {
424 
425         private final List<ServiceDefinition> serviceDefinitions;
426 
427         ServicePublisher(List<ServiceDefinition> serviceDefinitions) {
428             this.serviceDefinitions = serviceDefinitions;
429         }
430 
431         @Override
432         public void start() throws Exception {
433             if (serviceDefinitions != null && !serviceDefinitions.isEmpty()) {
434                 LOG.debug("Configuring " + serviceDefinitions.size() + " services for application id " + CoreConfigHelper.getApplicationId() + " using config for classloader " + ClassLoaderUtils.getDefaultClassLoader());
435                 KsbApiServiceLocator.getServiceBus().publishServices(serviceDefinitions, true);
436                 super.start();
437             }
438         }
439 
440     }
441 
442 }