Coverage Report - org.kuali.rice.core.impl.cache.DistributedCacheManagerDecorator
 
Classes in this File Line Coverage Branch Coverage Complexity
DistributedCacheManagerDecorator
0%
0/43
0%
0/22
2.231
DistributedCacheManagerDecorator$1
N/A
N/A
2.231
DistributedCacheManagerDecorator$CacheMessageSendingTransactionSynchronization
0%
0/18
0%
0/6
2.231
DistributedCacheManagerDecorator$CacheMessageSendingTransactionSynchronization$1
0%
0/2
0%
0/6
2.231
DistributedCacheManagerDecorator$DistributedCacheDecorator
0%
0/34
0%
0/14
2.231
DistributedCacheManagerDecorator$DistributedCacheException
0%
0/3
N/A
2.231
 
 1  
 /**
 2  
  * Copyright 2005-2011 The Kuali Foundation
 3  
  *
 4  
  * Licensed under the Educational Community License, Version 2.0 (the "License");
 5  
  * you may not use this file except in compliance with the License.
 6  
  * You may obtain a copy of the License at
 7  
  *
 8  
  * http://www.opensource.org/licenses/ecl2.php
 9  
  *
 10  
  * Unless required by applicable law or agreed to in writing, software
 11  
  * distributed under the License is distributed on an "AS IS" BASIS,
 12  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  
  * See the License for the specific language governing permissions and
 14  
  * limitations under the License.
 15  
  */
 16  
 package org.kuali.rice.core.impl.cache;
 17  
 
 18  
 import com.google.common.base.Predicate;
 19  
 import com.google.common.collect.Collections2;
 20  
 import org.apache.commons.lang.StringUtils;
 21  
 import org.apache.commons.logging.Log;
 22  
 import org.apache.commons.logging.LogFactory;
 23  
 import org.kuali.rice.core.api.cache.CacheService;
 24  
 import org.kuali.rice.core.api.cache.CacheTarget;
 25  
 import org.kuali.rice.ksb.api.KsbApiServiceLocator;
 26  
 import org.kuali.rice.ksb.api.bus.Endpoint;
 27  
 import org.springframework.beans.factory.BeanNameAware;
 28  
 import org.springframework.beans.factory.InitializingBean;
 29  
 import org.springframework.beans.factory.NamedBean;
 30  
 import org.springframework.cache.Cache;
 31  
 import org.springframework.cache.CacheManager;
 32  
 import org.springframework.transaction.support.TransactionSynchronization;
 33  
 import org.springframework.transaction.support.TransactionSynchronizationAdapter;
 34  
 import org.springframework.transaction.support.TransactionSynchronizationManager;
 35  
 
 36  
 import javax.xml.namespace.QName;
 37  
 import java.util.ArrayList;
 38  
 import java.util.Collection;
 39  
 import java.util.Collections;
 40  
 import java.util.HashSet;
 41  
 import java.util.List;
 42  
 import java.util.Queue;
 43  
 import java.util.Set;
 44  
 import java.util.concurrent.LinkedBlockingQueue;
 45  
 
 46  
 /**
 47  
  * A distributed cache manager that wraps a cache manager and adds distributed cache capabilities
 48  
  * through the kuali service bus.
 49  
  *
 50  
  * <p>
 51  
  * If in a transaction, distributed cache messages are queued until a transaction completes successfully.
 52  
  * They are then sent as a single message rather than sending individual messages.
 53  
  * If the transaction does not complete successfully then all messages are discarded.
 54  
  * </p>
 55  
  *
 56  
  * <p>
 57  
  * If not in a transaction, distributed messages are sent immediately.  This should be avoided and is likely
 58  
  * the result of a programming error.
 59  
  * </p>
 60  
  */
 61  0
 public final class DistributedCacheManagerDecorator implements CacheManager, InitializingBean, BeanNameAware, NamedBean {
 62  0
     private static final Log LOG = LogFactory.getLog(DistributedCacheManagerDecorator.class);
 63  
 
 64  
     private CacheManager cacheManager;
 65  
     private String serviceName;
 66  
     private String name;
 67  
 
 68  
     @Override
 69  
     public Cache getCache(String name) {
 70  0
         return wrap(cacheManager.getCache(name));
 71  
     }
 72  
 
 73  
     @Override
 74  
     public Collection<String> getCacheNames() {
 75  0
         return cacheManager.getCacheNames();
 76  
     }
 77  
 
 78  
     private Cache wrap(Cache cache) {
 79  
         //just in case they are cached do not want to wrap twice. Obviously this only works
 80  
         //if the Cache isn't wrapped a second time. Don't want to wrap a null cache!
 81  0
         if (!(cache instanceof DistributedCacheDecorator) && cache != null) {
 82  0
             return new DistributedCacheDecorator(cache);
 83  
         }
 84  0
         return cache;
 85  
     }
 86  
 
 87  
     private Collection<CacheService> getCacheServices() {
 88  
         try {
 89  0
             List<CacheService> services = new ArrayList<CacheService>();
 90  0
             final Collection<Endpoint> endpoints = KsbApiServiceLocator.getServiceBus().getEndpoints(QName.valueOf(serviceName));
 91  0
             for (Endpoint endpoint : endpoints) {
 92  0
                 services.add((CacheService)endpoint.getService());
 93  
             }
 94  0
             return services != null ? services : Collections.<CacheService>emptyList();
 95  0
         } catch (RuntimeException e) {
 96  0
             LOG.warn("Failed to find any remote services with name: " + serviceName);
 97  
         }
 98  0
         return Collections.emptyList();
 99  
     }
 100  
 
 101  
     private void flushCache(Collection<CacheTarget> cacheTargets) {
 102  
         try {
 103  
             //this code may not look the prettiest but we do not want to call into the KSB
 104  
             //unless it is necessary to do so.  Also, handling most common fail points to avoid
 105  
             //exceptions.
 106  0
             if (!cacheTargets.isEmpty()) {
 107  0
                 final Collection<CacheService> services = getCacheServices();
 108  0
                 if (services != null) {
 109  0
                     for (CacheService service : services) {
 110  0
                         if (service != null) {
 111  
                             //wrap the each call in a try block so if one message send fails
 112  
                             //we still attempt the others
 113  
                             try {
 114  0
                                 service.flush(cacheTargets);
 115  0
                             } catch (Throwable t) {
 116  0
                                 LOG.error("failed to flush the queue for specific endpoint for serviceName "
 117  
                                         + serviceName, t);
 118  0
                             }
 119  
                         }
 120  
                     }
 121  
                 }
 122  
             }
 123  0
         } catch (Throwable t) {
 124  0
             LOG.error("failed to execute distributed flush for serviceName " + serviceName, t);
 125  0
         }
 126  0
     }
 127  
 
 128  
     @Override
 129  
     public void afterPropertiesSet() {
 130  0
         if (cacheManager == null) {
 131  0
             throw new IllegalStateException("cacheManager was null");
 132  
         }
 133  
 
 134  0
         if (StringUtils.isBlank(serviceName)) {
 135  0
             throw new IllegalStateException("serviceName was null or blank");
 136  
         }
 137  
 
 138  0
         if (StringUtils.isBlank(name)) {
 139  0
             name = "NOT_NAMED";
 140  
         }
 141  0
     }
 142  
 
 143  
     public void setCacheManager(CacheManager cacheManager) {
 144  0
         this.cacheManager = cacheManager;
 145  0
     }
 146  
 
 147  
     public void setServiceName(String serviceName) {
 148  0
         this.serviceName = serviceName;
 149  0
     }
 150  
 
 151  
     @Override
 152  
     public String getBeanName() {
 153  0
         return name;
 154  
     }
 155  
 
 156  
     @Override
 157  
     public void setBeanName(String name) {
 158  0
         this.name = name;
 159  0
     }
 160  
 
 161  
     /**
 162  
      * a cache wrapper that adds distributed cache flush capabilities.  Note: that all cache keys are
 163  
      * coerced to a String.  This means that all cache keys must have well-behaved toString methods.
 164  
      */
 165  0
     private final class DistributedCacheDecorator implements Cache {
 166  
 
 167  
         private final Cache cache;
 168  
 
 169  0
         private DistributedCacheDecorator(Cache cache) {
 170  0
             this.cache = cache;
 171  0
         }
 172  
 
 173  
         @Override
 174  
         public String getName() {
 175  0
             return cache.getName();
 176  
         }
 177  
 
 178  
         @Override
 179  
         public Object getNativeCache() {
 180  0
             return cache.getNativeCache();
 181  
         }
 182  
 
 183  
         @Override
 184  
         public ValueWrapper get(Object key) {
 185  0
             final String sKey = coerceStr(key);
 186  0
             return cache.get(sKey);
 187  
         }
 188  
 
 189  
         @Override
 190  
         public void put(Object key, Object value) {
 191  0
             final String sKey = coerceStr(key);
 192  0
             cache.put(sKey, value);
 193  0
         }
 194  
 
 195  
         @Override
 196  
         public void evict(Object key) {
 197  0
             final String sKey = coerceStr(key);
 198  0
             cache.evict(sKey);
 199  0
             doDistributed(CacheTarget.singleEntry(getName(), sKey));
 200  0
         }
 201  
 
 202  
         @Override
 203  
         public void clear() {
 204  0
             cache.clear();
 205  0
             doDistributed(CacheTarget.entireCache(getName()));
 206  0
         }
 207  
 
 208  
         private String coerceStr(Object key) {
 209  0
             return key != null ? key.toString() : (String) key;
 210  
         }
 211  
 
 212  
         /**
 213  
          * Sends a cache target message to distributed endpoints.  It will either send it in a delayed fashion when
 214  
          * bound to a transaction or immediately if no transaction is present.
 215  
          * @param target the cache target.  cannot be null.
 216  
          */
 217  
         private void doDistributed(CacheTarget target) {
 218  0
             if (doTransactionalFlush()) {
 219  0
                 final CacheMessageSendingTransactionSynchronization ts = getCacheMessageSendingTransactionSynchronization();
 220  
                 //adding to internal queue.  the Synchronization is already registered at this point
 221  0
                 ts.add(target);
 222  0
             } else {
 223  0
                 flushCache(Collections.singleton(target));
 224  
             }
 225  0
         }
 226  
 
 227  
         /**
 228  
          * Gets the {@link CacheMessageSendingTransactionSynchronization} that is registered with the transaction.  If no
 229  
          * synchronization is currently registered, then one is created and registered.
 230  
          * @return the synchronization. will no return null.
 231  
          */
 232  
         private CacheMessageSendingTransactionSynchronization getCacheMessageSendingTransactionSynchronization() {
 233  0
             final Collection<TransactionSynchronization> sycs = TransactionSynchronizationManager.getSynchronizations();
 234  0
             if (sycs != null) {
 235  0
                 for (final TransactionSynchronization ts : sycs) {
 236  0
                     if (ts instanceof CacheMessageSendingTransactionSynchronization) {
 237  0
                         return (CacheMessageSendingTransactionSynchronization) ts;
 238  
                     }
 239  
                 }
 240  
             }
 241  0
             final CacheMessageSendingTransactionSynchronization ts = new CacheMessageSendingTransactionSynchronization();
 242  0
             TransactionSynchronizationManager.registerSynchronization(ts);
 243  0
             return ts;
 244  
         }
 245  
 
 246  
         /**
 247  
          * Should a transaction bound flush be performed?
 248  
          * @return true for transaction based flushing
 249  
          */
 250  
         private boolean doTransactionalFlush() {
 251  0
             return TransactionSynchronizationManager.isSynchronizationActive() && TransactionSynchronizationManager.isActualTransactionActive();
 252  
         }
 253  
     }
 254  
 
 255  
     /**
 256  
      * A TransactionSynchronizer that contains a queue of pending messages.  After the initial creation of this
 257  
      * synchronizer and when in the same transaction, this instance should be retrieved from the Spring Transaction
 258  
      * registry and messages should be added to the internal queue.  This way messages can be "bundled" into a single
 259  
      * message send.
 260  
      *
 261  
      * <p>
 262  
      *     This synchronizer will only send cache messages when the transaction complete successfully.
 263  
      * </p>
 264  
      */
 265  0
     private final class CacheMessageSendingTransactionSynchronization extends TransactionSynchronizationAdapter {
 266  
 
 267  0
         private final LinkedBlockingQueue<CacheTarget> flushQueue = new LinkedBlockingQueue<CacheTarget>();
 268  
 
 269  
         private void add(CacheTarget target) throws DistributedCacheException {
 270  
             try {
 271  0
                 flushQueue.put(target);
 272  0
             } catch (InterruptedException e) {
 273  0
                 throw new DistributedCacheException(e);
 274  0
             }
 275  0
         }
 276  
 
 277  
         @Override
 278  
         public void afterCompletion(int status) {
 279  0
             if (status == STATUS_COMMITTED) {
 280  0
                 flushCache(exhaustQueue(flushQueue));
 281  
             } else {
 282  
                 //clear the queue. destroys all messages
 283  0
                 flushQueue.clear();
 284  
             }
 285  0
         }
 286  
 
 287  
         /**
 288  
          * Iterates over the passed in {@link Queue} calling the {@link Queue#poll} for each item.
 289  
          *
 290  
          * The returned list will also be normalized such that cache targets with keys will not be
 291  
          * present in the returned collection if a cache target exists for the same cache but
 292  
          * w/o a key (a complete cache flush)
 293  
          *
 294  
          * @param targets the queue to iterate over and exhaust
 295  
          * @return a new collection containing CacheTargets
 296  
          */
 297  
         private Collection<CacheTarget> exhaustQueue(Queue<CacheTarget> targets) {
 298  0
             final List<CacheTarget> normalized = new ArrayList<CacheTarget>();
 299  0
             final Set<String> completeFlush = new HashSet<String>();
 300  
 
 301  
             CacheTarget target;
 302  0
             while ((target = targets.poll()) != null) {
 303  0
                 normalized.add(target);
 304  0
                 if (!target.containsKey()) {
 305  0
                     completeFlush.add(target.getCache());
 306  
                 }
 307  
             }
 308  
 
 309  0
             return Collections2.filter(normalized, new Predicate<CacheTarget>() {
 310  
                 @Override
 311  
                 public boolean apply(CacheTarget input) {
 312  0
                     return !input.containsKey() || (input.containsKey() && !completeFlush.contains(input.getCache()));
 313  
                 }
 314  
             });
 315  
         }
 316  
     }
 317  
 
 318  0
     private static final class DistributedCacheException extends RuntimeException {
 319  
         private DistributedCacheException(Throwable cause) {
 320  0
             super(cause);
 321  0
         }
 322  
     }
 323  
 }