001 /*
002 * Copyright 2006-2011 The Kuali Foundation
003 *
004 * Licensed under the Educational Community License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.opensource.org/licenses/ecl2.php
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017 package org.kuali.rice.kim.service.impl;
018
019 import org.apache.commons.lang.StringUtils;
020 import org.apache.log4j.Level;
021 import org.apache.log4j.Logger;
022 import org.kuali.rice.core.api.config.property.ConfigurationService;
023 import org.kuali.rice.kim.api.identity.entity.EntityDefault;
024 import org.kuali.rice.kim.api.identity.principal.Principal;
025 import org.kuali.rice.kim.api.services.IdentityArchiveService;
026 import org.kuali.rice.kim.bo.entity.impl.KimEntityDefaultInfoCacheImpl;
027 import org.kuali.rice.kim.util.KimConstants;
028 import org.kuali.rice.krad.service.BusinessObjectService;
029 import org.kuali.rice.krad.service.KRADServiceLocatorInternal;
030 import org.kuali.rice.ksb.service.KSBServiceLocator;
031 import org.springframework.beans.factory.DisposableBean;
032 import org.springframework.beans.factory.InitializingBean;
033 import org.springframework.transaction.PlatformTransactionManager;
034 import org.springframework.transaction.TransactionStatus;
035 import org.springframework.transaction.support.TransactionCallback;
036 import org.springframework.transaction.support.TransactionTemplate;
037
038 import java.util.ArrayList;
039 import java.util.Arrays;
040 import java.util.Collection;
041 import java.util.Collections;
042 import java.util.Comparator;
043 import java.util.HashMap;
044 import java.util.HashSet;
045 import java.util.List;
046 import java.util.Map;
047 import java.util.Set;
048 import java.util.concurrent.Callable;
049 import java.util.concurrent.ConcurrentLinkedQueue;
050 import java.util.concurrent.TimeUnit;
051 import java.util.concurrent.atomic.AtomicBoolean;
052 import java.util.concurrent.atomic.AtomicInteger;
053
054 /**
055 * This is the default implementation for the IdentityArchiveService.
056 * @see IdentityArchiveService
057 * @author Kuali Rice Team (rice.collab@kuali.org)
058 *
059 */
060 public class IdentityArchiveServiceImpl implements IdentityArchiveService, InitializingBean, DisposableBean {
061 private static final Logger LOG = Logger.getLogger( IdentityArchiveServiceImpl.class );
062
063 private BusinessObjectService businessObjectService;
064 private ConfigurationService kualiConfigurationService;
065
066 private static final String EXEC_INTERVAL_SECS = "kim.identityArchiveServiceImpl.executionIntervalSeconds";
067 private static final String MAX_WRITE_QUEUE_SIZE = "kim.identityArchiveServiceImpl.maxWriteQueueSize";
068 private static final int EXECUTION_INTERVAL_SECONDS_DEFAULT = 600; // by default, flush the write queue this often
069 private static final int MAX_WRITE_QUEUE_SIZE_DEFAULT = 300; // cache this many KEDI's before forcing write
070
071 private final WriteQueue writeQueue = new WriteQueue();
072 private final EntityArchiveWriter writer = new EntityArchiveWriter();
073
074 // all this ceremony just decorates the writer so it logs a message first, and converts the Callable to Runnable
075 private final Runnable maxQueueSizeExceededWriter =
076 new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "max size exceeded, flushing write queue"));
077
078 // ditto
079 private final Runnable scheduledWriter =
080 new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "scheduled write out, flushing write queue"));
081
082 // ditto
083 private final Runnable shutdownWriter =
084 new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "rice is shutting down, flushing write queue"));
085
086 private int getExecutionIntervalSeconds() {
087 final String prop = kualiConfigurationService.getPropertyString(EXEC_INTERVAL_SECS);
088 try {
089 return Integer.valueOf(prop).intValue();
090 } catch (NumberFormatException e) {
091 return EXECUTION_INTERVAL_SECONDS_DEFAULT;
092 }
093 }
094
095 private int getMaxWriteQueueSize() {
096 final String prop = kualiConfigurationService.getPropertyString(MAX_WRITE_QUEUE_SIZE);
097 try {
098 return Integer.valueOf(prop).intValue();
099 } catch (NumberFormatException e) {
100 return MAX_WRITE_QUEUE_SIZE_DEFAULT;
101 }
102 }
103
104 @Override
105 public EntityDefault getEntityDefaultInfoFromArchive( String entityId ) {
106 Map<String,String> criteria = new HashMap<String, String>(1);
107 criteria.put(KimConstants.PrimaryKeyConstants.ENTITY_ID, entityId);
108 KimEntityDefaultInfoCacheImpl cachedValue = getBusinessObjectService().findByPrimaryKey(KimEntityDefaultInfoCacheImpl.class, criteria);
109 return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo();
110 }
111
112 @Override
113 public EntityDefault getEntityDefaultInfoFromArchiveByPrincipalId( String principalId ) {
114 Map<String,String> criteria = new HashMap<String, String>(1);
115 criteria.put("principalId", principalId);
116 KimEntityDefaultInfoCacheImpl cachedValue = getBusinessObjectService().findByPrimaryKey(KimEntityDefaultInfoCacheImpl.class, criteria);
117 return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo();
118 }
119
120 @Override
121 public EntityDefault getEntityDefaultInfoFromArchiveByPrincipalName( String principalName ) {
122 Map<String,String> criteria = new HashMap<String, String>(1);
123 criteria.put("principalName", principalName);
124 Collection<KimEntityDefaultInfoCacheImpl> entities = getBusinessObjectService().findMatching(KimEntityDefaultInfoCacheImpl.class, criteria);
125 return (entities == null || entities.size() == 0) ? null : entities.iterator().next().convertCacheToEntityDefaultInfo();
126 }
127
128 @Override
129 public void saveDefaultInfoToArchive( EntityDefault entity ) {
130 // if the max size has been reached, schedule now
131 if (getMaxWriteQueueSize() <= writeQueue.offerAndGetSize(entity) /* <- this enqueues the KEDI */ &&
132 writer.requestSubmit()) {
133 KSBServiceLocator.getThreadPool().execute(maxQueueSizeExceededWriter);
134 }
135 }
136
137 public BusinessObjectService getBusinessObjectService() {
138 return this.businessObjectService;
139 }
140
141 public void setBusinessObjectService(BusinessObjectService businessObjectService) {
142 this.businessObjectService = businessObjectService;
143 }
144
145 public ConfigurationService getKualiConfigurationService() {
146 return this.kualiConfigurationService;
147 }
148
149 public void setKualiConfigurationService(
150 ConfigurationService kualiConfigurationService) {
151 this.kualiConfigurationService = kualiConfigurationService;
152 }
153
154 /** schedule the writer on the KSB scheduled pool. */
155 @Override
156 public void afterPropertiesSet() throws Exception {
157 LOG.info("scheduling writer...");
158 KSBServiceLocator.getScheduledPool().scheduleAtFixedRate(scheduledWriter,
159 getExecutionIntervalSeconds(), getExecutionIntervalSeconds(), TimeUnit.SECONDS);
160 }
161
162 /** flush the write queue immediately. */
163 @Override
164 public void destroy() throws Exception {
165 KSBServiceLocator.getThreadPool().execute(shutdownWriter);
166 }
167
168 /**
169 * store the person to the database, but do this an alternate thread to
170 * prevent transaction issues since this service is non-transactional
171 *
172 * @author Kuali Rice Team (rice.collab@kuali.org)
173 *
174 */
175 private class EntityArchiveWriter implements Callable {
176
177 // flag used to prevent multiple processes from being submitted at once
178 AtomicBoolean currentlySubmitted = new AtomicBoolean(false);
179
180 private final Comparator<Comparable> nullSafeComparator = new Comparator<Comparable>() {
181 @Override
182 public int compare(Comparable i1, Comparable i2) {
183 if (i1 != null && i2 != null) {
184 return i1.compareTo(i2);
185 } else if (i1 == null) {
186 if (i2 == null) {
187 return 0;
188 } else {
189 return -1;
190 }
191 } else { // if (entityId2 == null) {
192 return 1;
193 }
194 };
195 };
196
197 /**
198 * Comparator that attempts to impose a total ordering on EntityDefault instances
199 */
200 private final Comparator<EntityDefault> kediComparator = new Comparator<EntityDefault>() {
201 /**
202 * compares by entityId value
203 * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
204 */
205 @Override
206 public int compare(EntityDefault o1, EntityDefault o2) {
207 String entityId1 = (o1 == null) ? null : o1.getEntityId();
208 String entityId2 = (o2 == null) ? null : o2.getEntityId();
209
210 int result = nullSafeComparator.compare(entityId1, entityId2);
211
212 if (result == 0) {
213 result = getPrincipalIdsString(o1).compareTo(getPrincipalIdsString(o2));
214 }
215
216 return result;
217 }
218
219 /**
220 * This method builds a newline delimited String containing the identity's principal IDs in sorted order
221 *
222 * @param entity
223 * @return
224 */
225 private String getPrincipalIdsString(EntityDefault entity) {
226 String result = "";
227 if (entity != null) {
228 List<Principal> principals = entity.getPrincipals();
229 if (principals != null) {
230 if (principals.size() == 1) { // one
231 result = principals.get(0).getPrincipalId();
232 } else { // multiple
233 String [] ids = new String [principals.size()];
234 int insertIndex = 0;
235 for (Principal principal : principals) {
236 ids[insertIndex++] = principal.getPrincipalId();
237 }
238 Arrays.sort(ids);
239 result = StringUtils.join(ids, "\n");
240 }
241 }
242 }
243 return result;
244 }
245 };
246
247 public boolean requestSubmit() {
248 return currentlySubmitted.compareAndSet(false, true);
249 }
250
251 /**
252 * Call that tries to flush the write queue.
253 * @see Callable#call()
254 */
255 @Override
256 public Object call() {
257 try {
258 // the strategy is to grab chunks of entities, dedupe & sort them, and insert them in a big
259 // batch to reduce transaction overhead. Sorting is done so insertion order is guaranteed, which
260 // prevents deadlocks between concurrent writers to the database.
261 PlatformTransactionManager transactionManager = KRADServiceLocatorInternal.getTransactionManager();
262 TransactionTemplate template = new TransactionTemplate(transactionManager);
263 template.execute(new TransactionCallback() {
264 @Override
265 public Object doInTransaction(TransactionStatus status) {
266 EntityDefault entity = null;
267 ArrayList<EntityDefault> entitiesToInsert = new ArrayList<EntityDefault>(getMaxWriteQueueSize());
268 Set<String> deduper = new HashSet<String>(getMaxWriteQueueSize());
269
270 // order is important in this conditional so that elements aren't dequeued and then ignored
271 while (entitiesToInsert.size() < getMaxWriteQueueSize() && null != (entity = writeQueue.poll())) {
272 if (deduper.add(entity.getEntityId())) {
273 entitiesToInsert.add(entity);
274 }
275 }
276
277 Collections.sort(entitiesToInsert, kediComparator);
278
279 for (EntityDefault entityToInsert : entitiesToInsert) {
280 getBusinessObjectService().save( new KimEntityDefaultInfoCacheImpl( entityToInsert ) );
281 }
282 return null;
283 }
284 });
285 } finally { // make sure our running flag is unset, otherwise we'll never run again
286 currentlySubmitted.compareAndSet(true, false);
287 }
288
289 return Boolean.TRUE;
290 }
291 }
292
293 /**
294 * A class encapsulating a {@link ConcurrentLinkedQueue} and an {@link AtomicInteger} to
295 * provide fast offer(enqueue)/poll(dequeue) and size checking. Size may be approximate due to concurrent
296 * activity, but for our purposes that is fine.
297 *
298 * @author Kuali Rice Team (rice.collab@kuali.org)
299 *
300 */
301 private static class WriteQueue {
302 AtomicInteger writeQueueSize = new AtomicInteger(0);
303 ConcurrentLinkedQueue<EntityDefault> queue = new ConcurrentLinkedQueue<EntityDefault>();
304
305 public int offerAndGetSize(EntityDefault entity) {
306 queue.add(entity);
307 return writeQueueSize.incrementAndGet();
308 }
309
310 private EntityDefault poll() {
311 EntityDefault result = queue.poll();
312 if (result != null) { writeQueueSize.decrementAndGet(); }
313 return result;
314 }
315 }
316
317 /**
318 * decorator for a callable to log a message before it is executed
319 *
320 * @author Kuali Rice Team (rice.collab@kuali.org)
321 *
322 */
323 private static class PreLogCallableWrapper<A> implements Callable<A> {
324
325 private final Callable inner;
326 private final Level level;
327 private final String message;
328
329 public PreLogCallableWrapper(Callable inner, Level level, String message) {
330 this.inner = inner;
331 this.level = level;
332 this.message = message;
333 }
334
335 /**
336 * logs the message then calls the inner Callable
337 *
338 * @see java.util.concurrent.Callable#call()
339 */
340 @Override
341 @SuppressWarnings("unchecked")
342 public A call() throws Exception {
343 LOG.log(level, message);
344 return (A)inner.call();
345 }
346 }
347
348 /**
349 * Adapts a Callable to be Runnable
350 *
351 * @author Kuali Rice Team (rice.collab@kuali.org)
352 *
353 */
354 private static class CallableAdapter implements Runnable {
355
356 private final Callable callable;
357
358 public CallableAdapter(Callable callable) {
359 this.callable = callable;
360 }
361
362 @Override
363 public void run() {
364 try {
365 callable.call();
366 } catch (Exception e) {
367 throw new RuntimeException(e);
368 }
369 }
370 }
371 }