001 /**
002 * Copyright 2005-2014 The Kuali Foundation
003 *
004 * Licensed under the Educational Community License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.opensource.org/licenses/ecl2.php
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016 package org.kuali.rice.ksb.messaging.config;
017
018 import org.apache.commons.httpclient.contrib.ssl.EasySSLProtocolSocketFactory;
019 import org.apache.commons.httpclient.protocol.Protocol;
020 import org.apache.commons.httpclient.protocol.ProtocolSocketFactory;
021 import org.kuali.rice.core.api.config.CoreConfigHelper;
022 import org.kuali.rice.core.api.config.module.RunMode;
023 import org.kuali.rice.core.api.config.property.Config;
024 import org.kuali.rice.core.api.config.property.ConfigContext;
025 import org.kuali.rice.core.api.exception.RiceRuntimeException;
026 import org.kuali.rice.core.api.lifecycle.BaseLifecycle;
027 import org.kuali.rice.core.api.lifecycle.Lifecycle;
028 import org.kuali.rice.core.api.resourceloader.ResourceLoader;
029 import org.kuali.rice.core.api.util.ClassLoaderUtils;
030 import org.kuali.rice.core.api.util.RiceConstants;
031 import org.kuali.rice.core.framework.config.module.ModuleConfigurer;
032 import org.kuali.rice.core.framework.config.module.WebModuleConfiguration;
033 import org.kuali.rice.core.framework.lifecycle.ServiceDelegatingLifecycle;
034 import org.kuali.rice.core.framework.persistence.jpa.OrmUtils;
035 import org.kuali.rice.ksb.api.KsbApiConstants;
036 import org.kuali.rice.ksb.api.KsbApiServiceLocator;
037 import org.kuali.rice.ksb.api.bus.ServiceDefinition;
038 import org.kuali.rice.ksb.messaging.AlternateEndpoint;
039 import org.kuali.rice.ksb.messaging.AlternateEndpointLocation;
040 import org.kuali.rice.ksb.messaging.MessageFetcher;
041 import org.kuali.rice.ksb.messaging.resourceloader.KSBResourceLoaderFactory;
042 import org.kuali.rice.ksb.messaging.serviceconnectors.HttpInvokerConnector;
043 import org.kuali.rice.ksb.service.KSBServiceLocator;
044 import org.kuali.rice.ksb.util.KSBConstants;
045 import org.quartz.Scheduler;
046 import org.springframework.context.ApplicationEvent;
047 import org.springframework.context.event.ContextRefreshedEvent;
048 import org.springframework.context.event.ContextStoppedEvent;
049 import org.springframework.context.event.SmartApplicationListener;
050 import org.springframework.core.Ordered;
051 import org.springframework.transaction.PlatformTransactionManager;
052
053 import javax.sql.DataSource;
054 import java.util.ArrayList;
055 import java.util.Arrays;
056 import java.util.Collection;
057 import java.util.Collections;
058 import java.util.LinkedList;
059 import java.util.List;
060
061
062 /**
063 * Used to configure the embedded workflow. This could be used to configure
064 * embedded workflow programmatically but mostly this is a base class by which
065 * to hang specific configuration behavior off of through subclassing
066 *
067 * @author Kuali Rice Team (rice.collab@kuali.org)
068 *
069 */
070 public class KSBConfigurer extends ModuleConfigurer implements SmartApplicationListener {
071
072 private static final String SERVICE_BUS_CLIENT_SPRING = "classpath:org/kuali/rice/ksb/config/KsbServiceBusClientSpringBeans.xml";
073 private static final String MESSAGE_CLIENT_SPRING = "classpath:org/kuali/rice/ksb/config/KsbMessageClientSpringBeans.xml";
074 private static final String OJB_MESSAGE_CLIENT_SPRING = "classpath:org/kuali/rice/ksb/config/KsbOjbMessageClientSpringBeans.xml";
075 private static final String BAM_SPRING = "classpath:org/kuali/rice/ksb/config/KsbBamSpringBeans.xml";
076 private static final String OJB_BAM_SPRING = "classpath:org/kuali/rice/ksb/config/KsbOjbBamSpringBeans.xml";
077 private static final String REGISTRY_SERVER_SPRING = "classpath:org/kuali/rice/ksb/config/KsbRegistryServerSpringBeans.xml";
078 private static final String OJB_REGISTRY_SPRING = "classpath:org/kuali/rice/ksb/config/KsbOjbRegistrySpringBeans.xml";
079 private static final String WEB_SPRING = "classpath:org/kuali/rice/ksb/config/KsbWebSpringBeans.xml";
080
081 private List<ServiceDefinition> services = new ArrayList<ServiceDefinition>();
082
083 private List<AlternateEndpointLocation> alternateEndpointLocations = new ArrayList<AlternateEndpointLocation>();
084
085 private List<AlternateEndpoint> alternateEndpoints = new ArrayList<AlternateEndpoint>();
086
087 private DataSource registryDataSource;
088
089 private DataSource messageDataSource;
090
091 private DataSource nonTransactionalMessageDataSource;
092
093 private DataSource bamDataSource;
094
095 private Scheduler exceptionMessagingScheduler;
096
097 private PlatformTransactionManager platformTransactionManager;
098
099 private List<Lifecycle> internalLifecycles;
100
101 public KSBConfigurer() {
102 super(KsbApiConstants.KSB_MODULE_NAME);
103 setValidRunModes(Arrays.asList(RunMode.THIN, RunMode.REMOTE, RunMode.LOCAL));
104 this.internalLifecycles = new ArrayList<Lifecycle>();
105 }
106
107 @Override
108 public void addAdditonalToConfig() {
109 configureDataSource();
110 configureScheduler();
111 configurePlatformTransactionManager();
112 configureAlternateEndpoints();
113 }
114
115 @Override
116 public List<String> getPrimarySpringFiles(){
117 final List<String> springFileLocations = new ArrayList<String>();
118
119 springFileLocations.add(SERVICE_BUS_CLIENT_SPRING);
120
121 if (getRunMode() != RunMode.THIN) {
122
123 boolean isJpa = OrmUtils.isJpaEnabled("rice.ksb");
124 if (isJpa) {
125 // TODO redo this once we're back to JPA
126 // springFileLocations.add("classpath:org/kuali/rice/ksb/config/KSBJPASpringBeans.xml");
127 throw new UnsupportedOperationException("JPA not currently supported for KSB");
128 }
129
130 // Loading these beans unconditionally now, see:
131 // KULRICE-6574: Some KSB beans not defined unless message persistence turned on
132 //
133 // if (isMessagePersistenceEnabled()) {
134 springFileLocations.add(MESSAGE_CLIENT_SPRING);
135 springFileLocations.add(OJB_MESSAGE_CLIENT_SPRING);
136 // }
137
138 if (isBamEnabled()) {
139 springFileLocations.add(BAM_SPRING);
140 springFileLocations.add(OJB_BAM_SPRING);
141 }
142 }
143
144 if (getRunMode().equals( RunMode.LOCAL )) {
145 springFileLocations.add(REGISTRY_SERVER_SPRING);
146 springFileLocations.add(OJB_REGISTRY_SPRING);
147 }
148
149 return springFileLocations;
150 }
151
152 @Override
153 public boolean hasWebInterface() {
154 return true;
155 }
156
157 // See KULRICE-7093: KSB Module UI is not available on client applications
158 @Override
159 public boolean shouldRenderWebInterface() {
160 if (ConfigContext.getCurrentContextConfig().getBooleanProperty(KSBConstants.Config.WEB_FORCE_ENABLE)) {
161 return true;
162 }
163 return super.shouldRenderWebInterface();
164 }
165
166 @Override
167 protected WebModuleConfiguration loadWebModule() {
168 WebModuleConfiguration configuration = super.loadWebModule();
169 configuration.getWebSpringFiles().add(WEB_SPRING);
170 return configuration;
171 }
172
173 @Override
174 public Collection<ResourceLoader> getResourceLoadersToRegister() throws Exception{
175 ResourceLoader ksbRemoteResourceLoader = KSBResourceLoaderFactory.createRootKSBRemoteResourceLoader();
176 ksbRemoteResourceLoader.start();
177 return Collections.singletonList(ksbRemoteResourceLoader);
178 }
179
180 @Override
181 public List<Lifecycle> loadLifecycles() throws Exception {
182 List<Lifecycle> lifecycles = new LinkedList<Lifecycle>();
183 // this validation of our service list needs to happen after we've
184 // loaded our configs so it's a lifecycle
185 lifecycles.add(new BaseLifecycle() {
186
187 @Override
188 public void start() throws Exception {
189 // first check if we want to allow self-signed certificates for SSL communication
190 if (Boolean.valueOf(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.KSB_ALLOW_SELF_SIGNED_SSL)).booleanValue()) {
191 Protocol.registerProtocol("https", new Protocol("https",
192 (ProtocolSocketFactory) new EasySSLProtocolSocketFactory(), 443));
193 }
194 super.start();
195 }
196 });
197 return lifecycles;
198 }
199
200 protected void validateServices(List<ServiceDefinition> services) {
201 for (final ServiceDefinition serviceDef : this.services) {
202 serviceDef.validate();
203 }
204 }
205
206 @Override
207 public void onApplicationEvent(ApplicationEvent applicationEvent) {
208 if (applicationEvent instanceof ContextRefreshedEvent) {
209 doAdditionalContextStartedLogic();
210 } else if (applicationEvent instanceof ContextStoppedEvent) {
211 doAdditionalContextStoppedLogic();
212 }
213 }
214
215 protected void doAdditionalContextStartedLogic() {
216 validateServices(getServices());
217 ServicePublisher servicePublisher = new ServicePublisher(getServices());
218 Lifecycle serviceBus = new ServiceDelegatingLifecycle(KsbApiServiceLocator.SERVICE_BUS);
219 Lifecycle threadPool = new ServiceDelegatingLifecycle(KSBConstants.ServiceNames.THREAD_POOL_SERVICE);
220 Lifecycle scheduledThreadPool = new ServiceDelegatingLifecycle(KSBConstants.ServiceNames.SCHEDULED_THREAD_POOL_SERVICE);
221
222 try {
223 servicePublisher.start();
224 internalLifecycles.add(servicePublisher);
225 serviceBus.start();
226 internalLifecycles.add(serviceBus);
227 threadPool.start();
228 internalLifecycles.add(threadPool);
229 scheduledThreadPool.start();
230 internalLifecycles.add(scheduledThreadPool);
231 } catch (Exception e) {
232 if (e instanceof RuntimeException) {
233 throw (RuntimeException)e;
234 }
235 throw new RiceRuntimeException("Failed to initialize KSB on context startup");
236 }
237
238 // Don't requeue messages if we are in thin client mode
239 if (getRunMode() != RunMode.THIN) {
240 requeueMessages();
241 }
242 }
243
244 protected void doAdditionalContextStoppedLogic() {
245 try {
246 HttpInvokerConnector.shutdownIdleConnectionTimeout();
247 } catch (Exception e) {
248 LOG.error("Failed to shutdown idle connection timeout evictor thread.", e);
249 }
250 cleanUpConfiguration();
251 }
252
253 @Override
254 public boolean supportsEventType(Class<? extends ApplicationEvent> aClass) {
255 return true;
256 }
257
258 @Override
259 public boolean supportsSourceType(Class<?> aClass) {
260 return true;
261 }
262
263 @Override
264 public int getOrder() {
265 return Ordered.LOWEST_PRECEDENCE;
266 }
267
268 @Override
269 protected void doAdditionalModuleStartLogic() throws Exception {
270 // this allows us to become aware of remote services, in case the application needs to use any of them during startup
271 LOG.info("Synchronizing remote services with service bus after KSB startup...");
272 long startTime = System.currentTimeMillis();
273 KsbApiServiceLocator.getServiceBus().synchronizeRemoteServices();
274 long endTime = System.currentTimeMillis();
275 LOG.info("...total time to synchronize remote services with service bus after KSB startup: " + (endTime - startTime));
276 }
277
278 @Override
279 protected void doAdditionalModuleStopLogic() throws Exception {
280 for (int index = internalLifecycles.size() - 1; index >= 0; index--) {
281 try {
282 internalLifecycles.get(index).stop();
283 } catch (Exception e) {
284 LOG.error("Failed to properly execute shutdown logic.", e);
285 }
286 }
287 }
288
289 /**
290 * Used to refresh the service registry after the Application Context is initialized. This way any services that were exported on startup
291 * will be available in the service registry once startup is complete.
292 */
293 private void requeueMessages() {
294 LOG.info("Refreshing Service Registry to export services to the bus.");
295 KsbApiServiceLocator.getServiceBus().synchronizeLocalServices();
296
297 //automatically requeue documents sitting with status of 'R'
298 MessageFetcher messageFetcher = new MessageFetcher((Integer) null);
299 KSBServiceLocator.getThreadPool().execute(messageFetcher);
300 }
301
302 protected boolean isMessagePersistenceEnabled() {
303 return ConfigContext.getCurrentContextConfig().getBooleanProperty(KSBConstants.Config.MESSAGE_PERSISTENCE, true);
304 }
305
306 protected boolean isBamEnabled() {
307 return ConfigContext.getCurrentContextConfig().getBooleanProperty(Config.BAM_ENABLED, false);
308 }
309
310 protected void configureScheduler() {
311 if (this.getExceptionMessagingScheduler() != null) {
312 LOG.info("Configuring injected exception messaging Scheduler");
313 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.INJECTED_EXCEPTION_MESSAGE_SCHEDULER_KEY, this.getExceptionMessagingScheduler());
314 }
315 }
316
317 protected void configureDataSource() {
318 if (isMessagePersistenceEnabled()) {
319 if (getMessageDataSource() != null) {
320 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_MESSAGE_DATASOURCE, getMessageDataSource());
321 }
322 if (getNonTransactionalMessageDataSource() != null) {
323 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_MESSAGE_NON_TRANSACTIONAL_DATASOURCE, getNonTransactionalMessageDataSource());
324 }
325 }
326 if (getRunMode().equals(RunMode.LOCAL)) {
327 if (getRegistryDataSource() != null) {
328 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_REGISTRY_DATASOURCE, getRegistryDataSource());
329 }
330 }
331 if (isBamEnabled()) {
332 if (getBamDataSource() != null) {
333 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_BAM_DATASOURCE, getBamDataSource());
334 }
335 }
336 }
337
338 protected void configurePlatformTransactionManager() {
339 if (getPlatformTransactionManager() == null) {
340 return;
341 }
342 ConfigContext.getCurrentContextConfig().putObject(RiceConstants.SPRING_TRANSACTION_MANAGER, getPlatformTransactionManager());
343 }
344
345 protected void configureAlternateEndpoints() {
346 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_ALTERNATE_ENDPOINT_LOCATIONS, getAlternateEndpointLocations());
347 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_ALTERNATE_ENDPOINTS, getAlternateEndpoints());
348 }
349
350 /**
351 * Because our configuration is global, shutting down Rice does not get rid of objects stored there. For that reason
352 * we need to manually clean these up. This is most important in the case of the service bus because the configuration
353 * is used to store services to be exported. If we don't clean this up then a shutdown/startup within the same
354 * class loading context causes the service list to be doubled and results in "multiple endpoint" error messages.
355 *
356 */
357 protected void cleanUpConfiguration() {
358 ConfigContext.getCurrentContextConfig().removeObject(KSBConstants.Config.KSB_ALTERNATE_ENDPOINTS);
359 }
360
361 public List<ServiceDefinition> getServices() {
362 return this.services;
363 }
364
365 public void setServices(List<ServiceDefinition> javaServices) {
366 this.services = javaServices;
367 }
368
369 public DataSource getMessageDataSource() {
370 return this.messageDataSource;
371 }
372
373 public void setMessageDataSource(DataSource messageDataSource) {
374 this.messageDataSource = messageDataSource;
375 }
376
377 public DataSource getNonTransactionalMessageDataSource() {
378 return this.nonTransactionalMessageDataSource;
379 }
380
381 public void setNonTransactionalMessageDataSource(DataSource nonTransactionalMessageDataSource) {
382 this.nonTransactionalMessageDataSource = nonTransactionalMessageDataSource;
383 }
384
385 public DataSource getRegistryDataSource() {
386 return this.registryDataSource;
387 }
388
389 public void setRegistryDataSource(DataSource registryDataSource) {
390 this.registryDataSource = registryDataSource;
391 }
392
393 public DataSource getBamDataSource() {
394 return this.bamDataSource;
395 }
396
397 public void setBamDataSource(DataSource bamDataSource) {
398 this.bamDataSource = bamDataSource;
399 }
400
401 public Scheduler getExceptionMessagingScheduler() {
402 return this.exceptionMessagingScheduler;
403 }
404
405 public void setExceptionMessagingScheduler(Scheduler exceptionMessagingScheduler) {
406 this.exceptionMessagingScheduler = exceptionMessagingScheduler;
407 }
408
409 public PlatformTransactionManager getPlatformTransactionManager() {
410 return platformTransactionManager;
411 }
412
413 public void setPlatformTransactionManager(PlatformTransactionManager springTransactionManager) {
414 this.platformTransactionManager = springTransactionManager;
415 }
416
417 public List<AlternateEndpointLocation> getAlternateEndpointLocations() {
418 return this.alternateEndpointLocations;
419 }
420
421 public void setAlternateEndpointLocations(List<AlternateEndpointLocation> alternateEndpointLocations) {
422 this.alternateEndpointLocations = alternateEndpointLocations;
423 }
424
425 public List<AlternateEndpoint> getAlternateEndpoints() {
426 return this.alternateEndpoints;
427 }
428
429 public void setAlternateEndpoints(List<AlternateEndpoint> alternateEndpoints) {
430 this.alternateEndpoints = alternateEndpoints;
431 }
432
433 private final class ServicePublisher extends BaseLifecycle {
434
435 private final List<ServiceDefinition> serviceDefinitions;
436
437 ServicePublisher(List<ServiceDefinition> serviceDefinitions) {
438 this.serviceDefinitions = serviceDefinitions;
439 }
440
441 @Override
442 public void start() throws Exception {
443 if (serviceDefinitions != null && !serviceDefinitions.isEmpty()) {
444 LOG.debug("Configuring " + serviceDefinitions.size() + " services for application id " + CoreConfigHelper.getApplicationId() + " using config for classloader " + ClassLoaderUtils.getDefaultClassLoader());
445 KsbApiServiceLocator.getServiceBus().publishServices(serviceDefinitions, true);
446 super.start();
447 }
448 }
449
450 }
451
452 }