001 /* 002 * Copyright 2006-2011 The Kuali Foundation 003 * 004 * Licensed under the Educational Community License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.opensource.org/licenses/ecl2.php 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017 package org.kuali.rice.kim.service.impl; 018 019 import org.apache.commons.lang.StringUtils; 020 import org.apache.log4j.Level; 021 import org.apache.log4j.Logger; 022 import org.kuali.rice.core.api.config.property.ConfigurationService; 023 import org.kuali.rice.kim.api.identity.entity.EntityDefault; 024 import org.kuali.rice.kim.api.identity.principal.Principal; 025 import org.kuali.rice.kim.api.services.IdentityArchiveService; 026 import org.kuali.rice.kim.bo.entity.impl.KimEntityDefaultInfoCacheImpl; 027 import org.kuali.rice.kim.util.KimConstants; 028 import org.kuali.rice.krad.service.BusinessObjectService; 029 import org.kuali.rice.krad.service.KRADServiceLocatorInternal; 030 import org.kuali.rice.ksb.service.KSBServiceLocator; 031 import org.springframework.beans.factory.DisposableBean; 032 import org.springframework.beans.factory.InitializingBean; 033 import org.springframework.transaction.PlatformTransactionManager; 034 import org.springframework.transaction.TransactionStatus; 035 import org.springframework.transaction.support.TransactionCallback; 036 import org.springframework.transaction.support.TransactionTemplate; 037 038 import java.util.ArrayList; 039 import java.util.Arrays; 040 import java.util.Collection; 041 import java.util.Collections; 042 import java.util.Comparator; 043 import java.util.HashMap; 044 import java.util.HashSet; 045 import java.util.List; 046 import java.util.Map; 047 import java.util.Set; 048 import java.util.concurrent.Callable; 049 import java.util.concurrent.ConcurrentLinkedQueue; 050 import java.util.concurrent.TimeUnit; 051 import java.util.concurrent.atomic.AtomicBoolean; 052 import java.util.concurrent.atomic.AtomicInteger; 053 054 /** 055 * This is the default implementation for the IdentityArchiveService. 056 * @see IdentityArchiveService 057 * @author Kuali Rice Team (rice.collab@kuali.org) 058 * 059 */ 060 public class IdentityArchiveServiceImpl implements IdentityArchiveService, InitializingBean, DisposableBean { 061 private static final Logger LOG = Logger.getLogger( IdentityArchiveServiceImpl.class ); 062 063 private BusinessObjectService businessObjectService; 064 private ConfigurationService kualiConfigurationService; 065 066 private static final String EXEC_INTERVAL_SECS = "kim.identityArchiveServiceImpl.executionIntervalSeconds"; 067 private static final String MAX_WRITE_QUEUE_SIZE = "kim.identityArchiveServiceImpl.maxWriteQueueSize"; 068 private static final int EXECUTION_INTERVAL_SECONDS_DEFAULT = 600; // by default, flush the write queue this often 069 private static final int MAX_WRITE_QUEUE_SIZE_DEFAULT = 300; // cache this many KEDI's before forcing write 070 071 private final WriteQueue writeQueue = new WriteQueue(); 072 private final EntityArchiveWriter writer = new EntityArchiveWriter(); 073 074 // all this ceremony just decorates the writer so it logs a message first, and converts the Callable to Runnable 075 private final Runnable maxQueueSizeExceededWriter = 076 new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "max size exceeded, flushing write queue")); 077 078 // ditto 079 private final Runnable scheduledWriter = 080 new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "scheduled write out, flushing write queue")); 081 082 // ditto 083 private final Runnable shutdownWriter = 084 new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "rice is shutting down, flushing write queue")); 085 086 private int getExecutionIntervalSeconds() { 087 final String prop = kualiConfigurationService.getPropertyString(EXEC_INTERVAL_SECS); 088 try { 089 return Integer.valueOf(prop).intValue(); 090 } catch (NumberFormatException e) { 091 return EXECUTION_INTERVAL_SECONDS_DEFAULT; 092 } 093 } 094 095 private int getMaxWriteQueueSize() { 096 final String prop = kualiConfigurationService.getPropertyString(MAX_WRITE_QUEUE_SIZE); 097 try { 098 return Integer.valueOf(prop).intValue(); 099 } catch (NumberFormatException e) { 100 return MAX_WRITE_QUEUE_SIZE_DEFAULT; 101 } 102 } 103 104 @Override 105 public EntityDefault getEntityDefaultInfoFromArchive( String entityId ) { 106 Map<String,String> criteria = new HashMap<String, String>(1); 107 criteria.put(KimConstants.PrimaryKeyConstants.ENTITY_ID, entityId); 108 KimEntityDefaultInfoCacheImpl cachedValue = getBusinessObjectService().findByPrimaryKey(KimEntityDefaultInfoCacheImpl.class, criteria); 109 return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo(); 110 } 111 112 @Override 113 public EntityDefault getEntityDefaultInfoFromArchiveByPrincipalId( String principalId ) { 114 Map<String,String> criteria = new HashMap<String, String>(1); 115 criteria.put("principalId", principalId); 116 KimEntityDefaultInfoCacheImpl cachedValue = getBusinessObjectService().findByPrimaryKey(KimEntityDefaultInfoCacheImpl.class, criteria); 117 return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo(); 118 } 119 120 @Override 121 public EntityDefault getEntityDefaultInfoFromArchiveByPrincipalName( String principalName ) { 122 Map<String,String> criteria = new HashMap<String, String>(1); 123 criteria.put("principalName", principalName); 124 Collection<KimEntityDefaultInfoCacheImpl> entities = getBusinessObjectService().findMatching(KimEntityDefaultInfoCacheImpl.class, criteria); 125 return (entities == null || entities.size() == 0) ? null : entities.iterator().next().convertCacheToEntityDefaultInfo(); 126 } 127 128 @Override 129 public void saveDefaultInfoToArchive( EntityDefault entity ) { 130 // if the max size has been reached, schedule now 131 if (getMaxWriteQueueSize() <= writeQueue.offerAndGetSize(entity) /* <- this enqueues the KEDI */ && 132 writer.requestSubmit()) { 133 KSBServiceLocator.getThreadPool().execute(maxQueueSizeExceededWriter); 134 } 135 } 136 137 public BusinessObjectService getBusinessObjectService() { 138 return this.businessObjectService; 139 } 140 141 public void setBusinessObjectService(BusinessObjectService businessObjectService) { 142 this.businessObjectService = businessObjectService; 143 } 144 145 public ConfigurationService getKualiConfigurationService() { 146 return this.kualiConfigurationService; 147 } 148 149 public void setKualiConfigurationService( 150 ConfigurationService kualiConfigurationService) { 151 this.kualiConfigurationService = kualiConfigurationService; 152 } 153 154 /** schedule the writer on the KSB scheduled pool. */ 155 @Override 156 public void afterPropertiesSet() throws Exception { 157 LOG.info("scheduling writer..."); 158 KSBServiceLocator.getScheduledPool().scheduleAtFixedRate(scheduledWriter, 159 getExecutionIntervalSeconds(), getExecutionIntervalSeconds(), TimeUnit.SECONDS); 160 } 161 162 /** flush the write queue immediately. */ 163 @Override 164 public void destroy() throws Exception { 165 KSBServiceLocator.getThreadPool().execute(shutdownWriter); 166 } 167 168 /** 169 * store the person to the database, but do this an alternate thread to 170 * prevent transaction issues since this service is non-transactional 171 * 172 * @author Kuali Rice Team (rice.collab@kuali.org) 173 * 174 */ 175 private class EntityArchiveWriter implements Callable { 176 177 // flag used to prevent multiple processes from being submitted at once 178 AtomicBoolean currentlySubmitted = new AtomicBoolean(false); 179 180 private final Comparator<Comparable> nullSafeComparator = new Comparator<Comparable>() { 181 @Override 182 public int compare(Comparable i1, Comparable i2) { 183 if (i1 != null && i2 != null) { 184 return i1.compareTo(i2); 185 } else if (i1 == null) { 186 if (i2 == null) { 187 return 0; 188 } else { 189 return -1; 190 } 191 } else { // if (entityId2 == null) { 192 return 1; 193 } 194 }; 195 }; 196 197 /** 198 * Comparator that attempts to impose a total ordering on EntityDefault instances 199 */ 200 private final Comparator<EntityDefault> kediComparator = new Comparator<EntityDefault>() { 201 /** 202 * compares by entityId value 203 * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object) 204 */ 205 @Override 206 public int compare(EntityDefault o1, EntityDefault o2) { 207 String entityId1 = (o1 == null) ? null : o1.getEntityId(); 208 String entityId2 = (o2 == null) ? null : o2.getEntityId(); 209 210 int result = nullSafeComparator.compare(entityId1, entityId2); 211 212 if (result == 0) { 213 result = getPrincipalIdsString(o1).compareTo(getPrincipalIdsString(o2)); 214 } 215 216 return result; 217 } 218 219 /** 220 * This method builds a newline delimited String containing the identity's principal IDs in sorted order 221 * 222 * @param entity 223 * @return 224 */ 225 private String getPrincipalIdsString(EntityDefault entity) { 226 String result = ""; 227 if (entity != null) { 228 List<Principal> principals = entity.getPrincipals(); 229 if (principals != null) { 230 if (principals.size() == 1) { // one 231 result = principals.get(0).getPrincipalId(); 232 } else { // multiple 233 String [] ids = new String [principals.size()]; 234 int insertIndex = 0; 235 for (Principal principal : principals) { 236 ids[insertIndex++] = principal.getPrincipalId(); 237 } 238 Arrays.sort(ids); 239 result = StringUtils.join(ids, "\n"); 240 } 241 } 242 } 243 return result; 244 } 245 }; 246 247 public boolean requestSubmit() { 248 return currentlySubmitted.compareAndSet(false, true); 249 } 250 251 /** 252 * Call that tries to flush the write queue. 253 * @see Callable#call() 254 */ 255 @Override 256 public Object call() { 257 try { 258 // the strategy is to grab chunks of entities, dedupe & sort them, and insert them in a big 259 // batch to reduce transaction overhead. Sorting is done so insertion order is guaranteed, which 260 // prevents deadlocks between concurrent writers to the database. 261 PlatformTransactionManager transactionManager = KRADServiceLocatorInternal.getTransactionManager(); 262 TransactionTemplate template = new TransactionTemplate(transactionManager); 263 template.execute(new TransactionCallback() { 264 @Override 265 public Object doInTransaction(TransactionStatus status) { 266 EntityDefault entity = null; 267 ArrayList<EntityDefault> entitiesToInsert = new ArrayList<EntityDefault>(getMaxWriteQueueSize()); 268 Set<String> deduper = new HashSet<String>(getMaxWriteQueueSize()); 269 270 // order is important in this conditional so that elements aren't dequeued and then ignored 271 while (entitiesToInsert.size() < getMaxWriteQueueSize() && null != (entity = writeQueue.poll())) { 272 if (deduper.add(entity.getEntityId())) { 273 entitiesToInsert.add(entity); 274 } 275 } 276 277 Collections.sort(entitiesToInsert, kediComparator); 278 279 for (EntityDefault entityToInsert : entitiesToInsert) { 280 getBusinessObjectService().save( new KimEntityDefaultInfoCacheImpl( entityToInsert ) ); 281 } 282 return null; 283 } 284 }); 285 } finally { // make sure our running flag is unset, otherwise we'll never run again 286 currentlySubmitted.compareAndSet(true, false); 287 } 288 289 return Boolean.TRUE; 290 } 291 } 292 293 /** 294 * A class encapsulating a {@link ConcurrentLinkedQueue} and an {@link AtomicInteger} to 295 * provide fast offer(enqueue)/poll(dequeue) and size checking. Size may be approximate due to concurrent 296 * activity, but for our purposes that is fine. 297 * 298 * @author Kuali Rice Team (rice.collab@kuali.org) 299 * 300 */ 301 private static class WriteQueue { 302 AtomicInteger writeQueueSize = new AtomicInteger(0); 303 ConcurrentLinkedQueue<EntityDefault> queue = new ConcurrentLinkedQueue<EntityDefault>(); 304 305 public int offerAndGetSize(EntityDefault entity) { 306 queue.add(entity); 307 return writeQueueSize.incrementAndGet(); 308 } 309 310 private EntityDefault poll() { 311 EntityDefault result = queue.poll(); 312 if (result != null) { writeQueueSize.decrementAndGet(); } 313 return result; 314 } 315 } 316 317 /** 318 * decorator for a callable to log a message before it is executed 319 * 320 * @author Kuali Rice Team (rice.collab@kuali.org) 321 * 322 */ 323 private static class PreLogCallableWrapper<A> implements Callable<A> { 324 325 private final Callable inner; 326 private final Level level; 327 private final String message; 328 329 public PreLogCallableWrapper(Callable inner, Level level, String message) { 330 this.inner = inner; 331 this.level = level; 332 this.message = message; 333 } 334 335 /** 336 * logs the message then calls the inner Callable 337 * 338 * @see java.util.concurrent.Callable#call() 339 */ 340 @Override 341 @SuppressWarnings("unchecked") 342 public A call() throws Exception { 343 LOG.log(level, message); 344 return (A)inner.call(); 345 } 346 } 347 348 /** 349 * Adapts a Callable to be Runnable 350 * 351 * @author Kuali Rice Team (rice.collab@kuali.org) 352 * 353 */ 354 private static class CallableAdapter implements Runnable { 355 356 private final Callable callable; 357 358 public CallableAdapter(Callable callable) { 359 this.callable = callable; 360 } 361 362 @Override 363 public void run() { 364 try { 365 callable.call(); 366 } catch (Exception e) { 367 throw new RuntimeException(e); 368 } 369 } 370 } 371 }