001    /*
002     * Copyright 2007-2009 The Kuali Foundation
003     *
004     * Licensed under the Educational Community License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.opensource.org/licenses/ecl2.php
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    package org.kuali.rice.kim.service.impl;
017    
018    import java.util.ArrayList;
019    import java.util.Arrays;
020    import java.util.Collection;
021    import java.util.Collections;
022    import java.util.Comparator;
023    import java.util.HashMap;
024    import java.util.HashSet;
025    import java.util.List;
026    import java.util.Map;
027    import java.util.Set;
028    import java.util.concurrent.Callable;
029    import java.util.concurrent.ConcurrentLinkedQueue;
030    import java.util.concurrent.TimeUnit;
031    import java.util.concurrent.atomic.AtomicBoolean;
032    import java.util.concurrent.atomic.AtomicInteger;
033    
034    import org.apache.commons.lang.StringUtils;
035    import org.apache.log4j.Level;
036    import org.apache.log4j.Logger;
037    import org.kuali.rice.core.config.ConfigContext;
038    import org.kuali.rice.core.config.RiceConfigurer;
039    import org.kuali.rice.core.config.event.AfterStartEvent;
040    import org.kuali.rice.core.config.event.BeforeStopEvent;
041    import org.kuali.rice.core.config.event.RiceConfigEvent;
042    import org.kuali.rice.core.config.event.RiceConfigEventListener;
043    import org.kuali.rice.core.util.RiceConstants;
044    import org.kuali.rice.kim.bo.entity.dto.KimEntityDefaultInfo;
045    import org.kuali.rice.kim.bo.entity.dto.KimPrincipalInfo;
046    import org.kuali.rice.kim.bo.entity.impl.KimEntityDefaultInfoCacheImpl;
047    import org.kuali.rice.kim.service.IdentityArchiveService;
048    import org.kuali.rice.kim.util.KimConstants;
049    import org.kuali.rice.kns.service.BusinessObjectService;
050    import org.kuali.rice.kns.service.KNSServiceLocator;
051    import org.kuali.rice.ksb.service.KSBServiceLocator;
052    import org.springframework.transaction.PlatformTransactionManager;
053    import org.springframework.transaction.TransactionStatus;
054    import org.springframework.transaction.support.TransactionCallback;
055    import org.springframework.transaction.support.TransactionTemplate;
056    
057    
058    /**
059     * This is the default implementation for the IdentityArchiveService.
060     * @see IdentityArchiveService
061     * @author Kuali Rice Team (rice.collab@kuali.org)
062     *
063     */
064    public class IdentityArchiveServiceImpl implements IdentityArchiveService, RiceConfigEventListener {
065            private static final Logger LOG = Logger.getLogger( IdentityArchiveServiceImpl.class );
066    
067            private BusinessObjectService businessObjectService;
068    
069            private static final String EXEC_INTERVAL_SECS = "kim.identityArchiveServiceImpl.executionIntervalSeconds";
070            private static final String MAX_WRITE_QUEUE_SIZE = "kim.identityArchiveServiceImpl.maxWriteQueueSize";
071    
072            private int executionIntervalSeconds = 600; // by default, flush the write queue this often
073            private int maxWriteQueueSize = 300; // cache this many KEDI's before forcing write
074            private final WriteQueue writeQueue = new WriteQueue();
075            private final EntityArchiveWriter writer = new EntityArchiveWriter();
076    
077            // all this ceremony just decorates the writer so it logs a message first, and converts the Callable to Runnable
078            private final Runnable maxQueueSizeExceededWriter =
079                    new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "max size exceeded, flushing write queue"));
080    
081            // ditto
082            private final Runnable scheduledWriter =
083                    new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "scheduled write out, flushing write queue"));
084    
085            // ditto
086            private final Runnable shutdownWriter =
087                    new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "rice is shutting down, flushing write queue"));
088    
089            public IdentityArchiveServiceImpl(Integer executionIntervalSeconds, Integer maxWriteQueueSize) {
090                    // register for RiceConfigEventS
091                    RiceConfigurer rice =
092                            (RiceConfigurer)ConfigContext.getCurrentContextConfig().getObject( RiceConstants.RICE_CONFIGURER_CONFIG_NAME );
093                    LOG.debug("registering for events...");
094                    rice.getKimConfigurer().registerConfigEventListener(this);
095    
096                    if (executionIntervalSeconds != null) {
097                            this.executionIntervalSeconds = executionIntervalSeconds;
098                    }
099    
100                    if (maxWriteQueueSize != null) {
101                            this.maxWriteQueueSize = maxWriteQueueSize;
102                    }
103            }
104    
105            protected BusinessObjectService getBusinessObjectService() {
106                    if ( businessObjectService == null ) {
107                            businessObjectService = KNSServiceLocator.getBusinessObjectService();
108                    }
109                    return businessObjectService;
110            }
111    
112            public KimEntityDefaultInfo getEntityDefaultInfoFromArchive( String entityId ) {
113            Map<String,String> criteria = new HashMap<String, String>(1);
114            criteria.put(KimConstants.PrimaryKeyConstants.ENTITY_ID, entityId);
115            KimEntityDefaultInfoCacheImpl cachedValue = (KimEntityDefaultInfoCacheImpl)getBusinessObjectService().findByPrimaryKey(KimEntityDefaultInfoCacheImpl.class, criteria);
116            return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo();
117        }
118    
119        public KimEntityDefaultInfo getEntityDefaultInfoFromArchiveByPrincipalId( String principalId ) {
120            Map<String,String> criteria = new HashMap<String, String>(1);
121            criteria.put("principalId", principalId);
122            KimEntityDefaultInfoCacheImpl cachedValue = (KimEntityDefaultInfoCacheImpl)getBusinessObjectService().findByPrimaryKey(KimEntityDefaultInfoCacheImpl.class, criteria);
123            return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo();
124        }
125    
126        @SuppressWarnings("unchecked")
127            public KimEntityDefaultInfo getEntityDefaultInfoFromArchiveByPrincipalName( String principalName ) {
128            Map<String,String> criteria = new HashMap<String, String>(1);
129            criteria.put("principalName", principalName);
130            Collection<KimEntityDefaultInfoCacheImpl> entities = getBusinessObjectService().findMatching(KimEntityDefaultInfoCacheImpl.class, criteria);
131            return (entities == null || entities.size() == 0) ? null : entities.iterator().next().convertCacheToEntityDefaultInfo();
132        }
133    
134        public void saveDefaultInfoToArchive( KimEntityDefaultInfo entity ) {
135            // if the max size has been reached, schedule now
136            if (maxWriteQueueSize <= writeQueue.offerAndGetSize(entity) /* <- this enqueues the KEDI */ &&
137                            writer.requestSubmit()) {
138                    KSBServiceLocator.getThreadPool().execute(maxQueueSizeExceededWriter);
139            }
140        }
141    
142        /**
143         * <h4>On events:</h4>
144         * <p>{@link AfterStartEvent}: schedule the writer on the KSB scheduled pool
145         * <p>{@link BeforeStopEvent}: flush the write queue immediately
146         *
147         * @see org.springframework.context.ApplicationListener#onApplicationEvent(org.springframework.context.ApplicationEvent)
148         */
149        public void onEvent(final RiceConfigEvent event) {
150            if (event instanceof AfterStartEvent) {
151                    // on startup, schedule this to run
152                    LOG.info("scheduling writer...");
153                    KSBServiceLocator.getScheduledPool().scheduleAtFixedRate(scheduledWriter,
154                                    executionIntervalSeconds, executionIntervalSeconds, TimeUnit.SECONDS);
155            } else if (event instanceof BeforeStopEvent) {
156                    KSBServiceLocator.getThreadPool().execute(shutdownWriter);
157            }
158        }
159    
160            /**
161             * store the person to the database, but do this an alternate thread to
162             * prevent transaction issues since this service is non-transactional
163             *
164             * @author Kuali Rice Team (rice.collab@kuali.org)
165             *
166             */
167            private class EntityArchiveWriter implements Callable {
168    
169                    // flag used to prevent multiple processes from being submitted at once
170                    AtomicBoolean currentlySubmitted = new AtomicBoolean(false);
171    
172                    private final Comparator<Comparable> nullSafeComparator = new Comparator<Comparable>() {
173                            public int compare(Comparable i1, Comparable i2) {
174                                    if (i1 != null && i2 != null) {
175                                            return i1.compareTo(i2);
176                                    } else if (i1 == null) {
177                                            if (i2 == null) {
178                                                    return 0;
179                                            } else {
180                                                    return -1;
181                                            }
182                                    } else { // if (entityId2 == null) {
183                                            return 1;
184                                    }
185                            };
186                    };
187    
188                    /**
189                     * Comparator that attempts to impose a total ordering on KimEntityDefaultInfo instances
190                     */
191                    private final Comparator<KimEntityDefaultInfo> kediComparator = new Comparator<KimEntityDefaultInfo>() {
192                            /**
193                             * compares by entityId value
194                             * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
195                             */
196                            public int compare(KimEntityDefaultInfo o1, KimEntityDefaultInfo o2) {
197                                    String entityId1 = (o1 == null) ? null : o1.getEntityId();
198                                    String entityId2 = (o2 == null) ? null : o2.getEntityId();
199    
200                                    int result = nullSafeComparator.compare(entityId1, entityId2);
201    
202                                    if (result == 0) {
203                                            result = getPrincipalIdsString(o1).compareTo(getPrincipalIdsString(o2));
204                                    }
205    
206                                    return result;
207                            }
208    
209                            /**
210                             * This method builds a newline delimited String containing the entity's principal IDs in sorted order
211                             *
212                             * @param entity
213                             * @return
214                             */
215                            private String getPrincipalIdsString(KimEntityDefaultInfo entity) {
216                                    String result = "";
217                                    if (entity != null) {
218                                            List<KimPrincipalInfo> principals = entity.getPrincipals();
219                                            if (principals != null) {
220                                                    if (principals.size() == 1) { // one
221                                                            result = principals.get(0).getPrincipalId();
222                                                    } else { // multiple
223                                                            String [] ids = new String [principals.size()];
224                                                            int insertIndex = 0;
225                                                            for (KimPrincipalInfo principal : principals) {
226                                                                    ids[insertIndex++] = principal.getPrincipalId();
227                                                            }
228                                                            Arrays.sort(ids);
229                                                            result = StringUtils.join(ids, "\n");
230                                                    }
231                                            }
232                                    }
233                                    return result;
234                            }
235                    };
236    
237                    public boolean requestSubmit() {
238                            return currentlySubmitted.compareAndSet(false, true);
239                    }
240    
241                    /**
242                     * Call that tries to flush the write queue.
243                     * @see Callable#call()
244                     */
245                    public Object call() {
246                            try {
247                                    // the strategy is to grab chunks of entities, dedupe & sort them, and insert them in a big
248                                    // batch to reduce transaction overhead.  Sorting is done so insertion order is guaranteed, which
249                                    // prevents deadlocks between concurrent writers to the database.
250                                    PlatformTransactionManager transactionManager = KNSServiceLocator.getTransactionManager();
251                                    TransactionTemplate template = new TransactionTemplate(transactionManager);
252                                    template.execute(new TransactionCallback() {
253                                            public Object doInTransaction(TransactionStatus status) {
254                                                    KimEntityDefaultInfo entity = null;
255                                                    ArrayList<KimEntityDefaultInfo> entitiesToInsert = new ArrayList<KimEntityDefaultInfo>(maxWriteQueueSize);
256                                                    Set<String> deduper = new HashSet<String>(maxWriteQueueSize);
257    
258                                                    // order is important in this conditional so that elements aren't dequeued and then ignored
259                                                    while (entitiesToInsert.size() < maxWriteQueueSize && null != (entity = writeQueue.poll())) {
260                                                            if (deduper.add(entity.getEntityId())) entitiesToInsert.add(entity);
261                                                    }
262    
263                                                    Collections.sort(entitiesToInsert, kediComparator);
264    
265                                                    for (KimEntityDefaultInfo entityToInsert : entitiesToInsert) {
266                                                            getBusinessObjectService().save( new KimEntityDefaultInfoCacheImpl( entityToInsert ) );
267                                                    }
268                                                    return null;
269                                            }
270                                    });
271                            } finally { // make sure our running flag is unset, otherwise we'll never run again
272                                    currentlySubmitted.compareAndSet(true, false);
273                            }
274    
275                            return Boolean.TRUE;
276                    }
277            }
278    
279            /**
280             * A class encapsulating a {@link ConcurrentLinkedQueue} and an {@link AtomicInteger} to
281             * provide fast offer(enqueue)/poll(dequeue) and size checking.  Size may be approximate due to concurrent
282             * activity, but for our purposes that is fine.
283             *
284             * @author Kuali Rice Team (rice.collab@kuali.org)
285             *
286             */
287            private static class WriteQueue {
288                    AtomicInteger writeQueueSize = new AtomicInteger(0);
289                    ConcurrentLinkedQueue<KimEntityDefaultInfo> queue = new ConcurrentLinkedQueue<KimEntityDefaultInfo>();
290    
291                    public int offerAndGetSize(KimEntityDefaultInfo entity) {
292                            queue.add(entity);
293                            return writeQueueSize.incrementAndGet();
294                    }
295    
296                    private KimEntityDefaultInfo poll() {
297                            KimEntityDefaultInfo result = queue.poll();
298                            if (result != null) { writeQueueSize.decrementAndGet(); }
299                            return result;
300                    }
301    
302                    private int getSize() {
303                            return writeQueueSize.get();
304                    }
305            }
306    
307            /**
308             * decorator for a callable to log a message before it is executed
309             *
310             * @author Kuali Rice Team (rice.collab@kuali.org)
311             *
312             */
313            private static class PreLogCallableWrapper<A> implements Callable<A> {
314    
315                    private final Callable inner;
316                    private final Level level;
317                    private final String message;
318    
319                    public PreLogCallableWrapper(Callable inner, Level level, String message) {
320                            this.inner = inner;
321                            this.level = level;
322                            this.message = message;
323                    }
324    
325                    /**
326                     * logs the message then calls the inner Callable
327                     *
328                     * @see java.util.concurrent.Callable#call()
329                     */
330                    @SuppressWarnings("unchecked")
331                    public A call() throws Exception {
332                            LOG.log(level, message);
333                            return (A)inner.call();
334                    }
335            }
336    
337            /**
338             * Adapts a Callable to be Runnable
339             *
340             * @author Kuali Rice Team (rice.collab@kuali.org)
341             *
342             */
343            private static class CallableAdapter implements Runnable {
344    
345                    private final Callable callable;
346    
347                    public CallableAdapter(Callable callable) {
348                            this.callable = callable;
349                    }
350    
351                    public void run() {
352                            try {
353                                    callable.call();
354                            } catch (Exception e) {
355                                    throw new RuntimeException(e);
356                            }
357                    }
358            }
359    }