View Javadoc

1   package org.kuali.ole.docstore.transaction;
2   
3   import java.util.ArrayList;
4   import java.util.List;
5   import javax.jcr.Node;
6   import javax.jcr.Session;
7   
8   import org.apache.commons.lang.time.StopWatch;
9   import org.kuali.ole.RepositoryManager;
10  import org.kuali.ole.docstore.OleDocStoreException;
11  import org.kuali.ole.docstore.discovery.service.IndexerService;
12  import org.kuali.ole.docstore.document.DocumentManager;
13  import org.kuali.ole.docstore.model.xmlpojo.ingest.RequestDocument;
14  import org.kuali.ole.docstore.model.xmlpojo.ingest.ResponseDocument;
15  import org.kuali.ole.docstore.process.batch.BulkProcessRequest;
16  import org.kuali.ole.docstore.repository.CustomNodeManager;
17  import org.kuali.ole.docstore.service.BeanLocator;
18  import org.kuali.ole.docstore.service.ServiceLocator;
19  import org.kuali.ole.docstore.utility.BatchIngestStatistics;
20  import org.kuali.ole.docstore.utility.BulkIngestStatistics;
21  import org.kuali.ole.repository.NodeHandler;
22  import org.slf4j.Logger;
23  import org.slf4j.LoggerFactory;
24  
25  /**
26   * Encapsulates reusable common logic to support ACID transactions in DocStore.
27   * User: tirumalesh.b
28   * Date: 26/9/12 Time: 10:40 AM
29   */
30  public class TransactionManager {
31      private static final Logger logger = LoggerFactory.getLogger(NodeHandler.class);
32      private IndexerService indexerService;
33      private Session        session;
34      private TransactionState      transactionState         = TransactionState.IDLE;
35      private List<RequestDocument> newStateRequestDocuments = null;
36      private List<RequestDocument> oldStateRequestDocuments = new ArrayList<RequestDocument>();
37  
38      public enum TransactionState {
39          IDLE, STARTED, COMMITTED, ABORTED, FAILED
40      }
41  
42      public void startTransaction(String user, String operation) throws OleDocStoreException {
43          if (transactionState == TransactionState.STARTED) {
44              throw new OleDocStoreException("Transaction already started.");
45          }
46          try {
47              if (null == session) {
48                  session = RepositoryManager.getRepositoryManager().getSession(user, operation);
49              }
50              if (null == indexerService) {
51                  indexerService = ServiceLocator.getIndexerService();
52              }
53          }
54          catch (Exception e) {
55              throw new OleDocStoreException(e);
56          }
57          transactionState = TransactionState.STARTED;
58      }
59  
60      public void commit() throws OleDocStoreException {
61          commit(null);
62      }
63  
64      public void commit(BatchIngestStatistics batchIngestStatistics) throws OleDocStoreException {
65          logger.info("Commit: Saving changes to index...");
66          StopWatch solrCommitTimer = new StopWatch();
67          try {
68              if (null != batchIngestStatistics) {
69                  solrCommitTimer = new StopWatch();
70                  solrCommitTimer.start();
71              }
72              indexerService.commit();
73              if (null != batchIngestStatistics) {
74                  solrCommitTimer.stop();
75                  batchIngestStatistics.setTimeToSolrCommit(solrCommitTimer.getTime());
76              }
77          }
78          catch (Exception e) {
79              transactionState = TransactionState.FAILED;
80              logger.error("Exception during commit: Unable to save changes to index. :", e);
81              try {
82                  session.refresh(false);
83              }
84              catch (Exception re) {
85                  // Ignore this, as data would not be saved anyway.
86                  logger.error("Exception during commit. Unable to rollback changes to docstore. :", re);
87              }
88              throw new OleDocStoreException("Commit failed.", e);
89          }
90  
91          logger.info("Commit: Saving changes to docstore...");
92          StopWatch sessionSaveTimer = null;
93          new StopWatch();
94          try {
95              if (null != batchIngestStatistics) {
96                  sessionSaveTimer = new StopWatch();
97                  sessionSaveTimer.start();
98              }
99              session.save();
100             if (null != batchIngestStatistics) {
101                 sessionSaveTimer.stop();
102                 batchIngestStatistics.setTimeToSaveJcrSession(sessionSaveTimer.getTime());
103             }
104         }
105         catch (Exception e) {
106             transactionState = TransactionState.FAILED;
107             logger.error("Exception during commit. Unable to save changes to docstore. :", e);
108             logger.info("Commit: Reverting changes to index...");
109             // TODO: Implement now. Delete data saved by indexerService.
110             try {
111                 rollBackDataInIndexer(oldStateRequestDocuments);
112             }
113             catch (Exception ex) {
114                 logger.error("error while performing roll back in Indexer");
115             }
116             throw new OleDocStoreException("Commit failed.", e);
117         }
118         // We are all done!!!
119         transactionState = TransactionState.COMMITTED;
120         newStateRequestDocuments = null;
121         oldStateRequestDocuments = null;
122     }
123 
124     private void rollBackDataInIndexer(List<RequestDocument> oldStateRequestDocuments) throws OleDocStoreException {
125         if (oldStateRequestDocuments != null && oldStateRequestDocuments.size() > 0) {
126             String result = indexerService.indexDocuments(oldStateRequestDocuments, false);
127             if (!result.startsWith("success")) {
128                 throw new OleDocStoreException(result);
129             }
130         }
131     }
132 
133     public void abort() {
134         try {
135             session.refresh(false);
136         }
137         catch (Exception re) {
138             // Ignore this, as we want to rollback anyway.
139             logger.error("Exception during abort. Unable to rollback changes to docstore. :", re);
140 
141         }
142         try {
143             indexerService.rollback();
144         }
145         catch (Exception e) {
146             // Ignore this, as we want to rollback anyway.
147             logger.error("Exception during abort. Unable to rollback changes to index. :", e);
148         }
149         transactionState = TransactionState.ABORTED;
150         newStateRequestDocuments = null;
151         oldStateRequestDocuments = null;
152     }
153 
154     public void close() {
155         session.logout();
156         session = null;
157         transactionState = TransactionState.IDLE;
158         newStateRequestDocuments = null;
159         oldStateRequestDocuments = null;
160         setIndexerService(null);
161     }
162 
163     public List<ResponseDocument> ingest(List<RequestDocument> requestDocuments) throws OleDocStoreException {
164         DocumentManager documentManager = null;
165         // Save the request documents so that they can be rolled back if necessary.
166         newStateRequestDocuments = requestDocuments;
167         List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
168         for (RequestDocument requestDocument : requestDocuments) {
169             // Each document could be of different cat-type-format.
170             documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument);
171             // Store
172             responseDocuments.add(documentManager.ingest(requestDocument, session));
173         }
174         // Index
175         String result = indexerService.indexDocuments(requestDocuments, false);
176         if (!result.startsWith("success")) {
177             throw new OleDocStoreException(result);
178         }
179 
180         return responseDocuments;
181     }
182 
183     public ResponseDocument ingest(RequestDocument requestDocument) throws OleDocStoreException {
184         DocumentManager documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument);
185         // Save the request documents so that they can be rolled back if necessary.
186         newStateRequestDocuments = new ArrayList<RequestDocument>(1);
187         newStateRequestDocuments.add(requestDocument);
188 
189         ResponseDocument responseDocument = documentManager.ingest(requestDocument, session);
190         String result = indexerService.indexDocument(requestDocument, false);
191         if (!result.startsWith("success")) {
192             throw new OleDocStoreException(result);
193         }
194         return responseDocument;
195     }
196 
197 
198     public List<ResponseDocument> checkIn(List<RequestDocument> requestDocuments, String operation)
199             throws OleDocStoreException {
200         DocumentManager documentManager = null;
201         // get the previous content from docStore to rollback the records if saving the session fails.
202 
203         List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
204         for (RequestDocument requestDocument : requestDocuments) {
205             if (requestDocument.getId() != null || requestDocument.getId().trim().length() > 0) {
206                 requestDocument.setUuid(requestDocument.getId());
207             }
208 
209             /*   try {
210 
211                             Node nodeByUUID = CustomNodeManager.getInstance().getNodeByUUID(session, requestDocument.getId());
212                             RequestDocument prevRequestDoc = new RequestDocument();
213                             prevRequestDoc = requestDocument;
214                             String prevContent = nodeByUUID.getNode("jcr:content").getProperty("jcr:data").getValue().getString();
215                             prevRequestDoc.getContent().setContent(prevContent);
216                             oldStateRequestDocuments.add(prevRequestDoc);
217                         }
218                         catch (Exception ex) {
219                             String failMsg = "Check in failed." + "Unable to get the record from docstore.  Record UUID "
220                                              + requestDocument.getId();
221                             logger.info(failMsg, ex);
222                             throw new OleDocStoreException(failMsg, ex);
223                         }
224             */
225             requestDocument.setOperation(operation);
226             // Each document could be of different cat-type-format.
227             documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument);
228 
229             // update Docstore
230             responseDocuments.add(documentManager.checkin(requestDocument, session));
231         }
232         // Index to solr
233         String result = indexerService.indexDocuments(requestDocuments, false);
234         if (!result.startsWith("success")) {
235             throw new OleDocStoreException(result);
236         }
237 
238         return responseDocuments;
239     }
240 
241 
242     public List<ResponseDocument> checkOut(List<RequestDocument> requestDocuments, String user)
243             throws OleDocStoreException {
244         DocumentManager documentManager = null;
245         // Save the request documents so that they can be rolled back if necessary.
246         newStateRequestDocuments = requestDocuments;
247         List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
248         for (RequestDocument requestDocument : requestDocuments) {
249             // Each document could be of different cat-type-format.
250             documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument);
251             // To differentiate between restfull API and Normal CheckOut.
252             requestDocument.setUser(user);
253             // checkOut
254             responseDocuments.add(documentManager.checkout(requestDocument, session));
255         }
256         return responseDocuments;
257     }
258 
259 
260     public ResponseDocument checkIn(RequestDocument requestDocument) throws OleDocStoreException {
261         DocumentManager documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument);
262         // Save the request documents so that they can be rolled back if necessary.
263         newStateRequestDocuments = new ArrayList<RequestDocument>(1);
264         newStateRequestDocuments.add(requestDocument);
265         // update Docstore
266         ResponseDocument responseDocument = documentManager.checkin(requestDocument, session);
267         //Index to solr
268         String result = indexerService.indexDocument(requestDocument, false);
269         if (!result.startsWith("success")) {
270             throw new OleDocStoreException(result);
271         }
272         return responseDocument;
273     }
274 
275     public void batchIngest(BulkProcessRequest bulkProcessRequest, List<RequestDocument> requestDocuments)
276             throws OleDocStoreException {
277         BulkIngestStatistics bulkIngestStatistics = bulkProcessRequest.getBulkIngestStatistics();
278         BatchIngestStatistics batchStatistics = bulkIngestStatistics.getCurrentBatch();
279         newStateRequestDocuments = new ArrayList<RequestDocument>();
280         // Save the request documents so that they can be rolled back if necessary.
281         newStateRequestDocuments.addAll(requestDocuments);
282         DocumentManager documentManager = BeanLocator.getDocumentManagerFactory()
283                                                      .getDocumentManager(requestDocuments.get(0));
284 
285         // Store
286         StopWatch createNodesTimer = new StopWatch();
287         createNodesTimer.start();
288         documentManager.ingest(requestDocuments, session);
289         createNodesTimer.stop();
290         batchStatistics.setTimeToCreateNodesInJcr(createNodesTimer.getTime());
291 
292         StopWatch indexSolrDocsTime = new StopWatch();
293         indexSolrDocsTime.start();
294 
295         // Index
296         String result = indexerService.indexDocuments(requestDocuments, false);
297         indexSolrDocsTime.stop();
298         batchStatistics.setTimeToIndexSolrInputDocs(indexSolrDocsTime.getTime());
299 
300         if (!result.startsWith("success")) {
301             throw new OleDocStoreException(result);
302         }
303     }
304 
305     public List<ResponseDocument> delete(List<RequestDocument> requestDocuments) throws OleDocStoreException {
306         DocumentManager documentManager = null;
307         List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
308         for (RequestDocument requestDocument : requestDocuments) {
309             documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument);
310             responseDocuments.add(documentManager.delete(requestDocument, session));
311         }
312         return responseDocuments;
313     }
314 
315     public IndexerService getIndexerService() {
316         return indexerService;
317     }
318 
319     public void setIndexerService(IndexerService indexerService) {
320         this.indexerService = indexerService;
321     }
322 }