/**
 * Copyright 2005-2015 The Kuali Foundation
 *
 * Licensed under the Educational Community License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.opensource.org/licenses/ecl2.php
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.kuali.rice.kim.service.impl;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.kuali.rice.core.api.config.property.ConfigurationService;
import org.kuali.rice.core.api.criteria.QueryByCriteria;
import org.kuali.rice.kim.api.KimConstants;
import org.kuali.rice.kim.api.identity.entity.EntityDefault;
import org.kuali.rice.kim.api.identity.principal.Principal;
import org.kuali.rice.kim.impl.identity.EntityDefaultInfoCacheBo;
import org.kuali.rice.kim.impl.identity.IdentityArchiveService;
import org.kuali.rice.krad.data.DataObjectService;
import org.kuali.rice.ksb.service.KSBServiceLocator;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;

/**
 * This is the default implementation for the IdentityArchiveService.
 *
 * <p>Reads go straight to the {@link DataObjectService}. Writes are buffered in an in-memory
 * {@link WriteQueue} and persisted in batches by an {@link EntityArchiveWriter}, which runs
 * <ul>
 *   <li>periodically on the KSB scheduled pool (see {@link #afterPropertiesSet()}),</li>
 *   <li>on the KSB thread pool once the queue reaches {@link #getMaxWriteQueueSize()} entries
 *       (see {@link #saveEntityDefaultToArchive(EntityDefault)}),</li>
 *   <li>synchronously from {@link #flushToArchive()} and {@link #destroy()}.</li>
 * </ul>
 *
 * @see IdentityArchiveService
 *
 * @author Kuali Rice Team (rice.collab@kuali.org)
 */
public class IdentityArchiveServiceImpl implements IdentityArchiveService, InitializingBean, DisposableBean {
    private static final Logger LOG = Logger.getLogger( IdentityArchiveServiceImpl.class );

    protected DataObjectService dataObjectService;
    protected ConfigurationService kualiConfigurationService;
    protected PlatformTransactionManager transactionManager;

    protected static final String EXEC_INTERVAL_SECS = "kim.identityArchiveServiceImpl.executionIntervalSeconds";
    protected static final String MAX_WRITE_QUEUE_SIZE = "kim.identityArchiveServiceImpl.maxWriteQueueSize";
    protected static final int EXECUTION_INTERVAL_SECONDS_DEFAULT = 600; // by default, flush the write queue this often
    protected static final int MAX_WRITE_QUEUE_SIZE_DEFAULT = 300; // cache this many KEDI's before forcing write

    protected final WriteQueue writeQueue = new WriteQueue();
    protected final EntityArchiveWriter writer = new EntityArchiveWriter();

