1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.kim.service.impl;
17
18 import org.apache.commons.lang.StringUtils;
19 import org.apache.log4j.Level;
20 import org.apache.log4j.Logger;
21 import org.kuali.rice.core.api.config.property.ConfigurationService;
22 import org.kuali.rice.kim.api.KimConstants;
23 import org.kuali.rice.kim.impl.identity.IdentityArchiveService;
24 import org.kuali.rice.kim.api.identity.entity.EntityDefault;
25 import org.kuali.rice.kim.api.identity.principal.Principal;
26 import org.kuali.rice.kim.impl.identity.EntityDefaultInfoCacheBo;
27 import org.kuali.rice.krad.service.BusinessObjectService;
28 import org.kuali.rice.krad.service.KRADServiceLocatorInternal;
29 import org.kuali.rice.ksb.service.KSBServiceLocator;
30 import org.springframework.beans.factory.DisposableBean;
31 import org.springframework.beans.factory.InitializingBean;
32 import org.springframework.transaction.PlatformTransactionManager;
33 import org.springframework.transaction.TransactionStatus;
34 import org.springframework.transaction.support.TransactionCallback;
35 import org.springframework.transaction.support.TransactionTemplate;
36
37 import java.util.ArrayList;
38 import java.util.Arrays;
39 import java.util.Collection;
40 import java.util.Collections;
41 import java.util.Comparator;
42 import java.util.HashMap;
43 import java.util.HashSet;
44 import java.util.List;
45 import java.util.Map;
46 import java.util.Set;
47 import java.util.concurrent.Callable;
48 import java.util.concurrent.ConcurrentLinkedQueue;
49 import java.util.concurrent.TimeUnit;
50 import java.util.concurrent.atomic.AtomicBoolean;
51 import java.util.concurrent.atomic.AtomicInteger;
52
53
54
55
56
57
58
59 public class IdentityArchiveServiceImpl implements IdentityArchiveService, InitializingBean, DisposableBean {
60 private static final Logger LOG = Logger.getLogger( IdentityArchiveServiceImpl.class );
61
62 private BusinessObjectService businessObjectService;
63 private ConfigurationService kualiConfigurationService;
64
65 private static final String EXEC_INTERVAL_SECS = "kim.identityArchiveServiceImpl.executionIntervalSeconds";
66 private static final String MAX_WRITE_QUEUE_SIZE = "kim.identityArchiveServiceImpl.maxWriteQueueSize";
67 private static final int EXECUTION_INTERVAL_SECONDS_DEFAULT = 600;
68 private static final int MAX_WRITE_QUEUE_SIZE_DEFAULT = 300;
69
70 private final WriteQueue writeQueue = new WriteQueue();
71 private final EntityArchiveWriter writer = new EntityArchiveWriter();
72
73
74 private final Runnable maxQueueSizeExceededWriter =
75 new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "max size exceeded, flushing write queue"));
76
77
78 private final Runnable scheduledWriter =
79 new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "scheduled write out, flushing write queue"));
80
81
82 private final Runnable shutdownWriter =
83 new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "rice is shutting down, flushing write queue"));
84
85 private int getExecutionIntervalSeconds() {
86 final String prop = kualiConfigurationService.getPropertyValueAsString(EXEC_INTERVAL_SECS);
87 try {
88 return Integer.valueOf(prop).intValue();
89 } catch (NumberFormatException e) {
90 return EXECUTION_INTERVAL_SECONDS_DEFAULT;
91 }
92 }
93
94 private int getMaxWriteQueueSize() {
95 final String prop = kualiConfigurationService.getPropertyValueAsString(MAX_WRITE_QUEUE_SIZE);
96 try {
97 return Integer.valueOf(prop).intValue();
98 } catch (NumberFormatException e) {
99 return MAX_WRITE_QUEUE_SIZE_DEFAULT;
100 }
101 }
102
103 @Override
104 public EntityDefault getEntityDefaultFromArchive( String entityId ) {
105 if (StringUtils.isBlank(entityId)) {
106 throw new IllegalArgumentException("entityId is blank");
107 }
108
109 Map<String,String> criteria = new HashMap<String, String>(1);
110 criteria.put(KimConstants.PrimaryKeyConstants.SUB_ENTITY_ID, entityId);
111 EntityDefaultInfoCacheBo cachedValue = businessObjectService.findByPrimaryKey(EntityDefaultInfoCacheBo.class, criteria);
112 return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo();
113 }
114
115 @Override
116 public EntityDefault getEntityDefaultFromArchiveByPrincipalId(String principalId) {
117 if (StringUtils.isBlank(principalId)) {
118 throw new IllegalArgumentException("principalId is blank");
119 }
120
121 Map<String,String> criteria = new HashMap<String, String>(1);
122 criteria.put("principalId", principalId);
123 EntityDefaultInfoCacheBo cachedValue = businessObjectService.findByPrimaryKey(EntityDefaultInfoCacheBo.class, criteria);
124 return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo();
125 }
126
127 @Override
128 public EntityDefault getEntityDefaultFromArchiveByPrincipalName(String principalName) {
129 if (StringUtils.isBlank(principalName)) {
130 throw new IllegalArgumentException("principalName is blank");
131 }
132
133 Map<String,String> criteria = new HashMap<String, String>(1);
134 criteria.put("principalName", principalName);
135 Collection<EntityDefaultInfoCacheBo> entities = businessObjectService.findMatching(EntityDefaultInfoCacheBo.class, criteria);
136 return (entities == null || entities.isEmpty()) ? null : entities.iterator().next().convertCacheToEntityDefaultInfo();
137 }
138
139 @Override
140 public EntityDefault getEntityDefaultFromArchiveByEmployeeId(String employeeId) {
141 if (StringUtils.isBlank(employeeId)) {
142 throw new IllegalArgumentException("employeeId is blank");
143 }
144 Map<String,String> criteria = new HashMap<String, String>(1);
145 criteria.put("employeeId", employeeId);
146 Collection<EntityDefaultInfoCacheBo> entities = businessObjectService.findMatching(EntityDefaultInfoCacheBo.class, criteria);
147 return (entities == null || entities.isEmpty()) ? null : entities.iterator().next().convertCacheToEntityDefaultInfo();
148 }
149
150 @Override
151 public void saveEntityDefaultToArchive(EntityDefault entity) {
152 if (entity == null) {
153 throw new IllegalArgumentException("entity is blank");
154 }
155
156
157 if (getMaxWriteQueueSize() <= writeQueue.offerAndGetSize(entity)
158 writer.requestSubmit()) {
159 KSBServiceLocator.getThreadPool().execute(maxQueueSizeExceededWriter);
160 }
161 }
162
163 @Override
164 public void flushToArchive() {
165 writer.call();
166 }
167
168
169 public void setBusinessObjectService(BusinessObjectService businessObjectService) {
170 this.businessObjectService = businessObjectService;
171 }
172
173 public void setKualiConfigurationService(
174 ConfigurationService kualiConfigurationService) {
175 this.kualiConfigurationService = kualiConfigurationService;
176 }
177
178
179 @Override
180 public void afterPropertiesSet() throws Exception {
181 LOG.info("scheduling writer...");
182 KSBServiceLocator.getScheduledPool().scheduleAtFixedRate(scheduledWriter,
183 getExecutionIntervalSeconds(), getExecutionIntervalSeconds(), TimeUnit.SECONDS);
184 }
185
186
187 @Override
188 public void destroy() throws Exception {
189 KSBServiceLocator.getThreadPool().execute(shutdownWriter);
190 }
191
192
193
194
195
196
197
198
199 private class EntityArchiveWriter implements Callable {
200
201
202 AtomicBoolean currentlySubmitted = new AtomicBoolean(false);
203
204 private final Comparator<Comparable> nullSafeComparator = new Comparator<Comparable>() {
205 @Override
206 public int compare(Comparable i1, Comparable i2) {
207 if (i1 != null && i2 != null) {
208 return i1.compareTo(i2);
209 } else if (i1 == null) {
210 if (i2 == null) {
211 return 0;
212 } else {
213 return -1;
214 }
215 } else {
216 return 1;
217 }
218 };
219 };
220
221
222
223
224 private final Comparator<EntityDefault> kediComparator = new Comparator<EntityDefault>() {
225
226
227
228
229 @Override
230 public int compare(EntityDefault o1, EntityDefault o2) {
231 String entityId1 = (o1 == null) ? null : o1.getEntityId();
232 String entityId2 = (o2 == null) ? null : o2.getEntityId();
233
234 int result = nullSafeComparator.compare(entityId1, entityId2);
235
236 if (result == 0) {
237 result = getPrincipalIdsString(o1).compareTo(getPrincipalIdsString(o2));
238 }
239
240 return result;
241 }
242
243
244
245
246
247
248
249 private String getPrincipalIdsString(EntityDefault entity) {
250 String result = "";
251 if (entity != null) {
252 List<Principal> principals = entity.getPrincipals();
253 if (principals != null) {
254 if (principals.size() == 1) {
255 result = principals.get(0).getPrincipalId();
256 } else {
257 String [] ids = new String [principals.size()];
258 int insertIndex = 0;
259 for (Principal principal : principals) {
260 ids[insertIndex++] = principal.getPrincipalId();
261 }
262 Arrays.sort(ids);
263 result = StringUtils.join(ids, "\n");
264 }
265 }
266 }
267 return result;
268 }
269 };
270
271 public boolean requestSubmit() {
272 return currentlySubmitted.compareAndSet(false, true);
273 }
274
275
276
277
278
279 @Override
280 public Object call() {
281 try {
282
283
284
285 PlatformTransactionManager transactionManager = KRADServiceLocatorInternal.getTransactionManager();
286 TransactionTemplate template = new TransactionTemplate(transactionManager);
287 template.execute(new TransactionCallback() {
288 @Override
289 public Object doInTransaction(TransactionStatus status) {
290 EntityDefault entity = null;
291 ArrayList<EntityDefault> entitiesToInsert = new ArrayList<EntityDefault>(getMaxWriteQueueSize());
292 Set<String> deduper = new HashSet<String>(getMaxWriteQueueSize());
293
294
295 while (entitiesToInsert.size() < getMaxWriteQueueSize() && null != (entity = writeQueue.poll())) {
296 if (deduper.add(entity.getEntityId())) {
297 entitiesToInsert.add(entity);
298 }
299 }
300
301 Collections.sort(entitiesToInsert, kediComparator);
302 List<EntityDefaultInfoCacheBo> entityCache = new ArrayList<EntityDefaultInfoCacheBo>(entitiesToInsert.size());
303 for (EntityDefault entityToInsert : entitiesToInsert) {
304 entityCache.add(new EntityDefaultInfoCacheBo( entityToInsert ));
305 }
306 businessObjectService.save(entityCache);
307
308
309
310 return null;
311 }
312 });
313 } finally {
314 currentlySubmitted.compareAndSet(true, false);
315 }
316
317 return Boolean.TRUE;
318 }
319 }
320
321
322
323
324
325
326
327
328
329 private static class WriteQueue {
330 AtomicInteger writeQueueSize = new AtomicInteger(0);
331 ConcurrentLinkedQueue<EntityDefault> queue = new ConcurrentLinkedQueue<EntityDefault>();
332
333 public int offerAndGetSize(EntityDefault entity) {
334 queue.add(entity);
335 return writeQueueSize.incrementAndGet();
336 }
337
338 private EntityDefault poll() {
339 EntityDefault result = queue.poll();
340 if (result != null) { writeQueueSize.decrementAndGet(); }
341 return result;
342 }
343 }
344
345
346
347
348
349
350
351 private static class PreLogCallableWrapper<A> implements Callable<A> {
352
353 private final Callable inner;
354 private final Level level;
355 private final String message;
356
357 public PreLogCallableWrapper(Callable inner, Level level, String message) {
358 this.inner = inner;
359 this.level = level;
360 this.message = message;
361 }
362
363
364
365
366
367
368 @Override
369 @SuppressWarnings("unchecked")
370 public A call() throws Exception {
371 LOG.log(level, message);
372 return (A)inner.call();
373 }
374 }
375
376
377
378
379
380
381
382 private static class CallableAdapter implements Runnable {
383
384 private final Callable callable;
385
386 public CallableAdapter(Callable callable) {
387 this.callable = callable;
388 }
389
390 @Override
391 public void run() {
392 try {
393 callable.call();
394 } catch (Exception e) {
395 throw new RuntimeException(e);
396 }
397 }
398 }
399 }