001 /**
002 * Copyright 2005-2014 The Kuali Foundation
003 *
004 * Licensed under the Educational Community License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.opensource.org/licenses/ecl2.php
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016 package org.kuali.rice.kim.service.impl;
017
018 import org.apache.commons.lang.StringUtils;
019 import org.apache.log4j.Level;
020 import org.apache.log4j.Logger;
021 import org.kuali.rice.core.api.config.property.ConfigurationService;
022 import org.kuali.rice.kim.api.KimConstants;
023 import org.kuali.rice.kim.impl.identity.IdentityArchiveService;
024 import org.kuali.rice.kim.api.identity.entity.EntityDefault;
025 import org.kuali.rice.kim.api.identity.principal.Principal;
026 import org.kuali.rice.kim.impl.identity.EntityDefaultInfoCacheBo;
027 import org.kuali.rice.krad.service.BusinessObjectService;
028 import org.kuali.rice.ksb.service.KSBServiceLocator;
029 import org.springframework.beans.factory.DisposableBean;
030 import org.springframework.beans.factory.InitializingBean;
031 import org.springframework.transaction.PlatformTransactionManager;
032 import org.springframework.transaction.TransactionStatus;
033 import org.springframework.transaction.support.TransactionCallback;
034 import org.springframework.transaction.support.TransactionTemplate;
035
036 import java.util.ArrayList;
037 import java.util.Arrays;
038 import java.util.Collection;
039 import java.util.Collections;
040 import java.util.Comparator;
041 import java.util.HashMap;
042 import java.util.HashSet;
043 import java.util.List;
044 import java.util.Map;
045 import java.util.Set;
046 import java.util.concurrent.Callable;
047 import java.util.concurrent.ConcurrentLinkedQueue;
048 import java.util.concurrent.TimeUnit;
049 import java.util.concurrent.atomic.AtomicBoolean;
050 import java.util.concurrent.atomic.AtomicInteger;
051
052 /**
053 * This is the default implementation for the IdentityArchiveService.
054 * @see IdentityArchiveService
055 *
056 * @author Kuali Rice Team (rice.collab@kuali.org)
057 */
058 public class IdentityArchiveServiceImpl implements IdentityArchiveService, InitializingBean, DisposableBean {
059 private static final Logger LOG = Logger.getLogger( IdentityArchiveServiceImpl.class );
060
061 private BusinessObjectService businessObjectService;
062 private ConfigurationService kualiConfigurationService;
063 private PlatformTransactionManager transactionManager;
064
065 private static final String EXEC_INTERVAL_SECS = "kim.identityArchiveServiceImpl.executionIntervalSeconds";
066 private static final String MAX_WRITE_QUEUE_SIZE = "kim.identityArchiveServiceImpl.maxWriteQueueSize";
067 private static final int EXECUTION_INTERVAL_SECONDS_DEFAULT = 600; // by default, flush the write queue this often
068 private static final int MAX_WRITE_QUEUE_SIZE_DEFAULT = 300; // cache this many KEDI's before forcing write
069
070 private final WriteQueue writeQueue = new WriteQueue();
071 private final EntityArchiveWriter writer = new EntityArchiveWriter();
072
073 // all this ceremony just decorates the writer so it logs a message first, and converts the Callable to Runnable
074 private final Runnable maxQueueSizeExceededWriter =
075 new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "max size exceeded, flushing write queue"));
076
077 // ditto
078 private final Runnable scheduledWriter =
079 new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "scheduled write out, flushing write queue"));
080
081 // ditto
082 private final Runnable shutdownWriter =
083 new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "rice is shutting down, flushing write queue"));
084
085 private int getExecutionIntervalSeconds() {
086 final String prop = kualiConfigurationService.getPropertyValueAsString(EXEC_INTERVAL_SECS);
087 try {
088 return Integer.valueOf(prop).intValue();
089 } catch (NumberFormatException e) {
090 return EXECUTION_INTERVAL_SECONDS_DEFAULT;
091 }
092 }
093
094 private int getMaxWriteQueueSize() {
095 final String prop = kualiConfigurationService.getPropertyValueAsString(MAX_WRITE_QUEUE_SIZE);
096 try {
097 return Integer.valueOf(prop).intValue();
098 } catch (NumberFormatException e) {
099 return MAX_WRITE_QUEUE_SIZE_DEFAULT;
100 }
101 }
102
103 @Override
104 public EntityDefault getEntityDefaultFromArchive( String entityId ) {
105 if (StringUtils.isBlank(entityId)) {
106 throw new IllegalArgumentException("entityId is blank");
107 }
108
109 Map<String,String> criteria = new HashMap<String, String>(1);
110 criteria.put(KimConstants.PrimaryKeyConstants.SUB_ENTITY_ID, entityId);
111 EntityDefaultInfoCacheBo cachedValue = businessObjectService.findByPrimaryKey(EntityDefaultInfoCacheBo.class, criteria);
112 return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo();
113 }
114
115 @Override
116 public EntityDefault getEntityDefaultFromArchiveByPrincipalId(String principalId) {
117 if (StringUtils.isBlank(principalId)) {
118 throw new IllegalArgumentException("principalId is blank");
119 }
120
121 Map<String,String> criteria = new HashMap<String, String>(1);
122 criteria.put("principalId", principalId);
123 EntityDefaultInfoCacheBo cachedValue = businessObjectService.findByPrimaryKey(EntityDefaultInfoCacheBo.class, criteria);
124 return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo();
125 }
126
127 @Override
128 public EntityDefault getEntityDefaultFromArchiveByPrincipalName(String principalName) {
129 if (StringUtils.isBlank(principalName)) {
130 throw new IllegalArgumentException("principalName is blank");
131 }
132
133 Map<String,String> criteria = new HashMap<String, String>(1);
134 criteria.put("principalName", principalName);
135 Collection<EntityDefaultInfoCacheBo> entities = businessObjectService.findMatching(EntityDefaultInfoCacheBo.class, criteria);
136 return (entities == null || entities.isEmpty()) ? null : entities.iterator().next().convertCacheToEntityDefaultInfo();
137 }
138
139 @Override
140 public EntityDefault getEntityDefaultFromArchiveByEmployeeId(String employeeId) {
141 if (StringUtils.isBlank(employeeId)) {
142 throw new IllegalArgumentException("employeeId is blank");
143 }
144 Map<String,String> criteria = new HashMap<String, String>(1);
145 criteria.put("employeeId", employeeId);
146 Collection<EntityDefaultInfoCacheBo> entities = businessObjectService.findMatching(EntityDefaultInfoCacheBo.class, criteria);
147 return (entities == null || entities.isEmpty()) ? null : entities.iterator().next().convertCacheToEntityDefaultInfo();
148 }
149
150 @Override
151 public void saveEntityDefaultToArchive(EntityDefault entity) {
152 if (entity == null) {
153 throw new IllegalArgumentException("entity is blank");
154 }
155
156 // if the max size has been reached, schedule now
157 if (getMaxWriteQueueSize() <= writeQueue.offerAndGetSize(entity) /* <- this enqueues the KEDI */ &&
158 writer.requestSubmit()) {
159 KSBServiceLocator.getThreadPool().execute(maxQueueSizeExceededWriter);
160 }
161 }
162
163 @Override
164 public void flushToArchive() {
165 writer.call();
166 }
167
168
169 public void setBusinessObjectService(BusinessObjectService businessObjectService) {
170 this.businessObjectService = businessObjectService;
171 }
172
173 public void setKualiConfigurationService(
174 ConfigurationService kualiConfigurationService) {
175 this.kualiConfigurationService = kualiConfigurationService;
176 }
177
178 public void setTransactionManager(PlatformTransactionManager txMgr) {
179 this.transactionManager = txMgr;
180 }
181
182 /** schedule the writer on the KSB scheduled pool. */
183 @Override
184 public void afterPropertiesSet() throws Exception {
185 LOG.info("scheduling writer...");
186 KSBServiceLocator.getScheduledPool().scheduleAtFixedRate(scheduledWriter,
187 getExecutionIntervalSeconds(), getExecutionIntervalSeconds(), TimeUnit.SECONDS);
188 }
189
190 /** flush the write queue immediately. */
191 @Override
192 public void destroy() throws Exception {
193 KSBServiceLocator.getThreadPool().execute(shutdownWriter);
194 }
195
196 /**
197 * store the person to the database, but do this an alternate thread to
198 * prevent transaction issues since this service is non-transactional
199 *
200 * @author Kuali Rice Team (rice.collab@kuali.org)
201 *
202 */
203 private class EntityArchiveWriter implements Callable {
204
205 // flag used to prevent multiple processes from being submitted at once
206 AtomicBoolean currentlySubmitted = new AtomicBoolean(false);
207
208 private final Comparator<Comparable> nullSafeComparator = new Comparator<Comparable>() {
209 @Override
210 public int compare(Comparable i1, Comparable i2) {
211 if (i1 != null && i2 != null) {
212 return i1.compareTo(i2);
213 } else if (i1 == null) {
214 if (i2 == null) {
215 return 0;
216 } else {
217 return -1;
218 }
219 } else { // if (entityId2 == null) {
220 return 1;
221 }
222 };
223 };
224
225 /**
226 * Comparator that attempts to impose a total ordering on EntityDefault instances
227 */
228 private final Comparator<EntityDefault> kediComparator = new Comparator<EntityDefault>() {
229 /**
230 * compares by entityId value
231 * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
232 */
233 @Override
234 public int compare(EntityDefault o1, EntityDefault o2) {
235 String entityId1 = (o1 == null) ? null : o1.getEntityId();
236 String entityId2 = (o2 == null) ? null : o2.getEntityId();
237
238 int result = nullSafeComparator.compare(entityId1, entityId2);
239
240 if (result == 0) {
241 result = getPrincipalIdsString(o1).compareTo(getPrincipalIdsString(o2));
242 }
243
244 return result;
245 }
246
247 /**
248 * This method builds a newline delimited String containing the identity's principal IDs in sorted order
249 *
250 * @param entity
251 * @return
252 */
253 private String getPrincipalIdsString(EntityDefault entity) {
254 String result = "";
255 if (entity != null) {
256 List<Principal> principals = entity.getPrincipals();
257 if (principals != null) {
258 if (principals.size() == 1) { // one
259 result = principals.get(0).getPrincipalId();
260 } else { // multiple
261 String [] ids = new String [principals.size()];
262 int insertIndex = 0;
263 for (Principal principal : principals) {
264 ids[insertIndex++] = principal.getPrincipalId();
265 }
266 Arrays.sort(ids);
267 result = StringUtils.join(ids, "\n");
268 }
269 }
270 }
271 return result;
272 }
273 };
274
275 public boolean requestSubmit() {
276 return currentlySubmitted.compareAndSet(false, true);
277 }
278
279 /**
280 * Call that tries to flush the write queue.
281 * @see Callable#call()
282 */
283 @Override
284 public Object call() {
285 try {
286 // the strategy is to grab chunks of entities, dedupe & sort them, and insert them in a big
287 // batch to reduce transaction overhead. Sorting is done so insertion order is guaranteed, which
288 // prevents deadlocks between concurrent writers to the database.
289 TransactionTemplate template = new TransactionTemplate(transactionManager);
290 template.execute(new TransactionCallback() {
291 @Override
292 public Object doInTransaction(TransactionStatus status) {
293 EntityDefault entity = null;
294 ArrayList<EntityDefault> entitiesToInsert = new ArrayList<EntityDefault>(getMaxWriteQueueSize());
295 Set<String> deduper = new HashSet<String>(getMaxWriteQueueSize());
296
297 // order is important in this conditional so that elements aren't dequeued and then ignored
298 while (entitiesToInsert.size() < getMaxWriteQueueSize() && null != (entity = writeQueue.poll())) {
299 if (deduper.add(entity.getEntityId())) {
300 entitiesToInsert.add(entity);
301 }
302 }
303
304 Collections.sort(entitiesToInsert, kediComparator);
305 List<EntityDefaultInfoCacheBo> entityCache = new ArrayList<EntityDefaultInfoCacheBo>(entitiesToInsert.size());
306 for (EntityDefault entityToInsert : entitiesToInsert) {
307 entityCache.add(new EntityDefaultInfoCacheBo( entityToInsert ));
308 }
309 businessObjectService.save(entityCache);
310 //for (EntityDefault entityToInsert : entitiesToInsert) {
311 // businessObjectService.save( new EntityDefaultInfoCacheBo( entityToInsert ) );
312 //}
313 return null;
314 }
315 });
316 } finally { // make sure our running flag is unset, otherwise we'll never run again
317 currentlySubmitted.compareAndSet(true, false);
318 }
319
320 return Boolean.TRUE;
321 }
322 }
323
324 /**
325 * A class encapsulating a {@link ConcurrentLinkedQueue} and an {@link AtomicInteger} to
326 * provide fast offer(enqueue)/poll(dequeue) and size checking. Size may be approximate due to concurrent
327 * activity, but for our purposes that is fine.
328 *
329 * @author Kuali Rice Team (rice.collab@kuali.org)
330 *
331 */
332 private static class WriteQueue {
333 AtomicInteger writeQueueSize = new AtomicInteger(0);
334 ConcurrentLinkedQueue<EntityDefault> queue = new ConcurrentLinkedQueue<EntityDefault>();
335
336 public int offerAndGetSize(EntityDefault entity) {
337 queue.add(entity);
338 return writeQueueSize.incrementAndGet();
339 }
340
341 private EntityDefault poll() {
342 EntityDefault result = queue.poll();
343 if (result != null) { writeQueueSize.decrementAndGet(); }
344 return result;
345 }
346 }
347
348 /**
349 * decorator for a callable to log a message before it is executed
350 *
351 * @author Kuali Rice Team (rice.collab@kuali.org)
352 *
353 */
354 private static class PreLogCallableWrapper<A> implements Callable<A> {
355
356 private final Callable inner;
357 private final Level level;
358 private final String message;
359
360 public PreLogCallableWrapper(Callable inner, Level level, String message) {
361 this.inner = inner;
362 this.level = level;
363 this.message = message;
364 }
365
366 /**
367 * logs the message then calls the inner Callable
368 *
369 * @see java.util.concurrent.Callable#call()
370 */
371 @Override
372 @SuppressWarnings("unchecked")
373 public A call() throws Exception {
374 LOG.log(level, message);
375 return (A)inner.call();
376 }
377 }
378
379 /**
380 * Adapts a Callable to be Runnable
381 *
382 * @author Kuali Rice Team (rice.collab@kuali.org)
383 *
384 */
385 private static class CallableAdapter implements Runnable {
386
387 private final Callable callable;
388
389 public CallableAdapter(Callable callable) {
390 this.callable = callable;
391 }
392
393 @Override
394 public void run() {
395 try {
396 callable.call();
397 } catch (Exception e) {
398 throw new RuntimeException(e);
399 }
400 }
401 }
402 }