1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.ksb.impl.bus;
17
18 import java.util.ArrayList;
19 import java.util.Collections;
20 import java.util.HashMap;
21 import java.util.HashSet;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Random;
26 import java.util.Set;
27 import java.util.concurrent.ScheduledFuture;
28 import java.util.concurrent.TimeUnit;
29
30 import javax.xml.namespace.QName;
31
32 import org.apache.commons.lang.StringUtils;
33 import org.apache.log4j.Logger;
34 import org.kuali.rice.core.api.config.property.Config;
35 import org.kuali.rice.core.api.config.property.ConfigContext;
36 import org.kuali.rice.core.api.lifecycle.BaseLifecycle;
37 import org.kuali.rice.ksb.api.bus.Endpoint;
38 import org.kuali.rice.ksb.api.bus.ServiceBus;
39 import org.kuali.rice.ksb.api.bus.ServiceConfiguration;
40 import org.kuali.rice.ksb.api.bus.ServiceDefinition;
41 import org.kuali.rice.ksb.api.registry.RemoveAndPublishResult;
42 import org.kuali.rice.ksb.api.registry.ServiceEndpoint;
43 import org.kuali.rice.ksb.api.registry.ServiceInfo;
44 import org.kuali.rice.ksb.api.registry.ServiceRegistry;
45 import org.kuali.rice.ksb.impl.bus.diff.CompleteServiceDiff;
46 import org.kuali.rice.ksb.impl.bus.diff.LocalServicesDiff;
47 import org.kuali.rice.ksb.impl.bus.diff.RemoteServicesDiff;
48 import org.kuali.rice.ksb.impl.bus.diff.ServiceRegistryDiffCalculator;
49 import org.kuali.rice.ksb.messaging.serviceexporters.ServiceExportManager;
50 import org.kuali.rice.ksb.messaging.threadpool.KSBScheduledPool;
51 import org.springframework.beans.factory.DisposableBean;
52 import org.springframework.beans.factory.InitializingBean;
53
54 public class ServiceBusImpl extends BaseLifecycle implements ServiceBus, InitializingBean, DisposableBean {
55
/** Class-wide log4j logger. */
private static final Logger LOG = Logger.getLogger(ServiceBusImpl.class);

/** Monitor held whenever {@link #localServices} or {@link #clientRegistryCache} is read or modified. */
private final Object serviceLock = new Object();

/** Monitor that serializes synchronization passes and the scheduling/cancelling of the background sync task. */
private final Object synchronizeLock = new Object();

/** Source of randomness used to pick one remote service when several are available. */
private final Random randomNumber = new Random();

// Settings and collaborators supplied through the setters of this class.
private String instanceId;
private ServiceRegistry serviceRegistry;
private ServiceRegistryDiffCalculator diffCalculator;
private ServiceExportManager serviceExportManager;
private KSBScheduledPool scheduledPool;

/** Handle of the scheduled background synchronization task; {@code null} while no task is scheduled. */
private ScheduledFuture<?> registrySyncFuture;

/** Services published through this bus instance, keyed by service name. */
private final Map<QName, LocalService> localServices = new HashMap<QName, LocalService>();

/** Remote services learned during synchronization, grouped by service name. */
private final Map<QName, Set<RemoteService>> clientRegistryCache = new HashMap<QName, Set<RemoteService>>();

/**
 * Creates a bus with no local services and an empty remote-service cache.
 * Collaborators are expected to be injected through the setters afterwards.
 */
public ServiceBusImpl() {
    // Both maps are initialized at their declarations; nothing else to set up here.
}
86
87 @Override
88 public void afterPropertiesSet() throws Exception {
89 if (StringUtils.isBlank(instanceId)) {
90 throw new IllegalStateException("a valid instanceId was not injected");
91 }
92 if (serviceRegistry == null) {
93 throw new IllegalStateException("serviceRegistry was not injected");
94 }
95 if (diffCalculator == null) {
96 throw new IllegalStateException("diffCalculator was not injected");
97 }
98 if (scheduledPool == null) {
99 throw new IllegalStateException("scheduledPool was not injected");
100 }
101 }
102
103 @Override
104 public void start() throws Exception {
105 startSynchronizationThread();
106 super.start();
107 }
108
109 protected boolean isDevMode() {
110 return ConfigContext.getCurrentContextConfig().getDevMode();
111 }
112
113 protected void startSynchronizationThread() {
114 synchronized (synchronizeLock) {
115 LOG.info("Starting Service Bus synchronization thread...");
116 if (!isDevMode()) {
117 int refreshRate = ConfigContext.getCurrentContextConfig().getRefreshRate();
118 Runnable runnable = new Runnable() {
119 public void run() {
120 try {
121 synchronize();
122 } catch (Throwable t) {
123 LOG.error("Failed to execute background service bus synchronization.", t);
124 }
125 }
126 };
127 this.registrySyncFuture = scheduledPool.scheduleWithFixedDelay(runnable, 30, refreshRate, TimeUnit.SECONDS);
128 }
129 LOG.info("...Service Bus synchronization thread successfully started.");
130 }
131 }
132
133 @Override
134 public void destroy() throws Exception {
135 LOG.info("Stopping the Service Bus...");
136 stopSynchronizationThread();
137 serviceRegistry.takeInstanceOffline(getInstanceId());
138 LOG.info("...Service Bus successfully stopped.");
139 }
140
141 protected void stopSynchronizationThread() {
142 synchronized (synchronizeLock) {
143
144 if (this.registrySyncFuture != null) {
145 if (!this.registrySyncFuture.cancel(false)) {
146 LOG.warn("Failed to cancel registry sychronization.");
147 }
148 this.registrySyncFuture = null;
149 }
150 }
151 }
152
153 @Override
154 public String getInstanceId() {
155 return this.instanceId;
156 }
157
158 public void setInstanceId(String instanceId) {
159 this.instanceId = instanceId;
160 }
161
162 @Override
163 public List<Endpoint> getEndpoints(QName serviceName) {
164 if (serviceName == null) {
165 throw new IllegalArgumentException("serviceName cannot be null");
166 }
167 List<Endpoint> endpoints = new ArrayList<Endpoint>();
168 synchronized (serviceLock) {
169 endpoints.addAll(getRemoteEndpoints(serviceName));
170 Endpoint localEndpoint = getLocalEndpoint(serviceName);
171 if (localEndpoint != null) {
172 for (Iterator<Endpoint> iterator = endpoints.iterator(); iterator.hasNext();) {
173 Endpoint endpoint = (Endpoint) iterator.next();
174 if (localEndpoint.getServiceConfiguration().equals(endpoint.getServiceConfiguration())) {
175 iterator.remove();
176 break;
177 }
178 }
179
180 endpoints.add(0, localEndpoint);
181 }
182 }
183 return Collections.unmodifiableList(endpoints);
184 }
185
186 @Override
187 public List<Endpoint> getRemoteEndpoints(QName serviceName) {
188 if (serviceName == null) {
189 throw new IllegalArgumentException("serviceName cannot be null");
190 }
191 List<Endpoint> endpoints = new ArrayList<Endpoint>();
192 synchronized (serviceLock) {
193 Set<RemoteService> remoteServices = clientRegistryCache.get(serviceName);
194 if (remoteServices != null) {
195 for (RemoteService remoteService : remoteServices) {
196 endpoints.add(remoteService.getEndpoint());
197 }
198 }
199 }
200 return Collections.unmodifiableList(endpoints);
201 }
202
203 @Override
204 public Endpoint getLocalEndpoint(QName serviceName) {
205 if (serviceName == null) {
206 throw new IllegalArgumentException("serviceName cannot be null");
207 }
208 synchronized (serviceLock) {
209 LocalService localService = localServices.get(serviceName);
210 if (localService != null) {
211 return localService.getEndpoint();
212 }
213 return null;
214 }
215 }
216
217 @Override
218 public Map<QName, Endpoint> getLocalEndpoints() {
219 Map<QName, Endpoint> localEndpoints = new HashMap<QName, Endpoint>();
220 synchronized (serviceLock) {
221 for (QName localServiceName : localServices.keySet()) {
222 LocalService localService = localServices.get(localServiceName);
223 localEndpoints.put(localServiceName, localService.getEndpoint());
224 }
225 }
226 return Collections.unmodifiableMap(localEndpoints);
227 }
228
229 @Override
230 public List<Endpoint> getAllEndpoints() {
231 List<Endpoint> allEndpoints = new ArrayList<Endpoint>();
232 synchronized (serviceLock) {
233 for (QName serviceName : this.localServices.keySet()) {
234 allEndpoints.add(this.localServices.get(serviceName).getEndpoint());
235 }
236 for (QName serviceName : this.clientRegistryCache.keySet()) {
237 Set<RemoteService> remoteServices = clientRegistryCache.get(serviceName);
238 for (RemoteService remoteService : remoteServices) {
239 allEndpoints.add(remoteService.getEndpoint());
240 }
241 }
242 }
243 return Collections.unmodifiableList(allEndpoints);
244 }
245
246 @Override
247 public Endpoint getEndpoint(QName serviceName) {
248 return getEndpoint(serviceName, null);
249 }
250
251 @Override
252 public Endpoint getEndpoint(QName serviceName, String applicationId) {
253 if (serviceName == null) {
254 throw new IllegalArgumentException("serviceName cannot be null");
255 }
256 Endpoint availableEndpoint = null;
257 synchronized (serviceLock) {
258
259 availableEndpoint = getLocalEndpoint(serviceName);
260 if (availableEndpoint == null || (!StringUtils.isBlank(applicationId) && !availableEndpoint.getServiceConfiguration().getApplicationId().equals(applicationId))) {
261
262 Set<RemoteService> remoteServices = clientRegistryCache.get(serviceName);
263 remoteServices = filterByApplicationId(applicationId, remoteServices);
264 if (remoteServices != null && !remoteServices.isEmpty()) {
265
266 RemoteService[] remoteServiceArray = remoteServices.toArray(new RemoteService[0]);
267 RemoteService availableRemoteService = remoteServiceArray[this.randomNumber.nextInt(remoteServiceArray.length)];
268 availableEndpoint = availableRemoteService.getEndpoint();
269 }
270 }
271 }
272 return availableEndpoint;
273 }
274
275 protected Set<RemoteService> filterByApplicationId(String applicationId, Set<RemoteService> remoteServices) {
276 if (StringUtils.isBlank(applicationId) || remoteServices == null || remoteServices.isEmpty()) {
277 return remoteServices;
278 }
279 Set<RemoteService> filtered = new HashSet<RemoteService>();
280 for (RemoteService remoteService : remoteServices) {
281 if (remoteService.getServiceInfo().getApplicationId().equals(applicationId)) {
282 filtered.add(remoteService);
283 }
284 }
285 return filtered;
286 }
287
288 @Override
289 public Endpoint getConfiguredEndpoint(ServiceConfiguration serviceConfiguration) {
290 if (serviceConfiguration == null) {
291 throw new IllegalArgumentException("serviceConfiguration cannot be null");
292 }
293 synchronized (serviceLock) {
294 Endpoint localEndpoint = getLocalEndpoint(serviceConfiguration.getServiceName());
295 if (localEndpoint != null && localEndpoint.getServiceConfiguration().equals(serviceConfiguration)) {
296 return localEndpoint;
297 }
298 List<Endpoint> remoteEndpoints = getRemoteEndpoints(serviceConfiguration.getServiceName());
299 for (Endpoint remoteEndpoint : remoteEndpoints) {
300 if (remoteEndpoint.getServiceConfiguration().equals(serviceConfiguration)) {
301 return remoteEndpoint;
302 }
303 }
304 }
305 return null;
306 }
307
308 @Override
309 public Object getService(QName serviceName) {
310 return getService(serviceName, null);
311 }
312
313 @Override
314 public Object getService(QName serviceName, String applicationId) {
315 Endpoint availableEndpoint = getEndpoint(serviceName, applicationId);
316 if (availableEndpoint == null) {
317 return null;
318 }
319 return availableEndpoint.getService();
320 }
321
322 @Override
323 public ServiceConfiguration publishService(ServiceDefinition serviceDefinition, boolean synchronize) {
324 if (serviceDefinition == null) {
325 throw new IllegalArgumentException("serviceDefinition cannot be null");
326 }
327 LocalService localService = new LocalService(getInstanceId(), serviceDefinition);
328 synchronized (serviceLock) {
329 serviceExportManager.exportService(serviceDefinition);
330 localServices.put(serviceDefinition.getServiceName(), localService);
331 }
332 if (synchronize) {
333 synchronize();
334 }
335 return localService.getEndpoint().getServiceConfiguration();
336 }
337
338 @Override
339 public List<ServiceConfiguration> publishServices(List<ServiceDefinition> serviceDefinitions, boolean synchronize) {
340 if (serviceDefinitions == null) {
341 throw new IllegalArgumentException("serviceDefinitions list cannot be null");
342 }
343 List<ServiceConfiguration> serviceConfigurations = new ArrayList<ServiceConfiguration>();
344 synchronized (serviceLock) {
345 for (ServiceDefinition serviceDefinition : serviceDefinitions) {
346 ServiceConfiguration serviceConfiguration = publishService(serviceDefinition, false);
347 serviceConfigurations.add(serviceConfiguration);
348 }
349 }
350 if (synchronize) {
351 synchronize();
352 }
353 return Collections.unmodifiableList(serviceConfigurations);
354 }
355
356 @Override
357 public boolean removeService(QName serviceName, boolean synchronize) {
358 if (serviceName == null) {
359 throw new IllegalArgumentException("serviceName cannot be null");
360 }
361 boolean serviceRemoved = false;
362 synchronized (serviceLock) {
363 LocalService localService = localServices.remove(serviceName);
364 serviceRemoved = localService != null;
365 serviceExportManager.removeService(serviceName);
366 }
367 if (serviceRemoved && synchronize) {
368 synchronize();
369 }
370 return serviceRemoved;
371 }
372
373 @Override
374 public List<Boolean> removeServices(List<QName> serviceNames, boolean synchronize) {
375 if (serviceNames == null) {
376 throw new IllegalArgumentException("serviceNames cannot be null");
377 }
378 boolean serviceRemoved = false;
379 List<Boolean> servicesRemoved = new ArrayList<Boolean>();
380 synchronized (serviceLock) {
381 for (QName serviceName : serviceNames) {
382 serviceExportManager.removeService(serviceName);
383 LocalService localService = localServices.remove(serviceName);
384 if (localService != null) {
385 servicesRemoved.add(Boolean.TRUE);
386 serviceRemoved = true;
387 } else {
388 servicesRemoved.add(Boolean.FALSE);
389 }
390 }
391 }
392 if (serviceRemoved && synchronize) {
393 synchronize();
394 }
395 return servicesRemoved;
396 }
397
398 protected void synchronizeAndProcess(SyncProcessor processor) {
399 if (!isDevMode()) {
400 synchronized (synchronizeLock) {
401 List<LocalService> localServicesList;
402 List<RemoteService> clientRegistryCacheList;
403 synchronized (serviceLock) {
404
405 localServicesList = new ArrayList<LocalService>(this.localServices.values());
406 clientRegistryCacheList = new ArrayList<RemoteService>();
407 for (Set<RemoteService> remoteServices : this.clientRegistryCache.values()) {
408 clientRegistryCacheList.addAll(remoteServices);
409 }
410 }
411 CompleteServiceDiff serviceDiff = diffCalculator.diffServices(getInstanceId(), localServicesList, clientRegistryCacheList);
412 logCompleteServiceDiff(serviceDiff);
413 processor.sync(serviceDiff);
414 }
415 }
416 }
417
418 @Override
419 public void synchronize() {
420 synchronizeAndProcess(new SyncProcessor() {
421 @Override
422 public void sync(CompleteServiceDiff diff) {
423 RemoteServicesDiff remoteServicesDiff = diff.getRemoteServicesDiff();
424 processRemoteServiceDiff(remoteServicesDiff);
425 LocalServicesDiff localServicesDiff = diff.getLocalServicesDiff();
426 processLocalServiceDiff(localServicesDiff);
427 }
428 });
429 }
430
431 @Override
432 public void synchronizeRemoteServices() {
433 synchronizeAndProcess(new SyncProcessor() {
434 @Override
435 public void sync(CompleteServiceDiff diff) {
436 RemoteServicesDiff remoteServicesDiff = diff.getRemoteServicesDiff();
437 processRemoteServiceDiff(remoteServicesDiff);
438 }
439 });
440 }
441
442 @Override
443 public void synchronizeLocalServices() {
444 synchronizeAndProcess(new SyncProcessor() {
445 @Override
446 public void sync(CompleteServiceDiff diff) {
447 LocalServicesDiff localServicesDiff = diff.getLocalServicesDiff();
448 processLocalServiceDiff(localServicesDiff);
449 }
450 });
451 }
452
453 protected void logCompleteServiceDiff(CompleteServiceDiff serviceDiff) {
454 RemoteServicesDiff remoteServicesDiff = serviceDiff.getRemoteServicesDiff();
455 int newServices = remoteServicesDiff.getNewServices().size();
456 int removedServices = remoteServicesDiff.getRemovedServices().size();
457
458 LocalServicesDiff localServicesDiff = serviceDiff.getLocalServicesDiff();
459 int servicesToPublish = localServicesDiff.getLocalServicesToPublish().size();
460 int servicesToUpdate = localServicesDiff.getLocalServicesToUpdate().size();
461 int servicesToRemove = localServicesDiff.getServicesToRemoveFromRegistry().size();
462
463 if (newServices + removedServices + servicesToPublish + servicesToUpdate + servicesToRemove > 0) {
464 LOG.info("Found service changes during synchronization: remoteNewServices=" + newServices +
465 ", remoteRemovedServices=" + removedServices +
466 ", localServicesToPublish=" + servicesToPublish +
467 ", localServicesToUpdate=" + servicesToUpdate +
468 ", localServicesToRemove=" + servicesToRemove);
469 }
470 }
471
472 protected void processRemoteServiceDiff(RemoteServicesDiff remoteServicesDiff) {
473
474
475 synchronized (serviceLock) {
476
477 List<RemoteService> removedServices = remoteServicesDiff.getRemovedServices();
478 for (RemoteService removedRemoteService : removedServices) {
479 Set<RemoteService> remoteServiceSet = this.clientRegistryCache.get(removedRemoteService.getServiceName());
480 if (remoteServiceSet != null) {
481 boolean wasRemoved = remoteServiceSet.remove(removedRemoteService);
482 if (!wasRemoved) {
483 LOG.warn("Failed to remove remoteService during synchronization: " + removedRemoteService);
484 }
485 }
486 }
487 List<ServiceInfo> newServices = remoteServicesDiff.getNewServices();
488 for (ServiceInfo newService : newServices) {
489 Set<RemoteService> remoteServiceSet = clientRegistryCache.get(newService.getServiceName());
490 if (remoteServiceSet == null) {
491 remoteServiceSet = new HashSet<RemoteService>();
492 clientRegistryCache.put(newService.getServiceName(), remoteServiceSet);
493 }
494 remoteServiceSet.add(new RemoteService(newService, this.serviceRegistry));
495 }
496 }
497 }
498
499 protected void processLocalServiceDiff(LocalServicesDiff localServicesDiff) {
500 List<String> removeServiceEndpointIds = new ArrayList<String>();
501 List<ServiceEndpoint> publishServiceEndpoints = new ArrayList<ServiceEndpoint>();
502 for (ServiceInfo serviceToRemove : localServicesDiff.getServicesToRemoveFromRegistry()) {
503 removeServiceEndpointIds.add(serviceToRemove.getServiceId());
504 }
505 for (LocalService localService : localServicesDiff.getLocalServicesToPublish()) {
506 publishServiceEndpoints.add(localService.getServiceEndpoint());
507 }
508 for (LocalService localService : localServicesDiff.getLocalServicesToUpdate().keySet()) {
509 ServiceInfo registryServiceInfo = localServicesDiff.getLocalServicesToUpdate().get(localService);
510 publishServiceEndpoints.add(rebuildServiceEndpointForUpdate(localService.getServiceEndpoint(), registryServiceInfo));
511 }
512 boolean batchMode = ConfigContext.getCurrentContextConfig().getBooleanProperty(Config.BATCH_MODE, false);
513 if (!batchMode && (!removeServiceEndpointIds.isEmpty() || !publishServiceEndpoints.isEmpty())) {
514 RemoveAndPublishResult result = this.serviceRegistry.removeAndPublish(removeServiceEndpointIds, publishServiceEndpoints);
515
516 if (!result.getServicesPublished().isEmpty()) {
517 synchronized (serviceLock) {
518 for (ServiceEndpoint publishedService : result.getServicesPublished()) {
519 rebuildLocalServiceEndpointAfterPublishing(publishedService);
520 }
521 }
522 }
523 }
524 }
525
526 protected ServiceEndpoint rebuildServiceEndpointForUpdate(ServiceEndpoint originalEndpoint, ServiceInfo registryServiceInfo) {
527 ServiceEndpoint.Builder builder = ServiceEndpoint.Builder.create(originalEndpoint);
528 builder.getInfo().setServiceId(registryServiceInfo.getServiceId());
529 builder.getInfo().setServiceDescriptorId(registryServiceInfo.getServiceDescriptorId());
530 builder.getInfo().setVersionNumber(registryServiceInfo.getVersionNumber());
531 builder.getDescriptor().setId(registryServiceInfo.getServiceDescriptorId());
532 return builder.build();
533 }
534
535 protected void rebuildLocalServiceEndpointAfterPublishing(ServiceEndpoint publishedService) {
536
537 QName serviceName = publishedService.getInfo().getServiceName();
538 if (localServices.containsKey(serviceName)) {
539 LocalService newLocalService = new LocalService(localServices.get(serviceName), publishedService);
540 localServices.put(serviceName, newLocalService);
541 }
542 }
543
544 public void setServiceRegistry(ServiceRegistry serviceRegistry) {
545 this.serviceRegistry = serviceRegistry;
546 }
547
548 public void setDiffCalculator(ServiceRegistryDiffCalculator diffCalculator) {
549 this.diffCalculator = diffCalculator;
550 }
551
552 public void setServiceExportManager(ServiceExportManager serviceExportManager) {
553 this.serviceExportManager = serviceExportManager;
554 }
555
556 public void setScheduledPool(KSBScheduledPool scheduledPool) {
557 this.scheduledPool = scheduledPool;
558 }
559
560 private static interface SyncProcessor {
561 void sync(CompleteServiceDiff diff);
562 }
563
564 }