1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.ksb.messaging.config;
17
18 import org.apache.commons.httpclient.contrib.ssl.EasySSLProtocolSocketFactory;
19 import org.apache.commons.httpclient.protocol.Protocol;
20 import org.apache.commons.httpclient.protocol.ProtocolSocketFactory;
21 import org.kuali.rice.core.api.config.CoreConfigHelper;
22 import org.kuali.rice.core.api.config.module.RunMode;
23 import org.kuali.rice.core.api.config.property.Config;
24 import org.kuali.rice.core.api.config.property.ConfigContext;
25 import org.kuali.rice.core.api.exception.RiceRuntimeException;
26 import org.kuali.rice.core.api.lifecycle.BaseLifecycle;
27 import org.kuali.rice.core.api.lifecycle.Lifecycle;
28 import org.kuali.rice.core.api.resourceloader.ResourceLoader;
29 import org.kuali.rice.core.api.util.ClassLoaderUtils;
30 import org.kuali.rice.core.api.util.RiceConstants;
31 import org.kuali.rice.core.framework.config.module.ModuleConfigurer;
32 import org.kuali.rice.core.framework.config.module.WebModuleConfiguration;
33 import org.kuali.rice.core.framework.lifecycle.ServiceDelegatingLifecycle;
34 import org.kuali.rice.core.framework.persistence.jpa.OrmUtils;
35 import org.kuali.rice.ksb.api.KsbApiConstants;
36 import org.kuali.rice.ksb.api.KsbApiServiceLocator;
37 import org.kuali.rice.ksb.api.bus.ServiceDefinition;
38 import org.kuali.rice.ksb.messaging.AlternateEndpoint;
39 import org.kuali.rice.ksb.messaging.AlternateEndpointLocation;
40 import org.kuali.rice.ksb.messaging.MessageFetcher;
41 import org.kuali.rice.ksb.messaging.resourceloader.KSBResourceLoaderFactory;
42 import org.kuali.rice.ksb.messaging.serviceconnectors.HttpInvokerConnector;
43 import org.kuali.rice.ksb.service.KSBServiceLocator;
44 import org.kuali.rice.ksb.util.KSBConstants;
45 import org.quartz.Scheduler;
46 import org.springframework.context.ApplicationEvent;
47 import org.springframework.context.event.ContextRefreshedEvent;
48 import org.springframework.context.event.ContextStoppedEvent;
49 import org.springframework.context.event.SmartApplicationListener;
50 import org.springframework.core.Ordered;
51 import org.springframework.transaction.PlatformTransactionManager;
52
53 import javax.sql.DataSource;
54 import java.util.ArrayList;
55 import java.util.Arrays;
56 import java.util.Collection;
57 import java.util.Collections;
58 import java.util.LinkedList;
59 import java.util.List;
60
61
62
63
64
65
66
67
68
69
70 public class KSBConfigurer extends ModuleConfigurer implements SmartApplicationListener {
71
72 private static final String SERVICE_BUS_CLIENT_SPRING = "classpath:org/kuali/rice/ksb/config/KsbServiceBusClientSpringBeans.xml";
73 private static final String MESSAGE_CLIENT_SPRING = "classpath:org/kuali/rice/ksb/config/KsbMessageClientSpringBeans.xml";
74 private static final String OJB_MESSAGE_CLIENT_SPRING = "classpath:org/kuali/rice/ksb/config/KsbOjbMessageClientSpringBeans.xml";
75 private static final String BAM_SPRING = "classpath:org/kuali/rice/ksb/config/KsbBamSpringBeans.xml";
76 private static final String OJB_BAM_SPRING = "classpath:org/kuali/rice/ksb/config/KsbOjbBamSpringBeans.xml";
77 private static final String REGISTRY_SERVER_SPRING = "classpath:org/kuali/rice/ksb/config/KsbRegistryServerSpringBeans.xml";
78 private static final String OJB_REGISTRY_SPRING = "classpath:org/kuali/rice/ksb/config/KsbOjbRegistrySpringBeans.xml";
79 private static final String WEB_SPRING = "classpath:org/kuali/rice/ksb/config/KsbWebSpringBeans.xml";
80
81 private List<ServiceDefinition> services = new ArrayList<ServiceDefinition>();
82
83 private List<AlternateEndpointLocation> alternateEndpointLocations = new ArrayList<AlternateEndpointLocation>();
84
85 private List<AlternateEndpoint> alternateEndpoints = new ArrayList<AlternateEndpoint>();
86
87 private DataSource registryDataSource;
88
89 private DataSource messageDataSource;
90
91 private DataSource nonTransactionalMessageDataSource;
92
93 private DataSource bamDataSource;
94
95 private Scheduler exceptionMessagingScheduler;
96
97 private PlatformTransactionManager platformTransactionManager;
98
99 private List<Lifecycle> internalLifecycles;
100
101 public KSBConfigurer() {
102 super(KsbApiConstants.KSB_MODULE_NAME);
103 setValidRunModes(Arrays.asList(RunMode.THIN, RunMode.REMOTE, RunMode.LOCAL));
104 this.internalLifecycles = new ArrayList<Lifecycle>();
105 }
106
107 @Override
108 public void addAdditonalToConfig() {
109 configureDataSource();
110 configureScheduler();
111 configurePlatformTransactionManager();
112 configureAlternateEndpoints();
113 }
114
115 @Override
116 public List<String> getPrimarySpringFiles(){
117 final List<String> springFileLocations = new ArrayList<String>();
118
119 springFileLocations.add(SERVICE_BUS_CLIENT_SPRING);
120
121 if (getRunMode() != RunMode.THIN) {
122
123 boolean isJpa = OrmUtils.isJpaEnabled("rice.ksb");
124 if (isJpa) {
125
126
127 throw new UnsupportedOperationException("JPA not currently supported for KSB");
128 }
129
130
131
132
133
134 springFileLocations.add(MESSAGE_CLIENT_SPRING);
135 springFileLocations.add(OJB_MESSAGE_CLIENT_SPRING);
136
137
138 if (isBamEnabled()) {
139 springFileLocations.add(BAM_SPRING);
140 springFileLocations.add(OJB_BAM_SPRING);
141 }
142 }
143
144 if (getRunMode().equals( RunMode.LOCAL )) {
145 springFileLocations.add(REGISTRY_SERVER_SPRING);
146 springFileLocations.add(OJB_REGISTRY_SPRING);
147 }
148
149 return springFileLocations;
150 }
151
152 @Override
153 public boolean hasWebInterface() {
154 return true;
155 }
156
157
158 @Override
159 public boolean shouldRenderWebInterface() {
160 if (ConfigContext.getCurrentContextConfig().getBooleanProperty(KSBConstants.Config.WEB_FORCE_ENABLE)) {
161 return true;
162 }
163 return super.shouldRenderWebInterface();
164 }
165
166 @Override
167 protected WebModuleConfiguration loadWebModule() {
168 WebModuleConfiguration configuration = super.loadWebModule();
169 configuration.getWebSpringFiles().add(WEB_SPRING);
170 return configuration;
171 }
172
173 @Override
174 public Collection<ResourceLoader> getResourceLoadersToRegister() throws Exception{
175 ResourceLoader ksbRemoteResourceLoader = KSBResourceLoaderFactory.createRootKSBRemoteResourceLoader();
176 ksbRemoteResourceLoader.start();
177 return Collections.singletonList(ksbRemoteResourceLoader);
178 }
179
180 @Override
181 public List<Lifecycle> loadLifecycles() throws Exception {
182 List<Lifecycle> lifecycles = new LinkedList<Lifecycle>();
183
184
185 lifecycles.add(new BaseLifecycle() {
186
187 @Override
188 public void start() throws Exception {
189
190 if (Boolean.valueOf(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.KSB_ALLOW_SELF_SIGNED_SSL)).booleanValue()) {
191 Protocol.registerProtocol("https", new Protocol("https",
192 (ProtocolSocketFactory) new EasySSLProtocolSocketFactory(), 443));
193 }
194 super.start();
195 }
196 });
197 return lifecycles;
198 }
199
200 protected void validateServices(List<ServiceDefinition> services) {
201 for (final ServiceDefinition serviceDef : this.services) {
202 serviceDef.validate();
203 }
204 }
205
206 @Override
207 public void onApplicationEvent(ApplicationEvent applicationEvent) {
208 if (applicationEvent instanceof ContextRefreshedEvent) {
209 doAdditionalContextStartedLogic();
210 } else if (applicationEvent instanceof ContextStoppedEvent) {
211 doAdditionalContextStoppedLogic();
212 }
213 }
214
215 protected void doAdditionalContextStartedLogic() {
216 validateServices(getServices());
217 ServicePublisher servicePublisher = new ServicePublisher(getServices());
218 Lifecycle serviceBus = new ServiceDelegatingLifecycle(KsbApiServiceLocator.SERVICE_BUS);
219 Lifecycle threadPool = new ServiceDelegatingLifecycle(KSBConstants.ServiceNames.THREAD_POOL_SERVICE);
220 Lifecycle scheduledThreadPool = new ServiceDelegatingLifecycle(KSBConstants.ServiceNames.SCHEDULED_THREAD_POOL_SERVICE);
221
222 try {
223 servicePublisher.start();
224 internalLifecycles.add(servicePublisher);
225 serviceBus.start();
226 internalLifecycles.add(serviceBus);
227 threadPool.start();
228 internalLifecycles.add(threadPool);
229 scheduledThreadPool.start();
230 internalLifecycles.add(scheduledThreadPool);
231 } catch (Exception e) {
232 if (e instanceof RuntimeException) {
233 throw (RuntimeException)e;
234 }
235 throw new RiceRuntimeException("Failed to initialize KSB on context startup");
236 }
237
238
239 if (getRunMode() != RunMode.THIN) {
240 requeueMessages();
241 }
242 }
243
244 protected void doAdditionalContextStoppedLogic() {
245 try {
246 HttpInvokerConnector.shutdownIdleConnectionTimeout();
247 } catch (Exception e) {
248 LOG.error("Failed to shutdown idle connection timeout evictor thread.", e);
249 }
250 cleanUpConfiguration();
251 }
252
253 @Override
254 public boolean supportsEventType(Class<? extends ApplicationEvent> aClass) {
255 return true;
256 }
257
258 @Override
259 public boolean supportsSourceType(Class<?> aClass) {
260 return true;
261 }
262
263 @Override
264 public int getOrder() {
265 return Ordered.LOWEST_PRECEDENCE;
266 }
267
268 @Override
269 protected void doAdditionalModuleStartLogic() throws Exception {
270
271 LOG.info("Synchronizing remote services with service bus after KSB startup...");
272 long startTime = System.currentTimeMillis();
273 KsbApiServiceLocator.getServiceBus().synchronizeRemoteServices();
274 long endTime = System.currentTimeMillis();
275 LOG.info("...total time to synchronize remote services with service bus after KSB startup: " + (endTime - startTime));
276 }
277
278 @Override
279 protected void doAdditionalModuleStopLogic() throws Exception {
280 for (int index = internalLifecycles.size() - 1; index >= 0; index--) {
281 try {
282 internalLifecycles.get(index).stop();
283 } catch (Exception e) {
284 LOG.error("Failed to properly execute shutdown logic.", e);
285 }
286 }
287 }
288
289
290
291
292
293 private void requeueMessages() {
294 LOG.info("Refreshing Service Registry to export services to the bus.");
295 KsbApiServiceLocator.getServiceBus().synchronizeLocalServices();
296
297
298 MessageFetcher messageFetcher = new MessageFetcher((Integer) null);
299 KSBServiceLocator.getThreadPool().execute(messageFetcher);
300 }
301
302 protected boolean isMessagePersistenceEnabled() {
303 return ConfigContext.getCurrentContextConfig().getBooleanProperty(KSBConstants.Config.MESSAGE_PERSISTENCE, true);
304 }
305
306 protected boolean isBamEnabled() {
307 return ConfigContext.getCurrentContextConfig().getBooleanProperty(Config.BAM_ENABLED, false);
308 }
309
310 protected void configureScheduler() {
311 if (this.getExceptionMessagingScheduler() != null) {
312 LOG.info("Configuring injected exception messaging Scheduler");
313 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.INJECTED_EXCEPTION_MESSAGE_SCHEDULER_KEY, this.getExceptionMessagingScheduler());
314 }
315 }
316
317 protected void configureDataSource() {
318 if (isMessagePersistenceEnabled()) {
319 if (getMessageDataSource() != null) {
320 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_MESSAGE_DATASOURCE, getMessageDataSource());
321 }
322 if (getNonTransactionalMessageDataSource() != null) {
323 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_MESSAGE_NON_TRANSACTIONAL_DATASOURCE, getNonTransactionalMessageDataSource());
324 }
325 }
326 if (getRunMode().equals(RunMode.LOCAL)) {
327 if (getRegistryDataSource() != null) {
328 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_REGISTRY_DATASOURCE, getRegistryDataSource());
329 }
330 }
331 if (isBamEnabled()) {
332 if (getBamDataSource() != null) {
333 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_BAM_DATASOURCE, getBamDataSource());
334 }
335 }
336 }
337
338 protected void configurePlatformTransactionManager() {
339 if (getPlatformTransactionManager() == null) {
340 return;
341 }
342 ConfigContext.getCurrentContextConfig().putObject(RiceConstants.SPRING_TRANSACTION_MANAGER, getPlatformTransactionManager());
343 }
344
345 protected void configureAlternateEndpoints() {
346 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_ALTERNATE_ENDPOINT_LOCATIONS, getAlternateEndpointLocations());
347 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_ALTERNATE_ENDPOINTS, getAlternateEndpoints());
348 }
349
350
351
352
353
354
355
356
357 protected void cleanUpConfiguration() {
358 ConfigContext.getCurrentContextConfig().removeObject(KSBConstants.Config.KSB_ALTERNATE_ENDPOINTS);
359 }
360
361 public List<ServiceDefinition> getServices() {
362 return this.services;
363 }
364
365 public void setServices(List<ServiceDefinition> javaServices) {
366 this.services = javaServices;
367 }
368
369 public DataSource getMessageDataSource() {
370 return this.messageDataSource;
371 }
372
373 public void setMessageDataSource(DataSource messageDataSource) {
374 this.messageDataSource = messageDataSource;
375 }
376
377 public DataSource getNonTransactionalMessageDataSource() {
378 return this.nonTransactionalMessageDataSource;
379 }
380
381 public void setNonTransactionalMessageDataSource(DataSource nonTransactionalMessageDataSource) {
382 this.nonTransactionalMessageDataSource = nonTransactionalMessageDataSource;
383 }
384
385 public DataSource getRegistryDataSource() {
386 return this.registryDataSource;
387 }
388
389 public void setRegistryDataSource(DataSource registryDataSource) {
390 this.registryDataSource = registryDataSource;
391 }
392
393 public DataSource getBamDataSource() {
394 return this.bamDataSource;
395 }
396
397 public void setBamDataSource(DataSource bamDataSource) {
398 this.bamDataSource = bamDataSource;
399 }
400
401 public Scheduler getExceptionMessagingScheduler() {
402 return this.exceptionMessagingScheduler;
403 }
404
405 public void setExceptionMessagingScheduler(Scheduler exceptionMessagingScheduler) {
406 this.exceptionMessagingScheduler = exceptionMessagingScheduler;
407 }
408
409 public PlatformTransactionManager getPlatformTransactionManager() {
410 return platformTransactionManager;
411 }
412
413 public void setPlatformTransactionManager(PlatformTransactionManager springTransactionManager) {
414 this.platformTransactionManager = springTransactionManager;
415 }
416
417 public List<AlternateEndpointLocation> getAlternateEndpointLocations() {
418 return this.alternateEndpointLocations;
419 }
420
421 public void setAlternateEndpointLocations(List<AlternateEndpointLocation> alternateEndpointLocations) {
422 this.alternateEndpointLocations = alternateEndpointLocations;
423 }
424
425 public List<AlternateEndpoint> getAlternateEndpoints() {
426 return this.alternateEndpoints;
427 }
428
429 public void setAlternateEndpoints(List<AlternateEndpoint> alternateEndpoints) {
430 this.alternateEndpoints = alternateEndpoints;
431 }
432
433 private final class ServicePublisher extends BaseLifecycle {
434
435 private final List<ServiceDefinition> serviceDefinitions;
436
437 ServicePublisher(List<ServiceDefinition> serviceDefinitions) {
438 this.serviceDefinitions = serviceDefinitions;
439 }
440
441 @Override
442 public void start() throws Exception {
443 if (serviceDefinitions != null && !serviceDefinitions.isEmpty()) {
444 LOG.debug("Configuring " + serviceDefinitions.size() + " services for application id " + CoreConfigHelper.getApplicationId() + " using config for classloader " + ClassLoaderUtils.getDefaultClassLoader());
445 KsbApiServiceLocator.getServiceBus().publishServices(serviceDefinitions, true);
446 super.start();
447 }
448 }
449
450 }
451
452 }