1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.kim.service.impl;
17
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.Collections;
21 import java.util.Comparator;
22 import java.util.HashSet;
23 import java.util.List;
24 import java.util.Set;
25 import java.util.concurrent.Callable;
26 import java.util.concurrent.ConcurrentLinkedQueue;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.atomic.AtomicBoolean;
29 import java.util.concurrent.atomic.AtomicInteger;
30
31 import org.apache.commons.lang.StringUtils;
32 import org.apache.log4j.Level;
33 import org.apache.log4j.Logger;
34 import org.kuali.rice.core.api.config.property.ConfigurationService;
35 import org.kuali.rice.core.api.criteria.QueryByCriteria;
36 import org.kuali.rice.kim.api.KimConstants;
37 import org.kuali.rice.kim.api.identity.entity.EntityDefault;
38 import org.kuali.rice.kim.api.identity.principal.Principal;
39 import org.kuali.rice.kim.impl.identity.EntityDefaultInfoCacheBo;
40 import org.kuali.rice.kim.impl.identity.IdentityArchiveService;
41 import org.kuali.rice.krad.data.DataObjectService;
42 import org.kuali.rice.ksb.service.KSBServiceLocator;
43 import org.springframework.beans.factory.DisposableBean;
44 import org.springframework.beans.factory.InitializingBean;
45 import org.springframework.transaction.PlatformTransactionManager;
46 import org.springframework.transaction.TransactionStatus;
47 import org.springframework.transaction.support.TransactionCallback;
48 import org.springframework.transaction.support.TransactionTemplate;
49
50
51
52
53
54
55
56 public class IdentityArchiveServiceImpl implements IdentityArchiveService, InitializingBean, DisposableBean {
57 private static final Logger LOG = Logger.getLogger( IdentityArchiveServiceImpl.class );
58
59 protected DataObjectService dataObjectService;
60 protected ConfigurationService kualiConfigurationService;
61 protected PlatformTransactionManager transactionManager;
62
63 protected static final String EXEC_INTERVAL_SECS = "kim.identityArchiveServiceImpl.executionIntervalSeconds";
64 protected static final String MAX_WRITE_QUEUE_SIZE = "kim.identityArchiveServiceImpl.maxWriteQueueSize";
65 protected static final int EXECUTION_INTERVAL_SECONDS_DEFAULT = 600;
66 protected static final int MAX_WRITE_QUEUE_SIZE_DEFAULT = 300;
67
68 protected final WriteQueue writeQueue = new WriteQueue();
69 protected final EntityArchiveWriter writer = new EntityArchiveWriter();
70
71
72 protected final Runnable maxQueueSizeExceededWriter =
73 new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "max size exceeded, flushing write queue"));
74
75
76 protected final Runnable scheduledWriter =
77 new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "scheduled write out, flushing write queue"));
78
79
80 protected final Runnable shutdownWriter =
81 new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "rice is shutting down, flushing write queue"));
82
83 protected int getExecutionIntervalSeconds() {
84 final String prop = kualiConfigurationService.getPropertyValueAsString(EXEC_INTERVAL_SECS);
85 try {
86 return Integer.valueOf(prop).intValue();
87 } catch (NumberFormatException e) {
88 return EXECUTION_INTERVAL_SECONDS_DEFAULT;
89 }
90 }
91
92 protected int getMaxWriteQueueSize() {
93 final String prop = kualiConfigurationService.getPropertyValueAsString(MAX_WRITE_QUEUE_SIZE);
94 try {
95 return Integer.valueOf(prop).intValue();
96 } catch (NumberFormatException e) {
97 return MAX_WRITE_QUEUE_SIZE_DEFAULT;
98 }
99 }
100
101 @Override
102 public EntityDefault getEntityDefaultFromArchive( String entityId ) {
103 if (StringUtils.isBlank(entityId)) {
104 throw new IllegalArgumentException("entityId is blank");
105 }
106
107 List<EntityDefaultInfoCacheBo> results = dataObjectService.findMatching(EntityDefaultInfoCacheBo.class,
108 QueryByCriteria.Builder.forAttribute(KimConstants.PrimaryKeyConstants.SUB_ENTITY_ID, entityId).build() ).getResults();
109 EntityDefaultInfoCacheBo cachedValue = null;
110 if ( !results.isEmpty() ) {
111 cachedValue = results.get(0);
112 }
113
114 return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo();
115 }
116
117 @Override
118 public EntityDefault getEntityDefaultFromArchiveByPrincipalId(String principalId) {
119 if (StringUtils.isBlank(principalId)) {
120 throw new IllegalArgumentException("principalId is blank");
121 }
122
123 EntityDefaultInfoCacheBo cachedValue = dataObjectService.find(EntityDefaultInfoCacheBo.class, principalId);
124 return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo();
125 }
126
127 @Override
128 public EntityDefault getEntityDefaultFromArchiveByPrincipalName(String principalName) {
129 if (StringUtils.isBlank(principalName)) {
130 throw new IllegalArgumentException("principalName is blank");
131 }
132
133 List<EntityDefaultInfoCacheBo> entities = dataObjectService.findMatching(EntityDefaultInfoCacheBo.class,
134 QueryByCriteria.Builder.forAttribute("principalName", principalName).build()).getResults();
135 return entities.isEmpty() ? null : entities.get(0).convertCacheToEntityDefaultInfo();
136 }
137
138 @Override
139 public EntityDefault getEntityDefaultFromArchiveByEmployeeId(String employeeId) {
140 if (StringUtils.isBlank(employeeId)) {
141 throw new IllegalArgumentException("employeeId is blank");
142 }
143 List<EntityDefaultInfoCacheBo> entities = dataObjectService.findMatching(EntityDefaultInfoCacheBo.class,
144 QueryByCriteria.Builder.forAttribute("employeeId", employeeId).build()).getResults();
145 return entities.isEmpty() ? null : entities.get(0).convertCacheToEntityDefaultInfo();
146 }
147
148 @Override
149 public void saveEntityDefaultToArchive(EntityDefault entity) {
150 if (entity == null) {
151 throw new IllegalArgumentException("entity is blank");
152 }
153
154
155 if (getMaxWriteQueueSize() <= writeQueue.offerAndGetSize(entity) &&
156 writer.requestSubmit()) {
157 KSBServiceLocator.getThreadPool().execute(maxQueueSizeExceededWriter);
158 }
159 }
160
161 @Override
162 public void flushToArchive() {
163 writer.call();
164 }
165
166 public void setKualiConfigurationService(
167 ConfigurationService kualiConfigurationService) {
168 this.kualiConfigurationService = kualiConfigurationService;
169 }
170
171 public void setTransactionManager(PlatformTransactionManager txMgr) {
172 this.transactionManager = txMgr;
173 }
174
175
176 @Override
177 public void afterPropertiesSet() throws Exception {
178 LOG.info("scheduling writer...");
179 KSBServiceLocator.getScheduledPool().scheduleAtFixedRate(scheduledWriter,
180 getExecutionIntervalSeconds(), getExecutionIntervalSeconds(), TimeUnit.SECONDS);
181 }
182
183
184 @Override
185 public void destroy() throws Exception {
186 shutdownWriter.run();
187 }
188
189
190
191
192
193
194
195
196 protected class EntityArchiveWriter implements Callable {
197
198
199 AtomicBoolean currentlySubmitted = new AtomicBoolean(false);
200
201 private final Comparator<Comparable> nullSafeComparator = new Comparator<Comparable>() {
202 @Override
203 public int compare(Comparable i1, Comparable i2) {
204 if (i1 != null && i2 != null) {
205 return i1.compareTo(i2);
206 } else if (i1 == null) {
207 if (i2 == null) {
208 return 0;
209 } else {
210 return -1;
211 }
212 } else {
213 return 1;
214 }
215 };
216 };
217
218
219
220
221 protected final Comparator<EntityDefault> kediComparator = new Comparator<EntityDefault>() {
222
223
224
225
226 @Override
227 public int compare(EntityDefault o1, EntityDefault o2) {
228 String entityId1 = (o1 == null) ? null : o1.getEntityId();
229 String entityId2 = (o2 == null) ? null : o2.getEntityId();
230
231 int result = nullSafeComparator.compare(entityId1, entityId2);
232
233 if (result == 0) {
234 result = getPrincipalIdsString(o1).compareTo(getPrincipalIdsString(o2));
235 }
236
237 return result;
238 }
239
240
241
242
243
244
245
246 private String getPrincipalIdsString(EntityDefault entity) {
247 String result = "";
248 if (entity != null) {
249 List<Principal> principals = entity.getPrincipals();
250 if (principals != null) {
251 if (principals.size() == 1) {
252 result = principals.get(0).getPrincipalId();
253 } else {
254 String [] ids = new String [principals.size()];
255 int insertIndex = 0;
256 for (Principal principal : principals) {
257 ids[insertIndex++] = principal.getPrincipalId();
258 }
259 Arrays.sort(ids);
260 result = StringUtils.join(ids, "\n");
261 }
262 }
263 }
264 return result;
265 }
266 };
267
268 public boolean requestSubmit() {
269 return currentlySubmitted.compareAndSet(false, true);
270 }
271
272
273
274
275
276 @Override
277 public Object call() {
278 try {
279
280
281
282 TransactionTemplate template = new TransactionTemplate(transactionManager);
283 template.execute(new TransactionCallback() {
284 @Override
285 public Object doInTransaction(TransactionStatus status) {
286 EntityDefault entity = null;
287 ArrayList<EntityDefault> entitiesToInsert = new ArrayList<EntityDefault>(getMaxWriteQueueSize());
288 Set<String> deduper = new HashSet<String>(getMaxWriteQueueSize());
289
290
291 while (entitiesToInsert.size() < getMaxWriteQueueSize() && null != (entity = writeQueue.poll())) {
292
293 if (entity.getPrincipals().size() > 0 && deduper.add(entity.getEntityId())) {
294 entitiesToInsert.add(entity);
295 }
296 }
297
298 Collections.sort(entitiesToInsert, kediComparator);
299 for (EntityDefault entityToInsert : entitiesToInsert) {
300 dataObjectService.save( new EntityDefaultInfoCacheBo( entityToInsert ) );
301 }
302 return null;
303 }
304 });
305 } finally {
306 currentlySubmitted.compareAndSet(true, false);
307 }
308
309 return Boolean.TRUE;
310 }
311 }
312
313
314
315
316
317
318
319
320
321 protected static class WriteQueue {
322 AtomicInteger writeQueueSize = new AtomicInteger(0);
323 ConcurrentLinkedQueue<EntityDefault> queue = new ConcurrentLinkedQueue<EntityDefault>();
324
325 public int offerAndGetSize(EntityDefault entity) {
326 queue.add(entity);
327 return writeQueueSize.incrementAndGet();
328 }
329
330 protected EntityDefault poll() {
331 EntityDefault result = queue.poll();
332 if (result != null) { writeQueueSize.decrementAndGet(); }
333 return result;
334 }
335 }
336
337
338
339
340
341
342
343 protected static class PreLogCallableWrapper<A> implements Callable<A> {
344
345 protected final Callable inner;
346 protected final Level level;
347 protected final String message;
348
349 public PreLogCallableWrapper(Callable inner, Level level, String message) {
350 this.inner = inner;
351 this.level = level;
352 this.message = message;
353 }
354
355
356
357
358
359
360 @Override
361 @SuppressWarnings("unchecked")
362 public A call() throws Exception {
363 LOG.log(level, message);
364 return (A)inner.call();
365 }
366 }
367
368
369
370
371
372
373
374 protected static class CallableAdapter implements Runnable {
375
376 private final Callable callable;
377
378 public CallableAdapter(Callable callable) {
379 this.callable = callable;
380 }
381
382 @Override
383 public void run() {
384 try {
385 callable.call();
386 } catch (Exception e) {
387 throw new RuntimeException(e);
388 }
389 }
390 }
391
392
393
394
395 public void setDataObjectService(DataObjectService dataObjectService) {
396 this.dataObjectService = dataObjectService;
397 }
398 }