View Javadoc

1   /**
2    * Copyright 2005-2013 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.rice.ksb.impl.bus;
17  
18  import java.util.ArrayList;
19  import java.util.Collections;
20  import java.util.HashMap;
21  import java.util.HashSet;
22  import java.util.Iterator;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.Random;
26  import java.util.Set;
27  import java.util.concurrent.ScheduledFuture;
28  import java.util.concurrent.TimeUnit;
29  
30  import javax.xml.namespace.QName;
31  
32  import org.apache.commons.lang.StringUtils;
33  import org.apache.log4j.Logger;
34  import org.kuali.rice.core.api.config.property.Config;
35  import org.kuali.rice.core.api.config.property.ConfigContext;
36  import org.kuali.rice.core.api.lifecycle.BaseLifecycle;
37  import org.kuali.rice.ksb.api.bus.Endpoint;
38  import org.kuali.rice.ksb.api.bus.ServiceBus;
39  import org.kuali.rice.ksb.api.bus.ServiceConfiguration;
40  import org.kuali.rice.ksb.api.bus.ServiceDefinition;
41  import org.kuali.rice.ksb.api.registry.RemoveAndPublishResult;
42  import org.kuali.rice.ksb.api.registry.ServiceEndpoint;
43  import org.kuali.rice.ksb.api.registry.ServiceInfo;
44  import org.kuali.rice.ksb.api.registry.ServiceRegistry;
45  import org.kuali.rice.ksb.impl.bus.diff.CompleteServiceDiff;
46  import org.kuali.rice.ksb.impl.bus.diff.LocalServicesDiff;
47  import org.kuali.rice.ksb.impl.bus.diff.RemoteServicesDiff;
48  import org.kuali.rice.ksb.impl.bus.diff.ServiceRegistryDiffCalculator;
49  import org.kuali.rice.ksb.messaging.serviceexporters.ServiceExportManager;
50  import org.kuali.rice.ksb.messaging.threadpool.KSBScheduledPool;
51  import org.springframework.beans.factory.DisposableBean;
52  import org.springframework.beans.factory.InitializingBean;
53  
54  public class ServiceBusImpl extends BaseLifecycle implements ServiceBus, InitializingBean, DisposableBean {
55  	
56  	private static final Logger LOG = Logger.getLogger(ServiceBusImpl.class);
57  	
58  	private final Object serviceLock = new Object();
59  	private final Object synchronizeLock = new Object();
60  	private final Random randomNumber = new Random();
61  	
62  	// injected values
63  	private String instanceId;
64  	private ServiceRegistry serviceRegistry;
65  	private ServiceRegistryDiffCalculator diffCalculator;
66  	private ServiceExportManager serviceExportManager;
67  	private KSBScheduledPool scheduledPool;
68  	
69  	private ScheduledFuture<?> registrySyncFuture;
70  	
71  	/**
72  	 * Contains endpoints for services which were published by this client application.
73  	 */
74  	private final Map<QName, LocalService> localServices;
75  	
76  	/**
77  	 * Contains endpoints for services which exist remotely.  This list may not be
78  	 * entirely complete as entries get lazily loaded into it as services are requested.
79  	 */
80  	private final Map<QName, Set<RemoteService>> clientRegistryCache;
81  		
82  	public ServiceBusImpl() {
83  		this.localServices = new HashMap<QName, LocalService>();
84  		this.clientRegistryCache = new HashMap<QName, Set<RemoteService>>();
85  	}
86  	
87  	@Override
88  	public void afterPropertiesSet() throws Exception {
89  		if (StringUtils.isBlank(instanceId)) {
90  			throw new IllegalStateException("a valid instanceId was not injected");
91  		}
92  		if (serviceRegistry == null) {
93  			throw new IllegalStateException("serviceRegistry was not injected");
94  		}
95  		if (diffCalculator == null) {
96  			throw new IllegalStateException("diffCalculator was not injected");
97  		}
98  		if (scheduledPool == null) {
99  			throw new IllegalStateException("scheduledPool was not injected");
100 		}
101 	}
102 	
103 	@Override
104 	public void start() throws Exception {
105 		startSynchronizationThread();
106 		super.start();
107 	}
108 		
109 	protected boolean isDevMode() {
110 		return ConfigContext.getCurrentContextConfig().getDevMode();
111 	}
112 
113 	protected void startSynchronizationThread() {
114 		synchronized (synchronizeLock) {
115 			LOG.info("Starting Service Bus synchronization thread...");
116 			if (!isDevMode()) {
117 				int refreshRate = ConfigContext.getCurrentContextConfig().getRefreshRate();
118 				Runnable runnable = new Runnable() {
119 					public void run() {
120 						try {
121 							synchronize();
122 						} catch (Throwable t) {
123 							LOG.error("Failed to execute background service bus synchronization.", t);
124 						}
125 					}
126 				};
127 				this.registrySyncFuture = scheduledPool.scheduleWithFixedDelay(runnable, 30, refreshRate, TimeUnit.SECONDS);
128 			}
129 			LOG.info("...Service Bus synchronization thread successfully started.");
130 		}
131 	}
132 	
133 	@Override
134 	public void destroy() throws Exception {
135 		LOG.info("Stopping the Service Bus...");
136 		stopSynchronizationThread();
137 		serviceRegistry.takeInstanceOffline(getInstanceId());
138 		LOG.info("...Service Bus successfully stopped.");
139 	}
140 	
141 	protected void stopSynchronizationThread() {
142 		synchronized (synchronizeLock) {
143 			// remove services from the bus
144 			if (this.registrySyncFuture != null) {
145 				if (!this.registrySyncFuture.cancel(false)) {
146 					LOG.warn("Failed to cancel registry sychronization.");
147 				}
148 				this.registrySyncFuture = null;
149 			}
150 		}
151 	}
152 
153 	@Override
154 	public String getInstanceId() {
155 		return this.instanceId;
156 	}
157 	
158 	public void setInstanceId(String instanceId) {
159 		this.instanceId = instanceId;
160 	}
161 	
162 	@Override
163 	public List<Endpoint> getEndpoints(QName serviceName) {
164 		return getEndpoints(serviceName, null);
165 	}
166 	
167 	@Override
168 	public List<Endpoint> getEndpoints(QName serviceName, String applicationId) {
169 		if (serviceName == null) {
170 			throw new IllegalArgumentException("serviceName cannot be null");
171 		}
172 		List<Endpoint> endpoints = new ArrayList<Endpoint>();
173 		synchronized (serviceLock) {
174 			endpoints.addAll(getRemoteEndpoints(serviceName));
175 			Endpoint localEndpoint = getLocalEndpoint(serviceName);
176 			if (localEndpoint != null) {
177 				for (Iterator<Endpoint> iterator = endpoints.iterator(); iterator.hasNext();) {
178 					Endpoint endpoint = (Endpoint) iterator.next();
179 					if (localEndpoint.getServiceConfiguration().equals(endpoint.getServiceConfiguration())) {
180 						iterator.remove();
181 						break;
182 					}
183 				}
184 				if(StringUtils.isBlank(applicationId) || StringUtils.equals(localEndpoint.getServiceConfiguration().getApplicationId(), applicationId)) {
185 					// add at first position, just because we like the local endpoint the best, it's our friend ;)
186 					endpoints.add(0, localEndpoint);
187 				}
188 			}
189 			if(StringUtils.isNotBlank(applicationId)) {
190 				for (Iterator<Endpoint> iterator = endpoints.iterator(); iterator.hasNext();) {
191 					Endpoint endpoint = (Endpoint) iterator.next();
192 					if(!StringUtils.equals(endpoint.getServiceConfiguration().getApplicationId(), applicationId)) {
193 						iterator.remove();
194 					}
195 				}
196 			}
197 		}
198 		return Collections.unmodifiableList(endpoints);
199 	}
200 	
201 	@Override
202 	public List<Endpoint> getRemoteEndpoints(QName serviceName) {
203 		if (serviceName == null) {
204 			throw new IllegalArgumentException("serviceName cannot be null");
205 		}
206 		List<Endpoint> endpoints = new ArrayList<Endpoint>();
207 		synchronized (serviceLock) {
208 			Set<RemoteService> remoteServices = clientRegistryCache.get(serviceName);
209 			if (remoteServices != null) {
210 				for (RemoteService remoteService : remoteServices) {
211 					endpoints.add(remoteService.getEndpoint());
212 				}
213 			}
214 		}
215 		return Collections.unmodifiableList(endpoints);
216 	}
217 
218 	@Override
219 	public Endpoint getLocalEndpoint(QName serviceName) {
220 		if (serviceName == null) {
221 			throw new IllegalArgumentException("serviceName cannot be null");
222 		}
223 		synchronized (serviceLock) {
224 			LocalService localService = localServices.get(serviceName);
225 			if (localService != null) {
226 				return localService.getEndpoint();
227 			}
228 			return null;
229 		}
230 	}
231 
232 	@Override
233 	public Map<QName, Endpoint> getLocalEndpoints() {
234 		Map<QName, Endpoint> localEndpoints = new HashMap<QName, Endpoint>();
235 		synchronized (serviceLock) {
236 			for (QName localServiceName : localServices.keySet()) {
237 				LocalService localService = localServices.get(localServiceName);
238 				localEndpoints.put(localServiceName, localService.getEndpoint());
239 			}
240 		}
241 		return Collections.unmodifiableMap(localEndpoints);
242 	}
243 
244 	@Override
245 	public List<Endpoint> getAllEndpoints() {
246 		List<Endpoint> allEndpoints = new ArrayList<Endpoint>();
247 		synchronized (serviceLock) {
248 			for (QName serviceName : this.localServices.keySet()) {
249 				allEndpoints.add(this.localServices.get(serviceName).getEndpoint());
250 			}
251 			for (QName serviceName : this.clientRegistryCache.keySet()) {
252 				Set<RemoteService> remoteServices = clientRegistryCache.get(serviceName);
253 				for (RemoteService remoteService : remoteServices) {
254 					allEndpoints.add(remoteService.getEndpoint());
255 				}
256 			}
257 		}
258 		return Collections.unmodifiableList(allEndpoints);
259 	}
260 
261 	@Override
262 	public Endpoint getEndpoint(QName serviceName) {
263 		return getEndpoint(serviceName, null);
264 	}
265 	
266 	@Override
267     public Endpoint getEndpoint(QName serviceName, String applicationId) {
268         if (serviceName == null) {
269             throw new IllegalArgumentException("serviceName cannot be null");
270         }
271         Endpoint availableEndpoint = null;
272         synchronized (serviceLock) {
273             // look at local services first
274             availableEndpoint = getLocalEndpoint(serviceName);
275             if (availableEndpoint == null || (!StringUtils.isBlank(applicationId) && !availableEndpoint.getServiceConfiguration().getApplicationId().equals(applicationId))) {
276                  // TODO - would be better to return an Endpoint that contained an internal proxy to all the services so fail-over would be easier to implement!
277                 Set<RemoteService> remoteServices = clientRegistryCache.get(serviceName);
278                 remoteServices = filterByApplicationId(applicationId, remoteServices);
279                 if (remoteServices != null && !remoteServices.isEmpty()) {
280                     // TODO - this should also probably check the current status of the service?
281                     RemoteService[] remoteServiceArray = remoteServices.toArray(new RemoteService[0]);
282                     RemoteService availableRemoteService = remoteServiceArray[this.randomNumber.nextInt(remoteServiceArray.length)];
283                     availableEndpoint = availableRemoteService.getEndpoint();
284                 }
285             }
286         }
287         return availableEndpoint;
288     }
289 	
290 	protected Set<RemoteService> filterByApplicationId(String applicationId, Set<RemoteService> remoteServices) {
291 	    if (StringUtils.isBlank(applicationId) || remoteServices == null || remoteServices.isEmpty()) {
292 	        return remoteServices;
293 	    }
294 	    Set<RemoteService> filtered = new HashSet<RemoteService>();
295 	    for (RemoteService remoteService : remoteServices) {
296 	        if (remoteService.getServiceInfo().getApplicationId().equals(applicationId)) {
297 	            filtered.add(remoteService);
298 	        }
299 	    }
300 	    return filtered;
301 	}
302 	
303 	@Override
304 	public Endpoint getConfiguredEndpoint(ServiceConfiguration serviceConfiguration) {
305 		if (serviceConfiguration == null) {
306 			throw new IllegalArgumentException("serviceConfiguration cannot be null");
307 		}
308 		synchronized (serviceLock) {
309 			Endpoint localEndpoint = getLocalEndpoint(serviceConfiguration.getServiceName());
310 			if (localEndpoint != null && localEndpoint.getServiceConfiguration().equals(serviceConfiguration)) {
311 				return localEndpoint;
312 			}
313 			List<Endpoint> remoteEndpoints = getRemoteEndpoints(serviceConfiguration.getServiceName());
314 			for (Endpoint remoteEndpoint : remoteEndpoints) {
315 				if (remoteEndpoint.getServiceConfiguration().equals(serviceConfiguration)) {
316 					return remoteEndpoint;
317 				}
318 			}
319 		}
320 		return null;
321 	}
322 
323 	@Override
324     public Object getService(QName serviceName) {
325         return getService(serviceName, null);
326     }
327 	
328 	@Override
329 	public Object getService(QName serviceName, String applicationId) {
330 		Endpoint availableEndpoint = getEndpoint(serviceName, applicationId);
331 		if (availableEndpoint == null) {
332 			return null;
333 		}
334 		return availableEndpoint.getService();
335 	}
336 
337 	@Override
338 	public ServiceConfiguration publishService(ServiceDefinition serviceDefinition, boolean synchronize) {
339 		if (serviceDefinition == null) {
340 			throw new IllegalArgumentException("serviceDefinition cannot be null");
341 		}
342 		LocalService localService = new LocalService(getInstanceId(), serviceDefinition);
343 		synchronized (serviceLock) {
344 			serviceExportManager.exportService(serviceDefinition);
345 			localServices.put(serviceDefinition.getServiceName(), localService);
346 		}
347 		if (synchronize) {
348 			synchronize();
349 		}
350 		return localService.getEndpoint().getServiceConfiguration();
351 	}
352 
353 	@Override
354 	public List<ServiceConfiguration> publishServices(List<ServiceDefinition> serviceDefinitions, boolean synchronize) {
355 		if (serviceDefinitions == null) {
356 			throw new IllegalArgumentException("serviceDefinitions list cannot be null");
357 		}
358 		List<ServiceConfiguration> serviceConfigurations = new ArrayList<ServiceConfiguration>();
359 		synchronized (serviceLock) {
360 			for (ServiceDefinition serviceDefinition : serviceDefinitions) {
361 				ServiceConfiguration serviceConfiguration = publishService(serviceDefinition, false);
362 				serviceConfigurations.add(serviceConfiguration);
363 			}
364 		}
365 		if (synchronize) {
366 			synchronize();
367 		}
368 		return Collections.unmodifiableList(serviceConfigurations);
369 	}
370 
371 	@Override
372 	public boolean removeService(QName serviceName, boolean synchronize) {
373 		if (serviceName == null) {
374 			throw new IllegalArgumentException("serviceName cannot be null");
375 		}
376 		boolean serviceRemoved = false;
377 		synchronized (serviceLock) {
378 			LocalService localService = localServices.remove(serviceName);
379 			serviceRemoved = localService != null;
380 			serviceExportManager.removeService(serviceName);
381 		}
382 		if (serviceRemoved && synchronize) {
383 			synchronize();
384 		}
385 		return serviceRemoved;
386 	}
387 
388 	@Override
389 	public List<Boolean> removeServices(List<QName> serviceNames, boolean synchronize) {
390 		if (serviceNames == null) {
391 			throw new IllegalArgumentException("serviceNames cannot be null");
392 		}
393 		boolean serviceRemoved = false;
394 		List<Boolean> servicesRemoved = new ArrayList<Boolean>();
395 		synchronized (serviceLock) {
396 			for (QName serviceName : serviceNames) {
397 				serviceExportManager.removeService(serviceName);
398 				LocalService localService = localServices.remove(serviceName);
399 				if (localService != null) {
400 					servicesRemoved.add(Boolean.TRUE);
401 					serviceRemoved = true;
402 				} else {
403 					servicesRemoved.add(Boolean.FALSE);
404 				}
405 			}
406 		}
407 		if (serviceRemoved && synchronize) {
408 			synchronize();
409 		}
410 		return servicesRemoved;
411 	}
412 
413     protected void synchronizeAndProcess(SyncProcessor processor) {
414         if (!isDevMode()) {
415 			synchronized (synchronizeLock) {
416 				List<LocalService> localServicesList;
417 				List<RemoteService> clientRegistryCacheList;
418 				synchronized (serviceLock) {
419 					// first, flatten the lists
420 					localServicesList = new ArrayList<LocalService>(this.localServices.values());
421 					clientRegistryCacheList = new ArrayList<RemoteService>();
422 					for (Set<RemoteService> remoteServices : this.clientRegistryCache.values()) {
423 						clientRegistryCacheList.addAll(remoteServices);
424 					}
425 				}
426 				CompleteServiceDiff serviceDiff = diffCalculator.diffServices(getInstanceId(), localServicesList, clientRegistryCacheList);
427                 logCompleteServiceDiff(serviceDiff);
428                 processor.sync(serviceDiff);
429             }
430         }
431     }
432 
433 	@Override
434 	public void synchronize() {
435         synchronizeAndProcess(new SyncProcessor() {
436             @Override
437             public void sync(CompleteServiceDiff diff) {
438                 RemoteServicesDiff remoteServicesDiff = diff.getRemoteServicesDiff();
439 				processRemoteServiceDiff(remoteServicesDiff);
440 				LocalServicesDiff localServicesDiff = diff.getLocalServicesDiff();
441 				processLocalServiceDiff(localServicesDiff);
442             }
443         });
444 	}
445 
446     @Override
447 	public void synchronizeRemoteServices() {
448         synchronizeAndProcess(new SyncProcessor() {
449             @Override
450             public void sync(CompleteServiceDiff diff) {
451                 RemoteServicesDiff remoteServicesDiff = diff.getRemoteServicesDiff();
452 				processRemoteServiceDiff(remoteServicesDiff);
453             }
454         });
455 	}
456 
457     @Override
458     public void synchronizeLocalServices() {
459         synchronizeAndProcess(new SyncProcessor() {
460             @Override
461             public void sync(CompleteServiceDiff diff) {
462                 LocalServicesDiff localServicesDiff = diff.getLocalServicesDiff();
463                 processLocalServiceDiff(localServicesDiff);
464             }
465         });
466     }
467 
468     protected void logCompleteServiceDiff(CompleteServiceDiff serviceDiff) {
469         RemoteServicesDiff remoteServicesDiff = serviceDiff.getRemoteServicesDiff();
470         int newServices = remoteServicesDiff.getNewServices().size();
471         int removedServices = remoteServicesDiff.getRemovedServices().size();
472 
473         LocalServicesDiff localServicesDiff = serviceDiff.getLocalServicesDiff();
474         int servicesToPublish = localServicesDiff.getLocalServicesToPublish().size();
475         int servicesToUpdate = localServicesDiff.getLocalServicesToUpdate().size();
476         int servicesToRemove = localServicesDiff.getServicesToRemoveFromRegistry().size();
477 
478         if (newServices + removedServices + servicesToPublish + servicesToUpdate + servicesToRemove > 0) {
479             LOG.info("Found service changes during synchronization: remoteNewServices=" + newServices +
480                     ", remoteRemovedServices=" + removedServices +
481                     ", localServicesToPublish=" + servicesToPublish +
482                     ", localServicesToUpdate=" + servicesToUpdate +
483                     ", localServicesToRemove=" + servicesToRemove);
484         }
485     }
486 		
487 	protected void processRemoteServiceDiff(RemoteServicesDiff remoteServicesDiff) {
488 		// note that since there is a gap between when the original services are acquired, the diff, and this subsequent critical section
489 		// the list of local and client registry services could have changed, so that needs to be considered in the remaining code
490 		synchronized (serviceLock) {
491 			// first, let's update what we know about the remote services
492 			List<RemoteService> removedServices = remoteServicesDiff.getRemovedServices();
493 			for (RemoteService removedRemoteService : removedServices) {
494 				Set<RemoteService> remoteServiceSet = this.clientRegistryCache.get(removedRemoteService.getServiceName());
495 				if (remoteServiceSet != null) {
496 					boolean wasRemoved = remoteServiceSet.remove(removedRemoteService);
497 					if (!wasRemoved) {
498 						LOG.warn("Failed to remove remoteService during synchronization: " + removedRemoteService);
499 					}
500 				}
501 			}
502 			List<ServiceInfo> newServices = remoteServicesDiff.getNewServices();
503 			for (ServiceInfo newService : newServices) {
504 				Set<RemoteService> remoteServiceSet = clientRegistryCache.get(newService.getServiceName());
505 				if (remoteServiceSet == null) {
506 					remoteServiceSet = new HashSet<RemoteService>();
507 					clientRegistryCache.put(newService.getServiceName(), remoteServiceSet);
508 				}
509 				remoteServiceSet.add(new RemoteService(newService, this.serviceRegistry));
510 			}
511 		}
512 	}
513 	
514 	protected void processLocalServiceDiff(LocalServicesDiff localServicesDiff) {
515 		List<String> removeServiceEndpointIds = new ArrayList<String>();
516 		List<ServiceEndpoint> publishServiceEndpoints = new ArrayList<ServiceEndpoint>();
517 		for (ServiceInfo serviceToRemove : localServicesDiff.getServicesToRemoveFromRegistry()) {
518 			removeServiceEndpointIds.add(serviceToRemove.getServiceId());
519 		}
520 		for (LocalService localService : localServicesDiff.getLocalServicesToPublish()) {
521 			publishServiceEndpoints.add(localService.getServiceEndpoint());
522 		}
523 		for (LocalService localService : localServicesDiff.getLocalServicesToUpdate().keySet()) {
524 			ServiceInfo registryServiceInfo = localServicesDiff.getLocalServicesToUpdate().get(localService);
525 			publishServiceEndpoints.add(rebuildServiceEndpointForUpdate(localService.getServiceEndpoint(), registryServiceInfo));
526 		}
527 		boolean batchMode = ConfigContext.getCurrentContextConfig().getBooleanProperty(Config.BATCH_MODE, false);
528 		if (!batchMode && (!removeServiceEndpointIds.isEmpty() || !publishServiceEndpoints.isEmpty())) {
529 			RemoveAndPublishResult result = this.serviceRegistry.removeAndPublish(removeServiceEndpointIds, publishServiceEndpoints);
530 			// now update the ServiceEndpoints for our local services so we can get the proper id for them
531 			if (!result.getServicesPublished().isEmpty()) {
532 				synchronized (serviceLock) {
533 					for (ServiceEndpoint publishedService : result.getServicesPublished()) {
534 						rebuildLocalServiceEndpointAfterPublishing(publishedService);
535 					}
536 				}
537 			}
538 		}
539 	}
540 	
541 	protected ServiceEndpoint rebuildServiceEndpointForUpdate(ServiceEndpoint originalEndpoint, ServiceInfo registryServiceInfo) {
542 		ServiceEndpoint.Builder builder = ServiceEndpoint.Builder.create(originalEndpoint);
543 		builder.getInfo().setServiceId(registryServiceInfo.getServiceId());
544 		builder.getInfo().setServiceDescriptorId(registryServiceInfo.getServiceDescriptorId());
545 		builder.getInfo().setVersionNumber(registryServiceInfo.getVersionNumber());
546 		builder.getDescriptor().setId(registryServiceInfo.getServiceDescriptorId());
547 		return builder.build();
548 	}
549 	
550 	protected void rebuildLocalServiceEndpointAfterPublishing(ServiceEndpoint publishedService) {
551 		// verify the service is still published
552 		QName serviceName = publishedService.getInfo().getServiceName();
553 		if (localServices.containsKey(serviceName)) {
554 			LocalService newLocalService = new LocalService(localServices.get(serviceName), publishedService);
555 			localServices.put(serviceName, newLocalService);
556 		}
557 	}
558 
559 	public void setServiceRegistry(ServiceRegistry serviceRegistry) {
560 		this.serviceRegistry = serviceRegistry;
561 	}
562 	
563 	public void setDiffCalculator(ServiceRegistryDiffCalculator diffCalculator) {
564 		this.diffCalculator = diffCalculator;
565 	}
566 	
567 	public void setServiceExportManager(ServiceExportManager serviceExportManager) {
568 		this.serviceExportManager = serviceExportManager;
569 	}
570 	
571 	public void setScheduledPool(KSBScheduledPool scheduledPool) {
572 		this.scheduledPool = scheduledPool;
573 	}
574 
575     private static interface SyncProcessor {
576         void sync(CompleteServiceDiff diff);
577     }
578 	
579 }