001/**
002 * Copyright 2005-2016 The Kuali Foundation
003 *
004 * Licensed under the Educational Community License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.opensource.org/licenses/ecl2.php
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.kuali.rice.kim.service.impl;
017
018import org.apache.commons.lang.StringUtils;
019import org.apache.log4j.Level;
020import org.apache.log4j.Logger;
021import org.kuali.rice.core.api.config.property.ConfigurationService;
022import org.kuali.rice.kim.api.KimConstants;
023import org.kuali.rice.kim.impl.identity.IdentityArchiveService;
024import org.kuali.rice.kim.api.identity.entity.EntityDefault;
025import org.kuali.rice.kim.api.identity.principal.Principal;
026import org.kuali.rice.kim.impl.identity.EntityDefaultInfoCacheBo;
027import org.kuali.rice.krad.service.BusinessObjectService;
028import org.kuali.rice.ksb.service.KSBServiceLocator;
029import org.springframework.beans.factory.DisposableBean;
030import org.springframework.beans.factory.InitializingBean;
031import org.springframework.transaction.PlatformTransactionManager;
032import org.springframework.transaction.TransactionStatus;
033import org.springframework.transaction.support.TransactionCallback;
034import org.springframework.transaction.support.TransactionTemplate;
035
036import java.util.ArrayList;
037import java.util.Arrays;
038import java.util.Collection;
039import java.util.Collections;
040import java.util.Comparator;
041import java.util.HashMap;
042import java.util.HashSet;
043import java.util.List;
044import java.util.Map;
045import java.util.Set;
046import java.util.concurrent.Callable;
047import java.util.concurrent.ConcurrentLinkedQueue;
048import java.util.concurrent.TimeUnit;
049import java.util.concurrent.atomic.AtomicBoolean;
050import java.util.concurrent.atomic.AtomicInteger;
051
052/**
053 * This is the default implementation for the IdentityArchiveService.
054 * @see IdentityArchiveService
055 *
056 * @author Kuali Rice Team (rice.collab@kuali.org)
057 */
058public class IdentityArchiveServiceImpl implements IdentityArchiveService, InitializingBean, DisposableBean {
059        private static final Logger LOG = Logger.getLogger( IdentityArchiveServiceImpl.class );
060
061        private BusinessObjectService businessObjectService;
062        private ConfigurationService kualiConfigurationService;
063    private PlatformTransactionManager transactionManager;
064
065        private static final String EXEC_INTERVAL_SECS = "kim.identityArchiveServiceImpl.executionIntervalSeconds";
066        private static final String MAX_WRITE_QUEUE_SIZE = "kim.identityArchiveServiceImpl.maxWriteQueueSize";
067        private static final int EXECUTION_INTERVAL_SECONDS_DEFAULT = 600; // by default, flush the write queue this often
068        private static final int MAX_WRITE_QUEUE_SIZE_DEFAULT = 300; // cache this many KEDI's before forcing write
069
070        private final WriteQueue writeQueue = new WriteQueue();
071        private final EntityArchiveWriter writer = new EntityArchiveWriter();
072
073        // all this ceremony just decorates the writer so it logs a message first, and converts the Callable to Runnable
074        private final Runnable maxQueueSizeExceededWriter =
075                new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "max size exceeded, flushing write queue"));
076
077        // ditto
078        private final Runnable scheduledWriter =
079                new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "scheduled write out, flushing write queue"));
080
081        // ditto
082        private final Runnable shutdownWriter =
083                new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "rice is shutting down, flushing write queue"));
084        
085        private int getExecutionIntervalSeconds() {
086                final String prop = kualiConfigurationService.getPropertyValueAsString(EXEC_INTERVAL_SECS);
087                try {
088                        return Integer.valueOf(prop).intValue();
089                } catch (NumberFormatException e) {
090                        return EXECUTION_INTERVAL_SECONDS_DEFAULT;
091                }
092        }
093        
094        private int getMaxWriteQueueSize() {
095                final String prop = kualiConfigurationService.getPropertyValueAsString(MAX_WRITE_QUEUE_SIZE);
096                try {
097                        return Integer.valueOf(prop).intValue();
098                } catch (NumberFormatException e) {
099                        return MAX_WRITE_QUEUE_SIZE_DEFAULT;
100                }
101        }
102
103        @Override
104        public EntityDefault getEntityDefaultFromArchive( String entityId ) {
105        if (StringUtils.isBlank(entityId)) {
106            throw new IllegalArgumentException("entityId is blank");
107        }
108
109        Map<String,String> criteria = new HashMap<String, String>(1);
110        criteria.put(KimConstants.PrimaryKeyConstants.SUB_ENTITY_ID, entityId);
111        EntityDefaultInfoCacheBo cachedValue = businessObjectService.findByPrimaryKey(EntityDefaultInfoCacheBo.class, criteria);
112        return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo();
113    }
114
115    @Override
116        public EntityDefault getEntityDefaultFromArchiveByPrincipalId(String principalId) {
117        if (StringUtils.isBlank(principalId)) {
118            throw new IllegalArgumentException("principalId is blank");
119        }
120
121        Map<String,String> criteria = new HashMap<String, String>(1);
122        criteria.put("principalId", principalId);
123        EntityDefaultInfoCacheBo cachedValue = businessObjectService.findByPrimaryKey(EntityDefaultInfoCacheBo.class, criteria);
124        return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo();
125    }
126
127    @Override
128        public EntityDefault getEntityDefaultFromArchiveByPrincipalName(String principalName) {
129        if (StringUtils.isBlank(principalName)) {
130            throw new IllegalArgumentException("principalName is blank");
131        }
132
133        Map<String,String> criteria = new HashMap<String, String>(1);
134        criteria.put("principalName", principalName);
135        Collection<EntityDefaultInfoCacheBo> entities = businessObjectService.findMatching(EntityDefaultInfoCacheBo.class, criteria);
136        return (entities == null || entities.isEmpty()) ? null : entities.iterator().next().convertCacheToEntityDefaultInfo();
137    }
138    
139    @Override
140    public EntityDefault getEntityDefaultFromArchiveByEmployeeId(String employeeId) {
141        if (StringUtils.isBlank(employeeId)) {
142            throw new IllegalArgumentException("employeeId is blank");
143        }
144        Map<String,String> criteria = new HashMap<String, String>(1);
145        criteria.put("employeeId", employeeId);
146        Collection<EntityDefaultInfoCacheBo> entities = businessObjectService.findMatching(EntityDefaultInfoCacheBo.class, criteria);
147        return (entities == null || entities.isEmpty()) ? null : entities.iterator().next().convertCacheToEntityDefaultInfo();
148    }
149    
150    @Override
151        public void saveEntityDefaultToArchive(EntityDefault entity) {
152        if (entity == null) {
153            throw new IllegalArgumentException("entity is blank");
154        }
155
156        // if the max size has been reached, schedule now
157        if (getMaxWriteQueueSize() <= writeQueue.offerAndGetSize(entity) /* <- this enqueues the KEDI */ &&
158                        writer.requestSubmit()) {
159                KSBServiceLocator.getThreadPool().execute(maxQueueSizeExceededWriter);
160        }
161    }
162
163    @Override
164    public void flushToArchive() {
165        writer.call();
166    }
167
168
169        public void setBusinessObjectService(BusinessObjectService businessObjectService) {
170                this.businessObjectService = businessObjectService;
171        }
172
173        public void setKualiConfigurationService(
174                        ConfigurationService kualiConfigurationService) {
175                this.kualiConfigurationService = kualiConfigurationService;
176        }
177
178    public void setTransactionManager(PlatformTransactionManager txMgr) {
179        this.transactionManager = txMgr;
180    }
181    
182    /** schedule the writer on the KSB scheduled pool. */
183        @Override
184        public void afterPropertiesSet() throws Exception {
185                LOG.info("scheduling writer...");
186                KSBServiceLocator.getScheduledPool().scheduleAtFixedRate(scheduledWriter,
187                                getExecutionIntervalSeconds(), getExecutionIntervalSeconds(), TimeUnit.SECONDS);
188        }
189
190        /** flush the write queue immediately. */
191        @Override
192        public void destroy() throws Exception {
193                KSBServiceLocator.getThreadPool().execute(shutdownWriter);
194        }
195
196        /**
197         * store the person to the database, but do this an alternate thread to
198         * prevent transaction issues since this service is non-transactional
199         *
200         * @author Kuali Rice Team (rice.collab@kuali.org)
201         *
202         */
203        private class EntityArchiveWriter implements Callable {
204
205                // flag used to prevent multiple processes from being submitted at once
206                AtomicBoolean currentlySubmitted = new AtomicBoolean(false);
207
208                private final Comparator<Comparable> nullSafeComparator = new Comparator<Comparable>() {
209                        @Override
210                        public int compare(Comparable i1, Comparable i2) {
211                                if (i1 != null && i2 != null) {
212                                        return i1.compareTo(i2);
213                                } else if (i1 == null) {
214                                        if (i2 == null) {
215                                                return 0;
216                                        } else {
217                                                return -1;
218                                        }
219                                } else { // if (entityId2 == null) {
220                                        return 1;
221                                }
222                        };
223                };
224
225                /**
226                 * Comparator that attempts to impose a total ordering on EntityDefault instances
227                 */
228                private final Comparator<EntityDefault> kediComparator = new Comparator<EntityDefault>() {
229                        /**
230                         * compares by entityId value
231                         * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
232                         */
233                        @Override
234                        public int compare(EntityDefault o1, EntityDefault o2) {
235                                String entityId1 = (o1 == null) ? null : o1.getEntityId();
236                                String entityId2 = (o2 == null) ? null : o2.getEntityId();
237
238                                int result = nullSafeComparator.compare(entityId1, entityId2);
239
240                                if (result == 0) {
241                                        result = getPrincipalIdsString(o1).compareTo(getPrincipalIdsString(o2));
242                                }
243
244                                return result;
245                        }
246
247                        /**
248                         * This method builds a newline delimited String containing the identity's principal IDs in sorted order
249                         *
250                         * @param entity
251                         * @return
252                         */
253                        private String getPrincipalIdsString(EntityDefault entity) {
254                                String result = "";
255                                if (entity != null) {
256                                        List<Principal> principals = entity.getPrincipals();
257                                        if (principals != null) {
258                                                if (principals.size() == 1) { // one
259                                                        result = principals.get(0).getPrincipalId();
260                                                } else { // multiple
261                                                        String [] ids = new String [principals.size()];
262                                                        int insertIndex = 0;
263                                                        for (Principal principal : principals) {
264                                                                ids[insertIndex++] = principal.getPrincipalId();
265                                                        }
266                                                        Arrays.sort(ids);
267                                                        result = StringUtils.join(ids, "\n");
268                                                }
269                                        }
270                                }
271                                return result;
272                        }
273                };
274
275                public boolean requestSubmit() {
276                        return currentlySubmitted.compareAndSet(false, true);
277                }
278
279                /**
280                 * Call that tries to flush the write queue.
281                 * @see Callable#call()
282                 */
283                @Override
284                public Object call() {
285                        try {
286                                // the strategy is to grab chunks of entities, dedupe & sort them, and insert them in a big
287                                // batch to reduce transaction overhead.  Sorting is done so insertion order is guaranteed, which
288                                // prevents deadlocks between concurrent writers to the database.
289                                TransactionTemplate template = new TransactionTemplate(transactionManager);
290                                template.execute(new TransactionCallback() {
291                                        @Override
292                                        public Object doInTransaction(TransactionStatus status) {
293                                                EntityDefault entity = null;
294                                                ArrayList<EntityDefault> entitiesToInsert = new ArrayList<EntityDefault>(getMaxWriteQueueSize());
295                                                Set<String> deduper = new HashSet<String>(getMaxWriteQueueSize());
296
297                                                // order is important in this conditional so that elements aren't dequeued and then ignored
298                                                while (entitiesToInsert.size() < getMaxWriteQueueSize() && null != (entity = writeQueue.poll())) {
299                                                        if (deduper.add(entity.getEntityId())) {
300                                                                entitiesToInsert.add(entity);
301                                                        }
302                                                }
303
304                                                Collections.sort(entitiesToInsert, kediComparator);
305                        List<EntityDefaultInfoCacheBo> entityCache = new ArrayList<EntityDefaultInfoCacheBo>(entitiesToInsert.size());
306                        for (EntityDefault entityToInsert : entitiesToInsert) {
307                            entityCache.add(new EntityDefaultInfoCacheBo( entityToInsert ));
308                        }
309                        businessObjectService.save(entityCache);
310                                                //for (EntityDefault entityToInsert : entitiesToInsert) {
311                                                //      businessObjectService.save( new EntityDefaultInfoCacheBo( entityToInsert ) );
312                                                //}
313                                                return null;
314                                        }
315                                });
316                        } finally { // make sure our running flag is unset, otherwise we'll never run again
317                                currentlySubmitted.compareAndSet(true, false);
318                        }
319
320                        return Boolean.TRUE;
321                }
322        }
323
324        /**
325         * A class encapsulating a {@link ConcurrentLinkedQueue} and an {@link AtomicInteger} to
326         * provide fast offer(enqueue)/poll(dequeue) and size checking.  Size may be approximate due to concurrent
327         * activity, but for our purposes that is fine.
328         *
329         * @author Kuali Rice Team (rice.collab@kuali.org)
330         *
331         */
332        private static class WriteQueue {
333                AtomicInteger writeQueueSize = new AtomicInteger(0);
334                ConcurrentLinkedQueue<EntityDefault> queue = new ConcurrentLinkedQueue<EntityDefault>();
335
336                public int offerAndGetSize(EntityDefault entity) {
337                        queue.add(entity);
338                        return writeQueueSize.incrementAndGet();
339                }
340
341                private EntityDefault poll() {
342                        EntityDefault result = queue.poll();
343                        if (result != null) { writeQueueSize.decrementAndGet(); }
344                        return result;
345                }
346        }
347
348        /**
349         * decorator for a callable to log a message before it is executed
350         *
351         * @author Kuali Rice Team (rice.collab@kuali.org)
352         *
353         */
354        private static class PreLogCallableWrapper<A> implements Callable<A> {
355                
356                private final Callable inner;
357                private final Level level;
358                private final String message;
359
360                public PreLogCallableWrapper(Callable inner, Level level, String message) {
361                        this.inner = inner;
362                        this.level = level;
363                        this.message = message;
364                }
365
366                /**
367                 * logs the message then calls the inner Callable
368                 * 
369                 * @see java.util.concurrent.Callable#call()
370                 */
371                @Override
372                @SuppressWarnings("unchecked")
373                public A call() throws Exception {
374                        LOG.log(level, message);
375                        return (A)inner.call();
376                }
377        }
378
379        /**
380         * Adapts a Callable to be Runnable
381         *
382         * @author Kuali Rice Team (rice.collab@kuali.org)
383         *
384         */
385        private static class CallableAdapter implements Runnable {
386
387                private final Callable callable;
388
389                public CallableAdapter(Callable callable) {
390                        this.callable = callable;
391                }
392
393                @Override
394                public void run() {
395                        try {
396                                callable.call();
397                        } catch (Exception e) {
398                                throw new RuntimeException(e);
399                        }
400                }
401        }
402}