View Javadoc

1   /**
2    * Copyright 2005-2012 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.rice.ksb.messaging.config;
17  
18  import org.apache.commons.httpclient.contrib.ssl.EasySSLProtocolSocketFactory;
19  import org.apache.commons.httpclient.protocol.Protocol;
20  import org.apache.commons.httpclient.protocol.ProtocolSocketFactory;
21  import org.kuali.rice.core.api.config.CoreConfigHelper;
22  import org.kuali.rice.core.api.config.module.RunMode;
23  import org.kuali.rice.core.api.config.property.Config;
24  import org.kuali.rice.core.api.config.property.ConfigContext;
25  import org.kuali.rice.core.api.exception.RiceRuntimeException;
26  import org.kuali.rice.core.api.lifecycle.BaseLifecycle;
27  import org.kuali.rice.core.api.lifecycle.Lifecycle;
28  import org.kuali.rice.core.api.resourceloader.ResourceLoader;
29  import org.kuali.rice.core.api.util.ClassLoaderUtils;
30  import org.kuali.rice.core.api.util.RiceConstants;
31  import org.kuali.rice.core.framework.config.module.ModuleConfigurer;
32  import org.kuali.rice.core.framework.config.module.WebModuleConfiguration;
33  import org.kuali.rice.core.framework.lifecycle.ServiceDelegatingLifecycle;
34  import org.kuali.rice.core.framework.persistence.jpa.OrmUtils;
35  import org.kuali.rice.ksb.api.KsbApiConstants;
36  import org.kuali.rice.ksb.api.KsbApiServiceLocator;
37  import org.kuali.rice.ksb.api.bus.ServiceDefinition;
38  import org.kuali.rice.ksb.messaging.AlternateEndpoint;
39  import org.kuali.rice.ksb.messaging.AlternateEndpointLocation;
40  import org.kuali.rice.ksb.messaging.MessageFetcher;
41  import org.kuali.rice.ksb.messaging.resourceloader.KSBResourceLoaderFactory;
42  import org.kuali.rice.ksb.messaging.serviceconnectors.HttpInvokerConnector;
43  import org.kuali.rice.ksb.service.KSBServiceLocator;
44  import org.kuali.rice.ksb.util.KSBConstants;
45  import org.quartz.Scheduler;
46  import org.springframework.transaction.PlatformTransactionManager;
47  
48  import javax.sql.DataSource;
49  import java.util.ArrayList;
50  import java.util.Arrays;
51  import java.util.Collection;
52  import java.util.Collections;
53  import java.util.LinkedList;
54  import java.util.List;
55  
56  
57  /**
58   * Used to configure the embedded workflow. This could be used to configure
59   * embedded workflow programmatically but mostly this is a base class by which
60   * to hang specific configuration behavior off of through subclassing
61   * 
62   * @author Kuali Rice Team (rice.collab@kuali.org)
63   * 
64   */
65  public class KSBConfigurer extends ModuleConfigurer {
66  	
67  	private static final String SERVICE_BUS_CLIENT_SPRING = "classpath:org/kuali/rice/ksb/config/KsbServiceBusClientSpringBeans.xml";
68  	private static final String MESSAGE_CLIENT_SPRING = "classpath:org/kuali/rice/ksb/config/KsbMessageClientSpringBeans.xml";
69  	private static final String OJB_MESSAGE_CLIENT_SPRING = "classpath:org/kuali/rice/ksb/config/KsbOjbMessageClientSpringBeans.xml";
70  	private static final String BAM_SPRING = "classpath:org/kuali/rice/ksb/config/KsbBamSpringBeans.xml";
71  	private static final String OJB_BAM_SPRING = "classpath:org/kuali/rice/ksb/config/KsbOjbBamSpringBeans.xml";
72  	private static final String REGISTRY_SERVER_SPRING = "classpath:org/kuali/rice/ksb/config/KsbRegistryServerSpringBeans.xml";
73  	private static final String OJB_REGISTRY_SPRING = "classpath:org/kuali/rice/ksb/config/KsbOjbRegistrySpringBeans.xml";
74  	private static final String WEB_SPRING = "classpath:org/kuali/rice/ksb/config/KsbWebSpringBeans.xml";
75  
76  	private List<ServiceDefinition> services = new ArrayList<ServiceDefinition>();
77  	
78      private List<AlternateEndpointLocation> alternateEndpointLocations = new ArrayList<AlternateEndpointLocation>();
79  
80  	private List<AlternateEndpoint> alternateEndpoints = new ArrayList<AlternateEndpoint>();
81  
82  	private DataSource registryDataSource;
83  
84  	private DataSource messageDataSource;
85  	
86  	private DataSource nonTransactionalMessageDataSource;
87  	
88  	private DataSource bamDataSource;
89  
90  	private Scheduler exceptionMessagingScheduler;
91  
92  	private PlatformTransactionManager platformTransactionManager;
93  	
94  	private List<Lifecycle> internalLifecycles;
95  	
96  	public KSBConfigurer() {
97  		super(KsbApiConstants.KSB_MODULE_NAME);
98  		setValidRunModes(Arrays.asList(RunMode.REMOTE, RunMode.LOCAL));
99  		this.internalLifecycles = new ArrayList<Lifecycle>();
100 	}
101 	
102 	@Override
103 	public void addAdditonalToConfig() {
104 		configureDataSource();
105 		configureScheduler();
106 		configurePlatformTransactionManager();
107 		configureAlternateEndpoints();
108 	}
109 
110 	@Override
111 	public List<String> getPrimarySpringFiles(){
112 		final List<String> springFileLocations = new ArrayList<String>();
113 				
114 		boolean isJpa = OrmUtils.isJpaEnabled("rice.ksb");
115 		if (isJpa) {
116 			// TODO redo this once we're back to JPA
117         	// springFileLocations.add("classpath:org/kuali/rice/ksb/config/KSBJPASpringBeans.xml");
118         	throw new UnsupportedOperationException("JPA not currently supported for KSB");
119 		}
120 		
121 		springFileLocations.add(SERVICE_BUS_CLIENT_SPRING);
122 		
123 		if (isMessagePersistenceEnabled()) {
124 			springFileLocations.add(MESSAGE_CLIENT_SPRING);
125 			springFileLocations.add(OJB_MESSAGE_CLIENT_SPRING);
126 		}
127         
128         if (isBamEnabled()) {
129         	springFileLocations.add(BAM_SPRING);
130         	springFileLocations.add(OJB_BAM_SPRING);
131         }
132         
133         if (getRunMode().equals( RunMode.LOCAL )) {
134         	springFileLocations.add(REGISTRY_SERVER_SPRING);
135         	springFileLocations.add(OJB_REGISTRY_SPRING);
136         }
137         
138         return springFileLocations;
139 	}
140 
141     @Override
142     public boolean hasWebInterface() {
143         return true;
144     }
145 
146     @Override
147     protected WebModuleConfiguration loadWebModule() {
148         WebModuleConfiguration configuration = super.loadWebModule();
149         configuration.getWebSpringFiles().add(WEB_SPRING);
150         return configuration;
151     }
152 
153     @Override
154 	public Collection<ResourceLoader> getResourceLoadersToRegister() throws Exception{
155 		ResourceLoader ksbRemoteResourceLoader = KSBResourceLoaderFactory.createRootKSBRemoteResourceLoader();
156 		ksbRemoteResourceLoader.start();
157 		return Collections.singletonList(ksbRemoteResourceLoader);
158 	}
159 	
160 	@Override
161 	public List<Lifecycle> loadLifecycles() throws Exception {
162 		List<Lifecycle> lifecycles = new LinkedList<Lifecycle>();
163 		// this validation of our service list needs to happen after we've
164 		// loaded our configs so it's a lifecycle
165 		lifecycles.add(new BaseLifecycle() {
166 
167 			@Override
168 			public void start() throws Exception {
169 				// first check if we want to allow self-signed certificates for SSL communication
170 				if (Boolean.valueOf(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.KSB_ALLOW_SELF_SIGNED_SSL)).booleanValue()) {
171 				    Protocol.registerProtocol("https", new Protocol("https",
172 					    (ProtocolSocketFactory) new EasySSLProtocolSocketFactory(), 443));
173 				}
174 				super.start();
175 			}
176 		});
177 		return lifecycles;
178 	}
179 	
180 	
181 
182     @Override
183     public void doAdditonalConfigurerValidations() {
184         for (final ServiceDefinition serviceDef : KSBConfigurer.this.services) {
185 			serviceDef.validate();
186 		}
187     }
188 
189 	@Override
190 	public void doAdditionalContextStartedLogic() {
191 		ServicePublisher servicePublisher = new ServicePublisher(getServices());
192 		Lifecycle serviceBus = new ServiceDelegatingLifecycle(KsbApiServiceLocator.SERVICE_BUS);
193 		Lifecycle threadPool = new ServiceDelegatingLifecycle(KSBConstants.ServiceNames.THREAD_POOL_SERVICE);
194 		Lifecycle scheduledThreadPool = new ServiceDelegatingLifecycle(KSBConstants.ServiceNames.SCHEDULED_THREAD_POOL_SERVICE);
195 		
196 		try {
197 			servicePublisher.start();
198 			internalLifecycles.add(servicePublisher);
199 			serviceBus.start();
200 			internalLifecycles.add(serviceBus);
201 			threadPool.start();
202 			internalLifecycles.add(threadPool);
203 			scheduledThreadPool.start();
204 			internalLifecycles.add(scheduledThreadPool);
205 		} catch (Exception e) {
206 			if (e instanceof RuntimeException) {
207 				throw (RuntimeException)e;
208 			}
209 			throw new RiceRuntimeException("Failed to initialize KSB on context startup");
210 		}
211 
212 		requeueMessages();
213 	}
214 
215 	@Override
216 	protected void doAdditionalModuleStopLogic() throws Exception {
217 		for (int index = internalLifecycles.size() - 1; index >= 0; index--) {
218 			try {
219 				internalLifecycles.get(index).stop();
220 			} catch (Exception e) {
221 				LOG.error("Failed to properly execute shutdown logic.", e);
222 			}
223 		}
224 	}
225 
226 	/**
227      * Used to refresh the service registry after the Application Context is initialized.  This way any services that were exported on startup
228      * will be available in the service registry once startup is complete.
229      */
230     private void requeueMessages() {
231         LOG.info("Refreshing Service Registry to export services to the bus.");
232         KsbApiServiceLocator.getServiceBus().synchronize();
233         
234 		//automatically requeue documents sitting with status of 'R'
235 		MessageFetcher messageFetcher = new MessageFetcher((Integer) null);
236 		KSBServiceLocator.getThreadPool().execute(messageFetcher);
237     }
238     
239     protected boolean isMessagePersistenceEnabled() {
240     	return ConfigContext.getCurrentContextConfig().getBooleanProperty(KSBConstants.Config.MESSAGE_PERSISTENCE, true);
241     }
242     
243     protected boolean isBamEnabled() {
244     	return ConfigContext.getCurrentContextConfig().getBooleanProperty(Config.BAM_ENABLED, false);
245     }
246 
247 	protected void configureScheduler() {
248 		if (this.getExceptionMessagingScheduler() != null) {
249 			LOG.info("Configuring injected exception messaging Scheduler");
250 			ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.INJECTED_EXCEPTION_MESSAGE_SCHEDULER_KEY, this.getExceptionMessagingScheduler());
251 		}
252 	}
253 
254 	protected void configureDataSource() {
255 		if (isMessagePersistenceEnabled()) {
256 			if (getMessageDataSource() != null) {
257 				ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_MESSAGE_DATASOURCE, getMessageDataSource());
258 			}
259 			if (getNonTransactionalMessageDataSource() != null) {
260 	            ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_MESSAGE_NON_TRANSACTIONAL_DATASOURCE, getNonTransactionalMessageDataSource());
261 			}
262 		}
263         if (getRunMode().equals(RunMode.LOCAL)) {
264         	if (getRegistryDataSource() != null) {
265                 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_REGISTRY_DATASOURCE, getRegistryDataSource());
266             }
267         }
268         if (isBamEnabled()) {
269         	if (getBamDataSource() != null) {
270         		ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_BAM_DATASOURCE, getBamDataSource());
271         	}
272         }
273     }
274 
275 	protected void configurePlatformTransactionManager() {
276 		if (getPlatformTransactionManager() == null) {
277 			return;
278 		}
279 		ConfigContext.getCurrentContextConfig().putObject(RiceConstants.SPRING_TRANSACTION_MANAGER, getPlatformTransactionManager());
280 	}
281 	
282 	protected void configureAlternateEndpoints() {
283 		ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_ALTERNATE_ENDPOINT_LOCATIONS, getAlternateEndpointLocations());
284 		ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_ALTERNATE_ENDPOINTS, getAlternateEndpoints());
285 	}
286 	
287 	@Override
288 	public void doAdditionalContextStoppedLogic() {
289 		try {
290 			HttpInvokerConnector.shutdownIdleConnectionTimeout();
291 		} catch (Exception e) {
292 			LOG.error("Failed to shutdown idle connection timeout evictor thread.", e);
293 		}
294 	    cleanUpConfiguration();
295 	}
296 	
297 	/**
298      * Because our configuration is global, shutting down Rice does not get rid of objects stored there.  For that reason
299      * we need to manually clean these up.  This is most important in the case of the service bus because the configuration
300      * is used to store services to be exported.  If we don't clean this up then a shutdown/startup within the same
301      * class loading context causes the service list to be doubled and results in "multiple endpoint" error messages.
302      *
303      */
304     protected void cleanUpConfiguration() {
305         ConfigContext.getCurrentContextConfig().removeObject(KSBConstants.Config.KSB_ALTERNATE_ENDPOINTS);
306     }
307 
308 	public List<ServiceDefinition> getServices() {
309 		return this.services;
310 	}
311 
312 	public void setServices(List<ServiceDefinition> javaServices) {
313 		this.services = javaServices;
314 	}
315 
316 	public DataSource getMessageDataSource() {
317 		return this.messageDataSource;
318 	}
319 
320 	public void setMessageDataSource(DataSource messageDataSource) {
321 		this.messageDataSource = messageDataSource;
322 	}
323 
324     public DataSource getNonTransactionalMessageDataSource() {
325         return this.nonTransactionalMessageDataSource;
326     }
327 
328     public void setNonTransactionalMessageDataSource(DataSource nonTransactionalMessageDataSource) {
329         this.nonTransactionalMessageDataSource = nonTransactionalMessageDataSource;
330     }
331 
332     public DataSource getRegistryDataSource() {
333 		return this.registryDataSource;
334 	}
335 
336 	public void setRegistryDataSource(DataSource registryDataSource) {
337 		this.registryDataSource = registryDataSource;
338 	}
339 	
340 	public DataSource getBamDataSource() {
341 		return this.bamDataSource;
342 	}
343 
344 	public void setBamDataSource(DataSource bamDataSource) {
345 		this.bamDataSource = bamDataSource;
346 	}
347 
348 	public Scheduler getExceptionMessagingScheduler() {
349 		return this.exceptionMessagingScheduler;
350 	}
351 
352 	public void setExceptionMessagingScheduler(Scheduler exceptionMessagingScheduler) {
353 		this.exceptionMessagingScheduler = exceptionMessagingScheduler;
354 	}
355 
356 	public PlatformTransactionManager getPlatformTransactionManager() {
357 		return platformTransactionManager;
358 	}
359 
360 	public void setPlatformTransactionManager(PlatformTransactionManager springTransactionManager) {
361 		this.platformTransactionManager = springTransactionManager;
362 	}
363 
364     public List<AlternateEndpointLocation> getAlternateEndpointLocations() {
365 	    return this.alternateEndpointLocations;
366     }
367 
368     public void setAlternateEndpointLocations(List<AlternateEndpointLocation> alternateEndpointLocations) {
369 	    this.alternateEndpointLocations = alternateEndpointLocations;
370 	}
371 
372     public List<AlternateEndpoint> getAlternateEndpoints() {
373         return this.alternateEndpoints;
374     }
375 
376     public void setAlternateEndpoints(List<AlternateEndpoint> alternateEndpoints) {
377         this.alternateEndpoints = alternateEndpoints;
378     }
379     
380     private final class ServicePublisher extends BaseLifecycle {
381 
382     	private final List<ServiceDefinition> serviceDefinitions;
383     	
384     	ServicePublisher(List<ServiceDefinition> serviceDefinitions) {
385     		this.serviceDefinitions = serviceDefinitions;
386     	}
387     	
388 		@Override
389 		public void start() throws Exception {
390 			if (serviceDefinitions != null && !serviceDefinitions.isEmpty()) {
391 				LOG.debug("Configuring " + serviceDefinitions.size() + " services for application id " + CoreConfigHelper.getApplicationId() + " using config for classloader " + ClassLoaderUtils.getDefaultClassLoader());
392 				KsbApiServiceLocator.getServiceBus().publishServices(serviceDefinitions, true);
393 				super.start();
394 			}
395 		}
396     	
397     }
398     
399 }