View Javadoc

1   /**
2    * Copyright 2005-2012 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.rice.kim.service.impl;
17  
18  import org.apache.commons.lang.StringUtils;
19  import org.apache.log4j.Level;
20  import org.apache.log4j.Logger;
21  import org.kuali.rice.core.api.config.property.ConfigurationService;
22  import org.kuali.rice.kim.api.KimConstants;
23  import org.kuali.rice.kim.impl.identity.IdentityArchiveService;
24  import org.kuali.rice.kim.api.identity.entity.EntityDefault;
25  import org.kuali.rice.kim.api.identity.principal.Principal;
26  import org.kuali.rice.kim.impl.identity.EntityDefaultInfoCacheBo;
27  import org.kuali.rice.krad.service.BusinessObjectService;
28  import org.kuali.rice.krad.service.KRADServiceLocatorInternal;
29  import org.kuali.rice.ksb.service.KSBServiceLocator;
30  import org.springframework.beans.factory.DisposableBean;
31  import org.springframework.beans.factory.InitializingBean;
32  import org.springframework.transaction.PlatformTransactionManager;
33  import org.springframework.transaction.TransactionStatus;
34  import org.springframework.transaction.support.TransactionCallback;
35  import org.springframework.transaction.support.TransactionTemplate;
36  
37  import java.util.ArrayList;
38  import java.util.Arrays;
39  import java.util.Collection;
40  import java.util.Collections;
41  import java.util.Comparator;
42  import java.util.HashMap;
43  import java.util.HashSet;
44  import java.util.List;
45  import java.util.Map;
46  import java.util.Set;
47  import java.util.concurrent.Callable;
48  import java.util.concurrent.ConcurrentLinkedQueue;
49  import java.util.concurrent.TimeUnit;
50  import java.util.concurrent.atomic.AtomicBoolean;
51  import java.util.concurrent.atomic.AtomicInteger;
52  
53  /**
54   * This is the default implementation for the IdentityArchiveService.
55   * @see IdentityArchiveService
56   * @author Kuali Rice Team (rice.collab@kuali.org)
57   *
58   */
59  public class IdentityArchiveServiceImpl implements IdentityArchiveService, InitializingBean, DisposableBean {
60  	private static final Logger LOG = Logger.getLogger( IdentityArchiveServiceImpl.class );
61  
62  	private BusinessObjectService businessObjectService;
63  	private ConfigurationService kualiConfigurationService;
64      private PlatformTransactionManager transactionManager;
65  
66  	private static final String EXEC_INTERVAL_SECS = "kim.identityArchiveServiceImpl.executionIntervalSeconds";
67  	private static final String MAX_WRITE_QUEUE_SIZE = "kim.identityArchiveServiceImpl.maxWriteQueueSize";
68  	private static final int EXECUTION_INTERVAL_SECONDS_DEFAULT = 600; // by default, flush the write queue this often
69  	private static final int MAX_WRITE_QUEUE_SIZE_DEFAULT = 300; // cache this many KEDI's before forcing write
70  
71  	private final WriteQueue writeQueue = new WriteQueue();
72  	private final EntityArchiveWriter writer = new EntityArchiveWriter();
73  
74  	// all this ceremony just decorates the writer so it logs a message first, and converts the Callable to Runnable
75  	private final Runnable maxQueueSizeExceededWriter =
76  		new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "max size exceeded, flushing write queue"));
77  
78  	// ditto
79  	private final Runnable scheduledWriter =
80  		new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "scheduled write out, flushing write queue"));
81  
82  	// ditto
83  	private final Runnable shutdownWriter =
84  		new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "rice is shutting down, flushing write queue"));
85  	
86  	private int getExecutionIntervalSeconds() {
87  		final String prop = kualiConfigurationService.getPropertyValueAsString(EXEC_INTERVAL_SECS);
88  		try {
89  			return Integer.valueOf(prop).intValue();
90  		} catch (NumberFormatException e) {
91  			return EXECUTION_INTERVAL_SECONDS_DEFAULT;
92  		}
93  	}
94  	
95  	private int getMaxWriteQueueSize() {
96  		final String prop = kualiConfigurationService.getPropertyValueAsString(MAX_WRITE_QUEUE_SIZE);
97  		try {
98  			return Integer.valueOf(prop).intValue();
99  		} catch (NumberFormatException e) {
100 			return MAX_WRITE_QUEUE_SIZE_DEFAULT;
101 		}
102 	}
103 
104 	@Override
105 	public EntityDefault getEntityDefaultFromArchive( String entityId ) {
106     	if (StringUtils.isBlank(entityId)) {
107             throw new IllegalArgumentException("entityId is blank");
108         }
109 
110         Map<String,String> criteria = new HashMap<String, String>(1);
111     	criteria.put(KimConstants.PrimaryKeyConstants.SUB_ENTITY_ID, entityId);
112     	EntityDefaultInfoCacheBo cachedValue = businessObjectService.findByPrimaryKey(EntityDefaultInfoCacheBo.class, criteria);
113     	return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo();
114     }
115 
116     @Override
117 	public EntityDefault getEntityDefaultFromArchiveByPrincipalId(String principalId) {
118     	if (StringUtils.isBlank(principalId)) {
119             throw new IllegalArgumentException("principalId is blank");
120         }
121 
122         Map<String,String> criteria = new HashMap<String, String>(1);
123     	criteria.put("principalId", principalId);
124     	EntityDefaultInfoCacheBo cachedValue = businessObjectService.findByPrimaryKey(EntityDefaultInfoCacheBo.class, criteria);
125     	return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo();
126     }
127 
128     @Override
129 	public EntityDefault getEntityDefaultFromArchiveByPrincipalName(String principalName) {
130     	if (StringUtils.isBlank(principalName)) {
131             throw new IllegalArgumentException("principalName is blank");
132         }
133 
134         Map<String,String> criteria = new HashMap<String, String>(1);
135     	criteria.put("principalName", principalName);
136     	Collection<EntityDefaultInfoCacheBo> entities = businessObjectService.findMatching(EntityDefaultInfoCacheBo.class, criteria);
137     	return (entities == null || entities.isEmpty()) ? null : entities.iterator().next().convertCacheToEntityDefaultInfo();
138     }
139     
140     @Override
141     public EntityDefault getEntityDefaultFromArchiveByEmployeeId(String employeeId) {
142         if (StringUtils.isBlank(employeeId)) {
143             throw new IllegalArgumentException("employeeId is blank");
144         }
145         Map<String,String> criteria = new HashMap<String, String>(1);
146         criteria.put("employeeId", employeeId);
147         Collection<EntityDefaultInfoCacheBo> entities = businessObjectService.findMatching(EntityDefaultInfoCacheBo.class, criteria);
148         return (entities == null || entities.isEmpty()) ? null : entities.iterator().next().convertCacheToEntityDefaultInfo();
149     }
150     
151     @Override
152 	public void saveEntityDefaultToArchive(EntityDefault entity) {
153     	if (entity == null) {
154             throw new IllegalArgumentException("entity is blank");
155         }
156 
157     	// if the max size has been reached, schedule now
158     	if (getMaxWriteQueueSize() <= writeQueue.offerAndGetSize(entity) /* <- this enqueues the KEDI */ &&
159     			writer.requestSubmit()) {
160     		KSBServiceLocator.getThreadPool().execute(maxQueueSizeExceededWriter);
161     	}
162     }
163 
164     @Override
165     public void flushToArchive() {
166         writer.call();
167     }
168 
169 
170 	public void setBusinessObjectService(BusinessObjectService businessObjectService) {
171 		this.businessObjectService = businessObjectService;
172 	}
173 
174 	public void setKualiConfigurationService(
175 			ConfigurationService kualiConfigurationService) {
176 		this.kualiConfigurationService = kualiConfigurationService;
177 	}
178 
179     public void setTransactionManager(PlatformTransactionManager txMgr) {
180         this.transactionManager = txMgr;
181     }
182     
183     /** schedule the writer on the KSB scheduled pool. */
184 	@Override
185 	public void afterPropertiesSet() throws Exception {
186 		LOG.info("scheduling writer...");
187 		KSBServiceLocator.getScheduledPool().scheduleAtFixedRate(scheduledWriter,
188 				getExecutionIntervalSeconds(), getExecutionIntervalSeconds(), TimeUnit.SECONDS);
189 	}
190 
191 	/** flush the write queue immediately. */
192 	@Override
193 	public void destroy() throws Exception {
194 		KSBServiceLocator.getThreadPool().execute(shutdownWriter);
195 	}
196 
197 	/**
198 	 * store the person to the database, but do this an alternate thread to
199 	 * prevent transaction issues since this service is non-transactional
200 	 *
201 	 * @author Kuali Rice Team (rice.collab@kuali.org)
202 	 *
203 	 */
204 	private class EntityArchiveWriter implements Callable {
205 
206 		// flag used to prevent multiple processes from being submitted at once
207 		AtomicBoolean currentlySubmitted = new AtomicBoolean(false);
208 
209 		private final Comparator<Comparable> nullSafeComparator = new Comparator<Comparable>() {
210 			@Override
211 			public int compare(Comparable i1, Comparable i2) {
212 				if (i1 != null && i2 != null) {
213 					return i1.compareTo(i2);
214 				} else if (i1 == null) {
215 					if (i2 == null) {
216 						return 0;
217 					} else {
218 						return -1;
219 					}
220 				} else { // if (entityId2 == null) {
221 					return 1;
222 				}
223 			};
224 		};
225 
226 		/**
227 		 * Comparator that attempts to impose a total ordering on EntityDefault instances
228 		 */
229 		private final Comparator<EntityDefault> kediComparator = new Comparator<EntityDefault>() {
230 			/**
231 			 * compares by entityId value
232 			 * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
233 			 */
234 			@Override
235 			public int compare(EntityDefault o1, EntityDefault o2) {
236 				String entityId1 = (o1 == null) ? null : o1.getEntityId();
237 				String entityId2 = (o2 == null) ? null : o2.getEntityId();
238 
239 				int result = nullSafeComparator.compare(entityId1, entityId2);
240 
241 				if (result == 0) {
242 					result = getPrincipalIdsString(o1).compareTo(getPrincipalIdsString(o2));
243 				}
244 
245 				return result;
246 			}
247 
248 			/**
249 			 * This method builds a newline delimited String containing the identity's principal IDs in sorted order
250 			 *
251 			 * @param entity
252 			 * @return
253 			 */
254 			private String getPrincipalIdsString(EntityDefault entity) {
255 				String result = "";
256 				if (entity != null) {
257 					List<Principal> principals = entity.getPrincipals();
258 					if (principals != null) {
259 						if (principals.size() == 1) { // one
260 							result = principals.get(0).getPrincipalId();
261 						} else { // multiple
262 							String [] ids = new String [principals.size()];
263 							int insertIndex = 0;
264 							for (Principal principal : principals) {
265 								ids[insertIndex++] = principal.getPrincipalId();
266 							}
267 							Arrays.sort(ids);
268 							result = StringUtils.join(ids, "\n");
269 						}
270 					}
271 				}
272 				return result;
273 			}
274 		};
275 
276 		public boolean requestSubmit() {
277 			return currentlySubmitted.compareAndSet(false, true);
278 		}
279 
280 		/**
281 		 * Call that tries to flush the write queue.
282 		 * @see Callable#call()
283 		 */
284 		@Override
285 		public Object call() {
286 			try {
287 				// the strategy is to grab chunks of entities, dedupe & sort them, and insert them in a big
288 				// batch to reduce transaction overhead.  Sorting is done so insertion order is guaranteed, which
289 				// prevents deadlocks between concurrent writers to the database.
290 				TransactionTemplate template = new TransactionTemplate(transactionManager);
291 				template.execute(new TransactionCallback() {
292 					@Override
293 					public Object doInTransaction(TransactionStatus status) {
294 						EntityDefault entity = null;
295 						ArrayList<EntityDefault> entitiesToInsert = new ArrayList<EntityDefault>(getMaxWriteQueueSize());
296 						Set<String> deduper = new HashSet<String>(getMaxWriteQueueSize());
297 
298 						// order is important in this conditional so that elements aren't dequeued and then ignored
299 						while (entitiesToInsert.size() < getMaxWriteQueueSize() && null != (entity = writeQueue.poll())) {
300 							if (deduper.add(entity.getEntityId())) {
301 								entitiesToInsert.add(entity);
302 							}
303 						}
304 
305 						Collections.sort(entitiesToInsert, kediComparator);
306                         List<EntityDefaultInfoCacheBo> entityCache = new ArrayList<EntityDefaultInfoCacheBo>(entitiesToInsert.size());
307                         for (EntityDefault entityToInsert : entitiesToInsert) {
308                             entityCache.add(new EntityDefaultInfoCacheBo( entityToInsert ));
309                         }
310                         businessObjectService.save(entityCache);
311 						//for (EntityDefault entityToInsert : entitiesToInsert) {
312 						//	businessObjectService.save( new EntityDefaultInfoCacheBo( entityToInsert ) );
313 						//}
314 						return null;
315 					}
316 				});
317 			} finally { // make sure our running flag is unset, otherwise we'll never run again
318 				currentlySubmitted.compareAndSet(true, false);
319 			}
320 
321 			return Boolean.TRUE;
322 		}
323 	}
324 
325 	/**
326 	 * A class encapsulating a {@link ConcurrentLinkedQueue} and an {@link AtomicInteger} to
327 	 * provide fast offer(enqueue)/poll(dequeue) and size checking.  Size may be approximate due to concurrent
328 	 * activity, but for our purposes that is fine.
329 	 *
330 	 * @author Kuali Rice Team (rice.collab@kuali.org)
331 	 *
332 	 */
333 	private static class WriteQueue {
334 		AtomicInteger writeQueueSize = new AtomicInteger(0);
335 		ConcurrentLinkedQueue<EntityDefault> queue = new ConcurrentLinkedQueue<EntityDefault>();
336 
337 		public int offerAndGetSize(EntityDefault entity) {
338 			queue.add(entity);
339 			return writeQueueSize.incrementAndGet();
340 		}
341 
342 		private EntityDefault poll() {
343 			EntityDefault result = queue.poll();
344 			if (result != null) { writeQueueSize.decrementAndGet(); }
345 			return result;
346 		}
347 	}
348 
349 	/**
350 	 * decorator for a callable to log a message before it is executed
351 	 *
352 	 * @author Kuali Rice Team (rice.collab@kuali.org)
353 	 *
354 	 */
355 	private static class PreLogCallableWrapper<A> implements Callable<A> {
356 		
357 		private final Callable inner;
358 		private final Level level;
359 		private final String message;
360 
361 		public PreLogCallableWrapper(Callable inner, Level level, String message) {
362 			this.inner = inner;
363 			this.level = level;
364 			this.message = message;
365 		}
366 
367 		/**
368 		 * logs the message then calls the inner Callable
369 		 * 
370 		 * @see java.util.concurrent.Callable#call()
371 		 */
372 		@Override
373 		@SuppressWarnings("unchecked")
374 		public A call() throws Exception {
375 			LOG.log(level, message);
376 			return (A)inner.call();
377 		}
378 	}
379 
380 	/**
381 	 * Adapts a Callable to be Runnable
382 	 *
383 	 * @author Kuali Rice Team (rice.collab@kuali.org)
384 	 *
385 	 */
386 	private static class CallableAdapter implements Runnable {
387 
388 		private final Callable callable;
389 
390 		public CallableAdapter(Callable callable) {
391 			this.callable = callable;
392 		}
393 
394 		@Override
395 		public void run() {
396 			try {
397 				callable.call();
398 			} catch (Exception e) {
399 				throw new RuntimeException(e);
400 			}
401 		}
402 	}
403 }