1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.kim.service.impl;
17
18 import org.apache.commons.lang.StringUtils;
19 import org.apache.log4j.Level;
20 import org.apache.log4j.Logger;
21 import org.kuali.rice.core.api.config.property.ConfigurationService;
22 import org.kuali.rice.kim.api.KimConstants;
23 import org.kuali.rice.kim.impl.identity.IdentityArchiveService;
24 import org.kuali.rice.kim.api.identity.entity.EntityDefault;
25 import org.kuali.rice.kim.api.identity.principal.Principal;
26 import org.kuali.rice.kim.impl.identity.EntityDefaultInfoCacheBo;
27 import org.kuali.rice.krad.service.BusinessObjectService;
28 import org.kuali.rice.ksb.service.KSBServiceLocator;
29 import org.springframework.beans.factory.DisposableBean;
30 import org.springframework.beans.factory.InitializingBean;
31 import org.springframework.transaction.PlatformTransactionManager;
32 import org.springframework.transaction.TransactionStatus;
33 import org.springframework.transaction.support.TransactionCallback;
34 import org.springframework.transaction.support.TransactionTemplate;
35
36 import java.util.ArrayList;
37 import java.util.Arrays;
38 import java.util.Collection;
39 import java.util.Collections;
40 import java.util.Comparator;
41 import java.util.HashMap;
42 import java.util.HashSet;
43 import java.util.List;
44 import java.util.Map;
45 import java.util.Set;
46 import java.util.concurrent.Callable;
47 import java.util.concurrent.ConcurrentLinkedQueue;
48 import java.util.concurrent.TimeUnit;
49 import java.util.concurrent.atomic.AtomicBoolean;
50 import java.util.concurrent.atomic.AtomicInteger;
51
52
53
54
55
56
57
58 public class IdentityArchiveServiceImpl implements IdentityArchiveService, InitializingBean, DisposableBean {
59 private static final Logger LOG = Logger.getLogger( IdentityArchiveServiceImpl.class );
60
61 private BusinessObjectService businessObjectService;
62 private ConfigurationService kualiConfigurationService;
63 private PlatformTransactionManager transactionManager;
64
65 private static final String EXEC_INTERVAL_SECS = "kim.identityArchiveServiceImpl.executionIntervalSeconds";
66 private static final String MAX_WRITE_QUEUE_SIZE = "kim.identityArchiveServiceImpl.maxWriteQueueSize";
67 private static final int EXECUTION_INTERVAL_SECONDS_DEFAULT = 600;
68 private static final int MAX_WRITE_QUEUE_SIZE_DEFAULT = 300;
69
70 private final WriteQueue writeQueue = new WriteQueue();
71 private final EntityArchiveWriter writer = new EntityArchiveWriter();
72
73
74 private final Runnable maxQueueSizeExceededWriter =
75 new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "max size exceeded, flushing write queue"));
76
77
78 private final Runnable scheduledWriter =
79 new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "scheduled write out, flushing write queue"));
80
81
82 private final Runnable shutdownWriter =
83 new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "rice is shutting down, flushing write queue"));
84
85 private int getExecutionIntervalSeconds() {
86 final String prop = kualiConfigurationService.getPropertyValueAsString(EXEC_INTERVAL_SECS);
87 try {
88 return Integer.valueOf(prop).intValue();
89 } catch (NumberFormatException e) {
90 return EXECUTION_INTERVAL_SECONDS_DEFAULT;
91 }
92 }
93
94 private int getMaxWriteQueueSize() {
95 final String prop = kualiConfigurationService.getPropertyValueAsString(MAX_WRITE_QUEUE_SIZE);
96 try {
97 return Integer.valueOf(prop).intValue();
98 } catch (NumberFormatException e) {
99 return MAX_WRITE_QUEUE_SIZE_DEFAULT;
100 }
101 }
102
103 @Override
104 public EntityDefault getEntityDefaultFromArchive( String entityId ) {
105 if (StringUtils.isBlank(entityId)) {
106 throw new IllegalArgumentException("entityId is blank");
107 }
108
109 Map<String,String> criteria = new HashMap<String, String>(1);
110 criteria.put(KimConstants.PrimaryKeyConstants.SUB_ENTITY_ID, entityId);
111 EntityDefaultInfoCacheBo cachedValue = businessObjectService.findByPrimaryKey(EntityDefaultInfoCacheBo.class, criteria);
112 return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo();
113 }
114
115 @Override
116 public EntityDefault getEntityDefaultFromArchiveByPrincipalId(String principalId) {
117 if (StringUtils.isBlank(principalId)) {
118 throw new IllegalArgumentException("principalId is blank");
119 }
120
121 Map<String,String> criteria = new HashMap<String, String>(1);
122 criteria.put("principalId", principalId);
123 EntityDefaultInfoCacheBo cachedValue = businessObjectService.findByPrimaryKey(EntityDefaultInfoCacheBo.class, criteria);
124 return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo();
125 }
126
127 @Override
128 public EntityDefault getEntityDefaultFromArchiveByPrincipalName(String principalName) {
129 if (StringUtils.isBlank(principalName)) {
130 throw new IllegalArgumentException("principalName is blank");
131 }
132
133 Map<String,String> criteria = new HashMap<String, String>(1);
134 criteria.put("principalName", principalName);
135 Collection<EntityDefaultInfoCacheBo> entities = businessObjectService.findMatching(EntityDefaultInfoCacheBo.class, criteria);
136 return (entities == null || entities.isEmpty()) ? null : entities.iterator().next().convertCacheToEntityDefaultInfo();
137 }
138
139 @Override
140 public EntityDefault getEntityDefaultFromArchiveByEmployeeId(String employeeId) {
141 if (StringUtils.isBlank(employeeId)) {
142 throw new IllegalArgumentException("employeeId is blank");
143 }
144 Map<String,String> criteria = new HashMap<String, String>(1);
145 criteria.put("employeeId", employeeId);
146 Collection<EntityDefaultInfoCacheBo> entities = businessObjectService.findMatching(EntityDefaultInfoCacheBo.class, criteria);
147 return (entities == null || entities.isEmpty()) ? null : entities.iterator().next().convertCacheToEntityDefaultInfo();
148 }
149
150 @Override
151 public void saveEntityDefaultToArchive(EntityDefault entity) {
152 if (entity == null) {
153 throw new IllegalArgumentException("entity is blank");
154 }
155
156
157 if (getMaxWriteQueueSize() <= writeQueue.offerAndGetSize(entity)
158 writer.requestSubmit()) {
159 KSBServiceLocator.getThreadPool().execute(maxQueueSizeExceededWriter);
160 }
161 }
162
163 @Override
164 public void flushToArchive() {
165 writer.call();
166 }
167
168
169 public void setBusinessObjectService(BusinessObjectService businessObjectService) {
170 this.businessObjectService = businessObjectService;
171 }
172
173 public void setKualiConfigurationService(
174 ConfigurationService kualiConfigurationService) {
175 this.kualiConfigurationService = kualiConfigurationService;
176 }
177
178 public void setTransactionManager(PlatformTransactionManager txMgr) {
179 this.transactionManager = txMgr;
180 }
181
182
183 @Override
184 public void afterPropertiesSet() throws Exception {
185 LOG.info("scheduling writer...");
186 KSBServiceLocator.getScheduledPool().scheduleAtFixedRate(scheduledWriter,
187 getExecutionIntervalSeconds(), getExecutionIntervalSeconds(), TimeUnit.SECONDS);
188 }
189
190
191 @Override
192 public void destroy() throws Exception {
193 KSBServiceLocator.getThreadPool().execute(shutdownWriter);
194 }
195
196
197
198
199
200
201
202
203 private class EntityArchiveWriter implements Callable {
204
205
206 AtomicBoolean currentlySubmitted = new AtomicBoolean(false);
207
208 private final Comparator<Comparable> nullSafeComparator = new Comparator<Comparable>() {
209 @Override
210 public int compare(Comparable i1, Comparable i2) {
211 if (i1 != null && i2 != null) {
212 return i1.compareTo(i2);
213 } else if (i1 == null) {
214 if (i2 == null) {
215 return 0;
216 } else {
217 return -1;
218 }
219 } else {
220 return 1;
221 }
222 };
223 };
224
225
226
227
228 private final Comparator<EntityDefault> kediComparator = new Comparator<EntityDefault>() {
229
230
231
232
233 @Override
234 public int compare(EntityDefault o1, EntityDefault o2) {
235 String entityId1 = (o1 == null) ? null : o1.getEntityId();
236 String entityId2 = (o2 == null) ? null : o2.getEntityId();
237
238 int result = nullSafeComparator.compare(entityId1, entityId2);
239
240 if (result == 0) {
241 result = getPrincipalIdsString(o1).compareTo(getPrincipalIdsString(o2));
242 }
243
244 return result;
245 }
246
247
248
249
250
251
252
253 private String getPrincipalIdsString(EntityDefault entity) {
254 String result = "";
255 if (entity != null) {
256 List<Principal> principals = entity.getPrincipals();
257 if (principals != null) {
258 if (principals.size() == 1) {
259 result = principals.get(0).getPrincipalId();
260 } else {
261 String [] ids = new String [principals.size()];
262 int insertIndex = 0;
263 for (Principal principal : principals) {
264 ids[insertIndex++] = principal.getPrincipalId();
265 }
266 Arrays.sort(ids);
267 result = StringUtils.join(ids, "\n");
268 }
269 }
270 }
271 return result;
272 }
273 };
274
275 public boolean requestSubmit() {
276 return currentlySubmitted.compareAndSet(false, true);
277 }
278
279
280
281
282
283 @Override
284 public Object call() {
285 try {
286
287
288
289 TransactionTemplate template = new TransactionTemplate(transactionManager);
290 template.execute(new TransactionCallback() {
291 @Override
292 public Object doInTransaction(TransactionStatus status) {
293 EntityDefault entity = null;
294 ArrayList<EntityDefault> entitiesToInsert = new ArrayList<EntityDefault>(getMaxWriteQueueSize());
295 Set<String> deduper = new HashSet<String>(getMaxWriteQueueSize());
296
297
298 while (entitiesToInsert.size() < getMaxWriteQueueSize() && null != (entity = writeQueue.poll())) {
299 if (deduper.add(entity.getEntityId())) {
300 entitiesToInsert.add(entity);
301 }
302 }
303
304 Collections.sort(entitiesToInsert, kediComparator);
305 List<EntityDefaultInfoCacheBo> entityCache = new ArrayList<EntityDefaultInfoCacheBo>(entitiesToInsert.size());
306 for (EntityDefault entityToInsert : entitiesToInsert) {
307 entityCache.add(new EntityDefaultInfoCacheBo( entityToInsert ));
308 }
309 businessObjectService.save(entityCache);
310
311
312
313 return null;
314 }
315 });
316 } finally {
317 currentlySubmitted.compareAndSet(true, false);
318 }
319
320 return Boolean.TRUE;
321 }
322 }
323
324
325
326
327
328
329
330
331
332 private static class WriteQueue {
333 AtomicInteger writeQueueSize = new AtomicInteger(0);
334 ConcurrentLinkedQueue<EntityDefault> queue = new ConcurrentLinkedQueue<EntityDefault>();
335
336 public int offerAndGetSize(EntityDefault entity) {
337 queue.add(entity);
338 return writeQueueSize.incrementAndGet();
339 }
340
341 private EntityDefault poll() {
342 EntityDefault result = queue.poll();
343 if (result != null) { writeQueueSize.decrementAndGet(); }
344 return result;
345 }
346 }
347
348
349
350
351
352
353
354 private static class PreLogCallableWrapper<A> implements Callable<A> {
355
356 private final Callable inner;
357 private final Level level;
358 private final String message;
359
360 public PreLogCallableWrapper(Callable inner, Level level, String message) {
361 this.inner = inner;
362 this.level = level;
363 this.message = message;
364 }
365
366
367
368
369
370
371 @Override
372 @SuppressWarnings("unchecked")
373 public A call() throws Exception {
374 LOG.log(level, message);
375 return (A)inner.call();
376 }
377 }
378
379
380
381
382
383
384
385 private static class CallableAdapter implements Runnable {
386
387 private final Callable callable;
388
389 public CallableAdapter(Callable callable) {
390 this.callable = callable;
391 }
392
393 @Override
394 public void run() {
395 try {
396 callable.call();
397 } catch (Exception e) {
398 throw new RuntimeException(e);
399 }
400 }
401 }
402 }