1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.kim.service.impl;
17
18 import org.apache.commons.lang.StringUtils;
19 import org.apache.log4j.Level;
20 import org.apache.log4j.Logger;
21 import org.kuali.rice.core.api.config.property.ConfigurationService;
22 import org.kuali.rice.kim.api.KimConstants;
23 import org.kuali.rice.kim.impl.identity.IdentityArchiveService;
24 import org.kuali.rice.kim.api.identity.entity.EntityDefault;
25 import org.kuali.rice.kim.api.identity.principal.Principal;
26 import org.kuali.rice.kim.impl.identity.EntityDefaultInfoCacheBo;
27 import org.kuali.rice.krad.service.BusinessObjectService;
28 import org.kuali.rice.krad.service.KRADServiceLocatorInternal;
29 import org.kuali.rice.ksb.service.KSBServiceLocator;
30 import org.springframework.beans.factory.DisposableBean;
31 import org.springframework.beans.factory.InitializingBean;
32 import org.springframework.transaction.PlatformTransactionManager;
33 import org.springframework.transaction.TransactionStatus;
34 import org.springframework.transaction.support.TransactionCallback;
35 import org.springframework.transaction.support.TransactionTemplate;
36
37 import java.util.ArrayList;
38 import java.util.Arrays;
39 import java.util.Collection;
40 import java.util.Collections;
41 import java.util.Comparator;
42 import java.util.HashMap;
43 import java.util.HashSet;
44 import java.util.List;
45 import java.util.Map;
46 import java.util.Set;
47 import java.util.concurrent.Callable;
48 import java.util.concurrent.ConcurrentLinkedQueue;
49 import java.util.concurrent.TimeUnit;
50 import java.util.concurrent.atomic.AtomicBoolean;
51 import java.util.concurrent.atomic.AtomicInteger;
52
53
54
55
56
57
58
59 public class IdentityArchiveServiceImpl implements IdentityArchiveService, InitializingBean, DisposableBean {
60 private static final Logger LOG = Logger.getLogger( IdentityArchiveServiceImpl.class );
61
62 private BusinessObjectService businessObjectService;
63 private ConfigurationService kualiConfigurationService;
64 private PlatformTransactionManager transactionManager;
65
66 private static final String EXEC_INTERVAL_SECS = "kim.identityArchiveServiceImpl.executionIntervalSeconds";
67 private static final String MAX_WRITE_QUEUE_SIZE = "kim.identityArchiveServiceImpl.maxWriteQueueSize";
68 private static final int EXECUTION_INTERVAL_SECONDS_DEFAULT = 600;
69 private static final int MAX_WRITE_QUEUE_SIZE_DEFAULT = 300;
70
71 private final WriteQueue writeQueue = new WriteQueue();
72 private final EntityArchiveWriter writer = new EntityArchiveWriter();
73
74
75 private final Runnable maxQueueSizeExceededWriter =
76 new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "max size exceeded, flushing write queue"));
77
78
79 private final Runnable scheduledWriter =
80 new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "scheduled write out, flushing write queue"));
81
82
83 private final Runnable shutdownWriter =
84 new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "rice is shutting down, flushing write queue"));
85
86 private int getExecutionIntervalSeconds() {
87 final String prop = kualiConfigurationService.getPropertyValueAsString(EXEC_INTERVAL_SECS);
88 try {
89 return Integer.valueOf(prop).intValue();
90 } catch (NumberFormatException e) {
91 return EXECUTION_INTERVAL_SECONDS_DEFAULT;
92 }
93 }
94
95 private int getMaxWriteQueueSize() {
96 final String prop = kualiConfigurationService.getPropertyValueAsString(MAX_WRITE_QUEUE_SIZE);
97 try {
98 return Integer.valueOf(prop).intValue();
99 } catch (NumberFormatException e) {
100 return MAX_WRITE_QUEUE_SIZE_DEFAULT;
101 }
102 }
103
104 @Override
105 public EntityDefault getEntityDefaultFromArchive( String entityId ) {
106 if (StringUtils.isBlank(entityId)) {
107 throw new IllegalArgumentException("entityId is blank");
108 }
109
110 Map<String,String> criteria = new HashMap<String, String>(1);
111 criteria.put(KimConstants.PrimaryKeyConstants.SUB_ENTITY_ID, entityId);
112 EntityDefaultInfoCacheBo cachedValue = businessObjectService.findByPrimaryKey(EntityDefaultInfoCacheBo.class, criteria);
113 return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo();
114 }
115
116 @Override
117 public EntityDefault getEntityDefaultFromArchiveByPrincipalId(String principalId) {
118 if (StringUtils.isBlank(principalId)) {
119 throw new IllegalArgumentException("principalId is blank");
120 }
121
122 Map<String,String> criteria = new HashMap<String, String>(1);
123 criteria.put("principalId", principalId);
124 EntityDefaultInfoCacheBo cachedValue = businessObjectService.findByPrimaryKey(EntityDefaultInfoCacheBo.class, criteria);
125 return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo();
126 }
127
128 @Override
129 public EntityDefault getEntityDefaultFromArchiveByPrincipalName(String principalName) {
130 if (StringUtils.isBlank(principalName)) {
131 throw new IllegalArgumentException("principalName is blank");
132 }
133
134 Map<String,String> criteria = new HashMap<String, String>(1);
135 criteria.put("principalName", principalName);
136 Collection<EntityDefaultInfoCacheBo> entities = businessObjectService.findMatching(EntityDefaultInfoCacheBo.class, criteria);
137 return (entities == null || entities.isEmpty()) ? null : entities.iterator().next().convertCacheToEntityDefaultInfo();
138 }
139
140 @Override
141 public EntityDefault getEntityDefaultFromArchiveByEmployeeId(String employeeId) {
142 if (StringUtils.isBlank(employeeId)) {
143 throw new IllegalArgumentException("employeeId is blank");
144 }
145 Map<String,String> criteria = new HashMap<String, String>(1);
146 criteria.put("employeeId", employeeId);
147 Collection<EntityDefaultInfoCacheBo> entities = businessObjectService.findMatching(EntityDefaultInfoCacheBo.class, criteria);
148 return (entities == null || entities.isEmpty()) ? null : entities.iterator().next().convertCacheToEntityDefaultInfo();
149 }
150
151 @Override
152 public void saveEntityDefaultToArchive(EntityDefault entity) {
153 if (entity == null) {
154 throw new IllegalArgumentException("entity is blank");
155 }
156
157
158 if (getMaxWriteQueueSize() <= writeQueue.offerAndGetSize(entity)
159 writer.requestSubmit()) {
160 KSBServiceLocator.getThreadPool().execute(maxQueueSizeExceededWriter);
161 }
162 }
163
164 @Override
165 public void flushToArchive() {
166 writer.call();
167 }
168
169
170 public void setBusinessObjectService(BusinessObjectService businessObjectService) {
171 this.businessObjectService = businessObjectService;
172 }
173
174 public void setKualiConfigurationService(
175 ConfigurationService kualiConfigurationService) {
176 this.kualiConfigurationService = kualiConfigurationService;
177 }
178
179 public void setTransactionManager(PlatformTransactionManager txMgr) {
180 this.transactionManager = txMgr;
181 }
182
183
184 @Override
185 public void afterPropertiesSet() throws Exception {
186 LOG.info("scheduling writer...");
187 KSBServiceLocator.getScheduledPool().scheduleAtFixedRate(scheduledWriter,
188 getExecutionIntervalSeconds(), getExecutionIntervalSeconds(), TimeUnit.SECONDS);
189 }
190
191
192 @Override
193 public void destroy() throws Exception {
194 KSBServiceLocator.getThreadPool().execute(shutdownWriter);
195 }
196
197
198
199
200
201
202
203
204 private class EntityArchiveWriter implements Callable {
205
206
207 AtomicBoolean currentlySubmitted = new AtomicBoolean(false);
208
209 private final Comparator<Comparable> nullSafeComparator = new Comparator<Comparable>() {
210 @Override
211 public int compare(Comparable i1, Comparable i2) {
212 if (i1 != null && i2 != null) {
213 return i1.compareTo(i2);
214 } else if (i1 == null) {
215 if (i2 == null) {
216 return 0;
217 } else {
218 return -1;
219 }
220 } else {
221 return 1;
222 }
223 };
224 };
225
226
227
228
229 private final Comparator<EntityDefault> kediComparator = new Comparator<EntityDefault>() {
230
231
232
233
234 @Override
235 public int compare(EntityDefault o1, EntityDefault o2) {
236 String entityId1 = (o1 == null) ? null : o1.getEntityId();
237 String entityId2 = (o2 == null) ? null : o2.getEntityId();
238
239 int result = nullSafeComparator.compare(entityId1, entityId2);
240
241 if (result == 0) {
242 result = getPrincipalIdsString(o1).compareTo(getPrincipalIdsString(o2));
243 }
244
245 return result;
246 }
247
248
249
250
251
252
253
254 private String getPrincipalIdsString(EntityDefault entity) {
255 String result = "";
256 if (entity != null) {
257 List<Principal> principals = entity.getPrincipals();
258 if (principals != null) {
259 if (principals.size() == 1) {
260 result = principals.get(0).getPrincipalId();
261 } else {
262 String [] ids = new String [principals.size()];
263 int insertIndex = 0;
264 for (Principal principal : principals) {
265 ids[insertIndex++] = principal.getPrincipalId();
266 }
267 Arrays.sort(ids);
268 result = StringUtils.join(ids, "\n");
269 }
270 }
271 }
272 return result;
273 }
274 };
275
276 public boolean requestSubmit() {
277 return currentlySubmitted.compareAndSet(false, true);
278 }
279
280
281
282
283
284 @Override
285 public Object call() {
286 try {
287
288
289
290 TransactionTemplate template = new TransactionTemplate(transactionManager);
291 template.execute(new TransactionCallback() {
292 @Override
293 public Object doInTransaction(TransactionStatus status) {
294 EntityDefault entity = null;
295 ArrayList<EntityDefault> entitiesToInsert = new ArrayList<EntityDefault>(getMaxWriteQueueSize());
296 Set<String> deduper = new HashSet<String>(getMaxWriteQueueSize());
297
298
299 while (entitiesToInsert.size() < getMaxWriteQueueSize() && null != (entity = writeQueue.poll())) {
300 if (deduper.add(entity.getEntityId())) {
301 entitiesToInsert.add(entity);
302 }
303 }
304
305 Collections.sort(entitiesToInsert, kediComparator);
306 List<EntityDefaultInfoCacheBo> entityCache = new ArrayList<EntityDefaultInfoCacheBo>(entitiesToInsert.size());
307 for (EntityDefault entityToInsert : entitiesToInsert) {
308 entityCache.add(new EntityDefaultInfoCacheBo( entityToInsert ));
309 }
310 businessObjectService.save(entityCache);
311
312
313
314 return null;
315 }
316 });
317 } finally {
318 currentlySubmitted.compareAndSet(true, false);
319 }
320
321 return Boolean.TRUE;
322 }
323 }
324
325
326
327
328
329
330
331
332
333 private static class WriteQueue {
334 AtomicInteger writeQueueSize = new AtomicInteger(0);
335 ConcurrentLinkedQueue<EntityDefault> queue = new ConcurrentLinkedQueue<EntityDefault>();
336
337 public int offerAndGetSize(EntityDefault entity) {
338 queue.add(entity);
339 return writeQueueSize.incrementAndGet();
340 }
341
342 private EntityDefault poll() {
343 EntityDefault result = queue.poll();
344 if (result != null) { writeQueueSize.decrementAndGet(); }
345 return result;
346 }
347 }
348
349
350
351
352
353
354
355 private static class PreLogCallableWrapper<A> implements Callable<A> {
356
357 private final Callable inner;
358 private final Level level;
359 private final String message;
360
361 public PreLogCallableWrapper(Callable inner, Level level, String message) {
362 this.inner = inner;
363 this.level = level;
364 this.message = message;
365 }
366
367
368
369
370
371
372 @Override
373 @SuppressWarnings("unchecked")
374 public A call() throws Exception {
375 LOG.log(level, message);
376 return (A)inner.call();
377 }
378 }
379
380
381
382
383
384
385
386 private static class CallableAdapter implements Runnable {
387
388 private final Callable callable;
389
390 public CallableAdapter(Callable callable) {
391 this.callable = callable;
392 }
393
394 @Override
395 public void run() {
396 try {
397 callable.call();
398 } catch (Exception e) {
399 throw new RuntimeException(e);
400 }
401 }
402 }
403 }