View Javadoc

1   /**
2    * Copyright 2005-2013 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.rice.kim.service.impl;
17  
18  import org.apache.commons.lang.StringUtils;
19  import org.apache.log4j.Level;
20  import org.apache.log4j.Logger;
21  import org.kuali.rice.core.api.config.property.ConfigurationService;
22  import org.kuali.rice.kim.api.KimConstants;
23  import org.kuali.rice.kim.impl.identity.IdentityArchiveService;
24  import org.kuali.rice.kim.api.identity.entity.EntityDefault;
25  import org.kuali.rice.kim.api.identity.principal.Principal;
26  import org.kuali.rice.kim.impl.identity.EntityDefaultInfoCacheBo;
27  import org.kuali.rice.krad.service.BusinessObjectService;
28  import org.kuali.rice.ksb.service.KSBServiceLocator;
29  import org.springframework.beans.factory.DisposableBean;
30  import org.springframework.beans.factory.InitializingBean;
31  import org.springframework.transaction.PlatformTransactionManager;
32  import org.springframework.transaction.TransactionStatus;
33  import org.springframework.transaction.support.TransactionCallback;
34  import org.springframework.transaction.support.TransactionTemplate;
35  
36  import java.util.ArrayList;
37  import java.util.Arrays;
38  import java.util.Collection;
39  import java.util.Collections;
40  import java.util.Comparator;
41  import java.util.HashMap;
42  import java.util.HashSet;
43  import java.util.List;
44  import java.util.Map;
45  import java.util.Set;
46  import java.util.concurrent.Callable;
47  import java.util.concurrent.ConcurrentLinkedQueue;
48  import java.util.concurrent.TimeUnit;
49  import java.util.concurrent.atomic.AtomicBoolean;
50  import java.util.concurrent.atomic.AtomicInteger;
51  
52  /**
53   * This is the default implementation for the IdentityArchiveService.
54   * @see IdentityArchiveService
55   *
56   * @author Kuali Rice Team (rice.collab@kuali.org)
57   */
58  public class IdentityArchiveServiceImpl implements IdentityArchiveService, InitializingBean, DisposableBean {
59  	private static final Logger LOG = Logger.getLogger( IdentityArchiveServiceImpl.class );
60  
61  	private BusinessObjectService businessObjectService;
62  	private ConfigurationService kualiConfigurationService;
63      private PlatformTransactionManager transactionManager;
64  
65  	private static final String EXEC_INTERVAL_SECS = "kim.identityArchiveServiceImpl.executionIntervalSeconds";
66  	private static final String MAX_WRITE_QUEUE_SIZE = "kim.identityArchiveServiceImpl.maxWriteQueueSize";
67  	private static final int EXECUTION_INTERVAL_SECONDS_DEFAULT = 600; // by default, flush the write queue this often
68  	private static final int MAX_WRITE_QUEUE_SIZE_DEFAULT = 300; // cache this many KEDI's before forcing write
69  
70  	private final WriteQueue writeQueue = new WriteQueue();
71  	private final EntityArchiveWriter writer = new EntityArchiveWriter();
72  
73  	// all this ceremony just decorates the writer so it logs a message first, and converts the Callable to Runnable
74  	private final Runnable maxQueueSizeExceededWriter =
75  		new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "max size exceeded, flushing write queue"));
76  
77  	// ditto
78  	private final Runnable scheduledWriter =
79  		new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "scheduled write out, flushing write queue"));
80  
81  	// ditto
82  	private final Runnable shutdownWriter =
83  		new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "rice is shutting down, flushing write queue"));
84  	
85  	private int getExecutionIntervalSeconds() {
86  		final String prop = kualiConfigurationService.getPropertyValueAsString(EXEC_INTERVAL_SECS);
87  		try {
88  			return Integer.valueOf(prop).intValue();
89  		} catch (NumberFormatException e) {
90  			return EXECUTION_INTERVAL_SECONDS_DEFAULT;
91  		}
92  	}
93  	
94  	private int getMaxWriteQueueSize() {
95  		final String prop = kualiConfigurationService.getPropertyValueAsString(MAX_WRITE_QUEUE_SIZE);
96  		try {
97  			return Integer.valueOf(prop).intValue();
98  		} catch (NumberFormatException e) {
99  			return MAX_WRITE_QUEUE_SIZE_DEFAULT;
100 		}
101 	}
102 
103 	@Override
104 	public EntityDefault getEntityDefaultFromArchive( String entityId ) {
105     	if (StringUtils.isBlank(entityId)) {
106             throw new IllegalArgumentException("entityId is blank");
107         }
108 
109         Map<String,String> criteria = new HashMap<String, String>(1);
110     	criteria.put(KimConstants.PrimaryKeyConstants.SUB_ENTITY_ID, entityId);
111     	EntityDefaultInfoCacheBo cachedValue = businessObjectService.findByPrimaryKey(EntityDefaultInfoCacheBo.class, criteria);
112     	return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo();
113     }
114 
115     @Override
116 	public EntityDefault getEntityDefaultFromArchiveByPrincipalId(String principalId) {
117     	if (StringUtils.isBlank(principalId)) {
118             throw new IllegalArgumentException("principalId is blank");
119         }
120 
121         Map<String,String> criteria = new HashMap<String, String>(1);
122     	criteria.put("principalId", principalId);
123     	EntityDefaultInfoCacheBo cachedValue = businessObjectService.findByPrimaryKey(EntityDefaultInfoCacheBo.class, criteria);
124     	return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo();
125     }
126 
127     @Override
128 	public EntityDefault getEntityDefaultFromArchiveByPrincipalName(String principalName) {
129     	if (StringUtils.isBlank(principalName)) {
130             throw new IllegalArgumentException("principalName is blank");
131         }
132 
133         Map<String,String> criteria = new HashMap<String, String>(1);
134     	criteria.put("principalName", principalName);
135     	Collection<EntityDefaultInfoCacheBo> entities = businessObjectService.findMatching(EntityDefaultInfoCacheBo.class, criteria);
136     	return (entities == null || entities.isEmpty()) ? null : entities.iterator().next().convertCacheToEntityDefaultInfo();
137     }
138     
139     @Override
140     public EntityDefault getEntityDefaultFromArchiveByEmployeeId(String employeeId) {
141         if (StringUtils.isBlank(employeeId)) {
142             throw new IllegalArgumentException("employeeId is blank");
143         }
144         Map<String,String> criteria = new HashMap<String, String>(1);
145         criteria.put("employeeId", employeeId);
146         Collection<EntityDefaultInfoCacheBo> entities = businessObjectService.findMatching(EntityDefaultInfoCacheBo.class, criteria);
147         return (entities == null || entities.isEmpty()) ? null : entities.iterator().next().convertCacheToEntityDefaultInfo();
148     }
149     
150     @Override
151 	public void saveEntityDefaultToArchive(EntityDefault entity) {
152     	if (entity == null) {
153             throw new IllegalArgumentException("entity is blank");
154         }
155 
156     	// if the max size has been reached, schedule now
157     	if (getMaxWriteQueueSize() <= writeQueue.offerAndGetSize(entity) /* <- this enqueues the KEDI */ &&
158     			writer.requestSubmit()) {
159     		KSBServiceLocator.getThreadPool().execute(maxQueueSizeExceededWriter);
160     	}
161     }
162 
163     @Override
164     public void flushToArchive() {
165         writer.call();
166     }
167 
168 
169 	public void setBusinessObjectService(BusinessObjectService businessObjectService) {
170 		this.businessObjectService = businessObjectService;
171 	}
172 
173 	public void setKualiConfigurationService(
174 			ConfigurationService kualiConfigurationService) {
175 		this.kualiConfigurationService = kualiConfigurationService;
176 	}
177 
178     public void setTransactionManager(PlatformTransactionManager txMgr) {
179         this.transactionManager = txMgr;
180     }
181     
182     /** schedule the writer on the KSB scheduled pool. */
183 	@Override
184 	public void afterPropertiesSet() throws Exception {
185 		LOG.info("scheduling writer...");
186 		KSBServiceLocator.getScheduledPool().scheduleAtFixedRate(scheduledWriter,
187 				getExecutionIntervalSeconds(), getExecutionIntervalSeconds(), TimeUnit.SECONDS);
188 	}
189 
190 	/** flush the write queue immediately. */
191 	@Override
192 	public void destroy() throws Exception {
193 		KSBServiceLocator.getThreadPool().execute(shutdownWriter);
194 	}
195 
196 	/**
197 	 * store the person to the database, but do this an alternate thread to
198 	 * prevent transaction issues since this service is non-transactional
199 	 *
200 	 * @author Kuali Rice Team (rice.collab@kuali.org)
201 	 *
202 	 */
203 	private class EntityArchiveWriter implements Callable {
204 
205 		// flag used to prevent multiple processes from being submitted at once
206 		AtomicBoolean currentlySubmitted = new AtomicBoolean(false);
207 
208 		private final Comparator<Comparable> nullSafeComparator = new Comparator<Comparable>() {
209 			@Override
210 			public int compare(Comparable i1, Comparable i2) {
211 				if (i1 != null && i2 != null) {
212 					return i1.compareTo(i2);
213 				} else if (i1 == null) {
214 					if (i2 == null) {
215 						return 0;
216 					} else {
217 						return -1;
218 					}
219 				} else { // if (entityId2 == null) {
220 					return 1;
221 				}
222 			};
223 		};
224 
225 		/**
226 		 * Comparator that attempts to impose a total ordering on EntityDefault instances
227 		 */
228 		private final Comparator<EntityDefault> kediComparator = new Comparator<EntityDefault>() {
229 			/**
230 			 * compares by entityId value
231 			 * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
232 			 */
233 			@Override
234 			public int compare(EntityDefault o1, EntityDefault o2) {
235 				String entityId1 = (o1 == null) ? null : o1.getEntityId();
236 				String entityId2 = (o2 == null) ? null : o2.getEntityId();
237 
238 				int result = nullSafeComparator.compare(entityId1, entityId2);
239 
240 				if (result == 0) {
241 					result = getPrincipalIdsString(o1).compareTo(getPrincipalIdsString(o2));
242 				}
243 
244 				return result;
245 			}
246 
247 			/**
248 			 * This method builds a newline delimited String containing the identity's principal IDs in sorted order
249 			 *
250 			 * @param entity
251 			 * @return
252 			 */
253 			private String getPrincipalIdsString(EntityDefault entity) {
254 				String result = "";
255 				if (entity != null) {
256 					List<Principal> principals = entity.getPrincipals();
257 					if (principals != null) {
258 						if (principals.size() == 1) { // one
259 							result = principals.get(0).getPrincipalId();
260 						} else { // multiple
261 							String [] ids = new String [principals.size()];
262 							int insertIndex = 0;
263 							for (Principal principal : principals) {
264 								ids[insertIndex++] = principal.getPrincipalId();
265 							}
266 							Arrays.sort(ids);
267 							result = StringUtils.join(ids, "\n");
268 						}
269 					}
270 				}
271 				return result;
272 			}
273 		};
274 
275 		public boolean requestSubmit() {
276 			return currentlySubmitted.compareAndSet(false, true);
277 		}
278 
279 		/**
280 		 * Call that tries to flush the write queue.
281 		 * @see Callable#call()
282 		 */
283 		@Override
284 		public Object call() {
285 			try {
286 				// the strategy is to grab chunks of entities, dedupe & sort them, and insert them in a big
287 				// batch to reduce transaction overhead.  Sorting is done so insertion order is guaranteed, which
288 				// prevents deadlocks between concurrent writers to the database.
289 				TransactionTemplate template = new TransactionTemplate(transactionManager);
290 				template.execute(new TransactionCallback() {
291 					@Override
292 					public Object doInTransaction(TransactionStatus status) {
293 						EntityDefault entity = null;
294 						ArrayList<EntityDefault> entitiesToInsert = new ArrayList<EntityDefault>(getMaxWriteQueueSize());
295 						Set<String> deduper = new HashSet<String>(getMaxWriteQueueSize());
296 
297 						// order is important in this conditional so that elements aren't dequeued and then ignored
298 						while (entitiesToInsert.size() < getMaxWriteQueueSize() && null != (entity = writeQueue.poll())) {
299 							if (deduper.add(entity.getEntityId())) {
300 								entitiesToInsert.add(entity);
301 							}
302 						}
303 
304 						Collections.sort(entitiesToInsert, kediComparator);
305                         List<EntityDefaultInfoCacheBo> entityCache = new ArrayList<EntityDefaultInfoCacheBo>(entitiesToInsert.size());
306                         for (EntityDefault entityToInsert : entitiesToInsert) {
307                             entityCache.add(new EntityDefaultInfoCacheBo( entityToInsert ));
308                         }
309                         businessObjectService.save(entityCache);
310 						//for (EntityDefault entityToInsert : entitiesToInsert) {
311 						//	businessObjectService.save( new EntityDefaultInfoCacheBo( entityToInsert ) );
312 						//}
313 						return null;
314 					}
315 				});
316 			} finally { // make sure our running flag is unset, otherwise we'll never run again
317 				currentlySubmitted.compareAndSet(true, false);
318 			}
319 
320 			return Boolean.TRUE;
321 		}
322 	}
323 
324 	/**
325 	 * A class encapsulating a {@link ConcurrentLinkedQueue} and an {@link AtomicInteger} to
326 	 * provide fast offer(enqueue)/poll(dequeue) and size checking.  Size may be approximate due to concurrent
327 	 * activity, but for our purposes that is fine.
328 	 *
329 	 * @author Kuali Rice Team (rice.collab@kuali.org)
330 	 *
331 	 */
332 	private static class WriteQueue {
333 		AtomicInteger writeQueueSize = new AtomicInteger(0);
334 		ConcurrentLinkedQueue<EntityDefault> queue = new ConcurrentLinkedQueue<EntityDefault>();
335 
336 		public int offerAndGetSize(EntityDefault entity) {
337 			queue.add(entity);
338 			return writeQueueSize.incrementAndGet();
339 		}
340 
341 		private EntityDefault poll() {
342 			EntityDefault result = queue.poll();
343 			if (result != null) { writeQueueSize.decrementAndGet(); }
344 			return result;
345 		}
346 	}
347 
348 	/**
349 	 * decorator for a callable to log a message before it is executed
350 	 *
351 	 * @author Kuali Rice Team (rice.collab@kuali.org)
352 	 *
353 	 */
354 	private static class PreLogCallableWrapper<A> implements Callable<A> {
355 		
356 		private final Callable inner;
357 		private final Level level;
358 		private final String message;
359 
360 		public PreLogCallableWrapper(Callable inner, Level level, String message) {
361 			this.inner = inner;
362 			this.level = level;
363 			this.message = message;
364 		}
365 
366 		/**
367 		 * logs the message then calls the inner Callable
368 		 * 
369 		 * @see java.util.concurrent.Callable#call()
370 		 */
371 		@Override
372 		@SuppressWarnings("unchecked")
373 		public A call() throws Exception {
374 			LOG.log(level, message);
375 			return (A)inner.call();
376 		}
377 	}
378 
379 	/**
380 	 * Adapts a Callable to be Runnable
381 	 *
382 	 * @author Kuali Rice Team (rice.collab@kuali.org)
383 	 *
384 	 */
385 	private static class CallableAdapter implements Runnable {
386 
387 		private final Callable callable;
388 
389 		public CallableAdapter(Callable callable) {
390 			this.callable = callable;
391 		}
392 
393 		@Override
394 		public void run() {
395 			try {
396 				callable.call();
397 			} catch (Exception e) {
398 				throw new RuntimeException(e);
399 			}
400 		}
401 	}
402 }