    // all this ceremony just decorates the writer so it logs a message first, and converts the Callable to Runnable
    protected final Runnable maxQueueSizeExceededWriter =
        new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "max size exceeded, flushing write queue"));

    // ditto
    protected final Runnable scheduledWriter =
        new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "scheduled write out, flushing write queue"));

    // ditto
    protected final Runnable shutdownWriter =
        new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "rice is shutting down, flushing write queue"));

    /**
     * Returns the interval, in seconds, between scheduled flushes of the write queue.
     *
     * @return the value of the {@link #EXEC_INTERVAL_SECS} configuration property, or
     *         {@link #EXECUTION_INTERVAL_SECONDS_DEFAULT} when the property is missing, is not an
     *         integer, or is not strictly positive
     */
    protected int getExecutionIntervalSeconds() {
        return getPositiveIntProperty(EXEC_INTERVAL_SECS, EXECUTION_INTERVAL_SECONDS_DEFAULT);
    }

    /**
     * Returns the number of queued entities that triggers an immediate flush; the same value
     * caps the size of a single write batch.
     *
     * @return the value of the {@link #MAX_WRITE_QUEUE_SIZE} configuration property, or
     *         {@link #MAX_WRITE_QUEUE_SIZE_DEFAULT} when the property is missing, is not an
     *         integer, or is not strictly positive
     */
    protected int getMaxWriteQueueSize() {
        return getPositiveIntProperty(MAX_WRITE_QUEUE_SIZE, MAX_WRITE_QUEUE_SIZE_DEFAULT);
    }

    /**
     * Reads an integer configuration property that must be strictly positive.
     *
     * <p>A zero or negative value cannot be honored by this class: a non-positive period is
     * rejected by {@code scheduleAtFixedRate}, a batch size of zero means the drain loop never
     * dequeues anything, and a negative batch size is rejected by the collection constructors.
     * Such values are therefore treated like an unparsable one.
     *
     * @param key the configuration property name
     * @param defaultValue the value to use when the property is unusable
     * @return the configured value if it parses as an integer greater than zero, otherwise
     *         {@code defaultValue}
     */
    private int getPositiveIntProperty(String key, int defaultValue) {
        final String prop = kualiConfigurationService.getPropertyValueAsString(key);
        try {
            final int value = Integer.parseInt(prop);
            if (value > 0) {
                return value;
            }
            LOG.warn("ignoring non-positive value '" + prop + "' for property " + key
                    + ", using default of " + defaultValue);
        } catch (NumberFormatException ignored) {
            // property is absent (null) or not an integer: silently use the default
        }
        return defaultValue;
    }

    /**
     * Looks up an archived entity by querying on the
     * {@link KimConstants.PrimaryKeyConstants#SUB_ENTITY_ID} attribute.
     *
     * @param entityId the entity id to search for; must not be blank
     * @return the first matching archive record converted to an {@link EntityDefault}, or
     *         {@code null} if there is no match
     * @throws IllegalArgumentException if {@code entityId} is blank
     */
    @Override
    public EntityDefault getEntityDefaultFromArchive( String entityId ) {
        if (StringUtils.isBlank(entityId)) {
            throw new IllegalArgumentException("entityId is blank");
        }

        List<EntityDefaultInfoCacheBo> results = dataObjectService.findMatching(EntityDefaultInfoCacheBo.class,
                QueryByCriteria.Builder.forAttribute(KimConstants.PrimaryKeyConstants.SUB_ENTITY_ID, entityId).build() ).getResults();
        EntityDefaultInfoCacheBo cachedValue = null;
        if ( !results.isEmpty() ) {
            cachedValue = results.get(0);
        }

        return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo();
    }

    /**
     * Looks up an archived entity by passing the principal id as the identifier to
     * {@link DataObjectService#find(Class, Object)}.
     *
     * @param principalId the principal id to search for; must not be blank
     * @return the archive record converted to an {@link EntityDefault}, or {@code null} if
     *         nothing was found
     * @throws IllegalArgumentException if {@code principalId} is blank
     */
    @Override
    public EntityDefault getEntityDefaultFromArchiveByPrincipalId(String principalId) {
        if (StringUtils.isBlank(principalId)) {
            throw new IllegalArgumentException("principalId is blank");
        }

        EntityDefaultInfoCacheBo cachedValue = dataObjectService.find(EntityDefaultInfoCacheBo.class, principalId);
        return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo();
    }

    /**
     * Looks up an archived entity by querying on the {@code principalName} attribute.
     *
     * @param principalName the principal name to search for; must not be blank
     * @return the first matching archive record converted to an {@link EntityDefault}, or
     *         {@code null} if there is no match
     * @throws IllegalArgumentException if {@code principalName} is blank
     */
    @Override
    public EntityDefault getEntityDefaultFromArchiveByPrincipalName(String principalName) {
        if (StringUtils.isBlank(principalName)) {
            throw new IllegalArgumentException("principalName is blank");
        }

        List<EntityDefaultInfoCacheBo> entities = dataObjectService.findMatching(EntityDefaultInfoCacheBo.class,
                QueryByCriteria.Builder.forAttribute("principalName", principalName).build()).getResults();
        return entities.isEmpty() ? null : entities.get(0).convertCacheToEntityDefaultInfo();
    }

    /**
     * Looks up an archived entity by querying on the {@code employeeId} attribute.
     *
     * @param employeeId the employee id to search for; must not be blank
     * @return the first matching archive record converted to an {@link EntityDefault}, or
     *         {@code null} if there is no match
     * @throws IllegalArgumentException if {@code employeeId} is blank
     */
    @Override
    public EntityDefault getEntityDefaultFromArchiveByEmployeeId(String employeeId) {
        if (StringUtils.isBlank(employeeId)) {
            throw new IllegalArgumentException("employeeId is blank");
        }
        List<EntityDefaultInfoCacheBo> entities = dataObjectService.findMatching(EntityDefaultInfoCacheBo.class,
                QueryByCriteria.Builder.forAttribute("employeeId", employeeId).build()).getResults();
        return entities.isEmpty() ? null : entities.get(0).convertCacheToEntityDefaultInfo();
    }

    /**
     * Queues an entity for archiving. The entity is not persisted by this call; it is written
     * by a later run of the {@link EntityArchiveWriter}.
     *
     * <p>If, after enqueueing, the queue holds at least {@link #getMaxWriteQueueSize()} entries
     * and no size-triggered flush is already outstanding, a flush is submitted to the KSB
     * thread pool.
     *
     * @param entity the entity to archive; must not be {@code null}
     * @throws IllegalArgumentException if {@code entity} is {@code null}
     */
    @Override
    public void saveEntityDefaultToArchive(EntityDefault entity) {
        if (entity == null) {
            throw new IllegalArgumentException("entity is blank");
        }

        // if the max size has been reached, schedule now
        if (getMaxWriteQueueSize() <= writeQueue.offerAndGetSize(entity) /* <- this enqueues the KEDI */ &&
                writer.requestSubmit()) {
            KSBServiceLocator.getThreadPool().execute(maxQueueSizeExceededWriter);
        }
    }

    /**
     * Runs the writer once on the calling thread, persisting up to
     * {@link #getMaxWriteQueueSize()} queued entities.
     */
    @Override
    public void flushToArchive() {
        writer.call();
    }

    /**
     * @param kualiConfigurationService the configuration service used to read
     *        {@link #EXEC_INTERVAL_SECS} and {@link #MAX_WRITE_QUEUE_SIZE}
     */
    public void setKualiConfigurationService(
            ConfigurationService kualiConfigurationService) {
        this.kualiConfigurationService = kualiConfigurationService;
    }

    /**
     * @param txMgr the transaction manager the writer uses for each batch
     */
    public void setTransactionManager(PlatformTransactionManager txMgr) {
        this.transactionManager = txMgr;
    }

    /**
     * schedule the writer on the KSB scheduled pool.
     *
     * <p>The scheduled task catches and logs any {@link RuntimeException} thrown by the
     * flush. Without that, a single failed flush would propagate out of the task and, per the
     * {@code scheduleAtFixedRate} contract, suppress every subsequent execution.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        LOG.info("scheduling writer...");
        final int intervalSeconds = getExecutionIntervalSeconds();
        KSBServiceLocator.getScheduledPool().scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    scheduledWriter.run();
                } catch (RuntimeException e) {
                    // swallow after logging so the periodic schedule stays alive
                    LOG.error("scheduled flush of the identity archive write queue failed, will retry at the next interval", e);
                }
            }
        }, intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
    }

    /**
     * flush the write queue immediately.
     *
     * <p>This performs a single writer pass on the calling thread, so at most
     * {@link #getMaxWriteQueueSize()} queued entities are dequeued by this call.
     */
    @Override
    public void destroy() throws Exception {
        shutdownWriter.run();
    }

    /**
     * store the person to the database, but do this an alternate thread to
     * prevent transaction issues since this service is non-transactional
     *
     * @author Kuali Rice Team (rice.collab@kuali.org)
     *
     */
    protected class EntityArchiveWriter implements Callable<Object> {

        // flag used to prevent multiple processes from being submitted at once
        AtomicBoolean currentlySubmitted = new AtomicBoolean(false);

        /** Orders strings naturally, with {@code null} sorting before any non-null value. */
        private final Comparator<String> nullSafeComparator = new Comparator<String>() {
            @Override
            public int compare(String i1, String i2) {
                if (i1 != null && i2 != null) {
                    return i1.compareTo(i2);
                } else if (i1 == null) {
                    if (i2 == null) {
                        return 0;
                    } else {
                        return -1;
                    }
                } else { // i2 == null
                    return 1;
                }
            }
        };

        /**
         * Comparator that attempts to impose a total ordering on EntityDefault instances
         */
        protected final Comparator<EntityDefault> kediComparator = new Comparator<EntityDefault>() {
            /**
             * compares by entityId value, falling back to the sorted principal ids when the
             * entity ids are equal
             *
             * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
             */
            @Override
            public int compare(EntityDefault o1, EntityDefault o2) {
                String entityId1 = (o1 == null) ? null : o1.getEntityId();
                String entityId2 = (o2 == null) ? null : o2.getEntityId();

                int result = nullSafeComparator.compare(entityId1, entityId2);

                if (result == 0) {
                    // null-safe: a lone principal may carry a null id
                    result = nullSafeComparator.compare(getPrincipalIdsString(o1), getPrincipalIdsString(o2));
                }

                return result;
            }

            /**
             * This method builds a newline delimited String containing the identity's principal IDs in sorted order
             *
             * @param entity the entity whose principal ids are wanted; may be {@code null}
             * @return the single principal id when there is exactly one principal (possibly
             *         {@code null}), the sorted ids joined by newlines when there are several,
             *         or the empty string when the entity or its principal list is null or empty
             */
            private String getPrincipalIdsString(EntityDefault entity) {
                String result = "";
                if (entity != null) {
                    List<Principal> principals = entity.getPrincipals();
                    if (principals != null) {
                        if (principals.size() == 1) { // one
                            result = principals.get(0).getPrincipalId();
                        } else { // multiple
                            String [] ids = new String [principals.size()];
                            int insertIndex = 0;
                            for (Principal principal : principals) {
                                ids[insertIndex++] = principal.getPrincipalId();
                            }
                            // null ids sort first instead of making the sort throw
                            Arrays.sort(ids, nullSafeComparator);
                            result = StringUtils.join(ids, "\n");
                        }
                    }
                }
                return result;
            }
        };

        /**
         * Atomically claims the "submitted" flag.
         *
         * @return {@code true} if the flag was clear and is now set (the caller should submit
         *         the writer), {@code false} if it was already set
         */
        public boolean requestSubmit() {
            return currentlySubmitted.compareAndSet(false, true);
        }

        /**
         * Call that tries to flush the write queue.
         *
         * <p>Dequeues entities until the queue is empty or {@link #getMaxWriteQueueSize()}
         * entities have been accepted, then saves the accepted ones inside a single transaction.
         * Entities without principals, and entities whose id was already seen in this batch,
         * are dequeued and dropped. Entities dequeued for a batch whose transaction fails are
         * not put back on the queue. The "submitted" flag is cleared when the call finishes,
         * whether or not it succeeded.
         *
         * @return {@link Boolean#TRUE}
         * @see Callable#call()
         */
        @Override
        public Object call() {
            try {
                // read the limit once per flush rather than re-parsing the property on every iteration
                final int maxBatchSize = getMaxWriteQueueSize();

                // the strategy is to grab chunks of entities, dedupe & sort them, and insert them in a big
                // batch to reduce transaction overhead. Sorting is done so insertion order is guaranteed, which
                // prevents deadlocks between concurrent writers to the database.
                TransactionTemplate template = new TransactionTemplate(transactionManager);
                template.execute(new TransactionCallback() {
                    @Override
                    public Object doInTransaction(TransactionStatus status) {
                        EntityDefault entity = null;
                        List<EntityDefault> entitiesToInsert = new ArrayList<EntityDefault>(maxBatchSize);
                        Set<String> deduper = new HashSet<String>(maxBatchSize);

                        // order is important in this conditional so that elements aren't dequeued and then ignored
                        while (entitiesToInsert.size() < maxBatchSize && null != (entity = writeQueue.poll())) {
                            // only archive entities that have at least one principal; a null principal
                            // list is treated like an empty one rather than aborting the whole batch
                            List<Principal> principals = entity.getPrincipals();
                            if (principals != null && !principals.isEmpty() && deduper.add(entity.getEntityId())) {
                                entitiesToInsert.add(entity);
                            }
                        }

                        Collections.sort(entitiesToInsert, kediComparator);
                        for (EntityDefault entityToInsert : entitiesToInsert) {
                            dataObjectService.save( new EntityDefaultInfoCacheBo( entityToInsert ) );
                        }
                        return null;
                    }
                });
            } finally { // make sure our running flag is unset, otherwise we'll never run again
                currentlySubmitted.compareAndSet(true, false);
            }

            return Boolean.TRUE;
        }
    }

    /**
     * A class encapsulating a {@link ConcurrentLinkedQueue} and an {@link AtomicInteger} to
     * provide fast offer(enqueue)/poll(dequeue) and size checking. Size may be approximate due to concurrent
     * activity, but for our purposes that is fine.
     *
     * @author Kuali Rice Team (rice.collab@kuali.org)
     *
     */
    protected static class WriteQueue {
        AtomicInteger writeQueueSize = new AtomicInteger(0);
        ConcurrentLinkedQueue<EntityDefault> queue = new ConcurrentLinkedQueue<EntityDefault>();

        /**
         * Enqueues an entity and bumps the size counter.
         *
         * @param entity the entity to enqueue
         * @return the counter value after the increment
         */
        public int offerAndGetSize(EntityDefault entity) {
            queue.add(entity);
            return writeQueueSize.incrementAndGet();
        }

        /**
         * Dequeues the head of the queue, decrementing the size counter when something was removed.
         *
         * @return the dequeued entity, or {@code null} if the queue was empty
         */
        protected EntityDefault poll() {
            EntityDefault result = queue.poll();
            if (result != null) { writeQueueSize.decrementAndGet(); }
            return result;
        }
    }

    /**
     * decorator for a callable to log a message before it is executed
     *
     * @author Kuali Rice Team (rice.collab@kuali.org)
     *
     */
    protected static class PreLogCallableWrapper<A> implements Callable<A> {

        protected final Callable inner;
        protected final Level level;
        protected final String message;

        /**
         * @param inner the callable to delegate to
         * @param level the log level for the message
         * @param message the message logged before each delegation
         */
        public PreLogCallableWrapper(Callable inner, Level level, String message) {
            this.inner = inner;
            this.level = level;
            this.message = message;
        }

        /**
         * logs the message then calls the inner Callable
         *
         * @return the inner callable's result, cast (unchecked) to {@code A}
         * @throws Exception whatever the inner callable throws
         * @see java.util.concurrent.Callable#call()
         */
        @Override
        @SuppressWarnings("unchecked")
        public A call() throws Exception {
            LOG.log(level, message);
            return (A)inner.call();
        }
    }

    /**
     * Adapts a Callable to be Runnable
     *
     * @author Kuali Rice Team (rice.collab@kuali.org)
     *
     */
    protected static class CallableAdapter implements Runnable {

        private final Callable callable;

        /**
         * @param callable the callable to invoke from {@link #run()}
         */
        public CallableAdapter(Callable callable) {
            this.callable = callable;
        }

        /**
         * Invokes the callable, discarding its result. Any {@link Exception} it throws is
         * rethrown wrapped in a {@link RuntimeException} with the original as the cause.
         */
        @Override
        public void run() {
            try {
                callable.call();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * @param dataObjectService the dataObjectService to set
     */
    public void setDataObjectService(DataObjectService dataObjectService) {
        this.dataObjectService = dataObjectService;
    }
}