View Javadoc
1   package org.kuali.ole.docstore.transaction;
2   
3   import org.apache.commons.lang.time.StopWatch;
4   import org.kuali.ole.docstore.OleDocStoreException;
5   import org.kuali.ole.docstore.document.DocumentManager;
6   import org.kuali.ole.docstore.document.jcr.JcrWorkInstanceDocumentManager;
7   import org.kuali.ole.docstore.document.rdbms.RdbmsWorkInstanceDocumentManager;
8   import org.kuali.ole.docstore.factory.DocstoreFactory;
9   import org.kuali.ole.docstore.indexer.solr.IndexerService;
10  import org.kuali.ole.docstore.indexer.solr.WorkBibDocumentIndexer;
11  import org.kuali.ole.docstore.indexer.solr.WorkInstanceDocumentIndexer;
12  import org.kuali.ole.docstore.indexer.solr.WorkItemDocumentIndexer;
13  import org.kuali.ole.docstore.model.xmlpojo.ingest.RequestDocument;
14  import org.kuali.ole.docstore.model.xmlpojo.ingest.ResponseDocument;
15  import org.kuali.ole.docstore.model.xmlpojo.ingest.ResponseStatus;
16  import org.kuali.ole.docstore.process.ProcessParameters;
17  import org.kuali.ole.docstore.process.batch.BulkProcessRequest;
18  import org.kuali.ole.docstore.service.BeanLocator;
19  import org.kuali.ole.docstore.utility.BatchIngestStatistics;
20  import org.kuali.ole.docstore.utility.BulkIngestStatistics;
21  import org.kuali.rice.krad.service.BusinessObjectService;
22  import org.kuali.rice.krad.service.KRADServiceLocator;
23  import org.slf4j.Logger;
24  import org.slf4j.LoggerFactory;
25  
26  import java.util.ArrayList;
27  import java.util.List;
28  
29  /**
30   * Created with IntelliJ IDEA.
31   * User: sambasivam
32   * Date: 6/20/13
33   * Time: 12:58 PM
34   * To change this template use File | Settings | File Templates.
35   */
36  public class RdbmsTransactionManager extends AbstractTransactionManager {
37  
38      private static final Logger logger = LoggerFactory.getLogger(RdbmsTransactionManager.class);
39      private BusinessObjectService businessObjectService;
40      private IndexerService indexerService;
41      private TransactionState transactionState = TransactionState.IDLE;
42      private List<RequestDocument> newStateRequestDocuments = null;
43      private List<RequestDocument> oldStateRequestDocuments = new ArrayList<RequestDocument>();
44  
45      @Override
46      public void startTransaction(String user, String operation) throws Exception {
47          try {
48              if (null == businessObjectService) {
49                  businessObjectService = KRADServiceLocator.getBusinessObjectService();
50              }
51             /* if (null == indexerService) {
52                  indexerService = BeanLocator.getDocstoreFactory().getIndexerService();
53              }*/
54          } catch (Exception e) {
55              logger.error(e.getMessage() , e );
56  
57          }
58      }
59  
60  
61      @Override
62      public List<ResponseDocument> ingest(List<RequestDocument> requestDocuments) throws Exception {
63  
64          DocstoreFactory docstoreFactory = BeanLocator.getDocstoreFactory();
65          // Save the request documents so that they can be rolled back if necessary.
66          List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
67          List<String> valuesList = new ArrayList<String>();
68          for (RequestDocument requestDocument : requestDocuments) {
69              // Each document could be of different cat-type-format.
70              DocumentManager documentManager = docstoreFactory.getDocumentManager(requestDocument.getCategory(), requestDocument.getType(), requestDocument.getFormat());
71              ResponseDocument respDoc = new ResponseDocument();
72              try {
73                  documentManager.validateInput(requestDocument, businessObjectService, valuesList);
74              } catch (OleDocStoreException e) {
75                  respDoc.setStatus(ResponseStatus.INVALID_DATA.toString());
76                  logger.debug("validationMessage-->" + e.getMessage(), e);
77                  throw new OleDocStoreException(e.getMessage(), e);
78              }
79  
80              documentManager.ingest(requestDocument, businessObjectService, respDoc);
81              responseDocuments.add(respDoc);
82          }
83          // Index
84          for (RequestDocument requestDocument : requestDocuments) {
85              String result = getIndexerService(requestDocument).indexDocument(requestDocument, false);
86              if (!result.startsWith("success")) {
87                  throw new OleDocStoreException(result);
88              }
89          }
90  
91          return responseDocuments;
92      }
93  
94      @Override
95      public List<ResponseDocument> checkIn(List<RequestDocument> requestDocuments) throws Exception {
96          DocstoreFactory docstoreFactory = BeanLocator.getDocstoreFactory();
97          // Save the request documents so that they can be rolled back if necessary.
98          List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
99          List<String> valuesList = new ArrayList<String>();
100         for (RequestDocument requestDocument : requestDocuments) {
101             // Each document could be of different cat-type-format.
102             DocumentManager documentManager = BeanLocator.getDocstoreFactory().getDocumentManager(requestDocument.getCategory(), requestDocument.getType(), requestDocument.getFormat());
103             if (requestDocument.getId() != null || requestDocument.getId().trim().length() > 0) {
104                 requestDocument.setUuid(requestDocument.getId());
105             }
106             //requestDocument.setOperation(operation);
107             //requestDocument.setUser(user);
108             // validate given input
109             ResponseDocument respDoc = new ResponseDocument();
110 
111             try {
112                 documentManager.validateInput(requestDocument, businessObjectService, valuesList);
113             } catch (OleDocStoreException e) {
114                 respDoc.setStatus(ResponseStatus.INVALID_DATA.toString());
115                 logger.debug("validationMessage-->" + e.getMessage(), e);
116                 throw new OleDocStoreException(e.getMessage());
117             }
118             // Each document could be of different cat-type-format.
119             //documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument);
120 
121             // update Docstore
122             documentManager.checkin(requestDocument, businessObjectService, respDoc);
123             responseDocuments.add(respDoc);
124         }
125         // Index to solr
126         for (RequestDocument requestDocument : requestDocuments) {
127             String result = getIndexerService(requestDocument).indexDocument(requestDocument, false);
128             if (!result.startsWith("success")) {
129                 throw new OleDocStoreException(result);
130             }
131         }
132 
133         return responseDocuments;
134     }
135 
136     @Override
137     public List<ResponseDocument> checkOut(List<RequestDocument> requestDocuments, Object object) throws Exception {
138         DocstoreFactory docstoreFactory = BeanLocator.getDocstoreFactory();
139         // Save the request documents so that they can be rolled back if necessary.
140         List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
141         List<String> valuesList = new ArrayList<String>();
142         for (RequestDocument requestDocument : requestDocuments) {
143             // Each document could be of different cat-type-format.
144             DocumentManager documentManager = docstoreFactory.getDocumentManager(requestDocument.getCategory(), requestDocument.getType(), requestDocument.getFormat());
145             responseDocuments.add(documentManager.checkout(requestDocument, businessObjectService));
146         }
147         return responseDocuments;
148     }
149 
150     @Override
151     public List<ResponseDocument> bind(List<RequestDocument> requestDocuments, String operation) throws Exception {
152         DocumentManager documentManager = null;
153         List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
154         for (RequestDocument requestDocument : requestDocuments) {
155             documentManager = BeanLocator.getDocstoreFactory()
156                     .getDocumentManager(requestDocument.getCategory(), requestDocument.getType(),
157                             requestDocument.getFormat());
158             responseDocuments.add(documentManager.bind(requestDocument, businessObjectService, operation));
159         }
160 
161         // Index to solr
162         for (RequestDocument requestDocument : requestDocuments) {
163             String result = getIndexerService(requestDocument).bind(requestDocument);
164             if (!result.startsWith("success")) {
165                 throw new OleDocStoreException(result);
166             }
167 
168         }
169         return responseDocuments;
170     }
171 
172     @Override
173     public List<ResponseDocument> unbind(List<RequestDocument> requestDocuments, String operation) throws Exception {
174         return null;  //To change body of implemented methods use File | Settings | File Templates.
175     }
176 
177     @Override
178     public List<ResponseDocument> delete(List<RequestDocument> requestDocuments) throws Exception {
179         DocumentManager documentManager = null;
180         // Save the request documents so that they can be rolled back if necessary.
181         List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
182         for (RequestDocument requestDocument : requestDocuments) {
183             // Each document could be of different cat-type-format.
184             documentManager = BeanLocator.getDocstoreFactory()
185                     .getDocumentManager(requestDocument.getCategory(), requestDocument.getType(),
186                             requestDocument.getFormat());
187             //Store
188             responseDocuments.add(documentManager.delete(requestDocument, businessObjectService));
189         }
190 
191         for (int index = 0; index < requestDocuments.size(); index++) {
192             ResponseDocument responseDocument = responseDocuments.get(index);
193             if ("success".equals(responseDocument.getStatus())) {
194                 RequestDocument requestDocument = requestDocuments.get(index);
195                 String result = getIndexerService(requestDocument).delete(requestDocument);
196                 if (!result.startsWith("success")) {
197                     throw new OleDocStoreException(result);
198                 }
199             }
200         }
201         return responseDocuments;
202     }
203 
204     @Override
205     public List<ResponseDocument> deleteVerify(List<RequestDocument> requestDocuments) throws Exception {
206         DocumentManager documentManager = null;
207         List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
208         for (RequestDocument requestDocument : requestDocuments) {
209             documentManager = BeanLocator.getDocstoreFactory().getDocumentManager(requestDocument.getCategory(), requestDocument.getType(), requestDocument.getFormat());
210             responseDocuments.add(documentManager.deleteVerify(requestDocument, businessObjectService));
211         }
212         return responseDocuments;
213     }
214 
215     @Override
216     public void transferInstances(List<RequestDocument> requestDocuments) throws Exception {
217         logger.debug("TransactionManager transferInstancessssssssssssssssss");
218         RdbmsWorkInstanceDocumentManager.getInstance().transferInstances(requestDocuments, businessObjectService);
219         WorkInstanceDocumentIndexer.getInstance().transferInstances(requestDocuments);
220     }
221 
222     @Override
223     public void transferItems(List<RequestDocument> requestDocuments) throws Exception {
224         logger.debug("TransactionManager transferItemsssssssssss");
225         RdbmsWorkInstanceDocumentManager.getInstance().transferItems(requestDocuments, businessObjectService);
226         WorkItemDocumentIndexer.getInstance().transferItems(requestDocuments);
227     }
228 
229     @Override
230     public void batchIngest(BulkProcessRequest bulkProcessRequest, List<RequestDocument> requestDocuments)
231             throws Exception {
232         BulkIngestStatistics bulkIngestStatistics = bulkProcessRequest.getBulkIngestStatistics();
233         BatchIngestStatistics batchStatistics = bulkIngestStatistics.getCurrentBatch();
234         long commitSize = ProcessParameters.BULK_INGEST_COMMIT_SIZE;
235         newStateRequestDocuments = new ArrayList<RequestDocument>();
236         // Save the request documents so that they can be rolled back if necessary.
237         newStateRequestDocuments.addAll(requestDocuments);
238         DocumentManager documentManager = BeanLocator.getDocstoreFactory()
239                 .getDocumentManager(requestDocuments.get(0).getCategory(),
240                         requestDocuments.get(0).getType(),
241                         requestDocuments.get(0).getFormat());
242 
243         // Store
244         StopWatch createNodesTimer = new StopWatch();
245         createNodesTimer.start();
246         documentManager.ingest(requestDocuments, businessObjectService);
247         createNodesTimer.stop();
248         batchStatistics.setTimeToCreateNodesInJcr(createNodesTimer.getTime());
249         batchStatistics.setCommitSize(commitSize);
250 
251         StopWatch indexSolrDocsTime = new StopWatch();
252         indexSolrDocsTime.start();
253 
254         // Index
255         String result = getIndexerService(requestDocuments.get(0)).bulkIndexDocuments(requestDocuments, false);
256 //        for (RequestDocument requestDocument : requestDocuments) {
257 //            String result = getIndexerService(requestDocument).indexDocument(requestDocument, false);
258 //            if (!result.startsWith("success")) {
259 //                throw new OleDocStoreException(result);
260 //            }
261 //        }
262         indexSolrDocsTime.stop();
263         batchStatistics.setTimeToIndexSolrInputDocs(indexSolrDocsTime.getTime());
264 
265         if (!result.startsWith("success")) {
266             throw new OleDocStoreException(result);
267         }
268     }
269 
270     @Override
271     public void startSession(String user, String operation) throws Exception {
272         //To change body of implemented methods use File | Settings | File Templates.
273     }
274 
275     @Override
276     public void commit() throws Exception {
277         commit(null);
278     }
279 
280     @Override
281     public void closeSession() {
282         //To change body of implemented methods use File | Settings | File Templates.
283     }
284 
285     @Override
286     public void abort() {
287         //To change body of implemented methods use File | Settings | File Templates.
288     }
289 
290     public void commit(BatchIngestStatistics batchIngestStatistics) throws OleDocStoreException {
291         logger.info("Commit: Saving changes to index...");
292         StopWatch solrCommitTimer = new StopWatch();
293         try {
294             if (null != batchIngestStatistics) {
295                 solrCommitTimer = new StopWatch();
296                 solrCommitTimer.start();
297             }
298             WorkBibDocumentIndexer.getInstance().commit();
299             if (null != batchIngestStatistics) {
300                 solrCommitTimer.stop();
301                 batchIngestStatistics.setTimeToSolrCommit(solrCommitTimer.getTime());
302             }
303         } catch (Exception e) {
304             transactionState = TransactionState.FAILED;
305             logger.error("Exception during commit: Unable to save changes to index. :", e);
306 
307             throw new OleDocStoreException("Commit failed.", e);
308         }
309 
310         logger.info("Commit: Saving changes to docstore...");
311         StopWatch sessionSaveTimer = null;
312         try {
313             if (null != batchIngestStatistics) {
314                 sessionSaveTimer = new StopWatch();
315                 sessionSaveTimer.start();
316             }
317             if (null != batchIngestStatistics) {
318                 sessionSaveTimer.stop();
319                 batchIngestStatistics.setTimeToSaveJcrSession(sessionSaveTimer.getTime());
320             }
321         } catch (Exception e) {
322             transactionState = TransactionState.FAILED;
323             logger.error("Exception during commit. Unable to save changes to docstore. :", e);
324             logger.info("Commit: Reverting changes to index...");
325             // TODO: Implement now. Delete data saved by indexerService.
326             try {
327                 rollBackDataInIndexer(oldStateRequestDocuments);
328                 //                rollBackDataInIndexer(oldStateRequestDocuments,newStateRequestDocuments);
329             } catch (Exception ex) {
330                 logger.error("error while performing roll back in Indexer" , ex);
331             }
332             throw new OleDocStoreException("Commit failed.", e);
333         }
334         //Session logout is handled in DocumentServiceImpl.java
335         // session.logout();
336         // We are all done!!!
337         transactionState = TransactionState.COMMITTED;
338         newStateRequestDocuments = null;
339         oldStateRequestDocuments = null;
340     }
341 
342     private void rollBackDataInIndexer(List<RequestDocument> oldStateRequestDocuments) throws OleDocStoreException {
343         if (oldStateRequestDocuments != null && oldStateRequestDocuments.size() > 0) {
344             for (RequestDocument requestDocument : oldStateRequestDocuments) {
345                 String result = getIndexerService(requestDocument).indexDocument(requestDocument, false);
346                 if (!result.startsWith("success")) {
347                     throw new OleDocStoreException(result);
348                 }
349             }
350         }
351     }
352 
353     public void setBusinessObjectService(BusinessObjectService businessObjectService) {
354         this.businessObjectService = businessObjectService;
355     }
356 }