Coverage Report - org.kuali.rice.ksb.messaging.RemoteResourceServiceLocatorImpl
 
Classes in this File Line Coverage Branch Coverage Complexity
RemoteResourceServiceLocatorImpl
0%
0/188
0%
0/110
5
 
 1  
 /*
 2  
  * Copyright 2007 The Kuali Foundation
 3  
  *
 4  
  * Licensed under the Educational Community License, Version 2.0 (the "License");
 5  
  * you may not use this file except in compliance with the License.
 6  
  * You may obtain a copy of the License at
 7  
  *
 8  
  * http://www.opensource.org/licenses/ecl2.php
 9  
  *
 10  
  * Unless required by applicable law or agreed to in writing, software
 11  
  * distributed under the License is distributed on an "AS IS" BASIS,
 12  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  
  * See the License for the specific language governing permissions and
 14  
  * limitations under the License.
 15  
  */
 16  
 package org.kuali.rice.ksb.messaging;
 17  
 
 18  
 import java.util.ArrayList;
 19  
 import java.util.Collections;
 20  
 import java.util.HashMap;
 21  
 import java.util.Iterator;
 22  
 import java.util.List;
 23  
 import java.util.Map;
 24  
 import java.util.Random;
 25  
 import java.util.concurrent.ScheduledFuture;
 26  
 import java.util.concurrent.TimeUnit;
 27  
 import java.util.regex.Matcher;
 28  
 import java.util.regex.Pattern;
 29  
 
 30  
 import javax.xml.namespace.QName;
 31  
 
 32  
 import org.apache.commons.lang.StringUtils;
 33  
 import org.apache.log4j.Logger;
 34  
 import org.kuali.rice.core.config.ConfigContext;
 35  
 import org.kuali.rice.core.exception.RiceRuntimeException;
 36  
 import org.kuali.rice.core.exception.RiceRemoteServiceConnectionException;
 37  
 import org.kuali.rice.core.reflect.ObjectDefinition;
 38  
 import org.kuali.rice.core.resourceloader.GlobalResourceLoader;
 39  
 import org.kuali.rice.core.resourceloader.ResourceLoaderContainer;
 40  
 import org.kuali.rice.ksb.messaging.exceptionhandling.DefaultMessageExceptionHandler;
 41  
 import org.kuali.rice.ksb.messaging.exceptionhandling.MessageExceptionHandler;
 42  
 import org.kuali.rice.ksb.messaging.objectremoting.ObjectRemoterService;
 43  
 import org.kuali.rice.ksb.messaging.objectremoting.RemoteObjectCleanup;
 44  
 import org.kuali.rice.ksb.messaging.serviceconnectors.ServiceConnectorFactory;
 45  
 import org.kuali.rice.ksb.service.KSBServiceLocator;
 46  
 import org.kuali.rice.ksb.util.KSBConstants;
 47  
 import org.springframework.transaction.support.TransactionSynchronizationManager;
 48  
 
 49  
 public class RemoteResourceServiceLocatorImpl extends ResourceLoaderContainer implements Runnable, RemoteResourceServiceLocator {
 50  
 
 51  0
         private static final Logger LOG = Logger.getLogger(RemoteResourceServiceLocatorImpl.class);
 52  
 
 53  0
         private Random randomNumber = new Random();
 54  
 
 55  
         private boolean started;
 56  
 
 57  
         private ScheduledFuture future;
 58  
 
 59  0
         private Map<QName, List<RemotedServiceHolder>> clients = Collections.synchronizedMap(new HashMap<QName, List<RemotedServiceHolder>>());
 60  
         // You can't synchronize on an instance that may be replaced by a setter
 61  
         // unless you want to potentially find a bunch of orphaned data in the
 62  
         // database over time.
 63  0
         private Object clientsMutex = new Object();
 64  
         
 65  
         public RemoteResourceServiceLocatorImpl(QName name) {
 66  0
                 super(name);
 67  0
         }
 68  
 
 69  
         public void removeService(ServiceInfo serviceInfo) {
 70  0
                 QName serviceName = serviceInfo.getQname();
 71  0
                 if ( LOG.isInfoEnabled() ) {
 72  0
                         LOG.info("Removing service '" + serviceName + "'...");
 73  
                 }
 74  0
                 List<RemotedServiceHolder> clientProxies = this.getClients().get(serviceName);
 75  
                 // these could be null in the case that they were removed by another
 76  
                 // thread (the thread pool) prior to entry into this method
 77  0
                 if (clientProxies != null) {
 78  0
                         boolean removed = removeServiceFromCollection(serviceInfo, clientProxies);
 79  0
                         if (!removed) {
 80  0
                                 if ( LOG.isInfoEnabled() ) {
 81  0
                                         LOG.info("There was no client proxy removed for the given service: " + serviceName);
 82  
                                 }
 83  
                         }
 84  0
                         if (clientProxies.isEmpty()) {
 85  0
                                 List<RemotedServiceHolder> removedList = this.getClients().remove(serviceName);
 86  0
                                 if (removedList.isEmpty()) {
 87  0
                                         LOG.warn("No client proxy was removed for the given service " + serviceName);
 88  
                                 }
 89  
                         }
 90  
                 }
 91  0
         }
 92  
 
 93  
         /**
 94  
          * Removes a service (its RemotedServiceHolder wrapper) from the list of
 95  
          * services. This isn't very efficient but for time reasons hashcode and
 96  
          * equals wasn't implemented on the RemotedServiceHolder and IPTable, which
 97  
          * is a member of the RemotedServiceHolder.
 98  
          *
 99  
          * @param service
 100  
          * @param serviceList
 101  
          * @return boolean indicating if the entry was removed from the list
 102  
          */
 103  
         private boolean removeServiceFromCollection(ServiceInfo serviceInfo, List<RemotedServiceHolder> serviceList) {
 104  0
                     List<ServiceHolder> servicesToRemove = new ArrayList<ServiceHolder>();
 105  
 
 106  0
                 for (ServiceHolder remotedServiceHolder : serviceList) {
 107  
                     try {
 108  0
                         if (remotedServiceHolder.getServiceInfo().getEndpointUrl().equals(serviceInfo.getEndpointUrl())) {
 109  0
                             servicesToRemove.add(remotedServiceHolder);
 110  
                         }
 111  0
                     } catch (Exception e) {
 112  0
                         LOG.warn("An exception was thrown when attempting to compare endpoint URLs", e);
 113  0
                     }
 114  
                 }
 115  0
                 if (! servicesToRemove.isEmpty()) {
 116  0
                     for (ServiceHolder serviceToRemove : servicesToRemove) {
 117  0
                             serviceToRemove.getServiceInfo().setAlive(false);
 118  0
                             List<ServiceInfo> serviceInfos = new ArrayList<ServiceInfo>();
 119  0
                             serviceInfos.add(serviceToRemove.getServiceInfo());
 120  0
                             KSBServiceLocator.getServiceRegistry().markServicesDead(serviceInfos);
 121  0
                     }
 122  0
                     return serviceList.removeAll(servicesToRemove);
 123  
                 }
 124  0
                 return false;
 125  
         }
 126  
 
 127  
         /**
 128  
          * Fetches a service from the client proxies configured in this resource
 129  
          * loader.
 130  
          */
 131  
         public Object getService(QName serviceName) {
 132  0
                 if ( LOG.isDebugEnabled() ) {
 133  0
                         LOG.debug("ResourceLoader " + getName() + " fetching service " + serviceName);
 134  
                 }
 135  
 
 136  
                 //go to our remotely deployed services first
 137  0
                 RemotedServiceRegistry remoteRegistry = KSBServiceLocator.getServiceDeployer();
 138  0
                 Object service = remoteRegistry.getLocalService(serviceName);
 139  0
                 if (service != null) {
 140  0
                         return service;
 141  
                 }
 142  
 
 143  0
                 List<RemotedServiceHolder> clientProxies = getAllServices(serviceName);
 144  0
                 if (clientProxies == null || clientProxies.isEmpty()) {
 145  0
                         return null;
 146  
                 }
 147  
                 // randomly get a proxy for 'load balancing'
 148  0
                 ServiceHolder serviceHolder = getRemotedServiceHolderFromList(clientProxies);
 149  
                 try {
 150  0
                     service = serviceHolder.getService();
 151  0
                 } catch (Exception e) {
 152  0
                     LOG.error("Caught exception getting service " + serviceName);
 153  0
                     this.removeService(serviceHolder.getServiceInfo());
 154  0
                     return getService(serviceName);
 155  0
                 }
 156  0
                 if (service != null) {
 157  0
                         if ( LOG.isDebugEnabled() ) {
 158  0
                                 LOG.debug("Located a remote proxy to service " + serviceName);
 159  
                         }
 160  
                 }
 161  0
                 return service;
 162  
         }
 163  
 
 164  
         public Object getService(QName qName, String url) {
 165  0
                 List<RemotedServiceHolder> clientProxies = getAllServices(qName);
 166  0
                 if (clientProxies == null || clientProxies.isEmpty()) {
 167  0
                         return null;
 168  
                 }
 169  0
                 for (ServiceHolder holder : clientProxies) {
 170  0
                         if (holder.getServiceInfo().getEndpointUrl().equals(url)) {
 171  
                                     try {
 172  0
                                         return holder.getService();
 173  0
                                     } catch (Exception e) {
 174  0
                                         this.removeService(holder.getServiceInfo());
 175  0
                                     }
 176  
                         }
 177  
                 }
 178  0
                 return null;
 179  
         }
 180  
         
 181  
         public List<QName> getServiceNamesForUnqualifiedName(String unqualifiedServiceName) {
 182  0
                 List<QName> names = new ArrayList<QName>();
 183  0
                 for (QName serviceName : clients.keySet()) {
 184  0
                         if (serviceName.getLocalPart().equals(unqualifiedServiceName)) {
 185  0
                                 names.add(serviceName);
 186  
                         }
 187  
                 }
 188  0
                 return names;
 189  
         }
 190  
 
 191  
         public ServiceHolder getRemotedServiceHolderFromList(List<RemotedServiceHolder> remotedServices) {
 192  0
                 return remotedServices.get(this.randomNumber.nextInt(remotedServices.size()));
 193  
         }
 194  
 
 195  
         /**
 196  
          * Returns an immutable list of services.
 197  
          */
 198  
         public List<RemotedServiceHolder> getAllServices(QName qName) {
 199  0
                 List<RemotedServiceHolder> clientProxies = this.getClients().get(qName);
 200  0
                 if (clientProxies == null) {
 201  0
                         if ( LOG.isDebugEnabled() ) {
 202  0
                                 LOG.debug("Client proxies are null, Re-aquiring services.  Service Namespace " + ConfigContext.getCurrentContextConfig().getServiceNamespace());
 203  
                         }
 204  
                         
 205  
                         // TODO: What is this all about?
 206  0
                         run();
 207  
                         
 208  0
                         clientProxies = this.getClients().get(qName);
 209  0
                         if (clientProxies == null || clientProxies.size() == 0) {
 210  0
                                 if ( LOG.isDebugEnabled() ) {
 211  0
                                         Map<QName, List<RemotedServiceHolder>> x = this.getClients();
 212  0
                                         if (x != null) {
 213  0
                                                 for (QName b : x.keySet()) {
 214  0
                                                         LOG.debug(b.getNamespaceURI() + " " + b.getLocalPart());
 215  
                                                 }
 216  
                                         }
 217  
                                 }
 218  0
                                 throw new RiceRemoteServiceConnectionException("No remote services available for client access when attempting to lookup '" + qName + "'");
 219  
                         }
 220  
                 }
 221  
                 // be sure to return an immutable copy of the list
 222  0
                 return Collections.unmodifiableList(new ArrayList<RemotedServiceHolder>(clientProxies));
 223  
         }
 224  
 
 225  
         public void refresh() {
 226  0
             run();
 227  0
         }
 228  
 
 229  
         public void run() {
 230  0
                 if (!isStarted()) {
 231  0
                         return;
 232  
                 }
 233  0
                 LOG.debug("Checking for new services on the bus");
 234  0
                 List<ServiceInfo> servicesOnBus = null;
 235  0
                 if (ConfigContext.getCurrentContextConfig().getDevMode()) {
 236  0
                         servicesOnBus = new ArrayList<ServiceInfo>();
 237  0
                         for (ServiceHolder remoteServiceHolder : KSBServiceLocator.getServiceDeployer().getPublishedServices().values()) {
 238  0
                                 servicesOnBus.add(remoteServiceHolder.getServiceInfo());
 239  
                         }
 240  
                 } else {
 241  0
                         servicesOnBus = KSBServiceLocator.getServiceRegistry().fetchAllActive();
 242  
                 }
 243  
 
 244  0
                 synchronized ( clientsMutex ) {
 245  0
                         if (new RoutingTableDiffCalculator().calculateClientSideUpdate(this.getClients(), servicesOnBus)) {
 246  0
                                 if ( LOG.isDebugEnabled() ) {
 247  0
                                         LOG.debug("Located new services on the bus, numServices=" + servicesOnBus.size());
 248  
                                 }
 249  0
                                 Map<QName, List<RemotedServiceHolder>> updatedRemoteServicesMap = new HashMap<QName, List<RemotedServiceHolder>>();
 250  0
                                 for (Iterator<ServiceInfo> iter = servicesOnBus.iterator(); iter.hasNext();) {
 251  0
                                         ServiceInfo entry = iter.next();
 252  0
                                         if (entry.getAlive()) {
 253  
                                                 try {
 254  0
                                                         registerClient(entry, updatedRemoteServicesMap);
 255  0
                                                 } catch (Exception e) {
 256  0
                                                         LOG.error("Unable to register client " + entry, e);
 257  0
                                                 }
 258  
                                         }
 259  0
                                 }
 260  0
                                 this.setClients(updatedRemoteServicesMap);
 261  0
                         } else {
 262  0
                                 LOG.debug("No new services on the bus.");
 263  
                         }
 264  0
                 }
 265  0
         }
 266  
 
 267  
         private void registerClient(ServiceInfo serviceInfo, Map<QName, List<RemotedServiceHolder>> clientMap) {
 268  
 
 269  
 
 270  
 
 271  0
                 if (clientMap.get(serviceInfo.getQname()) == null) {
 272  0
                         clientMap.put(serviceInfo.getQname(), new ArrayList<RemotedServiceHolder>());
 273  
                 }
 274  0
                 installAlternateEndpoint(serviceInfo);
 275  0
                 clientMap.get(serviceInfo.getQname()).add(new RemotedServiceHolder(serviceInfo));
 276  0
         }
 277  
         
 278  
         protected void installAlternateEndpoint(ServiceInfo serviceInfo) {
 279  0
         List<AlternateEndpointLocation> alternateEndpointLocations = (List<AlternateEndpointLocation>) ConfigContext
 280  
                 .getCurrentContextConfig().getObject(KSBConstants.KSB_ALTERNATE_ENDPOINT_LOCATIONS);
 281  0
         if (alternateEndpointLocations != null) {
 282  0
             for (AlternateEndpointLocation alternateEndpointLocation : alternateEndpointLocations) {
 283  0
                 if (Pattern.matches(".*" + alternateEndpointLocation.getEndpointHostReplacementPattern() + ".*", serviceInfo
 284  
                         .getEndpointUrl())) {
 285  0
                     Pattern myPattern = Pattern.compile(alternateEndpointLocation.getEndpointHostReplacementPattern());
 286  0
                     Matcher myMatcher = myPattern.matcher(serviceInfo.getEndpointUrl());
 287  0
                     String alternateEndpoint = myMatcher.replaceFirst(alternateEndpointLocation
 288  
                             .getEndpointHostReplacementValue());
 289  0
                     if ( LOG.isInfoEnabled() ) {
 290  0
                             LOG.info("Found an alternate url host value ("
 291  
                                             + alternateEndpointLocation.getEndpointHostReplacementValue() + ") for endpoint: "
 292  
                                             + serviceInfo.getEndpointUrl() + " -> instead using: " + alternateEndpoint);
 293  
                     }
 294  0
                     serviceInfo.setEndpointAlternateUrl(alternateEndpoint);
 295  0
                     break;
 296  
                 }
 297  
             }
 298  
         }
 299  0
         List<AlternateEndpoint> alternateEndpoints = (List<AlternateEndpoint>) ConfigContext.getCurrentContextConfig().getObject(
 300  
                 KSBConstants.KSB_ALTERNATE_ENDPOINTS);
 301  0
         if (alternateEndpoints != null) {
 302  0
             for (AlternateEndpoint alternateEndpoint : alternateEndpoints) {
 303  0
                 if (Pattern.matches(alternateEndpoint.getEndpointUrlPattern(), serviceInfo.getEndpointUrl())) {
 304  0
                         if ( LOG.isInfoEnabled() ) {
 305  0
                                 LOG.info("Found an alternate url for endpoint: " + serviceInfo.getEndpointUrl() + " -> instead using: "
 306  
                                             + alternateEndpoint.getActualEndpoint());
 307  
                         }
 308  0
                     serviceInfo.setEndpointAlternateUrl(alternateEndpoint.getActualEndpoint());
 309  0
                     break;
 310  
                 }
 311  
             }
 312  
         }
 313  0
     }
 314  
 
 315  
 
 316  
         public boolean isStarted() {
 317  0
                 return this.started;
 318  
         }
 319  
 
 320  
         public void start() throws Exception {
 321  0
                 LOG.info("Starting the RemoteResourceServiceLocator...");
 322  
 
 323  0
                 int refreshRate = ConfigContext.getCurrentContextConfig().getRefreshRate();
 324  0
                 this.future = KSBServiceLocator.getScheduledPool().scheduleWithFixedDelay(this, 30, refreshRate, TimeUnit.SECONDS);
 325  0
                 this.started = true;
 326  0
                 run();
 327  0
                 LOG.info("...RemoteResourceServiceLocator started.");
 328  0
         }
 329  
 
 330  
         public void stop() throws Exception {
 331  0
                 LOG.info("Stopping the RemoteResourceServiceLocator...");
 332  0
                 if (this.future != null) {
 333  0
                         if (!this.future.cancel(true)) {
 334  0
                                 LOG.warn("Failed to cancel the RemoteResourceServiceLocator service.");
 335  
                         }
 336  0
                         this.future = null;
 337  
                 }
 338  0
                 this.started = false;
 339  0
                 LOG.info("...RemoteResourceServiceLocator stopped.");
 340  0
         }
 341  
 
 342  
         public Object getObject(ObjectDefinition definition) {
 343  0
                 if (definition.isAtRemotingLayer()) {
 344  0
                         return null;
 345  
                 }
 346  0
                 if (StringUtils.isEmpty(definition.getServiceNamespace())) {
 347  0
                         return null;
 348  
                 }
 349  0
                 QName objectRemoterName = new QName(definition.getServiceNamespace(), KSBServiceLocator.OBJECT_REMOTER);
 350  0
                 ObjectRemoterService classRemoter = (ObjectRemoterService)GlobalResourceLoader.getService(objectRemoterName);
 351  0
                 ServiceInfo serviceInfo = classRemoter.getRemotedClassURL(definition);
 352  
 
 353  0
                 if (serviceInfo == null) {
 354  0
                         return null;
 355  
                 }
 356  
 
 357  
                 try {
 358  0
                         RemoteObjectCleanup remoteCleanup = new RemoteObjectCleanup(objectRemoterName, serviceInfo.getQname());
 359  0
                         if (TransactionSynchronizationManager.isActualTransactionActive()) {
 360  0
                             TransactionSynchronizationManager.registerSynchronization(remoteCleanup);
 361  
                         }
 362  0
                         return ServiceConnectorFactory.getServiceConnector(serviceInfo).getService();
 363  0
                 } catch (Exception e) {
 364  0
                         throw new RiceRuntimeException(e);
 365  
                 }
 366  
         }
 367  
 
 368  
         public MessageExceptionHandler getMessageExceptionHandler(QName qname) {
 369  0
                 List<RemotedServiceHolder> remotedServices = getAllServices(qname);
 370  0
                 if (remotedServices == null || remotedServices.isEmpty()) {
 371  0
                         throw new RiceRuntimeException("No services found for name " + qname);
 372  
                 }
 373  0
                 ServiceHolder serviceHolder = getRemotedServiceHolderFromList(remotedServices);
 374  0
                 if (serviceHolder != null) {
 375  0
                         String messageExceptionHandlerName = serviceHolder.getServiceInfo().getServiceDefinition().getMessageExceptionHandler();
 376  0
                         if (messageExceptionHandlerName == null) {
 377  0
                                 messageExceptionHandlerName = DefaultMessageExceptionHandler.class.getName();
 378  
                         }
 379  0
                         return (MessageExceptionHandler) GlobalResourceLoader.getObject(new ObjectDefinition(messageExceptionHandlerName));
 380  
                 }
 381  0
                 throw new RiceRuntimeException("No service with QName " + qname + " found");
 382  
         }
 383  
 
 384  
         public Map<QName, List<RemotedServiceHolder>> getClients() {
 385  0
                 synchronized ( clientsMutex ) {
 386  0
                     return this.clients;                        
 387  0
                 }
 388  
         }
 389  
 
 390  
         public void setClients(Map<QName, List<RemotedServiceHolder>> clients) {
 391  0
                 synchronized ( clientsMutex ) {
 392  0
                         this.clients = clients;
 393  0
                 }
 394  0
         }
 395  
 }