001/** 002 * Copyright 2005-2016 The Kuali Foundation 003 * 004 * Licensed under the Educational Community License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.opensource.org/licenses/ecl2.php 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.kuali.rice.kim.service.impl; 017 018import org.apache.commons.lang.StringUtils; 019import org.apache.log4j.Level; 020import org.apache.log4j.Logger; 021import org.kuali.rice.core.api.config.property.ConfigurationService; 022import org.kuali.rice.kim.api.KimConstants; 023import org.kuali.rice.kim.impl.identity.IdentityArchiveService; 024import org.kuali.rice.kim.api.identity.entity.EntityDefault; 025import org.kuali.rice.kim.api.identity.principal.Principal; 026import org.kuali.rice.kim.impl.identity.EntityDefaultInfoCacheBo; 027import org.kuali.rice.krad.service.BusinessObjectService; 028import org.kuali.rice.ksb.service.KSBServiceLocator; 029import org.springframework.beans.factory.DisposableBean; 030import org.springframework.beans.factory.InitializingBean; 031import org.springframework.transaction.PlatformTransactionManager; 032import org.springframework.transaction.TransactionStatus; 033import org.springframework.transaction.support.TransactionCallback; 034import org.springframework.transaction.support.TransactionTemplate; 035 036import java.util.ArrayList; 037import java.util.Arrays; 038import java.util.Collection; 039import java.util.Collections; 040import java.util.Comparator; 041import java.util.HashMap; 042import java.util.HashSet; 043import java.util.List; 044import java.util.Map; 045import java.util.Set; 046import java.util.concurrent.Callable; 047import java.util.concurrent.ConcurrentLinkedQueue; 048import java.util.concurrent.TimeUnit; 049import java.util.concurrent.atomic.AtomicBoolean; 050import java.util.concurrent.atomic.AtomicInteger; 051 052/** 053 * This is the default implementation for the IdentityArchiveService. 054 * @see IdentityArchiveService 055 * 056 * @author Kuali Rice Team (rice.collab@kuali.org) 057 */ 058public class IdentityArchiveServiceImpl implements IdentityArchiveService, InitializingBean, DisposableBean { 059 private static final Logger LOG = Logger.getLogger( IdentityArchiveServiceImpl.class ); 060 061 private BusinessObjectService businessObjectService; 062 private ConfigurationService kualiConfigurationService; 063 private PlatformTransactionManager transactionManager; 064 065 private static final String EXEC_INTERVAL_SECS = "kim.identityArchiveServiceImpl.executionIntervalSeconds"; 066 private static final String MAX_WRITE_QUEUE_SIZE = "kim.identityArchiveServiceImpl.maxWriteQueueSize"; 067 private static final int EXECUTION_INTERVAL_SECONDS_DEFAULT = 600; // by default, flush the write queue this often 068 private static final int MAX_WRITE_QUEUE_SIZE_DEFAULT = 300; // cache this many KEDI's before forcing write 069 070 private final WriteQueue writeQueue = new WriteQueue(); 071 private final EntityArchiveWriter writer = new EntityArchiveWriter(); 072 073 // all this ceremony just decorates the writer so it logs a message first, and converts the Callable to Runnable 074 private final Runnable maxQueueSizeExceededWriter = 075 new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "max size exceeded, flushing write queue")); 076 077 // ditto 078 private final Runnable scheduledWriter = 079 new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "scheduled write out, flushing write queue")); 080 081 // ditto 082 private final Runnable shutdownWriter = 083 new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "rice is shutting down, flushing write queue")); 084 085 private int getExecutionIntervalSeconds() { 086 final String prop = kualiConfigurationService.getPropertyValueAsString(EXEC_INTERVAL_SECS); 087 try { 088 return Integer.valueOf(prop).intValue(); 089 } catch (NumberFormatException e) { 090 return EXECUTION_INTERVAL_SECONDS_DEFAULT; 091 } 092 } 093 094 private int getMaxWriteQueueSize() { 095 final String prop = kualiConfigurationService.getPropertyValueAsString(MAX_WRITE_QUEUE_SIZE); 096 try { 097 return Integer.valueOf(prop).intValue(); 098 } catch (NumberFormatException e) { 099 return MAX_WRITE_QUEUE_SIZE_DEFAULT; 100 } 101 } 102 103 @Override 104 public EntityDefault getEntityDefaultFromArchive( String entityId ) { 105 if (StringUtils.isBlank(entityId)) { 106 throw new IllegalArgumentException("entityId is blank"); 107 } 108 109 Map<String,String> criteria = new HashMap<String, String>(1); 110 criteria.put(KimConstants.PrimaryKeyConstants.SUB_ENTITY_ID, entityId); 111 EntityDefaultInfoCacheBo cachedValue = businessObjectService.findByPrimaryKey(EntityDefaultInfoCacheBo.class, criteria); 112 return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo(); 113 } 114 115 @Override 116 public EntityDefault getEntityDefaultFromArchiveByPrincipalId(String principalId) { 117 if (StringUtils.isBlank(principalId)) { 118 throw new IllegalArgumentException("principalId is blank"); 119 } 120 121 Map<String,String> criteria = new HashMap<String, String>(1); 122 criteria.put("principalId", principalId); 123 EntityDefaultInfoCacheBo cachedValue = businessObjectService.findByPrimaryKey(EntityDefaultInfoCacheBo.class, criteria); 124 return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo(); 125 } 126 127 @Override 128 public EntityDefault getEntityDefaultFromArchiveByPrincipalName(String principalName) { 129 if (StringUtils.isBlank(principalName)) { 130 throw new IllegalArgumentException("principalName is blank"); 131 } 132 133 Map<String,String> criteria = new HashMap<String, String>(1); 134 criteria.put("principalName", principalName); 135 Collection<EntityDefaultInfoCacheBo> entities = businessObjectService.findMatching(EntityDefaultInfoCacheBo.class, criteria); 136 return (entities == null || entities.isEmpty()) ? null : entities.iterator().next().convertCacheToEntityDefaultInfo(); 137 } 138 139 @Override 140 public EntityDefault getEntityDefaultFromArchiveByEmployeeId(String employeeId) { 141 if (StringUtils.isBlank(employeeId)) { 142 throw new IllegalArgumentException("employeeId is blank"); 143 } 144 Map<String,String> criteria = new HashMap<String, String>(1); 145 criteria.put("employeeId", employeeId); 146 Collection<EntityDefaultInfoCacheBo> entities = businessObjectService.findMatching(EntityDefaultInfoCacheBo.class, criteria); 147 return (entities == null || entities.isEmpty()) ? null : entities.iterator().next().convertCacheToEntityDefaultInfo(); 148 } 149 150 @Override 151 public void saveEntityDefaultToArchive(EntityDefault entity) { 152 if (entity == null) { 153 throw new IllegalArgumentException("entity is blank"); 154 } 155 156 // if the max size has been reached, schedule now 157 if (getMaxWriteQueueSize() <= writeQueue.offerAndGetSize(entity) /* <- this enqueues the KEDI */ && 158 writer.requestSubmit()) { 159 KSBServiceLocator.getThreadPool().execute(maxQueueSizeExceededWriter); 160 } 161 } 162 163 @Override 164 public void flushToArchive() { 165 writer.call(); 166 } 167 168 169 public void setBusinessObjectService(BusinessObjectService businessObjectService) { 170 this.businessObjectService = businessObjectService; 171 } 172 173 public void setKualiConfigurationService( 174 ConfigurationService kualiConfigurationService) { 175 this.kualiConfigurationService = kualiConfigurationService; 176 } 177 178 public void setTransactionManager(PlatformTransactionManager txMgr) { 179 this.transactionManager = txMgr; 180 } 181 182 /** schedule the writer on the KSB scheduled pool. */ 183 @Override 184 public void afterPropertiesSet() throws Exception { 185 LOG.info("scheduling writer..."); 186 KSBServiceLocator.getScheduledPool().scheduleAtFixedRate(scheduledWriter, 187 getExecutionIntervalSeconds(), getExecutionIntervalSeconds(), TimeUnit.SECONDS); 188 } 189 190 /** flush the write queue immediately. */ 191 @Override 192 public void destroy() throws Exception { 193 KSBServiceLocator.getThreadPool().execute(shutdownWriter); 194 } 195 196 /** 197 * store the person to the database, but do this an alternate thread to 198 * prevent transaction issues since this service is non-transactional 199 * 200 * @author Kuali Rice Team (rice.collab@kuali.org) 201 * 202 */ 203 private class EntityArchiveWriter implements Callable { 204 205 // flag used to prevent multiple processes from being submitted at once 206 AtomicBoolean currentlySubmitted = new AtomicBoolean(false); 207 208 private final Comparator<Comparable> nullSafeComparator = new Comparator<Comparable>() { 209 @Override 210 public int compare(Comparable i1, Comparable i2) { 211 if (i1 != null && i2 != null) { 212 return i1.compareTo(i2); 213 } else if (i1 == null) { 214 if (i2 == null) { 215 return 0; 216 } else { 217 return -1; 218 } 219 } else { // if (entityId2 == null) { 220 return 1; 221 } 222 }; 223 }; 224 225 /** 226 * Comparator that attempts to impose a total ordering on EntityDefault instances 227 */ 228 private final Comparator<EntityDefault> kediComparator = new Comparator<EntityDefault>() { 229 /** 230 * compares by entityId value 231 * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object) 232 */ 233 @Override 234 public int compare(EntityDefault o1, EntityDefault o2) { 235 String entityId1 = (o1 == null) ? null : o1.getEntityId(); 236 String entityId2 = (o2 == null) ? null : o2.getEntityId(); 237 238 int result = nullSafeComparator.compare(entityId1, entityId2); 239 240 if (result == 0) { 241 result = getPrincipalIdsString(o1).compareTo(getPrincipalIdsString(o2)); 242 } 243 244 return result; 245 } 246 247 /** 248 * This method builds a newline delimited String containing the identity's principal IDs in sorted order 249 * 250 * @param entity 251 * @return 252 */ 253 private String getPrincipalIdsString(EntityDefault entity) { 254 String result = ""; 255 if (entity != null) { 256 List<Principal> principals = entity.getPrincipals(); 257 if (principals != null) { 258 if (principals.size() == 1) { // one 259 result = principals.get(0).getPrincipalId(); 260 } else { // multiple 261 String [] ids = new String [principals.size()]; 262 int insertIndex = 0; 263 for (Principal principal : principals) { 264 ids[insertIndex++] = principal.getPrincipalId(); 265 } 266 Arrays.sort(ids); 267 result = StringUtils.join(ids, "\n"); 268 } 269 } 270 } 271 return result; 272 } 273 }; 274 275 public boolean requestSubmit() { 276 return currentlySubmitted.compareAndSet(false, true); 277 } 278 279 /** 280 * Call that tries to flush the write queue. 281 * @see Callable#call() 282 */ 283 @Override 284 public Object call() { 285 try { 286 // the strategy is to grab chunks of entities, dedupe & sort them, and insert them in a big 287 // batch to reduce transaction overhead. Sorting is done so insertion order is guaranteed, which 288 // prevents deadlocks between concurrent writers to the database. 289 TransactionTemplate template = new TransactionTemplate(transactionManager); 290 template.execute(new TransactionCallback() { 291 @Override 292 public Object doInTransaction(TransactionStatus status) { 293 EntityDefault entity = null; 294 ArrayList<EntityDefault> entitiesToInsert = new ArrayList<EntityDefault>(getMaxWriteQueueSize()); 295 Set<String> deduper = new HashSet<String>(getMaxWriteQueueSize()); 296 297 // order is important in this conditional so that elements aren't dequeued and then ignored 298 while (entitiesToInsert.size() < getMaxWriteQueueSize() && null != (entity = writeQueue.poll())) { 299 if (deduper.add(entity.getEntityId())) { 300 entitiesToInsert.add(entity); 301 } 302 } 303 304 Collections.sort(entitiesToInsert, kediComparator); 305 List<EntityDefaultInfoCacheBo> entityCache = new ArrayList<EntityDefaultInfoCacheBo>(entitiesToInsert.size()); 306 for (EntityDefault entityToInsert : entitiesToInsert) { 307 entityCache.add(new EntityDefaultInfoCacheBo( entityToInsert )); 308 } 309 businessObjectService.save(entityCache); 310 //for (EntityDefault entityToInsert : entitiesToInsert) { 311 // businessObjectService.save( new EntityDefaultInfoCacheBo( entityToInsert ) ); 312 //} 313 return null; 314 } 315 }); 316 } finally { // make sure our running flag is unset, otherwise we'll never run again 317 currentlySubmitted.compareAndSet(true, false); 318 } 319 320 return Boolean.TRUE; 321 } 322 } 323 324 /** 325 * A class encapsulating a {@link ConcurrentLinkedQueue} and an {@link AtomicInteger} to 326 * provide fast offer(enqueue)/poll(dequeue) and size checking. Size may be approximate due to concurrent 327 * activity, but for our purposes that is fine. 328 * 329 * @author Kuali Rice Team (rice.collab@kuali.org) 330 * 331 */ 332 private static class WriteQueue { 333 AtomicInteger writeQueueSize = new AtomicInteger(0); 334 ConcurrentLinkedQueue<EntityDefault> queue = new ConcurrentLinkedQueue<EntityDefault>(); 335 336 public int offerAndGetSize(EntityDefault entity) { 337 queue.add(entity); 338 return writeQueueSize.incrementAndGet(); 339 } 340 341 private EntityDefault poll() { 342 EntityDefault result = queue.poll(); 343 if (result != null) { writeQueueSize.decrementAndGet(); } 344 return result; 345 } 346 } 347 348 /** 349 * decorator for a callable to log a message before it is executed 350 * 351 * @author Kuali Rice Team (rice.collab@kuali.org) 352 * 353 */ 354 private static class PreLogCallableWrapper<A> implements Callable<A> { 355 356 private final Callable inner; 357 private final Level level; 358 private final String message; 359 360 public PreLogCallableWrapper(Callable inner, Level level, String message) { 361 this.inner = inner; 362 this.level = level; 363 this.message = message; 364 } 365 366 /** 367 * logs the message then calls the inner Callable 368 * 369 * @see java.util.concurrent.Callable#call() 370 */ 371 @Override 372 @SuppressWarnings("unchecked") 373 public A call() throws Exception { 374 LOG.log(level, message); 375 return (A)inner.call(); 376 } 377 } 378 379 /** 380 * Adapts a Callable to be Runnable 381 * 382 * @author Kuali Rice Team (rice.collab@kuali.org) 383 * 384 */ 385 private static class CallableAdapter implements Runnable { 386 387 private final Callable callable; 388 389 public CallableAdapter(Callable callable) { 390 this.callable = callable; 391 } 392 393 @Override 394 public void run() { 395 try { 396 callable.call(); 397 } catch (Exception e) { 398 throw new RuntimeException(e); 399 } 400 } 401 } 402}