1 package org.kuali.rice.ksb.impl.bus;
2
3 import java.util.ArrayList;
4 import java.util.Collections;
5 import java.util.HashMap;
6 import java.util.HashSet;
7 import java.util.Iterator;
8 import java.util.List;
9 import java.util.Map;
10 import java.util.Random;
11 import java.util.Set;
12 import java.util.concurrent.ScheduledFuture;
13 import java.util.concurrent.TimeUnit;
14
15 import javax.xml.namespace.QName;
16
17 import org.apache.commons.lang.StringUtils;
18 import org.apache.log4j.Logger;
19 import org.kuali.rice.core.api.config.property.Config;
20 import org.kuali.rice.core.api.config.property.ConfigContext;
21 import org.kuali.rice.core.api.lifecycle.BaseLifecycle;
22 import org.kuali.rice.ksb.api.bus.Endpoint;
23 import org.kuali.rice.ksb.api.bus.ServiceBus;
24 import org.kuali.rice.ksb.api.bus.ServiceConfiguration;
25 import org.kuali.rice.ksb.api.bus.ServiceDefinition;
26 import org.kuali.rice.ksb.api.registry.RemoveAndPublishResult;
27 import org.kuali.rice.ksb.api.registry.ServiceEndpoint;
28 import org.kuali.rice.ksb.api.registry.ServiceInfo;
29 import org.kuali.rice.ksb.api.registry.ServiceRegistry;
30 import org.kuali.rice.ksb.impl.registry.diff.CompleteServiceDiff;
31 import org.kuali.rice.ksb.impl.registry.diff.LocalServicesDiff;
32 import org.kuali.rice.ksb.impl.registry.diff.RemoteServicesDiff;
33 import org.kuali.rice.ksb.impl.registry.diff.ServiceRegistryDiffCalculator;
34 import org.kuali.rice.ksb.messaging.serviceexporters.ServiceExportManager;
35 import org.kuali.rice.ksb.messaging.threadpool.KSBScheduledPool;
36 import org.springframework.beans.factory.DisposableBean;
37 import org.springframework.beans.factory.InitializingBean;
38
39 public class ServiceBusImpl extends BaseLifecycle implements ServiceBus, InitializingBean, DisposableBean {
40
41 private static final Logger LOG = Logger.getLogger(ServiceBusImpl.class);
42
43 private final Object serviceLock = new Object();
44 private final Object synchronizeLock = new Object();
45 private final Random randomNumber = new Random();
46
47
48 private String instanceId;
49 private ServiceRegistry serviceRegistry;
50 private ServiceRegistryDiffCalculator diffCalculator;
51 private ServiceExportManager serviceExportManager;
52 private KSBScheduledPool scheduledPool;
53
54 private ScheduledFuture<?> registrySyncFuture;
55
56
57
58
59 private final Map<QName, LocalService> localServices;
60
61
62
63
64
65 private final Map<QName, Set<RemoteService>> clientRegistryCache;
66
67 public ServiceBusImpl() {
68 this.localServices = new HashMap<QName, LocalService>();
69 this.clientRegistryCache = new HashMap<QName, Set<RemoteService>>();
70 }
71
72 @Override
73 public void afterPropertiesSet() throws Exception {
74 if (StringUtils.isBlank(instanceId)) {
75 throw new IllegalStateException("a valid instanceId was not injected");
76 }
77 if (serviceRegistry == null) {
78 throw new IllegalStateException("serviceRegistry was not injected");
79 }
80 if (diffCalculator == null) {
81 throw new IllegalStateException("diffCalculator was not injected");
82 }
83 if (scheduledPool == null) {
84 throw new IllegalStateException("scheduledPool was not injected");
85 }
86 }
87
88 @Override
89 public void start() throws Exception {
90 startSynchronizationThread();
91 super.start();
92 }
93
94 protected boolean isDevMode() {
95 return ConfigContext.getCurrentContextConfig().getDevMode();
96 }
97
98 protected void startSynchronizationThread() {
99 synchronized (synchronizeLock) {
100 LOG.info("Starting Service Bus synchronization thread...");
101 if (!isDevMode()) {
102 int refreshRate = ConfigContext.getCurrentContextConfig().getRefreshRate();
103 Runnable runnable = new Runnable() {
104 public void run() {
105 try {
106 synchronize();
107 } catch (Throwable t) {
108 LOG.error("Failed to execute background service bus synchronization.", t);
109 }
110 }
111 };
112 this.registrySyncFuture = scheduledPool.scheduleWithFixedDelay(runnable, 30, refreshRate, TimeUnit.SECONDS);
113 }
114 LOG.info("...Service Bus synchronization thread successfully started.");
115 }
116 }
117
118 @Override
119 public void destroy() throws Exception {
120 LOG.info("Stopping the Service Bus...");
121 stopSynchronizationThread();
122 serviceRegistry.takeInstanceOffline(getInstanceId());
123 LOG.info("...Service Bus successfully stopped.");
124 }
125
126 protected void stopSynchronizationThread() {
127 synchronized (synchronizeLock) {
128
129 if (this.registrySyncFuture != null) {
130 if (!this.registrySyncFuture.cancel(false)) {
131 LOG.warn("Failed to cancel registry sychronization.");
132 }
133 this.registrySyncFuture = null;
134 }
135 }
136 }
137
138 @Override
139 public String getInstanceId() {
140 return this.instanceId;
141 }
142
143 public void setInstanceId(String instanceId) {
144 this.instanceId = instanceId;
145 }
146
147 @Override
148 public List<Endpoint> getEndpoints(QName serviceName) {
149 if (serviceName == null) {
150 throw new IllegalArgumentException("serviceName cannot be null");
151 }
152 List<Endpoint> endpoints = new ArrayList<Endpoint>();
153 synchronized (serviceLock) {
154 endpoints.addAll(getRemoteEndpoints(serviceName));
155 Endpoint localEndpoint = getLocalEndpoint(serviceName);
156 if (localEndpoint != null) {
157 for (Iterator<Endpoint> iterator = endpoints.iterator(); iterator.hasNext();) {
158 Endpoint endpoint = (Endpoint) iterator.next();
159 if (localEndpoint.getServiceConfiguration().equals(endpoint.getServiceConfiguration())) {
160 iterator.remove();
161 break;
162 }
163 }
164
165 endpoints.add(0, localEndpoint);
166 }
167 }
168 return Collections.unmodifiableList(endpoints);
169 }
170
171 @Override
172 public List<Endpoint> getRemoteEndpoints(QName serviceName) {
173 if (serviceName == null) {
174 throw new IllegalArgumentException("serviceName cannot be null");
175 }
176 List<Endpoint> endpoints = new ArrayList<Endpoint>();
177 synchronized (serviceLock) {
178 Set<RemoteService> remoteServices = clientRegistryCache.get(serviceName);
179 if (remoteServices != null) {
180 for (RemoteService remoteService : remoteServices) {
181 endpoints.add(remoteService.getEndpoint());
182 }
183 }
184 }
185 return Collections.unmodifiableList(endpoints);
186 }
187
188 @Override
189 public Endpoint getLocalEndpoint(QName serviceName) {
190 if (serviceName == null) {
191 throw new IllegalArgumentException("serviceName cannot be null");
192 }
193 synchronized (serviceLock) {
194 LocalService localService = localServices.get(serviceName);
195 if (localService != null) {
196 return localService.getEndpoint();
197 }
198 return null;
199 }
200 }
201
202 @Override
203 public Map<QName, Endpoint> getLocalEndpoints() {
204 Map<QName, Endpoint> localEndpoints = new HashMap<QName, Endpoint>();
205 synchronized (serviceLock) {
206 for (QName localServiceName : localServices.keySet()) {
207 LocalService localService = localServices.get(localServiceName);
208 localEndpoints.put(localServiceName, localService.getEndpoint());
209 }
210 }
211 return Collections.unmodifiableMap(localEndpoints);
212 }
213
214 @Override
215 public List<Endpoint> getAllEndpoints() {
216 List<Endpoint> allEndpoints = new ArrayList<Endpoint>();
217 synchronized (serviceLock) {
218 for (QName serviceName : this.localServices.keySet()) {
219 allEndpoints.add(this.localServices.get(serviceName).getEndpoint());
220 }
221 for (QName serviceName : this.clientRegistryCache.keySet()) {
222 Set<RemoteService> remoteServices = clientRegistryCache.get(serviceName);
223 for (RemoteService remoteService : remoteServices) {
224 allEndpoints.add(remoteService.getEndpoint());
225 }
226 }
227 }
228 return Collections.unmodifiableList(allEndpoints);
229 }
230
231 @Override
232 public Endpoint getEndpoint(QName serviceName) {
233 return getEndpoint(serviceName, null);
234 }
235
236 @Override
237 public Endpoint getEndpoint(QName serviceName, String applicationId) {
238 if (serviceName == null) {
239 throw new IllegalArgumentException("serviceName cannot be null");
240 }
241 Endpoint availableEndpoint = null;
242 synchronized (serviceLock) {
243
244 availableEndpoint = getLocalEndpoint(serviceName);
245 if (availableEndpoint == null || (!StringUtils.isBlank(applicationId) && !availableEndpoint.getServiceConfiguration().getApplicationId().equals(applicationId))) {
246
247 Set<RemoteService> remoteServices = clientRegistryCache.get(serviceName);
248 remoteServices = filterByApplicationId(applicationId, remoteServices);
249 if (remoteServices != null && !remoteServices.isEmpty()) {
250
251 RemoteService[] remoteServiceArray = remoteServices.toArray(new RemoteService[0]);
252 RemoteService availableRemoteService = remoteServiceArray[this.randomNumber.nextInt(remoteServiceArray.length)];
253 availableEndpoint = availableRemoteService.getEndpoint();
254 }
255 }
256 }
257 return availableEndpoint;
258 }
259
260 protected Set<RemoteService> filterByApplicationId(String applicationId, Set<RemoteService> remoteServices) {
261 if (StringUtils.isBlank(applicationId) || remoteServices == null || remoteServices.isEmpty()) {
262 return remoteServices;
263 }
264 Set<RemoteService> filtered = new HashSet<RemoteService>();
265 for (RemoteService remoteService : remoteServices) {
266 if (remoteService.getServiceInfo().getApplicationId().equals(applicationId)) {
267 filtered.add(remoteService);
268 }
269 }
270 return filtered;
271 }
272
273 @Override
274 public Endpoint getConfiguredEndpoint(ServiceConfiguration serviceConfiguration) {
275 if (serviceConfiguration == null) {
276 throw new IllegalArgumentException("serviceConfiguration cannot be null");
277 }
278 synchronized (serviceLock) {
279 Endpoint localEndpoint = getLocalEndpoint(serviceConfiguration.getServiceName());
280 if (localEndpoint != null && localEndpoint.getServiceConfiguration().equals(serviceConfiguration)) {
281 return localEndpoint;
282 }
283 List<Endpoint> remoteEndpoints = getRemoteEndpoints(serviceConfiguration.getServiceName());
284 for (Endpoint remoteEndpoint : remoteEndpoints) {
285 if (remoteEndpoint.getServiceConfiguration().equals(serviceConfiguration)) {
286 return remoteEndpoint;
287 }
288 }
289 }
290 return null;
291 }
292
293 @Override
294 public Object getService(QName serviceName) {
295 return getService(serviceName, null);
296 }
297
298 @Override
299 public Object getService(QName serviceName, String applicationId) {
300 Endpoint availableEndpoint = getEndpoint(serviceName, applicationId);
301 if (availableEndpoint == null) {
302 return null;
303 }
304 return availableEndpoint.getService();
305 }
306
307 @Override
308 public ServiceConfiguration publishService(ServiceDefinition serviceDefinition, boolean synchronize) {
309 if (serviceDefinition == null) {
310 throw new IllegalArgumentException("serviceDefinition cannot be null");
311 }
312 LocalService localService = new LocalService(getInstanceId(), serviceDefinition);
313 synchronized (serviceLock) {
314 serviceExportManager.exportService(serviceDefinition);
315 localServices.put(serviceDefinition.getServiceName(), localService);
316 }
317 if (synchronize) {
318 synchronize();
319 }
320 return localService.getEndpoint().getServiceConfiguration();
321 }
322
323 @Override
324 public List<ServiceConfiguration> publishServices(List<ServiceDefinition> serviceDefinitions, boolean synchronize) {
325 if (serviceDefinitions == null) {
326 throw new IllegalArgumentException("serviceDefinitions list cannot be null");
327 }
328 List<ServiceConfiguration> serviceConfigurations = new ArrayList<ServiceConfiguration>();
329 synchronized (serviceLock) {
330 for (ServiceDefinition serviceDefinition : serviceDefinitions) {
331 ServiceConfiguration serviceConfiguration = publishService(serviceDefinition, false);
332 serviceConfigurations.add(serviceConfiguration);
333 }
334 }
335 if (synchronize) {
336 synchronize();
337 }
338 return Collections.unmodifiableList(serviceConfigurations);
339 }
340
341 @Override
342 public boolean removeService(QName serviceName, boolean synchronize) {
343 if (serviceName == null) {
344 throw new IllegalArgumentException("serviceName cannot be null");
345 }
346 boolean serviceRemoved = false;
347 synchronized (serviceLock) {
348 LocalService localService = localServices.remove(serviceName);
349 serviceRemoved = localService != null;
350 serviceExportManager.removeService(serviceName);
351 }
352 if (serviceRemoved && synchronize) {
353 synchronize();
354 }
355 return serviceRemoved;
356 }
357
358 @Override
359 public List<Boolean> removeServices(List<QName> serviceNames, boolean synchronize) {
360 if (serviceNames == null) {
361 throw new IllegalArgumentException("serviceNames cannot be null");
362 }
363 boolean serviceRemoved = false;
364 List<Boolean> servicesRemoved = new ArrayList<Boolean>();
365 synchronized (serviceLock) {
366 for (QName serviceName : serviceNames) {
367 serviceExportManager.removeService(serviceName);
368 LocalService localService = localServices.remove(serviceName);
369 if (localService != null) {
370 servicesRemoved.add(Boolean.TRUE);
371 serviceRemoved = true;
372 } else {
373 servicesRemoved.add(Boolean.FALSE);
374 }
375 }
376 }
377 if (serviceRemoved && synchronize) {
378 synchronize();
379 }
380 return servicesRemoved;
381 }
382
383 @Override
384 public void synchronize() {
385 if (!isDevMode() && isStarted()) {
386 synchronized (synchronizeLock) {
387 List<LocalService> localServicesList;
388 List<RemoteService> clientRegistryCacheList;
389 synchronized (serviceLock) {
390
391 localServicesList = new ArrayList<LocalService>(this.localServices.values());
392 clientRegistryCacheList = new ArrayList<RemoteService>();
393 for (Set<RemoteService> remoteServices : this.clientRegistryCache.values()) {
394 clientRegistryCacheList.addAll(remoteServices);
395 }
396 }
397 CompleteServiceDiff serviceDiff = diffCalculator.diffServices(getInstanceId(), localServicesList, clientRegistryCacheList);
398
399 RemoteServicesDiff remoteServicesDiff = serviceDiff.getRemoteServicesDiff();
400 processRemoteServiceDiff(remoteServicesDiff);
401
402 LocalServicesDiff localServicesDiff = serviceDiff.getLocalServicesDiff();
403 processLocalServiceDiff(localServicesDiff);
404 }
405 }
406 }
407
408 protected void processRemoteServiceDiff(RemoteServicesDiff remoteServicesDiff) {
409
410
411 synchronized (serviceLock) {
412
413 List<RemoteService> removedServices = remoteServicesDiff.getRemovedServices();
414 for (RemoteService removedRemoteService : removedServices) {
415 Set<RemoteService> remoteServiceSet = this.clientRegistryCache.get(removedRemoteService.getServiceName());
416 if (remoteServiceSet != null) {
417 boolean wasRemoved = remoteServiceSet.remove(removedRemoteService);
418 if (!wasRemoved) {
419 LOG.warn("Failed to remove remoteService during synchronization: " + removedRemoteService);
420 }
421 }
422 }
423 List<ServiceInfo> newServices = remoteServicesDiff.getNewServices();
424 for (ServiceInfo newService : newServices) {
425 Set<RemoteService> remoteServiceSet = clientRegistryCache.get(newService.getServiceName());
426 if (remoteServiceSet == null) {
427 remoteServiceSet = new HashSet<RemoteService>();
428 clientRegistryCache.put(newService.getServiceName(), remoteServiceSet);
429 }
430 remoteServiceSet.add(new RemoteService(newService, this.serviceRegistry));
431 }
432 }
433 }
434
435 protected void processLocalServiceDiff(LocalServicesDiff localServicesDiff) {
436 List<String> removeServiceEndpointIds = new ArrayList<String>();
437 List<ServiceEndpoint> publishServiceEndpoints = new ArrayList<ServiceEndpoint>();
438 for (ServiceInfo serviceToRemove : localServicesDiff.getServicesToRemoveFromRegistry()) {
439 removeServiceEndpointIds.add(serviceToRemove.getServiceId());
440 }
441 for (LocalService localService : localServicesDiff.getLocalServicesToPublish()) {
442 publishServiceEndpoints.add(localService.getServiceEndpoint());
443 }
444 for (LocalService localService : localServicesDiff.getLocalServicesToUpdate().keySet()) {
445 ServiceInfo registryServiceInfo = localServicesDiff.getLocalServicesToUpdate().get(localService);
446 publishServiceEndpoints.add(rebuildServiceEndpointForUpdate(localService.getServiceEndpoint(), registryServiceInfo));
447 }
448 boolean batchMode = ConfigContext.getCurrentContextConfig().getBooleanProperty(Config.BATCH_MODE, false);
449 if (!batchMode && (!removeServiceEndpointIds.isEmpty() || !publishServiceEndpoints.isEmpty())) {
450 RemoveAndPublishResult result = this.serviceRegistry.removeAndPublish(removeServiceEndpointIds, publishServiceEndpoints);
451
452 if (!result.getServicesPublished().isEmpty()) {
453 synchronized (serviceLock) {
454 for (ServiceEndpoint publishedService : result.getServicesPublished()) {
455 rebuildLocalServiceEndpointAfterPublishing(publishedService);
456 }
457 }
458 }
459 }
460 }
461
462 protected ServiceEndpoint rebuildServiceEndpointForUpdate(ServiceEndpoint originalEndpoint, ServiceInfo registryServiceInfo) {
463 ServiceEndpoint.Builder builder = ServiceEndpoint.Builder.create(originalEndpoint);
464 builder.getInfo().setServiceId(registryServiceInfo.getServiceId());
465 builder.getInfo().setServiceDescriptorId(registryServiceInfo.getServiceDescriptorId());
466 builder.getInfo().setVersionNumber(registryServiceInfo.getVersionNumber());
467 builder.getDescriptor().setId(registryServiceInfo.getServiceDescriptorId());
468 return builder.build();
469 }
470
471 protected void rebuildLocalServiceEndpointAfterPublishing(ServiceEndpoint publishedService) {
472
473 QName serviceName = publishedService.getInfo().getServiceName();
474 if (localServices.containsKey(serviceName)) {
475 LocalService newLocalService = new LocalService(localServices.get(serviceName), publishedService);
476 localServices.put(serviceName, newLocalService);
477 }
478 }
479
480 public void setServiceRegistry(ServiceRegistry serviceRegistry) {
481 this.serviceRegistry = serviceRegistry;
482 }
483
484 public void setDiffCalculator(ServiceRegistryDiffCalculator diffCalculator) {
485 this.diffCalculator = diffCalculator;
486 }
487
488 public void setServiceExportManager(ServiceExportManager serviceExportManager) {
489 this.serviceExportManager = serviceExportManager;
490 }
491
492 public void setScheduledPool(KSBScheduledPool scheduledPool) {
493 this.scheduledPool = scheduledPool;
494 }
495
496 }