View Javadoc

1   package org.kuali.rice.ksb.impl.bus;
2   
3   import java.util.ArrayList;
4   import java.util.Collections;
5   import java.util.HashMap;
6   import java.util.HashSet;
7   import java.util.Iterator;
8   import java.util.List;
9   import java.util.Map;
10  import java.util.Random;
11  import java.util.Set;
12  import java.util.concurrent.ScheduledFuture;
13  import java.util.concurrent.TimeUnit;
14  
15  import javax.xml.namespace.QName;
16  
17  import org.apache.commons.lang.StringUtils;
18  import org.apache.log4j.Logger;
19  import org.kuali.rice.core.api.config.property.Config;
20  import org.kuali.rice.core.api.config.property.ConfigContext;
21  import org.kuali.rice.core.api.lifecycle.BaseLifecycle;
22  import org.kuali.rice.ksb.api.bus.Endpoint;
23  import org.kuali.rice.ksb.api.bus.ServiceBus;
24  import org.kuali.rice.ksb.api.bus.ServiceConfiguration;
25  import org.kuali.rice.ksb.api.bus.ServiceDefinition;
26  import org.kuali.rice.ksb.api.registry.RemoveAndPublishResult;
27  import org.kuali.rice.ksb.api.registry.ServiceEndpoint;
28  import org.kuali.rice.ksb.api.registry.ServiceInfo;
29  import org.kuali.rice.ksb.api.registry.ServiceRegistry;
30  import org.kuali.rice.ksb.impl.registry.diff.CompleteServiceDiff;
31  import org.kuali.rice.ksb.impl.registry.diff.LocalServicesDiff;
32  import org.kuali.rice.ksb.impl.registry.diff.RemoteServicesDiff;
33  import org.kuali.rice.ksb.impl.registry.diff.ServiceRegistryDiffCalculator;
34  import org.kuali.rice.ksb.messaging.serviceexporters.ServiceExportManager;
35  import org.kuali.rice.ksb.messaging.threadpool.KSBScheduledPool;
36  import org.springframework.beans.factory.DisposableBean;
37  import org.springframework.beans.factory.InitializingBean;
38  
39  public class ServiceBusImpl extends BaseLifecycle implements ServiceBus, InitializingBean, DisposableBean {
40  	
41  	private static final Logger LOG = Logger.getLogger(ServiceBusImpl.class);
42  	
43  	private final Object serviceLock = new Object();
44  	private final Object synchronizeLock = new Object();
45  	private final Random randomNumber = new Random();
46  	
47  	// injected values
48  	private String instanceId;
49  	private ServiceRegistry serviceRegistry;
50  	private ServiceRegistryDiffCalculator diffCalculator;
51  	private ServiceExportManager serviceExportManager;
52  	private KSBScheduledPool scheduledPool;
53  	
54  	private ScheduledFuture<?> registrySyncFuture;
55  	
56  	/**
57  	 * Contains endpoints for services which were published by this client application.
58  	 */
59  	private final Map<QName, LocalService> localServices;
60  	
61  	/**
62  	 * Contains endpoints for services which exist remotely.  This list may not be
63  	 * entirely complete as entries get lazily loaded into it as services are requested.
64  	 */
65  	private final Map<QName, Set<RemoteService>> clientRegistryCache;
66  		
67  	public ServiceBusImpl() {
68  		this.localServices = new HashMap<QName, LocalService>();
69  		this.clientRegistryCache = new HashMap<QName, Set<RemoteService>>();
70  	}
71  	
72  	@Override
73  	public void afterPropertiesSet() throws Exception {
74  		if (StringUtils.isBlank(instanceId)) {
75  			throw new IllegalStateException("a valid instanceId was not injected");
76  		}
77  		if (serviceRegistry == null) {
78  			throw new IllegalStateException("serviceRegistry was not injected");
79  		}
80  		if (diffCalculator == null) {
81  			throw new IllegalStateException("diffCalculator was not injected");
82  		}
83  		if (scheduledPool == null) {
84  			throw new IllegalStateException("scheduledPool was not injected");
85  		}
86  	}
87  	
88  	@Override
89  	public void start() throws Exception {
90  		startSynchronizationThread();
91  		super.start();
92  	}
93  		
94  	protected boolean isDevMode() {
95  		return ConfigContext.getCurrentContextConfig().getDevMode();
96  	}
97  
98  	protected void startSynchronizationThread() {
99  		synchronized (synchronizeLock) {
100 			LOG.info("Starting Service Bus synchronization thread...");
101 			if (!isDevMode()) {
102 				int refreshRate = ConfigContext.getCurrentContextConfig().getRefreshRate();
103 				Runnable runnable = new Runnable() {
104 					public void run() {
105 						try {
106 							synchronize();
107 						} catch (Throwable t) {
108 							LOG.error("Failed to execute background service bus synchronization.", t);
109 						}
110 					}
111 				};
112 				this.registrySyncFuture = scheduledPool.scheduleWithFixedDelay(runnable, 30, refreshRate, TimeUnit.SECONDS);
113 			}
114 			LOG.info("...Service Bus synchronization thread successfully started.");
115 		}
116 	}
117 	
118 	@Override
119 	public void destroy() throws Exception {
120 		LOG.info("Stopping the Service Bus...");
121 		stopSynchronizationThread();
122 		serviceRegistry.takeInstanceOffline(getInstanceId());
123 		LOG.info("...Service Bus successfully stopped.");
124 	}
125 	
126 	protected void stopSynchronizationThread() {
127 		synchronized (synchronizeLock) {
128 			// remove services from the bus
129 			if (this.registrySyncFuture != null) {
130 				if (!this.registrySyncFuture.cancel(false)) {
131 					LOG.warn("Failed to cancel registry sychronization.");
132 				}
133 				this.registrySyncFuture = null;
134 			}
135 		}
136 	}
137 
138 	@Override
139 	public String getInstanceId() {
140 		return this.instanceId;
141 	}
142 	
143 	public void setInstanceId(String instanceId) {
144 		this.instanceId = instanceId;
145 	}
146 	
147 	@Override
148 	public List<Endpoint> getEndpoints(QName serviceName) {
149 		if (serviceName == null) {
150 			throw new IllegalArgumentException("serviceName cannot be null");
151 		}
152 		List<Endpoint> endpoints = new ArrayList<Endpoint>();
153 		synchronized (serviceLock) {
154 			endpoints.addAll(getRemoteEndpoints(serviceName));
155 			Endpoint localEndpoint = getLocalEndpoint(serviceName);
156 			if (localEndpoint != null) {
157 				for (Iterator<Endpoint> iterator = endpoints.iterator(); iterator.hasNext();) {
158 					Endpoint endpoint = (Endpoint) iterator.next();
159 					if (localEndpoint.getServiceConfiguration().equals(endpoint.getServiceConfiguration())) {
160 						iterator.remove();
161 						break;
162 					}
163 				}
164 				// add at first position, just because we like the local endpoint the best, it's our friend ;)
165 				endpoints.add(0, localEndpoint);
166 			}
167 		}
168 		return Collections.unmodifiableList(endpoints);
169 	}
170 	
171 	@Override
172 	public List<Endpoint> getRemoteEndpoints(QName serviceName) {
173 		if (serviceName == null) {
174 			throw new IllegalArgumentException("serviceName cannot be null");
175 		}
176 		List<Endpoint> endpoints = new ArrayList<Endpoint>();
177 		synchronized (serviceLock) {
178 			Set<RemoteService> remoteServices = clientRegistryCache.get(serviceName);
179 			if (remoteServices != null) {
180 				for (RemoteService remoteService : remoteServices) {
181 					endpoints.add(remoteService.getEndpoint());
182 				}
183 			}
184 		}
185 		return Collections.unmodifiableList(endpoints);
186 	}
187 
188 	@Override
189 	public Endpoint getLocalEndpoint(QName serviceName) {
190 		if (serviceName == null) {
191 			throw new IllegalArgumentException("serviceName cannot be null");
192 		}
193 		synchronized (serviceLock) {
194 			LocalService localService = localServices.get(serviceName);
195 			if (localService != null) {
196 				return localService.getEndpoint();
197 			}
198 			return null;
199 		}
200 	}
201 
202 	@Override
203 	public Map<QName, Endpoint> getLocalEndpoints() {
204 		Map<QName, Endpoint> localEndpoints = new HashMap<QName, Endpoint>();
205 		synchronized (serviceLock) {
206 			for (QName localServiceName : localServices.keySet()) {
207 				LocalService localService = localServices.get(localServiceName);
208 				localEndpoints.put(localServiceName, localService.getEndpoint());
209 			}
210 		}
211 		return Collections.unmodifiableMap(localEndpoints);
212 	}
213 
214 	@Override
215 	public List<Endpoint> getAllEndpoints() {
216 		List<Endpoint> allEndpoints = new ArrayList<Endpoint>();
217 		synchronized (serviceLock) {
218 			for (QName serviceName : this.localServices.keySet()) {
219 				allEndpoints.add(this.localServices.get(serviceName).getEndpoint());
220 			}
221 			for (QName serviceName : this.clientRegistryCache.keySet()) {
222 				Set<RemoteService> remoteServices = clientRegistryCache.get(serviceName);
223 				for (RemoteService remoteService : remoteServices) {
224 					allEndpoints.add(remoteService.getEndpoint());
225 				}
226 			}
227 		}
228 		return Collections.unmodifiableList(allEndpoints);
229 	}
230 
231 	@Override
232 	public Endpoint getEndpoint(QName serviceName) {
233 		return getEndpoint(serviceName, null);
234 	}
235 	
236 	@Override
237     public Endpoint getEndpoint(QName serviceName, String applicationId) {
238         if (serviceName == null) {
239             throw new IllegalArgumentException("serviceName cannot be null");
240         }
241         Endpoint availableEndpoint = null;
242         synchronized (serviceLock) {
243             // look at local services first
244             availableEndpoint = getLocalEndpoint(serviceName);
245             if (availableEndpoint == null || (!StringUtils.isBlank(applicationId) && !availableEndpoint.getServiceConfiguration().getApplicationId().equals(applicationId))) {
246                  // TODO - would be better to return an Endpoint that contained an internal proxy to all the services so fail-over would be easier to implement!
247                 Set<RemoteService> remoteServices = clientRegistryCache.get(serviceName);
248                 remoteServices = filterByApplicationId(applicationId, remoteServices);
249                 if (remoteServices != null && !remoteServices.isEmpty()) {
250                     // TODO - this should also probably check the current status of the service?
251                     RemoteService[] remoteServiceArray = remoteServices.toArray(new RemoteService[0]);
252                     RemoteService availableRemoteService = remoteServiceArray[this.randomNumber.nextInt(remoteServiceArray.length)];
253                     availableEndpoint = availableRemoteService.getEndpoint();
254                 }
255             }
256         }
257         return availableEndpoint;
258     }
259 	
260 	protected Set<RemoteService> filterByApplicationId(String applicationId, Set<RemoteService> remoteServices) {
261 	    if (StringUtils.isBlank(applicationId) || remoteServices == null || remoteServices.isEmpty()) {
262 	        return remoteServices;
263 	    }
264 	    Set<RemoteService> filtered = new HashSet<RemoteService>();
265 	    for (RemoteService remoteService : remoteServices) {
266 	        if (remoteService.getServiceInfo().getApplicationId().equals(applicationId)) {
267 	            filtered.add(remoteService);
268 	        }
269 	    }
270 	    return filtered;
271 	}
272 	
273 	@Override
274 	public Endpoint getConfiguredEndpoint(ServiceConfiguration serviceConfiguration) {
275 		if (serviceConfiguration == null) {
276 			throw new IllegalArgumentException("serviceConfiguration cannot be null");
277 		}
278 		synchronized (serviceLock) {
279 			Endpoint localEndpoint = getLocalEndpoint(serviceConfiguration.getServiceName());
280 			if (localEndpoint != null && localEndpoint.getServiceConfiguration().equals(serviceConfiguration)) {
281 				return localEndpoint;
282 			}
283 			List<Endpoint> remoteEndpoints = getRemoteEndpoints(serviceConfiguration.getServiceName());
284 			for (Endpoint remoteEndpoint : remoteEndpoints) {
285 				if (remoteEndpoint.getServiceConfiguration().equals(serviceConfiguration)) {
286 					return remoteEndpoint;
287 				}
288 			}
289 		}
290 		return null;
291 	}
292 
293 	@Override
294     public Object getService(QName serviceName) {
295         return getService(serviceName, null);
296     }
297 	
298 	@Override
299 	public Object getService(QName serviceName, String applicationId) {
300 		Endpoint availableEndpoint = getEndpoint(serviceName, applicationId);
301 		if (availableEndpoint == null) {
302 			return null;
303 		}
304 		return availableEndpoint.getService();
305 	}
306 
307 	@Override
308 	public ServiceConfiguration publishService(ServiceDefinition serviceDefinition, boolean synchronize) {
309 		if (serviceDefinition == null) {
310 			throw new IllegalArgumentException("serviceDefinition cannot be null");
311 		}
312 		LocalService localService = new LocalService(getInstanceId(), serviceDefinition);
313 		synchronized (serviceLock) {
314 			serviceExportManager.exportService(serviceDefinition);
315 			localServices.put(serviceDefinition.getServiceName(), localService);
316 		}
317 		if (synchronize) {
318 			synchronize();
319 		}
320 		return localService.getEndpoint().getServiceConfiguration();
321 	}
322 
323 	@Override
324 	public List<ServiceConfiguration> publishServices(List<ServiceDefinition> serviceDefinitions, boolean synchronize) {
325 		if (serviceDefinitions == null) {
326 			throw new IllegalArgumentException("serviceDefinitions list cannot be null");
327 		}
328 		List<ServiceConfiguration> serviceConfigurations = new ArrayList<ServiceConfiguration>();
329 		synchronized (serviceLock) {
330 			for (ServiceDefinition serviceDefinition : serviceDefinitions) {
331 				ServiceConfiguration serviceConfiguration = publishService(serviceDefinition, false);
332 				serviceConfigurations.add(serviceConfiguration);
333 			}
334 		}
335 		if (synchronize) {
336 			synchronize();
337 		}
338 		return Collections.unmodifiableList(serviceConfigurations);
339 	}
340 
341 	@Override
342 	public boolean removeService(QName serviceName, boolean synchronize) {
343 		if (serviceName == null) {
344 			throw new IllegalArgumentException("serviceName cannot be null");
345 		}
346 		boolean serviceRemoved = false;
347 		synchronized (serviceLock) {
348 			LocalService localService = localServices.remove(serviceName);
349 			serviceRemoved = localService != null;
350 			serviceExportManager.removeService(serviceName);
351 		}
352 		if (serviceRemoved && synchronize) {
353 			synchronize();
354 		}
355 		return serviceRemoved;
356 	}
357 
358 	@Override
359 	public List<Boolean> removeServices(List<QName> serviceNames, boolean synchronize) {
360 		if (serviceNames == null) {
361 			throw new IllegalArgumentException("serviceNames cannot be null");
362 		}
363 		boolean serviceRemoved = false;
364 		List<Boolean> servicesRemoved = new ArrayList<Boolean>();
365 		synchronized (serviceLock) {
366 			for (QName serviceName : serviceNames) {
367 				serviceExportManager.removeService(serviceName);
368 				LocalService localService = localServices.remove(serviceName);
369 				if (localService != null) {
370 					servicesRemoved.add(Boolean.TRUE);
371 					serviceRemoved = true;
372 				} else {
373 					servicesRemoved.add(Boolean.FALSE);
374 				}
375 			}
376 		}
377 		if (serviceRemoved && synchronize) {
378 			synchronize();
379 		}
380 		return servicesRemoved;
381 	}
382 
383 	@Override
384 	public void synchronize() {
385 		if (!isDevMode() && isStarted()) {
386 			synchronized (synchronizeLock) {
387 				List<LocalService> localServicesList;
388 				List<RemoteService> clientRegistryCacheList;
389 				synchronized (serviceLock) {
390 					// first, flatten the lists
391 					localServicesList = new ArrayList<LocalService>(this.localServices.values());
392 					clientRegistryCacheList = new ArrayList<RemoteService>();
393 					for (Set<RemoteService> remoteServices : this.clientRegistryCache.values()) {
394 						clientRegistryCacheList.addAll(remoteServices);
395 					}
396 				}
397 				CompleteServiceDiff serviceDiff = diffCalculator.diffServices(getInstanceId(), localServicesList, clientRegistryCacheList);
398 			
399 				RemoteServicesDiff remoteServicesDiff = serviceDiff.getRemoteServicesDiff();
400 				processRemoteServiceDiff(remoteServicesDiff);
401 			
402 				LocalServicesDiff localServicesDiff = serviceDiff.getLocalServicesDiff();
403 				processLocalServiceDiff(localServicesDiff);
404 			}
405 		}
406 	}
407 		
408 	protected void processRemoteServiceDiff(RemoteServicesDiff remoteServicesDiff) {
409 		// note that since there is a gap between when the original services are acquired, the diff, and this subsequent critical section
410 		// the list of local and client registry services could have changed, so that needs to be considered in the remaining code
411 		synchronized (serviceLock) {
412 			// first, let's update what we know about the remote services
413 			List<RemoteService> removedServices = remoteServicesDiff.getRemovedServices();
414 			for (RemoteService removedRemoteService : removedServices) {
415 				Set<RemoteService> remoteServiceSet = this.clientRegistryCache.get(removedRemoteService.getServiceName());
416 				if (remoteServiceSet != null) {
417 					boolean wasRemoved = remoteServiceSet.remove(removedRemoteService);
418 					if (!wasRemoved) {
419 						LOG.warn("Failed to remove remoteService during synchronization: " + removedRemoteService);
420 					}
421 				}
422 			}
423 			List<ServiceInfo> newServices = remoteServicesDiff.getNewServices();
424 			for (ServiceInfo newService : newServices) {
425 				Set<RemoteService> remoteServiceSet = clientRegistryCache.get(newService.getServiceName());
426 				if (remoteServiceSet == null) {
427 					remoteServiceSet = new HashSet<RemoteService>();
428 					clientRegistryCache.put(newService.getServiceName(), remoteServiceSet);
429 				}
430 				remoteServiceSet.add(new RemoteService(newService, this.serviceRegistry));
431 			}
432 		}
433 	}
434 	
435 	protected void processLocalServiceDiff(LocalServicesDiff localServicesDiff) {
436 		List<String> removeServiceEndpointIds = new ArrayList<String>();
437 		List<ServiceEndpoint> publishServiceEndpoints = new ArrayList<ServiceEndpoint>();
438 		for (ServiceInfo serviceToRemove : localServicesDiff.getServicesToRemoveFromRegistry()) {
439 			removeServiceEndpointIds.add(serviceToRemove.getServiceId());
440 		}
441 		for (LocalService localService : localServicesDiff.getLocalServicesToPublish()) {
442 			publishServiceEndpoints.add(localService.getServiceEndpoint());
443 		}
444 		for (LocalService localService : localServicesDiff.getLocalServicesToUpdate().keySet()) {
445 			ServiceInfo registryServiceInfo = localServicesDiff.getLocalServicesToUpdate().get(localService);
446 			publishServiceEndpoints.add(rebuildServiceEndpointForUpdate(localService.getServiceEndpoint(), registryServiceInfo));
447 		}
448 		boolean batchMode = ConfigContext.getCurrentContextConfig().getBooleanProperty(Config.BATCH_MODE, false);
449 		if (!batchMode && (!removeServiceEndpointIds.isEmpty() || !publishServiceEndpoints.isEmpty())) {
450 			RemoveAndPublishResult result = this.serviceRegistry.removeAndPublish(removeServiceEndpointIds, publishServiceEndpoints);
451 			// now update the ServiceEndpoints for our local services so we can get the proper id for them
452 			if (!result.getServicesPublished().isEmpty()) {
453 				synchronized (serviceLock) {
454 					for (ServiceEndpoint publishedService : result.getServicesPublished()) {
455 						rebuildLocalServiceEndpointAfterPublishing(publishedService);
456 					}
457 				}
458 			}
459 		}
460 	}
461 	
462 	protected ServiceEndpoint rebuildServiceEndpointForUpdate(ServiceEndpoint originalEndpoint, ServiceInfo registryServiceInfo) {
463 		ServiceEndpoint.Builder builder = ServiceEndpoint.Builder.create(originalEndpoint);
464 		builder.getInfo().setServiceId(registryServiceInfo.getServiceId());
465 		builder.getInfo().setServiceDescriptorId(registryServiceInfo.getServiceDescriptorId());
466 		builder.getInfo().setVersionNumber(registryServiceInfo.getVersionNumber());
467 		builder.getDescriptor().setId(registryServiceInfo.getServiceDescriptorId());
468 		return builder.build();
469 	}
470 	
471 	protected void rebuildLocalServiceEndpointAfterPublishing(ServiceEndpoint publishedService) {
472 		// verify the service is still published
473 		QName serviceName = publishedService.getInfo().getServiceName();
474 		if (localServices.containsKey(serviceName)) {
475 			LocalService newLocalService = new LocalService(localServices.get(serviceName), publishedService);
476 			localServices.put(serviceName, newLocalService);
477 		}
478 	}
479 
480 	public void setServiceRegistry(ServiceRegistry serviceRegistry) {
481 		this.serviceRegistry = serviceRegistry;
482 	}
483 	
484 	public void setDiffCalculator(ServiceRegistryDiffCalculator diffCalculator) {
485 		this.diffCalculator = diffCalculator;
486 	}
487 	
488 	public void setServiceExportManager(ServiceExportManager serviceExportManager) {
489 		this.serviceExportManager = serviceExportManager;
490 	}
491 	
492 	public void setScheduledPool(KSBScheduledPool scheduledPool) {
493 		this.scheduledPool = scheduledPool;
494 	}
495 	
496 }