View Javadoc
1   /**
2    * Copyright 2005-2016 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.rice.kim.service.impl;
17  
18  import java.util.ArrayList;
19  import java.util.Arrays;
20  import java.util.Collections;
21  import java.util.Comparator;
22  import java.util.HashSet;
23  import java.util.List;
24  import java.util.Set;
25  import java.util.concurrent.Callable;
26  import java.util.concurrent.ConcurrentLinkedQueue;
27  import java.util.concurrent.TimeUnit;
28  import java.util.concurrent.atomic.AtomicBoolean;
29  import java.util.concurrent.atomic.AtomicInteger;
30  
31  import org.apache.commons.lang.StringUtils;
32  import org.apache.log4j.Level;
33  import org.apache.log4j.Logger;
34  import org.kuali.rice.core.api.config.property.ConfigurationService;
35  import org.kuali.rice.core.api.criteria.QueryByCriteria;
36  import org.kuali.rice.kim.api.KimConstants;
37  import org.kuali.rice.kim.api.identity.entity.EntityDefault;
38  import org.kuali.rice.kim.api.identity.principal.Principal;
39  import org.kuali.rice.kim.impl.identity.EntityDefaultInfoCacheBo;
40  import org.kuali.rice.kim.impl.identity.IdentityArchiveService;
41  import org.kuali.rice.krad.data.DataObjectService;
42  import org.kuali.rice.ksb.service.KSBServiceLocator;
43  import org.springframework.beans.factory.DisposableBean;
44  import org.springframework.beans.factory.InitializingBean;
45  import org.springframework.transaction.PlatformTransactionManager;
46  import org.springframework.transaction.TransactionStatus;
47  import org.springframework.transaction.support.TransactionCallback;
48  import org.springframework.transaction.support.TransactionTemplate;
49  
50  /**
51   * This is the default implementation for the IdentityArchiveService.
52   * @see IdentityArchiveService
53   *
54   * @author Kuali Rice Team (rice.collab@kuali.org)
55   */
56  public class IdentityArchiveServiceImpl implements IdentityArchiveService, InitializingBean, DisposableBean {
57  	private static final Logger LOG = Logger.getLogger( IdentityArchiveServiceImpl.class );
58  
59  	protected DataObjectService dataObjectService;
60  	protected ConfigurationService kualiConfigurationService;
61  	protected PlatformTransactionManager transactionManager;
62  
63  	protected static final String EXEC_INTERVAL_SECS = "kim.identityArchiveServiceImpl.executionIntervalSeconds";
64  	protected static final String MAX_WRITE_QUEUE_SIZE = "kim.identityArchiveServiceImpl.maxWriteQueueSize";
65  	protected static final int EXECUTION_INTERVAL_SECONDS_DEFAULT = 600; // by default, flush the write queue this often
66  	protected static final int MAX_WRITE_QUEUE_SIZE_DEFAULT = 300; // cache this many KEDI's before forcing write
67  
68  	protected final WriteQueue writeQueue = new WriteQueue();
69  	protected final EntityArchiveWriter writer = new EntityArchiveWriter();
70  
71  	// all this ceremony just decorates the writer so it logs a message first, and converts the Callable to Runnable
72  	protected final Runnable maxQueueSizeExceededWriter =
73  		new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "max size exceeded, flushing write queue"));
74  
75  	// ditto
76  	protected final Runnable scheduledWriter =
77  		new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "scheduled write out, flushing write queue"));
78  
79  	// ditto
80  	protected final Runnable shutdownWriter =
81  		new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "rice is shutting down, flushing write queue"));
82  
83  	protected int getExecutionIntervalSeconds() {
84  		final String prop = kualiConfigurationService.getPropertyValueAsString(EXEC_INTERVAL_SECS);
85  		try {
86  			return Integer.valueOf(prop).intValue();
87  		} catch (NumberFormatException e) {
88  			return EXECUTION_INTERVAL_SECONDS_DEFAULT;
89  		}
90  	}
91  
92  	protected int getMaxWriteQueueSize() {
93  		final String prop = kualiConfigurationService.getPropertyValueAsString(MAX_WRITE_QUEUE_SIZE);
94  		try {
95  			return Integer.valueOf(prop).intValue();
96  		} catch (NumberFormatException e) {
97  			return MAX_WRITE_QUEUE_SIZE_DEFAULT;
98  		}
99  	}
100 
101 	@Override
102 	public EntityDefault getEntityDefaultFromArchive( String entityId ) {
103     	if (StringUtils.isBlank(entityId)) {
104             throw new IllegalArgumentException("entityId is blank");
105         }
106 
107     	List<EntityDefaultInfoCacheBo> results = dataObjectService.findMatching(EntityDefaultInfoCacheBo.class,
108     	        QueryByCriteria.Builder.forAttribute(KimConstants.PrimaryKeyConstants.SUB_ENTITY_ID, entityId).build() ).getResults();
109         EntityDefaultInfoCacheBo cachedValue = null;
110         if ( !results.isEmpty() ) {
111             cachedValue = results.get(0);
112         }
113 
114     	return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo();
115     }
116 
117     @Override
118 	public EntityDefault getEntityDefaultFromArchiveByPrincipalId(String principalId) {
119     	if (StringUtils.isBlank(principalId)) {
120             throw new IllegalArgumentException("principalId is blank");
121         }
122 
123     	EntityDefaultInfoCacheBo cachedValue = dataObjectService.find(EntityDefaultInfoCacheBo.class, principalId);
124     	return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo();
125     }
126 
127     @Override
128 	public EntityDefault getEntityDefaultFromArchiveByPrincipalName(String principalName) {
129     	if (StringUtils.isBlank(principalName)) {
130             throw new IllegalArgumentException("principalName is blank");
131         }
132 
133     	List<EntityDefaultInfoCacheBo> entities = dataObjectService.findMatching(EntityDefaultInfoCacheBo.class,
134     	        QueryByCriteria.Builder.forAttribute("principalName", principalName).build()).getResults();
135     	return entities.isEmpty() ? null : entities.get(0).convertCacheToEntityDefaultInfo();
136     }
137 
138     @Override
139     public EntityDefault getEntityDefaultFromArchiveByEmployeeId(String employeeId) {
140         if (StringUtils.isBlank(employeeId)) {
141             throw new IllegalArgumentException("employeeId is blank");
142         }
143         List<EntityDefaultInfoCacheBo> entities = dataObjectService.findMatching(EntityDefaultInfoCacheBo.class,
144                 QueryByCriteria.Builder.forAttribute("employeeId", employeeId).build()).getResults();
145         return entities.isEmpty() ? null : entities.get(0).convertCacheToEntityDefaultInfo();
146     }
147 
148     @Override
149 	public void saveEntityDefaultToArchive(EntityDefault entity) {
150     	if (entity == null) {
151             throw new IllegalArgumentException("entity is blank");
152         }
153 
154     	// if the max size has been reached, schedule now
155     	if (getMaxWriteQueueSize() <= writeQueue.offerAndGetSize(entity) /* <- this enqueues the KEDI */ &&
156     			writer.requestSubmit()) {
157     		KSBServiceLocator.getThreadPool().execute(maxQueueSizeExceededWriter);
158     	}
159     }
160 
161     @Override
162     public void flushToArchive() {
163         writer.call();
164     }
165 
166 	public void setKualiConfigurationService(
167 			ConfigurationService kualiConfigurationService) {
168 		this.kualiConfigurationService = kualiConfigurationService;
169 	}
170 
171     public void setTransactionManager(PlatformTransactionManager txMgr) {
172         this.transactionManager = txMgr;
173     }
174 
175     /** schedule the writer on the KSB scheduled pool. */
176 	@Override
177 	public void afterPropertiesSet() throws Exception {
178 		LOG.info("scheduling writer...");
179 		KSBServiceLocator.getScheduledPool().scheduleAtFixedRate(scheduledWriter,
180 				getExecutionIntervalSeconds(), getExecutionIntervalSeconds(), TimeUnit.SECONDS);
181 	}
182 
183 	/** flush the write queue immediately. */
184 	@Override
185 	public void destroy() throws Exception {
186 		shutdownWriter.run();
187 	}
188 
189 	/**
190 	 * store the person to the database, but do this an alternate thread to
191 	 * prevent transaction issues since this service is non-transactional
192 	 *
193 	 * @author Kuali Rice Team (rice.collab@kuali.org)
194 	 *
195 	 */
196 	protected class EntityArchiveWriter implements Callable {
197 
198 		// flag used to prevent multiple processes from being submitted at once
199 		AtomicBoolean currentlySubmitted = new AtomicBoolean(false);
200 
201 		private final Comparator<Comparable> nullSafeComparator = new Comparator<Comparable>() {
202 			@Override
203 			public int compare(Comparable i1, Comparable i2) {
204 				if (i1 != null && i2 != null) {
205 					return i1.compareTo(i2);
206 				} else if (i1 == null) {
207 					if (i2 == null) {
208 						return 0;
209 					} else {
210 						return -1;
211 					}
212 				} else { // if (entityId2 == null) {
213 					return 1;
214 				}
215 			};
216 		};
217 
218 		/**
219 		 * Comparator that attempts to impose a total ordering on EntityDefault instances
220 		 */
221 		protected final Comparator<EntityDefault> kediComparator = new Comparator<EntityDefault>() {
222 			/**
223 			 * compares by entityId value
224 			 * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
225 			 */
226 			@Override
227 			public int compare(EntityDefault o1, EntityDefault o2) {
228 				String entityId1 = (o1 == null) ? null : o1.getEntityId();
229 				String entityId2 = (o2 == null) ? null : o2.getEntityId();
230 
231 				int result = nullSafeComparator.compare(entityId1, entityId2);
232 
233 				if (result == 0) {
234 					result = getPrincipalIdsString(o1).compareTo(getPrincipalIdsString(o2));
235 				}
236 
237 				return result;
238 			}
239 
240 			/**
241 			 * This method builds a newline delimited String containing the identity's principal IDs in sorted order
242 			 *
243 			 * @param entity
244 			 * @return
245 			 */
246 			private String getPrincipalIdsString(EntityDefault entity) {
247 				String result = "";
248 				if (entity != null) {
249 					List<Principal> principals = entity.getPrincipals();
250 					if (principals != null) {
251 						if (principals.size() == 1) { // one
252 							result = principals.get(0).getPrincipalId();
253 						} else { // multiple
254 							String [] ids = new String [principals.size()];
255 							int insertIndex = 0;
256 							for (Principal principal : principals) {
257 								ids[insertIndex++] = principal.getPrincipalId();
258 							}
259 							Arrays.sort(ids);
260 							result = StringUtils.join(ids, "\n");
261 						}
262 					}
263 				}
264 				return result;
265 			}
266 		};
267 
268 		public boolean requestSubmit() {
269 			return currentlySubmitted.compareAndSet(false, true);
270 		}
271 
272 		/**
273 		 * Call that tries to flush the write queue.
274 		 * @see Callable#call()
275 		 */
276 		@Override
277 		public Object call() {
278 			try {
279 				// the strategy is to grab chunks of entities, dedupe & sort them, and insert them in a big
280 				// batch to reduce transaction overhead.  Sorting is done so insertion order is guaranteed, which
281 				// prevents deadlocks between concurrent writers to the database.
282 				TransactionTemplate template = new TransactionTemplate(transactionManager);
283 				template.execute(new TransactionCallback() {
284 					@Override
285 					public Object doInTransaction(TransactionStatus status) {
286 						EntityDefault entity = null;
287 						ArrayList<EntityDefault> entitiesToInsert = new ArrayList<EntityDefault>(getMaxWriteQueueSize());
288 						Set<String> deduper = new HashSet<String>(getMaxWriteQueueSize());
289 
290 						// order is important in this conditional so that elements aren't dequeued and then ignored
291 						while (entitiesToInsert.size() < getMaxWriteQueueSize() && null != (entity = writeQueue.poll())) {
292                             //Added an if condition to check whether an entity has a principal
293                             if (entity.getPrincipals().size() > 0 && deduper.add(entity.getEntityId())) {
294 								entitiesToInsert.add(entity);
295 							}
296 						}
297 
298 						Collections.sort(entitiesToInsert, kediComparator);
299 						for (EntityDefault entityToInsert : entitiesToInsert) {
300 							dataObjectService.save( new EntityDefaultInfoCacheBo( entityToInsert ) );
301 						}
302 						return null;
303 					}
304 				});
305 			} finally { // make sure our running flag is unset, otherwise we'll never run again
306 				currentlySubmitted.compareAndSet(true, false);
307 			}
308 
309 			return Boolean.TRUE;
310 		}
311 	}
312 
313 	/**
314 	 * A class encapsulating a {@link ConcurrentLinkedQueue} and an {@link AtomicInteger} to
315 	 * provide fast offer(enqueue)/poll(dequeue) and size checking.  Size may be approximate due to concurrent
316 	 * activity, but for our purposes that is fine.
317 	 *
318 	 * @author Kuali Rice Team (rice.collab@kuali.org)
319 	 *
320 	 */
321 	protected static class WriteQueue {
322 		AtomicInteger writeQueueSize = new AtomicInteger(0);
323 		ConcurrentLinkedQueue<EntityDefault> queue = new ConcurrentLinkedQueue<EntityDefault>();
324 
325 		public int offerAndGetSize(EntityDefault entity) {
326 			queue.add(entity);
327 			return writeQueueSize.incrementAndGet();
328 		}
329 
330 		protected EntityDefault poll() {
331 			EntityDefault result = queue.poll();
332 			if (result != null) { writeQueueSize.decrementAndGet(); }
333 			return result;
334 		}
335 	}
336 
337 	/**
338 	 * decorator for a callable to log a message before it is executed
339 	 *
340 	 * @author Kuali Rice Team (rice.collab@kuali.org)
341 	 *
342 	 */
343 	protected static class PreLogCallableWrapper<A> implements Callable<A> {
344 
345 	    protected final Callable inner;
346 	    protected final Level level;
347 	    protected final String message;
348 
349 		public PreLogCallableWrapper(Callable inner, Level level, String message) {
350 			this.inner = inner;
351 			this.level = level;
352 			this.message = message;
353 		}
354 
355 		/**
356 		 * logs the message then calls the inner Callable
357 		 *
358 		 * @see java.util.concurrent.Callable#call()
359 		 */
360 		@Override
361 		@SuppressWarnings("unchecked")
362 		public A call() throws Exception {
363 			LOG.log(level, message);
364 			return (A)inner.call();
365 		}
366 	}
367 
368 	/**
369 	 * Adapts a Callable to be Runnable
370 	 *
371 	 * @author Kuali Rice Team (rice.collab@kuali.org)
372 	 *
373 	 */
374 	protected static class CallableAdapter implements Runnable {
375 
376 		private final Callable callable;
377 
378 		public CallableAdapter(Callable callable) {
379 			this.callable = callable;
380 		}
381 
382 		@Override
383 		public void run() {
384 			try {
385 				callable.call();
386 			} catch (Exception e) {
387 				throw new RuntimeException(e);
388 			}
389 		}
390 	}
391 
392     /**
393      * @param dataObjectService the dataObjectService to set
394      */
395     public void setDataObjectService(DataObjectService dataObjectService) {
396         this.dataObjectService = dataObjectService;
397     }
398 }