001 /* 002 * Copyright 2007-2009 The Kuali Foundation 003 * 004 * Licensed under the Educational Community License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.opensource.org/licenses/ecl2.php 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 package org.kuali.rice.kim.service.impl; 017 018 import java.util.ArrayList; 019 import java.util.Arrays; 020 import java.util.Collection; 021 import java.util.Collections; 022 import java.util.Comparator; 023 import java.util.HashMap; 024 import java.util.HashSet; 025 import java.util.List; 026 import java.util.Map; 027 import java.util.Set; 028 import java.util.concurrent.Callable; 029 import java.util.concurrent.ConcurrentLinkedQueue; 030 import java.util.concurrent.TimeUnit; 031 import java.util.concurrent.atomic.AtomicBoolean; 032 import java.util.concurrent.atomic.AtomicInteger; 033 034 import org.apache.commons.lang.StringUtils; 035 import org.apache.log4j.Level; 036 import org.apache.log4j.Logger; 037 import org.kuali.rice.core.config.ConfigContext; 038 import org.kuali.rice.core.config.RiceConfigurer; 039 import org.kuali.rice.core.config.event.AfterStartEvent; 040 import org.kuali.rice.core.config.event.BeforeStopEvent; 041 import org.kuali.rice.core.config.event.RiceConfigEvent; 042 import org.kuali.rice.core.config.event.RiceConfigEventListener; 043 import org.kuali.rice.core.util.RiceConstants; 044 import org.kuali.rice.kim.bo.entity.dto.KimEntityDefaultInfo; 045 import org.kuali.rice.kim.bo.entity.dto.KimPrincipalInfo; 046 import org.kuali.rice.kim.bo.entity.impl.KimEntityDefaultInfoCacheImpl; 047 import org.kuali.rice.kim.service.IdentityArchiveService; 048 import org.kuali.rice.kim.util.KimConstants; 049 import org.kuali.rice.kns.service.BusinessObjectService; 050 import org.kuali.rice.kns.service.KNSServiceLocator; 051 import org.kuali.rice.ksb.service.KSBServiceLocator; 052 import org.springframework.transaction.PlatformTransactionManager; 053 import org.springframework.transaction.TransactionStatus; 054 import org.springframework.transaction.support.TransactionCallback; 055 import org.springframework.transaction.support.TransactionTemplate; 056 057 058 /** 059 * This is the default implementation for the IdentityArchiveService. 060 * @see IdentityArchiveService 061 * @author Kuali Rice Team (rice.collab@kuali.org) 062 * 063 */ 064 public class IdentityArchiveServiceImpl implements IdentityArchiveService, RiceConfigEventListener { 065 private static final Logger LOG = Logger.getLogger( IdentityArchiveServiceImpl.class ); 066 067 private BusinessObjectService businessObjectService; 068 069 private static final String EXEC_INTERVAL_SECS = "kim.identityArchiveServiceImpl.executionIntervalSeconds"; 070 private static final String MAX_WRITE_QUEUE_SIZE = "kim.identityArchiveServiceImpl.maxWriteQueueSize"; 071 072 private int executionIntervalSeconds = 600; // by default, flush the write queue this often 073 private int maxWriteQueueSize = 300; // cache this many KEDI's before forcing write 074 private final WriteQueue writeQueue = new WriteQueue(); 075 private final EntityArchiveWriter writer = new EntityArchiveWriter(); 076 077 // all this ceremony just decorates the writer so it logs a message first, and converts the Callable to Runnable 078 private final Runnable maxQueueSizeExceededWriter = 079 new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "max size exceeded, flushing write queue")); 080 081 // ditto 082 private final Runnable scheduledWriter = 083 new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "scheduled write out, flushing write queue")); 084 085 // ditto 086 private final Runnable shutdownWriter = 087 new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "rice is shutting down, flushing write queue")); 088 089 public IdentityArchiveServiceImpl(Integer executionIntervalSeconds, Integer maxWriteQueueSize) { 090 // register for RiceConfigEventS 091 RiceConfigurer rice = 092 (RiceConfigurer)ConfigContext.getCurrentContextConfig().getObject( RiceConstants.RICE_CONFIGURER_CONFIG_NAME ); 093 LOG.debug("registering for events..."); 094 rice.getKimConfigurer().registerConfigEventListener(this); 095 096 if (executionIntervalSeconds != null) { 097 this.executionIntervalSeconds = executionIntervalSeconds; 098 } 099 100 if (maxWriteQueueSize != null) { 101 this.maxWriteQueueSize = maxWriteQueueSize; 102 } 103 } 104 105 protected BusinessObjectService getBusinessObjectService() { 106 if ( businessObjectService == null ) { 107 businessObjectService = KNSServiceLocator.getBusinessObjectService(); 108 } 109 return businessObjectService; 110 } 111 112 public KimEntityDefaultInfo getEntityDefaultInfoFromArchive( String entityId ) { 113 Map<String,String> criteria = new HashMap<String, String>(1); 114 criteria.put(KimConstants.PrimaryKeyConstants.ENTITY_ID, entityId); 115 KimEntityDefaultInfoCacheImpl cachedValue = (KimEntityDefaultInfoCacheImpl)getBusinessObjectService().findByPrimaryKey(KimEntityDefaultInfoCacheImpl.class, criteria); 116 return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo(); 117 } 118 119 public KimEntityDefaultInfo getEntityDefaultInfoFromArchiveByPrincipalId( String principalId ) { 120 Map<String,String> criteria = new HashMap<String, String>(1); 121 criteria.put("principalId", principalId); 122 KimEntityDefaultInfoCacheImpl cachedValue = (KimEntityDefaultInfoCacheImpl)getBusinessObjectService().findByPrimaryKey(KimEntityDefaultInfoCacheImpl.class, criteria); 123 return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo(); 124 } 125 126 @SuppressWarnings("unchecked") 127 public KimEntityDefaultInfo getEntityDefaultInfoFromArchiveByPrincipalName( String principalName ) { 128 Map<String,String> criteria = new HashMap<String, String>(1); 129 criteria.put("principalName", principalName); 130 Collection<KimEntityDefaultInfoCacheImpl> entities = getBusinessObjectService().findMatching(KimEntityDefaultInfoCacheImpl.class, criteria); 131 return (entities == null || entities.size() == 0) ? null : entities.iterator().next().convertCacheToEntityDefaultInfo(); 132 } 133 134 public void saveDefaultInfoToArchive( KimEntityDefaultInfo entity ) { 135 // if the max size has been reached, schedule now 136 if (maxWriteQueueSize <= writeQueue.offerAndGetSize(entity) /* <- this enqueues the KEDI */ && 137 writer.requestSubmit()) { 138 KSBServiceLocator.getThreadPool().execute(maxQueueSizeExceededWriter); 139 } 140 } 141 142 /** 143 * <h4>On events:</h4> 144 * <p>{@link AfterStartEvent}: schedule the writer on the KSB scheduled pool 145 * <p>{@link BeforeStopEvent}: flush the write queue immediately 146 * 147 * @see org.springframework.context.ApplicationListener#onApplicationEvent(org.springframework.context.ApplicationEvent) 148 */ 149 public void onEvent(final RiceConfigEvent event) { 150 if (event instanceof AfterStartEvent) { 151 // on startup, schedule this to run 152 LOG.info("scheduling writer..."); 153 KSBServiceLocator.getScheduledPool().scheduleAtFixedRate(scheduledWriter, 154 executionIntervalSeconds, executionIntervalSeconds, TimeUnit.SECONDS); 155 } else if (event instanceof BeforeStopEvent) { 156 KSBServiceLocator.getThreadPool().execute(shutdownWriter); 157 } 158 } 159 160 /** 161 * store the person to the database, but do this an alternate thread to 162 * prevent transaction issues since this service is non-transactional 163 * 164 * @author Kuali Rice Team (rice.collab@kuali.org) 165 * 166 */ 167 private class EntityArchiveWriter implements Callable { 168 169 // flag used to prevent multiple processes from being submitted at once 170 AtomicBoolean currentlySubmitted = new AtomicBoolean(false); 171 172 private final Comparator<Comparable> nullSafeComparator = new Comparator<Comparable>() { 173 public int compare(Comparable i1, Comparable i2) { 174 if (i1 != null && i2 != null) { 175 return i1.compareTo(i2); 176 } else if (i1 == null) { 177 if (i2 == null) { 178 return 0; 179 } else { 180 return -1; 181 } 182 } else { // if (entityId2 == null) { 183 return 1; 184 } 185 }; 186 }; 187 188 /** 189 * Comparator that attempts to impose a total ordering on KimEntityDefaultInfo instances 190 */ 191 private final Comparator<KimEntityDefaultInfo> kediComparator = new Comparator<KimEntityDefaultInfo>() { 192 /** 193 * compares by entityId value 194 * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object) 195 */ 196 public int compare(KimEntityDefaultInfo o1, KimEntityDefaultInfo o2) { 197 String entityId1 = (o1 == null) ? null : o1.getEntityId(); 198 String entityId2 = (o2 == null) ? null : o2.getEntityId(); 199 200 int result = nullSafeComparator.compare(entityId1, entityId2); 201 202 if (result == 0) { 203 result = getPrincipalIdsString(o1).compareTo(getPrincipalIdsString(o2)); 204 } 205 206 return result; 207 } 208 209 /** 210 * This method builds a newline delimited String containing the entity's principal IDs in sorted order 211 * 212 * @param entity 213 * @return 214 */ 215 private String getPrincipalIdsString(KimEntityDefaultInfo entity) { 216 String result = ""; 217 if (entity != null) { 218 List<KimPrincipalInfo> principals = entity.getPrincipals(); 219 if (principals != null) { 220 if (principals.size() == 1) { // one 221 result = principals.get(0).getPrincipalId(); 222 } else { // multiple 223 String [] ids = new String [principals.size()]; 224 int insertIndex = 0; 225 for (KimPrincipalInfo principal : principals) { 226 ids[insertIndex++] = principal.getPrincipalId(); 227 } 228 Arrays.sort(ids); 229 result = StringUtils.join(ids, "\n"); 230 } 231 } 232 } 233 return result; 234 } 235 }; 236 237 public boolean requestSubmit() { 238 return currentlySubmitted.compareAndSet(false, true); 239 } 240 241 /** 242 * Call that tries to flush the write queue. 243 * @see Callable#call() 244 */ 245 public Object call() { 246 try { 247 // the strategy is to grab chunks of entities, dedupe & sort them, and insert them in a big 248 // batch to reduce transaction overhead. Sorting is done so insertion order is guaranteed, which 249 // prevents deadlocks between concurrent writers to the database. 250 PlatformTransactionManager transactionManager = KNSServiceLocator.getTransactionManager(); 251 TransactionTemplate template = new TransactionTemplate(transactionManager); 252 template.execute(new TransactionCallback() { 253 public Object doInTransaction(TransactionStatus status) { 254 KimEntityDefaultInfo entity = null; 255 ArrayList<KimEntityDefaultInfo> entitiesToInsert = new ArrayList<KimEntityDefaultInfo>(maxWriteQueueSize); 256 Set<String> deduper = new HashSet<String>(maxWriteQueueSize); 257 258 // order is important in this conditional so that elements aren't dequeued and then ignored 259 while (entitiesToInsert.size() < maxWriteQueueSize && null != (entity = writeQueue.poll())) { 260 if (deduper.add(entity.getEntityId())) entitiesToInsert.add(entity); 261 } 262 263 Collections.sort(entitiesToInsert, kediComparator); 264 265 for (KimEntityDefaultInfo entityToInsert : entitiesToInsert) { 266 getBusinessObjectService().save( new KimEntityDefaultInfoCacheImpl( entityToInsert ) ); 267 } 268 return null; 269 } 270 }); 271 } finally { // make sure our running flag is unset, otherwise we'll never run again 272 currentlySubmitted.compareAndSet(true, false); 273 } 274 275 return Boolean.TRUE; 276 } 277 } 278 279 /** 280 * A class encapsulating a {@link ConcurrentLinkedQueue} and an {@link AtomicInteger} to 281 * provide fast offer(enqueue)/poll(dequeue) and size checking. Size may be approximate due to concurrent 282 * activity, but for our purposes that is fine. 283 * 284 * @author Kuali Rice Team (rice.collab@kuali.org) 285 * 286 */ 287 private static class WriteQueue { 288 AtomicInteger writeQueueSize = new AtomicInteger(0); 289 ConcurrentLinkedQueue<KimEntityDefaultInfo> queue = new ConcurrentLinkedQueue<KimEntityDefaultInfo>(); 290 291 public int offerAndGetSize(KimEntityDefaultInfo entity) { 292 queue.add(entity); 293 return writeQueueSize.incrementAndGet(); 294 } 295 296 private KimEntityDefaultInfo poll() { 297 KimEntityDefaultInfo result = queue.poll(); 298 if (result != null) { writeQueueSize.decrementAndGet(); } 299 return result; 300 } 301 302 private int getSize() { 303 return writeQueueSize.get(); 304 } 305 } 306 307 /** 308 * decorator for a callable to log a message before it is executed 309 * 310 * @author Kuali Rice Team (rice.collab@kuali.org) 311 * 312 */ 313 private static class PreLogCallableWrapper<A> implements Callable<A> { 314 315 private final Callable inner; 316 private final Level level; 317 private final String message; 318 319 public PreLogCallableWrapper(Callable inner, Level level, String message) { 320 this.inner = inner; 321 this.level = level; 322 this.message = message; 323 } 324 325 /** 326 * logs the message then calls the inner Callable 327 * 328 * @see java.util.concurrent.Callable#call() 329 */ 330 @SuppressWarnings("unchecked") 331 public A call() throws Exception { 332 LOG.log(level, message); 333 return (A)inner.call(); 334 } 335 } 336 337 /** 338 * Adapts a Callable to be Runnable 339 * 340 * @author Kuali Rice Team (rice.collab@kuali.org) 341 * 342 */ 343 private static class CallableAdapter implements Runnable { 344 345 private final Callable callable; 346 347 public CallableAdapter(Callable callable) { 348 this.callable = callable; 349 } 350 351 public void run() { 352 try { 353 callable.call(); 354 } catch (Exception e) { 355 throw new RuntimeException(e); 356 } 357 } 358 } 359 }