View Javadoc

1   /**
2    * Copyright 2005-2012 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.rice.ksb.impl.bus;
17  
18  import java.util.ArrayList;
19  import java.util.Collections;
20  import java.util.HashMap;
21  import java.util.HashSet;
22  import java.util.Iterator;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.Random;
26  import java.util.Set;
27  import java.util.concurrent.ScheduledFuture;
28  import java.util.concurrent.TimeUnit;
29  
30  import javax.xml.namespace.QName;
31  
32  import org.apache.commons.lang.StringUtils;
33  import org.apache.log4j.Logger;
34  import org.kuali.rice.core.api.config.property.Config;
35  import org.kuali.rice.core.api.config.property.ConfigContext;
36  import org.kuali.rice.core.api.lifecycle.BaseLifecycle;
37  import org.kuali.rice.ksb.api.bus.Endpoint;
38  import org.kuali.rice.ksb.api.bus.ServiceBus;
39  import org.kuali.rice.ksb.api.bus.ServiceConfiguration;
40  import org.kuali.rice.ksb.api.bus.ServiceDefinition;
41  import org.kuali.rice.ksb.api.registry.RemoveAndPublishResult;
42  import org.kuali.rice.ksb.api.registry.ServiceEndpoint;
43  import org.kuali.rice.ksb.api.registry.ServiceInfo;
44  import org.kuali.rice.ksb.api.registry.ServiceRegistry;
45  import org.kuali.rice.ksb.impl.bus.diff.CompleteServiceDiff;
46  import org.kuali.rice.ksb.impl.bus.diff.LocalServicesDiff;
47  import org.kuali.rice.ksb.impl.bus.diff.RemoteServicesDiff;
48  import org.kuali.rice.ksb.impl.bus.diff.ServiceRegistryDiffCalculator;
49  import org.kuali.rice.ksb.messaging.serviceexporters.ServiceExportManager;
50  import org.kuali.rice.ksb.messaging.threadpool.KSBScheduledPool;
51  import org.springframework.beans.factory.DisposableBean;
52  import org.springframework.beans.factory.InitializingBean;
53  
54  public class ServiceBusImpl extends BaseLifecycle implements ServiceBus, InitializingBean, DisposableBean {
55  	
56  	private static final Logger LOG = Logger.getLogger(ServiceBusImpl.class);
57  	
58  	private final Object serviceLock = new Object();
59  	private final Object synchronizeLock = new Object();
60  	private final Random randomNumber = new Random();
61  	
62  	// injected values
63  	private String instanceId;
64  	private ServiceRegistry serviceRegistry;
65  	private ServiceRegistryDiffCalculator diffCalculator;
66  	private ServiceExportManager serviceExportManager;
67  	private KSBScheduledPool scheduledPool;
68  	
69  	private ScheduledFuture<?> registrySyncFuture;
70  	
71  	/**
72  	 * Contains endpoints for services which were published by this client application.
73  	 */
74  	private final Map<QName, LocalService> localServices;
75  	
76  	/**
77  	 * Contains endpoints for services which exist remotely.  This list may not be
78  	 * entirely complete as entries get lazily loaded into it as services are requested.
79  	 */
80  	private final Map<QName, Set<RemoteService>> clientRegistryCache;
81  		
82  	public ServiceBusImpl() {
83  		this.localServices = new HashMap<QName, LocalService>();
84  		this.clientRegistryCache = new HashMap<QName, Set<RemoteService>>();
85  	}
86  	
87  	@Override
88  	public void afterPropertiesSet() throws Exception {
89  		if (StringUtils.isBlank(instanceId)) {
90  			throw new IllegalStateException("a valid instanceId was not injected");
91  		}
92  		if (serviceRegistry == null) {
93  			throw new IllegalStateException("serviceRegistry was not injected");
94  		}
95  		if (diffCalculator == null) {
96  			throw new IllegalStateException("diffCalculator was not injected");
97  		}
98  		if (scheduledPool == null) {
99  			throw new IllegalStateException("scheduledPool was not injected");
100 		}
101 	}
102 	
103 	@Override
104 	public void start() throws Exception {
105 		startSynchronizationThread();
106 		super.start();
107 	}
108 		
109 	protected boolean isDevMode() {
110 		return ConfigContext.getCurrentContextConfig().getDevMode();
111 	}
112 
113 	protected void startSynchronizationThread() {
114 		synchronized (synchronizeLock) {
115 			LOG.info("Starting Service Bus synchronization thread...");
116 			if (!isDevMode()) {
117 				int refreshRate = ConfigContext.getCurrentContextConfig().getRefreshRate();
118 				Runnable runnable = new Runnable() {
119 					public void run() {
120 						try {
121 							synchronize();
122 						} catch (Throwable t) {
123 							LOG.error("Failed to execute background service bus synchronization.", t);
124 						}
125 					}
126 				};
127 				this.registrySyncFuture = scheduledPool.scheduleWithFixedDelay(runnable, 30, refreshRate, TimeUnit.SECONDS);
128 			}
129 			LOG.info("...Service Bus synchronization thread successfully started.");
130 		}
131 	}
132 	
133 	@Override
134 	public void destroy() throws Exception {
135 		LOG.info("Stopping the Service Bus...");
136 		stopSynchronizationThread();
137 		serviceRegistry.takeInstanceOffline(getInstanceId());
138 		LOG.info("...Service Bus successfully stopped.");
139 	}
140 	
141 	protected void stopSynchronizationThread() {
142 		synchronized (synchronizeLock) {
143 			// remove services from the bus
144 			if (this.registrySyncFuture != null) {
145 				if (!this.registrySyncFuture.cancel(false)) {
146 					LOG.warn("Failed to cancel registry sychronization.");
147 				}
148 				this.registrySyncFuture = null;
149 			}
150 		}
151 	}
152 
153 	@Override
154 	public String getInstanceId() {
155 		return this.instanceId;
156 	}
157 	
158 	public void setInstanceId(String instanceId) {
159 		this.instanceId = instanceId;
160 	}
161 	
162 	@Override
163 	public List<Endpoint> getEndpoints(QName serviceName) {
164 		if (serviceName == null) {
165 			throw new IllegalArgumentException("serviceName cannot be null");
166 		}
167 		List<Endpoint> endpoints = new ArrayList<Endpoint>();
168 		synchronized (serviceLock) {
169 			endpoints.addAll(getRemoteEndpoints(serviceName));
170 			Endpoint localEndpoint = getLocalEndpoint(serviceName);
171 			if (localEndpoint != null) {
172 				for (Iterator<Endpoint> iterator = endpoints.iterator(); iterator.hasNext();) {
173 					Endpoint endpoint = (Endpoint) iterator.next();
174 					if (localEndpoint.getServiceConfiguration().equals(endpoint.getServiceConfiguration())) {
175 						iterator.remove();
176 						break;
177 					}
178 				}
179 				// add at first position, just because we like the local endpoint the best, it's our friend ;)
180 				endpoints.add(0, localEndpoint);
181 			}
182 		}
183 		return Collections.unmodifiableList(endpoints);
184 	}
185 	
186 	@Override
187 	public List<Endpoint> getRemoteEndpoints(QName serviceName) {
188 		if (serviceName == null) {
189 			throw new IllegalArgumentException("serviceName cannot be null");
190 		}
191 		List<Endpoint> endpoints = new ArrayList<Endpoint>();
192 		synchronized (serviceLock) {
193 			Set<RemoteService> remoteServices = clientRegistryCache.get(serviceName);
194 			if (remoteServices != null) {
195 				for (RemoteService remoteService : remoteServices) {
196 					endpoints.add(remoteService.getEndpoint());
197 				}
198 			}
199 		}
200 		return Collections.unmodifiableList(endpoints);
201 	}
202 
203 	@Override
204 	public Endpoint getLocalEndpoint(QName serviceName) {
205 		if (serviceName == null) {
206 			throw new IllegalArgumentException("serviceName cannot be null");
207 		}
208 		synchronized (serviceLock) {
209 			LocalService localService = localServices.get(serviceName);
210 			if (localService != null) {
211 				return localService.getEndpoint();
212 			}
213 			return null;
214 		}
215 	}
216 
217 	@Override
218 	public Map<QName, Endpoint> getLocalEndpoints() {
219 		Map<QName, Endpoint> localEndpoints = new HashMap<QName, Endpoint>();
220 		synchronized (serviceLock) {
221 			for (QName localServiceName : localServices.keySet()) {
222 				LocalService localService = localServices.get(localServiceName);
223 				localEndpoints.put(localServiceName, localService.getEndpoint());
224 			}
225 		}
226 		return Collections.unmodifiableMap(localEndpoints);
227 	}
228 
229 	@Override
230 	public List<Endpoint> getAllEndpoints() {
231 		List<Endpoint> allEndpoints = new ArrayList<Endpoint>();
232 		synchronized (serviceLock) {
233 			for (QName serviceName : this.localServices.keySet()) {
234 				allEndpoints.add(this.localServices.get(serviceName).getEndpoint());
235 			}
236 			for (QName serviceName : this.clientRegistryCache.keySet()) {
237 				Set<RemoteService> remoteServices = clientRegistryCache.get(serviceName);
238 				for (RemoteService remoteService : remoteServices) {
239 					allEndpoints.add(remoteService.getEndpoint());
240 				}
241 			}
242 		}
243 		return Collections.unmodifiableList(allEndpoints);
244 	}
245 
246 	@Override
247 	public Endpoint getEndpoint(QName serviceName) {
248 		return getEndpoint(serviceName, null);
249 	}
250 	
251 	@Override
252     public Endpoint getEndpoint(QName serviceName, String applicationId) {
253         if (serviceName == null) {
254             throw new IllegalArgumentException("serviceName cannot be null");
255         }
256         Endpoint availableEndpoint = null;
257         synchronized (serviceLock) {
258             // look at local services first
259             availableEndpoint = getLocalEndpoint(serviceName);
260             if (availableEndpoint == null || (!StringUtils.isBlank(applicationId) && !availableEndpoint.getServiceConfiguration().getApplicationId().equals(applicationId))) {
261                  // TODO - would be better to return an Endpoint that contained an internal proxy to all the services so fail-over would be easier to implement!
262                 Set<RemoteService> remoteServices = clientRegistryCache.get(serviceName);
263                 remoteServices = filterByApplicationId(applicationId, remoteServices);
264                 if (remoteServices != null && !remoteServices.isEmpty()) {
265                     // TODO - this should also probably check the current status of the service?
266                     RemoteService[] remoteServiceArray = remoteServices.toArray(new RemoteService[0]);
267                     RemoteService availableRemoteService = remoteServiceArray[this.randomNumber.nextInt(remoteServiceArray.length)];
268                     availableEndpoint = availableRemoteService.getEndpoint();
269                 }
270             }
271         }
272         return availableEndpoint;
273     }
274 	
275 	protected Set<RemoteService> filterByApplicationId(String applicationId, Set<RemoteService> remoteServices) {
276 	    if (StringUtils.isBlank(applicationId) || remoteServices == null || remoteServices.isEmpty()) {
277 	        return remoteServices;
278 	    }
279 	    Set<RemoteService> filtered = new HashSet<RemoteService>();
280 	    for (RemoteService remoteService : remoteServices) {
281 	        if (remoteService.getServiceInfo().getApplicationId().equals(applicationId)) {
282 	            filtered.add(remoteService);
283 	        }
284 	    }
285 	    return filtered;
286 	}
287 	
288 	@Override
289 	public Endpoint getConfiguredEndpoint(ServiceConfiguration serviceConfiguration) {
290 		if (serviceConfiguration == null) {
291 			throw new IllegalArgumentException("serviceConfiguration cannot be null");
292 		}
293 		synchronized (serviceLock) {
294 			Endpoint localEndpoint = getLocalEndpoint(serviceConfiguration.getServiceName());
295 			if (localEndpoint != null && localEndpoint.getServiceConfiguration().equals(serviceConfiguration)) {
296 				return localEndpoint;
297 			}
298 			List<Endpoint> remoteEndpoints = getRemoteEndpoints(serviceConfiguration.getServiceName());
299 			for (Endpoint remoteEndpoint : remoteEndpoints) {
300 				if (remoteEndpoint.getServiceConfiguration().equals(serviceConfiguration)) {
301 					return remoteEndpoint;
302 				}
303 			}
304 		}
305 		return null;
306 	}
307 
308 	@Override
309     public Object getService(QName serviceName) {
310         return getService(serviceName, null);
311     }
312 	
313 	@Override
314 	public Object getService(QName serviceName, String applicationId) {
315 		Endpoint availableEndpoint = getEndpoint(serviceName, applicationId);
316 		if (availableEndpoint == null) {
317 			return null;
318 		}
319 		return availableEndpoint.getService();
320 	}
321 
322 	@Override
323 	public ServiceConfiguration publishService(ServiceDefinition serviceDefinition, boolean synchronize) {
324 		if (serviceDefinition == null) {
325 			throw new IllegalArgumentException("serviceDefinition cannot be null");
326 		}
327 		LocalService localService = new LocalService(getInstanceId(), serviceDefinition);
328 		synchronized (serviceLock) {
329 			serviceExportManager.exportService(serviceDefinition);
330 			localServices.put(serviceDefinition.getServiceName(), localService);
331 		}
332 		if (synchronize) {
333 			synchronize();
334 		}
335 		return localService.getEndpoint().getServiceConfiguration();
336 	}
337 
338 	@Override
339 	public List<ServiceConfiguration> publishServices(List<ServiceDefinition> serviceDefinitions, boolean synchronize) {
340 		if (serviceDefinitions == null) {
341 			throw new IllegalArgumentException("serviceDefinitions list cannot be null");
342 		}
343 		List<ServiceConfiguration> serviceConfigurations = new ArrayList<ServiceConfiguration>();
344 		synchronized (serviceLock) {
345 			for (ServiceDefinition serviceDefinition : serviceDefinitions) {
346 				ServiceConfiguration serviceConfiguration = publishService(serviceDefinition, false);
347 				serviceConfigurations.add(serviceConfiguration);
348 			}
349 		}
350 		if (synchronize) {
351 			synchronize();
352 		}
353 		return Collections.unmodifiableList(serviceConfigurations);
354 	}
355 
356 	@Override
357 	public boolean removeService(QName serviceName, boolean synchronize) {
358 		if (serviceName == null) {
359 			throw new IllegalArgumentException("serviceName cannot be null");
360 		}
361 		boolean serviceRemoved = false;
362 		synchronized (serviceLock) {
363 			LocalService localService = localServices.remove(serviceName);
364 			serviceRemoved = localService != null;
365 			serviceExportManager.removeService(serviceName);
366 		}
367 		if (serviceRemoved && synchronize) {
368 			synchronize();
369 		}
370 		return serviceRemoved;
371 	}
372 
373 	@Override
374 	public List<Boolean> removeServices(List<QName> serviceNames, boolean synchronize) {
375 		if (serviceNames == null) {
376 			throw new IllegalArgumentException("serviceNames cannot be null");
377 		}
378 		boolean serviceRemoved = false;
379 		List<Boolean> servicesRemoved = new ArrayList<Boolean>();
380 		synchronized (serviceLock) {
381 			for (QName serviceName : serviceNames) {
382 				serviceExportManager.removeService(serviceName);
383 				LocalService localService = localServices.remove(serviceName);
384 				if (localService != null) {
385 					servicesRemoved.add(Boolean.TRUE);
386 					serviceRemoved = true;
387 				} else {
388 					servicesRemoved.add(Boolean.FALSE);
389 				}
390 			}
391 		}
392 		if (serviceRemoved && synchronize) {
393 			synchronize();
394 		}
395 		return servicesRemoved;
396 	}
397 
398     protected void synchronizeAndProcess(SyncProcessor processor) {
399         if (!isDevMode()) {
400 			synchronized (synchronizeLock) {
401 				List<LocalService> localServicesList;
402 				List<RemoteService> clientRegistryCacheList;
403 				synchronized (serviceLock) {
404 					// first, flatten the lists
405 					localServicesList = new ArrayList<LocalService>(this.localServices.values());
406 					clientRegistryCacheList = new ArrayList<RemoteService>();
407 					for (Set<RemoteService> remoteServices : this.clientRegistryCache.values()) {
408 						clientRegistryCacheList.addAll(remoteServices);
409 					}
410 				}
411 				CompleteServiceDiff serviceDiff = diffCalculator.diffServices(getInstanceId(), localServicesList, clientRegistryCacheList);
412                 logCompleteServiceDiff(serviceDiff);
413                 processor.sync(serviceDiff);
414             }
415         }
416     }
417 
418 	@Override
419 	public void synchronize() {
420         synchronizeAndProcess(new SyncProcessor() {
421             @Override
422             public void sync(CompleteServiceDiff diff) {
423                 RemoteServicesDiff remoteServicesDiff = diff.getRemoteServicesDiff();
424 				processRemoteServiceDiff(remoteServicesDiff);
425 				LocalServicesDiff localServicesDiff = diff.getLocalServicesDiff();
426 				processLocalServiceDiff(localServicesDiff);
427             }
428         });
429 	}
430 
431     @Override
432 	public void synchronizeRemoteServices() {
433         synchronizeAndProcess(new SyncProcessor() {
434             @Override
435             public void sync(CompleteServiceDiff diff) {
436                 RemoteServicesDiff remoteServicesDiff = diff.getRemoteServicesDiff();
437 				processRemoteServiceDiff(remoteServicesDiff);
438             }
439         });
440 	}
441 
442     @Override
443     public void synchronizeLocalServices() {
444         synchronizeAndProcess(new SyncProcessor() {
445             @Override
446             public void sync(CompleteServiceDiff diff) {
447                 LocalServicesDiff localServicesDiff = diff.getLocalServicesDiff();
448                 processLocalServiceDiff(localServicesDiff);
449             }
450         });
451     }
452 
453     protected void logCompleteServiceDiff(CompleteServiceDiff serviceDiff) {
454         RemoteServicesDiff remoteServicesDiff = serviceDiff.getRemoteServicesDiff();
455         int newServices = remoteServicesDiff.getNewServices().size();
456         int removedServices = remoteServicesDiff.getRemovedServices().size();
457 
458         LocalServicesDiff localServicesDiff = serviceDiff.getLocalServicesDiff();
459         int servicesToPublish = localServicesDiff.getLocalServicesToPublish().size();
460         int servicesToUpdate = localServicesDiff.getLocalServicesToUpdate().size();
461         int servicesToRemove = localServicesDiff.getServicesToRemoveFromRegistry().size();
462 
463         if (newServices + removedServices + servicesToPublish + servicesToUpdate + servicesToRemove > 0) {
464             LOG.info("Found service changes during synchronization: remoteNewServices=" + newServices +
465                     ", remoteRemovedServices=" + removedServices +
466                     ", localServicesToPublish=" + servicesToPublish +
467                     ", localServicesToUpdate=" + servicesToUpdate +
468                     ", localServicesToRemove=" + servicesToRemove);
469         }
470     }
471 		
472 	protected void processRemoteServiceDiff(RemoteServicesDiff remoteServicesDiff) {
473 		// note that since there is a gap between when the original services are acquired, the diff, and this subsequent critical section
474 		// the list of local and client registry services could have changed, so that needs to be considered in the remaining code
475 		synchronized (serviceLock) {
476 			// first, let's update what we know about the remote services
477 			List<RemoteService> removedServices = remoteServicesDiff.getRemovedServices();
478 			for (RemoteService removedRemoteService : removedServices) {
479 				Set<RemoteService> remoteServiceSet = this.clientRegistryCache.get(removedRemoteService.getServiceName());
480 				if (remoteServiceSet != null) {
481 					boolean wasRemoved = remoteServiceSet.remove(removedRemoteService);
482 					if (!wasRemoved) {
483 						LOG.warn("Failed to remove remoteService during synchronization: " + removedRemoteService);
484 					}
485 				}
486 			}
487 			List<ServiceInfo> newServices = remoteServicesDiff.getNewServices();
488 			for (ServiceInfo newService : newServices) {
489 				Set<RemoteService> remoteServiceSet = clientRegistryCache.get(newService.getServiceName());
490 				if (remoteServiceSet == null) {
491 					remoteServiceSet = new HashSet<RemoteService>();
492 					clientRegistryCache.put(newService.getServiceName(), remoteServiceSet);
493 				}
494 				remoteServiceSet.add(new RemoteService(newService, this.serviceRegistry));
495 			}
496 		}
497 	}
498 	
499 	protected void processLocalServiceDiff(LocalServicesDiff localServicesDiff) {
500 		List<String> removeServiceEndpointIds = new ArrayList<String>();
501 		List<ServiceEndpoint> publishServiceEndpoints = new ArrayList<ServiceEndpoint>();
502 		for (ServiceInfo serviceToRemove : localServicesDiff.getServicesToRemoveFromRegistry()) {
503 			removeServiceEndpointIds.add(serviceToRemove.getServiceId());
504 		}
505 		for (LocalService localService : localServicesDiff.getLocalServicesToPublish()) {
506 			publishServiceEndpoints.add(localService.getServiceEndpoint());
507 		}
508 		for (LocalService localService : localServicesDiff.getLocalServicesToUpdate().keySet()) {
509 			ServiceInfo registryServiceInfo = localServicesDiff.getLocalServicesToUpdate().get(localService);
510 			publishServiceEndpoints.add(rebuildServiceEndpointForUpdate(localService.getServiceEndpoint(), registryServiceInfo));
511 		}
512 		boolean batchMode = ConfigContext.getCurrentContextConfig().getBooleanProperty(Config.BATCH_MODE, false);
513 		if (!batchMode && (!removeServiceEndpointIds.isEmpty() || !publishServiceEndpoints.isEmpty())) {
514 			RemoveAndPublishResult result = this.serviceRegistry.removeAndPublish(removeServiceEndpointIds, publishServiceEndpoints);
515 			// now update the ServiceEndpoints for our local services so we can get the proper id for them
516 			if (!result.getServicesPublished().isEmpty()) {
517 				synchronized (serviceLock) {
518 					for (ServiceEndpoint publishedService : result.getServicesPublished()) {
519 						rebuildLocalServiceEndpointAfterPublishing(publishedService);
520 					}
521 				}
522 			}
523 		}
524 	}
525 	
526 	protected ServiceEndpoint rebuildServiceEndpointForUpdate(ServiceEndpoint originalEndpoint, ServiceInfo registryServiceInfo) {
527 		ServiceEndpoint.Builder builder = ServiceEndpoint.Builder.create(originalEndpoint);
528 		builder.getInfo().setServiceId(registryServiceInfo.getServiceId());
529 		builder.getInfo().setServiceDescriptorId(registryServiceInfo.getServiceDescriptorId());
530 		builder.getInfo().setVersionNumber(registryServiceInfo.getVersionNumber());
531 		builder.getDescriptor().setId(registryServiceInfo.getServiceDescriptorId());
532 		return builder.build();
533 	}
534 	
535 	protected void rebuildLocalServiceEndpointAfterPublishing(ServiceEndpoint publishedService) {
536 		// verify the service is still published
537 		QName serviceName = publishedService.getInfo().getServiceName();
538 		if (localServices.containsKey(serviceName)) {
539 			LocalService newLocalService = new LocalService(localServices.get(serviceName), publishedService);
540 			localServices.put(serviceName, newLocalService);
541 		}
542 	}
543 
544 	public void setServiceRegistry(ServiceRegistry serviceRegistry) {
545 		this.serviceRegistry = serviceRegistry;
546 	}
547 	
548 	public void setDiffCalculator(ServiceRegistryDiffCalculator diffCalculator) {
549 		this.diffCalculator = diffCalculator;
550 	}
551 	
552 	public void setServiceExportManager(ServiceExportManager serviceExportManager) {
553 		this.serviceExportManager = serviceExportManager;
554 	}
555 	
556 	public void setScheduledPool(KSBScheduledPool scheduledPool) {
557 		this.scheduledPool = scheduledPool;
558 	}
559 
560     private static interface SyncProcessor {
561         void sync(CompleteServiceDiff diff);
562     }
563 	
564 }