View Javadoc
1   package org.kuali.ole.docstore.transaction;
2   
3   import org.apache.commons.lang.time.StopWatch;
4   import org.kuali.ole.RepositoryManager;
5   import org.kuali.ole.docstore.OleDocStoreException;
6   import org.kuali.ole.docstore.document.DocumentManager;
7   import org.kuali.ole.docstore.document.jcr.JcrWorkInstanceDocumentManager;
8   import org.kuali.ole.docstore.factory.DocstoreFactory;
9   import org.kuali.ole.docstore.indexer.solr.IndexerService;
10  import org.kuali.ole.docstore.indexer.solr.WorkBibDocumentIndexer;
11  import org.kuali.ole.docstore.model.xmlpojo.ingest.RequestDocument;
12  import org.kuali.ole.docstore.model.xmlpojo.ingest.ResponseDocument;
13  import org.kuali.ole.docstore.model.xmlpojo.ingest.ResponseStatus;
14  import org.kuali.ole.docstore.process.ProcessParameters;
15  import org.kuali.ole.docstore.process.batch.BulkProcessRequest;
16  import org.kuali.ole.docstore.service.BeanLocator;
17  import org.kuali.ole.docstore.utility.BatchIngestStatistics;
18  import org.kuali.ole.docstore.utility.BulkIngestStatistics;
19  import org.kuali.ole.pojo.OleException;
20  import org.slf4j.Logger;
21  import org.slf4j.LoggerFactory;
22  
23  import javax.jcr.Session;
24  import java.util.ArrayList;
25  import java.util.List;
26  
27  /**
28   * Created with IntelliJ IDEA.
29   * User: sambasivam
30   * Date: 6/20/13
31   * Time: 12:58 PM
32   * To change this template use File | Settings | File Templates.
33   */
34  public class JcrTransactionManager extends AbstractTransactionManager {
35  
36      private static final Logger logger = LoggerFactory.getLogger(JcrTransactionManager.class);
37      private Session session;
38      private TransactionState transactionState = TransactionState.IDLE;
39      private List<RequestDocument> newStateRequestDocuments = null;
40      private List<RequestDocument> oldStateRequestDocuments = new ArrayList<RequestDocument>();
41  
42      private IndexerService indexerService;
43  
44      @Override
45      public void startSession(String user, String operation) throws OleDocStoreException {
46          try {
47              if (null == session) {
48                  session = RepositoryManager.getRepositoryManager().getSession(user, operation);
49              }
50          } catch (Exception e) {
51              throw new OleDocStoreException(e);
52          }
53      }
54  
55      @Override
56      public void closeSession() {
57          if (null != session) {
58              try {
59                  RepositoryManager.getRepositoryManager().logout(session);
60              } catch (OleException e) {
61                  logger.error(" Error while closing session :" + e);
62              }
63          }
64      }
65  
66      @Override
67      public void startTransaction(String user, String operation) throws Exception {
68          if (transactionState == TransactionState.STARTED) {
69              throw new OleDocStoreException("Transaction already started.");
70          }
71          try {
72              if (null == session) {
73                  session = RepositoryManager.getRepositoryManager().getSession(user, operation);
74              }
75              /*if (null == indexerService) {
76                  indexerService = BeanLocator.getDocstoreFactory().getIndexerService();
77              }*/
78          } catch (Exception e) {
79              throw new OleDocStoreException(e);
80          }
81          transactionState = TransactionManager.TransactionState.STARTED;
82      }
83  
84      @Override
85      public void commit() throws Exception {
86          commit(null);
87      }
88  
89      @Override
90      public void abort() {
91          try {
92              session.refresh(false);
93          } catch (Exception re) {
94              // Ignore this, as we want to rollback anyway.
95              logger.error("Exception during abort. Unable to rollback changes to docstore. :", re);
96  
97          }
98          try {
99  //            indexerService.rollback();
100         } catch (Exception e) {
101             // Ignore this, as we want to rollback anyway.
102             logger.error("Exception during abort. Unable to rollback changes to index. :", e);
103         }
104         transactionState = TransactionState.ABORTED;
105     }
106 
107 
108     @Override
109     public List<ResponseDocument> ingest(List<RequestDocument> requestDocuments) throws Exception {
110 
111         DocstoreFactory docstoreFactory = BeanLocator.getDocstoreFactory();
112 
113         // Save the request documents so that they can be rolled back if necessary.
114         newStateRequestDocuments = requestDocuments;
115         List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
116         List<String> valuesList = new ArrayList<String>();
117         for (RequestDocument requestDocument : requestDocuments) {
118             // Each document could be of different cat-type-format.
119             DocumentManager documentManager = docstoreFactory.getDocumentManager(requestDocument.getCategory(), requestDocument.getType(), requestDocument.getFormat());
120             ResponseDocument respDoc = new ResponseDocument();
121             // validate given input
122 
123             try {
124                 documentManager.validateInput(requestDocument, session, valuesList);
125             } catch (OleDocStoreException e) {
126                 respDoc.setStatus(ResponseStatus.INVALID_DATA.toString());
127                 logger.debug("validationMessage-->" + e.getMessage(), e);
128                 throw new OleDocStoreException(e.getMessage(), e);
129             }
130 
131             // Store
132             documentManager.ingest(requestDocument, session, respDoc);
133             responseDocuments.add(respDoc);
134         }
135         // Index
136         for (RequestDocument requestDocument : requestDocuments) {
137             String result = getIndexerService(requestDocument).indexDocument(requestDocument, false);
138             if (!result.startsWith("success")) {
139                 throw new OleDocStoreException(result);
140             }
141         }
142 
143         return responseDocuments;
144 
145     }
146 
147     @Override
148     public List<ResponseDocument> checkIn(List<RequestDocument> requestDocuments) throws Exception {
149         DocumentManager documentManager = null;
150         // get the previous content from docStore to rollback the records if saving the session fails.
151 
152         List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
153         List<String> fieldValueList = new ArrayList<String>();
154         for (RequestDocument requestDocument : requestDocuments) {
155             // Each document could be of different cat-type-format.
156             documentManager = BeanLocator.getDocstoreFactory().getDocumentManager(requestDocument.getCategory(), requestDocument.getType(), requestDocument.getFormat());
157             if (requestDocument.getId() != null || requestDocument.getId().trim().length() > 0) {
158                 requestDocument.setUuid(requestDocument.getId());
159             }
160             //requestDocument.setOperation(operation);
161             //requestDocument.setUser(user);
162             // validate given input
163             ResponseDocument respDoc = new ResponseDocument();
164 
165             try {
166                 documentManager.validateInput(requestDocument, session, fieldValueList);
167             } catch (OleDocStoreException e) {
168                 respDoc.setStatus(ResponseStatus.INVALID_DATA.toString());
169                 logger.debug("validationMessage-->" + e.getMessage(), e);
170                 throw new OleDocStoreException(e.getMessage());
171             }
172             // Each document could be of different cat-type-format.
173             //documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument);
174 
175             // update Docstore
176             documentManager.checkin(requestDocument, session, respDoc);
177             responseDocuments.add(respDoc);
178         }
179         // Index to solr
180         for (RequestDocument requestDocument : requestDocuments) {
181             String result = getIndexerService(requestDocument).indexDocument(requestDocument, false);
182             if (!result.startsWith("success")) {
183                 throw new OleDocStoreException(result);
184             }
185         }
186 
187         return responseDocuments;
188     }
189 
190     @Override
191     public List<ResponseDocument> checkOut(List<RequestDocument> requestDocuments, Object object) throws Exception {
192         DocumentManager documentManager = null;
193         // Save the request documents so that they can be rolled back if necessary.
194         newStateRequestDocuments = requestDocuments;
195         List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
196         for (RequestDocument requestDocument : requestDocuments) {
197             // Each document could be of different cat-type-format.
198             documentManager = BeanLocator.getDocstoreFactory().getDocumentManager(requestDocument.getCategory(), requestDocument.getType(), requestDocument.getFormat());
199             // To differentiate between restfull API and Normal CheckOut.
200 //            requestDocument.setUser(user);
201             // checkOut
202             responseDocuments.add(documentManager.checkout(requestDocument, session));
203         }
204         return responseDocuments;
205     }
206 
207     @Override
208     public List<ResponseDocument> bind(List<RequestDocument> requestDocuments, String operation) throws Exception {
209         DocumentManager documentManager = null;
210         // get the previous content from docStore to rollback the records if saving the session fails.
211 
212         List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
213         for (RequestDocument requestDocument : requestDocuments) {
214             documentManager = BeanLocator.getDocstoreFactory().getDocumentManager(requestDocument.getCategory(), requestDocument.getType(), requestDocument.getFormat());
215             // update Docstore
216             responseDocuments.add(documentManager.bind(requestDocument, session, operation));
217         }
218 
219         // Index to solr
220         String result = getIndexerService(requestDocuments.get(0)).bind(requestDocuments);
221         if (!result.startsWith("success")) {
222             throw new OleDocStoreException(result);
223         }
224 
225         return responseDocuments;
226     }
227 
228     @Override
229     public List<ResponseDocument> unbind(List<RequestDocument> requestDocuments, String operation) throws Exception {
230         DocumentManager documentManager = null;
231         // get the previous content from docStore to rollback the records if saving the session fails.
232 
233         List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
234         for (RequestDocument requestDocument : requestDocuments) {
235             documentManager = BeanLocator.getDocstoreFactory().getDocumentManager(requestDocument.getCategory(), requestDocument.getType(), requestDocument.getFormat());
236             // update Docstore
237             responseDocuments.add(documentManager.unbind(requestDocument, session, operation));
238 
239         }
240 
241         // Index to solr
242         String result = getIndexerService(requestDocuments.get(0)).unbind(requestDocuments);
243         if (!result.startsWith("success")) {
244             throw new OleDocStoreException(result);
245         }
246 
247         return responseDocuments;
248     }
249 
250     @Override
251     public List<ResponseDocument> delete(List<RequestDocument> requestDocuments) throws Exception {
252         DocumentManager documentManager = null;
253         // Save the request documents so that they can be rolled back if necessary.
254         newStateRequestDocuments = requestDocuments;
255         List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
256         for (RequestDocument requestDocument : requestDocuments) {
257             // Each document could be of different cat-type-format.
258             documentManager = BeanLocator.getDocstoreFactory().getDocumentManager(requestDocument.getCategory(), requestDocument.getType(), requestDocument.getFormat());
259             //Store
260             responseDocuments.add(documentManager.delete(requestDocument, session));
261         }
262         // Index
263         String result = getIndexerService(requestDocuments.get(0)).delete(requestDocuments);
264         if (!result.startsWith("success")) {
265             throw new OleDocStoreException(result);
266         }
267         return responseDocuments;
268     }
269 
270     @Override
271     public List<ResponseDocument> deleteVerify(List<RequestDocument> requestDocuments) throws Exception {
272         DocumentManager documentManager = null;
273         List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
274         for (RequestDocument requestDocument : requestDocuments) {
275             documentManager = BeanLocator.getDocstoreFactory().getDocumentManager(requestDocument.getCategory(), requestDocument.getType(), requestDocument.getFormat());
276             responseDocuments.add(documentManager.deleteVerify(requestDocument, session));
277         }
278         return responseDocuments;
279 
280     }
281 
282     @Override
283     public void transferInstances(List<RequestDocument> requestDocuments) throws Exception {
284         JcrWorkInstanceDocumentManager.getInstance().transferInstances(requestDocuments, session);
285         if (requestDocuments.size() > 0)
286             getIndexerService(requestDocuments.get(0)).transferInstances(requestDocuments);
287     }
288 
289     @Override
290     public void transferItems(List<RequestDocument> requestDocuments) throws Exception {
291         logger.debug("TransactionManager transferItems");
292         JcrWorkInstanceDocumentManager.getInstance().transferItems(requestDocuments, session);
293         if (requestDocuments.size() > 0)
294             getIndexerService(requestDocuments.get(0)).transferItems(requestDocuments);
295     }
296 
297     @Override
298     public void batchIngest(BulkProcessRequest bulkProcessRequest, List<RequestDocument> requestDocuments)
299             throws Exception {
300 
301         BulkIngestStatistics bulkIngestStatistics = bulkProcessRequest.getBulkIngestStatistics();
302         BatchIngestStatistics batchStatistics = bulkIngestStatistics.getCurrentBatch();
303         long commitSize = ProcessParameters.BULK_INGEST_COMMIT_SIZE;
304         newStateRequestDocuments = new ArrayList<RequestDocument>();
305         // Save the request documents so that they can be rolled back if necessary.
306         newStateRequestDocuments.addAll(requestDocuments);
307         DocumentManager documentManager = BeanLocator.getDocstoreFactory()
308                 .getDocumentManager(requestDocuments.get(0).getCategory(), requestDocuments.get(0).getType(), requestDocuments.get(0).getFormat());
309 
310         // Store
311         StopWatch createNodesTimer = new StopWatch();
312         createNodesTimer.start();
313         documentManager.ingest(requestDocuments, session);
314         createNodesTimer.stop();
315         batchStatistics.setTimeToCreateNodesInJcr(createNodesTimer.getTime());
316         batchStatistics.setCommitSize(commitSize);
317 
318         StopWatch indexSolrDocsTime = new StopWatch();
319         indexSolrDocsTime.start();
320 
321         // Index
322         String result = getIndexerService(requestDocuments.get(0)).indexDocuments(requestDocuments, false);
323         indexSolrDocsTime.stop();
324         batchStatistics.setTimeToIndexSolrInputDocs(indexSolrDocsTime.getTime());
325 
326         if (!result.startsWith("success")) {
327             throw new OleDocStoreException(result);
328         }
329 
330     }
331 
332     public void commit(BatchIngestStatistics batchIngestStatistics) throws OleDocStoreException {
333         logger.info("Commit: Saving changes to index...");
334         StopWatch solrCommitTimer = new StopWatch();
335         try {
336             if (null != batchIngestStatistics) {
337                 solrCommitTimer = new StopWatch();
338                 solrCommitTimer.start();
339             }
340             WorkBibDocumentIndexer.getInstance().commit();
341             if (null != batchIngestStatistics) {
342                 solrCommitTimer.stop();
343                 batchIngestStatistics.setTimeToSolrCommit(solrCommitTimer.getTime());
344             }
345         } catch (Exception e) {
346             transactionState = TransactionState.FAILED;
347             logger.error("Exception during commit: Unable to save changes to index. :", e);
348             try {
349                 session.refresh(false);
350             } catch (Exception re) {
351                 // Ignore this, as data would not be saved anyway.
352                 logger.error("Exception during commit. Unable to rollback changes to docstore. :", re);
353             }
354             throw new OleDocStoreException("Commit failed.", e);
355         }
356 
357         logger.info("Commit: Saving changes to docstore...");
358         StopWatch sessionSaveTimer = null;
359         try {
360             if (null != batchIngestStatistics) {
361                 sessionSaveTimer = new StopWatch();
362                 sessionSaveTimer.start();
363             }
364             session.save();
365             if (null != batchIngestStatistics) {
366                 sessionSaveTimer.stop();
367                 batchIngestStatistics.setTimeToSaveJcrSession(sessionSaveTimer.getTime());
368             }
369         } catch (Exception e) {
370             transactionState = TransactionState.FAILED;
371             logger.error("Exception during commit. Unable to save changes to docstore. :", e);
372             logger.info("Commit: Reverting changes to index...");
373             // TODO: Implement now. Delete data saved by indexerService.
374             try {
375                 rollBackDataInIndexer(oldStateRequestDocuments);
376 //                rollBackDataInIndexer(oldStateRequestDocuments,newStateRequestDocuments);
377             } catch (Exception ex) {
378                 logger.error("error while performing roll back in Indexer");
379             }
380             throw new OleDocStoreException("Commit failed.", e);
381         }
382         //Session logout is handled in DocumentServiceImpl.java
383         // session.logout();
384         // We are all done!!!
385         transactionState = TransactionState.COMMITTED;
386         newStateRequestDocuments = null;
387         oldStateRequestDocuments = null;
388     }
389 
390     private void rollBackDataInIndexer(List<RequestDocument> oldStateRequestDocuments) throws OleDocStoreException {
391         if (oldStateRequestDocuments != null && oldStateRequestDocuments.size() > 0) {
392             for (RequestDocument requestDocument : oldStateRequestDocuments) {
393                 String result = getIndexerService(requestDocument).indexDocument(requestDocument, false);
394                 if (!result.startsWith("success")) {
395                     throw new OleDocStoreException(result);
396                 }
397             }
398         }
399     }
400 
401 
402 }
403