001 /** 002 * Copyright 2005-2014 The Kuali Foundation 003 * 004 * Licensed under the Educational Community License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.opensource.org/licenses/ecl2.php 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 package org.kuali.rice.ksb.messaging.config; 017 018 import org.apache.commons.httpclient.contrib.ssl.EasySSLProtocolSocketFactory; 019 import org.apache.commons.httpclient.protocol.Protocol; 020 import org.apache.commons.httpclient.protocol.ProtocolSocketFactory; 021 import org.kuali.rice.core.api.config.CoreConfigHelper; 022 import org.kuali.rice.core.api.config.module.RunMode; 023 import org.kuali.rice.core.api.config.property.Config; 024 import org.kuali.rice.core.api.config.property.ConfigContext; 025 import org.kuali.rice.core.api.exception.RiceRuntimeException; 026 import org.kuali.rice.core.api.lifecycle.BaseLifecycle; 027 import org.kuali.rice.core.api.lifecycle.Lifecycle; 028 import org.kuali.rice.core.api.resourceloader.ResourceLoader; 029 import org.kuali.rice.core.api.util.ClassLoaderUtils; 030 import org.kuali.rice.core.api.util.RiceConstants; 031 import org.kuali.rice.core.framework.config.module.ModuleConfigurer; 032 import org.kuali.rice.core.framework.config.module.WebModuleConfiguration; 033 import org.kuali.rice.core.framework.lifecycle.ServiceDelegatingLifecycle; 034 import org.kuali.rice.core.framework.persistence.jpa.OrmUtils; 035 import org.kuali.rice.ksb.api.KsbApiConstants; 036 import org.kuali.rice.ksb.api.KsbApiServiceLocator; 037 import org.kuali.rice.ksb.api.bus.ServiceDefinition; 038 import org.kuali.rice.ksb.messaging.AlternateEndpoint; 039 import org.kuali.rice.ksb.messaging.AlternateEndpointLocation; 040 import org.kuali.rice.ksb.messaging.MessageFetcher; 041 import org.kuali.rice.ksb.messaging.resourceloader.KSBResourceLoaderFactory; 042 import org.kuali.rice.ksb.messaging.serviceconnectors.HttpInvokerConnector; 043 import org.kuali.rice.ksb.service.KSBServiceLocator; 044 import org.kuali.rice.ksb.util.KSBConstants; 045 import org.quartz.Scheduler; 046 import org.springframework.context.ApplicationEvent; 047 import org.springframework.context.event.ContextRefreshedEvent; 048 import org.springframework.context.event.ContextStoppedEvent; 049 import org.springframework.context.event.SmartApplicationListener; 050 import org.springframework.core.Ordered; 051 import org.springframework.transaction.PlatformTransactionManager; 052 053 import javax.sql.DataSource; 054 import java.util.ArrayList; 055 import java.util.Arrays; 056 import java.util.Collection; 057 import java.util.Collections; 058 import java.util.LinkedList; 059 import java.util.List; 060 061 062 /** 063 * Used to configure the embedded workflow. This could be used to configure 064 * embedded workflow programmatically but mostly this is a base class by which 065 * to hang specific configuration behavior off of through subclassing 066 * 067 * @author Kuali Rice Team (rice.collab@kuali.org) 068 * 069 */ 070 public class KSBConfigurer extends ModuleConfigurer implements SmartApplicationListener { 071 072 private static final String SERVICE_BUS_CLIENT_SPRING = "classpath:org/kuali/rice/ksb/config/KsbServiceBusClientSpringBeans.xml"; 073 private static final String MESSAGE_CLIENT_SPRING = "classpath:org/kuali/rice/ksb/config/KsbMessageClientSpringBeans.xml"; 074 private static final String OJB_MESSAGE_CLIENT_SPRING = "classpath:org/kuali/rice/ksb/config/KsbOjbMessageClientSpringBeans.xml"; 075 private static final String BAM_SPRING = "classpath:org/kuali/rice/ksb/config/KsbBamSpringBeans.xml"; 076 private static final String OJB_BAM_SPRING = "classpath:org/kuali/rice/ksb/config/KsbOjbBamSpringBeans.xml"; 077 private static final String REGISTRY_SERVER_SPRING = "classpath:org/kuali/rice/ksb/config/KsbRegistryServerSpringBeans.xml"; 078 private static final String OJB_REGISTRY_SPRING = "classpath:org/kuali/rice/ksb/config/KsbOjbRegistrySpringBeans.xml"; 079 private static final String WEB_SPRING = "classpath:org/kuali/rice/ksb/config/KsbWebSpringBeans.xml"; 080 081 private List<ServiceDefinition> services = new ArrayList<ServiceDefinition>(); 082 083 private List<AlternateEndpointLocation> alternateEndpointLocations = new ArrayList<AlternateEndpointLocation>(); 084 085 private List<AlternateEndpoint> alternateEndpoints = new ArrayList<AlternateEndpoint>(); 086 087 private DataSource registryDataSource; 088 089 private DataSource messageDataSource; 090 091 private DataSource nonTransactionalMessageDataSource; 092 093 private DataSource bamDataSource; 094 095 private Scheduler exceptionMessagingScheduler; 096 097 private PlatformTransactionManager platformTransactionManager; 098 099 private List<Lifecycle> internalLifecycles; 100 101 public KSBConfigurer() { 102 super(KsbApiConstants.KSB_MODULE_NAME); 103 setValidRunModes(Arrays.asList(RunMode.THIN, RunMode.REMOTE, RunMode.LOCAL)); 104 this.internalLifecycles = new ArrayList<Lifecycle>(); 105 } 106 107 @Override 108 public void addAdditonalToConfig() { 109 configureDataSource(); 110 configureScheduler(); 111 configurePlatformTransactionManager(); 112 configureAlternateEndpoints(); 113 } 114 115 @Override 116 public List<String> getPrimarySpringFiles(){ 117 final List<String> springFileLocations = new ArrayList<String>(); 118 119 springFileLocations.add(SERVICE_BUS_CLIENT_SPRING); 120 121 if (getRunMode() != RunMode.THIN) { 122 123 boolean isJpa = OrmUtils.isJpaEnabled("rice.ksb"); 124 if (isJpa) { 125 // TODO redo this once we're back to JPA 126 // springFileLocations.add("classpath:org/kuali/rice/ksb/config/KSBJPASpringBeans.xml"); 127 throw new UnsupportedOperationException("JPA not currently supported for KSB"); 128 } 129 130 // Loading these beans unconditionally now, see: 131 // KULRICE-6574: Some KSB beans not defined unless message persistence turned on 132 // 133 // if (isMessagePersistenceEnabled()) { 134 springFileLocations.add(MESSAGE_CLIENT_SPRING); 135 springFileLocations.add(OJB_MESSAGE_CLIENT_SPRING); 136 // } 137 138 if (isBamEnabled()) { 139 springFileLocations.add(BAM_SPRING); 140 springFileLocations.add(OJB_BAM_SPRING); 141 } 142 } 143 144 if (getRunMode().equals( RunMode.LOCAL )) { 145 springFileLocations.add(REGISTRY_SERVER_SPRING); 146 springFileLocations.add(OJB_REGISTRY_SPRING); 147 } 148 149 return springFileLocations; 150 } 151 152 @Override 153 public boolean hasWebInterface() { 154 return true; 155 } 156 157 // See KULRICE-7093: KSB Module UI is not available on client applications 158 @Override 159 public boolean shouldRenderWebInterface() { 160 if (ConfigContext.getCurrentContextConfig().getBooleanProperty(KSBConstants.Config.WEB_FORCE_ENABLE)) { 161 return true; 162 } 163 return super.shouldRenderWebInterface(); 164 } 165 166 @Override 167 protected WebModuleConfiguration loadWebModule() { 168 WebModuleConfiguration configuration = super.loadWebModule(); 169 configuration.getWebSpringFiles().add(WEB_SPRING); 170 return configuration; 171 } 172 173 @Override 174 public Collection<ResourceLoader> getResourceLoadersToRegister() throws Exception{ 175 ResourceLoader ksbRemoteResourceLoader = KSBResourceLoaderFactory.createRootKSBRemoteResourceLoader(); 176 ksbRemoteResourceLoader.start(); 177 return Collections.singletonList(ksbRemoteResourceLoader); 178 } 179 180 @Override 181 public List<Lifecycle> loadLifecycles() throws Exception { 182 List<Lifecycle> lifecycles = new LinkedList<Lifecycle>(); 183 // this validation of our service list needs to happen after we've 184 // loaded our configs so it's a lifecycle 185 lifecycles.add(new BaseLifecycle() { 186 187 @Override 188 public void start() throws Exception { 189 // first check if we want to allow self-signed certificates for SSL communication 190 if (Boolean.valueOf(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.KSB_ALLOW_SELF_SIGNED_SSL)).booleanValue()) { 191 Protocol.registerProtocol("https", new Protocol("https", 192 (ProtocolSocketFactory) new EasySSLProtocolSocketFactory(), 443)); 193 } 194 super.start(); 195 } 196 }); 197 return lifecycles; 198 } 199 200 protected void validateServices(List<ServiceDefinition> services) { 201 for (final ServiceDefinition serviceDef : this.services) { 202 serviceDef.validate(); 203 } 204 } 205 206 @Override 207 public void onApplicationEvent(ApplicationEvent applicationEvent) { 208 if (applicationEvent instanceof ContextRefreshedEvent) { 209 doAdditionalContextStartedLogic(); 210 } else if (applicationEvent instanceof ContextStoppedEvent) { 211 doAdditionalContextStoppedLogic(); 212 } 213 } 214 215 protected void doAdditionalContextStartedLogic() { 216 validateServices(getServices()); 217 ServicePublisher servicePublisher = new ServicePublisher(getServices()); 218 Lifecycle serviceBus = new ServiceDelegatingLifecycle(KsbApiServiceLocator.SERVICE_BUS); 219 Lifecycle threadPool = new ServiceDelegatingLifecycle(KSBConstants.ServiceNames.THREAD_POOL_SERVICE); 220 Lifecycle scheduledThreadPool = new ServiceDelegatingLifecycle(KSBConstants.ServiceNames.SCHEDULED_THREAD_POOL_SERVICE); 221 222 try { 223 servicePublisher.start(); 224 internalLifecycles.add(servicePublisher); 225 serviceBus.start(); 226 internalLifecycles.add(serviceBus); 227 threadPool.start(); 228 internalLifecycles.add(threadPool); 229 scheduledThreadPool.start(); 230 internalLifecycles.add(scheduledThreadPool); 231 } catch (Exception e) { 232 if (e instanceof RuntimeException) { 233 throw (RuntimeException)e; 234 } 235 throw new RiceRuntimeException("Failed to initialize KSB on context startup"); 236 } 237 238 // Don't requeue messages if we are in thin client mode 239 if (getRunMode() != RunMode.THIN) { 240 requeueMessages(); 241 } 242 } 243 244 protected void doAdditionalContextStoppedLogic() { 245 try { 246 HttpInvokerConnector.shutdownIdleConnectionTimeout(); 247 } catch (Exception e) { 248 LOG.error("Failed to shutdown idle connection timeout evictor thread.", e); 249 } 250 cleanUpConfiguration(); 251 } 252 253 @Override 254 public boolean supportsEventType(Class<? extends ApplicationEvent> aClass) { 255 return true; 256 } 257 258 @Override 259 public boolean supportsSourceType(Class<?> aClass) { 260 return true; 261 } 262 263 @Override 264 public int getOrder() { 265 return Ordered.LOWEST_PRECEDENCE; 266 } 267 268 @Override 269 protected void doAdditionalModuleStartLogic() throws Exception { 270 // this allows us to become aware of remote services, in case the application needs to use any of them during startup 271 LOG.info("Synchronizing remote services with service bus after KSB startup..."); 272 long startTime = System.currentTimeMillis(); 273 KsbApiServiceLocator.getServiceBus().synchronizeRemoteServices(); 274 long endTime = System.currentTimeMillis(); 275 LOG.info("...total time to synchronize remote services with service bus after KSB startup: " + (endTime - startTime)); 276 } 277 278 @Override 279 protected void doAdditionalModuleStopLogic() throws Exception { 280 for (int index = internalLifecycles.size() - 1; index >= 0; index--) { 281 try { 282 internalLifecycles.get(index).stop(); 283 } catch (Exception e) { 284 LOG.error("Failed to properly execute shutdown logic.", e); 285 } 286 } 287 } 288 289 /** 290 * Used to refresh the service registry after the Application Context is initialized. This way any services that were exported on startup 291 * will be available in the service registry once startup is complete. 292 */ 293 private void requeueMessages() { 294 LOG.info("Refreshing Service Registry to export services to the bus."); 295 KsbApiServiceLocator.getServiceBus().synchronizeLocalServices(); 296 297 //automatically requeue documents sitting with status of 'R' 298 MessageFetcher messageFetcher = new MessageFetcher((Integer) null); 299 KSBServiceLocator.getThreadPool().execute(messageFetcher); 300 } 301 302 protected boolean isMessagePersistenceEnabled() { 303 return ConfigContext.getCurrentContextConfig().getBooleanProperty(KSBConstants.Config.MESSAGE_PERSISTENCE, true); 304 } 305 306 protected boolean isBamEnabled() { 307 return ConfigContext.getCurrentContextConfig().getBooleanProperty(Config.BAM_ENABLED, false); 308 } 309 310 protected void configureScheduler() { 311 if (this.getExceptionMessagingScheduler() != null) { 312 LOG.info("Configuring injected exception messaging Scheduler"); 313 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.INJECTED_EXCEPTION_MESSAGE_SCHEDULER_KEY, this.getExceptionMessagingScheduler()); 314 } 315 } 316 317 protected void configureDataSource() { 318 if (isMessagePersistenceEnabled()) { 319 if (getMessageDataSource() != null) { 320 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_MESSAGE_DATASOURCE, getMessageDataSource()); 321 } 322 if (getNonTransactionalMessageDataSource() != null) { 323 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_MESSAGE_NON_TRANSACTIONAL_DATASOURCE, getNonTransactionalMessageDataSource()); 324 } 325 } 326 if (getRunMode().equals(RunMode.LOCAL)) { 327 if (getRegistryDataSource() != null) { 328 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_REGISTRY_DATASOURCE, getRegistryDataSource()); 329 } 330 } 331 if (isBamEnabled()) { 332 if (getBamDataSource() != null) { 333 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_BAM_DATASOURCE, getBamDataSource()); 334 } 335 } 336 } 337 338 protected void configurePlatformTransactionManager() { 339 if (getPlatformTransactionManager() == null) { 340 return; 341 } 342 ConfigContext.getCurrentContextConfig().putObject(RiceConstants.SPRING_TRANSACTION_MANAGER, getPlatformTransactionManager()); 343 } 344 345 protected void configureAlternateEndpoints() { 346 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_ALTERNATE_ENDPOINT_LOCATIONS, getAlternateEndpointLocations()); 347 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_ALTERNATE_ENDPOINTS, getAlternateEndpoints()); 348 } 349 350 /** 351 * Because our configuration is global, shutting down Rice does not get rid of objects stored there. For that reason 352 * we need to manually clean these up. This is most important in the case of the service bus because the configuration 353 * is used to store services to be exported. If we don't clean this up then a shutdown/startup within the same 354 * class loading context causes the service list to be doubled and results in "multiple endpoint" error messages. 355 * 356 */ 357 protected void cleanUpConfiguration() { 358 ConfigContext.getCurrentContextConfig().removeObject(KSBConstants.Config.KSB_ALTERNATE_ENDPOINTS); 359 } 360 361 public List<ServiceDefinition> getServices() { 362 return this.services; 363 } 364 365 public void setServices(List<ServiceDefinition> javaServices) { 366 this.services = javaServices; 367 } 368 369 public DataSource getMessageDataSource() { 370 return this.messageDataSource; 371 } 372 373 public void setMessageDataSource(DataSource messageDataSource) { 374 this.messageDataSource = messageDataSource; 375 } 376 377 public DataSource getNonTransactionalMessageDataSource() { 378 return this.nonTransactionalMessageDataSource; 379 } 380 381 public void setNonTransactionalMessageDataSource(DataSource nonTransactionalMessageDataSource) { 382 this.nonTransactionalMessageDataSource = nonTransactionalMessageDataSource; 383 } 384 385 public DataSource getRegistryDataSource() { 386 return this.registryDataSource; 387 } 388 389 public void setRegistryDataSource(DataSource registryDataSource) { 390 this.registryDataSource = registryDataSource; 391 } 392 393 public DataSource getBamDataSource() { 394 return this.bamDataSource; 395 } 396 397 public void setBamDataSource(DataSource bamDataSource) { 398 this.bamDataSource = bamDataSource; 399 } 400 401 public Scheduler getExceptionMessagingScheduler() { 402 return this.exceptionMessagingScheduler; 403 } 404 405 public void setExceptionMessagingScheduler(Scheduler exceptionMessagingScheduler) { 406 this.exceptionMessagingScheduler = exceptionMessagingScheduler; 407 } 408 409 public PlatformTransactionManager getPlatformTransactionManager() { 410 return platformTransactionManager; 411 } 412 413 public void setPlatformTransactionManager(PlatformTransactionManager springTransactionManager) { 414 this.platformTransactionManager = springTransactionManager; 415 } 416 417 public List<AlternateEndpointLocation> getAlternateEndpointLocations() { 418 return this.alternateEndpointLocations; 419 } 420 421 public void setAlternateEndpointLocations(List<AlternateEndpointLocation> alternateEndpointLocations) { 422 this.alternateEndpointLocations = alternateEndpointLocations; 423 } 424 425 public List<AlternateEndpoint> getAlternateEndpoints() { 426 return this.alternateEndpoints; 427 } 428 429 public void setAlternateEndpoints(List<AlternateEndpoint> alternateEndpoints) { 430 this.alternateEndpoints = alternateEndpoints; 431 } 432 433 private final class ServicePublisher extends BaseLifecycle { 434 435 private final List<ServiceDefinition> serviceDefinitions; 436 437 ServicePublisher(List<ServiceDefinition> serviceDefinitions) { 438 this.serviceDefinitions = serviceDefinitions; 439 } 440 441 @Override 442 public void start() throws Exception { 443 if (serviceDefinitions != null && !serviceDefinitions.isEmpty()) { 444 LOG.debug("Configuring " + serviceDefinitions.size() + " services for application id " + CoreConfigHelper.getApplicationId() + " using config for classloader " + ClassLoaderUtils.getDefaultClassLoader()); 445 KsbApiServiceLocator.getServiceBus().publishServices(serviceDefinitions, true); 446 super.start(); 447 } 448 } 449 450 } 451 452 }