1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.ksb.impl.bus;
17
18 import java.util.ArrayList;
19 import java.util.Collections;
20 import java.util.HashMap;
21 import java.util.HashSet;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Random;
26 import java.util.Set;
27 import java.util.concurrent.ScheduledFuture;
28 import java.util.concurrent.TimeUnit;
29
30 import javax.xml.namespace.QName;
31
32 import org.apache.commons.lang.StringUtils;
33 import org.apache.log4j.Logger;
34 import org.kuali.rice.core.api.config.property.Config;
35 import org.kuali.rice.core.api.config.property.ConfigContext;
36 import org.kuali.rice.core.api.lifecycle.BaseLifecycle;
37 import org.kuali.rice.ksb.api.bus.Endpoint;
38 import org.kuali.rice.ksb.api.bus.ServiceBus;
39 import org.kuali.rice.ksb.api.bus.ServiceConfiguration;
40 import org.kuali.rice.ksb.api.bus.ServiceDefinition;
41 import org.kuali.rice.ksb.api.registry.RemoveAndPublishResult;
42 import org.kuali.rice.ksb.api.registry.ServiceEndpoint;
43 import org.kuali.rice.ksb.api.registry.ServiceInfo;
44 import org.kuali.rice.ksb.api.registry.ServiceRegistry;
45 import org.kuali.rice.ksb.impl.bus.diff.CompleteServiceDiff;
46 import org.kuali.rice.ksb.impl.bus.diff.LocalServicesDiff;
47 import org.kuali.rice.ksb.impl.bus.diff.RemoteServicesDiff;
48 import org.kuali.rice.ksb.impl.bus.diff.ServiceRegistryDiffCalculator;
49 import org.kuali.rice.ksb.messaging.serviceexporters.ServiceExportManager;
50 import org.kuali.rice.ksb.messaging.threadpool.KSBScheduledPool;
51 import org.springframework.beans.factory.DisposableBean;
52 import org.springframework.beans.factory.InitializingBean;
53
54 public class ServiceBusImpl extends BaseLifecycle implements ServiceBus, InitializingBean, DisposableBean {
55
56 private static final Logger LOG = Logger.getLogger(ServiceBusImpl.class);
57
58 private final Object serviceLock = new Object();
59 private final Object synchronizeLock = new Object();
60 private final Random randomNumber = new Random();
61
62
63 private String instanceId;
64 private ServiceRegistry serviceRegistry;
65 private ServiceRegistryDiffCalculator diffCalculator;
66 private ServiceExportManager serviceExportManager;
67 private KSBScheduledPool scheduledPool;
68
69 private ScheduledFuture<?> registrySyncFuture;
70
71
72
73
74 private final Map<QName, LocalService> localServices;
75
76
77
78
79
80 private final Map<QName, Set<RemoteService>> clientRegistryCache;
81
/**
 * Creates a service bus whose local-service map and remote-service cache
 * both start out empty. Collaborators are supplied afterwards through the
 * setter methods of this class.
 */
public ServiceBusImpl() {
    clientRegistryCache = new HashMap<QName, Set<RemoteService>>();
    localServices = new HashMap<QName, LocalService>();
}
86
87 @Override
88 public void afterPropertiesSet() throws Exception {
89 if (StringUtils.isBlank(instanceId)) {
90 throw new IllegalStateException("a valid instanceId was not injected");
91 }
92 if (serviceRegistry == null) {
93 throw new IllegalStateException("serviceRegistry was not injected");
94 }
95 if (diffCalculator == null) {
96 throw new IllegalStateException("diffCalculator was not injected");
97 }
98 if (scheduledPool == null) {
99 throw new IllegalStateException("scheduledPool was not injected");
100 }
101 }
102
103 @Override
104 public void start() throws Exception {
105 startSynchronizationThread();
106 super.start();
107 }
108
109 protected boolean isDevMode() {
110 return ConfigContext.getCurrentContextConfig().getDevMode();
111 }
112
113 protected void startSynchronizationThread() {
114 synchronized (synchronizeLock) {
115 LOG.info("Starting Service Bus synchronization thread...");
116 if (!isDevMode()) {
117 int refreshRate = ConfigContext.getCurrentContextConfig().getRefreshRate();
118 Runnable runnable = new Runnable() {
119 public void run() {
120 try {
121 synchronize();
122 } catch (Throwable t) {
123 LOG.error("Failed to execute background service bus synchronization.", t);
124 }
125 }
126 };
127 this.registrySyncFuture = scheduledPool.scheduleWithFixedDelay(runnable, 30, refreshRate, TimeUnit.SECONDS);
128 }
129 LOG.info("...Service Bus synchronization thread successfully started.");
130 }
131 }
132
133 @Override
134 public void destroy() throws Exception {
135 LOG.info("Stopping the Service Bus...");
136 stopSynchronizationThread();
137 serviceRegistry.takeInstanceOffline(getInstanceId());
138 LOG.info("...Service Bus successfully stopped.");
139 }
140
141 protected void stopSynchronizationThread() {
142 synchronized (synchronizeLock) {
143
144 if (this.registrySyncFuture != null) {
145 if (!this.registrySyncFuture.cancel(false)) {
146 LOG.warn("Failed to cancel registry sychronization.");
147 }
148 this.registrySyncFuture = null;
149 }
150 }
151 }
152
153 @Override
154 public String getInstanceId() {
155 return this.instanceId;
156 }
157
158 public void setInstanceId(String instanceId) {
159 this.instanceId = instanceId;
160 }
161
162 @Override
163 public List<Endpoint> getEndpoints(QName serviceName) {
164 return getEndpoints(serviceName, null);
165 }
166
167 @Override
168 public List<Endpoint> getEndpoints(QName serviceName, String applicationId) {
169 if (serviceName == null) {
170 throw new IllegalArgumentException("serviceName cannot be null");
171 }
172 List<Endpoint> endpoints = new ArrayList<Endpoint>();
173 synchronized (serviceLock) {
174 endpoints.addAll(getRemoteEndpoints(serviceName));
175 Endpoint localEndpoint = getLocalEndpoint(serviceName);
176 if (localEndpoint != null) {
177 for (Iterator<Endpoint> iterator = endpoints.iterator(); iterator.hasNext();) {
178 Endpoint endpoint = (Endpoint) iterator.next();
179 if (localEndpoint.getServiceConfiguration().equals(endpoint.getServiceConfiguration())) {
180 iterator.remove();
181 break;
182 }
183 }
184 if(StringUtils.isBlank(applicationId) || StringUtils.equals(localEndpoint.getServiceConfiguration().getApplicationId(), applicationId)) {
185
186 endpoints.add(0, localEndpoint);
187 }
188 }
189 if(StringUtils.isNotBlank(applicationId)) {
190 for (Iterator<Endpoint> iterator = endpoints.iterator(); iterator.hasNext();) {
191 Endpoint endpoint = (Endpoint) iterator.next();
192 if(!StringUtils.equals(endpoint.getServiceConfiguration().getApplicationId(), applicationId)) {
193 iterator.remove();
194 }
195 }
196 }
197 }
198 return Collections.unmodifiableList(endpoints);
199 }
200
201 @Override
202 public List<Endpoint> getRemoteEndpoints(QName serviceName) {
203 if (serviceName == null) {
204 throw new IllegalArgumentException("serviceName cannot be null");
205 }
206 List<Endpoint> endpoints = new ArrayList<Endpoint>();
207 synchronized (serviceLock) {
208 Set<RemoteService> remoteServices = clientRegistryCache.get(serviceName);
209 if (remoteServices != null) {
210 for (RemoteService remoteService : remoteServices) {
211 endpoints.add(remoteService.getEndpoint());
212 }
213 }
214 }
215 return Collections.unmodifiableList(endpoints);
216 }
217
218 @Override
219 public Endpoint getLocalEndpoint(QName serviceName) {
220 if (serviceName == null) {
221 throw new IllegalArgumentException("serviceName cannot be null");
222 }
223 synchronized (serviceLock) {
224 LocalService localService = localServices.get(serviceName);
225 if (localService != null) {
226 return localService.getEndpoint();
227 }
228 return null;
229 }
230 }
231
232 @Override
233 public Map<QName, Endpoint> getLocalEndpoints() {
234 Map<QName, Endpoint> localEndpoints = new HashMap<QName, Endpoint>();
235 synchronized (serviceLock) {
236 for (QName localServiceName : localServices.keySet()) {
237 LocalService localService = localServices.get(localServiceName);
238 localEndpoints.put(localServiceName, localService.getEndpoint());
239 }
240 }
241 return Collections.unmodifiableMap(localEndpoints);
242 }
243
244 @Override
245 public List<Endpoint> getAllEndpoints() {
246 List<Endpoint> allEndpoints = new ArrayList<Endpoint>();
247 synchronized (serviceLock) {
248 for (QName serviceName : this.localServices.keySet()) {
249 allEndpoints.add(this.localServices.get(serviceName).getEndpoint());
250 }
251 for (QName serviceName : this.clientRegistryCache.keySet()) {
252 Set<RemoteService> remoteServices = clientRegistryCache.get(serviceName);
253 for (RemoteService remoteService : remoteServices) {
254 allEndpoints.add(remoteService.getEndpoint());
255 }
256 }
257 }
258 return Collections.unmodifiableList(allEndpoints);
259 }
260
261 @Override
262 public Endpoint getEndpoint(QName serviceName) {
263 return getEndpoint(serviceName, null);
264 }
265
266 @Override
267 public Endpoint getEndpoint(QName serviceName, String applicationId) {
268 if (serviceName == null) {
269 throw new IllegalArgumentException("serviceName cannot be null");
270 }
271 Endpoint availableEndpoint = null;
272 synchronized (serviceLock) {
273
274 availableEndpoint = getLocalEndpoint(serviceName);
275 if (availableEndpoint == null || (!StringUtils.isBlank(applicationId) && !availableEndpoint.getServiceConfiguration().getApplicationId().equals(applicationId))) {
276
277 Set<RemoteService> remoteServices = clientRegistryCache.get(serviceName);
278 remoteServices = filterByApplicationId(applicationId, remoteServices);
279 if (remoteServices != null && !remoteServices.isEmpty()) {
280
281 RemoteService[] remoteServiceArray = remoteServices.toArray(new RemoteService[0]);
282 RemoteService availableRemoteService = remoteServiceArray[this.randomNumber.nextInt(remoteServiceArray.length)];
283 availableEndpoint = availableRemoteService.getEndpoint();
284 }
285 }
286 }
287 return availableEndpoint;
288 }
289
290 protected Set<RemoteService> filterByApplicationId(String applicationId, Set<RemoteService> remoteServices) {
291 if (StringUtils.isBlank(applicationId) || remoteServices == null || remoteServices.isEmpty()) {
292 return remoteServices;
293 }
294 Set<RemoteService> filtered = new HashSet<RemoteService>();
295 for (RemoteService remoteService : remoteServices) {
296 if (remoteService.getServiceInfo().getApplicationId().equals(applicationId)) {
297 filtered.add(remoteService);
298 }
299 }
300 return filtered;
301 }
302
303 @Override
304 public Endpoint getConfiguredEndpoint(ServiceConfiguration serviceConfiguration) {
305 if (serviceConfiguration == null) {
306 throw new IllegalArgumentException("serviceConfiguration cannot be null");
307 }
308 synchronized (serviceLock) {
309 Endpoint localEndpoint = getLocalEndpoint(serviceConfiguration.getServiceName());
310 if (localEndpoint != null && localEndpoint.getServiceConfiguration().equals(serviceConfiguration)) {
311 return localEndpoint;
312 }
313 List<Endpoint> remoteEndpoints = getRemoteEndpoints(serviceConfiguration.getServiceName());
314 for (Endpoint remoteEndpoint : remoteEndpoints) {
315 if (remoteEndpoint.getServiceConfiguration().equals(serviceConfiguration)) {
316 return remoteEndpoint;
317 }
318 }
319 }
320 return null;
321 }
322
323 @Override
324 public Object getService(QName serviceName) {
325 return getService(serviceName, null);
326 }
327
328 @Override
329 public Object getService(QName serviceName, String applicationId) {
330 Endpoint availableEndpoint = getEndpoint(serviceName, applicationId);
331 if (availableEndpoint == null) {
332 return null;
333 }
334 return availableEndpoint.getService();
335 }
336
337 @Override
338 public ServiceConfiguration publishService(ServiceDefinition serviceDefinition, boolean synchronize) {
339 if (serviceDefinition == null) {
340 throw new IllegalArgumentException("serviceDefinition cannot be null");
341 }
342 LocalService localService = new LocalService(getInstanceId(), serviceDefinition);
343 synchronized (serviceLock) {
344 serviceExportManager.exportService(serviceDefinition);
345 localServices.put(serviceDefinition.getServiceName(), localService);
346 }
347 if (synchronize) {
348 synchronize();
349 }
350 return localService.getEndpoint().getServiceConfiguration();
351 }
352
353 @Override
354 public List<ServiceConfiguration> publishServices(List<ServiceDefinition> serviceDefinitions, boolean synchronize) {
355 if (serviceDefinitions == null) {
356 throw new IllegalArgumentException("serviceDefinitions list cannot be null");
357 }
358 List<ServiceConfiguration> serviceConfigurations = new ArrayList<ServiceConfiguration>();
359 synchronized (serviceLock) {
360 for (ServiceDefinition serviceDefinition : serviceDefinitions) {
361 ServiceConfiguration serviceConfiguration = publishService(serviceDefinition, false);
362 serviceConfigurations.add(serviceConfiguration);
363 }
364 }
365 if (synchronize) {
366 synchronize();
367 }
368 return Collections.unmodifiableList(serviceConfigurations);
369 }
370
371 @Override
372 public boolean removeService(QName serviceName, boolean synchronize) {
373 if (serviceName == null) {
374 throw new IllegalArgumentException("serviceName cannot be null");
375 }
376 boolean serviceRemoved = false;
377 synchronized (serviceLock) {
378 LocalService localService = localServices.remove(serviceName);
379 serviceRemoved = localService != null;
380 serviceExportManager.removeService(serviceName);
381 }
382 if (serviceRemoved && synchronize) {
383 synchronize();
384 }
385 return serviceRemoved;
386 }
387
388 @Override
389 public List<Boolean> removeServices(List<QName> serviceNames, boolean synchronize) {
390 if (serviceNames == null) {
391 throw new IllegalArgumentException("serviceNames cannot be null");
392 }
393 boolean serviceRemoved = false;
394 List<Boolean> servicesRemoved = new ArrayList<Boolean>();
395 synchronized (serviceLock) {
396 for (QName serviceName : serviceNames) {
397 serviceExportManager.removeService(serviceName);
398 LocalService localService = localServices.remove(serviceName);
399 if (localService != null) {
400 servicesRemoved.add(Boolean.TRUE);
401 serviceRemoved = true;
402 } else {
403 servicesRemoved.add(Boolean.FALSE);
404 }
405 }
406 }
407 if (serviceRemoved && synchronize) {
408 synchronize();
409 }
410 return servicesRemoved;
411 }
412
413 protected void synchronizeAndProcess(SyncProcessor processor) {
414 if (!isDevMode()) {
415 synchronized (synchronizeLock) {
416 List<LocalService> localServicesList;
417 List<RemoteService> clientRegistryCacheList;
418 synchronized (serviceLock) {
419
420 localServicesList = new ArrayList<LocalService>(this.localServices.values());
421 clientRegistryCacheList = new ArrayList<RemoteService>();
422 for (Set<RemoteService> remoteServices : this.clientRegistryCache.values()) {
423 clientRegistryCacheList.addAll(remoteServices);
424 }
425 }
426 CompleteServiceDiff serviceDiff = diffCalculator.diffServices(getInstanceId(), localServicesList, clientRegistryCacheList);
427 logCompleteServiceDiff(serviceDiff);
428 processor.sync(serviceDiff);
429 }
430 }
431 }
432
433 @Override
434 public void synchronize() {
435 synchronizeAndProcess(new SyncProcessor() {
436 @Override
437 public void sync(CompleteServiceDiff diff) {
438 RemoteServicesDiff remoteServicesDiff = diff.getRemoteServicesDiff();
439 processRemoteServiceDiff(remoteServicesDiff);
440 LocalServicesDiff localServicesDiff = diff.getLocalServicesDiff();
441 processLocalServiceDiff(localServicesDiff);
442 }
443 });
444 }
445
446 @Override
447 public void synchronizeRemoteServices() {
448 synchronizeAndProcess(new SyncProcessor() {
449 @Override
450 public void sync(CompleteServiceDiff diff) {
451 RemoteServicesDiff remoteServicesDiff = diff.getRemoteServicesDiff();
452 processRemoteServiceDiff(remoteServicesDiff);
453 }
454 });
455 }
456
457 @Override
458 public void synchronizeLocalServices() {
459 synchronizeAndProcess(new SyncProcessor() {
460 @Override
461 public void sync(CompleteServiceDiff diff) {
462 LocalServicesDiff localServicesDiff = diff.getLocalServicesDiff();
463 processLocalServiceDiff(localServicesDiff);
464 }
465 });
466 }
467
468 protected void logCompleteServiceDiff(CompleteServiceDiff serviceDiff) {
469 RemoteServicesDiff remoteServicesDiff = serviceDiff.getRemoteServicesDiff();
470 int newServices = remoteServicesDiff.getNewServices().size();
471 int removedServices = remoteServicesDiff.getRemovedServices().size();
472
473 LocalServicesDiff localServicesDiff = serviceDiff.getLocalServicesDiff();
474 int servicesToPublish = localServicesDiff.getLocalServicesToPublish().size();
475 int servicesToUpdate = localServicesDiff.getLocalServicesToUpdate().size();
476 int servicesToRemove = localServicesDiff.getServicesToRemoveFromRegistry().size();
477
478 if (newServices + removedServices + servicesToPublish + servicesToUpdate + servicesToRemove > 0) {
479 LOG.info("Found service changes during synchronization: remoteNewServices=" + newServices +
480 ", remoteRemovedServices=" + removedServices +
481 ", localServicesToPublish=" + servicesToPublish +
482 ", localServicesToUpdate=" + servicesToUpdate +
483 ", localServicesToRemove=" + servicesToRemove);
484 }
485 }
486
487 protected void processRemoteServiceDiff(RemoteServicesDiff remoteServicesDiff) {
488
489
490 synchronized (serviceLock) {
491
492 List<RemoteService> removedServices = remoteServicesDiff.getRemovedServices();
493 for (RemoteService removedRemoteService : removedServices) {
494 Set<RemoteService> remoteServiceSet = this.clientRegistryCache.get(removedRemoteService.getServiceName());
495 if (remoteServiceSet != null) {
496 boolean wasRemoved = remoteServiceSet.remove(removedRemoteService);
497 if (!wasRemoved) {
498 LOG.warn("Failed to remove remoteService during synchronization: " + removedRemoteService);
499 }
500 }
501 }
502 List<ServiceInfo> newServices = remoteServicesDiff.getNewServices();
503 for (ServiceInfo newService : newServices) {
504 Set<RemoteService> remoteServiceSet = clientRegistryCache.get(newService.getServiceName());
505 if (remoteServiceSet == null) {
506 remoteServiceSet = new HashSet<RemoteService>();
507 clientRegistryCache.put(newService.getServiceName(), remoteServiceSet);
508 }
509 remoteServiceSet.add(new RemoteService(newService, this.serviceRegistry));
510 }
511 }
512 }
513
514 protected void processLocalServiceDiff(LocalServicesDiff localServicesDiff) {
515 List<String> removeServiceEndpointIds = new ArrayList<String>();
516 List<ServiceEndpoint> publishServiceEndpoints = new ArrayList<ServiceEndpoint>();
517 for (ServiceInfo serviceToRemove : localServicesDiff.getServicesToRemoveFromRegistry()) {
518 removeServiceEndpointIds.add(serviceToRemove.getServiceId());
519 }
520 for (LocalService localService : localServicesDiff.getLocalServicesToPublish()) {
521 publishServiceEndpoints.add(localService.getServiceEndpoint());
522 }
523 for (LocalService localService : localServicesDiff.getLocalServicesToUpdate().keySet()) {
524 ServiceInfo registryServiceInfo = localServicesDiff.getLocalServicesToUpdate().get(localService);
525 publishServiceEndpoints.add(rebuildServiceEndpointForUpdate(localService.getServiceEndpoint(), registryServiceInfo));
526 }
527 boolean batchMode = ConfigContext.getCurrentContextConfig().getBooleanProperty(Config.BATCH_MODE, false);
528 if (!batchMode && (!removeServiceEndpointIds.isEmpty() || !publishServiceEndpoints.isEmpty())) {
529 RemoveAndPublishResult result = this.serviceRegistry.removeAndPublish(removeServiceEndpointIds, publishServiceEndpoints);
530
531 if (!result.getServicesPublished().isEmpty()) {
532 synchronized (serviceLock) {
533 for (ServiceEndpoint publishedService : result.getServicesPublished()) {
534 rebuildLocalServiceEndpointAfterPublishing(publishedService);
535 }
536 }
537 }
538 }
539 }
540
541 protected ServiceEndpoint rebuildServiceEndpointForUpdate(ServiceEndpoint originalEndpoint, ServiceInfo registryServiceInfo) {
542 ServiceEndpoint.Builder builder = ServiceEndpoint.Builder.create(originalEndpoint);
543 builder.getInfo().setServiceId(registryServiceInfo.getServiceId());
544 builder.getInfo().setServiceDescriptorId(registryServiceInfo.getServiceDescriptorId());
545 builder.getInfo().setVersionNumber(registryServiceInfo.getVersionNumber());
546 builder.getDescriptor().setId(registryServiceInfo.getServiceDescriptorId());
547 return builder.build();
548 }
549
550 protected void rebuildLocalServiceEndpointAfterPublishing(ServiceEndpoint publishedService) {
551
552 QName serviceName = publishedService.getInfo().getServiceName();
553 if (localServices.containsKey(serviceName)) {
554 LocalService newLocalService = new LocalService(localServices.get(serviceName), publishedService);
555 localServices.put(serviceName, newLocalService);
556 }
557 }
558
559 public void setServiceRegistry(ServiceRegistry serviceRegistry) {
560 this.serviceRegistry = serviceRegistry;
561 }
562
563 public void setDiffCalculator(ServiceRegistryDiffCalculator diffCalculator) {
564 this.diffCalculator = diffCalculator;
565 }
566
567 public void setServiceExportManager(ServiceExportManager serviceExportManager) {
568 this.serviceExportManager = serviceExportManager;
569 }
570
571 public void setScheduledPool(KSBScheduledPool scheduledPool) {
572 this.scheduledPool = scheduledPool;
573 }
574
575 private static interface SyncProcessor {
576 void sync(CompleteServiceDiff diff);
577 }
578
579 }