001    /*
002     * Copyright 2006-2011 The Kuali Foundation
003     *
004     * Licensed under the Educational Community License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.opensource.org/licenses/ecl2.php
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
017    package org.kuali.rice.kim.service.impl;
019    import org.apache.commons.lang.StringUtils;
020    import org.apache.log4j.Level;
021    import org.apache.log4j.Logger;
022    import org.kuali.rice.core.api.config.property.ConfigurationService;
023    import org.kuali.rice.kim.api.identity.entity.EntityDefault;
024    import org.kuali.rice.kim.api.identity.principal.Principal;
025    import org.kuali.rice.kim.api.services.IdentityArchiveService;
026    import org.kuali.rice.kim.bo.entity.impl.KimEntityDefaultInfoCacheImpl;
027    import org.kuali.rice.kim.util.KimConstants;
028    import org.kuali.rice.krad.service.BusinessObjectService;
029    import org.kuali.rice.krad.service.KRADServiceLocatorInternal;
030    import org.kuali.rice.ksb.service.KSBServiceLocator;
031    import org.springframework.beans.factory.DisposableBean;
032    import org.springframework.beans.factory.InitializingBean;
033    import org.springframework.transaction.PlatformTransactionManager;
034    import org.springframework.transaction.TransactionStatus;
035    import org.springframework.transaction.support.TransactionCallback;
036    import org.springframework.transaction.support.TransactionTemplate;
038    import java.util.ArrayList;
039    import java.util.Arrays;
040    import java.util.Collection;
041    import java.util.Collections;
042    import java.util.Comparator;
043    import java.util.HashMap;
044    import java.util.HashSet;
045    import java.util.List;
046    import java.util.Map;
047    import java.util.Set;
048    import java.util.concurrent.Callable;
049    import java.util.concurrent.ConcurrentLinkedQueue;
050    import java.util.concurrent.TimeUnit;
051    import java.util.concurrent.atomic.AtomicBoolean;
052    import java.util.concurrent.atomic.AtomicInteger;
054    /**
055     * This is the default implementation for the IdentityArchiveService.
056     * @see IdentityArchiveService
057     * @author Kuali Rice Team (rice.collab@kuali.org)
058     *
059     */
060    public class IdentityArchiveServiceImpl implements IdentityArchiveService, InitializingBean, DisposableBean {
061            private static final Logger LOG = Logger.getLogger( IdentityArchiveServiceImpl.class );
063            private BusinessObjectService businessObjectService;
064            private ConfigurationService kualiConfigurationService;
066            private static final String EXEC_INTERVAL_SECS = "kim.identityArchiveServiceImpl.executionIntervalSeconds";
067            private static final String MAX_WRITE_QUEUE_SIZE = "kim.identityArchiveServiceImpl.maxWriteQueueSize";
068            private static final int EXECUTION_INTERVAL_SECONDS_DEFAULT = 600; // by default, flush the write queue this often
069            private static final int MAX_WRITE_QUEUE_SIZE_DEFAULT = 300; // cache this many KEDI's before forcing write
071            private final WriteQueue writeQueue = new WriteQueue();
072            private final EntityArchiveWriter writer = new EntityArchiveWriter();
074            // all this ceremony just decorates the writer so it logs a message first, and converts the Callable to Runnable
075            private final Runnable maxQueueSizeExceededWriter =
076                    new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "max size exceeded, flushing write queue"));
078            // ditto
079            private final Runnable scheduledWriter =
080                    new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "scheduled write out, flushing write queue"));
082            // ditto
083            private final Runnable shutdownWriter =
084                    new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "rice is shutting down, flushing write queue"));
086            private int getExecutionIntervalSeconds() {
087                    final String prop = kualiConfigurationService.getPropertyString(EXEC_INTERVAL_SECS);
088                    try {
089                            return Integer.valueOf(prop).intValue();
090                    } catch (NumberFormatException e) {
091                            return EXECUTION_INTERVAL_SECONDS_DEFAULT;
092                    }
093            }
095            private int getMaxWriteQueueSize() {
096                    final String prop = kualiConfigurationService.getPropertyString(MAX_WRITE_QUEUE_SIZE);
097                    try {
098                            return Integer.valueOf(prop).intValue();
099                    } catch (NumberFormatException e) {
100                            return MAX_WRITE_QUEUE_SIZE_DEFAULT;
101                    }
102            }
104            @Override
105            public EntityDefault getEntityDefaultInfoFromArchive( String entityId ) {
106            Map<String,String> criteria = new HashMap<String, String>(1);
107            criteria.put(KimConstants.PrimaryKeyConstants.ENTITY_ID, entityId);
108            KimEntityDefaultInfoCacheImpl cachedValue = getBusinessObjectService().findByPrimaryKey(KimEntityDefaultInfoCacheImpl.class, criteria);
109            return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo();
110        }
112        @Override
113            public EntityDefault getEntityDefaultInfoFromArchiveByPrincipalId( String principalId ) {
114            Map<String,String> criteria = new HashMap<String, String>(1);
115            criteria.put("principalId", principalId);
116            KimEntityDefaultInfoCacheImpl cachedValue = getBusinessObjectService().findByPrimaryKey(KimEntityDefaultInfoCacheImpl.class, criteria);
117            return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo();
118        }
120        @Override
121            public EntityDefault getEntityDefaultInfoFromArchiveByPrincipalName( String principalName ) {
122            Map<String,String> criteria = new HashMap<String, String>(1);
123            criteria.put("principalName", principalName);
124            Collection<KimEntityDefaultInfoCacheImpl> entities = getBusinessObjectService().findMatching(KimEntityDefaultInfoCacheImpl.class, criteria);
125            return (entities == null || entities.size() == 0) ? null : entities.iterator().next().convertCacheToEntityDefaultInfo();
126        }
128        @Override
129            public void saveDefaultInfoToArchive( EntityDefault entity ) {
130            // if the max size has been reached, schedule now
131            if (getMaxWriteQueueSize() <= writeQueue.offerAndGetSize(entity) /* <- this enqueues the KEDI */ &&
132                            writer.requestSubmit()) {
133                    KSBServiceLocator.getThreadPool().execute(maxQueueSizeExceededWriter);
134            }
135        }
137            public BusinessObjectService getBusinessObjectService() {
138                    return this.businessObjectService;
139            }
141            public void setBusinessObjectService(BusinessObjectService businessObjectService) {
142                    this.businessObjectService = businessObjectService;
143            }
145            public ConfigurationService getKualiConfigurationService() {
146                    return this.kualiConfigurationService;
147            }
149            public void setKualiConfigurationService(
150                            ConfigurationService kualiConfigurationService) {
151                    this.kualiConfigurationService = kualiConfigurationService;
152            }
154        /** schedule the writer on the KSB scheduled pool. */
155            @Override
156            public void afterPropertiesSet() throws Exception {
157                    LOG.info("scheduling writer...");
158                    KSBServiceLocator.getScheduledPool().scheduleAtFixedRate(scheduledWriter,
159                                    getExecutionIntervalSeconds(), getExecutionIntervalSeconds(), TimeUnit.SECONDS);
160            }
162            /** flush the write queue immediately. */
163            @Override
164            public void destroy() throws Exception {
165                    KSBServiceLocator.getThreadPool().execute(shutdownWriter);
166            }
168            /**
169             * store the person to the database, but do this an alternate thread to
170             * prevent transaction issues since this service is non-transactional
171             *
172             * @author Kuali Rice Team (rice.collab@kuali.org)
173             *
174             */
175            private class EntityArchiveWriter implements Callable {
177                    // flag used to prevent multiple processes from being submitted at once
178                    AtomicBoolean currentlySubmitted = new AtomicBoolean(false);
180                    private final Comparator<Comparable> nullSafeComparator = new Comparator<Comparable>() {
181                            @Override
182                            public int compare(Comparable i1, Comparable i2) {
183                                    if (i1 != null && i2 != null) {
184                                            return i1.compareTo(i2);
185                                    } else if (i1 == null) {
186                                            if (i2 == null) {
187                                                    return 0;
188                                            } else {
189                                                    return -1;
190                                            }
191                                    } else { // if (entityId2 == null) {
192                                            return 1;
193                                    }
194                            };
195                    };
197                    /**
198                     * Comparator that attempts to impose a total ordering on EntityDefault instances
199                     */
200                    private final Comparator<EntityDefault> kediComparator = new Comparator<EntityDefault>() {
201                            /**
202                             * compares by entityId value
203                             * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
204                             */
205                            @Override
206                            public int compare(EntityDefault o1, EntityDefault o2) {
207                                    String entityId1 = (o1 == null) ? null : o1.getEntityId();
208                                    String entityId2 = (o2 == null) ? null : o2.getEntityId();
210                                    int result = nullSafeComparator.compare(entityId1, entityId2);
212                                    if (result == 0) {
213                                            result = getPrincipalIdsString(o1).compareTo(getPrincipalIdsString(o2));
214                                    }
216                                    return result;
217                            }
219                            /**
220                             * This method builds a newline delimited String containing the identity's principal IDs in sorted order
221                             *
222                             * @param entity
223                             * @return
224                             */
225                            private String getPrincipalIdsString(EntityDefault entity) {
226                                    String result = "";
227                                    if (entity != null) {
228                                            List<Principal> principals = entity.getPrincipals();
229                                            if (principals != null) {
230                                                    if (principals.size() == 1) { // one
231                                                            result = principals.get(0).getPrincipalId();
232                                                    } else { // multiple
233                                                            String [] ids = new String [principals.size()];
234                                                            int insertIndex = 0;
235                                                            for (Principal principal : principals) {
236                                                                    ids[insertIndex++] = principal.getPrincipalId();
237                                                            }
238                                                            Arrays.sort(ids);
239                                                            result = StringUtils.join(ids, "\n");
240                                                    }
241                                            }
242                                    }
243                                    return result;
244                            }
245                    };
247                    public boolean requestSubmit() {
248                            return currentlySubmitted.compareAndSet(false, true);
249                    }
251                    /**
252                     * Call that tries to flush the write queue.
253                     * @see Callable#call()
254                     */
255                    @Override
256                    public Object call() {
257                            try {
258                                    // the strategy is to grab chunks of entities, dedupe & sort them, and insert them in a big
259                                    // batch to reduce transaction overhead.  Sorting is done so insertion order is guaranteed, which
260                                    // prevents deadlocks between concurrent writers to the database.
261                                    PlatformTransactionManager transactionManager = KRADServiceLocatorInternal.getTransactionManager();
262                                    TransactionTemplate template = new TransactionTemplate(transactionManager);
263                                    template.execute(new TransactionCallback() {
264                                            @Override
265                                            public Object doInTransaction(TransactionStatus status) {
266                                                    EntityDefault entity = null;
267                                                    ArrayList<EntityDefault> entitiesToInsert = new ArrayList<EntityDefault>(getMaxWriteQueueSize());
268                                                    Set<String> deduper = new HashSet<String>(getMaxWriteQueueSize());
270                                                    // order is important in this conditional so that elements aren't dequeued and then ignored
271                                                    while (entitiesToInsert.size() < getMaxWriteQueueSize() && null != (entity = writeQueue.poll())) {
272                                                            if (deduper.add(entity.getEntityId())) {
273                                                                    entitiesToInsert.add(entity);
274                                                            }
275                                                    }
277                                                    Collections.sort(entitiesToInsert, kediComparator);
279                                                    for (EntityDefault entityToInsert : entitiesToInsert) {
280                                                            getBusinessObjectService().save( new KimEntityDefaultInfoCacheImpl( entityToInsert ) );
281                                                    }
282                                                    return null;
283                                            }
284                                    });
285                            } finally { // make sure our running flag is unset, otherwise we'll never run again
286                                    currentlySubmitted.compareAndSet(true, false);
287                            }
289                            return Boolean.TRUE;
290                    }
291            }
293            /**
294             * A class encapsulating a {@link ConcurrentLinkedQueue} and an {@link AtomicInteger} to
295             * provide fast offer(enqueue)/poll(dequeue) and size checking.  Size may be approximate due to concurrent
296             * activity, but for our purposes that is fine.
297             *
298             * @author Kuali Rice Team (rice.collab@kuali.org)
299             *
300             */
301            private static class WriteQueue {
302                    AtomicInteger writeQueueSize = new AtomicInteger(0);
303                    ConcurrentLinkedQueue<EntityDefault> queue = new ConcurrentLinkedQueue<EntityDefault>();
305                    public int offerAndGetSize(EntityDefault entity) {
306                            queue.add(entity);
307                            return writeQueueSize.incrementAndGet();
308                    }
310                    private EntityDefault poll() {
311                            EntityDefault result = queue.poll();
312                            if (result != null) { writeQueueSize.decrementAndGet(); }
313                            return result;
314                    }
315            }
317            /**
318             * decorator for a callable to log a message before it is executed
319             *
320             * @author Kuali Rice Team (rice.collab@kuali.org)
321             *
322             */
323            private static class PreLogCallableWrapper<A> implements Callable<A> {
325                    private final Callable inner;
326                    private final Level level;
327                    private final String message;
329                    public PreLogCallableWrapper(Callable inner, Level level, String message) {
330                            this.inner = inner;
331                            this.level = level;
332                            this.message = message;
333                    }
335                    /**
336                     * logs the message then calls the inner Callable
337                     * 
338                     * @see java.util.concurrent.Callable#call()
339                     */
340                    @Override
341                    @SuppressWarnings("unchecked")
342                    public A call() throws Exception {
343                            LOG.log(level, message);
344                            return (A)inner.call();
345                    }
346            }
348            /**
349             * Adapts a Callable to be Runnable
350             *
351             * @author Kuali Rice Team (rice.collab@kuali.org)
352             *
353             */
354            private static class CallableAdapter implements Runnable {
356                    private final Callable callable;
358                    public CallableAdapter(Callable callable) {
359                            this.callable = callable;
360                    }
362                    @Override
363                    public void run() {
364                            try {
365                                    callable.call();
366                            } catch (Exception e) {
367                                    throw new RuntimeException(e);
368                            }
369                    }
370            }
371    }