/**
 * Copyright 2005-2015 The Kuali Foundation
 *
 * Licensed under the Educational Community License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.opensource.org/licenses/ecl2.php
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
016package org.kuali.rice.kim.service.impl;
017
018import java.util.ArrayList;
019import java.util.Arrays;
020import java.util.Collections;
021import java.util.Comparator;
022import java.util.HashSet;
023import java.util.List;
024import java.util.Set;
025import java.util.concurrent.Callable;
026import java.util.concurrent.ConcurrentLinkedQueue;
027import java.util.concurrent.TimeUnit;
028import java.util.concurrent.atomic.AtomicBoolean;
029import java.util.concurrent.atomic.AtomicInteger;
030
031import org.apache.commons.lang.StringUtils;
032import org.apache.log4j.Level;
033import org.apache.log4j.Logger;
034import org.kuali.rice.core.api.config.property.ConfigurationService;
035import org.kuali.rice.core.api.criteria.QueryByCriteria;
036import org.kuali.rice.kim.api.KimConstants;
037import org.kuali.rice.kim.api.identity.entity.EntityDefault;
038import org.kuali.rice.kim.api.identity.principal.Principal;
039import org.kuali.rice.kim.impl.identity.EntityDefaultInfoCacheBo;
040import org.kuali.rice.kim.impl.identity.IdentityArchiveService;
041import org.kuali.rice.krad.data.DataObjectService;
042import org.kuali.rice.ksb.service.KSBServiceLocator;
043import org.springframework.beans.factory.DisposableBean;
044import org.springframework.beans.factory.InitializingBean;
045import org.springframework.transaction.PlatformTransactionManager;
046import org.springframework.transaction.TransactionStatus;
047import org.springframework.transaction.support.TransactionCallback;
048import org.springframework.transaction.support.TransactionTemplate;
049
050/**
051 * This is the default implementation for the IdentityArchiveService.
052 * @see IdentityArchiveService
053 *
054 * @author Kuali Rice Team (rice.collab@kuali.org)
055 */
056public class IdentityArchiveServiceImpl implements IdentityArchiveService, InitializingBean, DisposableBean {
057        private static final Logger LOG = Logger.getLogger( IdentityArchiveServiceImpl.class );
058
059        protected DataObjectService dataObjectService;
060        protected ConfigurationService kualiConfigurationService;
061        protected PlatformTransactionManager transactionManager;
062
063        protected static final String EXEC_INTERVAL_SECS = "kim.identityArchiveServiceImpl.executionIntervalSeconds";
064        protected static final String MAX_WRITE_QUEUE_SIZE = "kim.identityArchiveServiceImpl.maxWriteQueueSize";
065        protected static final int EXECUTION_INTERVAL_SECONDS_DEFAULT = 600; // by default, flush the write queue this often
066        protected static final int MAX_WRITE_QUEUE_SIZE_DEFAULT = 300; // cache this many KEDI's before forcing write
067
068        protected final WriteQueue writeQueue = new WriteQueue();
069        protected final EntityArchiveWriter writer = new EntityArchiveWriter();
070
071        // all this ceremony just decorates the writer so it logs a message first, and converts the Callable to Runnable
072        protected final Runnable maxQueueSizeExceededWriter =
073                new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "max size exceeded, flushing write queue"));
074
075        // ditto
076        protected final Runnable scheduledWriter =
077                new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "scheduled write out, flushing write queue"));
078
079        // ditto
080        protected final Runnable shutdownWriter =
081                new CallableAdapter(new PreLogCallableWrapper<Boolean>(writer, Level.DEBUG, "rice is shutting down, flushing write queue"));
082
083        protected int getExecutionIntervalSeconds() {
084                final String prop = kualiConfigurationService.getPropertyValueAsString(EXEC_INTERVAL_SECS);
085                try {
086                        return Integer.valueOf(prop).intValue();
087                } catch (NumberFormatException e) {
088                        return EXECUTION_INTERVAL_SECONDS_DEFAULT;
089                }
090        }
091
092        protected int getMaxWriteQueueSize() {
093                final String prop = kualiConfigurationService.getPropertyValueAsString(MAX_WRITE_QUEUE_SIZE);
094                try {
095                        return Integer.valueOf(prop).intValue();
096                } catch (NumberFormatException e) {
097                        return MAX_WRITE_QUEUE_SIZE_DEFAULT;
098                }
099        }
100
101        @Override
102        public EntityDefault getEntityDefaultFromArchive( String entityId ) {
103        if (StringUtils.isBlank(entityId)) {
104            throw new IllegalArgumentException("entityId is blank");
105        }
106
107        List<EntityDefaultInfoCacheBo> results = dataObjectService.findMatching(EntityDefaultInfoCacheBo.class,
108                QueryByCriteria.Builder.forAttribute(KimConstants.PrimaryKeyConstants.SUB_ENTITY_ID, entityId).build() ).getResults();
109        EntityDefaultInfoCacheBo cachedValue = null;
110        if ( !results.isEmpty() ) {
111            cachedValue = results.get(0);
112        }
113
114        return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo();
115    }
116
117    @Override
118        public EntityDefault getEntityDefaultFromArchiveByPrincipalId(String principalId) {
119        if (StringUtils.isBlank(principalId)) {
120            throw new IllegalArgumentException("principalId is blank");
121        }
122
123        EntityDefaultInfoCacheBo cachedValue = dataObjectService.find(EntityDefaultInfoCacheBo.class, principalId);
124        return (cachedValue == null) ? null : cachedValue.convertCacheToEntityDefaultInfo();
125    }
126
127    @Override
128        public EntityDefault getEntityDefaultFromArchiveByPrincipalName(String principalName) {
129        if (StringUtils.isBlank(principalName)) {
130            throw new IllegalArgumentException("principalName is blank");
131        }
132
133        List<EntityDefaultInfoCacheBo> entities = dataObjectService.findMatching(EntityDefaultInfoCacheBo.class,
134                QueryByCriteria.Builder.forAttribute("principalName", principalName).build()).getResults();
135        return entities.isEmpty() ? null : entities.get(0).convertCacheToEntityDefaultInfo();
136    }
137
138    @Override
139    public EntityDefault getEntityDefaultFromArchiveByEmployeeId(String employeeId) {
140        if (StringUtils.isBlank(employeeId)) {
141            throw new IllegalArgumentException("employeeId is blank");
142        }
143        List<EntityDefaultInfoCacheBo> entities = dataObjectService.findMatching(EntityDefaultInfoCacheBo.class,
144                QueryByCriteria.Builder.forAttribute("employeeId", employeeId).build()).getResults();
145        return entities.isEmpty() ? null : entities.get(0).convertCacheToEntityDefaultInfo();
146    }
147
148    @Override
149        public void saveEntityDefaultToArchive(EntityDefault entity) {
150        if (entity == null) {
151            throw new IllegalArgumentException("entity is blank");
152        }
153
154        // if the max size has been reached, schedule now
155        if (getMaxWriteQueueSize() <= writeQueue.offerAndGetSize(entity) /* <- this enqueues the KEDI */ &&
156                        writer.requestSubmit()) {
157                KSBServiceLocator.getThreadPool().execute(maxQueueSizeExceededWriter);
158        }
159    }
160
161    @Override
162    public void flushToArchive() {
163        writer.call();
164    }
165
166        public void setKualiConfigurationService(
167                        ConfigurationService kualiConfigurationService) {
168                this.kualiConfigurationService = kualiConfigurationService;
169        }
170
171    public void setTransactionManager(PlatformTransactionManager txMgr) {
172        this.transactionManager = txMgr;
173    }
174
175    /** schedule the writer on the KSB scheduled pool. */
176        @Override
177        public void afterPropertiesSet() throws Exception {
178                LOG.info("scheduling writer...");
179                KSBServiceLocator.getScheduledPool().scheduleAtFixedRate(scheduledWriter,
180                                getExecutionIntervalSeconds(), getExecutionIntervalSeconds(), TimeUnit.SECONDS);
181        }
182
183        /** flush the write queue immediately. */
184        @Override
185        public void destroy() throws Exception {
186                shutdownWriter.run();
187        }
188
189        /**
190         * store the person to the database, but do this an alternate thread to
191         * prevent transaction issues since this service is non-transactional
192         *
193         * @author Kuali Rice Team (rice.collab@kuali.org)
194         *
195         */
196        protected class EntityArchiveWriter implements Callable {
197
198                // flag used to prevent multiple processes from being submitted at once
199                AtomicBoolean currentlySubmitted = new AtomicBoolean(false);
200
201                private final Comparator<Comparable> nullSafeComparator = new Comparator<Comparable>() {
202                        @Override
203                        public int compare(Comparable i1, Comparable i2) {
204                                if (i1 != null && i2 != null) {
205                                        return i1.compareTo(i2);
206                                } else if (i1 == null) {
207                                        if (i2 == null) {
208                                                return 0;
209                                        } else {
210                                                return -1;
211                                        }
212                                } else { // if (entityId2 == null) {
213                                        return 1;
214                                }
215                        };
216                };
217
218                /**
219                 * Comparator that attempts to impose a total ordering on EntityDefault instances
220                 */
221                protected final Comparator<EntityDefault> kediComparator = new Comparator<EntityDefault>() {
222                        /**
223                         * compares by entityId value
224                         * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
225                         */
226                        @Override
227                        public int compare(EntityDefault o1, EntityDefault o2) {
228                                String entityId1 = (o1 == null) ? null : o1.getEntityId();
229                                String entityId2 = (o2 == null) ? null : o2.getEntityId();
230
231                                int result = nullSafeComparator.compare(entityId1, entityId2);
232
233                                if (result == 0) {
234                                        result = getPrincipalIdsString(o1).compareTo(getPrincipalIdsString(o2));
235                                }
236
237                                return result;
238                        }
239
240                        /**
241                         * This method builds a newline delimited String containing the identity's principal IDs in sorted order
242                         *
243                         * @param entity
244                         * @return
245                         */
246                        private String getPrincipalIdsString(EntityDefault entity) {
247                                String result = "";
248                                if (entity != null) {
249                                        List<Principal> principals = entity.getPrincipals();
250                                        if (principals != null) {
251                                                if (principals.size() == 1) { // one
252                                                        result = principals.get(0).getPrincipalId();
253                                                } else { // multiple
254                                                        String [] ids = new String [principals.size()];
255                                                        int insertIndex = 0;
256                                                        for (Principal principal : principals) {
257                                                                ids[insertIndex++] = principal.getPrincipalId();
258                                                        }
259                                                        Arrays.sort(ids);
260                                                        result = StringUtils.join(ids, "\n");
261                                                }
262                                        }
263                                }
264                                return result;
265                        }
266                };
267
268                public boolean requestSubmit() {
269                        return currentlySubmitted.compareAndSet(false, true);
270                }
271
272                /**
273                 * Call that tries to flush the write queue.
274                 * @see Callable#call()
275                 */
276                @Override
277                public Object call() {
278                        try {
279                                // the strategy is to grab chunks of entities, dedupe & sort them, and insert them in a big
280                                // batch to reduce transaction overhead.  Sorting is done so insertion order is guaranteed, which
281                                // prevents deadlocks between concurrent writers to the database.
282                                TransactionTemplate template = new TransactionTemplate(transactionManager);
283                                template.execute(new TransactionCallback() {
284                                        @Override
285                                        public Object doInTransaction(TransactionStatus status) {
286                                                EntityDefault entity = null;
287                                                ArrayList<EntityDefault> entitiesToInsert = new ArrayList<EntityDefault>(getMaxWriteQueueSize());
288                                                Set<String> deduper = new HashSet<String>(getMaxWriteQueueSize());
289
290                                                // order is important in this conditional so that elements aren't dequeued and then ignored
291                                                while (entitiesToInsert.size() < getMaxWriteQueueSize() && null != (entity = writeQueue.poll())) {
292                            //Added an if condition to check whether an entity has a principal
293                            if (entity.getPrincipals().size() > 0 && deduper.add(entity.getEntityId())) {
294                                                                entitiesToInsert.add(entity);
295                                                        }
296                                                }
297
298                                                Collections.sort(entitiesToInsert, kediComparator);
299                                                for (EntityDefault entityToInsert : entitiesToInsert) {
300                                                        dataObjectService.save( new EntityDefaultInfoCacheBo( entityToInsert ) );
301                                                }
302                                                return null;
303                                        }
304                                });
305                        } finally { // make sure our running flag is unset, otherwise we'll never run again
306                                currentlySubmitted.compareAndSet(true, false);
307                        }
308
309                        return Boolean.TRUE;
310                }
311        }
312
313        /**
314         * A class encapsulating a {@link ConcurrentLinkedQueue} and an {@link AtomicInteger} to
315         * provide fast offer(enqueue)/poll(dequeue) and size checking.  Size may be approximate due to concurrent
316         * activity, but for our purposes that is fine.
317         *
318         * @author Kuali Rice Team (rice.collab@kuali.org)
319         *
320         */
321        protected static class WriteQueue {
322                AtomicInteger writeQueueSize = new AtomicInteger(0);
323                ConcurrentLinkedQueue<EntityDefault> queue = new ConcurrentLinkedQueue<EntityDefault>();
324
325                public int offerAndGetSize(EntityDefault entity) {
326                        queue.add(entity);
327                        return writeQueueSize.incrementAndGet();
328                }
329
330                protected EntityDefault poll() {
331                        EntityDefault result = queue.poll();
332                        if (result != null) { writeQueueSize.decrementAndGet(); }
333                        return result;
334                }
335        }
336
337        /**
338         * decorator for a callable to log a message before it is executed
339         *
340         * @author Kuali Rice Team (rice.collab@kuali.org)
341         *
342         */
343        protected static class PreLogCallableWrapper<A> implements Callable<A> {
344
345            protected final Callable inner;
346            protected final Level level;
347            protected final String message;
348
349                public PreLogCallableWrapper(Callable inner, Level level, String message) {
350                        this.inner = inner;
351                        this.level = level;
352                        this.message = message;
353                }
354
355                /**
356                 * logs the message then calls the inner Callable
357                 *
358                 * @see java.util.concurrent.Callable#call()
359                 */
360                @Override
361                @SuppressWarnings("unchecked")
362                public A call() throws Exception {
363                        LOG.log(level, message);
364                        return (A)inner.call();
365                }
366        }
367
368        /**
369         * Adapts a Callable to be Runnable
370         *
371         * @author Kuali Rice Team (rice.collab@kuali.org)
372         *
373         */
374        protected static class CallableAdapter implements Runnable {
375
376                private final Callable callable;
377
378                public CallableAdapter(Callable callable) {
379                        this.callable = callable;
380                }
381
382                @Override
383                public void run() {
384                        try {
385                                callable.call();
386                        } catch (Exception e) {
387                                throw new RuntimeException(e);
388                        }
389                }
390        }
391
392    /**
393     * @param dataObjectService the dataObjectService to set
394     */
395    public void setDataObjectService(DataObjectService dataObjectService) {
396        this.dataObjectService = dataObjectService;
397    }
398}