001 /*
002 * Copyright 2007-2009 The Kuali Foundation
003 *
004 * Licensed under the Educational Community License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.opensource.org/licenses/ecl2.php
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016 package org.kuali.rice.kim.service.impl;
017
018 import java.util.ArrayList;
019 import java.util.Arrays;
020 import java.util.Collection;
021 import java.util.Collections;
022 import java.util.Comparator;
023 import java.util.HashMap;
024 import java.util.HashSet;
025 import java.util.List;
026 import java.util.Map;
027 import java.util.Set;
028 import java.util.concurrent.Callable;
029 import java.util.concurrent.ConcurrentLinkedQueue;
030 import java.util.concurrent.TimeUnit;
031 import java.util.concurrent.atomic.AtomicBoolean;
032 import java.util.concurrent.atomic.AtomicInteger;
033
034 import org.apache.commons.lang.StringUtils;
035 import org.apache.log4j.Level;
036 import org.apache.log4j.Logger;
037 import org.kuali.rice.core.config.ConfigContext;
038 import org.kuali.rice.core.config.RiceConfigurer;
039 import org.kuali.rice.core.config.event.AfterStartEvent;
040 import org.kuali.rice.core.config.event.BeforeStopEvent;
041 import org.kuali.rice.core.config.event.RiceConfigEvent;
042 import org.kuali.rice.core.config.event.RiceConfigEventListener;
043 import org.kuali.rice.core.util.RiceConstants;
044 import org.kuali.rice.kim.bo.entity.dto.KimEntityDefaultInfo;
045 import org.kuali.rice.kim.bo.entity.dto.KimPrincipalInfo;
046 import org.kuali.rice.kim.bo.entity.impl.KimEntityDefaultInfoCacheImpl;
047 import org.kuali.rice.kim.service.IdentityArchiveService;
048 import org.kuali.rice.kim.util.KimConstants;
049 import org.kuali.rice.kns.service.BusinessObjectService;
050 import org.kuali.rice.kns.service.KNSServiceLocator;
051 import org.kuali.rice.ksb.service.KSBServiceLocator;
052 import org.springframework.transaction.PlatformTransactionManager;
053 import org.springframework.transaction.TransactionStatus;
054 import org.springframework.transaction.support.TransactionCallback;
055 import org.springframework.transaction.support.TransactionTemplate;
056
057
058 /**
059 * This is the default implementation for the IdentityArchiveService.
060 * @see IdentityArchiveService
061 * @author Kuali Rice Team (rice.collab@kuali.org)
062 *
063 */
064 public class IdentityArchiveServiceImpl implements IdentityArchiveService, RiceConfigEventListener {
065 private static final Logger LOG = Logger.getLogger( IdentityArchiveServiceImpl.class );
066
067 private BusinessObjectService businessObjectService;
068
069 private static final String EXEC_INTERVAL_SECS = "kim.identityArchiveServiceImpl.executionIntervalSeconds";
070 private static final String MAX_WRITE_QUEUE_SIZE = "kim.identityArchiveServiceImpl.maxWriteQueueSize";
071
072 private int executionIntervalSeconds = 600; // by default, flush the write queue this often
073 private int maxWriteQueueSize = 300; // cache this many KEDI's before forcing write
074 private final WriteQueue writeQueue = new WriteQueue();
075 private final EntityArchiveWriter writer = new EntityArchiveWriter();
076
077 // all this ceremony just decorates the writer so it logs a message first, and converts the Callable to Runnable
078 private final Runnable maxQueueSizeExceededWriter =
079 new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "max size exceeded, flushing write queue"));
080
081 // ditto
082 private final Runnable scheduledWriter =
083 new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "scheduled write out, flushing write queue"));
084
085 // ditto
086 private final Runnable shutdownWriter =
087 new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "rice is shutting down, flushing write queue"));
088
089 public IdentityArchiveServiceImpl(Integer executionIntervalSeconds, Integer maxWriteQueueSize) {
090 // register for RiceConfigEventS
091 RiceConfigurer rice =
092 (RiceConfigurer)ConfigContext.getCurrentContextConfig().getObject( RiceConstants.RICE_CONFIGURER_CONFIG_NAME );
093 LOG.debug("registering for events...");
094 rice.getKimConfigurer().registerConfigEventListener(this);
095
096 if (executionIntervalSeconds != null) {
097 this.executionIntervalSeconds = executionIntervalSeconds;
098 }
099
100 if (maxWriteQueueSize != null) {
101 this.maxWriteQueueSize = maxWriteQueueSize;
102 }
103 }
104
105 protected BusinessObjectService getBusinessObjectService() {
106 if ( businessObjectService == null ) {
107 businessObjectService = KNSServiceLocator.getBusinessObjectService();
108 }
109 return businessObjectService;
110 }
111
112 public KimEntityDefaultInfo getEntityDefaultInfoFromArchive( String entityId ) {
113 Map<String,String> criteria = new HashMap<String, String>(1);
114 criteria.put(KimConstants.PrimaryKeyConstants.ENTITY_ID, entityId);
115 KimEntityDefaultInfoCacheImpl cachedValue = (KimEntityDefaultInfoCacheImpl)getBusinessObjectService().findByPrimaryKey(KimEntityDefaultInfoCacheImpl.class, criteria);
116 return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo();
117 }
118
119 public KimEntityDefaultInfo getEntityDefaultInfoFromArchiveByPrincipalId( String principalId ) {
120 Map<String,String> criteria = new HashMap<String, String>(1);
121 criteria.put("principalId", principalId);
122 KimEntityDefaultInfoCacheImpl cachedValue = (KimEntityDefaultInfoCacheImpl)getBusinessObjectService().findByPrimaryKey(KimEntityDefaultInfoCacheImpl.class, criteria);
123 return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo();
124 }
125
126 @SuppressWarnings("unchecked")
127 public KimEntityDefaultInfo getEntityDefaultInfoFromArchiveByPrincipalName( String principalName ) {
128 Map<String,String> criteria = new HashMap<String, String>(1);
129 criteria.put("principalName", principalName);
130 Collection<KimEntityDefaultInfoCacheImpl> entities = getBusinessObjectService().findMatching(KimEntityDefaultInfoCacheImpl.class, criteria);
131 return (entities == null || entities.size() == 0) ? null : entities.iterator().next().convertCacheToEntityDefaultInfo();
132 }
133
134 public void saveDefaultInfoToArchive( KimEntityDefaultInfo entity ) {
135 // if the max size has been reached, schedule now
136 if (maxWriteQueueSize <= writeQueue.offerAndGetSize(entity) /* <- this enqueues the KEDI */ &&
137 writer.requestSubmit()) {
138 KSBServiceLocator.getThreadPool().execute(maxQueueSizeExceededWriter);
139 }
140 }
141
142 /**
143 * <h4>On events:</h4>
144 * <p>{@link AfterStartEvent}: schedule the writer on the KSB scheduled pool
145 * <p>{@link BeforeStopEvent}: flush the write queue immediately
146 *
147 * @see org.springframework.context.ApplicationListener#onApplicationEvent(org.springframework.context.ApplicationEvent)
148 */
149 public void onEvent(final RiceConfigEvent event) {
150 if (event instanceof AfterStartEvent) {
151 // on startup, schedule this to run
152 LOG.info("scheduling writer...");
153 KSBServiceLocator.getScheduledPool().scheduleAtFixedRate(scheduledWriter,
154 executionIntervalSeconds, executionIntervalSeconds, TimeUnit.SECONDS);
155 } else if (event instanceof BeforeStopEvent) {
156 KSBServiceLocator.getThreadPool().execute(shutdownWriter);
157 }
158 }
159
160 /**
161 * store the person to the database, but do this an alternate thread to
162 * prevent transaction issues since this service is non-transactional
163 *
164 * @author Kuali Rice Team (rice.collab@kuali.org)
165 *
166 */
167 private class EntityArchiveWriter implements Callable {
168
169 // flag used to prevent multiple processes from being submitted at once
170 AtomicBoolean currentlySubmitted = new AtomicBoolean(false);
171
172 private final Comparator<Comparable> nullSafeComparator = new Comparator<Comparable>() {
173 public int compare(Comparable i1, Comparable i2) {
174 if (i1 != null && i2 != null) {
175 return i1.compareTo(i2);
176 } else if (i1 == null) {
177 if (i2 == null) {
178 return 0;
179 } else {
180 return -1;
181 }
182 } else { // if (entityId2 == null) {
183 return 1;
184 }
185 };
186 };
187
188 /**
189 * Comparator that attempts to impose a total ordering on KimEntityDefaultInfo instances
190 */
191 private final Comparator<KimEntityDefaultInfo> kediComparator = new Comparator<KimEntityDefaultInfo>() {
192 /**
193 * compares by entityId value
194 * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
195 */
196 public int compare(KimEntityDefaultInfo o1, KimEntityDefaultInfo o2) {
197 String entityId1 = (o1 == null) ? null : o1.getEntityId();
198 String entityId2 = (o2 == null) ? null : o2.getEntityId();
199
200 int result = nullSafeComparator.compare(entityId1, entityId2);
201
202 if (result == 0) {
203 result = getPrincipalIdsString(o1).compareTo(getPrincipalIdsString(o2));
204 }
205
206 return result;
207 }
208
209 /**
210 * This method builds a newline delimited String containing the entity's principal IDs in sorted order
211 *
212 * @param entity
213 * @return
214 */
215 private String getPrincipalIdsString(KimEntityDefaultInfo entity) {
216 String result = "";
217 if (entity != null) {
218 List<KimPrincipalInfo> principals = entity.getPrincipals();
219 if (principals != null) {
220 if (principals.size() == 1) { // one
221 result = principals.get(0).getPrincipalId();
222 } else { // multiple
223 String [] ids = new String [principals.size()];
224 int insertIndex = 0;
225 for (KimPrincipalInfo principal : principals) {
226 ids[insertIndex++] = principal.getPrincipalId();
227 }
228 Arrays.sort(ids);
229 result = StringUtils.join(ids, "\n");
230 }
231 }
232 }
233 return result;
234 }
235 };
236
237 public boolean requestSubmit() {
238 return currentlySubmitted.compareAndSet(false, true);
239 }
240
241 /**
242 * Call that tries to flush the write queue.
243 * @see Callable#call()
244 */
245 public Object call() {
246 try {
247 // the strategy is to grab chunks of entities, dedupe & sort them, and insert them in a big
248 // batch to reduce transaction overhead. Sorting is done so insertion order is guaranteed, which
249 // prevents deadlocks between concurrent writers to the database.
250 PlatformTransactionManager transactionManager = KNSServiceLocator.getTransactionManager();
251 TransactionTemplate template = new TransactionTemplate(transactionManager);
252 template.execute(new TransactionCallback() {
253 public Object doInTransaction(TransactionStatus status) {
254 KimEntityDefaultInfo entity = null;
255 ArrayList<KimEntityDefaultInfo> entitiesToInsert = new ArrayList<KimEntityDefaultInfo>(maxWriteQueueSize);
256 Set<String> deduper = new HashSet<String>(maxWriteQueueSize);
257
258 // order is important in this conditional so that elements aren't dequeued and then ignored
259 while (entitiesToInsert.size() < maxWriteQueueSize && null != (entity = writeQueue.poll())) {
260 if (deduper.add(entity.getEntityId())) entitiesToInsert.add(entity);
261 }
262
263 Collections.sort(entitiesToInsert, kediComparator);
264
265 for (KimEntityDefaultInfo entityToInsert : entitiesToInsert) {
266 getBusinessObjectService().save( new KimEntityDefaultInfoCacheImpl( entityToInsert ) );
267 }
268 return null;
269 }
270 });
271 } finally { // make sure our running flag is unset, otherwise we'll never run again
272 currentlySubmitted.compareAndSet(true, false);
273 }
274
275 return Boolean.TRUE;
276 }
277 }
278
279 /**
280 * A class encapsulating a {@link ConcurrentLinkedQueue} and an {@link AtomicInteger} to
281 * provide fast offer(enqueue)/poll(dequeue) and size checking. Size may be approximate due to concurrent
282 * activity, but for our purposes that is fine.
283 *
284 * @author Kuali Rice Team (rice.collab@kuali.org)
285 *
286 */
287 private static class WriteQueue {
288 AtomicInteger writeQueueSize = new AtomicInteger(0);
289 ConcurrentLinkedQueue<KimEntityDefaultInfo> queue = new ConcurrentLinkedQueue<KimEntityDefaultInfo>();
290
291 public int offerAndGetSize(KimEntityDefaultInfo entity) {
292 queue.add(entity);
293 return writeQueueSize.incrementAndGet();
294 }
295
296 private KimEntityDefaultInfo poll() {
297 KimEntityDefaultInfo result = queue.poll();
298 if (result != null) { writeQueueSize.decrementAndGet(); }
299 return result;
300 }
301
302 private int getSize() {
303 return writeQueueSize.get();
304 }
305 }
306
307 /**
308 * decorator for a callable to log a message before it is executed
309 *
310 * @author Kuali Rice Team (rice.collab@kuali.org)
311 *
312 */
313 private static class PreLogCallableWrapper<A> implements Callable<A> {
314
315 private final Callable inner;
316 private final Level level;
317 private final String message;
318
319 public PreLogCallableWrapper(Callable inner, Level level, String message) {
320 this.inner = inner;
321 this.level = level;
322 this.message = message;
323 }
324
325 /**
326 * logs the message then calls the inner Callable
327 *
328 * @see java.util.concurrent.Callable#call()
329 */
330 @SuppressWarnings("unchecked")
331 public A call() throws Exception {
332 LOG.log(level, message);
333 return (A)inner.call();
334 }
335 }
336
337 /**
338 * Adapts a Callable to be Runnable
339 *
340 * @author Kuali Rice Team (rice.collab@kuali.org)
341 *
342 */
343 private static class CallableAdapter implements Runnable {
344
345 private final Callable callable;
346
347 public CallableAdapter(Callable callable) {
348 this.callable = callable;
349 }
350
351 public void run() {
352 try {
353 callable.call();
354 } catch (Exception e) {
355 throw new RuntimeException(e);
356 }
357 }
358 }
359 }