001    package org.kuali.ole.docstore.transaction;
002    
003    import java.util.ArrayList;
004    import java.util.List;
005    import javax.jcr.Node;
006    import javax.jcr.Session;
007    
008    import org.apache.commons.lang.time.StopWatch;
009    import org.kuali.ole.RepositoryManager;
010    import org.kuali.ole.docstore.OleDocStoreException;
011    import org.kuali.ole.docstore.discovery.service.IndexerService;
012    import org.kuali.ole.docstore.document.DocumentManager;
013    import org.kuali.ole.docstore.model.xmlpojo.ingest.RequestDocument;
014    import org.kuali.ole.docstore.model.xmlpojo.ingest.ResponseDocument;
015    import org.kuali.ole.docstore.process.batch.BulkProcessRequest;
016    import org.kuali.ole.docstore.repository.CustomNodeManager;
017    import org.kuali.ole.docstore.service.BeanLocator;
018    import org.kuali.ole.docstore.service.ServiceLocator;
019    import org.kuali.ole.docstore.utility.BatchIngestStatistics;
020    import org.kuali.ole.docstore.utility.BulkIngestStatistics;
021    import org.kuali.ole.repository.NodeHandler;
022    import org.slf4j.Logger;
023    import org.slf4j.LoggerFactory;
024    
025    /**
026     * Encapsulates reusable common logic to support ACID transactions in DocStore.
027     * User: tirumalesh.b
028     * Date: 26/9/12 Time: 10:40 AM
029     */
030    public class TransactionManager {
031        private static final Logger logger = LoggerFactory.getLogger(NodeHandler.class);
032        private IndexerService indexerService;
033        private Session        session;
034        private TransactionState      transactionState         = TransactionState.IDLE;
035        private List<RequestDocument> newStateRequestDocuments = null;
036        private List<RequestDocument> oldStateRequestDocuments = new ArrayList<RequestDocument>();
037    
038        public enum TransactionState {
039            IDLE, STARTED, COMMITTED, ABORTED, FAILED
040        }
041    
042        public void startTransaction(String user, String operation) throws OleDocStoreException {
043            if (transactionState == TransactionState.STARTED) {
044                throw new OleDocStoreException("Transaction already started.");
045            }
046            try {
047                if (null == session) {
048                    session = RepositoryManager.getRepositoryManager().getSession(user, operation);
049                }
050                if (null == indexerService) {
051                    indexerService = ServiceLocator.getIndexerService();
052                }
053            }
054            catch (Exception e) {
055                throw new OleDocStoreException(e);
056            }
057            transactionState = TransactionState.STARTED;
058        }
059    
060        public void commit() throws OleDocStoreException {
061            commit(null);
062        }
063    
064        public void commit(BatchIngestStatistics batchIngestStatistics) throws OleDocStoreException {
065            logger.info("Commit: Saving changes to index...");
066            StopWatch solrCommitTimer = new StopWatch();
067            try {
068                if (null != batchIngestStatistics) {
069                    solrCommitTimer = new StopWatch();
070                    solrCommitTimer.start();
071                }
072                indexerService.commit();
073                if (null != batchIngestStatistics) {
074                    solrCommitTimer.stop();
075                    batchIngestStatistics.setTimeToSolrCommit(solrCommitTimer.getTime());
076                }
077            }
078            catch (Exception e) {
079                transactionState = TransactionState.FAILED;
080                logger.error("Exception during commit: Unable to save changes to index. :", e);
081                try {
082                    session.refresh(false);
083                }
084                catch (Exception re) {
085                    // Ignore this, as data would not be saved anyway.
086                    logger.error("Exception during commit. Unable to rollback changes to docstore. :", re);
087                }
088                throw new OleDocStoreException("Commit failed.", e);
089            }
090    
091            logger.info("Commit: Saving changes to docstore...");
092            StopWatch sessionSaveTimer = null;
093            new StopWatch();
094            try {
095                if (null != batchIngestStatistics) {
096                    sessionSaveTimer = new StopWatch();
097                    sessionSaveTimer.start();
098                }
099                session.save();
100                if (null != batchIngestStatistics) {
101                    sessionSaveTimer.stop();
102                    batchIngestStatistics.setTimeToSaveJcrSession(sessionSaveTimer.getTime());
103                }
104            }
105            catch (Exception e) {
106                transactionState = TransactionState.FAILED;
107                logger.error("Exception during commit. Unable to save changes to docstore. :", e);
108                logger.info("Commit: Reverting changes to index...");
109                // TODO: Implement now. Delete data saved by indexerService.
110                try {
111                    rollBackDataInIndexer(oldStateRequestDocuments);
112                }
113                catch (Exception ex) {
114                    logger.error("error while performing roll back in Indexer");
115                }
116                throw new OleDocStoreException("Commit failed.", e);
117            }
118            // We are all done!!!
119            transactionState = TransactionState.COMMITTED;
120            newStateRequestDocuments = null;
121            oldStateRequestDocuments = null;
122        }
123    
124        private void rollBackDataInIndexer(List<RequestDocument> oldStateRequestDocuments) throws OleDocStoreException {
125            if (oldStateRequestDocuments != null && oldStateRequestDocuments.size() > 0) {
126                String result = indexerService.indexDocuments(oldStateRequestDocuments, false);
127                if (!result.startsWith("success")) {
128                    throw new OleDocStoreException(result);
129                }
130            }
131        }
132    
133        public void abort() {
134            try {
135                session.refresh(false);
136            }
137            catch (Exception re) {
138                // Ignore this, as we want to rollback anyway.
139                logger.error("Exception during abort. Unable to rollback changes to docstore. :", re);
140    
141            }
142            try {
143                indexerService.rollback();
144            }
145            catch (Exception e) {
146                // Ignore this, as we want to rollback anyway.
147                logger.error("Exception during abort. Unable to rollback changes to index. :", e);
148            }
149            transactionState = TransactionState.ABORTED;
150            newStateRequestDocuments = null;
151            oldStateRequestDocuments = null;
152        }
153    
154        public void close() {
155            session.logout();
156            session = null;
157            transactionState = TransactionState.IDLE;
158            newStateRequestDocuments = null;
159            oldStateRequestDocuments = null;
160            setIndexerService(null);
161        }
162    
163        public List<ResponseDocument> ingest(List<RequestDocument> requestDocuments) throws OleDocStoreException {
164            DocumentManager documentManager = null;
165            // Save the request documents so that they can be rolled back if necessary.
166            newStateRequestDocuments = requestDocuments;
167            List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
168            for (RequestDocument requestDocument : requestDocuments) {
169                // Each document could be of different cat-type-format.
170                documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument);
171                // Store
172                responseDocuments.add(documentManager.ingest(requestDocument, session));
173            }
174            // Index
175            String result = indexerService.indexDocuments(requestDocuments, false);
176            if (!result.startsWith("success")) {
177                throw new OleDocStoreException(result);
178            }
179    
180            return responseDocuments;
181        }
182    
183        public ResponseDocument ingest(RequestDocument requestDocument) throws OleDocStoreException {
184            DocumentManager documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument);
185            // Save the request documents so that they can be rolled back if necessary.
186            newStateRequestDocuments = new ArrayList<RequestDocument>(1);
187            newStateRequestDocuments.add(requestDocument);
188    
189            ResponseDocument responseDocument = documentManager.ingest(requestDocument, session);
190            String result = indexerService.indexDocument(requestDocument, false);
191            if (!result.startsWith("success")) {
192                throw new OleDocStoreException(result);
193            }
194            return responseDocument;
195        }
196    
197    
198        public List<ResponseDocument> checkIn(List<RequestDocument> requestDocuments, String operation)
199                throws OleDocStoreException {
200            DocumentManager documentManager = null;
201            // get the previous content from docStore to rollback the records if saving the session fails.
202    
203            List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
204            for (RequestDocument requestDocument : requestDocuments) {
205                if (requestDocument.getId() != null || requestDocument.getId().trim().length() > 0) {
206                    requestDocument.setUuid(requestDocument.getId());
207                }
208    
209                /*   try {
210    
211                                Node nodeByUUID = CustomNodeManager.getInstance().getNodeByUUID(session, requestDocument.getId());
212                                RequestDocument prevRequestDoc = new RequestDocument();
213                                prevRequestDoc = requestDocument;
214                                String prevContent = nodeByUUID.getNode("jcr:content").getProperty("jcr:data").getValue().getString();
215                                prevRequestDoc.getContent().setContent(prevContent);
216                                oldStateRequestDocuments.add(prevRequestDoc);
217                            }
218                            catch (Exception ex) {
219                                String failMsg = "Check in failed." + "Unable to get the record from docstore.  Record UUID "
220                                                 + requestDocument.getId();
221                                logger.info(failMsg, ex);
222                                throw new OleDocStoreException(failMsg, ex);
223                            }
224                */
225                requestDocument.setOperation(operation);
226                // Each document could be of different cat-type-format.
227                documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument);
228    
229                // update Docstore
230                responseDocuments.add(documentManager.checkin(requestDocument, session));
231            }
232            // Index to solr
233            String result = indexerService.indexDocuments(requestDocuments, false);
234            if (!result.startsWith("success")) {
235                throw new OleDocStoreException(result);
236            }
237    
238            return responseDocuments;
239        }
240    
241    
242        public List<ResponseDocument> checkOut(List<RequestDocument> requestDocuments, String user)
243                throws OleDocStoreException {
244            DocumentManager documentManager = null;
245            // Save the request documents so that they can be rolled back if necessary.
246            newStateRequestDocuments = requestDocuments;
247            List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
248            for (RequestDocument requestDocument : requestDocuments) {
249                // Each document could be of different cat-type-format.
250                documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument);
251                // To differentiate between restfull API and Normal CheckOut.
252                requestDocument.setUser(user);
253                // checkOut
254                responseDocuments.add(documentManager.checkout(requestDocument, session));
255            }
256            return responseDocuments;
257        }
258    
259    
260        public ResponseDocument checkIn(RequestDocument requestDocument) throws OleDocStoreException {
261            DocumentManager documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument);
262            // Save the request documents so that they can be rolled back if necessary.
263            newStateRequestDocuments = new ArrayList<RequestDocument>(1);
264            newStateRequestDocuments.add(requestDocument);
265            // update Docstore
266            ResponseDocument responseDocument = documentManager.checkin(requestDocument, session);
267            //Index to solr
268            String result = indexerService.indexDocument(requestDocument, false);
269            if (!result.startsWith("success")) {
270                throw new OleDocStoreException(result);
271            }
272            return responseDocument;
273        }
274    
275        public void batchIngest(BulkProcessRequest bulkProcessRequest, List<RequestDocument> requestDocuments)
276                throws OleDocStoreException {
277            BulkIngestStatistics bulkIngestStatistics = bulkProcessRequest.getBulkIngestStatistics();
278            BatchIngestStatistics batchStatistics = bulkIngestStatistics.getCurrentBatch();
279            newStateRequestDocuments = new ArrayList<RequestDocument>();
280            // Save the request documents so that they can be rolled back if necessary.
281            newStateRequestDocuments.addAll(requestDocuments);
282            DocumentManager documentManager = BeanLocator.getDocumentManagerFactory()
283                                                         .getDocumentManager(requestDocuments.get(0));
284    
285            // Store
286            StopWatch createNodesTimer = new StopWatch();
287            createNodesTimer.start();
288            documentManager.ingest(requestDocuments, session);
289            createNodesTimer.stop();
290            batchStatistics.setTimeToCreateNodesInJcr(createNodesTimer.getTime());
291    
292            StopWatch indexSolrDocsTime = new StopWatch();
293            indexSolrDocsTime.start();
294    
295            // Index
296            String result = indexerService.indexDocuments(requestDocuments, false);
297            indexSolrDocsTime.stop();
298            batchStatistics.setTimeToIndexSolrInputDocs(indexSolrDocsTime.getTime());
299    
300            if (!result.startsWith("success")) {
301                throw new OleDocStoreException(result);
302            }
303        }
304    
305        public List<ResponseDocument> delete(List<RequestDocument> requestDocuments) throws OleDocStoreException {
306            DocumentManager documentManager = null;
307            List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
308            for (RequestDocument requestDocument : requestDocuments) {
309                documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument);
310                responseDocuments.add(documentManager.delete(requestDocument, session));
311            }
312            return responseDocuments;
313        }
314    
315        public IndexerService getIndexerService() {
316            return indexerService;
317        }
318    
319        public void setIndexerService(IndexerService indexerService) {
320            this.indexerService = indexerService;
321        }
322    }