1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
package org.kuali.rice.ksb.messaging; |
18 | |
|
19 | |
import org.apache.commons.lang.StringUtils; |
20 | |
import org.apache.log4j.Logger; |
21 | |
import org.kuali.rice.core.api.config.property.ConfigContext; |
22 | |
import org.kuali.rice.core.api.exception.RiceRemoteServiceConnectionException; |
23 | |
import org.kuali.rice.core.api.exception.RiceRuntimeException; |
24 | |
import org.kuali.rice.core.api.reflect.ObjectDefinition; |
25 | |
import org.kuali.rice.core.api.resourceloader.GlobalResourceLoader; |
26 | |
import org.kuali.rice.core.api.resourceloader.GlobalResourceLoader; |
27 | |
import org.kuali.rice.core.api.resourceloader.ResourceLoaderContainer; |
28 | |
import org.kuali.rice.ksb.messaging.exceptionhandling.DefaultMessageExceptionHandler; |
29 | |
import org.kuali.rice.ksb.messaging.exceptionhandling.MessageExceptionHandler; |
30 | |
import org.kuali.rice.ksb.messaging.objectremoting.ObjectRemoterService; |
31 | |
import org.kuali.rice.ksb.messaging.objectremoting.RemoteObjectCleanup; |
32 | |
import org.kuali.rice.ksb.messaging.serviceconnectors.ServiceConnectorFactory; |
33 | |
import org.kuali.rice.ksb.service.KSBServiceLocator; |
34 | |
import org.kuali.rice.ksb.util.KSBConstants; |
35 | |
import org.springframework.transaction.support.TransactionSynchronizationManager; |
36 | |
|
37 | |
import javax.xml.namespace.QName; |
38 | |
import java.util.ArrayList; |
39 | |
import java.util.Collections; |
40 | |
import java.util.HashMap; |
41 | |
import java.util.Iterator; |
42 | |
import java.util.List; |
43 | |
import java.util.Map; |
44 | |
import java.util.Random; |
45 | |
import java.util.concurrent.ScheduledFuture; |
46 | |
import java.util.concurrent.TimeUnit; |
47 | |
import java.util.regex.Matcher; |
48 | |
import java.util.regex.Pattern; |
49 | |
|
50 | |
public class RemoteResourceServiceLocatorImpl extends ResourceLoaderContainer implements Runnable, RemoteResourceServiceLocator { |
51 | |
|
52 | 0 | private static final Logger LOG = Logger.getLogger(RemoteResourceServiceLocatorImpl.class); |
53 | |
|
54 | 0 | private Random randomNumber = new Random(); |
55 | |
|
56 | |
private boolean started; |
57 | |
|
58 | |
private ScheduledFuture future; |
59 | |
|
60 | 0 | private Map<QName, List<RemotedServiceHolder>> clients = Collections.synchronizedMap(new HashMap<QName, List<RemotedServiceHolder>>()); |
61 | |
|
62 | |
|
63 | |
|
64 | 0 | private Object clientsMutex = new Object(); |
65 | |
|
66 | |
/**
 * Creates a locator whose resource-loader name is the given qualified name.
 *
 * @param name the name handed to the ResourceLoaderContainer superclass
 */
public RemoteResourceServiceLocatorImpl(QName name) {
    super(name);
}
69 | |
|
70 | |
public void removeService(ServiceInfo serviceInfo) { |
71 | 0 | QName serviceName = serviceInfo.getQname(); |
72 | 0 | if ( LOG.isInfoEnabled() ) { |
73 | 0 | LOG.info("Removing service '" + serviceName + "'..."); |
74 | |
} |
75 | 0 | List<RemotedServiceHolder> clientProxies = this.getClients().get(serviceName); |
76 | |
|
77 | |
|
78 | 0 | if (clientProxies != null) { |
79 | 0 | boolean removed = removeServiceFromCollection(serviceInfo, clientProxies); |
80 | 0 | if (!removed) { |
81 | 0 | if ( LOG.isInfoEnabled() ) { |
82 | 0 | LOG.info("There was no client proxy removed for the given service: " + serviceName); |
83 | |
} |
84 | |
} |
85 | 0 | if (clientProxies.isEmpty()) { |
86 | 0 | List<RemotedServiceHolder> removedList = this.getClients().remove(serviceName); |
87 | 0 | if (removedList.isEmpty()) { |
88 | 0 | LOG.warn("No client proxy was removed for the given service " + serviceName); |
89 | |
} |
90 | |
} |
91 | |
} |
92 | 0 | } |
93 | |
|
94 | |
|
95 | |
|
96 | |
|
97 | |
|
98 | |
|
99 | |
|
100 | |
|
101 | |
|
102 | |
|
103 | |
|
104 | |
private boolean removeServiceFromCollection(ServiceInfo serviceInfo, List<RemotedServiceHolder> serviceList) { |
105 | 0 | List<ServiceHolder> servicesToRemove = new ArrayList<ServiceHolder>(); |
106 | |
|
107 | 0 | for (ServiceHolder remotedServiceHolder : serviceList) { |
108 | |
try { |
109 | 0 | if (remotedServiceHolder.getServiceInfo().getEndpointUrl().equals(serviceInfo.getEndpointUrl())) { |
110 | 0 | servicesToRemove.add(remotedServiceHolder); |
111 | |
} |
112 | 0 | } catch (Exception e) { |
113 | 0 | LOG.warn("An exception was thrown when attempting to compare endpoint URLs", e); |
114 | 0 | } |
115 | |
} |
116 | 0 | if (! servicesToRemove.isEmpty()) { |
117 | 0 | for (ServiceHolder serviceToRemove : servicesToRemove) { |
118 | 0 | serviceToRemove.getServiceInfo().setAlive(false); |
119 | 0 | List<ServiceInfo> serviceInfos = new ArrayList<ServiceInfo>(); |
120 | 0 | serviceInfos.add(serviceToRemove.getServiceInfo()); |
121 | 0 | KSBServiceLocator.getServiceRegistry().markServicesDead(serviceInfos); |
122 | 0 | } |
123 | 0 | return serviceList.removeAll(servicesToRemove); |
124 | |
} |
125 | 0 | return false; |
126 | |
} |
127 | |
|
128 | |
|
129 | |
|
130 | |
|
131 | |
|
132 | |
public Object getService(QName serviceName) { |
133 | 0 | if ( LOG.isDebugEnabled() ) { |
134 | 0 | LOG.debug("ResourceLoader " + getName() + " fetching service " + serviceName); |
135 | |
} |
136 | |
|
137 | |
|
138 | 0 | RemotedServiceRegistry remoteRegistry = KSBServiceLocator.getServiceDeployer(); |
139 | 0 | Object service = remoteRegistry.getLocalService(serviceName); |
140 | 0 | if (service != null) { |
141 | 0 | return service; |
142 | |
} |
143 | |
|
144 | 0 | List<RemotedServiceHolder> clientProxies = getAllServices(serviceName); |
145 | 0 | if (clientProxies == null || clientProxies.isEmpty()) { |
146 | 0 | return null; |
147 | |
} |
148 | |
|
149 | 0 | ServiceHolder serviceHolder = getRemotedServiceHolderFromList(clientProxies); |
150 | |
try { |
151 | 0 | service = serviceHolder.getService(); |
152 | 0 | } catch (Exception e) { |
153 | 0 | LOG.error("Caught exception getting service " + serviceName); |
154 | 0 | this.removeService(serviceHolder.getServiceInfo()); |
155 | 0 | return getService(serviceName); |
156 | 0 | } |
157 | 0 | if (service != null) { |
158 | 0 | if ( LOG.isDebugEnabled() ) { |
159 | 0 | LOG.debug("Located a remote proxy to service " + serviceName); |
160 | |
} |
161 | |
} |
162 | 0 | return service; |
163 | |
} |
164 | |
|
165 | |
public Object getService(QName qName, String url) { |
166 | 0 | List<RemotedServiceHolder> clientProxies = getAllServices(qName); |
167 | 0 | if (clientProxies == null || clientProxies.isEmpty()) { |
168 | 0 | return null; |
169 | |
} |
170 | 0 | for (ServiceHolder holder : clientProxies) { |
171 | 0 | if (holder.getServiceInfo().getEndpointUrl().equals(url)) { |
172 | |
try { |
173 | 0 | return holder.getService(); |
174 | 0 | } catch (Exception e) { |
175 | 0 | this.removeService(holder.getServiceInfo()); |
176 | 0 | } |
177 | |
} |
178 | |
} |
179 | 0 | return null; |
180 | |
} |
181 | |
|
182 | |
public List<QName> getServiceNamesForUnqualifiedName(String unqualifiedServiceName) { |
183 | 0 | List<QName> names = new ArrayList<QName>(); |
184 | 0 | for (QName serviceName : getClients().keySet()) { |
185 | 0 | if (serviceName.getLocalPart().equals(unqualifiedServiceName)) { |
186 | 0 | names.add(serviceName); |
187 | |
} |
188 | |
} |
189 | 0 | return names; |
190 | |
} |
191 | |
|
192 | |
public ServiceHolder getRemotedServiceHolderFromList(List<RemotedServiceHolder> remotedServices) { |
193 | 0 | return remotedServices.get(this.randomNumber.nextInt(remotedServices.size())); |
194 | |
} |
195 | |
|
196 | |
|
197 | |
|
198 | |
|
199 | |
public List<RemotedServiceHolder> getAllServices(QName qName) { |
200 | 0 | List<RemotedServiceHolder> clientProxies = this.getClients().get(qName); |
201 | 0 | if (clientProxies == null) { |
202 | 0 | if ( LOG.isDebugEnabled() ) { |
203 | 0 | LOG.debug("Client proxies are null, Re-aquiring services. Service Namespace " + ConfigContext.getCurrentContextConfig().getServiceNamespace()); |
204 | |
} |
205 | |
|
206 | |
|
207 | 0 | run(); |
208 | |
|
209 | 0 | clientProxies = this.getClients().get(qName); |
210 | 0 | if (clientProxies == null || clientProxies.size() == 0) { |
211 | 0 | if ( LOG.isDebugEnabled() ) { |
212 | 0 | Map<QName, List<RemotedServiceHolder>> x = this.getClients(); |
213 | 0 | if (x != null) { |
214 | 0 | for (QName b : x.keySet()) { |
215 | 0 | LOG.debug(b.getNamespaceURI() + " " + b.getLocalPart()); |
216 | |
} |
217 | |
} |
218 | |
} |
219 | 0 | throw new RiceRemoteServiceConnectionException("No remote services available for client access when attempting to lookup '" + qName + "'"); |
220 | |
} |
221 | |
} |
222 | |
|
223 | 0 | return Collections.unmodifiableList(new ArrayList<RemotedServiceHolder>(clientProxies)); |
224 | |
} |
225 | |
|
226 | |
public void refresh() { |
227 | 0 | run(); |
228 | 0 | } |
229 | |
|
230 | |
public void run() { |
231 | 0 | if (!isStarted()) { |
232 | 0 | return; |
233 | |
} |
234 | 0 | LOG.debug("Checking for new services on the bus"); |
235 | 0 | List<ServiceInfo> servicesOnBus = null; |
236 | 0 | if (ConfigContext.getCurrentContextConfig().getDevMode()) { |
237 | 0 | servicesOnBus = new ArrayList<ServiceInfo>(); |
238 | 0 | for (ServiceHolder remoteServiceHolder : KSBServiceLocator.getServiceDeployer().getPublishedServices().values()) { |
239 | 0 | servicesOnBus.add(remoteServiceHolder.getServiceInfo()); |
240 | |
} |
241 | |
} else { |
242 | 0 | servicesOnBus = KSBServiceLocator.getServiceRegistry().fetchAllActive(); |
243 | |
} |
244 | |
|
245 | 0 | synchronized ( clientsMutex ) { |
246 | 0 | if (new RoutingTableDiffCalculator().calculateClientSideUpdate(this.getClients(), servicesOnBus)) { |
247 | 0 | if ( LOG.isDebugEnabled() ) { |
248 | 0 | LOG.debug("Located new services on the bus, numServices=" + servicesOnBus.size()); |
249 | |
} |
250 | 0 | Map<QName, List<RemotedServiceHolder>> updatedRemoteServicesMap = new HashMap<QName, List<RemotedServiceHolder>>(); |
251 | 0 | for (Iterator<ServiceInfo> iter = servicesOnBus.iterator(); iter.hasNext();) { |
252 | 0 | ServiceInfo entry = iter.next(); |
253 | 0 | if (entry.getAlive()) { |
254 | |
try { |
255 | 0 | registerClient(entry, updatedRemoteServicesMap); |
256 | 0 | } catch (Exception e) { |
257 | 0 | LOG.error("Unable to register client " + entry, e); |
258 | 0 | } |
259 | |
} |
260 | 0 | } |
261 | 0 | this.setClients(updatedRemoteServicesMap); |
262 | 0 | } else { |
263 | 0 | LOG.debug("No new services on the bus."); |
264 | |
} |
265 | 0 | } |
266 | 0 | } |
267 | |
|
268 | |
private void registerClient(ServiceInfo serviceInfo, Map<QName, List<RemotedServiceHolder>> clientMap) { |
269 | |
|
270 | |
|
271 | |
|
272 | 0 | if (clientMap.get(serviceInfo.getQname()) == null) { |
273 | 0 | clientMap.put(serviceInfo.getQname(), new ArrayList<RemotedServiceHolder>()); |
274 | |
} |
275 | 0 | installAlternateEndpoint(serviceInfo); |
276 | 0 | clientMap.get(serviceInfo.getQname()).add(new RemotedServiceHolder(serviceInfo)); |
277 | 0 | } |
278 | |
|
279 | |
protected void installAlternateEndpoint(ServiceInfo serviceInfo) { |
280 | 0 | List<AlternateEndpointLocation> alternateEndpointLocations = (List<AlternateEndpointLocation>) ConfigContext |
281 | |
.getCurrentContextConfig().getObject(KSBConstants.Config.KSB_ALTERNATE_ENDPOINT_LOCATIONS); |
282 | 0 | if (alternateEndpointLocations != null) { |
283 | 0 | for (AlternateEndpointLocation alternateEndpointLocation : alternateEndpointLocations) { |
284 | 0 | if (Pattern.matches(".*" + alternateEndpointLocation.getEndpointHostReplacementPattern() + ".*", serviceInfo |
285 | |
.getEndpointUrl())) { |
286 | 0 | Pattern myPattern = Pattern.compile(alternateEndpointLocation.getEndpointHostReplacementPattern()); |
287 | 0 | Matcher myMatcher = myPattern.matcher(serviceInfo.getEndpointUrl()); |
288 | 0 | String alternateEndpoint = myMatcher.replaceFirst(alternateEndpointLocation |
289 | |
.getEndpointHostReplacementValue()); |
290 | 0 | if ( LOG.isInfoEnabled() ) { |
291 | 0 | LOG.info("Found an alternate url host value (" |
292 | |
+ alternateEndpointLocation.getEndpointHostReplacementValue() + ") for endpoint: " |
293 | |
+ serviceInfo.getEndpointUrl() + " -> instead using: " + alternateEndpoint); |
294 | |
} |
295 | 0 | serviceInfo.setEndpointAlternateUrl(alternateEndpoint); |
296 | 0 | break; |
297 | |
} |
298 | |
} |
299 | |
} |
300 | 0 | List<AlternateEndpoint> alternateEndpoints = (List<AlternateEndpoint>) ConfigContext.getCurrentContextConfig().getObject( |
301 | |
KSBConstants.Config.KSB_ALTERNATE_ENDPOINTS); |
302 | 0 | if (alternateEndpoints != null) { |
303 | 0 | for (AlternateEndpoint alternateEndpoint : alternateEndpoints) { |
304 | 0 | if (Pattern.matches(alternateEndpoint.getEndpointUrlPattern(), serviceInfo.getEndpointUrl())) { |
305 | 0 | if ( LOG.isInfoEnabled() ) { |
306 | 0 | LOG.info("Found an alternate url for endpoint: " + serviceInfo.getEndpointUrl() + " -> instead using: " |
307 | |
+ alternateEndpoint.getActualEndpoint()); |
308 | |
} |
309 | 0 | serviceInfo.setEndpointAlternateUrl(alternateEndpoint.getActualEndpoint()); |
310 | 0 | break; |
311 | |
} |
312 | |
} |
313 | |
} |
314 | 0 | } |
315 | |
|
316 | |
|
317 | |
public boolean isStarted() { |
318 | 0 | return this.started; |
319 | |
} |
320 | |
|
321 | |
public void start() throws Exception { |
322 | 0 | LOG.info("Starting the RemoteResourceServiceLocator..."); |
323 | |
|
324 | 0 | int refreshRate = ConfigContext.getCurrentContextConfig().getRefreshRate(); |
325 | 0 | this.future = KSBServiceLocator.getScheduledPool().scheduleWithFixedDelay(this, 30, refreshRate, TimeUnit.SECONDS); |
326 | 0 | this.started = true; |
327 | 0 | run(); |
328 | 0 | LOG.info("...RemoteResourceServiceLocator started."); |
329 | 0 | } |
330 | |
|
331 | |
public void stop() throws Exception { |
332 | 0 | LOG.info("Stopping the RemoteResourceServiceLocator..."); |
333 | 0 | if (this.future != null) { |
334 | 0 | if (!this.future.cancel(true)) { |
335 | 0 | LOG.warn("Failed to cancel the RemoteResourceServiceLocator service."); |
336 | |
} |
337 | 0 | this.future = null; |
338 | |
} |
339 | 0 | this.started = false; |
340 | 0 | LOG.info("...RemoteResourceServiceLocator stopped."); |
341 | 0 | } |
342 | |
|
343 | |
public Object getObject(ObjectDefinition definition) { |
344 | 0 | if (definition.isAtRemotingLayer()) { |
345 | 0 | return null; |
346 | |
} |
347 | 0 | if (StringUtils.isEmpty(definition.getServiceNamespace())) { |
348 | 0 | return null; |
349 | |
} |
350 | 0 | QName objectRemoterName = new QName(definition.getServiceNamespace(), KSBConstants.ServiceNames.OBJECT_REMOTER); |
351 | 0 | ObjectRemoterService classRemoter = (ObjectRemoterService)GlobalResourceLoader.getService(objectRemoterName); |
352 | 0 | ServiceInfo serviceInfo = classRemoter.getRemotedClassURL(definition); |
353 | |
|
354 | 0 | if (serviceInfo == null) { |
355 | 0 | return null; |
356 | |
} |
357 | |
|
358 | |
try { |
359 | 0 | RemoteObjectCleanup remoteCleanup = new RemoteObjectCleanup(objectRemoterName, serviceInfo.getQname()); |
360 | 0 | if (TransactionSynchronizationManager.isActualTransactionActive()) { |
361 | 0 | TransactionSynchronizationManager.registerSynchronization(remoteCleanup); |
362 | |
} |
363 | 0 | return ServiceConnectorFactory.getServiceConnector(serviceInfo).getService(); |
364 | 0 | } catch (Exception e) { |
365 | 0 | throw new RiceRuntimeException(e); |
366 | |
} |
367 | |
} |
368 | |
|
369 | |
public MessageExceptionHandler getMessageExceptionHandler(QName qname) { |
370 | 0 | List<RemotedServiceHolder> remotedServices = getAllServices(qname); |
371 | 0 | if (remotedServices == null || remotedServices.isEmpty()) { |
372 | 0 | throw new RiceRuntimeException("No services found for name " + qname); |
373 | |
} |
374 | 0 | ServiceHolder serviceHolder = getRemotedServiceHolderFromList(remotedServices); |
375 | 0 | if (serviceHolder != null) { |
376 | 0 | String messageExceptionHandlerName = serviceHolder.getServiceInfo().getServiceDefinition().getMessageExceptionHandler(); |
377 | 0 | if (messageExceptionHandlerName == null) { |
378 | 0 | messageExceptionHandlerName = DefaultMessageExceptionHandler.class.getName(); |
379 | |
} |
380 | 0 | return (MessageExceptionHandler) GlobalResourceLoader.getObject(new ObjectDefinition(messageExceptionHandlerName)); |
381 | |
} |
382 | 0 | throw new RiceRuntimeException("No service with QName " + qname + " found"); |
383 | |
} |
384 | |
|
385 | |
public Map<QName, List<RemotedServiceHolder>> getClients() { |
386 | 0 | synchronized ( clientsMutex ) { |
387 | 0 | return this.clients; |
388 | 0 | } |
389 | |
} |
390 | |
|
391 | |
public void setClients(Map<QName, List<RemotedServiceHolder>> clients) { |
392 | 0 | synchronized ( clientsMutex ) { |
393 | 0 | this.clients = clients; |
394 | 0 | } |
395 | 0 | } |
396 | |
} |