001 package org.kuali.ole.docstore.transaction; 002 003 import java.util.ArrayList; 004 import java.util.List; 005 import javax.jcr.Node; 006 import javax.jcr.Session; 007 008 import org.apache.commons.lang.time.StopWatch; 009 import org.kuali.ole.RepositoryManager; 010 import org.kuali.ole.docstore.OleDocStoreException; 011 import org.kuali.ole.docstore.discovery.service.IndexerService; 012 import org.kuali.ole.docstore.document.DocumentManager; 013 import org.kuali.ole.docstore.model.xmlpojo.ingest.RequestDocument; 014 import org.kuali.ole.docstore.model.xmlpojo.ingest.ResponseDocument; 015 import org.kuali.ole.docstore.process.batch.BulkProcessRequest; 016 import org.kuali.ole.docstore.repository.CustomNodeManager; 017 import org.kuali.ole.docstore.service.BeanLocator; 018 import org.kuali.ole.docstore.service.ServiceLocator; 019 import org.kuali.ole.docstore.utility.BatchIngestStatistics; 020 import org.kuali.ole.docstore.utility.BulkIngestStatistics; 021 import org.kuali.ole.repository.NodeHandler; 022 import org.slf4j.Logger; 023 import org.slf4j.LoggerFactory; 024 025 /** 026 * Encapsulates reusable common logic to support ACID transactions in DocStore. 027 * User: tirumalesh.b 028 * Date: 26/9/12 Time: 10:40 AM 029 */ 030 public class TransactionManager { 031 private static final Logger logger = LoggerFactory.getLogger(NodeHandler.class); 032 private IndexerService indexerService; 033 private Session session; 034 private TransactionState transactionState = TransactionState.IDLE; 035 private List<RequestDocument> newStateRequestDocuments = null; 036 private List<RequestDocument> oldStateRequestDocuments = new ArrayList<RequestDocument>(); 037 038 public enum TransactionState { 039 IDLE, STARTED, COMMITTED, ABORTED, FAILED 040 } 041 042 public void startTransaction(String user, String operation) throws OleDocStoreException { 043 if (transactionState == TransactionState.STARTED) { 044 throw new OleDocStoreException("Transaction already started."); 045 } 046 try { 047 if (null == session) { 048 session = RepositoryManager.getRepositoryManager().getSession(user, operation); 049 } 050 if (null == indexerService) { 051 indexerService = ServiceLocator.getIndexerService(); 052 } 053 } 054 catch (Exception e) { 055 throw new OleDocStoreException(e); 056 } 057 transactionState = TransactionState.STARTED; 058 } 059 060 public void commit() throws OleDocStoreException { 061 commit(null); 062 } 063 064 public void commit(BatchIngestStatistics batchIngestStatistics) throws OleDocStoreException { 065 logger.info("Commit: Saving changes to index..."); 066 StopWatch solrCommitTimer = new StopWatch(); 067 try { 068 if (null != batchIngestStatistics) { 069 solrCommitTimer = new StopWatch(); 070 solrCommitTimer.start(); 071 } 072 indexerService.commit(); 073 if (null != batchIngestStatistics) { 074 solrCommitTimer.stop(); 075 batchIngestStatistics.setTimeToSolrCommit(solrCommitTimer.getTime()); 076 } 077 } 078 catch (Exception e) { 079 transactionState = TransactionState.FAILED; 080 logger.error("Exception during commit: Unable to save changes to index. :", e); 081 try { 082 session.refresh(false); 083 } 084 catch (Exception re) { 085 // Ignore this, as data would not be saved anyway. 086 logger.error("Exception during commit. Unable to rollback changes to docstore. :", re); 087 } 088 throw new OleDocStoreException("Commit failed.", e); 089 } 090 091 logger.info("Commit: Saving changes to docstore..."); 092 StopWatch sessionSaveTimer = null; 093 new StopWatch(); 094 try { 095 if (null != batchIngestStatistics) { 096 sessionSaveTimer = new StopWatch(); 097 sessionSaveTimer.start(); 098 } 099 session.save(); 100 if (null != batchIngestStatistics) { 101 sessionSaveTimer.stop(); 102 batchIngestStatistics.setTimeToSaveJcrSession(sessionSaveTimer.getTime()); 103 } 104 } 105 catch (Exception e) { 106 transactionState = TransactionState.FAILED; 107 logger.error("Exception during commit. Unable to save changes to docstore. :", e); 108 logger.info("Commit: Reverting changes to index..."); 109 // TODO: Implement now. Delete data saved by indexerService. 110 try { 111 rollBackDataInIndexer(oldStateRequestDocuments); 112 } 113 catch (Exception ex) { 114 logger.error("error while performing roll back in Indexer"); 115 } 116 throw new OleDocStoreException("Commit failed.", e); 117 } 118 // We are all done!!! 119 transactionState = TransactionState.COMMITTED; 120 newStateRequestDocuments = null; 121 oldStateRequestDocuments = null; 122 } 123 124 private void rollBackDataInIndexer(List<RequestDocument> oldStateRequestDocuments) throws OleDocStoreException { 125 if (oldStateRequestDocuments != null && oldStateRequestDocuments.size() > 0) { 126 String result = indexerService.indexDocuments(oldStateRequestDocuments, false); 127 if (!result.startsWith("success")) { 128 throw new OleDocStoreException(result); 129 } 130 } 131 } 132 133 public void abort() { 134 try { 135 session.refresh(false); 136 } 137 catch (Exception re) { 138 // Ignore this, as we want to rollback anyway. 139 logger.error("Exception during abort. Unable to rollback changes to docstore. :", re); 140 141 } 142 try { 143 indexerService.rollback(); 144 } 145 catch (Exception e) { 146 // Ignore this, as we want to rollback anyway. 147 logger.error("Exception during abort. Unable to rollback changes to index. :", e); 148 } 149 transactionState = TransactionState.ABORTED; 150 newStateRequestDocuments = null; 151 oldStateRequestDocuments = null; 152 } 153 154 public void close() { 155 session.logout(); 156 session = null; 157 transactionState = TransactionState.IDLE; 158 newStateRequestDocuments = null; 159 oldStateRequestDocuments = null; 160 setIndexerService(null); 161 } 162 163 public List<ResponseDocument> ingest(List<RequestDocument> requestDocuments) throws OleDocStoreException { 164 DocumentManager documentManager = null; 165 // Save the request documents so that they can be rolled back if necessary. 166 newStateRequestDocuments = requestDocuments; 167 List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>(); 168 for (RequestDocument requestDocument : requestDocuments) { 169 // Each document could be of different cat-type-format. 170 documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument); 171 // Store 172 responseDocuments.add(documentManager.ingest(requestDocument, session)); 173 } 174 // Index 175 String result = indexerService.indexDocuments(requestDocuments, false); 176 if (!result.startsWith("success")) { 177 throw new OleDocStoreException(result); 178 } 179 180 return responseDocuments; 181 } 182 183 public ResponseDocument ingest(RequestDocument requestDocument) throws OleDocStoreException { 184 DocumentManager documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument); 185 // Save the request documents so that they can be rolled back if necessary. 186 newStateRequestDocuments = new ArrayList<RequestDocument>(1); 187 newStateRequestDocuments.add(requestDocument); 188 189 ResponseDocument responseDocument = documentManager.ingest(requestDocument, session); 190 String result = indexerService.indexDocument(requestDocument, false); 191 if (!result.startsWith("success")) { 192 throw new OleDocStoreException(result); 193 } 194 return responseDocument; 195 } 196 197 198 public List<ResponseDocument> checkIn(List<RequestDocument> requestDocuments, String operation) 199 throws OleDocStoreException { 200 DocumentManager documentManager = null; 201 // get the previous content from docStore to rollback the records if saving the session fails. 202 203 List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>(); 204 for (RequestDocument requestDocument : requestDocuments) { 205 if (requestDocument.getId() != null || requestDocument.getId().trim().length() > 0) { 206 requestDocument.setUuid(requestDocument.getId()); 207 } 208 209 /* try { 210 211 Node nodeByUUID = CustomNodeManager.getInstance().getNodeByUUID(session, requestDocument.getId()); 212 RequestDocument prevRequestDoc = new RequestDocument(); 213 prevRequestDoc = requestDocument; 214 String prevContent = nodeByUUID.getNode("jcr:content").getProperty("jcr:data").getValue().getString(); 215 prevRequestDoc.getContent().setContent(prevContent); 216 oldStateRequestDocuments.add(prevRequestDoc); 217 } 218 catch (Exception ex) { 219 String failMsg = "Check in failed." + "Unable to get the record from docstore. Record UUID " 220 + requestDocument.getId(); 221 logger.info(failMsg, ex); 222 throw new OleDocStoreException(failMsg, ex); 223 } 224 */ 225 requestDocument.setOperation(operation); 226 // Each document could be of different cat-type-format. 227 documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument); 228 229 // update Docstore 230 responseDocuments.add(documentManager.checkin(requestDocument, session)); 231 } 232 // Index to solr 233 String result = indexerService.indexDocuments(requestDocuments, false); 234 if (!result.startsWith("success")) { 235 throw new OleDocStoreException(result); 236 } 237 238 return responseDocuments; 239 } 240 241 242 public List<ResponseDocument> checkOut(List<RequestDocument> requestDocuments, String user) 243 throws OleDocStoreException { 244 DocumentManager documentManager = null; 245 // Save the request documents so that they can be rolled back if necessary. 246 newStateRequestDocuments = requestDocuments; 247 List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>(); 248 for (RequestDocument requestDocument : requestDocuments) { 249 // Each document could be of different cat-type-format. 250 documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument); 251 // To differentiate between restfull API and Normal CheckOut. 252 requestDocument.setUser(user); 253 // checkOut 254 responseDocuments.add(documentManager.checkout(requestDocument, session)); 255 } 256 return responseDocuments; 257 } 258 259 260 public ResponseDocument checkIn(RequestDocument requestDocument) throws OleDocStoreException { 261 DocumentManager documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument); 262 // Save the request documents so that they can be rolled back if necessary. 263 newStateRequestDocuments = new ArrayList<RequestDocument>(1); 264 newStateRequestDocuments.add(requestDocument); 265 // update Docstore 266 ResponseDocument responseDocument = documentManager.checkin(requestDocument, session); 267 //Index to solr 268 String result = indexerService.indexDocument(requestDocument, false); 269 if (!result.startsWith("success")) { 270 throw new OleDocStoreException(result); 271 } 272 return responseDocument; 273 } 274 275 public void batchIngest(BulkProcessRequest bulkProcessRequest, List<RequestDocument> requestDocuments) 276 throws OleDocStoreException { 277 BulkIngestStatistics bulkIngestStatistics = bulkProcessRequest.getBulkIngestStatistics(); 278 BatchIngestStatistics batchStatistics = bulkIngestStatistics.getCurrentBatch(); 279 newStateRequestDocuments = new ArrayList<RequestDocument>(); 280 // Save the request documents so that they can be rolled back if necessary. 281 newStateRequestDocuments.addAll(requestDocuments); 282 DocumentManager documentManager = BeanLocator.getDocumentManagerFactory() 283 .getDocumentManager(requestDocuments.get(0)); 284 285 // Store 286 StopWatch createNodesTimer = new StopWatch(); 287 createNodesTimer.start(); 288 documentManager.ingest(requestDocuments, session); 289 createNodesTimer.stop(); 290 batchStatistics.setTimeToCreateNodesInJcr(createNodesTimer.getTime()); 291 292 StopWatch indexSolrDocsTime = new StopWatch(); 293 indexSolrDocsTime.start(); 294 295 // Index 296 String result = indexerService.indexDocuments(requestDocuments, false); 297 indexSolrDocsTime.stop(); 298 batchStatistics.setTimeToIndexSolrInputDocs(indexSolrDocsTime.getTime()); 299 300 if (!result.startsWith("success")) { 301 throw new OleDocStoreException(result); 302 } 303 } 304 305 public List<ResponseDocument> delete(List<RequestDocument> requestDocuments) throws OleDocStoreException { 306 DocumentManager documentManager = null; 307 List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>(); 308 for (RequestDocument requestDocument : requestDocuments) { 309 documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument); 310 responseDocuments.add(documentManager.delete(requestDocument, session)); 311 } 312 return responseDocuments; 313 } 314 315 public IndexerService getIndexerService() { 316 return indexerService; 317 } 318 319 public void setIndexerService(IndexerService indexerService) { 320 this.indexerService = indexerService; 321 } 322 }