View Javadoc

1   /**
2    * Copyright 2005-2012 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.rice.ksb.messaging.config;
17  
18  import org.apache.commons.httpclient.contrib.ssl.EasySSLProtocolSocketFactory;
19  import org.apache.commons.httpclient.protocol.Protocol;
20  import org.apache.commons.httpclient.protocol.ProtocolSocketFactory;
21  import org.kuali.rice.core.api.config.CoreConfigHelper;
22  import org.kuali.rice.core.api.config.module.RunMode;
23  import org.kuali.rice.core.api.config.property.Config;
24  import org.kuali.rice.core.api.config.property.ConfigContext;
25  import org.kuali.rice.core.api.exception.RiceRuntimeException;
26  import org.kuali.rice.core.api.lifecycle.BaseLifecycle;
27  import org.kuali.rice.core.api.lifecycle.Lifecycle;
28  import org.kuali.rice.core.api.resourceloader.ResourceLoader;
29  import org.kuali.rice.core.api.util.ClassLoaderUtils;
30  import org.kuali.rice.core.api.util.RiceConstants;
31  import org.kuali.rice.core.framework.config.module.ModuleConfigurer;
32  import org.kuali.rice.core.framework.config.module.WebModuleConfiguration;
33  import org.kuali.rice.core.framework.lifecycle.ServiceDelegatingLifecycle;
34  import org.kuali.rice.core.framework.persistence.jpa.OrmUtils;
35  import org.kuali.rice.ksb.api.KsbApiConstants;
36  import org.kuali.rice.ksb.api.KsbApiServiceLocator;
37  import org.kuali.rice.ksb.api.bus.ServiceDefinition;
38  import org.kuali.rice.ksb.messaging.AlternateEndpoint;
39  import org.kuali.rice.ksb.messaging.AlternateEndpointLocation;
40  import org.kuali.rice.ksb.messaging.MessageFetcher;
41  import org.kuali.rice.ksb.messaging.resourceloader.KSBResourceLoaderFactory;
42  import org.kuali.rice.ksb.messaging.serviceconnectors.HttpInvokerConnector;
43  import org.kuali.rice.ksb.service.KSBServiceLocator;
44  import org.kuali.rice.ksb.util.KSBConstants;
45  import org.quartz.Scheduler;
46  import org.springframework.transaction.PlatformTransactionManager;
47  
48  import javax.sql.DataSource;
49  import java.util.ArrayList;
50  import java.util.Arrays;
51  import java.util.Collection;
52  import java.util.Collections;
53  import java.util.LinkedList;
54  import java.util.List;
55  
56  
57  /**
58   * Used to configure the Kuali Service Bus (KSB) module. This could be used to
59   * configure the KSB programmatically, but mostly this is a base class off of
60   * which specific configuration behavior can be hung through subclassing.
61   * 
62   * @author Kuali Rice Team (rice.collab@kuali.org)
63   * 
64   */
65  public class KSBConfigurer extends ModuleConfigurer {
66  	
67  	private static final String SERVICE_BUS_CLIENT_SPRING = "classpath:org/kuali/rice/ksb/config/KsbServiceBusClientSpringBeans.xml";
68  	private static final String MESSAGE_CLIENT_SPRING = "classpath:org/kuali/rice/ksb/config/KsbMessageClientSpringBeans.xml";
69  	private static final String OJB_MESSAGE_CLIENT_SPRING = "classpath:org/kuali/rice/ksb/config/KsbOjbMessageClientSpringBeans.xml";
70  	private static final String BAM_SPRING = "classpath:org/kuali/rice/ksb/config/KsbBamSpringBeans.xml";
71  	private static final String OJB_BAM_SPRING = "classpath:org/kuali/rice/ksb/config/KsbOjbBamSpringBeans.xml";
72  	private static final String REGISTRY_SERVER_SPRING = "classpath:org/kuali/rice/ksb/config/KsbRegistryServerSpringBeans.xml";
73  	private static final String OJB_REGISTRY_SPRING = "classpath:org/kuali/rice/ksb/config/KsbOjbRegistrySpringBeans.xml";
74  	private static final String WEB_SPRING = "classpath:org/kuali/rice/ksb/config/KsbWebSpringBeans.xml";
75  
76  	private List<ServiceDefinition> services = new ArrayList<ServiceDefinition>();
77  	
78      private List<AlternateEndpointLocation> alternateEndpointLocations = new ArrayList<AlternateEndpointLocation>();
79  
80  	private List<AlternateEndpoint> alternateEndpoints = new ArrayList<AlternateEndpoint>();
81  
82  	private DataSource registryDataSource;
83  
84  	private DataSource messageDataSource;
85  	
86  	private DataSource nonTransactionalMessageDataSource;
87  	
88  	private DataSource bamDataSource;
89  
90  	private Scheduler exceptionMessagingScheduler;
91  
92  	private PlatformTransactionManager platformTransactionManager;
93  	
94  	private List<Lifecycle> internalLifecycles;
95  	
96  	public KSBConfigurer() {
97  		super(KsbApiConstants.KSB_MODULE_NAME);
98  		setValidRunModes(Arrays.asList(RunMode.REMOTE, RunMode.LOCAL));
99  		this.internalLifecycles = new ArrayList<Lifecycle>();
100 	}
101 	
102 	@Override
103 	public void addAdditonalToConfig() {
104 		configureDataSource();
105 		configureScheduler();
106 		configurePlatformTransactionManager();
107 		configureAlternateEndpoints();
108 	}
109 
110 	@Override
111 	public List<String> getPrimarySpringFiles(){
112 		final List<String> springFileLocations = new ArrayList<String>();
113 				
114 		boolean isJpa = OrmUtils.isJpaEnabled("rice.ksb");
115 		if (isJpa) {
116 			// TODO redo this once we're back to JPA
117         	// springFileLocations.add("classpath:org/kuali/rice/ksb/config/KSBJPASpringBeans.xml");
118         	throw new UnsupportedOperationException("JPA not currently supported for KSB");
119 		}
120 		
121 		springFileLocations.add(SERVICE_BUS_CLIENT_SPRING);
122 		
123 		if (isMessagePersistenceEnabled()) {
124 			springFileLocations.add(MESSAGE_CLIENT_SPRING);
125 			springFileLocations.add(OJB_MESSAGE_CLIENT_SPRING);
126 		}
127         
128         if (isBamEnabled()) {
129         	springFileLocations.add(BAM_SPRING);
130         	springFileLocations.add(OJB_BAM_SPRING);
131         }
132         
133         if (getRunMode().equals( RunMode.LOCAL )) {
134         	springFileLocations.add(REGISTRY_SERVER_SPRING);
135         	springFileLocations.add(OJB_REGISTRY_SPRING);
136         }
137         
138         return springFileLocations;
139 	}
140 
141     @Override
142     public boolean hasWebInterface() {
143         return true;
144     }
145 
146     @Override
147     protected WebModuleConfiguration loadWebModule() {
148         WebModuleConfiguration configuration = super.loadWebModule();
149         configuration.getWebSpringFiles().add(WEB_SPRING);
150         return configuration;
151     }
152 
153     @Override
154 	public Collection<ResourceLoader> getResourceLoadersToRegister() throws Exception{
155 		ResourceLoader ksbRemoteResourceLoader = KSBResourceLoaderFactory.createRootKSBRemoteResourceLoader();
156 		ksbRemoteResourceLoader.start();
157 		return Collections.singletonList(ksbRemoteResourceLoader);
158 	}
159 	
160 	@Override
161 	public List<Lifecycle> loadLifecycles() throws Exception {
162 		List<Lifecycle> lifecycles = new LinkedList<Lifecycle>();
163 		// this validation of our service list needs to happen after we've
164 		// loaded our configs so it's a lifecycle
165 		lifecycles.add(new BaseLifecycle() {
166 
167 			@Override
168 			public void start() throws Exception {
169 				// first check if we want to allow self-signed certificates for SSL communication
170 				if (Boolean.valueOf(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.KSB_ALLOW_SELF_SIGNED_SSL)).booleanValue()) {
171 				    Protocol.registerProtocol("https", new Protocol("https",
172 					    (ProtocolSocketFactory) new EasySSLProtocolSocketFactory(), 443));
173 				}
174 				super.start();
175 			}
176 		});
177 		return lifecycles;
178 	}
179 	
180 	
181 
182     @Override
183     public void doAdditonalConfigurerValidations() {
184         for (final ServiceDefinition serviceDef : KSBConfigurer.this.services) {
185 			serviceDef.validate();
186 		}
187     }
188 
189 	@Override
190 	public void doAdditionalContextStartedLogic() {
191 		ServicePublisher servicePublisher = new ServicePublisher(getServices());
192 		Lifecycle serviceBus = new ServiceDelegatingLifecycle(KsbApiServiceLocator.SERVICE_BUS);
193 		Lifecycle threadPool = new ServiceDelegatingLifecycle(KSBConstants.ServiceNames.THREAD_POOL_SERVICE);
194 		Lifecycle scheduledThreadPool = new ServiceDelegatingLifecycle(KSBConstants.ServiceNames.SCHEDULED_THREAD_POOL_SERVICE);
195 		
196 		try {
197 			servicePublisher.start();
198 			internalLifecycles.add(servicePublisher);
199 			serviceBus.start();
200 			internalLifecycles.add(serviceBus);
201 			threadPool.start();
202 			internalLifecycles.add(threadPool);
203 			scheduledThreadPool.start();
204 			internalLifecycles.add(scheduledThreadPool);
205 		} catch (Exception e) {
206 			if (e instanceof RuntimeException) {
207 				throw (RuntimeException)e;
208 			}
209 			throw new RiceRuntimeException("Failed to initialize KSB on context startup");
210 		}
211 
212 		requeueMessages();
213 	}
214 
215     @Override
216     protected void doAdditionalModuleStartLogic() throws Exception {
217         // this allows us to become aware of remote services, in case the application needs to use any of them during startup
218         LOG.info("Synchronizing remote services with service bus after KSB startup...");
219         long startTime = System.currentTimeMillis();
220         KsbApiServiceLocator.getServiceBus().synchronizeRemoteServices();
221         long endTime = System.currentTimeMillis();
222         LOG.info("...total time to synchronize remote services with service bus after KSB startup: " + (endTime - startTime));
223     }
224 
225     @Override
226 	protected void doAdditionalModuleStopLogic() throws Exception {
227 		for (int index = internalLifecycles.size() - 1; index >= 0; index--) {
228 			try {
229 				internalLifecycles.get(index).stop();
230 			} catch (Exception e) {
231 				LOG.error("Failed to properly execute shutdown logic.", e);
232 			}
233 		}
234 	}
235 
236 	/**
237      * Used to refresh the service registry after the Application Context is initialized.  This way any services that were exported on startup
238      * will be available in the service registry once startup is complete.
239      */
240     private void requeueMessages() {
241         LOG.info("Refreshing Service Registry to export services to the bus.");
242         KsbApiServiceLocator.getServiceBus().synchronizeLocalServices();
243         
244 		//automatically requeue documents sitting with status of 'R'
245 		MessageFetcher messageFetcher = new MessageFetcher((Integer) null);
246 		KSBServiceLocator.getThreadPool().execute(messageFetcher);
247     }
248     
249     protected boolean isMessagePersistenceEnabled() {
250     	return ConfigContext.getCurrentContextConfig().getBooleanProperty(KSBConstants.Config.MESSAGE_PERSISTENCE, true);
251     }
252     
253     protected boolean isBamEnabled() {
254     	return ConfigContext.getCurrentContextConfig().getBooleanProperty(Config.BAM_ENABLED, false);
255     }
256 
257 	protected void configureScheduler() {
258 		if (this.getExceptionMessagingScheduler() != null) {
259 			LOG.info("Configuring injected exception messaging Scheduler");
260 			ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.INJECTED_EXCEPTION_MESSAGE_SCHEDULER_KEY, this.getExceptionMessagingScheduler());
261 		}
262 	}
263 
264 	protected void configureDataSource() {
265 		if (isMessagePersistenceEnabled()) {
266 			if (getMessageDataSource() != null) {
267 				ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_MESSAGE_DATASOURCE, getMessageDataSource());
268 			}
269 			if (getNonTransactionalMessageDataSource() != null) {
270 	            ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_MESSAGE_NON_TRANSACTIONAL_DATASOURCE, getNonTransactionalMessageDataSource());
271 			}
272 		}
273         if (getRunMode().equals(RunMode.LOCAL)) {
274         	if (getRegistryDataSource() != null) {
275                 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_REGISTRY_DATASOURCE, getRegistryDataSource());
276             }
277         }
278         if (isBamEnabled()) {
279         	if (getBamDataSource() != null) {
280         		ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_BAM_DATASOURCE, getBamDataSource());
281         	}
282         }
283     }
284 
285 	protected void configurePlatformTransactionManager() {
286 		if (getPlatformTransactionManager() == null) {
287 			return;
288 		}
289 		ConfigContext.getCurrentContextConfig().putObject(RiceConstants.SPRING_TRANSACTION_MANAGER, getPlatformTransactionManager());
290 	}
291 	
292 	protected void configureAlternateEndpoints() {
293 		ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_ALTERNATE_ENDPOINT_LOCATIONS, getAlternateEndpointLocations());
294 		ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_ALTERNATE_ENDPOINTS, getAlternateEndpoints());
295 	}
296 	
297 	@Override
298 	public void doAdditionalContextStoppedLogic() {
299 		try {
300 			HttpInvokerConnector.shutdownIdleConnectionTimeout();
301 		} catch (Exception e) {
302 			LOG.error("Failed to shutdown idle connection timeout evictor thread.", e);
303 		}
304 	    cleanUpConfiguration();
305 	}
306 	
307 	/**
308      * Because our configuration is global, shutting down Rice does not get rid of objects stored there.  For that reason
309      * we need to manually clean these up.  This is most important in the case of the service bus because the configuration
310      * is used to store services to be exported.  If we don't clean this up then a shutdown/startup within the same
311      * class loading context causes the service list to be doubled and results in "multiple endpoint" error messages.
312      *
313      */
314     protected void cleanUpConfiguration() {
315         ConfigContext.getCurrentContextConfig().removeObject(KSBConstants.Config.KSB_ALTERNATE_ENDPOINTS);
316     }
317 
318 	public List<ServiceDefinition> getServices() {
319 		return this.services;
320 	}
321 
322 	public void setServices(List<ServiceDefinition> javaServices) {
323 		this.services = javaServices;
324 	}
325 
326 	public DataSource getMessageDataSource() {
327 		return this.messageDataSource;
328 	}
329 
330 	public void setMessageDataSource(DataSource messageDataSource) {
331 		this.messageDataSource = messageDataSource;
332 	}
333 
334     public DataSource getNonTransactionalMessageDataSource() {
335         return this.nonTransactionalMessageDataSource;
336     }
337 
338     public void setNonTransactionalMessageDataSource(DataSource nonTransactionalMessageDataSource) {
339         this.nonTransactionalMessageDataSource = nonTransactionalMessageDataSource;
340     }
341 
342     public DataSource getRegistryDataSource() {
343 		return this.registryDataSource;
344 	}
345 
346 	public void setRegistryDataSource(DataSource registryDataSource) {
347 		this.registryDataSource = registryDataSource;
348 	}
349 	
350 	public DataSource getBamDataSource() {
351 		return this.bamDataSource;
352 	}
353 
354 	public void setBamDataSource(DataSource bamDataSource) {
355 		this.bamDataSource = bamDataSource;
356 	}
357 
358 	public Scheduler getExceptionMessagingScheduler() {
359 		return this.exceptionMessagingScheduler;
360 	}
361 
362 	public void setExceptionMessagingScheduler(Scheduler exceptionMessagingScheduler) {
363 		this.exceptionMessagingScheduler = exceptionMessagingScheduler;
364 	}
365 
366 	public PlatformTransactionManager getPlatformTransactionManager() {
367 		return platformTransactionManager;
368 	}
369 
370 	public void setPlatformTransactionManager(PlatformTransactionManager springTransactionManager) {
371 		this.platformTransactionManager = springTransactionManager;
372 	}
373 
374     public List<AlternateEndpointLocation> getAlternateEndpointLocations() {
375 	    return this.alternateEndpointLocations;
376     }
377 
378     public void setAlternateEndpointLocations(List<AlternateEndpointLocation> alternateEndpointLocations) {
379 	    this.alternateEndpointLocations = alternateEndpointLocations;
380 	}
381 
382     public List<AlternateEndpoint> getAlternateEndpoints() {
383         return this.alternateEndpoints;
384     }
385 
386     public void setAlternateEndpoints(List<AlternateEndpoint> alternateEndpoints) {
387         this.alternateEndpoints = alternateEndpoints;
388     }
389     
390     private final class ServicePublisher extends BaseLifecycle {
391 
392     	private final List<ServiceDefinition> serviceDefinitions;
393     	
394     	ServicePublisher(List<ServiceDefinition> serviceDefinitions) {
395     		this.serviceDefinitions = serviceDefinitions;
396     	}
397     	
398 		@Override
399 		public void start() throws Exception {
400 			if (serviceDefinitions != null && !serviceDefinitions.isEmpty()) {
401 				LOG.debug("Configuring " + serviceDefinitions.size() + " services for application id " + CoreConfigHelper.getApplicationId() + " using config for classloader " + ClassLoaderUtils.getDefaultClassLoader());
402 				KsbApiServiceLocator.getServiceBus().publishServices(serviceDefinitions, true);
403 				super.start();
404 			}
405 		}
406     	
407     }
408     
409 }