001    /**
002     * Copyright 2005-2014 The Kuali Foundation
003     *
004     * Licensed under the Educational Community License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.opensource.org/licenses/ecl2.php
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    package org.kuali.rice.kim.service.impl;
017    
018    import org.apache.commons.lang.StringUtils;
019    import org.apache.log4j.Level;
020    import org.apache.log4j.Logger;
021    import org.kuali.rice.core.api.config.property.ConfigurationService;
022    import org.kuali.rice.kim.api.KimConstants;
023    import org.kuali.rice.kim.impl.identity.IdentityArchiveService;
024    import org.kuali.rice.kim.api.identity.entity.EntityDefault;
025    import org.kuali.rice.kim.api.identity.principal.Principal;
026    import org.kuali.rice.kim.impl.identity.EntityDefaultInfoCacheBo;
027    import org.kuali.rice.krad.service.BusinessObjectService;
028    import org.kuali.rice.ksb.service.KSBServiceLocator;
029    import org.springframework.beans.factory.DisposableBean;
030    import org.springframework.beans.factory.InitializingBean;
031    import org.springframework.transaction.PlatformTransactionManager;
032    import org.springframework.transaction.TransactionStatus;
033    import org.springframework.transaction.support.TransactionCallback;
034    import org.springframework.transaction.support.TransactionTemplate;
035    
036    import java.util.ArrayList;
037    import java.util.Arrays;
038    import java.util.Collection;
039    import java.util.Collections;
040    import java.util.Comparator;
041    import java.util.HashMap;
042    import java.util.HashSet;
043    import java.util.List;
044    import java.util.Map;
045    import java.util.Set;
046    import java.util.concurrent.Callable;
047    import java.util.concurrent.ConcurrentLinkedQueue;
048    import java.util.concurrent.TimeUnit;
049    import java.util.concurrent.atomic.AtomicBoolean;
050    import java.util.concurrent.atomic.AtomicInteger;
051    
052    /**
053     * This is the default implementation for the IdentityArchiveService.
054     * @see IdentityArchiveService
055     *
056     * @author Kuali Rice Team (rice.collab@kuali.org)
057     */
058    public class IdentityArchiveServiceImpl implements IdentityArchiveService, InitializingBean, DisposableBean {
059            private static final Logger LOG = Logger.getLogger( IdentityArchiveServiceImpl.class );
060    
061            private BusinessObjectService businessObjectService;
062            private ConfigurationService kualiConfigurationService;
063        private PlatformTransactionManager transactionManager;
064    
065            private static final String EXEC_INTERVAL_SECS = "kim.identityArchiveServiceImpl.executionIntervalSeconds";
066            private static final String MAX_WRITE_QUEUE_SIZE = "kim.identityArchiveServiceImpl.maxWriteQueueSize";
067            private static final int EXECUTION_INTERVAL_SECONDS_DEFAULT = 600; // by default, flush the write queue this often
068            private static final int MAX_WRITE_QUEUE_SIZE_DEFAULT = 300; // cache this many KEDI's before forcing write
069    
070            private final WriteQueue writeQueue = new WriteQueue();
071            private final EntityArchiveWriter writer = new EntityArchiveWriter();
072    
073            // all this ceremony just decorates the writer so it logs a message first, and converts the Callable to Runnable
074            private final Runnable maxQueueSizeExceededWriter =
075                    new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "max size exceeded, flushing write queue"));
076    
077            // ditto
078            private final Runnable scheduledWriter =
079                    new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "scheduled write out, flushing write queue"));
080    
081            // ditto
082            private final Runnable shutdownWriter =
083                    new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "rice is shutting down, flushing write queue"));
084            
085            private int getExecutionIntervalSeconds() {
086                    final String prop = kualiConfigurationService.getPropertyValueAsString(EXEC_INTERVAL_SECS);
087                    try {
088                            return Integer.valueOf(prop).intValue();
089                    } catch (NumberFormatException e) {
090                            return EXECUTION_INTERVAL_SECONDS_DEFAULT;
091                    }
092            }
093            
094            private int getMaxWriteQueueSize() {
095                    final String prop = kualiConfigurationService.getPropertyValueAsString(MAX_WRITE_QUEUE_SIZE);
096                    try {
097                            return Integer.valueOf(prop).intValue();
098                    } catch (NumberFormatException e) {
099                            return MAX_WRITE_QUEUE_SIZE_DEFAULT;
100                    }
101            }
102    
103            @Override
104            public EntityDefault getEntityDefaultFromArchive( String entityId ) {
105            if (StringUtils.isBlank(entityId)) {
106                throw new IllegalArgumentException("entityId is blank");
107            }
108    
109            Map<String,String> criteria = new HashMap<String, String>(1);
110            criteria.put(KimConstants.PrimaryKeyConstants.SUB_ENTITY_ID, entityId);
111            EntityDefaultInfoCacheBo cachedValue = businessObjectService.findByPrimaryKey(EntityDefaultInfoCacheBo.class, criteria);
112            return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo();
113        }
114    
115        @Override
116            public EntityDefault getEntityDefaultFromArchiveByPrincipalId(String principalId) {
117            if (StringUtils.isBlank(principalId)) {
118                throw new IllegalArgumentException("principalId is blank");
119            }
120    
121            Map<String,String> criteria = new HashMap<String, String>(1);
122            criteria.put("principalId", principalId);
123            EntityDefaultInfoCacheBo cachedValue = businessObjectService.findByPrimaryKey(EntityDefaultInfoCacheBo.class, criteria);
124            return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo();
125        }
126    
127        @Override
128            public EntityDefault getEntityDefaultFromArchiveByPrincipalName(String principalName) {
129            if (StringUtils.isBlank(principalName)) {
130                throw new IllegalArgumentException("principalName is blank");
131            }
132    
133            Map<String,String> criteria = new HashMap<String, String>(1);
134            criteria.put("principalName", principalName);
135            Collection<EntityDefaultInfoCacheBo> entities = businessObjectService.findMatching(EntityDefaultInfoCacheBo.class, criteria);
136            return (entities == null || entities.isEmpty()) ? null : entities.iterator().next().convertCacheToEntityDefaultInfo();
137        }
138        
139        @Override
140        public EntityDefault getEntityDefaultFromArchiveByEmployeeId(String employeeId) {
141            if (StringUtils.isBlank(employeeId)) {
142                throw new IllegalArgumentException("employeeId is blank");
143            }
144            Map<String,String> criteria = new HashMap<String, String>(1);
145            criteria.put("employeeId", employeeId);
146            Collection<EntityDefaultInfoCacheBo> entities = businessObjectService.findMatching(EntityDefaultInfoCacheBo.class, criteria);
147            return (entities == null || entities.isEmpty()) ? null : entities.iterator().next().convertCacheToEntityDefaultInfo();
148        }
149        
150        @Override
151            public void saveEntityDefaultToArchive(EntityDefault entity) {
152            if (entity == null) {
153                throw new IllegalArgumentException("entity is blank");
154            }
155    
156            // if the max size has been reached, schedule now
157            if (getMaxWriteQueueSize() <= writeQueue.offerAndGetSize(entity) /* <- this enqueues the KEDI */ &&
158                            writer.requestSubmit()) {
159                    KSBServiceLocator.getThreadPool().execute(maxQueueSizeExceededWriter);
160            }
161        }
162    
163        @Override
164        public void flushToArchive() {
165            writer.call();
166        }
167    
168    
169            public void setBusinessObjectService(BusinessObjectService businessObjectService) {
170                    this.businessObjectService = businessObjectService;
171            }
172    
173            public void setKualiConfigurationService(
174                            ConfigurationService kualiConfigurationService) {
175                    this.kualiConfigurationService = kualiConfigurationService;
176            }
177    
178        public void setTransactionManager(PlatformTransactionManager txMgr) {
179            this.transactionManager = txMgr;
180        }
181        
182        /** schedule the writer on the KSB scheduled pool. */
183            @Override
184            public void afterPropertiesSet() throws Exception {
185                    LOG.info("scheduling writer...");
186                    KSBServiceLocator.getScheduledPool().scheduleAtFixedRate(scheduledWriter,
187                                    getExecutionIntervalSeconds(), getExecutionIntervalSeconds(), TimeUnit.SECONDS);
188            }
189    
190            /** flush the write queue immediately. */
191            @Override
192            public void destroy() throws Exception {
193                    KSBServiceLocator.getThreadPool().execute(shutdownWriter);
194            }
195    
196            /**
197             * store the person to the database, but do this an alternate thread to
198             * prevent transaction issues since this service is non-transactional
199             *
200             * @author Kuali Rice Team (rice.collab@kuali.org)
201             *
202             */
203            private class EntityArchiveWriter implements Callable {
204    
205                    // flag used to prevent multiple processes from being submitted at once
206                    AtomicBoolean currentlySubmitted = new AtomicBoolean(false);
207    
208                    private final Comparator<Comparable> nullSafeComparator = new Comparator<Comparable>() {
209                            @Override
210                            public int compare(Comparable i1, Comparable i2) {
211                                    if (i1 != null && i2 != null) {
212                                            return i1.compareTo(i2);
213                                    } else if (i1 == null) {
214                                            if (i2 == null) {
215                                                    return 0;
216                                            } else {
217                                                    return -1;
218                                            }
219                                    } else { // if (entityId2 == null) {
220                                            return 1;
221                                    }
222                            };
223                    };
224    
225                    /**
226                     * Comparator that attempts to impose a total ordering on EntityDefault instances
227                     */
228                    private final Comparator<EntityDefault> kediComparator = new Comparator<EntityDefault>() {
229                            /**
230                             * compares by entityId value
231                             * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
232                             */
233                            @Override
234                            public int compare(EntityDefault o1, EntityDefault o2) {
235                                    String entityId1 = (o1 == null) ? null : o1.getEntityId();
236                                    String entityId2 = (o2 == null) ? null : o2.getEntityId();
237    
238                                    int result = nullSafeComparator.compare(entityId1, entityId2);
239    
240                                    if (result == 0) {
241                                            result = getPrincipalIdsString(o1).compareTo(getPrincipalIdsString(o2));
242                                    }
243    
244                                    return result;
245                            }
246    
247                            /**
248                             * This method builds a newline delimited String containing the identity's principal IDs in sorted order
249                             *
250                             * @param entity
251                             * @return
252                             */
253                            private String getPrincipalIdsString(EntityDefault entity) {
254                                    String result = "";
255                                    if (entity != null) {
256                                            List<Principal> principals = entity.getPrincipals();
257                                            if (principals != null) {
258                                                    if (principals.size() == 1) { // one
259                                                            result = principals.get(0).getPrincipalId();
260                                                    } else { // multiple
261                                                            String [] ids = new String [principals.size()];
262                                                            int insertIndex = 0;
263                                                            for (Principal principal : principals) {
264                                                                    ids[insertIndex++] = principal.getPrincipalId();
265                                                            }
266                                                            Arrays.sort(ids);
267                                                            result = StringUtils.join(ids, "\n");
268                                                    }
269                                            }
270                                    }
271                                    return result;
272                            }
273                    };
274    
275                    public boolean requestSubmit() {
276                            return currentlySubmitted.compareAndSet(false, true);
277                    }
278    
279                    /**
280                     * Call that tries to flush the write queue.
281                     * @see Callable#call()
282                     */
283                    @Override
284                    public Object call() {
285                            try {
286                                    // the strategy is to grab chunks of entities, dedupe & sort them, and insert them in a big
287                                    // batch to reduce transaction overhead.  Sorting is done so insertion order is guaranteed, which
288                                    // prevents deadlocks between concurrent writers to the database.
289                                    TransactionTemplate template = new TransactionTemplate(transactionManager);
290                                    template.execute(new TransactionCallback() {
291                                            @Override
292                                            public Object doInTransaction(TransactionStatus status) {
293                                                    EntityDefault entity = null;
294                                                    ArrayList<EntityDefault> entitiesToInsert = new ArrayList<EntityDefault>(getMaxWriteQueueSize());
295                                                    Set<String> deduper = new HashSet<String>(getMaxWriteQueueSize());
296    
297                                                    // order is important in this conditional so that elements aren't dequeued and then ignored
298                                                    while (entitiesToInsert.size() < getMaxWriteQueueSize() && null != (entity = writeQueue.poll())) {
299                                                            if (deduper.add(entity.getEntityId())) {
300                                                                    entitiesToInsert.add(entity);
301                                                            }
302                                                    }
303    
304                                                    Collections.sort(entitiesToInsert, kediComparator);
305                            List<EntityDefaultInfoCacheBo> entityCache = new ArrayList<EntityDefaultInfoCacheBo>(entitiesToInsert.size());
306                            for (EntityDefault entityToInsert : entitiesToInsert) {
307                                entityCache.add(new EntityDefaultInfoCacheBo( entityToInsert ));
308                            }
309                            businessObjectService.save(entityCache);
310                                                    //for (EntityDefault entityToInsert : entitiesToInsert) {
311                                                    //      businessObjectService.save( new EntityDefaultInfoCacheBo( entityToInsert ) );
312                                                    //}
313                                                    return null;
314                                            }
315                                    });
316                            } finally { // make sure our running flag is unset, otherwise we'll never run again
317                                    currentlySubmitted.compareAndSet(true, false);
318                            }
319    
320                            return Boolean.TRUE;
321                    }
322            }
323    
324            /**
325             * A class encapsulating a {@link ConcurrentLinkedQueue} and an {@link AtomicInteger} to
326             * provide fast offer(enqueue)/poll(dequeue) and size checking.  Size may be approximate due to concurrent
327             * activity, but for our purposes that is fine.
328             *
329             * @author Kuali Rice Team (rice.collab@kuali.org)
330             *
331             */
332            private static class WriteQueue {
333                    AtomicInteger writeQueueSize = new AtomicInteger(0);
334                    ConcurrentLinkedQueue<EntityDefault> queue = new ConcurrentLinkedQueue<EntityDefault>();
335    
336                    public int offerAndGetSize(EntityDefault entity) {
337                            queue.add(entity);
338                            return writeQueueSize.incrementAndGet();
339                    }
340    
341                    private EntityDefault poll() {
342                            EntityDefault result = queue.poll();
343                            if (result != null) { writeQueueSize.decrementAndGet(); }
344                            return result;
345                    }
346            }
347    
348            /**
349             * decorator for a callable to log a message before it is executed
350             *
351             * @author Kuali Rice Team (rice.collab@kuali.org)
352             *
353             */
354            private static class PreLogCallableWrapper<A> implements Callable<A> {
355                    
356                    private final Callable inner;
357                    private final Level level;
358                    private final String message;
359    
360                    public PreLogCallableWrapper(Callable inner, Level level, String message) {
361                            this.inner = inner;
362                            this.level = level;
363                            this.message = message;
364                    }
365    
366                    /**
367                     * logs the message then calls the inner Callable
368                     * 
369                     * @see java.util.concurrent.Callable#call()
370                     */
371                    @Override
372                    @SuppressWarnings("unchecked")
373                    public A call() throws Exception {
374                            LOG.log(level, message);
375                            return (A)inner.call();
376                    }
377            }
378    
379            /**
380             * Adapts a Callable to be Runnable
381             *
382             * @author Kuali Rice Team (rice.collab@kuali.org)
383             *
384             */
385            private static class CallableAdapter implements Runnable {
386    
387                    private final Callable callable;
388    
389                    public CallableAdapter(Callable callable) {
390                            this.callable = callable;
391                    }
392    
393                    @Override
394                    public void run() {
395                            try {
396                                    callable.call();
397                            } catch (Exception e) {
398                                    throw new RuntimeException(e);
399                            }
400                    }
401            }
402    }