View Javadoc

1   /**
2    * Copyright 2005-2015 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.rice.ksb.messaging.config;
17  
18  import org.apache.commons.httpclient.contrib.ssl.EasySSLProtocolSocketFactory;
19  import org.apache.commons.httpclient.protocol.Protocol;
20  import org.apache.commons.httpclient.protocol.ProtocolSocketFactory;
21  import org.kuali.rice.core.api.config.CoreConfigHelper;
22  import org.kuali.rice.core.api.config.module.RunMode;
23  import org.kuali.rice.core.api.config.property.Config;
24  import org.kuali.rice.core.api.config.property.ConfigContext;
25  import org.kuali.rice.core.api.exception.RiceRuntimeException;
26  import org.kuali.rice.core.api.lifecycle.BaseLifecycle;
27  import org.kuali.rice.core.api.lifecycle.Lifecycle;
28  import org.kuali.rice.core.api.resourceloader.ResourceLoader;
29  import org.kuali.rice.core.api.util.ClassLoaderUtils;
30  import org.kuali.rice.core.api.util.RiceConstants;
31  import org.kuali.rice.core.framework.config.module.ModuleConfigurer;
32  import org.kuali.rice.core.framework.config.module.WebModuleConfiguration;
33  import org.kuali.rice.core.framework.lifecycle.ServiceDelegatingLifecycle;
34  import org.kuali.rice.core.framework.persistence.jpa.OrmUtils;
35  import org.kuali.rice.ksb.api.KsbApiConstants;
36  import org.kuali.rice.ksb.api.KsbApiServiceLocator;
37  import org.kuali.rice.ksb.api.bus.ServiceDefinition;
38  import org.kuali.rice.ksb.messaging.AlternateEndpoint;
39  import org.kuali.rice.ksb.messaging.AlternateEndpointLocation;
40  import org.kuali.rice.ksb.messaging.MessageFetcher;
41  import org.kuali.rice.ksb.messaging.resourceloader.KSBResourceLoaderFactory;
42  import org.kuali.rice.ksb.messaging.serviceconnectors.HttpInvokerConnector;
43  import org.kuali.rice.ksb.service.KSBServiceLocator;
44  import org.kuali.rice.ksb.util.KSBConstants;
45  import org.quartz.Scheduler;
46  import org.springframework.context.ApplicationEvent;
47  import org.springframework.context.event.ContextRefreshedEvent;
48  import org.springframework.context.event.ContextStoppedEvent;
49  import org.springframework.context.event.SmartApplicationListener;
50  import org.springframework.core.Ordered;
51  import org.springframework.transaction.PlatformTransactionManager;
52  
53  import javax.sql.DataSource;
54  import java.util.ArrayList;
55  import java.util.Arrays;
56  import java.util.Collection;
57  import java.util.Collections;
58  import java.util.LinkedList;
59  import java.util.List;
60  
61  
62  /**
63   * Used to configure the embedded workflow. This could be used to configure
64   * embedded workflow programmatically but mostly this is a base class by which
65   * to hang specific configuration behavior off of through subclassing
66   * 
67   * @author Kuali Rice Team (rice.collab@kuali.org)
68   * 
69   */
70  public class KSBConfigurer extends ModuleConfigurer implements SmartApplicationListener {
71  	
72  	private static final String SERVICE_BUS_CLIENT_SPRING = "classpath:org/kuali/rice/ksb/config/KsbServiceBusClientSpringBeans.xml";
73  	private static final String MESSAGE_CLIENT_SPRING = "classpath:org/kuali/rice/ksb/config/KsbMessageClientSpringBeans.xml";
74  	private static final String OJB_MESSAGE_CLIENT_SPRING = "classpath:org/kuali/rice/ksb/config/KsbOjbMessageClientSpringBeans.xml";
75  	private static final String BAM_SPRING = "classpath:org/kuali/rice/ksb/config/KsbBamSpringBeans.xml";
76  	private static final String OJB_BAM_SPRING = "classpath:org/kuali/rice/ksb/config/KsbOjbBamSpringBeans.xml";
77  	private static final String REGISTRY_SERVER_SPRING = "classpath:org/kuali/rice/ksb/config/KsbRegistryServerSpringBeans.xml";
78  	private static final String OJB_REGISTRY_SPRING = "classpath:org/kuali/rice/ksb/config/KsbOjbRegistrySpringBeans.xml";
79  	private static final String WEB_SPRING = "classpath:org/kuali/rice/ksb/config/KsbWebSpringBeans.xml";
80  
81  	private List<ServiceDefinition> services = new ArrayList<ServiceDefinition>();
82  	
83      private List<AlternateEndpointLocation> alternateEndpointLocations = new ArrayList<AlternateEndpointLocation>();
84  
85  	private List<AlternateEndpoint> alternateEndpoints = new ArrayList<AlternateEndpoint>();
86  
87  	private DataSource registryDataSource;
88  
89  	private DataSource messageDataSource;
90  	
91  	private DataSource nonTransactionalMessageDataSource;
92  	
93  	private DataSource bamDataSource;
94  
95  	private Scheduler exceptionMessagingScheduler;
96  
97  	private PlatformTransactionManager platformTransactionManager;
98  	
99  	private List<Lifecycle> internalLifecycles;
100 	
101 	public KSBConfigurer() {
102 		super(KsbApiConstants.KSB_MODULE_NAME);
103 		setValidRunModes(Arrays.asList(RunMode.THIN, RunMode.REMOTE, RunMode.LOCAL));
104 		this.internalLifecycles = new ArrayList<Lifecycle>();
105 	}
106 	
107 	@Override
108 	public void addAdditonalToConfig() {
109 		configureDataSource();
110 		configureScheduler();
111 		configurePlatformTransactionManager();
112 		configureAlternateEndpoints();
113 	}
114 
115 	@Override
116 	public List<String> getPrimarySpringFiles(){
117 		final List<String> springFileLocations = new ArrayList<String>();
118 
119 		springFileLocations.add(SERVICE_BUS_CLIENT_SPRING);
120 
121         if (getRunMode() != RunMode.THIN) {
122 
123             boolean isJpa = OrmUtils.isJpaEnabled("rice.ksb");
124             if (isJpa) {
125                 // TODO redo this once we're back to JPA
126                 // springFileLocations.add("classpath:org/kuali/rice/ksb/config/KSBJPASpringBeans.xml");
127                 throw new UnsupportedOperationException("JPA not currently supported for KSB");
128             }
129 
130             // Loading these beans unconditionally now, see:
131             // KULRICE-6574: Some KSB beans not defined unless message persistence turned on
132             //
133 		    // if (isMessagePersistenceEnabled()) {
134 			    springFileLocations.add(MESSAGE_CLIENT_SPRING);
135 			    springFileLocations.add(OJB_MESSAGE_CLIENT_SPRING);
136 		    // }
137 
138             if (isBamEnabled()) {
139         	    springFileLocations.add(BAM_SPRING);
140         	    springFileLocations.add(OJB_BAM_SPRING);
141             }
142         }
143 
144         if (getRunMode().equals( RunMode.LOCAL )) {
145         	springFileLocations.add(REGISTRY_SERVER_SPRING);
146         	springFileLocations.add(OJB_REGISTRY_SPRING);
147         }
148         
149         return springFileLocations;
150 	}
151 
152     @Override
153     public boolean hasWebInterface() {
154         return true;
155     }
156 
157     // See KULRICE-7093: KSB Module UI is not available on client applications
158     @Override
159     public boolean shouldRenderWebInterface() {
160         if (ConfigContext.getCurrentContextConfig().getBooleanProperty(KSBConstants.Config.WEB_FORCE_ENABLE)) {
161             return true;
162         }
163         return super.shouldRenderWebInterface();
164     }
165 
166     @Override
167     protected WebModuleConfiguration loadWebModule() {
168         WebModuleConfiguration configuration = super.loadWebModule();
169         configuration.getWebSpringFiles().add(WEB_SPRING);
170         return configuration;
171     }
172 
173     @Override
174 	public Collection<ResourceLoader> getResourceLoadersToRegister() throws Exception{
175 		ResourceLoader ksbRemoteResourceLoader = KSBResourceLoaderFactory.createRootKSBRemoteResourceLoader();
176 		ksbRemoteResourceLoader.start();
177 		return Collections.singletonList(ksbRemoteResourceLoader);
178 	}
179 	
180 	@Override
181 	public List<Lifecycle> loadLifecycles() throws Exception {
182 		List<Lifecycle> lifecycles = new LinkedList<Lifecycle>();
183 		// this validation of our service list needs to happen after we've
184 		// loaded our configs so it's a lifecycle
185 		lifecycles.add(new BaseLifecycle() {
186 
187 			@Override
188 			public void start() throws Exception {
189 				// first check if we want to allow self-signed certificates for SSL communication
190 				if (Boolean.valueOf(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.KSB_ALLOW_SELF_SIGNED_SSL)).booleanValue()) {
191 				    Protocol.registerProtocol("https", new Protocol("https",
192 					    (ProtocolSocketFactory) new EasySSLProtocolSocketFactory(), 443));
193 				}
194 				super.start();
195 			}
196 		});
197 		return lifecycles;
198 	}
199 	
200     protected void validateServices(List<ServiceDefinition> services) {
201         for (final ServiceDefinition serviceDef : this.services) {
202 			serviceDef.validate();
203 		}
204     }
205 
206     @Override
207     public void onApplicationEvent(ApplicationEvent applicationEvent) {
208         if (applicationEvent instanceof ContextRefreshedEvent) {
209             doAdditionalContextStartedLogic();
210         } else if (applicationEvent instanceof ContextStoppedEvent) {
211             doAdditionalContextStoppedLogic();
212         }
213     }
214 
215 	protected void doAdditionalContextStartedLogic() {
216         validateServices(getServices());
217 		ServicePublisher servicePublisher = new ServicePublisher(getServices());
218 		Lifecycle serviceBus = new ServiceDelegatingLifecycle(KsbApiServiceLocator.SERVICE_BUS);
219 		Lifecycle threadPool = new ServiceDelegatingLifecycle(KSBConstants.ServiceNames.THREAD_POOL_SERVICE);
220 		Lifecycle scheduledThreadPool = new ServiceDelegatingLifecycle(KSBConstants.ServiceNames.SCHEDULED_THREAD_POOL_SERVICE);
221 		
222 		try {
223 			servicePublisher.start();
224 			internalLifecycles.add(servicePublisher);
225 			serviceBus.start();
226 			internalLifecycles.add(serviceBus);
227 			threadPool.start();
228 			internalLifecycles.add(threadPool);
229 			scheduledThreadPool.start();
230 			internalLifecycles.add(scheduledThreadPool);
231 		} catch (Exception e) {
232 			if (e instanceof RuntimeException) {
233 				throw (RuntimeException)e;
234 			}
235 			throw new RiceRuntimeException("Failed to initialize KSB on context startup");
236 		}
237 
238         // Don't requeue messages if we are in thin client mode
239         if (getRunMode() != RunMode.THIN) {
240 		    requeueMessages();
241         }
242 	}
243 
244     protected void doAdditionalContextStoppedLogic() {
245         try {
246             HttpInvokerConnector.shutdownIdleConnectionTimeout();
247         } catch (Exception e) {
248             LOG.error("Failed to shutdown idle connection timeout evictor thread.", e);
249         }
250         cleanUpConfiguration();
251     }
252 
253     @Override
254     public boolean supportsEventType(Class<? extends ApplicationEvent> aClass) {
255         return true;
256     }
257 
258     @Override
259     public boolean supportsSourceType(Class<?> aClass) {
260         return true;
261     }
262 
263     @Override
264     public int getOrder() {
265         return Ordered.LOWEST_PRECEDENCE;
266     }
267 
268     @Override
269     protected void doAdditionalModuleStartLogic() throws Exception {
270         // this allows us to become aware of remote services, in case the application needs to use any of them during startup
271         LOG.info("Synchronizing remote services with service bus after KSB startup...");
272         long startTime = System.currentTimeMillis();
273         KsbApiServiceLocator.getServiceBus().synchronizeRemoteServices();
274         long endTime = System.currentTimeMillis();
275         LOG.info("...total time to synchronize remote services with service bus after KSB startup: " + (endTime - startTime));
276     }
277 
278     @Override
279 	protected void doAdditionalModuleStopLogic() throws Exception {
280 		for (int index = internalLifecycles.size() - 1; index >= 0; index--) {
281 			try {
282 				internalLifecycles.get(index).stop();
283 			} catch (Exception e) {
284 				LOG.error("Failed to properly execute shutdown logic.", e);
285 			}
286 		}
287 	}
288 
289 	/**
290      * Used to refresh the service registry after the Application Context is initialized.  This way any services that were exported on startup
291      * will be available in the service registry once startup is complete.
292      */
293     private void requeueMessages() {
294         LOG.info("Refreshing Service Registry to export services to the bus.");
295         KsbApiServiceLocator.getServiceBus().synchronizeLocalServices();
296         
297 		//automatically requeue documents sitting with status of 'R'
298 		MessageFetcher messageFetcher = new MessageFetcher((Integer) null);
299 		KSBServiceLocator.getThreadPool().execute(messageFetcher);
300     }
301     
302     protected boolean isMessagePersistenceEnabled() {
303     	return ConfigContext.getCurrentContextConfig().getBooleanProperty(KSBConstants.Config.MESSAGE_PERSISTENCE, true);
304     }
305     
306     protected boolean isBamEnabled() {
307     	return ConfigContext.getCurrentContextConfig().getBooleanProperty(Config.BAM_ENABLED, false);
308     }
309 
310 	protected void configureScheduler() {
311 		if (this.getExceptionMessagingScheduler() != null) {
312 			LOG.info("Configuring injected exception messaging Scheduler");
313 			ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.INJECTED_EXCEPTION_MESSAGE_SCHEDULER_KEY, this.getExceptionMessagingScheduler());
314 		}
315 	}
316 
317 	protected void configureDataSource() {
318 		if (isMessagePersistenceEnabled()) {
319 			if (getMessageDataSource() != null) {
320 				ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_MESSAGE_DATASOURCE, getMessageDataSource());
321 			}
322 			if (getNonTransactionalMessageDataSource() != null) {
323 	            ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_MESSAGE_NON_TRANSACTIONAL_DATASOURCE, getNonTransactionalMessageDataSource());
324 			}
325 		}
326         if (getRunMode().equals(RunMode.LOCAL)) {
327         	if (getRegistryDataSource() != null) {
328                 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_REGISTRY_DATASOURCE, getRegistryDataSource());
329             }
330         }
331         if (isBamEnabled()) {
332         	if (getBamDataSource() != null) {
333         		ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_BAM_DATASOURCE, getBamDataSource());
334         	}
335         }
336     }
337 
338 	protected void configurePlatformTransactionManager() {
339 		if (getPlatformTransactionManager() == null) {
340 			return;
341 		}
342 		ConfigContext.getCurrentContextConfig().putObject(RiceConstants.SPRING_TRANSACTION_MANAGER, getPlatformTransactionManager());
343 	}
344 	
345 	protected void configureAlternateEndpoints() {
346 		ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_ALTERNATE_ENDPOINT_LOCATIONS, getAlternateEndpointLocations());
347 		ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_ALTERNATE_ENDPOINTS, getAlternateEndpoints());
348 	}
349 
350 	/**
351      * Because our configuration is global, shutting down Rice does not get rid of objects stored there.  For that reason
352      * we need to manually clean these up.  This is most important in the case of the service bus because the configuration
353      * is used to store services to be exported.  If we don't clean this up then a shutdown/startup within the same
354      * class loading context causes the service list to be doubled and results in "multiple endpoint" error messages.
355      *
356      */
357     protected void cleanUpConfiguration() {
358         ConfigContext.getCurrentContextConfig().removeObject(KSBConstants.Config.KSB_ALTERNATE_ENDPOINTS);
359     }
360 
361 	public List<ServiceDefinition> getServices() {
362 		return this.services;
363 	}
364 
365 	public void setServices(List<ServiceDefinition> javaServices) {
366 		this.services = javaServices;
367 	}
368 
369 	public DataSource getMessageDataSource() {
370 		return this.messageDataSource;
371 	}
372 
373 	public void setMessageDataSource(DataSource messageDataSource) {
374 		this.messageDataSource = messageDataSource;
375 	}
376 
377     public DataSource getNonTransactionalMessageDataSource() {
378         return this.nonTransactionalMessageDataSource;
379     }
380 
381     public void setNonTransactionalMessageDataSource(DataSource nonTransactionalMessageDataSource) {
382         this.nonTransactionalMessageDataSource = nonTransactionalMessageDataSource;
383     }
384 
385     public DataSource getRegistryDataSource() {
386 		return this.registryDataSource;
387 	}
388 
389 	public void setRegistryDataSource(DataSource registryDataSource) {
390 		this.registryDataSource = registryDataSource;
391 	}
392 	
393 	public DataSource getBamDataSource() {
394 		return this.bamDataSource;
395 	}
396 
397 	public void setBamDataSource(DataSource bamDataSource) {
398 		this.bamDataSource = bamDataSource;
399 	}
400 
401 	public Scheduler getExceptionMessagingScheduler() {
402 		return this.exceptionMessagingScheduler;
403 	}
404 
405 	public void setExceptionMessagingScheduler(Scheduler exceptionMessagingScheduler) {
406 		this.exceptionMessagingScheduler = exceptionMessagingScheduler;
407 	}
408 
409 	public PlatformTransactionManager getPlatformTransactionManager() {
410 		return platformTransactionManager;
411 	}
412 
413 	public void setPlatformTransactionManager(PlatformTransactionManager springTransactionManager) {
414 		this.platformTransactionManager = springTransactionManager;
415 	}
416 
417     public List<AlternateEndpointLocation> getAlternateEndpointLocations() {
418 	    return this.alternateEndpointLocations;
419     }
420 
421     public void setAlternateEndpointLocations(List<AlternateEndpointLocation> alternateEndpointLocations) {
422 	    this.alternateEndpointLocations = alternateEndpointLocations;
423 	}
424 
425     public List<AlternateEndpoint> getAlternateEndpoints() {
426         return this.alternateEndpoints;
427     }
428 
429     public void setAlternateEndpoints(List<AlternateEndpoint> alternateEndpoints) {
430         this.alternateEndpoints = alternateEndpoints;
431     }
432     
433     private final class ServicePublisher extends BaseLifecycle {
434 
435     	private final List<ServiceDefinition> serviceDefinitions;
436     	
437     	ServicePublisher(List<ServiceDefinition> serviceDefinitions) {
438     		this.serviceDefinitions = serviceDefinitions;
439     	}
440     	
441 		@Override
442 		public void start() throws Exception {
443 			if (serviceDefinitions != null && !serviceDefinitions.isEmpty()) {
444 				LOG.debug("Configuring " + serviceDefinitions.size() + " services for application id " + CoreConfigHelper.getApplicationId() + " using config for classloader " + ClassLoaderUtils.getDefaultClassLoader());
445 				KsbApiServiceLocator.getServiceBus().publishServices(serviceDefinitions, true);
446 				super.start();
447 			}
448 		}
449     	
450     }
451     
452 }