Coverage Report - org.kuali.rice.kim.service.impl.IdentityArchiveServiceImpl
 
Classes in this File Line Coverage Branch Coverage Complexity
IdentityArchiveServiceImpl
0%
0/42
0%
0/22
2.6
IdentityArchiveServiceImpl$1
N/A
N/A
2.6
IdentityArchiveServiceImpl$CallableAdapter
0%
0/8
N/A
2.6
IdentityArchiveServiceImpl$EntityArchiveWriter
0%
0/12
N/A
2.6
IdentityArchiveServiceImpl$EntityArchiveWriter$1
0%
0/8
0%
0/8
2.6
IdentityArchiveServiceImpl$EntityArchiveWriter$2
0%
0/20
0%
0/14
2.6
IdentityArchiveServiceImpl$EntityArchiveWriter$3
0%
0/10
0%
0/8
2.6
IdentityArchiveServiceImpl$PreLogCallableWrapper
0%
0/7
N/A
2.6
IdentityArchiveServiceImpl$WriteQueue
0%
0/9
0%
0/2
2.6
 
 1  
 /*
 2  
  * Copyright 2007-2009 The Kuali Foundation
 3  
  *
 4  
  * Licensed under the Educational Community License, Version 2.0 (the "License");
 5  
  * you may not use this file except in compliance with the License.
 6  
  * You may obtain a copy of the License at
 7  
  *
 8  
  * http://www.opensource.org/licenses/ecl2.php
 9  
  *
 10  
  * Unless required by applicable law or agreed to in writing, software
 11  
  * distributed under the License is distributed on an "AS IS" BASIS,
 12  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  
  * See the License for the specific language governing permissions and
 14  
  * limitations under the License.
 15  
  */
 16  
 package org.kuali.rice.kim.service.impl;
 17  
 
 18  
 import java.util.ArrayList;
 19  
 import java.util.Arrays;
 20  
 import java.util.Collection;
 21  
 import java.util.Collections;
 22  
 import java.util.Comparator;
 23  
 import java.util.HashMap;
 24  
 import java.util.HashSet;
 25  
 import java.util.List;
 26  
 import java.util.Map;
 27  
 import java.util.Set;
 28  
 import java.util.concurrent.Callable;
 29  
 import java.util.concurrent.ConcurrentLinkedQueue;
 30  
 import java.util.concurrent.TimeUnit;
 31  
 import java.util.concurrent.atomic.AtomicBoolean;
 32  
 import java.util.concurrent.atomic.AtomicInteger;
 33  
 
 34  
 import org.apache.commons.lang.StringUtils;
 35  
 import org.apache.log4j.Level;
 36  
 import org.apache.log4j.Logger;
 37  
 import org.kuali.rice.core.config.ConfigContext;
 38  
 import org.kuali.rice.core.config.RiceConfigurer;
 39  
 import org.kuali.rice.core.config.event.AfterStartEvent;
 40  
 import org.kuali.rice.core.config.event.BeforeStopEvent;
 41  
 import org.kuali.rice.core.config.event.RiceConfigEvent;
 42  
 import org.kuali.rice.core.config.event.RiceConfigEventListener;
 43  
 import org.kuali.rice.core.util.RiceConstants;
 44  
 import org.kuali.rice.kim.bo.entity.dto.KimEntityDefaultInfo;
 45  
 import org.kuali.rice.kim.bo.entity.dto.KimPrincipalInfo;
 46  
 import org.kuali.rice.kim.bo.entity.impl.KimEntityDefaultInfoCacheImpl;
 47  
 import org.kuali.rice.kim.service.IdentityArchiveService;
 48  
 import org.kuali.rice.kim.util.KimConstants;
 49  
 import org.kuali.rice.kns.service.BusinessObjectService;
 50  
 import org.kuali.rice.kns.service.KNSServiceLocator;
 51  
 import org.kuali.rice.ksb.service.KSBServiceLocator;
 52  
 import org.springframework.transaction.PlatformTransactionManager;
 53  
 import org.springframework.transaction.TransactionStatus;
 54  
 import org.springframework.transaction.support.TransactionCallback;
 55  
 import org.springframework.transaction.support.TransactionTemplate;
 56  
 
 57  
 
 58  
 /**
 59  
  * This is the default implementation for the IdentityArchiveService.
 60  
  * @see IdentityArchiveService
 61  
  * @author Kuali Rice Team (rice.collab@kuali.org)
 62  
  *
 63  
  */
 64  0
 public class IdentityArchiveServiceImpl implements IdentityArchiveService, RiceConfigEventListener {
 65  0
         private static final Logger LOG = Logger.getLogger( IdentityArchiveServiceImpl.class );
 66  
 
 67  
         private BusinessObjectService businessObjectService;
 68  
 
 69  
         private static final String EXEC_INTERVAL_SECS = "kim.identityArchiveServiceImpl.executionIntervalSeconds";
 70  
         private static final String MAX_WRITE_QUEUE_SIZE = "kim.identityArchiveServiceImpl.maxWriteQueueSize";
 71  
 
 72  0
         private int executionIntervalSeconds = 600; // by default, flush the write queue this often
 73  0
         private int maxWriteQueueSize = 300; // cache this many KEDI's before forcing write
 74  0
         private final WriteQueue writeQueue = new WriteQueue();
 75  0
         private final EntityArchiveWriter writer = new EntityArchiveWriter();
 76  
 
 77  
         // all this ceremony just decorates the writer so it logs a message first, and converts the Callable to Runnable
 78  0
         private final Runnable maxQueueSizeExceededWriter =
 79  
                 new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "max size exceeded, flushing write queue"));
 80  
 
 81  
         // ditto
 82  0
         private final Runnable scheduledWriter =
 83  
                 new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "scheduled write out, flushing write queue"));
 84  
 
 85  
         // ditto
 86  0
         private final Runnable shutdownWriter =
 87  
                 new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "rice is shutting down, flushing write queue"));
 88  
 
 89  0
         public IdentityArchiveServiceImpl(Integer executionIntervalSeconds, Integer maxWriteQueueSize) {
 90  
                 // register for RiceConfigEventS
 91  0
                 RiceConfigurer rice =
 92  
                         (RiceConfigurer)ConfigContext.getCurrentContextConfig().getObject( RiceConstants.RICE_CONFIGURER_CONFIG_NAME );
 93  0
                 LOG.debug("registering for events...");
 94  0
                 rice.getKimConfigurer().registerConfigEventListener(this);
 95  
 
 96  0
                 if (executionIntervalSeconds != null) {
 97  0
                         this.executionIntervalSeconds = executionIntervalSeconds;
 98  
                 }
 99  
 
 100  0
                 if (maxWriteQueueSize != null) {
 101  0
                         this.maxWriteQueueSize = maxWriteQueueSize;
 102  
                 }
 103  0
         }
 104  
 
 105  
         protected BusinessObjectService getBusinessObjectService() {
 106  0
                 if ( businessObjectService == null ) {
 107  0
                         businessObjectService = KNSServiceLocator.getBusinessObjectService();
 108  
                 }
 109  0
                 return businessObjectService;
 110  
         }
 111  
 
 112  
         public KimEntityDefaultInfo getEntityDefaultInfoFromArchive( String entityId ) {
 113  0
             Map<String,String> criteria = new HashMap<String, String>(1);
 114  0
             criteria.put(KimConstants.PrimaryKeyConstants.ENTITY_ID, entityId);
 115  0
             KimEntityDefaultInfoCacheImpl cachedValue = (KimEntityDefaultInfoCacheImpl)getBusinessObjectService().findByPrimaryKey(KimEntityDefaultInfoCacheImpl.class, criteria);
 116  0
             return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo();
 117  
     }
 118  
 
 119  
     public KimEntityDefaultInfo getEntityDefaultInfoFromArchiveByPrincipalId( String principalId ) {
 120  0
             Map<String,String> criteria = new HashMap<String, String>(1);
 121  0
             criteria.put("principalId", principalId);
 122  0
             KimEntityDefaultInfoCacheImpl cachedValue = (KimEntityDefaultInfoCacheImpl)getBusinessObjectService().findByPrimaryKey(KimEntityDefaultInfoCacheImpl.class, criteria);
 123  0
             return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo();
 124  
     }
 125  
 
 126  
     @SuppressWarnings("unchecked")
 127  
         public KimEntityDefaultInfo getEntityDefaultInfoFromArchiveByPrincipalName( String principalName ) {
 128  0
             Map<String,String> criteria = new HashMap<String, String>(1);
 129  0
             criteria.put("principalName", principalName);
 130  0
             Collection<KimEntityDefaultInfoCacheImpl> entities = getBusinessObjectService().findMatching(KimEntityDefaultInfoCacheImpl.class, criteria);
 131  0
             return (entities == null || entities.size() == 0) ? null : entities.iterator().next().convertCacheToEntityDefaultInfo();
 132  
     }
 133  
 
 134  
     public void saveDefaultInfoToArchive( KimEntityDefaultInfo entity ) {
 135  
             // if the max size has been reached, schedule now
 136  0
             if (maxWriteQueueSize <= writeQueue.offerAndGetSize(entity) /* <- this enqueues the KEDI */ &&
 137  
                             writer.requestSubmit()) {
 138  0
                     KSBServiceLocator.getThreadPool().execute(maxQueueSizeExceededWriter);
 139  
             }
 140  0
     }
 141  
 
 142  
     /**
 143  
      * <h4>On events:</h4>
 144  
      * <p>{@link AfterStartEvent}: schedule the writer on the KSB scheduled pool
 145  
      * <p>{@link BeforeStopEvent}: flush the write queue immediately
 146  
      *
 147  
      * @see org.springframework.context.ApplicationListener#onApplicationEvent(org.springframework.context.ApplicationEvent)
 148  
      */
 149  
     public void onEvent(final RiceConfigEvent event) {
 150  0
             if (event instanceof AfterStartEvent) {
 151  
                     // on startup, schedule this to run
 152  0
                     LOG.info("scheduling writer...");
 153  0
                     KSBServiceLocator.getScheduledPool().scheduleAtFixedRate(scheduledWriter,
 154  
                                     executionIntervalSeconds, executionIntervalSeconds, TimeUnit.SECONDS);
 155  0
             } else if (event instanceof BeforeStopEvent) {
 156  0
                     KSBServiceLocator.getThreadPool().execute(shutdownWriter);
 157  
             }
 158  0
     }
 159  
 
 160  
         /**
 161  
          * store the person to the database, but do this an alternate thread to
 162  
          * prevent transaction issues since this service is non-transactional
 163  
          *
 164  
          * @author Kuali Rice Team (rice.collab@kuali.org)
 165  
          *
 166  
          */
 167  0
         private class EntityArchiveWriter implements Callable {
 168  
 
 169  
                 // flag used to prevent multiple processes from being submitted at once
 170  0
                 AtomicBoolean currentlySubmitted = new AtomicBoolean(false);
 171  
 
 172  0
                 private final Comparator<Comparable> nullSafeComparator = new Comparator<Comparable>() {
 173  
                         public int compare(Comparable i1, Comparable i2) {
 174  0
                                 if (i1 != null && i2 != null) {
 175  0
                                         return i1.compareTo(i2);
 176  0
                                 } else if (i1 == null) {
 177  0
                                         if (i2 == null) {
 178  0
                                                 return 0;
 179  
                                         } else {
 180  0
                                                 return -1;
 181  
                                         }
 182  
                                 } else { // if (entityId2 == null) {
 183  0
                                         return 1;
 184  
                                 }
 185  
                         };
 186  
                 };
 187  
 
 188  
                 /**
 189  
                  * Comparator that attempts to impose a total ordering on KimEntityDefaultInfo instances
 190  
                  */
 191  0
                 private final Comparator<KimEntityDefaultInfo> kediComparator = new Comparator<KimEntityDefaultInfo>() {
 192  
                         /**
 193  
                          * compares by entityId value
 194  
                          * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
 195  
                          */
 196  
                         public int compare(KimEntityDefaultInfo o1, KimEntityDefaultInfo o2) {
 197  0
                                 String entityId1 = (o1 == null) ? null : o1.getEntityId();
 198  0
                                 String entityId2 = (o2 == null) ? null : o2.getEntityId();
 199  
 
 200  0
                                 int result = nullSafeComparator.compare(entityId1, entityId2);
 201  
 
 202  0
                                 if (result == 0) {
 203  0
                                         result = getPrincipalIdsString(o1).compareTo(getPrincipalIdsString(o2));
 204  
                                 }
 205  
 
 206  0
                                 return result;
 207  
                         }
 208  
 
 209  
                         /**
 210  
                          * This method builds a newline delimited String containing the entity's principal IDs in sorted order
 211  
                          *
 212  
                          * @param entity
 213  
                          * @return
 214  
                          */
 215  
                         private String getPrincipalIdsString(KimEntityDefaultInfo entity) {
 216  0
                                 String result = "";
 217  0
                                 if (entity != null) {
 218  0
                                         List<KimPrincipalInfo> principals = entity.getPrincipals();
 219  0
                                         if (principals != null) {
 220  0
                                                 if (principals.size() == 1) { // one
 221  0
                                                         result = principals.get(0).getPrincipalId();
 222  
                                                 } else { // multiple
 223  0
                                                         String [] ids = new String [principals.size()];
 224  0
                                                         int insertIndex = 0;
 225  0
                                                         for (KimPrincipalInfo principal : principals) {
 226  0
                                                                 ids[insertIndex++] = principal.getPrincipalId();
 227  
                                                         }
 228  0
                                                         Arrays.sort(ids);
 229  0
                                                         result = StringUtils.join(ids, "\n");
 230  
                                                 }
 231  
                                         }
 232  
                                 }
 233  0
                                 return result;
 234  
                         }
 235  
                 };
 236  
 
 237  
                 public boolean requestSubmit() {
 238  0
                         return currentlySubmitted.compareAndSet(false, true);
 239  
                 }
 240  
 
 241  
                 /**
 242  
                  * Call that tries to flush the write queue.
 243  
                  * @see Callable#call()
 244  
                  */
 245  
                 public Object call() {
 246  
                         try {
 247  
                                 // the strategy is to grab chunks of entities, dedupe & sort them, and insert them in a big
 248  
                                 // batch to reduce transaction overhead.  Sorting is done so insertion order is guaranteed, which
 249  
                                 // prevents deadlocks between concurrent writers to the database.
 250  0
                                 PlatformTransactionManager transactionManager = KNSServiceLocator.getTransactionManager();
 251  0
                                 TransactionTemplate template = new TransactionTemplate(transactionManager);
 252  0
                                 template.execute(new TransactionCallback() {
 253  
                                         public Object doInTransaction(TransactionStatus status) {
 254  0
                                                 KimEntityDefaultInfo entity = null;
 255  0
                                                 ArrayList<KimEntityDefaultInfo> entitiesToInsert = new ArrayList<KimEntityDefaultInfo>(maxWriteQueueSize);
 256  0
                                                 Set<String> deduper = new HashSet<String>(maxWriteQueueSize);
 257  
 
 258  
                                                 // order is important in this conditional so that elements aren't dequeued and then ignored
 259  0
                                                 while (entitiesToInsert.size() < maxWriteQueueSize && null != (entity = writeQueue.poll())) {
 260  0
                                                         if (deduper.add(entity.getEntityId())) entitiesToInsert.add(entity);
 261  
                                                 }
 262  
 
 263  0
                                                 Collections.sort(entitiesToInsert, kediComparator);
 264  
 
 265  0
                                                 for (KimEntityDefaultInfo entityToInsert : entitiesToInsert) {
 266  0
                                                         getBusinessObjectService().save( new KimEntityDefaultInfoCacheImpl( entityToInsert ) );
 267  
                                                 }
 268  0
                                                 return null;
 269  
                                         }
 270  
                                 });
 271  0
                         } finally { // make sure our running flag is unset, otherwise we'll never run again
 272  0
                                 currentlySubmitted.compareAndSet(true, false);
 273  0
                         }
 274  
 
 275  0
                         return Boolean.TRUE;
 276  
                 }
 277  
         }
 278  
 
 279  
         /**
 280  
          * A class encapsulating a {@link ConcurrentLinkedQueue} and an {@link AtomicInteger} to
 281  
          * provide fast offer(enqueue)/poll(dequeue) and size checking.  Size may be approximate due to concurrent
 282  
          * activity, but for our purposes that is fine.
 283  
          *
 284  
          * @author Kuali Rice Team (rice.collab@kuali.org)
 285  
          *
 286  
          */
 287  0
         private static class WriteQueue {
 288  0
                 AtomicInteger writeQueueSize = new AtomicInteger(0);
 289  0
                 ConcurrentLinkedQueue<KimEntityDefaultInfo> queue = new ConcurrentLinkedQueue<KimEntityDefaultInfo>();
 290  
 
 291  
                 public int offerAndGetSize(KimEntityDefaultInfo entity) {
 292  0
                         queue.add(entity);
 293  0
                         return writeQueueSize.incrementAndGet();
 294  
                 }
 295  
 
 296  
                 private KimEntityDefaultInfo poll() {
 297  0
                         KimEntityDefaultInfo result = queue.poll();
 298  0
                         if (result != null) { writeQueueSize.decrementAndGet(); }
 299  0
                         return result;
 300  
                 }
 301  
 
 302  
                 private int getSize() {
 303  0
                         return writeQueueSize.get();
 304  
                 }
 305  
         }
 306  
 
 307  
         /**
 308  
          * decorator for a callable to log a message before it is executed
 309  
          *
 310  
          * @author Kuali Rice Team (rice.collab@kuali.org)
 311  
          *
 312  
          */
 313  
         private static class PreLogCallableWrapper<A> implements Callable<A> {
 314  
 
 315  
                 private final Callable inner;
 316  
                 private final Level level;
 317  
                 private final String message;
 318  
 
 319  0
                 public PreLogCallableWrapper(Callable inner, Level level, String message) {
 320  0
                         this.inner = inner;
 321  0
                         this.level = level;
 322  0
                         this.message = message;
 323  0
                 }
 324  
 
 325  
                 /**
 326  
                  * logs the message then calls the inner Callable
 327  
                  *
 328  
                  * @see java.util.concurrent.Callable#call()
 329  
                  */
 330  
                 @SuppressWarnings("unchecked")
 331  
                 public A call() throws Exception {
 332  0
                         LOG.log(level, message);
 333  0
                         return (A)inner.call();
 334  
                 }
 335  
         }
 336  
 
 337  
         /**
 338  
          * Adapts a Callable to be Runnable
 339  
          *
 340  
          * @author Kuali Rice Team (rice.collab@kuali.org)
 341  
          *
 342  
          */
 343  
         private static class CallableAdapter implements Runnable {
 344  
 
 345  
                 private final Callable callable;
 346  
 
 347  0
                 public CallableAdapter(Callable callable) {
 348  0
                         this.callable = callable;
 349  0
                 }
 350  
 
 351  
                 public void run() {
 352  
                         try {
 353  0
                                 callable.call();
 354  0
                         } catch (Exception e) {
 355  0
                                 throw new RuntimeException(e);
 356  0
                         }
 357  0
                 }
 358  
         }
 359  
 }