View Javadoc

1   package org.kuali.ole.docstore.transaction;
2   
3   import org.apache.commons.lang.time.StopWatch;
4   import org.kuali.ole.RepositoryManager;
5   import org.kuali.ole.docstore.OleDocStoreException;
6   import org.kuali.ole.docstore.discovery.service.IndexerService;
7   import org.kuali.ole.docstore.document.DocumentManager;
8   import org.kuali.ole.docstore.document.WorkInstanceDocumentManager;
9   import org.kuali.ole.docstore.model.xmlpojo.ingest.RequestDocument;
10  import org.kuali.ole.docstore.model.xmlpojo.ingest.ResponseDocument;
11  import org.kuali.ole.docstore.model.xmlpojo.ingest.ResponseStatus;
12  import org.kuali.ole.docstore.process.ProcessParameters;
13  import org.kuali.ole.docstore.process.batch.BulkProcessRequest;
14  import org.kuali.ole.docstore.service.BeanLocator;
15  import org.kuali.ole.docstore.service.ServiceLocator;
16  import org.kuali.ole.docstore.utility.BatchIngestStatistics;
17  import org.kuali.ole.docstore.utility.BulkIngestStatistics;
18  import org.kuali.ole.pojo.OleException;
19  import org.kuali.ole.repository.NodeHandler;
20  import org.slf4j.Logger;
21  import org.slf4j.LoggerFactory;
22  
23  import javax.jcr.RepositoryException;
24  import javax.jcr.Session;
25  import java.io.FileNotFoundException;
26  import java.util.ArrayList;
27  import java.util.List;
28  
29  /**
30   * Encapsulates reusable common logic to support ACID transactions in DocStore.
31   * User: tirumalesh.b
32   * Date: 26/9/12 Time: 10:40 AM
33   */
34  public class TransactionManager {
35      private static final Logger logger = LoggerFactory.getLogger(NodeHandler.class);
36      private IndexerService indexerService;
37      private Session        session;
38      private TransactionState      transactionState         = TransactionState.IDLE;
39      private List<RequestDocument> newStateRequestDocuments = null;
40      private List<RequestDocument> oldStateRequestDocuments = new ArrayList<RequestDocument>();
41  
42      public enum TransactionState {
43          IDLE, STARTED, COMMITTED, ABORTED, FAILED
44      }
45  
46      public void startSession(String user, String operation) throws OleDocStoreException {
47          try {
48              if (null == session) {
49                  session = RepositoryManager.getRepositoryManager().getSession(user, operation);
50              }
51          }
52          catch (Exception e) {
53              throw new OleDocStoreException(e);
54          }
55      }
56  
57      public void closeSession() {
58          if (null != session) {
59              try {
60                  RepositoryManager.getRepositoryManager().logout(session);
61              }
62              catch (OleException e) {
63                  logger.error(" Error while closing session :" + e);
64              }
65          }
66      }
67  
68      public void startTransaction(String user, String operation) throws OleDocStoreException {
69          if (transactionState == TransactionState.STARTED) {
70              throw new OleDocStoreException("Transaction already started.");
71          }
72          try {
73              if (null == session) {
74                  session = RepositoryManager.getRepositoryManager().getSession(user, operation);
75              }
76              if (null == indexerService) {
77                  indexerService = ServiceLocator.getIndexerService();
78              }
79          }
80          catch (Exception e) {
81              throw new OleDocStoreException(e);
82          }
83          transactionState = TransactionState.STARTED;
84      }
85  
86      public void commit() throws OleDocStoreException {
87          commit(null);
88      }
89  
90      public void commit(BatchIngestStatistics batchIngestStatistics) throws OleDocStoreException {
91          logger.info("Commit: Saving changes to index...");
92          StopWatch solrCommitTimer = new StopWatch();
93          try {
94              if (null != batchIngestStatistics) {
95                  solrCommitTimer = new StopWatch();
96                  solrCommitTimer.start();
97              }
98              indexerService.commit();
99              if (null != batchIngestStatistics) {
100                 solrCommitTimer.stop();
101                 batchIngestStatistics.setTimeToSolrCommit(solrCommitTimer.getTime());
102             }
103         }
104         catch (Exception e) {
105             transactionState = TransactionState.FAILED;
106             logger.error("Exception during commit: Unable to save changes to index. :", e);
107             try {
108                 session.refresh(false);
109             }
110             catch (Exception re) {
111                 // Ignore this, as data would not be saved anyway.
112                 logger.error("Exception during commit. Unable to rollback changes to docstore. :", re);
113             }
114             throw new OleDocStoreException("Commit failed.", e);
115         }
116 
117         logger.info("Commit: Saving changes to docstore...");
118         StopWatch sessionSaveTimer = null;
119         try {
120             if (null != batchIngestStatistics) {
121                 sessionSaveTimer = new StopWatch();
122                 sessionSaveTimer.start();
123             }
124             session.save();
125             if (null != batchIngestStatistics) {
126                 sessionSaveTimer.stop();
127                 batchIngestStatistics.setTimeToSaveJcrSession(sessionSaveTimer.getTime());
128             }
129         }
130         catch (Exception e) {
131             transactionState = TransactionState.FAILED;
132             logger.error("Exception during commit. Unable to save changes to docstore. :", e);
133             logger.info("Commit: Reverting changes to index...");
134             // TODO: Implement now. Delete data saved by indexerService.
135             try {
136                 rollBackDataInIndexer(oldStateRequestDocuments);
137 //                rollBackDataInIndexer(oldStateRequestDocuments,newStateRequestDocuments);
138             }
139             catch (Exception ex) {
140                 logger.error("error while performing roll back in Indexer");
141             }
142             throw new OleDocStoreException("Commit failed.", e);
143         }
144         //Session logout is handled in DocumentServiceImpl.java
145         // session.logout();
146         // We are all done!!!
147         transactionState = TransactionState.COMMITTED;
148         newStateRequestDocuments = null;
149         oldStateRequestDocuments = null;
150     }
151 
152     private void rollBackDataInIndexer(List<RequestDocument> oldStateRequestDocuments) throws OleDocStoreException {
153         if (oldStateRequestDocuments != null && oldStateRequestDocuments.size() > 0) {
154             String result = indexerService.indexDocuments(oldStateRequestDocuments, false);
155             if (!result.startsWith("success")) {
156                 throw new OleDocStoreException(result);
157             }
158         }
159     }
160 
161 
162    /* private void rollBackDataInIndexer(List<RequestDocument> newStateRequestDocuments,
163                                        List<RequestDocument> oldStateRequestDocuments) throws OleDocStoreException {
164         if (oldStateRequestDocuments != null && oldStateRequestDocuments.size() > 0) {
165             String result = indexerService.indexDocuments(oldStateRequestDocuments, false);
166             if (!result.startsWith("success")) {
167                 throw new OleDocStoreException(result);
168             }
169         }
170         else if (newStateRequestDocuments != null && newStateRequestDocuments.size() > 0) {
171             try {
172                 Map<String, List<String>> uuids = new HashMap<String, List<String>>();
173                 for (RequestDocument document : newStateRequestDocuments) {
174                     for (RequestDocument linkedDoc : document.getLinkedRequestDocuments()) {
175                         if (uuids.get(linkedDoc.getCategory()) == null) {
176                             uuids.put(linkedDoc.getCategory(), new ArrayList<String>());
177                         }
178                         uuids.get(linkedDoc.getCategory()).add(linkedDoc.getUuid());
179                     }
180                     if (uuids.get(document.getCategory()) == null) {
181                         uuids.put(document.getCategory(), new ArrayList<String>());
182                     }
183                     uuids.get(document.getCategory()).add(document.getUuid());
184                 }
185                 for (String category : uuids.keySet()) {
186                     indexerService.deleteDocuments(category, uuids.get(category));
187                 }
188             }
189             catch (Exception e) {
190                 logger.info(e.getMessage(),e);
191             }
192 
193         }
194     }*/
195 
196     public void abort() {
197         try {
198             session.refresh(false);
199         }
200         catch (Exception re) {
201             // Ignore this, as we want to rollback anyway.
202             logger.error("Exception during abort. Unable to rollback changes to docstore. :", re);
203 
204         }
205         try {
206             indexerService.rollback();
207         }
208         catch (Exception e) {
209             // Ignore this, as we want to rollback anyway.
210             logger.error("Exception during abort. Unable to rollback changes to index. :", e);
211         }
212         transactionState = TransactionState.ABORTED;
213         newStateRequestDocuments = null;
214         oldStateRequestDocuments = null;
215     }
216 
217     public void close() {
218         session.logout();
219         session = null;
220         transactionState = TransactionState.IDLE;
221         newStateRequestDocuments = null;
222         oldStateRequestDocuments = null;
223         setIndexerService(null);
224     }
225 
226     public List<ResponseDocument> ingest(List<RequestDocument> requestDocuments) throws OleDocStoreException {
227         DocumentManager documentManager = null;
228         // Save the request documents so that they can be rolled back if necessary.
229         newStateRequestDocuments = requestDocuments;
230         List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
231         List<String> valuesList = new ArrayList<String>();
232         for (RequestDocument requestDocument : requestDocuments) {
233             // Each document could be of different cat-type-format.
234             documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument);
235             ResponseDocument respDoc = new ResponseDocument();
236             // validate given input
237 
238             try {
239                 documentManager.validateInput(requestDocument, session, valuesList);
240             } catch (OleDocStoreException e) {
241                 respDoc.setStatus(ResponseStatus.INVALID_DATA.toString());
242                 logger.debug("validationMessage-->" + e.getMessage(),e);
243                 throw new OleDocStoreException(e.getMessage(),e);
244             }
245 
246             // Store
247             documentManager.ingest(requestDocument, session, respDoc);
248             responseDocuments.add(respDoc);
249         }
250         // Index
251         String result = indexerService.indexDocuments(requestDocuments, false);
252         if (!result.startsWith("success")) {
253             throw new OleDocStoreException(result);
254         }
255 
256         return responseDocuments;
257     }
258 
259     public ResponseDocument ingest(RequestDocument requestDocument) throws OleDocStoreException {
260         DocumentManager documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument);
261         // Save the request documents so that they can be rolled back if necessary.
262         newStateRequestDocuments = new ArrayList<RequestDocument>(1);
263         newStateRequestDocuments.add(requestDocument);
264         List<String> valuesList = new ArrayList<String>();
265         // validate given input
266         ResponseDocument respDoc = new ResponseDocument();
267         try {
268             documentManager.validateInput(requestDocument, session, valuesList);
269         } catch (OleDocStoreException e) {
270             respDoc.setStatus(ResponseStatus.INVALID_DATA.toString());
271             logger.debug("validationMessage-->" + e.getMessage());
272             throw new OleDocStoreException(e.getMessage());
273         }
274         documentManager.ingest(requestDocument, session, respDoc);
275         String result = indexerService.indexDocument(requestDocument, false);
276         if (!result.startsWith("success")) {
277             throw new OleDocStoreException(result);
278         }
279         return respDoc;
280     }
281 
282 
283     public List<ResponseDocument> checkIn(List<RequestDocument> requestDocuments)
284             throws OleDocStoreException {
285         DocumentManager documentManager = null;
286         // get the previous content from docStore to rollback the records if saving the session fails.
287 
288         List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
289         List<String> fieldValueList = new ArrayList<String>();
290         for (RequestDocument requestDocument : requestDocuments) {
291             // Each document could be of different cat-type-format.
292             documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument);
293             if (requestDocument.getId() != null || requestDocument.getId().trim().length() > 0) {
294                 requestDocument.setUuid(requestDocument.getId());
295             }
296             //requestDocument.setOperation(operation);
297             //requestDocument.setUser(user);
298             // validate given input
299             ResponseDocument respDoc = new ResponseDocument();
300 
301             try {
302                 documentManager.validateInput(requestDocument, session, fieldValueList);
303             } catch (OleDocStoreException e) {
304                 respDoc.setStatus(ResponseStatus.INVALID_DATA.toString());
305                 logger.debug("validationMessage-->" + e.getMessage(),e);
306                     throw new OleDocStoreException(e.getMessage());
307             }
308             // Each document could be of different cat-type-format.
309             //documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument);
310 
311             // update Docstore
312             documentManager.checkin(requestDocument, session, respDoc);
313             responseDocuments.add(respDoc);
314         }
315         // Index to solr
316         String result = indexerService.indexDocuments(requestDocuments, false);
317         if (!result.startsWith("success")) {
318             throw new OleDocStoreException(result);
319         }
320 
321         return responseDocuments;
322     }
323 
324 
325     public List<ResponseDocument> checkOut(List<RequestDocument> requestDocuments, String user)
326             throws OleDocStoreException {
327         DocumentManager documentManager = null;
328         // Save the request documents so that they can be rolled back if necessary.
329         newStateRequestDocuments = requestDocuments;
330         List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
331         for (RequestDocument requestDocument : requestDocuments) {
332             // Each document could be of different cat-type-format.
333             documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument);
334             // To differentiate between restfull API and Normal CheckOut.
335             requestDocument.setUser(user);
336             // checkOut
337             responseDocuments.add(documentManager.checkout(requestDocument, session));
338         }
339         return responseDocuments;
340     }
341 
342 
343     public ResponseDocument checkIn(RequestDocument requestDocument) throws OleDocStoreException {
344         DocumentManager documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument);
345         // Save the request documents so that they can be rolled back if necessary.
346         newStateRequestDocuments = new ArrayList<RequestDocument>(1);
347         newStateRequestDocuments.add(requestDocument);
348         ResponseDocument responseDocument = new ResponseDocument();
349         List<String> fieldValueList = new ArrayList<String>();
350 
351         // validate given input
352         ResponseDocument respDoc = new ResponseDocument();
353 
354         try {
355             documentManager.validateInput(requestDocument, session, fieldValueList);
356         } catch (OleDocStoreException e) {
357             respDoc.setStatus(ResponseStatus.INVALID_DATA.toString());
358             logger.debug("validationMessage-->" + e.getMessage(),e);
359             throw new OleDocStoreException(e.getMessage());
360         }
361         // update Docstore
362         documentManager.checkin(requestDocument, session, responseDocument);
363         //Index to solr
364         String result = indexerService.indexDocument(requestDocument, false);
365         if (!result.startsWith("success")) {
366             throw new OleDocStoreException(result);
367         }
368         return responseDocument;
369     }
370 
371 
372     public List<ResponseDocument> bind(List<RequestDocument> requestDocuments, String operation)
373             throws Exception, RepositoryException, OleException, FileNotFoundException {
374         DocumentManager documentManager = null;
375         // get the previous content from docStore to rollback the records if saving the session fails.
376 
377         List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
378         for (RequestDocument requestDocument : requestDocuments) {
379             documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument);
380             // update Docstore
381             responseDocuments.add(documentManager.bind(requestDocument, session, operation));
382         }
383 
384         // Index to solr
385         String result = indexerService.bind(requestDocuments);
386         if (!result.startsWith("success")) {
387             throw new OleDocStoreException(result);
388         }
389 
390         return responseDocuments;
391     }
392 
393 
394     public List<ResponseDocument> unbind(List<RequestDocument> requestDocuments, String operation)
395             throws Exception, RepositoryException, OleException, FileNotFoundException {
396         DocumentManager documentManager = null;
397         // get the previous content from docStore to rollback the records if saving the session fails.
398 
399         List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
400         for (RequestDocument requestDocument : requestDocuments) {
401             documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument);
402             // update Docstore
403             responseDocuments.add(documentManager.unbind(requestDocument, session, operation));
404 
405         }
406 
407         // Index to solr
408         String result = indexerService.unbind(requestDocuments);
409         if (!result.startsWith("success")) {
410             throw new OleDocStoreException(result);
411         }
412 
413         return responseDocuments;
414     }
415 
416 
417     public void batchIngest(BulkProcessRequest bulkProcessRequest, List<RequestDocument> requestDocuments)
418             throws OleDocStoreException {
419         BulkIngestStatistics bulkIngestStatistics = bulkProcessRequest.getBulkIngestStatistics();
420         BatchIngestStatistics batchStatistics = bulkIngestStatistics.getCurrentBatch();
421         long commitSize = ProcessParameters.BULK_INGEST_COMMIT_SIZE;
422         newStateRequestDocuments = new ArrayList<RequestDocument>();
423         // Save the request documents so that they can be rolled back if necessary.
424         newStateRequestDocuments.addAll(requestDocuments);
425         DocumentManager documentManager = BeanLocator.getDocumentManagerFactory()
426                                                      .getDocumentManager(requestDocuments.get(0));
427 
428         // Store
429         StopWatch createNodesTimer = new StopWatch();
430         createNodesTimer.start();
431         documentManager.ingest(requestDocuments, session);
432         createNodesTimer.stop();
433         batchStatistics.setTimeToCreateNodesInJcr(createNodesTimer.getTime());
434         batchStatistics.setCommitSize(commitSize);
435 
436         StopWatch indexSolrDocsTime = new StopWatch();
437         indexSolrDocsTime.start();
438 
439         // Index
440         String result = indexerService.indexDocuments(requestDocuments, false);
441         indexSolrDocsTime.stop();
442         batchStatistics.setTimeToIndexSolrInputDocs(indexSolrDocsTime.getTime());
443 
444         if (!result.startsWith("success")) {
445             throw new OleDocStoreException(result);
446         }
447     }
448 
449     public List<ResponseDocument> delete(List<RequestDocument> requestDocuments) throws Exception{
450         DocumentManager documentManager = null;
451         // Save the request documents so that they can be rolled back if necessary.
452         newStateRequestDocuments = requestDocuments;
453         List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
454         for (RequestDocument requestDocument : requestDocuments) {
455             // Each document could be of different cat-type-format.
456             documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument);
457             //Store
458             responseDocuments.add(documentManager.delete(requestDocument,session));
459         }
460         // Index
461         String result = indexerService.delete(requestDocuments);
462         if (!result.startsWith("success")) {
463             throw new OleDocStoreException(result);
464         }
465         return responseDocuments;
466     }
467 
468     public List<ResponseDocument> deleteVerify(List<RequestDocument> requestDocuments) throws Exception {
469         DocumentManager documentManager = null;
470         List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
471         for (RequestDocument requestDocument : requestDocuments) {
472             documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument);
473             responseDocuments.add(documentManager.deleteVerify(requestDocument, session));
474         }
475         return responseDocuments;
476     }
477     public IndexerService getIndexerService() {
478         return indexerService;
479     }
480 
481     public void setIndexerService(IndexerService indexerService) {
482         this.indexerService = indexerService;
483     }
484        public void transferInstances(List<RequestDocument> requestDocuments) throws Exception {
485            WorkInstanceDocumentManager.getInstance().transferInstances(requestDocuments,session);
486         indexerService.transferInstances(requestDocuments);
487 
488     }
489     public void transferItems(List<RequestDocument> requestDocuments) throws Exception {
490         logger.debug("TransactionManager transferItems");
491         WorkInstanceDocumentManager.getInstance().transferItems(requestDocuments, session);
492         indexerService.transferItems(requestDocuments);
493 
494     }
495 }