Coverage Report - org.kuali.rice.ksb.messaging.RemoteResourceServiceLocatorImpl
 
Classes in this File Line Coverage Branch Coverage Complexity
RemoteResourceServiceLocatorImpl
0%
0/188
0%
0/110
5
 
 1  
 /*
 2  
  * Copyright 2006-2011 The Kuali Foundation
 3  
  *
 4  
  * Licensed under the Educational Community License, Version 2.0 (the "License");
 5  
  * you may not use this file except in compliance with the License.
 6  
  * You may obtain a copy of the License at
 7  
  *
 8  
  * http://www.opensource.org/licenses/ecl2.php
 9  
  *
 10  
  * Unless required by applicable law or agreed to in writing, software
 11  
  * distributed under the License is distributed on an "AS IS" BASIS,
 12  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  
  * See the License for the specific language governing permissions and
 14  
  * limitations under the License.
 15  
  */
 16  
 
 17  
 package org.kuali.rice.ksb.messaging;
 18  
 
 19  
 import org.apache.commons.lang.StringUtils;
 20  
 import org.apache.log4j.Logger;
 21  
 import org.kuali.rice.core.api.config.property.ConfigContext;
 22  
 import org.kuali.rice.core.api.exception.RiceRemoteServiceConnectionException;
 23  
 import org.kuali.rice.core.api.exception.RiceRuntimeException;
 24  
 import org.kuali.rice.core.api.reflect.ObjectDefinition;
 25  
 import org.kuali.rice.core.api.resourceloader.GlobalResourceLoader;
 26  
 import org.kuali.rice.core.api.resourceloader.GlobalResourceLoader;
 27  
 import org.kuali.rice.core.api.resourceloader.ResourceLoaderContainer;
 28  
 import org.kuali.rice.ksb.messaging.exceptionhandling.DefaultMessageExceptionHandler;
 29  
 import org.kuali.rice.ksb.messaging.exceptionhandling.MessageExceptionHandler;
 30  
 import org.kuali.rice.ksb.messaging.objectremoting.ObjectRemoterService;
 31  
 import org.kuali.rice.ksb.messaging.objectremoting.RemoteObjectCleanup;
 32  
 import org.kuali.rice.ksb.messaging.serviceconnectors.ServiceConnectorFactory;
 33  
 import org.kuali.rice.ksb.service.KSBServiceLocator;
 34  
 import org.kuali.rice.ksb.util.KSBConstants;
 35  
 import org.springframework.transaction.support.TransactionSynchronizationManager;
 36  
 
 37  
 import javax.xml.namespace.QName;
 38  
 import java.util.ArrayList;
 39  
 import java.util.Collections;
 40  
 import java.util.HashMap;
 41  
 import java.util.Iterator;
 42  
 import java.util.List;
 43  
 import java.util.Map;
 44  
 import java.util.Random;
 45  
 import java.util.concurrent.ScheduledFuture;
 46  
 import java.util.concurrent.TimeUnit;
 47  
 import java.util.regex.Matcher;
 48  
 import java.util.regex.Pattern;
 49  
 
 50  
 public class RemoteResourceServiceLocatorImpl extends ResourceLoaderContainer implements Runnable, RemoteResourceServiceLocator {
 51  
 
 52  0
         private static final Logger LOG = Logger.getLogger(RemoteResourceServiceLocatorImpl.class);
 53  
 
 54  0
         private Random randomNumber = new Random();
 55  
 
 56  
         private boolean started;
 57  
 
 58  
         private ScheduledFuture future;
 59  
 
 60  0
         private Map<QName, List<RemotedServiceHolder>> clients = Collections.synchronizedMap(new HashMap<QName, List<RemotedServiceHolder>>());
 61  
         // You can't synchronize on an instance that may be replaced by a setter
 62  
         // unless you want to potentially find a bunch of orphaned data in the
 63  
         // database over time.
 64  0
         private Object clientsMutex = new Object();
 65  
         
 66  
         /**
          * Creates the locator, handing the given name to the
          * ResourceLoaderContainer superclass constructor.
          *
          * @param name the name passed through to the superclass
          */
         public RemoteResourceServiceLocatorImpl(QName name) {
 67  0
                 super(name);
 68  0
         }
 69  
 
 70  
         public void removeService(ServiceInfo serviceInfo) {
 71  0
                 QName serviceName = serviceInfo.getQname();
 72  0
                 if ( LOG.isInfoEnabled() ) {
 73  0
                         LOG.info("Removing service '" + serviceName + "'...");
 74  
                 }
 75  0
                 List<RemotedServiceHolder> clientProxies = this.getClients().get(serviceName);
 76  
                 // these could be null in the case that they were removed by another
 77  
                 // thread (the thread pool) prior to entry into this method
 78  0
                 if (clientProxies != null) {
 79  0
                         boolean removed = removeServiceFromCollection(serviceInfo, clientProxies);
 80  0
                         if (!removed) {
 81  0
                                 if ( LOG.isInfoEnabled() ) {
 82  0
                                         LOG.info("There was no client proxy removed for the given service: " + serviceName);
 83  
                                 }
 84  
                         }
 85  0
                         if (clientProxies.isEmpty()) {
 86  0
                                 List<RemotedServiceHolder> removedList = this.getClients().remove(serviceName);
 87  0
                                 if (removedList.isEmpty()) {
 88  0
                                         LOG.warn("No client proxy was removed for the given service " + serviceName);
 89  
                                 }
 90  
                         }
 91  
                 }
 92  0
         }
 93  
 
 94  
         /**
 95  
          * Removes a service (its RemotedServiceHolder wrapper) from the list of
 96  
          * services. This isn't very efficient but for time reasons hashcode and
 97  
          * equals wasn't implemented on the RemotedServiceHolder and IPTable, which
 98  
          * is a member of the RemotedServiceHolder.
 99  
          *
 100  
          * @param serviceInfo
 101  
          * @param serviceList
 102  
          * @return boolean indicating if the entry was removed from the list
 103  
          */
 104  
         private boolean removeServiceFromCollection(ServiceInfo serviceInfo, List<RemotedServiceHolder> serviceList) {
 105  0
                     List<ServiceHolder> servicesToRemove = new ArrayList<ServiceHolder>();
 106  
 
 107  0
                 for (ServiceHolder remotedServiceHolder : serviceList) {
 108  
                     try {
 109  0
                         if (remotedServiceHolder.getServiceInfo().getEndpointUrl().equals(serviceInfo.getEndpointUrl())) {
 110  0
                             servicesToRemove.add(remotedServiceHolder);
 111  
                         }
 112  0
                     } catch (Exception e) {
 113  0
                         LOG.warn("An exception was thrown when attempting to compare endpoint URLs", e);
 114  0
                     }
 115  
                 }
 116  0
                 if (! servicesToRemove.isEmpty()) {
 117  0
                     for (ServiceHolder serviceToRemove : servicesToRemove) {
 118  0
                             serviceToRemove.getServiceInfo().setAlive(false);
 119  0
                             List<ServiceInfo> serviceInfos = new ArrayList<ServiceInfo>();
 120  0
                             serviceInfos.add(serviceToRemove.getServiceInfo());
 121  0
                             KSBServiceLocator.getServiceRegistry().markServicesDead(serviceInfos);
 122  0
                     }
 123  0
                     return serviceList.removeAll(servicesToRemove);
 124  
                 }
 125  0
                 return false;
 126  
         }
 127  
 
 128  
         /**
 129  
          * Fetches a service from the client proxies configured in this resource
 130  
          * loader.
 131  
          */
 132  
         public Object getService(QName serviceName) {
 133  0
                 if ( LOG.isDebugEnabled() ) {
 134  0
                         LOG.debug("ResourceLoader " + getName() + " fetching service " + serviceName);
 135  
                 }
 136  
 
 137  
                 //go to our remotely deployed services first
 138  0
                 RemotedServiceRegistry remoteRegistry = KSBServiceLocator.getServiceDeployer();
 139  0
                 Object service = remoteRegistry.getLocalService(serviceName);
 140  0
                 if (service != null) {
 141  0
                         return service;
 142  
                 }
 143  
 
 144  0
                 List<RemotedServiceHolder> clientProxies = getAllServices(serviceName);
 145  0
                 if (clientProxies == null || clientProxies.isEmpty()) {
 146  0
                         return null;
 147  
                 }
 148  
                 // randomly get a proxy for 'load balancing'
 149  0
                 ServiceHolder serviceHolder = getRemotedServiceHolderFromList(clientProxies);
 150  
                 try {
 151  0
                     service = serviceHolder.getService();
 152  0
                 } catch (Exception e) {
 153  0
                     LOG.error("Caught exception getting service " + serviceName);
 154  0
                     this.removeService(serviceHolder.getServiceInfo());
 155  0
                     return getService(serviceName);
 156  0
                 }
 157  0
                 if (service != null) {
 158  0
                         if ( LOG.isDebugEnabled() ) {
 159  0
                                 LOG.debug("Located a remote proxy to service " + serviceName);
 160  
                         }
 161  
                 }
 162  0
                 return service;
 163  
         }
 164  
 
 165  
         public Object getService(QName qName, String url) {
 166  0
                 List<RemotedServiceHolder> clientProxies = getAllServices(qName);
 167  0
                 if (clientProxies == null || clientProxies.isEmpty()) {
 168  0
                         return null;
 169  
                 }
 170  0
                 for (ServiceHolder holder : clientProxies) {
 171  0
                         if (holder.getServiceInfo().getEndpointUrl().equals(url)) {
 172  
                                     try {
 173  0
                                         return holder.getService();
 174  0
                                     } catch (Exception e) {
 175  0
                                         this.removeService(holder.getServiceInfo());
 176  0
                                     }
 177  
                         }
 178  
                 }
 179  0
                 return null;
 180  
         }
 181  
         
 182  
         public List<QName> getServiceNamesForUnqualifiedName(String unqualifiedServiceName) {
 183  0
                 List<QName> names = new ArrayList<QName>();
 184  0
                 for (QName serviceName : getClients().keySet()) {
 185  0
                         if (serviceName.getLocalPart().equals(unqualifiedServiceName)) {
 186  0
                                 names.add(serviceName);
 187  
                         }
 188  
                 }
 189  0
                 return names;
 190  
         }
 191  
 
 192  
         public ServiceHolder getRemotedServiceHolderFromList(List<RemotedServiceHolder> remotedServices) {
 193  0
                 return remotedServices.get(this.randomNumber.nextInt(remotedServices.size()));
 194  
         }
 195  
 
 196  
         /**
 197  
          * Returns an immutable list of services.
 198  
          */
 199  
         public List<RemotedServiceHolder> getAllServices(QName qName) {
 200  0
                 List<RemotedServiceHolder> clientProxies = this.getClients().get(qName);
 201  0
                 if (clientProxies == null) {
 202  0
                         if ( LOG.isDebugEnabled() ) {
 203  0
                                 LOG.debug("Client proxies are null, Re-aquiring services.  Service Namespace " + ConfigContext.getCurrentContextConfig().getServiceNamespace());
 204  
                         }
 205  
                         
 206  
                         // TODO: What is this all about?
 207  0
                         run();
 208  
                         
 209  0
                         clientProxies = this.getClients().get(qName);
 210  0
                         if (clientProxies == null || clientProxies.size() == 0) {
 211  0
                                 if ( LOG.isDebugEnabled() ) {
 212  0
                                         Map<QName, List<RemotedServiceHolder>> x = this.getClients();
 213  0
                                         if (x != null) {
 214  0
                                                 for (QName b : x.keySet()) {
 215  0
                                                         LOG.debug(b.getNamespaceURI() + " " + b.getLocalPart());
 216  
                                                 }
 217  
                                         }
 218  
                                 }
 219  0
                                 throw new RiceRemoteServiceConnectionException("No remote services available for client access when attempting to lookup '" + qName + "'");
 220  
                         }
 221  
                 }
 222  
                 // be sure to return an immutable copy of the list
 223  0
                 return Collections.unmodifiableList(new ArrayList<RemotedServiceHolder>(clientProxies));
 224  
         }
 225  
 
 226  
         /**
          * Triggers an immediate reload of the client map by delegating to run().
          */
         public void refresh() {
 227  0
             run();
 228  0
         }
 229  
 
 230  
         public void run() {
 231  0
                 if (!isStarted()) {
 232  0
                         return;
 233  
                 }
 234  0
                 LOG.debug("Checking for new services on the bus");
 235  0
                 List<ServiceInfo> servicesOnBus = null;
 236  0
                 if (ConfigContext.getCurrentContextConfig().getDevMode()) {
 237  0
                         servicesOnBus = new ArrayList<ServiceInfo>();
 238  0
                         for (ServiceHolder remoteServiceHolder : KSBServiceLocator.getServiceDeployer().getPublishedServices().values()) {
 239  0
                                 servicesOnBus.add(remoteServiceHolder.getServiceInfo());
 240  
                         }
 241  
                 } else {
 242  0
                         servicesOnBus = KSBServiceLocator.getServiceRegistry().fetchAllActive();
 243  
                 }
 244  
 
 245  0
                 synchronized ( clientsMutex ) {
 246  0
                         if (new RoutingTableDiffCalculator().calculateClientSideUpdate(this.getClients(), servicesOnBus)) {
 247  0
                                 if ( LOG.isDebugEnabled() ) {
 248  0
                                         LOG.debug("Located new services on the bus, numServices=" + servicesOnBus.size());
 249  
                                 }
 250  0
                                 Map<QName, List<RemotedServiceHolder>> updatedRemoteServicesMap = new HashMap<QName, List<RemotedServiceHolder>>();
 251  0
                                 for (Iterator<ServiceInfo> iter = servicesOnBus.iterator(); iter.hasNext();) {
 252  0
                                         ServiceInfo entry = iter.next();
 253  0
                                         if (entry.getAlive()) {
 254  
                                                 try {
 255  0
                                                         registerClient(entry, updatedRemoteServicesMap);
 256  0
                                                 } catch (Exception e) {
 257  0
                                                         LOG.error("Unable to register client " + entry, e);
 258  0
                                                 }
 259  
                                         }
 260  0
                                 }
 261  0
                                 this.setClients(updatedRemoteServicesMap);
 262  0
                         } else {
 263  0
                                 LOG.debug("No new services on the bus.");
 264  
                         }
 265  0
                 }
 266  0
         }
 267  
 
 268  
         private void registerClient(ServiceInfo serviceInfo, Map<QName, List<RemotedServiceHolder>> clientMap) {
 269  
 
 270  
 
 271  
 
 272  0
                 if (clientMap.get(serviceInfo.getQname()) == null) {
 273  0
                         clientMap.put(serviceInfo.getQname(), new ArrayList<RemotedServiceHolder>());
 274  
                 }
 275  0
                 installAlternateEndpoint(serviceInfo);
 276  0
                 clientMap.get(serviceInfo.getQname()).add(new RemotedServiceHolder(serviceInfo));
 277  0
         }
 278  
         
 279  
         protected void installAlternateEndpoint(ServiceInfo serviceInfo) {
 280  0
         List<AlternateEndpointLocation> alternateEndpointLocations = (List<AlternateEndpointLocation>) ConfigContext
 281  
                 .getCurrentContextConfig().getObject(KSBConstants.Config.KSB_ALTERNATE_ENDPOINT_LOCATIONS);
 282  0
         if (alternateEndpointLocations != null) {
 283  0
             for (AlternateEndpointLocation alternateEndpointLocation : alternateEndpointLocations) {
 284  0
                 if (Pattern.matches(".*" + alternateEndpointLocation.getEndpointHostReplacementPattern() + ".*", serviceInfo
 285  
                         .getEndpointUrl())) {
 286  0
                     Pattern myPattern = Pattern.compile(alternateEndpointLocation.getEndpointHostReplacementPattern());
 287  0
                     Matcher myMatcher = myPattern.matcher(serviceInfo.getEndpointUrl());
 288  0
                     String alternateEndpoint = myMatcher.replaceFirst(alternateEndpointLocation
 289  
                             .getEndpointHostReplacementValue());
 290  0
                     if ( LOG.isInfoEnabled() ) {
 291  0
                             LOG.info("Found an alternate url host value ("
 292  
                                             + alternateEndpointLocation.getEndpointHostReplacementValue() + ") for endpoint: "
 293  
                                             + serviceInfo.getEndpointUrl() + " -> instead using: " + alternateEndpoint);
 294  
                     }
 295  0
                     serviceInfo.setEndpointAlternateUrl(alternateEndpoint);
 296  0
                     break;
 297  
                 }
 298  
             }
 299  
         }
 300  0
         List<AlternateEndpoint> alternateEndpoints = (List<AlternateEndpoint>) ConfigContext.getCurrentContextConfig().getObject(
 301  
                 KSBConstants.Config.KSB_ALTERNATE_ENDPOINTS);
 302  0
         if (alternateEndpoints != null) {
 303  0
             for (AlternateEndpoint alternateEndpoint : alternateEndpoints) {
 304  0
                 if (Pattern.matches(alternateEndpoint.getEndpointUrlPattern(), serviceInfo.getEndpointUrl())) {
 305  0
                         if ( LOG.isInfoEnabled() ) {
 306  0
                                 LOG.info("Found an alternate url for endpoint: " + serviceInfo.getEndpointUrl() + " -> instead using: "
 307  
                                             + alternateEndpoint.getActualEndpoint());
 308  
                         }
 309  0
                     serviceInfo.setEndpointAlternateUrl(alternateEndpoint.getActualEndpoint());
 310  0
                     break;
 311  
                 }
 312  
             }
 313  
         }
 314  0
     }
 315  
 
 316  
 
 317  
         /**
          * Reports whether this locator is currently started.
          *
          * @return the started flag, which start() sets to true and stop() sets to false
          */
         public boolean isStarted() {
 318  0
                 return this.started;
 319  
         }
 320  
 
 321  
         public void start() throws Exception {
 322  0
                 LOG.info("Starting the RemoteResourceServiceLocator...");
 323  
 
 324  0
                 int refreshRate = ConfigContext.getCurrentContextConfig().getRefreshRate();
 325  0
                 this.future = KSBServiceLocator.getScheduledPool().scheduleWithFixedDelay(this, 30, refreshRate, TimeUnit.SECONDS);
 326  0
                 this.started = true;
 327  0
                 run();
 328  0
                 LOG.info("...RemoteResourceServiceLocator started.");
 329  0
         }
 330  
 
 331  
         public void stop() throws Exception {
 332  0
                 LOG.info("Stopping the RemoteResourceServiceLocator...");
 333  0
                 if (this.future != null) {
 334  0
                         if (!this.future.cancel(true)) {
 335  0
                                 LOG.warn("Failed to cancel the RemoteResourceServiceLocator service.");
 336  
                         }
 337  0
                         this.future = null;
 338  
                 }
 339  0
                 this.started = false;
 340  0
                 LOG.info("...RemoteResourceServiceLocator stopped.");
 341  0
         }
 342  
 
 343  
         public Object getObject(ObjectDefinition definition) {
 344  0
                 if (definition.isAtRemotingLayer()) {
 345  0
                         return null;
 346  
                 }
 347  0
                 if (StringUtils.isEmpty(definition.getServiceNamespace())) {
 348  0
                         return null;
 349  
                 }
 350  0
                 QName objectRemoterName = new QName(definition.getServiceNamespace(), KSBConstants.ServiceNames.OBJECT_REMOTER);
 351  0
                 ObjectRemoterService classRemoter = (ObjectRemoterService)GlobalResourceLoader.getService(objectRemoterName);
 352  0
                 ServiceInfo serviceInfo = classRemoter.getRemotedClassURL(definition);
 353  
 
 354  0
                 if (serviceInfo == null) {
 355  0
                         return null;
 356  
                 }
 357  
 
 358  
                 try {
 359  0
                         RemoteObjectCleanup remoteCleanup = new RemoteObjectCleanup(objectRemoterName, serviceInfo.getQname());
 360  0
                         if (TransactionSynchronizationManager.isActualTransactionActive()) {
 361  0
                             TransactionSynchronizationManager.registerSynchronization(remoteCleanup);
 362  
                         }
 363  0
                         return ServiceConnectorFactory.getServiceConnector(serviceInfo).getService();
 364  0
                 } catch (Exception e) {
 365  0
                         throw new RiceRuntimeException(e);
 366  
                 }
 367  
         }
 368  
 
 369  
         public MessageExceptionHandler getMessageExceptionHandler(QName qname) {
 370  0
                 List<RemotedServiceHolder> remotedServices = getAllServices(qname);
 371  0
                 if (remotedServices == null || remotedServices.isEmpty()) {
 372  0
                         throw new RiceRuntimeException("No services found for name " + qname);
 373  
                 }
 374  0
                 ServiceHolder serviceHolder = getRemotedServiceHolderFromList(remotedServices);
 375  0
                 if (serviceHolder != null) {
 376  0
                         String messageExceptionHandlerName = serviceHolder.getServiceInfo().getServiceDefinition().getMessageExceptionHandler();
 377  0
                         if (messageExceptionHandlerName == null) {
 378  0
                                 messageExceptionHandlerName = DefaultMessageExceptionHandler.class.getName();
 379  
                         }
 380  0
                         return (MessageExceptionHandler) GlobalResourceLoader.getObject(new ObjectDefinition(messageExceptionHandlerName));
 381  
                 }
 382  0
                 throw new RiceRuntimeException("No service with QName " + qname + " found");
 383  
         }
 384  
 
 385  
         public Map<QName, List<RemotedServiceHolder>> getClients() {
 386  0
                 synchronized ( clientsMutex ) {
 387  0
                     return this.clients;                        
 388  0
                 }
 389  
         }
 390  
 
 391  
         /**
          * Replaces the client map. The reference is swapped while holding
          * clientsMutex; the given map is stored as is, not copied.
          *
          * @param clients the new map of service name to client proxies
          */
         public void setClients(Map<QName, List<RemotedServiceHolder>> clients) {
 392  0
                 synchronized ( clientsMutex ) {
 393  0
                         this.clients = clients;
 394  0
                 }
 395  0
         }
 396  
 }