Coverage Report - org.kuali.rice.kim.service.impl.IdentityArchiveServiceImpl
 
Classes in this File Line Coverage Branch Coverage Complexity
IdentityArchiveServiceImpl
0%
0/48
0%
0/20
2.773
IdentityArchiveServiceImpl$1
N/A
N/A
2.773
IdentityArchiveServiceImpl$CallableAdapter
0%
0/8
N/A
2.773
IdentityArchiveServiceImpl$EntityArchiveWriter
0%
0/11
N/A
2.773
IdentityArchiveServiceImpl$EntityArchiveWriter$1
0%
0/8
0%
0/8
2.773
IdentityArchiveServiceImpl$EntityArchiveWriter$2
0%
0/20
0%
0/14
2.773
IdentityArchiveServiceImpl$EntityArchiveWriter$3
0%
0/13
0%
0/8
2.773
IdentityArchiveServiceImpl$PreLogCallableWrapper
0%
0/7
N/A
2.773
IdentityArchiveServiceImpl$WriteQueue
0%
0/8
0%
0/2
2.773
 
 1  
 /**
 2  
  * Copyright 2005-2011 The Kuali Foundation
 3  
  *
 4  
  * Licensed under the Educational Community License, Version 2.0 (the "License");
 5  
  * you may not use this file except in compliance with the License.
 6  
  * You may obtain a copy of the License at
 7  
  *
 8  
  * http://www.opensource.org/licenses/ecl2.php
 9  
  *
 10  
  * Unless required by applicable law or agreed to in writing, software
 11  
  * distributed under the License is distributed on an "AS IS" BASIS,
 12  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  
  * See the License for the specific language governing permissions and
 14  
  * limitations under the License.
 15  
  */
 16  
 package org.kuali.rice.kim.service.impl;
 17  
 
 18  
 import org.apache.commons.lang.StringUtils;
 19  
 import org.apache.log4j.Level;
 20  
 import org.apache.log4j.Logger;
 21  
 import org.kuali.rice.core.api.config.property.ConfigurationService;
 22  
 import org.kuali.rice.kim.api.KimConstants;
 23  
 import org.kuali.rice.kim.impl.identity.IdentityArchiveService;
 24  
 import org.kuali.rice.kim.api.identity.entity.EntityDefault;
 25  
 import org.kuali.rice.kim.api.identity.principal.Principal;
 26  
 import org.kuali.rice.kim.impl.identity.EntityDefaultInfoCacheBo;
 27  
 import org.kuali.rice.krad.service.BusinessObjectService;
 28  
 import org.kuali.rice.krad.service.KRADServiceLocatorInternal;
 29  
 import org.kuali.rice.ksb.service.KSBServiceLocator;
 30  
 import org.springframework.beans.factory.DisposableBean;
 31  
 import org.springframework.beans.factory.InitializingBean;
 32  
 import org.springframework.transaction.PlatformTransactionManager;
 33  
 import org.springframework.transaction.TransactionStatus;
 34  
 import org.springframework.transaction.support.TransactionCallback;
 35  
 import org.springframework.transaction.support.TransactionTemplate;
 36  
 
 37  
 import java.util.ArrayList;
 38  
 import java.util.Arrays;
 39  
 import java.util.Collection;
 40  
 import java.util.Collections;
 41  
 import java.util.Comparator;
 42  
 import java.util.HashMap;
 43  
 import java.util.HashSet;
 44  
 import java.util.List;
 45  
 import java.util.Map;
 46  
 import java.util.Set;
 47  
 import java.util.concurrent.Callable;
 48  
 import java.util.concurrent.ConcurrentLinkedQueue;
 49  
 import java.util.concurrent.TimeUnit;
 50  
 import java.util.concurrent.atomic.AtomicBoolean;
 51  
 import java.util.concurrent.atomic.AtomicInteger;
 52  
 
 53  
 /**
 54  
  * This is the default implementation for the IdentityArchiveService.
 55  
  * @see IdentityArchiveService
 56  
  * @author Kuali Rice Team (rice.collab@kuali.org)
 57  
  *
 58  
  */
 59  0
 public class IdentityArchiveServiceImpl implements IdentityArchiveService, InitializingBean, DisposableBean {
 60  0
         // class-wide log4j logger; also used by the static nested PreLogCallableWrapper
         private static final Logger LOG = Logger.getLogger( IdentityArchiveServiceImpl.class );
 61  
 
 62  
         // used for the archive lookups (findByPrimaryKey / findMatching) and for saving flushed batches; set via setter
         private BusinessObjectService businessObjectService;
 63  
         // source of the two tuning properties named below; set via setter
         private ConfigurationService kualiConfigurationService;
 64  
 
 65  
         // name of the configuration property holding the number of seconds between scheduled flushes
         private static final String EXEC_INTERVAL_SECS = "kim.identityArchiveServiceImpl.executionIntervalSeconds";
 66  
         // name of the configuration property holding the queue size that triggers an immediate flush
         private static final String MAX_WRITE_QUEUE_SIZE = "kim.identityArchiveServiceImpl.maxWriteQueueSize";
 67  
         private static final int EXECUTION_INTERVAL_SECONDS_DEFAULT = 600; // by default, flush the write queue this often (seconds)
 68  
         private static final int MAX_WRITE_QUEUE_SIZE_DEFAULT = 300; // cache this many KEDI's (EntityDefault instances) before forcing a write
 69  
 
 70  0
         // entities handed to saveEntityDefaultToArchive wait here until a writer run dequeues them
         private final WriteQueue writeQueue = new WriteQueue();
 71  0
         // the single writer instance shared by the three Runnable decorations below
         private final EntityArchiveWriter writer = new EntityArchiveWriter();
 72  
 
 73  
         // all this ceremony just decorates the writer so it logs a message first, and converts the Callable to Runnable.
         // This one is executed by saveEntityDefaultToArchive when the queue reaches its maximum size.
 74  0
         private final Runnable maxQueueSizeExceededWriter =
 75  
                 new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "max size exceeded, flushing write queue"));
 76  
 
 77  
         // same decoration of the writer; this one is scheduled at a fixed rate by afterPropertiesSet
 78  0
         private final Runnable scheduledWriter =
 79  
                 new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "scheduled write out, flushing write queue"));
 80  
 
 81  
         // same decoration of the writer; this one is executed by destroy
 82  0
         private final Runnable shutdownWriter =
 83  
                 new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "rice is shutting down, flushing write queue"));
 84  
         
 85  
         private int getExecutionIntervalSeconds() {
 86  0
                 final String prop = kualiConfigurationService.getPropertyValueAsString(EXEC_INTERVAL_SECS);
 87  
                 try {
 88  0
                         return Integer.valueOf(prop).intValue();
 89  0
                 } catch (NumberFormatException e) {
 90  0
                         return EXECUTION_INTERVAL_SECONDS_DEFAULT;
 91  
                 }
 92  
         }
 93  
         
 94  
         private int getMaxWriteQueueSize() {
 95  0
                 final String prop = kualiConfigurationService.getPropertyValueAsString(MAX_WRITE_QUEUE_SIZE);
 96  
                 try {
 97  0
                         return Integer.valueOf(prop).intValue();
 98  0
                 } catch (NumberFormatException e) {
 99  0
                         return MAX_WRITE_QUEUE_SIZE_DEFAULT;
 100  
                 }
 101  
         }
 102  
 
 103  
         @Override
 104  
         public EntityDefault getEntityDefaultFromArchive( String entityId ) {
 105  0
             if (StringUtils.isBlank(entityId)) {
 106  0
             throw new IllegalArgumentException("entityId is blank");
 107  
         }
 108  
 
 109  0
         Map<String,String> criteria = new HashMap<String, String>(1);
 110  0
             criteria.put(KimConstants.PrimaryKeyConstants.SUB_ENTITY_ID, entityId);
 111  0
             EntityDefaultInfoCacheBo cachedValue = businessObjectService.findByPrimaryKey(EntityDefaultInfoCacheBo.class, criteria);
 112  0
             return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo();
 113  
     }
 114  
 
 115  
     @Override
 116  
         public EntityDefault getEntityDefaultFromArchiveByPrincipalId(String principalId) {
 117  0
             if (StringUtils.isBlank(principalId)) {
 118  0
             throw new IllegalArgumentException("principalId is blank");
 119  
         }
 120  
 
 121  0
         Map<String,String> criteria = new HashMap<String, String>(1);
 122  0
             criteria.put("principalId", principalId);
 123  0
             EntityDefaultInfoCacheBo cachedValue = businessObjectService.findByPrimaryKey(EntityDefaultInfoCacheBo.class, criteria);
 124  0
             return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo();
 125  
     }
 126  
 
 127  
     @Override
 128  
         public EntityDefault getEntityDefaultFromArchiveByPrincipalName(String principalName) {
 129  0
             if (StringUtils.isBlank(principalName)) {
 130  0
             throw new IllegalArgumentException("principalName is blank");
 131  
         }
 132  
 
 133  0
         Map<String,String> criteria = new HashMap<String, String>(1);
 134  0
             criteria.put("principalName", principalName);
 135  0
             Collection<EntityDefaultInfoCacheBo> entities = businessObjectService.findMatching(EntityDefaultInfoCacheBo.class, criteria);
 136  0
             return (entities == null || entities.isEmpty()) ? null : entities.iterator().next().convertCacheToEntityDefaultInfo();
 137  
     }
 138  
 
 139  
     @Override
 140  
         public void saveEntityDefaultToArchive(EntityDefault entity) {
 141  0
             if (entity == null) {
 142  0
             throw new IllegalArgumentException("entity is blank");
 143  
         }
 144  
 
 145  
             // if the max size has been reached, schedule now
 146  0
             if (getMaxWriteQueueSize() <= writeQueue.offerAndGetSize(entity) /* <- this enqueues the KEDI */ &&
 147  
                             writer.requestSubmit()) {
 148  0
                     KSBServiceLocator.getThreadPool().execute(maxQueueSizeExceededWriter);
 149  
             }
 150  0
     }
 151  
 
 152  
         public void setBusinessObjectService(BusinessObjectService businessObjectService) {
 153  0
                 this.businessObjectService = businessObjectService;
 154  0
         }
 155  
 
 156  
         public void setKualiConfigurationService(
 157  
                         ConfigurationService kualiConfigurationService) {
 158  0
                 this.kualiConfigurationService = kualiConfigurationService;
 159  0
         }
 160  
     
 161  
     /** schedule the writer on the KSB scheduled pool. */
 162  
         @Override
 163  
         public void afterPropertiesSet() throws Exception {
 164  0
                 LOG.info("scheduling writer...");
 165  0
                 KSBServiceLocator.getScheduledPool().scheduleAtFixedRate(scheduledWriter,
 166  
                                 getExecutionIntervalSeconds(), getExecutionIntervalSeconds(), TimeUnit.SECONDS);
 167  0
         }
 168  
 
 169  
         /** flush the write queue immediately. */
 170  
         @Override
 171  
         public void destroy() throws Exception {
 172  0
                 KSBServiceLocator.getThreadPool().execute(shutdownWriter);
 173  0
         }
 174  
 
 175  
         /**
 176  
          * store the person to the database, but do this an alternate thread to
 177  
          * prevent transaction issues since this service is non-transactional
 178  
          *
 179  
          * @author Kuali Rice Team (rice.collab@kuali.org)
 180  
          *
 181  
          */
 182  0
         private class EntityArchiveWriter implements Callable {
 183  
 
 184  
                 // flag used to prevent multiple processes from being submitted at once
 185  0
                 AtomicBoolean currentlySubmitted = new AtomicBoolean(false);
 186  
 
 187  0
                 private final Comparator<Comparable> nullSafeComparator = new Comparator<Comparable>() {
 188  
                         @Override
 189  
                         public int compare(Comparable i1, Comparable i2) {
 190  0
                                 if (i1 != null && i2 != null) {
 191  0
                                         return i1.compareTo(i2);
 192  0
                                 } else if (i1 == null) {
 193  0
                                         if (i2 == null) {
 194  0
                                                 return 0;
 195  
                                         } else {
 196  0
                                                 return -1;
 197  
                                         }
 198  
                                 } else { // if (entityId2 == null) {
 199  0
                                         return 1;
 200  
                                 }
 201  
                         };
 202  
                 };
 203  
 
 204  
                 /**
 205  
                  * Comparator that attempts to impose a total ordering on EntityDefault instances
 206  
                  */
 207  0
                 private final Comparator<EntityDefault> kediComparator = new Comparator<EntityDefault>() {
 208  
                         /**
 209  
                          * compares by entityId value
 210  
                          * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
 211  
                          */
 212  
                         @Override
 213  
                         public int compare(EntityDefault o1, EntityDefault o2) {
 214  0
                                 String entityId1 = (o1 == null) ? null : o1.getEntityId();
 215  0
                                 String entityId2 = (o2 == null) ? null : o2.getEntityId();
 216  
 
 217  0
                                 int result = nullSafeComparator.compare(entityId1, entityId2);
 218  
 
 219  0
                                 if (result == 0) {
 220  0
                                         result = getPrincipalIdsString(o1).compareTo(getPrincipalIdsString(o2));
 221  
                                 }
 222  
 
 223  0
                                 return result;
 224  
                         }
 225  
 
 226  
                         /**
 227  
                          * This method builds a newline delimited String containing the identity's principal IDs in sorted order
 228  
                          *
 229  
                          * @param entity
 230  
                          * @return
 231  
                          */
 232  
                         private String getPrincipalIdsString(EntityDefault entity) {
 233  0
                                 String result = "";
 234  0
                                 if (entity != null) {
 235  0
                                         List<Principal> principals = entity.getPrincipals();
 236  0
                                         if (principals != null) {
 237  0
                                                 if (principals.size() == 1) { // one
 238  0
                                                         result = principals.get(0).getPrincipalId();
 239  
                                                 } else { // multiple
 240  0
                                                         String [] ids = new String [principals.size()];
 241  0
                                                         int insertIndex = 0;
 242  0
                                                         for (Principal principal : principals) {
 243  0
                                                                 ids[insertIndex++] = principal.getPrincipalId();
 244  
                                                         }
 245  0
                                                         Arrays.sort(ids);
 246  0
                                                         result = StringUtils.join(ids, "\n");
 247  
                                                 }
 248  
                                         }
 249  
                                 }
 250  0
                                 return result;
 251  
                         }
 252  
                 };
 253  
 
 254  
                 public boolean requestSubmit() {
 255  0
                         return currentlySubmitted.compareAndSet(false, true);
 256  
                 }
 257  
 
 258  
                 /**
 259  
                  * Call that tries to flush the write queue.
 260  
                  * @see Callable#call()
 261  
                  */
 262  
                 @Override
 263  
                 public Object call() {
 264  
                         try {
 265  
                                 // the strategy is to grab chunks of entities, dedupe & sort them, and insert them in a big
 266  
                                 // batch to reduce transaction overhead.  Sorting is done so insertion order is guaranteed, which
 267  
                                 // prevents deadlocks between concurrent writers to the database.
 268  0
                                 PlatformTransactionManager transactionManager = KRADServiceLocatorInternal.getTransactionManager();
 269  0
                                 TransactionTemplate template = new TransactionTemplate(transactionManager);
 270  0
                                 template.execute(new TransactionCallback() {
 271  
                                         @Override
 272  
                                         public Object doInTransaction(TransactionStatus status) {
 273  0
                                                 EntityDefault entity = null;
 274  0
                                                 ArrayList<EntityDefault> entitiesToInsert = new ArrayList<EntityDefault>(getMaxWriteQueueSize());
 275  0
                                                 Set<String> deduper = new HashSet<String>(getMaxWriteQueueSize());
 276  
 
 277  
                                                 // order is important in this conditional so that elements aren't dequeued and then ignored
 278  0
                                                 while (entitiesToInsert.size() < getMaxWriteQueueSize() && null != (entity = writeQueue.poll())) {
 279  0
                                                         if (deduper.add(entity.getEntityId())) {
 280  0
                                                                 entitiesToInsert.add(entity);
 281  
                                                         }
 282  
                                                 }
 283  
 
 284  0
                                                 Collections.sort(entitiesToInsert, kediComparator);
 285  0
                         List<EntityDefaultInfoCacheBo> entityCache = new ArrayList<EntityDefaultInfoCacheBo>(entitiesToInsert.size());
 286  0
                         for (EntityDefault entityToInsert : entitiesToInsert) {
 287  0
                             entityCache.add(new EntityDefaultInfoCacheBo( entityToInsert ));
 288  
                         }
 289  0
                         businessObjectService.save(entityCache);
 290  
                                                 //for (EntityDefault entityToInsert : entitiesToInsert) {
 291  
                                                 //        businessObjectService.save( new EntityDefaultInfoCacheBo( entityToInsert ) );
 292  
                                                 //}
 293  0
                                                 return null;
 294  
                                         }
 295  
                                 });
 296  
                         } finally { // make sure our running flag is unset, otherwise we'll never run again
 297  0
                                 currentlySubmitted.compareAndSet(true, false);
 298  0
                         }
 299  
 
 300  0
                         return Boolean.TRUE;
 301  
                 }
 302  
         }
 303  
 
 304  
         /**
 305  
          * A class encapsulating a {@link ConcurrentLinkedQueue} and an {@link AtomicInteger} to
 306  
          * provide fast offer(enqueue)/poll(dequeue) and size checking.  Size may be approximate due to concurrent
 307  
          * activity, but for our purposes that is fine.
 308  
          *
 309  
          * @author Kuali Rice Team (rice.collab@kuali.org)
 310  
          *
 311  
          */
 312  0
         private static class WriteQueue {
 313  0
                 AtomicInteger writeQueueSize = new AtomicInteger(0);
 314  0
                 ConcurrentLinkedQueue<EntityDefault> queue = new ConcurrentLinkedQueue<EntityDefault>();
 315  
 
 316  
                 public int offerAndGetSize(EntityDefault entity) {
 317  0
                         queue.add(entity);
 318  0
                         return writeQueueSize.incrementAndGet();
 319  
                 }
 320  
 
 321  
                 private EntityDefault poll() {
 322  0
                         EntityDefault result = queue.poll();
 323  0
                         if (result != null) { writeQueueSize.decrementAndGet(); }
 324  0
                         return result;
 325  
                 }
 326  
         }
 327  
 
 328  
         /**
 329  
          * decorator for a callable to log a message before it is executed
 330  
          *
 331  
          * @author Kuali Rice Team (rice.collab@kuali.org)
 332  
          *
 333  
          */
 334  
         private static class PreLogCallableWrapper<A> implements Callable<A> {
 335  
                 
 336  
                 private final Callable inner;
 337  
                 private final Level level;
 338  
                 private final String message;
 339  
 
 340  0
                 public PreLogCallableWrapper(Callable inner, Level level, String message) {
 341  0
                         this.inner = inner;
 342  0
                         this.level = level;
 343  0
                         this.message = message;
 344  0
                 }
 345  
 
 346  
                 /**
 347  
                  * logs the message then calls the inner Callable
 348  
                  * 
 349  
                  * @see java.util.concurrent.Callable#call()
 350  
                  */
 351  
                 @Override
 352  
                 @SuppressWarnings("unchecked")
 353  
                 public A call() throws Exception {
 354  0
                         LOG.log(level, message);
 355  0
                         return (A)inner.call();
 356  
                 }
 357  
         }
 358  
 
 359  
         /**
 360  
          * Adapts a Callable to be Runnable
 361  
          *
 362  
          * @author Kuali Rice Team (rice.collab@kuali.org)
 363  
          *
 364  
          */
 365  0
         private static class CallableAdapter implements Runnable {
 366  
 
 367  
                 private final Callable callable;
 368  
 
 369  0
                 public CallableAdapter(Callable callable) {
 370  0
                         this.callable = callable;
 371  0
                 }
 372  
 
 373  
                 @Override
 374  
                 public void run() {
 375  
                         try {
 376  0
                                 callable.call();
 377  0
                         } catch (Exception e) {
 378  0
                                 throw new RuntimeException(e);
 379  0
                         }
 380  0
                 }
 381  
         }
 382  
 }