1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.ksb.messaging.config;
17
18 import org.apache.commons.httpclient.contrib.ssl.EasySSLProtocolSocketFactory;
19 import org.apache.commons.httpclient.protocol.Protocol;
20 import org.apache.commons.httpclient.protocol.ProtocolSocketFactory;
21 import org.kuali.rice.core.api.config.CoreConfigHelper;
22 import org.kuali.rice.core.api.config.module.RunMode;
23 import org.kuali.rice.core.api.config.property.Config;
24 import org.kuali.rice.core.api.config.property.ConfigContext;
25 import org.kuali.rice.core.api.exception.RiceRuntimeException;
26 import org.kuali.rice.core.api.lifecycle.BaseLifecycle;
27 import org.kuali.rice.core.api.lifecycle.Lifecycle;
28 import org.kuali.rice.core.api.resourceloader.ResourceLoader;
29 import org.kuali.rice.core.api.util.ClassLoaderUtils;
30 import org.kuali.rice.core.api.util.RiceConstants;
31 import org.kuali.rice.core.framework.config.module.ModuleConfigurer;
32 import org.kuali.rice.core.framework.config.module.WebModuleConfiguration;
33 import org.kuali.rice.core.framework.lifecycle.ServiceDelegatingLifecycle;
34 import org.kuali.rice.core.framework.persistence.jpa.OrmUtils;
35 import org.kuali.rice.ksb.api.KsbApiConstants;
36 import org.kuali.rice.ksb.api.KsbApiServiceLocator;
37 import org.kuali.rice.ksb.api.bus.ServiceDefinition;
38 import org.kuali.rice.ksb.messaging.AlternateEndpoint;
39 import org.kuali.rice.ksb.messaging.AlternateEndpointLocation;
40 import org.kuali.rice.ksb.messaging.MessageFetcher;
41 import org.kuali.rice.ksb.messaging.resourceloader.KSBResourceLoaderFactory;
42 import org.kuali.rice.ksb.messaging.serviceconnectors.HttpInvokerConnector;
43 import org.kuali.rice.ksb.service.KSBServiceLocator;
44 import org.kuali.rice.ksb.util.KSBConstants;
45 import org.quartz.Scheduler;
46 import org.springframework.transaction.PlatformTransactionManager;
47
48 import javax.sql.DataSource;
49 import java.util.ArrayList;
50 import java.util.Arrays;
51 import java.util.Collection;
52 import java.util.Collections;
53 import java.util.LinkedList;
54 import java.util.List;
55
56
57
58
59
60
61
62
63
64
65 public class KSBConfigurer extends ModuleConfigurer {
66
67 private static final String SERVICE_BUS_CLIENT_SPRING = "classpath:org/kuali/rice/ksb/config/KsbServiceBusClientSpringBeans.xml";
68 private static final String MESSAGE_CLIENT_SPRING = "classpath:org/kuali/rice/ksb/config/KsbMessageClientSpringBeans.xml";
69 private static final String OJB_MESSAGE_CLIENT_SPRING = "classpath:org/kuali/rice/ksb/config/KsbOjbMessageClientSpringBeans.xml";
70 private static final String BAM_SPRING = "classpath:org/kuali/rice/ksb/config/KsbBamSpringBeans.xml";
71 private static final String OJB_BAM_SPRING = "classpath:org/kuali/rice/ksb/config/KsbOjbBamSpringBeans.xml";
72 private static final String REGISTRY_SERVER_SPRING = "classpath:org/kuali/rice/ksb/config/KsbRegistryServerSpringBeans.xml";
73 private static final String OJB_REGISTRY_SPRING = "classpath:org/kuali/rice/ksb/config/KsbOjbRegistrySpringBeans.xml";
74 private static final String WEB_SPRING = "classpath:org/kuali/rice/ksb/config/KsbWebSpringBeans.xml";
75
76 private List<ServiceDefinition> services = new ArrayList<ServiceDefinition>();
77
78 private List<AlternateEndpointLocation> alternateEndpointLocations = new ArrayList<AlternateEndpointLocation>();
79
80 private List<AlternateEndpoint> alternateEndpoints = new ArrayList<AlternateEndpoint>();
81
82 private DataSource registryDataSource;
83
84 private DataSource messageDataSource;
85
86 private DataSource nonTransactionalMessageDataSource;
87
88 private DataSource bamDataSource;
89
90 private Scheduler exceptionMessagingScheduler;
91
92 private PlatformTransactionManager platformTransactionManager;
93
94 private List<Lifecycle> internalLifecycles;
95
96 public KSBConfigurer() {
97 super(KsbApiConstants.KSB_MODULE_NAME);
98 setValidRunModes(Arrays.asList(RunMode.REMOTE, RunMode.LOCAL));
99 this.internalLifecycles = new ArrayList<Lifecycle>();
100 }
101
102 @Override
103 public void addAdditonalToConfig() {
104 configureDataSource();
105 configureScheduler();
106 configurePlatformTransactionManager();
107 configureAlternateEndpoints();
108 }
109
110 @Override
111 public List<String> getPrimarySpringFiles(){
112 final List<String> springFileLocations = new ArrayList<String>();
113
114 boolean isJpa = OrmUtils.isJpaEnabled("rice.ksb");
115 if (isJpa) {
116
117
118 throw new UnsupportedOperationException("JPA not currently supported for KSB");
119 }
120
121 springFileLocations.add(SERVICE_BUS_CLIENT_SPRING);
122
123 if (isMessagePersistenceEnabled()) {
124 springFileLocations.add(MESSAGE_CLIENT_SPRING);
125 springFileLocations.add(OJB_MESSAGE_CLIENT_SPRING);
126 }
127
128 if (isBamEnabled()) {
129 springFileLocations.add(BAM_SPRING);
130 springFileLocations.add(OJB_BAM_SPRING);
131 }
132
133 if (getRunMode().equals( RunMode.LOCAL )) {
134 springFileLocations.add(REGISTRY_SERVER_SPRING);
135 springFileLocations.add(OJB_REGISTRY_SPRING);
136 }
137
138 return springFileLocations;
139 }
140
141 @Override
142 public boolean hasWebInterface() {
143 return true;
144 }
145
146 @Override
147 protected WebModuleConfiguration loadWebModule() {
148 WebModuleConfiguration configuration = super.loadWebModule();
149 configuration.getWebSpringFiles().add(WEB_SPRING);
150 return configuration;
151 }
152
153 @Override
154 public Collection<ResourceLoader> getResourceLoadersToRegister() throws Exception{
155 ResourceLoader ksbRemoteResourceLoader = KSBResourceLoaderFactory.createRootKSBRemoteResourceLoader();
156 ksbRemoteResourceLoader.start();
157 return Collections.singletonList(ksbRemoteResourceLoader);
158 }
159
160 @Override
161 public List<Lifecycle> loadLifecycles() throws Exception {
162 List<Lifecycle> lifecycles = new LinkedList<Lifecycle>();
163
164
165 lifecycles.add(new BaseLifecycle() {
166
167 @Override
168 public void start() throws Exception {
169
170 if (Boolean.valueOf(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.KSB_ALLOW_SELF_SIGNED_SSL)).booleanValue()) {
171 Protocol.registerProtocol("https", new Protocol("https",
172 (ProtocolSocketFactory) new EasySSLProtocolSocketFactory(), 443));
173 }
174 super.start();
175 }
176 });
177 return lifecycles;
178 }
179
180
181
182 @Override
183 public void doAdditonalConfigurerValidations() {
184 for (final ServiceDefinition serviceDef : KSBConfigurer.this.services) {
185 serviceDef.validate();
186 }
187 }
188
189 @Override
190 public void doAdditionalContextStartedLogic() {
191 ServicePublisher servicePublisher = new ServicePublisher(getServices());
192 Lifecycle serviceBus = new ServiceDelegatingLifecycle(KsbApiServiceLocator.SERVICE_BUS);
193 Lifecycle threadPool = new ServiceDelegatingLifecycle(KSBConstants.ServiceNames.THREAD_POOL_SERVICE);
194 Lifecycle scheduledThreadPool = new ServiceDelegatingLifecycle(KSBConstants.ServiceNames.SCHEDULED_THREAD_POOL_SERVICE);
195
196 try {
197 servicePublisher.start();
198 internalLifecycles.add(servicePublisher);
199 serviceBus.start();
200 internalLifecycles.add(serviceBus);
201 threadPool.start();
202 internalLifecycles.add(threadPool);
203 scheduledThreadPool.start();
204 internalLifecycles.add(scheduledThreadPool);
205 } catch (Exception e) {
206 if (e instanceof RuntimeException) {
207 throw (RuntimeException)e;
208 }
209 throw new RiceRuntimeException("Failed to initialize KSB on context startup");
210 }
211
212 requeueMessages();
213 }
214
215 @Override
216 protected void doAdditionalModuleStartLogic() throws Exception {
217
218 LOG.info("Synchronizing remote services with service bus after KSB startup...");
219 long startTime = System.currentTimeMillis();
220 KsbApiServiceLocator.getServiceBus().synchronizeRemoteServices();
221 long endTime = System.currentTimeMillis();
222 LOG.info("...total time to synchronize remote services with service bus after KSB startup: " + (endTime - startTime));
223 }
224
225 @Override
226 protected void doAdditionalModuleStopLogic() throws Exception {
227 for (int index = internalLifecycles.size() - 1; index >= 0; index--) {
228 try {
229 internalLifecycles.get(index).stop();
230 } catch (Exception e) {
231 LOG.error("Failed to properly execute shutdown logic.", e);
232 }
233 }
234 }
235
236
237
238
239
240 private void requeueMessages() {
241 LOG.info("Refreshing Service Registry to export services to the bus.");
242 KsbApiServiceLocator.getServiceBus().synchronizeLocalServices();
243
244
245 MessageFetcher messageFetcher = new MessageFetcher((Integer) null);
246 KSBServiceLocator.getThreadPool().execute(messageFetcher);
247 }
248
249 protected boolean isMessagePersistenceEnabled() {
250 return ConfigContext.getCurrentContextConfig().getBooleanProperty(KSBConstants.Config.MESSAGE_PERSISTENCE, true);
251 }
252
253 protected boolean isBamEnabled() {
254 return ConfigContext.getCurrentContextConfig().getBooleanProperty(Config.BAM_ENABLED, false);
255 }
256
257 protected void configureScheduler() {
258 if (this.getExceptionMessagingScheduler() != null) {
259 LOG.info("Configuring injected exception messaging Scheduler");
260 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.INJECTED_EXCEPTION_MESSAGE_SCHEDULER_KEY, this.getExceptionMessagingScheduler());
261 }
262 }
263
264 protected void configureDataSource() {
265 if (isMessagePersistenceEnabled()) {
266 if (getMessageDataSource() != null) {
267 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_MESSAGE_DATASOURCE, getMessageDataSource());
268 }
269 if (getNonTransactionalMessageDataSource() != null) {
270 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_MESSAGE_NON_TRANSACTIONAL_DATASOURCE, getNonTransactionalMessageDataSource());
271 }
272 }
273 if (getRunMode().equals(RunMode.LOCAL)) {
274 if (getRegistryDataSource() != null) {
275 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_REGISTRY_DATASOURCE, getRegistryDataSource());
276 }
277 }
278 if (isBamEnabled()) {
279 if (getBamDataSource() != null) {
280 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_BAM_DATASOURCE, getBamDataSource());
281 }
282 }
283 }
284
285 protected void configurePlatformTransactionManager() {
286 if (getPlatformTransactionManager() == null) {
287 return;
288 }
289 ConfigContext.getCurrentContextConfig().putObject(RiceConstants.SPRING_TRANSACTION_MANAGER, getPlatformTransactionManager());
290 }
291
292 protected void configureAlternateEndpoints() {
293 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_ALTERNATE_ENDPOINT_LOCATIONS, getAlternateEndpointLocations());
294 ConfigContext.getCurrentContextConfig().putObject(KSBConstants.Config.KSB_ALTERNATE_ENDPOINTS, getAlternateEndpoints());
295 }
296
297 @Override
298 public void doAdditionalContextStoppedLogic() {
299 try {
300 HttpInvokerConnector.shutdownIdleConnectionTimeout();
301 } catch (Exception e) {
302 LOG.error("Failed to shutdown idle connection timeout evictor thread.", e);
303 }
304 cleanUpConfiguration();
305 }
306
307
308
309
310
311
312
313
314 protected void cleanUpConfiguration() {
315 ConfigContext.getCurrentContextConfig().removeObject(KSBConstants.Config.KSB_ALTERNATE_ENDPOINTS);
316 }
317
318 public List<ServiceDefinition> getServices() {
319 return this.services;
320 }
321
322 public void setServices(List<ServiceDefinition> javaServices) {
323 this.services = javaServices;
324 }
325
326 public DataSource getMessageDataSource() {
327 return this.messageDataSource;
328 }
329
330 public void setMessageDataSource(DataSource messageDataSource) {
331 this.messageDataSource = messageDataSource;
332 }
333
334 public DataSource getNonTransactionalMessageDataSource() {
335 return this.nonTransactionalMessageDataSource;
336 }
337
338 public void setNonTransactionalMessageDataSource(DataSource nonTransactionalMessageDataSource) {
339 this.nonTransactionalMessageDataSource = nonTransactionalMessageDataSource;
340 }
341
342 public DataSource getRegistryDataSource() {
343 return this.registryDataSource;
344 }
345
346 public void setRegistryDataSource(DataSource registryDataSource) {
347 this.registryDataSource = registryDataSource;
348 }
349
350 public DataSource getBamDataSource() {
351 return this.bamDataSource;
352 }
353
354 public void setBamDataSource(DataSource bamDataSource) {
355 this.bamDataSource = bamDataSource;
356 }
357
358 public Scheduler getExceptionMessagingScheduler() {
359 return this.exceptionMessagingScheduler;
360 }
361
362 public void setExceptionMessagingScheduler(Scheduler exceptionMessagingScheduler) {
363 this.exceptionMessagingScheduler = exceptionMessagingScheduler;
364 }
365
366 public PlatformTransactionManager getPlatformTransactionManager() {
367 return platformTransactionManager;
368 }
369
370 public void setPlatformTransactionManager(PlatformTransactionManager springTransactionManager) {
371 this.platformTransactionManager = springTransactionManager;
372 }
373
374 public List<AlternateEndpointLocation> getAlternateEndpointLocations() {
375 return this.alternateEndpointLocations;
376 }
377
378 public void setAlternateEndpointLocations(List<AlternateEndpointLocation> alternateEndpointLocations) {
379 this.alternateEndpointLocations = alternateEndpointLocations;
380 }
381
382 public List<AlternateEndpoint> getAlternateEndpoints() {
383 return this.alternateEndpoints;
384 }
385
386 public void setAlternateEndpoints(List<AlternateEndpoint> alternateEndpoints) {
387 this.alternateEndpoints = alternateEndpoints;
388 }
389
390 private final class ServicePublisher extends BaseLifecycle {
391
392 private final List<ServiceDefinition> serviceDefinitions;
393
394 ServicePublisher(List<ServiceDefinition> serviceDefinitions) {
395 this.serviceDefinitions = serviceDefinitions;
396 }
397
398 @Override
399 public void start() throws Exception {
400 if (serviceDefinitions != null && !serviceDefinitions.isEmpty()) {
401 LOG.debug("Configuring " + serviceDefinitions.size() + " services for application id " + CoreConfigHelper.getApplicationId() + " using config for classloader " + ClassLoaderUtils.getDefaultClassLoader());
402 KsbApiServiceLocator.getServiceBus().publishServices(serviceDefinitions, true);
403 super.start();
404 }
405 }
406
407 }
408
409 }