1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
package org.kuali.rice.ksb.messaging; |
17 | |
|
18 | |
import java.util.ArrayList; |
19 | |
import java.util.Collections; |
20 | |
import java.util.HashMap; |
21 | |
import java.util.Iterator; |
22 | |
import java.util.List; |
23 | |
import java.util.Map; |
24 | |
import java.util.Random; |
25 | |
import java.util.concurrent.ScheduledFuture; |
26 | |
import java.util.concurrent.TimeUnit; |
27 | |
import java.util.regex.Matcher; |
28 | |
import java.util.regex.Pattern; |
29 | |
|
30 | |
import javax.xml.namespace.QName; |
31 | |
|
32 | |
import org.apache.commons.lang.StringUtils; |
33 | |
import org.apache.log4j.Logger; |
34 | |
import org.kuali.rice.core.config.ConfigContext; |
35 | |
import org.kuali.rice.core.exception.RiceRuntimeException; |
36 | |
import org.kuali.rice.core.exception.RiceRemoteServiceConnectionException; |
37 | |
import org.kuali.rice.core.reflect.ObjectDefinition; |
38 | |
import org.kuali.rice.core.resourceloader.GlobalResourceLoader; |
39 | |
import org.kuali.rice.core.resourceloader.ResourceLoaderContainer; |
40 | |
import org.kuali.rice.ksb.messaging.exceptionhandling.DefaultMessageExceptionHandler; |
41 | |
import org.kuali.rice.ksb.messaging.exceptionhandling.MessageExceptionHandler; |
42 | |
import org.kuali.rice.ksb.messaging.objectremoting.ObjectRemoterService; |
43 | |
import org.kuali.rice.ksb.messaging.objectremoting.RemoteObjectCleanup; |
44 | |
import org.kuali.rice.ksb.messaging.serviceconnectors.ServiceConnectorFactory; |
45 | |
import org.kuali.rice.ksb.service.KSBServiceLocator; |
46 | |
import org.kuali.rice.ksb.util.KSBConstants; |
47 | |
import org.springframework.transaction.support.TransactionSynchronizationManager; |
48 | |
|
49 | |
public class RemoteResourceServiceLocatorImpl extends ResourceLoaderContainer implements Runnable, RemoteResourceServiceLocator { |
50 | |
|
51 | 0 | private static final Logger LOG = Logger.getLogger(RemoteResourceServiceLocatorImpl.class); |
52 | |
|
53 | 0 | private Random randomNumber = new Random(); |
54 | |
|
55 | |
private boolean started; |
56 | |
|
57 | |
private ScheduledFuture future; |
58 | |
|
59 | 0 | private Map<QName, List<RemotedServiceHolder>> clients = Collections.synchronizedMap(new HashMap<QName, List<RemotedServiceHolder>>()); |
60 | |
|
61 | |
|
62 | |
|
63 | 0 | private Object clientsMutex = new Object(); |
64 | |
|
65 | |
public RemoteResourceServiceLocatorImpl(QName name) { |
66 | 0 | super(name); |
67 | 0 | } |
68 | |
|
69 | |
public void removeService(ServiceInfo serviceInfo) { |
70 | 0 | QName serviceName = serviceInfo.getQname(); |
71 | 0 | if ( LOG.isInfoEnabled() ) { |
72 | 0 | LOG.info("Removing service '" + serviceName + "'..."); |
73 | |
} |
74 | 0 | List<RemotedServiceHolder> clientProxies = this.getClients().get(serviceName); |
75 | |
|
76 | |
|
77 | 0 | if (clientProxies != null) { |
78 | 0 | boolean removed = removeServiceFromCollection(serviceInfo, clientProxies); |
79 | 0 | if (!removed) { |
80 | 0 | if ( LOG.isInfoEnabled() ) { |
81 | 0 | LOG.info("There was no client proxy removed for the given service: " + serviceName); |
82 | |
} |
83 | |
} |
84 | 0 | if (clientProxies.isEmpty()) { |
85 | 0 | List<RemotedServiceHolder> removedList = this.getClients().remove(serviceName); |
86 | 0 | if (removedList.isEmpty()) { |
87 | 0 | LOG.warn("No client proxy was removed for the given service " + serviceName); |
88 | |
} |
89 | |
} |
90 | |
} |
91 | 0 | } |
92 | |
|
93 | |
|
94 | |
|
95 | |
|
96 | |
|
97 | |
|
98 | |
|
99 | |
|
100 | |
|
101 | |
|
102 | |
|
103 | |
private boolean removeServiceFromCollection(ServiceInfo serviceInfo, List<RemotedServiceHolder> serviceList) { |
104 | 0 | List<ServiceHolder> servicesToRemove = new ArrayList<ServiceHolder>(); |
105 | |
|
106 | 0 | for (ServiceHolder remotedServiceHolder : serviceList) { |
107 | |
try { |
108 | 0 | if (remotedServiceHolder.getServiceInfo().getEndpointUrl().equals(serviceInfo.getEndpointUrl())) { |
109 | 0 | servicesToRemove.add(remotedServiceHolder); |
110 | |
} |
111 | 0 | } catch (Exception e) { |
112 | 0 | LOG.warn("An exception was thrown when attempting to compare endpoint URLs", e); |
113 | 0 | } |
114 | |
} |
115 | 0 | if (! servicesToRemove.isEmpty()) { |
116 | 0 | for (ServiceHolder serviceToRemove : servicesToRemove) { |
117 | 0 | serviceToRemove.getServiceInfo().setAlive(false); |
118 | 0 | List<ServiceInfo> serviceInfos = new ArrayList<ServiceInfo>(); |
119 | 0 | serviceInfos.add(serviceToRemove.getServiceInfo()); |
120 | 0 | KSBServiceLocator.getServiceRegistry().markServicesDead(serviceInfos); |
121 | 0 | } |
122 | 0 | return serviceList.removeAll(servicesToRemove); |
123 | |
} |
124 | 0 | return false; |
125 | |
} |
126 | |
|
127 | |
|
128 | |
|
129 | |
|
130 | |
|
131 | |
public Object getService(QName serviceName) { |
132 | 0 | if ( LOG.isDebugEnabled() ) { |
133 | 0 | LOG.debug("ResourceLoader " + getName() + " fetching service " + serviceName); |
134 | |
} |
135 | |
|
136 | |
|
137 | 0 | RemotedServiceRegistry remoteRegistry = KSBServiceLocator.getServiceDeployer(); |
138 | 0 | Object service = remoteRegistry.getLocalService(serviceName); |
139 | 0 | if (service != null) { |
140 | 0 | return service; |
141 | |
} |
142 | |
|
143 | 0 | List<RemotedServiceHolder> clientProxies = getAllServices(serviceName); |
144 | 0 | if (clientProxies == null || clientProxies.isEmpty()) { |
145 | 0 | return null; |
146 | |
} |
147 | |
|
148 | 0 | ServiceHolder serviceHolder = getRemotedServiceHolderFromList(clientProxies); |
149 | |
try { |
150 | 0 | service = serviceHolder.getService(); |
151 | 0 | } catch (Exception e) { |
152 | 0 | LOG.error("Caught exception getting service " + serviceName); |
153 | 0 | this.removeService(serviceHolder.getServiceInfo()); |
154 | 0 | return getService(serviceName); |
155 | 0 | } |
156 | 0 | if (service != null) { |
157 | 0 | if ( LOG.isDebugEnabled() ) { |
158 | 0 | LOG.debug("Located a remote proxy to service " + serviceName); |
159 | |
} |
160 | |
} |
161 | 0 | return service; |
162 | |
} |
163 | |
|
164 | |
public Object getService(QName qName, String url) { |
165 | 0 | List<RemotedServiceHolder> clientProxies = getAllServices(qName); |
166 | 0 | if (clientProxies == null || clientProxies.isEmpty()) { |
167 | 0 | return null; |
168 | |
} |
169 | 0 | for (ServiceHolder holder : clientProxies) { |
170 | 0 | if (holder.getServiceInfo().getEndpointUrl().equals(url)) { |
171 | |
try { |
172 | 0 | return holder.getService(); |
173 | 0 | } catch (Exception e) { |
174 | 0 | this.removeService(holder.getServiceInfo()); |
175 | 0 | } |
176 | |
} |
177 | |
} |
178 | 0 | return null; |
179 | |
} |
180 | |
|
181 | |
public List<QName> getServiceNamesForUnqualifiedName(String unqualifiedServiceName) { |
182 | 0 | List<QName> names = new ArrayList<QName>(); |
183 | 0 | for (QName serviceName : clients.keySet()) { |
184 | 0 | if (serviceName.getLocalPart().equals(unqualifiedServiceName)) { |
185 | 0 | names.add(serviceName); |
186 | |
} |
187 | |
} |
188 | 0 | return names; |
189 | |
} |
190 | |
|
191 | |
public ServiceHolder getRemotedServiceHolderFromList(List<RemotedServiceHolder> remotedServices) { |
192 | 0 | return remotedServices.get(this.randomNumber.nextInt(remotedServices.size())); |
193 | |
} |
194 | |
|
195 | |
|
196 | |
|
197 | |
|
198 | |
public List<RemotedServiceHolder> getAllServices(QName qName) { |
199 | 0 | List<RemotedServiceHolder> clientProxies = this.getClients().get(qName); |
200 | 0 | if (clientProxies == null) { |
201 | 0 | if ( LOG.isDebugEnabled() ) { |
202 | 0 | LOG.debug("Client proxies are null, Re-aquiring services. Service Namespace " + ConfigContext.getCurrentContextConfig().getServiceNamespace()); |
203 | |
} |
204 | |
|
205 | |
|
206 | 0 | run(); |
207 | |
|
208 | 0 | clientProxies = this.getClients().get(qName); |
209 | 0 | if (clientProxies == null || clientProxies.size() == 0) { |
210 | 0 | if ( LOG.isDebugEnabled() ) { |
211 | 0 | Map<QName, List<RemotedServiceHolder>> x = this.getClients(); |
212 | 0 | if (x != null) { |
213 | 0 | for (QName b : x.keySet()) { |
214 | 0 | LOG.debug(b.getNamespaceURI() + " " + b.getLocalPart()); |
215 | |
} |
216 | |
} |
217 | |
} |
218 | 0 | throw new RiceRemoteServiceConnectionException("No remote services available for client access when attempting to lookup '" + qName + "'"); |
219 | |
} |
220 | |
} |
221 | |
|
222 | 0 | return Collections.unmodifiableList(new ArrayList<RemotedServiceHolder>(clientProxies)); |
223 | |
} |
224 | |
|
225 | |
public void refresh() { |
226 | 0 | run(); |
227 | 0 | } |
228 | |
|
229 | |
public void run() { |
230 | 0 | if (!isStarted()) { |
231 | 0 | return; |
232 | |
} |
233 | 0 | LOG.debug("Checking for new services on the bus"); |
234 | 0 | List<ServiceInfo> servicesOnBus = null; |
235 | 0 | if (ConfigContext.getCurrentContextConfig().getDevMode()) { |
236 | 0 | servicesOnBus = new ArrayList<ServiceInfo>(); |
237 | 0 | for (ServiceHolder remoteServiceHolder : KSBServiceLocator.getServiceDeployer().getPublishedServices().values()) { |
238 | 0 | servicesOnBus.add(remoteServiceHolder.getServiceInfo()); |
239 | |
} |
240 | |
} else { |
241 | 0 | servicesOnBus = KSBServiceLocator.getServiceRegistry().fetchAllActive(); |
242 | |
} |
243 | |
|
244 | 0 | synchronized ( clientsMutex ) { |
245 | 0 | if (new RoutingTableDiffCalculator().calculateClientSideUpdate(this.getClients(), servicesOnBus)) { |
246 | 0 | if ( LOG.isDebugEnabled() ) { |
247 | 0 | LOG.debug("Located new services on the bus, numServices=" + servicesOnBus.size()); |
248 | |
} |
249 | 0 | Map<QName, List<RemotedServiceHolder>> updatedRemoteServicesMap = new HashMap<QName, List<RemotedServiceHolder>>(); |
250 | 0 | for (Iterator<ServiceInfo> iter = servicesOnBus.iterator(); iter.hasNext();) { |
251 | 0 | ServiceInfo entry = iter.next(); |
252 | 0 | if (entry.getAlive()) { |
253 | |
try { |
254 | 0 | registerClient(entry, updatedRemoteServicesMap); |
255 | 0 | } catch (Exception e) { |
256 | 0 | LOG.error("Unable to register client " + entry, e); |
257 | 0 | } |
258 | |
} |
259 | 0 | } |
260 | 0 | this.setClients(updatedRemoteServicesMap); |
261 | 0 | } else { |
262 | 0 | LOG.debug("No new services on the bus."); |
263 | |
} |
264 | 0 | } |
265 | 0 | } |
266 | |
|
267 | |
private void registerClient(ServiceInfo serviceInfo, Map<QName, List<RemotedServiceHolder>> clientMap) { |
268 | |
|
269 | |
|
270 | |
|
271 | 0 | if (clientMap.get(serviceInfo.getQname()) == null) { |
272 | 0 | clientMap.put(serviceInfo.getQname(), new ArrayList<RemotedServiceHolder>()); |
273 | |
} |
274 | 0 | installAlternateEndpoint(serviceInfo); |
275 | 0 | clientMap.get(serviceInfo.getQname()).add(new RemotedServiceHolder(serviceInfo)); |
276 | 0 | } |
277 | |
|
278 | |
protected void installAlternateEndpoint(ServiceInfo serviceInfo) { |
279 | 0 | List<AlternateEndpointLocation> alternateEndpointLocations = (List<AlternateEndpointLocation>) ConfigContext |
280 | |
.getCurrentContextConfig().getObject(KSBConstants.KSB_ALTERNATE_ENDPOINT_LOCATIONS); |
281 | 0 | if (alternateEndpointLocations != null) { |
282 | 0 | for (AlternateEndpointLocation alternateEndpointLocation : alternateEndpointLocations) { |
283 | 0 | if (Pattern.matches(".*" + alternateEndpointLocation.getEndpointHostReplacementPattern() + ".*", serviceInfo |
284 | |
.getEndpointUrl())) { |
285 | 0 | Pattern myPattern = Pattern.compile(alternateEndpointLocation.getEndpointHostReplacementPattern()); |
286 | 0 | Matcher myMatcher = myPattern.matcher(serviceInfo.getEndpointUrl()); |
287 | 0 | String alternateEndpoint = myMatcher.replaceFirst(alternateEndpointLocation |
288 | |
.getEndpointHostReplacementValue()); |
289 | 0 | if ( LOG.isInfoEnabled() ) { |
290 | 0 | LOG.info("Found an alternate url host value (" |
291 | |
+ alternateEndpointLocation.getEndpointHostReplacementValue() + ") for endpoint: " |
292 | |
+ serviceInfo.getEndpointUrl() + " -> instead using: " + alternateEndpoint); |
293 | |
} |
294 | 0 | serviceInfo.setEndpointAlternateUrl(alternateEndpoint); |
295 | 0 | break; |
296 | |
} |
297 | |
} |
298 | |
} |
299 | 0 | List<AlternateEndpoint> alternateEndpoints = (List<AlternateEndpoint>) ConfigContext.getCurrentContextConfig().getObject( |
300 | |
KSBConstants.KSB_ALTERNATE_ENDPOINTS); |
301 | 0 | if (alternateEndpoints != null) { |
302 | 0 | for (AlternateEndpoint alternateEndpoint : alternateEndpoints) { |
303 | 0 | if (Pattern.matches(alternateEndpoint.getEndpointUrlPattern(), serviceInfo.getEndpointUrl())) { |
304 | 0 | if ( LOG.isInfoEnabled() ) { |
305 | 0 | LOG.info("Found an alternate url for endpoint: " + serviceInfo.getEndpointUrl() + " -> instead using: " |
306 | |
+ alternateEndpoint.getActualEndpoint()); |
307 | |
} |
308 | 0 | serviceInfo.setEndpointAlternateUrl(alternateEndpoint.getActualEndpoint()); |
309 | 0 | break; |
310 | |
} |
311 | |
} |
312 | |
} |
313 | 0 | } |
314 | |
|
315 | |
|
316 | |
public boolean isStarted() { |
317 | 0 | return this.started; |
318 | |
} |
319 | |
|
320 | |
public void start() throws Exception { |
321 | 0 | LOG.info("Starting the RemoteResourceServiceLocator..."); |
322 | |
|
323 | 0 | int refreshRate = ConfigContext.getCurrentContextConfig().getRefreshRate(); |
324 | 0 | this.future = KSBServiceLocator.getScheduledPool().scheduleWithFixedDelay(this, 30, refreshRate, TimeUnit.SECONDS); |
325 | 0 | this.started = true; |
326 | 0 | run(); |
327 | 0 | LOG.info("...RemoteResourceServiceLocator started."); |
328 | 0 | } |
329 | |
|
330 | |
public void stop() throws Exception { |
331 | 0 | LOG.info("Stopping the RemoteResourceServiceLocator..."); |
332 | 0 | if (this.future != null) { |
333 | 0 | if (!this.future.cancel(true)) { |
334 | 0 | LOG.warn("Failed to cancel the RemoteResourceServiceLocator service."); |
335 | |
} |
336 | 0 | this.future = null; |
337 | |
} |
338 | 0 | this.started = false; |
339 | 0 | LOG.info("...RemoteResourceServiceLocator stopped."); |
340 | 0 | } |
341 | |
|
342 | |
public Object getObject(ObjectDefinition definition) { |
343 | 0 | if (definition.isAtRemotingLayer()) { |
344 | 0 | return null; |
345 | |
} |
346 | 0 | if (StringUtils.isEmpty(definition.getServiceNamespace())) { |
347 | 0 | return null; |
348 | |
} |
349 | 0 | QName objectRemoterName = new QName(definition.getServiceNamespace(), KSBServiceLocator.OBJECT_REMOTER); |
350 | 0 | ObjectRemoterService classRemoter = (ObjectRemoterService)GlobalResourceLoader.getService(objectRemoterName); |
351 | 0 | ServiceInfo serviceInfo = classRemoter.getRemotedClassURL(definition); |
352 | |
|
353 | 0 | if (serviceInfo == null) { |
354 | 0 | return null; |
355 | |
} |
356 | |
|
357 | |
try { |
358 | 0 | RemoteObjectCleanup remoteCleanup = new RemoteObjectCleanup(objectRemoterName, serviceInfo.getQname()); |
359 | 0 | if (TransactionSynchronizationManager.isActualTransactionActive()) { |
360 | 0 | TransactionSynchronizationManager.registerSynchronization(remoteCleanup); |
361 | |
} |
362 | 0 | return ServiceConnectorFactory.getServiceConnector(serviceInfo).getService(); |
363 | 0 | } catch (Exception e) { |
364 | 0 | throw new RiceRuntimeException(e); |
365 | |
} |
366 | |
} |
367 | |
|
368 | |
public MessageExceptionHandler getMessageExceptionHandler(QName qname) { |
369 | 0 | List<RemotedServiceHolder> remotedServices = getAllServices(qname); |
370 | 0 | if (remotedServices == null || remotedServices.isEmpty()) { |
371 | 0 | throw new RiceRuntimeException("No services found for name " + qname); |
372 | |
} |
373 | 0 | ServiceHolder serviceHolder = getRemotedServiceHolderFromList(remotedServices); |
374 | 0 | if (serviceHolder != null) { |
375 | 0 | String messageExceptionHandlerName = serviceHolder.getServiceInfo().getServiceDefinition().getMessageExceptionHandler(); |
376 | 0 | if (messageExceptionHandlerName == null) { |
377 | 0 | messageExceptionHandlerName = DefaultMessageExceptionHandler.class.getName(); |
378 | |
} |
379 | 0 | return (MessageExceptionHandler) GlobalResourceLoader.getObject(new ObjectDefinition(messageExceptionHandlerName)); |
380 | |
} |
381 | 0 | throw new RiceRuntimeException("No service with QName " + qname + " found"); |
382 | |
} |
383 | |
|
384 | |
public Map<QName, List<RemotedServiceHolder>> getClients() { |
385 | 0 | synchronized ( clientsMutex ) { |
386 | 0 | return this.clients; |
387 | 0 | } |
388 | |
} |
389 | |
|
390 | |
public void setClients(Map<QName, List<RemotedServiceHolder>> clients) { |
391 | 0 | synchronized ( clientsMutex ) { |
392 | 0 | this.clients = clients; |
393 | 0 | } |
394 | 0 | } |
395 | |
} |