1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.ksb.messaging.config;
17
18 import org.kuali.rice.core.api.config.CoreConfigHelper;
19 import org.kuali.rice.core.api.config.module.RunMode;
20 import org.kuali.rice.core.api.config.property.Config;
21 import org.kuali.rice.core.api.config.property.ConfigContext;
22 import org.kuali.rice.core.api.exception.RiceRuntimeException;
23 import org.kuali.rice.core.api.lifecycle.BaseLifecycle;
24 import org.kuali.rice.core.api.lifecycle.Lifecycle;
25 import org.kuali.rice.core.api.resourceloader.ResourceLoader;
26 import org.kuali.rice.core.api.util.ClassLoaderUtils;
27 import org.kuali.rice.core.api.util.RiceConstants;
28 import org.kuali.rice.core.framework.config.module.ModuleConfigurer;
29 import org.kuali.rice.core.framework.config.module.WebModuleConfiguration;
30 import org.kuali.rice.core.framework.lifecycle.ServiceDelegatingLifecycle;
31 import org.kuali.rice.ksb.api.KsbApiConstants;
32 import org.kuali.rice.ksb.api.KsbApiServiceLocator;
33 import org.kuali.rice.ksb.api.bus.ServiceDefinition;
34 import org.kuali.rice.ksb.messaging.AlternateEndpoint;
35 import org.kuali.rice.ksb.messaging.AlternateEndpointLocation;
36 import org.kuali.rice.ksb.messaging.MessageFetcher;
37 import org.kuali.rice.ksb.messaging.resourceloader.KSBResourceLoaderFactory;
38 import org.kuali.rice.ksb.service.KSBServiceLocator;
39 import org.kuali.rice.ksb.util.KSBConstants;
40 import org.quartz.Scheduler;
41 import org.springframework.context.ApplicationEvent;
42 import org.springframework.context.event.ContextRefreshedEvent;
43 import org.springframework.context.event.ContextStoppedEvent;
44 import org.springframework.context.event.SmartApplicationListener;
45 import org.springframework.core.Ordered;
46 import org.springframework.transaction.PlatformTransactionManager;
47
48 import javax.sql.DataSource;
49 import javax.xml.ws.WebServiceException;
50 import java.util.ArrayList;
51 import java.util.Arrays;
52 import java.util.Collection;
53 import java.util.Collections;
54 import java.util.LinkedList;
55 import java.util.List;
56
57
58
59
60
61
62
63
64 public class KSBConfigurer extends ModuleConfigurer implements SmartApplicationListener {
65
66 private static final String SERVICE_BUS_CLIENT_SPRING = "classpath:org/kuali/rice/ksb/config/KsbServiceBusClientSpringBeans.xml";
67 private static final String MESSAGE_CLIENT_SPRING = "classpath:org/kuali/rice/ksb/config/KsbMessageClientSpringBeans.xml";
68 private static final String JPA_MESSAGE_CLIENT_SPRING = "classpath:org/kuali/rice/ksb/config/KsbJpaMessageClientSpringBeans.xml";
69 private static final String BAM_SPRING = "classpath:org/kuali/rice/ksb/config/KsbBamSpringBeans.xml";
70 private static final String REGISTRY_SERVER_SPRING = "classpath:org/kuali/rice/ksb/config/KsbRegistryServerSpringBeans.xml";
71 private static final String JPA_REGISTRY_SPRING = "classpath:org/kuali/rice/ksb/config/KsbJpaRegistrySpringBeans.xml";
72 private static final String WEB_SPRING = "classpath:org/kuali/rice/ksb/config/KsbWebSpringBeans.xml";
73
74 private List<ServiceDefinition> services = new ArrayList<ServiceDefinition>();
75
76 private List<AlternateEndpointLocation> alternateEndpointLocations = new ArrayList<AlternateEndpointLocation>();
77
78 private List<AlternateEndpoint> alternateEndpoints = new ArrayList<AlternateEndpoint>();
79
80 private DataSource registryDataSource;
81
82 private DataSource messageDataSource;
83
84 private DataSource nonTransactionalMessageDataSource;
85
86 private DataSource bamDataSource;
87
88 private Scheduler exceptionMessagingScheduler;
89
90 private PlatformTransactionManager platformTransactionManager;
91
92 private List<Lifecycle> internalLifecycles;
93
94 public KSBConfigurer() {
95 super(KsbApiConstants.KSB_MODULE_NAME);
96 setValidRunModes(Arrays.asList(RunMode.THIN, RunMode.REMOTE, RunMode.LOCAL));
97 this.internalLifecycles = new ArrayList<Lifecycle>();
98 }
99
100 @Override
101 public void addAdditonalToConfig() {
102 configureDataSource();
103 configureScheduler();
104 configurePlatformTransactionManager();
105 configureAlternateEndpoints();
106 }
107
108 @Override
109 public List<String> getPrimarySpringFiles() {
110 LOG.info("KSBConfigurer:getPrimarySpringFiles: getRunMode => " + getRunMode());
111 final List<String> springFileLocations = new ArrayList<String>();
112
113 springFileLocations.add(SERVICE_BUS_CLIENT_SPRING);
114
115 if (getRunMode() != RunMode.THIN) {
116
117
118
119
120 springFileLocations.add(MESSAGE_CLIENT_SPRING);
121 springFileLocations.add(JPA_MESSAGE_CLIENT_SPRING);
122
123
124 if (isBamEnabled()) {
125 springFileLocations.add(BAM_SPRING);
126 }
127 }
128
129 if (getRunMode().equals(RunMode.LOCAL)) {
130 springFileLocations.add(JPA_REGISTRY_SPRING);
131 springFileLocations.add(REGISTRY_SERVER_SPRING);
132 }
133
134 return springFileLocations;
135 }
136
137 @Override
138 public boolean hasWebInterface() {
139 return true;
140 }
141
142
143 @Override
144 public boolean shouldRenderWebInterface() {
145 if (ConfigContext.getCurrentContextConfig().getBooleanProperty(KSBConstants.Config.WEB_FORCE_ENABLE)) {
146 return true;
147 }
148 return super.shouldRenderWebInterface();
149 }
150
151 @Override
152 protected WebModuleConfiguration loadWebModule() {
153 WebModuleConfiguration configuration = super.loadWebModule();
154 configuration.getWebSpringFiles().add(WEB_SPRING);
155 return configuration;
156 }
157
158 @Override
159 public Collection<ResourceLoader> getResourceLoadersToRegister() throws Exception {
160 ResourceLoader ksbRemoteResourceLoader = KSBResourceLoaderFactory.createRootKSBRemoteResourceLoader();
161 ksbRemoteResourceLoader.start();
162 return Collections.singletonList(ksbRemoteResourceLoader);
163 }
164
165 @Override
166 public List<Lifecycle> loadLifecycles() throws Exception {
167 List<Lifecycle> lifecycles = new LinkedList<Lifecycle>();
168
169
170
171 lifecycles.add(new BaseLifecycle() {
172 @Override
173 public void start() throws Exception {
174 super.start();
175 }
176 });
177
178 return lifecycles;
179 }
180
181 protected void validateServices(List<ServiceDefinition> services) {
182 for (final ServiceDefinition serviceDef : this.services) {
183 serviceDef.validate();
184 }
185 }
186
187 @Override
188 public void onApplicationEvent(ApplicationEvent applicationEvent) {
189 if (applicationEvent instanceof ContextRefreshedEvent) {
190 doAdditionalContextStartedLogic();
191 } else if (applicationEvent instanceof ContextStoppedEvent) {
192 doAdditionalContextStoppedLogic();
193 }
194 }
195
196 protected void doAdditionalContextStartedLogic() {
197 validateServices(getServices());
198 ServicePublisher servicePublisher = new ServicePublisher(getServices());
199 Lifecycle serviceBus = new ServiceDelegatingLifecycle(KsbApiServiceLocator.SERVICE_BUS);
200 Lifecycle threadPool = new ServiceDelegatingLifecycle(KSBConstants.ServiceNames.THREAD_POOL_SERVICE);
201 Lifecycle scheduledThreadPool = new ServiceDelegatingLifecycle(KSBConstants.ServiceNames.SCHEDULED_THREAD_POOL_SERVICE);
202
203 try {
204 servicePublisher.start();
205 internalLifecycles.add(servicePublisher);
206 serviceBus.start();
207 internalLifecycles.add(serviceBus);
208 threadPool.start();
209 internalLifecycles.add(threadPool);
210 scheduledThreadPool.start();
211 internalLifecycles.add(scheduledThreadPool);
212 } catch (Exception e) {
213 if (e instanceof RuntimeException) {
214 throw (RuntimeException) e;
215 }
216 throw new RiceRuntimeException("Failed to initialize KSB on context startup");
217 }
218
219
220 if (getRunMode() != RunMode.THIN) {
221 requeueMessages();
222 }
223 }
224
225 protected void doAdditionalContextStoppedLogic() {
226 cleanUpConfiguration();
227 }
228
229 @Override
230 public boolean supportsEventType(Class<? extends ApplicationEvent> aClass) {
231 return true;
232 }
233
234 @Override
235 public boolean supportsSourceType(Class<?> aClass) {
236 return true;
237 }
238
239 @Override
240 public int getOrder() {
241 return Ordered.LOWEST_PRECEDENCE;
242 }
243
244 @Override
245 protected void doAdditionalModuleStartLogic() throws Exception {
246
247
248
249
250
251 try {
252
253 LOG.info("Synchronizing remote services with service bus after KSB startup...");
254 long startTime = System.currentTimeMillis();
255 KsbApiServiceLocator.getServiceBus().synchronizeRemoteServices();
256 long endTime = System.currentTimeMillis();
257 LOG.info("...total time to synchronize remote services with service bus after KSB startup: " + (endTime - startTime));
258 } catch (WebServiceException e) {
259 LOG.error("Caught web service exception synchronizing service bus while configuring KSB", e);
260 }
261 }
262
263 @Override
264 protected void doAdditionalModuleStopLogic() throws Exception {
265 for (int index = internalLifecycles.size() - 1; index >= 0; index--) {
266 try {
267 internalLifecycles.get(index).stop();
268 } catch (Exception e) {
269 LOG.error("Failed to properly execute shutdown logic.", e);
270 }
271 }
272 }
273
274
275
276
277
278
279 private void requeueMessages() {
280 LOG.info("Refreshing Service Registry to export services to the bus.");
281 try {
282 KsbApiServiceLocator.getServiceBus().synchronizeLocalServices();
283 }catch (WebServiceException e) {
284 LOG.error("Caught web service exception while starting the KSB Configurer", e);
285 }
286
287
288 MessageFetcher messageFetcher = new MessageFetcher((Integer) null);
289 KSBServiceLocator.getThreadPool().execute(messageFetcher);
290 }
291
292 protected boolean isMessagePersistenceEnabled() {
293 return ConfigContext.getCurrentContextConfig().getBooleanProperty(KSBConstants.Config.MESSAGE_PERSISTENCE, true);
294 }
295
296 protected boolean isBamEnabled() {
297 return ConfigContext.getCurrentContextConfig().getBooleanProperty(Config.BAM_ENABLED, false);
298 }
299
300 protected void configureScheduler() {
301 if (this.getExceptionMessagingScheduler() != null) {
302 LOG.info("Configuring injected exception messaging Scheduler");
303 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.INJECTED_EXCEPTION_MESSAGE_SCHEDULER_KEY, this.getExceptionMessagingScheduler());
304 }
305 }
306
307 protected void configureDataSource() {
308 if (isMessagePersistenceEnabled()) {
309 if (getMessageDataSource() != null) {
310 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_MESSAGE_DATASOURCE, getMessageDataSource());
311 }
312 if (getNonTransactionalMessageDataSource() != null) {
313 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_MESSAGE_NON_TRANSACTIONAL_DATASOURCE, getNonTransactionalMessageDataSource());
314 }
315 }
316 if (getRunMode().equals(RunMode.LOCAL)) {
317 if (getRegistryDataSource() != null) {
318 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_REGISTRY_DATASOURCE, getRegistryDataSource());
319 }
320 }
321 if (isBamEnabled()) {
322 if (getBamDataSource() != null) {
323 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_BAM_DATASOURCE, getBamDataSource());
324 }
325 }
326 }
327
328 protected void configurePlatformTransactionManager() {
329 if (getPlatformTransactionManager() == null) {
330 return;
331 }
332 ConfigContext.getCurrentContextConfig().putObject(RiceConstants.SPRING_TRANSACTION_MANAGER, getPlatformTransactionManager());
333 }
334
335 protected void configureAlternateEndpoints() {
336 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_ALTERNATE_ENDPOINT_LOCATIONS, getAlternateEndpointLocations());
337 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_ALTERNATE_ENDPOINTS, getAlternateEndpoints());
338 }
339
340
341
342
343
344
345
346
347 protected void cleanUpConfiguration() {
348 ConfigContext.getCurrentContextConfig().removeObject(KSBConstants.Config.KSB_ALTERNATE_ENDPOINTS);
349 }
350
351 public List<ServiceDefinition> getServices() {
352 return this.services;
353 }
354
355 public void setServices(List<ServiceDefinition> javaServices) {
356 this.services = javaServices;
357 }
358
359 public DataSource getMessageDataSource() {
360 return this.messageDataSource;
361 }
362
363 public void setMessageDataSource(DataSource messageDataSource) {
364 this.messageDataSource = messageDataSource;
365 }
366
367 public DataSource getNonTransactionalMessageDataSource() {
368 return this.nonTransactionalMessageDataSource;
369 }
370
371 public void setNonTransactionalMessageDataSource(DataSource nonTransactionalMessageDataSource) {
372 this.nonTransactionalMessageDataSource = nonTransactionalMessageDataSource;
373 }
374
375 public DataSource getRegistryDataSource() {
376 return this.registryDataSource;
377 }
378
379 public void setRegistryDataSource(DataSource registryDataSource) {
380 this.registryDataSource = registryDataSource;
381 }
382
383 public DataSource getBamDataSource() {
384 return this.bamDataSource;
385 }
386
387 public void setBamDataSource(DataSource bamDataSource) {
388 this.bamDataSource = bamDataSource;
389 }
390
391 public Scheduler getExceptionMessagingScheduler() {
392 return this.exceptionMessagingScheduler;
393 }
394
395 public void setExceptionMessagingScheduler(Scheduler exceptionMessagingScheduler) {
396 this.exceptionMessagingScheduler = exceptionMessagingScheduler;
397 }
398
399 public PlatformTransactionManager getPlatformTransactionManager() {
400 return platformTransactionManager;
401 }
402
403 public void setPlatformTransactionManager(PlatformTransactionManager springTransactionManager) {
404 this.platformTransactionManager = springTransactionManager;
405 }
406
407 public List<AlternateEndpointLocation> getAlternateEndpointLocations() {
408 return this.alternateEndpointLocations;
409 }
410
411 public void setAlternateEndpointLocations(List<AlternateEndpointLocation> alternateEndpointLocations) {
412 this.alternateEndpointLocations = alternateEndpointLocations;
413 }
414
415 public List<AlternateEndpoint> getAlternateEndpoints() {
416 return this.alternateEndpoints;
417 }
418
419 public void setAlternateEndpoints(List<AlternateEndpoint> alternateEndpoints) {
420 this.alternateEndpoints = alternateEndpoints;
421 }
422
423 private final class ServicePublisher extends BaseLifecycle {
424
425 private final List<ServiceDefinition> serviceDefinitions;
426
427 ServicePublisher(List<ServiceDefinition> serviceDefinitions) {
428 this.serviceDefinitions = serviceDefinitions;
429 }
430
431 @Override
432 public void start() throws Exception {
433 if (serviceDefinitions != null && !serviceDefinitions.isEmpty()) {
434 LOG.debug("Configuring " + serviceDefinitions.size() + " services for application id " + CoreConfigHelper.getApplicationId() + " using config for classloader " + ClassLoaderUtils.getDefaultClassLoader());
435 KsbApiServiceLocator.getServiceBus().publishServices(serviceDefinitions, true);
436 super.start();
437 }
438 }
439
440 }
441
442 }