View Javadoc

1   /**
2    * Copyright 2005-2012 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.rice.ksb.messaging.config;
17  
18  import org.apache.commons.httpclient.contrib.ssl.EasySSLProtocolSocketFactory;
19  import org.apache.commons.httpclient.protocol.Protocol;
20  import org.apache.commons.httpclient.protocol.ProtocolSocketFactory;
21  import org.kuali.rice.core.api.config.CoreConfigHelper;
22  import org.kuali.rice.core.api.config.module.RunMode;
23  import org.kuali.rice.core.api.config.property.Config;
24  import org.kuali.rice.core.api.config.property.ConfigContext;
25  import org.kuali.rice.core.api.exception.RiceRuntimeException;
26  import org.kuali.rice.core.api.lifecycle.BaseLifecycle;
27  import org.kuali.rice.core.api.lifecycle.Lifecycle;
28  import org.kuali.rice.core.api.resourceloader.ResourceLoader;
29  import org.kuali.rice.core.api.util.ClassLoaderUtils;
30  import org.kuali.rice.core.api.util.RiceConstants;
31  import org.kuali.rice.core.framework.config.module.ModuleConfigurer;
32  import org.kuali.rice.core.framework.config.module.WebModuleConfiguration;
33  import org.kuali.rice.core.framework.lifecycle.ServiceDelegatingLifecycle;
34  import org.kuali.rice.core.framework.persistence.jpa.OrmUtils;
35  import org.kuali.rice.ksb.api.KsbApiConstants;
36  import org.kuali.rice.ksb.api.KsbApiServiceLocator;
37  import org.kuali.rice.ksb.api.bus.ServiceDefinition;
38  import org.kuali.rice.ksb.messaging.AlternateEndpoint;
39  import org.kuali.rice.ksb.messaging.AlternateEndpointLocation;
40  import org.kuali.rice.ksb.messaging.MessageFetcher;
41  import org.kuali.rice.ksb.messaging.resourceloader.KSBResourceLoaderFactory;
42  import org.kuali.rice.ksb.messaging.serviceconnectors.HttpInvokerConnector;
43  import org.kuali.rice.ksb.service.KSBServiceLocator;
44  import org.kuali.rice.ksb.util.KSBConstants;
45  import org.quartz.Scheduler;
46  import org.springframework.context.ApplicationEvent;
47  import org.springframework.context.event.ContextRefreshedEvent;
48  import org.springframework.context.event.ContextStoppedEvent;
49  import org.springframework.context.event.SmartApplicationListener;
50  import org.springframework.core.Ordered;
51  import org.springframework.transaction.PlatformTransactionManager;
52  
53  import javax.sql.DataSource;
54  import java.util.ArrayList;
55  import java.util.Arrays;
56  import java.util.Collection;
57  import java.util.Collections;
58  import java.util.LinkedList;
59  import java.util.List;
60  
61  
62  /**
63   * Used to configure the embedded workflow. This could be used to configure
64   * embedded workflow programmatically but mostly this is a base class by which
65   * to hang specific configuration behavior off of through subclassing
66   * 
67   * @author Kuali Rice Team (rice.collab@kuali.org)
68   * 
69   */
70  public class KSBConfigurer extends ModuleConfigurer implements SmartApplicationListener {
71  	
72  	private static final String SERVICE_BUS_CLIENT_SPRING = "classpath:org/kuali/rice/ksb/config/KsbServiceBusClientSpringBeans.xml";
73  	private static final String MESSAGE_CLIENT_SPRING = "classpath:org/kuali/rice/ksb/config/KsbMessageClientSpringBeans.xml";
74  	private static final String OJB_MESSAGE_CLIENT_SPRING = "classpath:org/kuali/rice/ksb/config/KsbOjbMessageClientSpringBeans.xml";
75  	private static final String BAM_SPRING = "classpath:org/kuali/rice/ksb/config/KsbBamSpringBeans.xml";
76  	private static final String OJB_BAM_SPRING = "classpath:org/kuali/rice/ksb/config/KsbOjbBamSpringBeans.xml";
77  	private static final String REGISTRY_SERVER_SPRING = "classpath:org/kuali/rice/ksb/config/KsbRegistryServerSpringBeans.xml";
78  	private static final String OJB_REGISTRY_SPRING = "classpath:org/kuali/rice/ksb/config/KsbOjbRegistrySpringBeans.xml";
79  	private static final String WEB_SPRING = "classpath:org/kuali/rice/ksb/config/KsbWebSpringBeans.xml";
80  
81  	private List<ServiceDefinition> services = new ArrayList<ServiceDefinition>();
82  	
83      private List<AlternateEndpointLocation> alternateEndpointLocations = new ArrayList<AlternateEndpointLocation>();
84  
85  	private List<AlternateEndpoint> alternateEndpoints = new ArrayList<AlternateEndpoint>();
86  
87  	private DataSource registryDataSource;
88  
89  	private DataSource messageDataSource;
90  	
91  	private DataSource nonTransactionalMessageDataSource;
92  	
93  	private DataSource bamDataSource;
94  
95  	private Scheduler exceptionMessagingScheduler;
96  
97  	private PlatformTransactionManager platformTransactionManager;
98  	
99  	private List<Lifecycle> internalLifecycles;
100 	
101 	public KSBConfigurer() {
102 		super(KsbApiConstants.KSB_MODULE_NAME);
103 		setValidRunModes(Arrays.asList(RunMode.THIN, RunMode.REMOTE, RunMode.LOCAL));
104 		this.internalLifecycles = new ArrayList<Lifecycle>();
105 	}
106 	
107 	@Override
108 	public void addAdditonalToConfig() {
109 		configureDataSource();
110 		configureScheduler();
111 		configurePlatformTransactionManager();
112 		configureAlternateEndpoints();
113 	}
114 
115 	@Override
116 	public List<String> getPrimarySpringFiles(){
117 		final List<String> springFileLocations = new ArrayList<String>();
118 
119 		springFileLocations.add(SERVICE_BUS_CLIENT_SPRING);
120 
121         if (getRunMode() != RunMode.THIN) {
122 
123             boolean isJpa = OrmUtils.isJpaEnabled("rice.ksb");
124             if (isJpa) {
125                 // TODO redo this once we're back to JPA
126                 // springFileLocations.add("classpath:org/kuali/rice/ksb/config/KSBJPASpringBeans.xml");
127                 throw new UnsupportedOperationException("JPA not currently supported for KSB");
128             }
129 
130 
131 		    if (isMessagePersistenceEnabled()) {
132 			    springFileLocations.add(MESSAGE_CLIENT_SPRING);
133 			    springFileLocations.add(OJB_MESSAGE_CLIENT_SPRING);
134 		    }
135         
136             if (isBamEnabled()) {
137         	    springFileLocations.add(BAM_SPRING);
138         	    springFileLocations.add(OJB_BAM_SPRING);
139             }
140         }
141 
142         if (getRunMode().equals( RunMode.LOCAL )) {
143         	springFileLocations.add(REGISTRY_SERVER_SPRING);
144         	springFileLocations.add(OJB_REGISTRY_SPRING);
145         }
146         
147         return springFileLocations;
148 	}
149 
150     @Override
151     public boolean hasWebInterface() {
152         return true;
153     }
154 
155     @Override
156     protected WebModuleConfiguration loadWebModule() {
157         WebModuleConfiguration configuration = super.loadWebModule();
158         configuration.getWebSpringFiles().add(WEB_SPRING);
159         return configuration;
160     }
161 
162     @Override
163 	public Collection<ResourceLoader> getResourceLoadersToRegister() throws Exception{
164 		ResourceLoader ksbRemoteResourceLoader = KSBResourceLoaderFactory.createRootKSBRemoteResourceLoader();
165 		ksbRemoteResourceLoader.start();
166 		return Collections.singletonList(ksbRemoteResourceLoader);
167 	}
168 	
169 	@Override
170 	public List<Lifecycle> loadLifecycles() throws Exception {
171 		List<Lifecycle> lifecycles = new LinkedList<Lifecycle>();
172 		// this validation of our service list needs to happen after we've
173 		// loaded our configs so it's a lifecycle
174 		lifecycles.add(new BaseLifecycle() {
175 
176 			@Override
177 			public void start() throws Exception {
178 				// first check if we want to allow self-signed certificates for SSL communication
179 				if (Boolean.valueOf(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.KSB_ALLOW_SELF_SIGNED_SSL)).booleanValue()) {
180 				    Protocol.registerProtocol("https", new Protocol("https",
181 					    (ProtocolSocketFactory) new EasySSLProtocolSocketFactory(), 443));
182 				}
183 				super.start();
184 			}
185 		});
186 		return lifecycles;
187 	}
188 	
189     protected void validateServices(List<ServiceDefinition> services) {
190         for (final ServiceDefinition serviceDef : this.services) {
191 			serviceDef.validate();
192 		}
193     }
194 
195     @Override
196     public void onApplicationEvent(ApplicationEvent applicationEvent) {
197         if (applicationEvent instanceof ContextRefreshedEvent) {
198             doAdditionalContextStartedLogic();
199         } else if (applicationEvent instanceof ContextStoppedEvent) {
200             doAdditionalContextStoppedLogic();
201         }
202     }
203 
204 	protected void doAdditionalContextStartedLogic() {
205         validateServices(getServices());
206 		ServicePublisher servicePublisher = new ServicePublisher(getServices());
207 		Lifecycle serviceBus = new ServiceDelegatingLifecycle(KsbApiServiceLocator.SERVICE_BUS);
208 		Lifecycle threadPool = new ServiceDelegatingLifecycle(KSBConstants.ServiceNames.THREAD_POOL_SERVICE);
209 		Lifecycle scheduledThreadPool = new ServiceDelegatingLifecycle(KSBConstants.ServiceNames.SCHEDULED_THREAD_POOL_SERVICE);
210 		
211 		try {
212 			servicePublisher.start();
213 			internalLifecycles.add(servicePublisher);
214 			serviceBus.start();
215 			internalLifecycles.add(serviceBus);
216 			threadPool.start();
217 			internalLifecycles.add(threadPool);
218 			scheduledThreadPool.start();
219 			internalLifecycles.add(scheduledThreadPool);
220 		} catch (Exception e) {
221 			if (e instanceof RuntimeException) {
222 				throw (RuntimeException)e;
223 			}
224 			throw new RiceRuntimeException("Failed to initialize KSB on context startup");
225 		}
226 
227 		requeueMessages();
228 	}
229 
230     protected void doAdditionalContextStoppedLogic() {
231         try {
232             HttpInvokerConnector.shutdownIdleConnectionTimeout();
233         } catch (Exception e) {
234             LOG.error("Failed to shutdown idle connection timeout evictor thread.", e);
235         }
236         cleanUpConfiguration();
237     }
238 
239     @Override
240     public boolean supportsEventType(Class<? extends ApplicationEvent> aClass) {
241         return true;
242     }
243 
244     @Override
245     public boolean supportsSourceType(Class<?> aClass) {
246         return true;
247     }
248 
249     @Override
250     public int getOrder() {
251         return Ordered.LOWEST_PRECEDENCE;
252     }
253 
254     @Override
255     protected void doAdditionalModuleStartLogic() throws Exception {
256         // this allows us to become aware of remote services, in case the application needs to use any of them during startup
257         LOG.info("Synchronizing remote services with service bus after KSB startup...");
258         long startTime = System.currentTimeMillis();
259         KsbApiServiceLocator.getServiceBus().synchronizeRemoteServices();
260         long endTime = System.currentTimeMillis();
261         LOG.info("...total time to synchronize remote services with service bus after KSB startup: " + (endTime - startTime));
262     }
263 
264     @Override
265 	protected void doAdditionalModuleStopLogic() throws Exception {
266 		for (int index = internalLifecycles.size() - 1; index >= 0; index--) {
267 			try {
268 				internalLifecycles.get(index).stop();
269 			} catch (Exception e) {
270 				LOG.error("Failed to properly execute shutdown logic.", e);
271 			}
272 		}
273 	}
274 
275 	/**
276      * Used to refresh the service registry after the Application Context is initialized.  This way any services that were exported on startup
277      * will be available in the service registry once startup is complete.
278      */
279     private void requeueMessages() {
280         LOG.info("Refreshing Service Registry to export services to the bus.");
281         KsbApiServiceLocator.getServiceBus().synchronizeLocalServices();
282         
283 		//automatically requeue documents sitting with status of 'R'
284 		MessageFetcher messageFetcher = new MessageFetcher((Integer) null);
285 		KSBServiceLocator.getThreadPool().execute(messageFetcher);
286     }
287     
288     protected boolean isMessagePersistenceEnabled() {
289     	return ConfigContext.getCurrentContextConfig().getBooleanProperty(KSBConstants.Config.MESSAGE_PERSISTENCE, true);
290     }
291     
292     protected boolean isBamEnabled() {
293     	return ConfigContext.getCurrentContextConfig().getBooleanProperty(Config.BAM_ENABLED, false);
294     }
295 
296 	protected void configureScheduler() {
297 		if (this.getExceptionMessagingScheduler() != null) {
298 			LOG.info("Configuring injected exception messaging Scheduler");
299 			ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.INJECTED_EXCEPTION_MESSAGE_SCHEDULER_KEY, this.getExceptionMessagingScheduler());
300 		}
301 	}
302 
303 	protected void configureDataSource() {
304 		if (isMessagePersistenceEnabled()) {
305 			if (getMessageDataSource() != null) {
306 				ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_MESSAGE_DATASOURCE, getMessageDataSource());
307 			}
308 			if (getNonTransactionalMessageDataSource() != null) {
309 	            ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_MESSAGE_NON_TRANSACTIONAL_DATASOURCE, getNonTransactionalMessageDataSource());
310 			}
311 		}
312         if (getRunMode().equals(RunMode.LOCAL)) {
313         	if (getRegistryDataSource() != null) {
314                 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_REGISTRY_DATASOURCE, getRegistryDataSource());
315             }
316         }
317         if (isBamEnabled()) {
318         	if (getBamDataSource() != null) {
319         		ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_BAM_DATASOURCE, getBamDataSource());
320         	}
321         }
322     }
323 
324 	protected void configurePlatformTransactionManager() {
325 		if (getPlatformTransactionManager() == null) {
326 			return;
327 		}
328 		ConfigContext.getCurrentContextConfig().putObject(RiceConstants.SPRING_TRANSACTION_MANAGER, getPlatformTransactionManager());
329 	}
330 	
331 	protected void configureAlternateEndpoints() {
332 		ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_ALTERNATE_ENDPOINT_LOCATIONS, getAlternateEndpointLocations());
333 		ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_ALTERNATE_ENDPOINTS, getAlternateEndpoints());
334 	}
335 
336 	/**
337      * Because our configuration is global, shutting down Rice does not get rid of objects stored there.  For that reason
338      * we need to manually clean these up.  This is most important in the case of the service bus because the configuration
339      * is used to store services to be exported.  If we don't clean this up then a shutdown/startup within the same
340      * class loading context causes the service list to be doubled and results in "multiple endpoint" error messages.
341      *
342      */
343     protected void cleanUpConfiguration() {
344         ConfigContext.getCurrentContextConfig().removeObject(KSBConstants.Config.KSB_ALTERNATE_ENDPOINTS);
345     }
346 
347 	public List<ServiceDefinition> getServices() {
348 		return this.services;
349 	}
350 
351 	public void setServices(List<ServiceDefinition> javaServices) {
352 		this.services = javaServices;
353 	}
354 
355 	public DataSource getMessageDataSource() {
356 		return this.messageDataSource;
357 	}
358 
359 	public void setMessageDataSource(DataSource messageDataSource) {
360 		this.messageDataSource = messageDataSource;
361 	}
362 
363     public DataSource getNonTransactionalMessageDataSource() {
364         return this.nonTransactionalMessageDataSource;
365     }
366 
367     public void setNonTransactionalMessageDataSource(DataSource nonTransactionalMessageDataSource) {
368         this.nonTransactionalMessageDataSource = nonTransactionalMessageDataSource;
369     }
370 
371     public DataSource getRegistryDataSource() {
372 		return this.registryDataSource;
373 	}
374 
375 	public void setRegistryDataSource(DataSource registryDataSource) {
376 		this.registryDataSource = registryDataSource;
377 	}
378 	
379 	public DataSource getBamDataSource() {
380 		return this.bamDataSource;
381 	}
382 
383 	public void setBamDataSource(DataSource bamDataSource) {
384 		this.bamDataSource = bamDataSource;
385 	}
386 
387 	public Scheduler getExceptionMessagingScheduler() {
388 		return this.exceptionMessagingScheduler;
389 	}
390 
391 	public void setExceptionMessagingScheduler(Scheduler exceptionMessagingScheduler) {
392 		this.exceptionMessagingScheduler = exceptionMessagingScheduler;
393 	}
394 
395 	public PlatformTransactionManager getPlatformTransactionManager() {
396 		return platformTransactionManager;
397 	}
398 
399 	public void setPlatformTransactionManager(PlatformTransactionManager springTransactionManager) {
400 		this.platformTransactionManager = springTransactionManager;
401 	}
402 
403     public List<AlternateEndpointLocation> getAlternateEndpointLocations() {
404 	    return this.alternateEndpointLocations;
405     }
406 
407     public void setAlternateEndpointLocations(List<AlternateEndpointLocation> alternateEndpointLocations) {
408 	    this.alternateEndpointLocations = alternateEndpointLocations;
409 	}
410 
411     public List<AlternateEndpoint> getAlternateEndpoints() {
412         return this.alternateEndpoints;
413     }
414 
415     public void setAlternateEndpoints(List<AlternateEndpoint> alternateEndpoints) {
416         this.alternateEndpoints = alternateEndpoints;
417     }
418     
419     private final class ServicePublisher extends BaseLifecycle {
420 
421     	private final List<ServiceDefinition> serviceDefinitions;
422     	
423     	ServicePublisher(List<ServiceDefinition> serviceDefinitions) {
424     		this.serviceDefinitions = serviceDefinitions;
425     	}
426     	
427 		@Override
428 		public void start() throws Exception {
429 			if (serviceDefinitions != null && !serviceDefinitions.isEmpty()) {
430 				LOG.debug("Configuring " + serviceDefinitions.size() + " services for application id " + CoreConfigHelper.getApplicationId() + " using config for classloader " + ClassLoaderUtils.getDefaultClassLoader());
431 				KsbApiServiceLocator.getServiceBus().publishServices(serviceDefinitions, true);
432 				super.start();
433 			}
434 		}
435     	
436     }
437     
438 }