001package org.kuali.ole.docstore.transaction; 002 003import org.apache.commons.lang.time.StopWatch; 004import org.kuali.ole.RepositoryManager; 005import org.kuali.ole.docstore.OleDocStoreException; 006import org.kuali.ole.docstore.document.DocumentManager; 007import org.kuali.ole.docstore.document.jcr.JcrWorkInstanceDocumentManager; 008import org.kuali.ole.docstore.factory.DocstoreFactory; 009import org.kuali.ole.docstore.indexer.solr.IndexerService; 010import org.kuali.ole.docstore.indexer.solr.WorkBibDocumentIndexer; 011import org.kuali.ole.docstore.model.xmlpojo.ingest.RequestDocument; 012import org.kuali.ole.docstore.model.xmlpojo.ingest.ResponseDocument; 013import org.kuali.ole.docstore.model.xmlpojo.ingest.ResponseStatus; 014import org.kuali.ole.docstore.process.ProcessParameters; 015import org.kuali.ole.docstore.process.batch.BulkProcessRequest; 016import org.kuali.ole.docstore.service.BeanLocator; 017import org.kuali.ole.docstore.utility.BatchIngestStatistics; 018import org.kuali.ole.docstore.utility.BulkIngestStatistics; 019import org.kuali.ole.pojo.OleException; 020import org.slf4j.Logger; 021import org.slf4j.LoggerFactory; 022 023import javax.jcr.Session; 024import java.util.ArrayList; 025import java.util.List; 026 027/** 028 * Created with IntelliJ IDEA. 029 * User: sambasivam 030 * Date: 6/20/13 031 * Time: 12:58 PM 032 * To change this template use File | Settings | File Templates. 033 */ 034public class JcrTransactionManager extends AbstractTransactionManager { 035 036 private static final Logger logger = LoggerFactory.getLogger(JcrTransactionManager.class); 037 private Session session; 038 private TransactionState transactionState = TransactionState.IDLE; 039 private List<RequestDocument> newStateRequestDocuments = null; 040 private List<RequestDocument> oldStateRequestDocuments = new ArrayList<RequestDocument>(); 041 042 private IndexerService indexerService; 043 044 @Override 045 public void startSession(String user, String operation) throws OleDocStoreException { 046 try { 047 if (null == session) { 048 session = RepositoryManager.getRepositoryManager().getSession(user, operation); 049 } 050 } catch (Exception e) { 051 throw new OleDocStoreException(e); 052 } 053 } 054 055 @Override 056 public void closeSession() { 057 if (null != session) { 058 try { 059 RepositoryManager.getRepositoryManager().logout(session); 060 } catch (OleException e) { 061 logger.error(" Error while closing session :" + e); 062 } 063 } 064 } 065 066 @Override 067 public void startTransaction(String user, String operation) throws Exception { 068 if (transactionState == TransactionState.STARTED) { 069 throw new OleDocStoreException("Transaction already started."); 070 } 071 try { 072 if (null == session) { 073 session = RepositoryManager.getRepositoryManager().getSession(user, operation); 074 } 075 /*if (null == indexerService) { 076 indexerService = BeanLocator.getDocstoreFactory().getIndexerService(); 077 }*/ 078 } catch (Exception e) { 079 throw new OleDocStoreException(e); 080 } 081 transactionState = TransactionManager.TransactionState.STARTED; 082 } 083 084 @Override 085 public void commit() throws Exception { 086 commit(null); 087 } 088 089 @Override 090 public void abort() { 091 try { 092 session.refresh(false); 093 } catch (Exception re) { 094 // Ignore this, as we want to rollback anyway. 095 logger.error("Exception during abort. Unable to rollback changes to docstore. :", re); 096 097 } 098 try { 099// indexerService.rollback(); 100 } catch (Exception e) { 101 // Ignore this, as we want to rollback anyway. 102 logger.error("Exception during abort. Unable to rollback changes to index. :", e); 103 } 104 transactionState = TransactionState.ABORTED; 105 } 106 107 108 @Override 109 public List<ResponseDocument> ingest(List<RequestDocument> requestDocuments) throws Exception { 110 111 DocstoreFactory docstoreFactory = BeanLocator.getDocstoreFactory(); 112 113 // Save the request documents so that they can be rolled back if necessary. 114 newStateRequestDocuments = requestDocuments; 115 List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>(); 116 List<String> valuesList = new ArrayList<String>(); 117 for (RequestDocument requestDocument : requestDocuments) { 118 // Each document could be of different cat-type-format. 119 DocumentManager documentManager = docstoreFactory.getDocumentManager(requestDocument.getCategory(), requestDocument.getType(), requestDocument.getFormat()); 120 ResponseDocument respDoc = new ResponseDocument(); 121 // validate given input 122 123 try { 124 documentManager.validateInput(requestDocument, session, valuesList); 125 } catch (OleDocStoreException e) { 126 respDoc.setStatus(ResponseStatus.INVALID_DATA.toString()); 127 logger.debug("validationMessage-->" + e.getMessage(), e); 128 throw new OleDocStoreException(e.getMessage(), e); 129 } 130 131 // Store 132 documentManager.ingest(requestDocument, session, respDoc); 133 responseDocuments.add(respDoc); 134 } 135 // Index 136 for (RequestDocument requestDocument : requestDocuments) { 137 String result = getIndexerService(requestDocument).indexDocument(requestDocument, false); 138 if (!result.startsWith("success")) { 139 throw new OleDocStoreException(result); 140 } 141 } 142 143 return responseDocuments; 144 145 } 146 147 @Override 148 public List<ResponseDocument> checkIn(List<RequestDocument> requestDocuments) throws Exception { 149 DocumentManager documentManager = null; 150 // get the previous content from docStore to rollback the records if saving the session fails. 151 152 List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>(); 153 List<String> fieldValueList = new ArrayList<String>(); 154 for (RequestDocument requestDocument : requestDocuments) { 155 // Each document could be of different cat-type-format. 156 documentManager = BeanLocator.getDocstoreFactory().getDocumentManager(requestDocument.getCategory(), requestDocument.getType(), requestDocument.getFormat()); 157 if (requestDocument.getId() != null || requestDocument.getId().trim().length() > 0) { 158 requestDocument.setUuid(requestDocument.getId()); 159 } 160 //requestDocument.setOperation(operation); 161 //requestDocument.setUser(user); 162 // validate given input 163 ResponseDocument respDoc = new ResponseDocument(); 164 165 try { 166 documentManager.validateInput(requestDocument, session, fieldValueList); 167 } catch (OleDocStoreException e) { 168 respDoc.setStatus(ResponseStatus.INVALID_DATA.toString()); 169 logger.debug("validationMessage-->" + e.getMessage(), e); 170 throw new OleDocStoreException(e.getMessage()); 171 } 172 // Each document could be of different cat-type-format. 173 //documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument); 174 175 // update Docstore 176 documentManager.checkin(requestDocument, session, respDoc); 177 responseDocuments.add(respDoc); 178 } 179 // Index to solr 180 for (RequestDocument requestDocument : requestDocuments) { 181 String result = getIndexerService(requestDocument).indexDocument(requestDocument, false); 182 if (!result.startsWith("success")) { 183 throw new OleDocStoreException(result); 184 } 185 } 186 187 return responseDocuments; 188 } 189 190 @Override 191 public List<ResponseDocument> checkOut(List<RequestDocument> requestDocuments, Object object) throws Exception { 192 DocumentManager documentManager = null; 193 // Save the request documents so that they can be rolled back if necessary. 194 newStateRequestDocuments = requestDocuments; 195 List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>(); 196 for (RequestDocument requestDocument : requestDocuments) { 197 // Each document could be of different cat-type-format. 198 documentManager = BeanLocator.getDocstoreFactory().getDocumentManager(requestDocument.getCategory(), requestDocument.getType(), requestDocument.getFormat()); 199 // To differentiate between restfull API and Normal CheckOut. 200// requestDocument.setUser(user); 201 // checkOut 202 responseDocuments.add(documentManager.checkout(requestDocument, session)); 203 } 204 return responseDocuments; 205 } 206 207 @Override 208 public List<ResponseDocument> bind(List<RequestDocument> requestDocuments, String operation) throws Exception { 209 DocumentManager documentManager = null; 210 // get the previous content from docStore to rollback the records if saving the session fails. 211 212 List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>(); 213 for (RequestDocument requestDocument : requestDocuments) { 214 documentManager = BeanLocator.getDocstoreFactory().getDocumentManager(requestDocument.getCategory(), requestDocument.getType(), requestDocument.getFormat()); 215 // update Docstore 216 responseDocuments.add(documentManager.bind(requestDocument, session, operation)); 217 } 218 219 // Index to solr 220 String result = getIndexerService(requestDocuments.get(0)).bind(requestDocuments); 221 if (!result.startsWith("success")) { 222 throw new OleDocStoreException(result); 223 } 224 225 return responseDocuments; 226 } 227 228 @Override 229 public List<ResponseDocument> unbind(List<RequestDocument> requestDocuments, String operation) throws Exception { 230 DocumentManager documentManager = null; 231 // get the previous content from docStore to rollback the records if saving the session fails. 232 233 List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>(); 234 for (RequestDocument requestDocument : requestDocuments) { 235 documentManager = BeanLocator.getDocstoreFactory().getDocumentManager(requestDocument.getCategory(), requestDocument.getType(), requestDocument.getFormat()); 236 // update Docstore 237 responseDocuments.add(documentManager.unbind(requestDocument, session, operation)); 238 239 } 240 241 // Index to solr 242 String result = getIndexerService(requestDocuments.get(0)).unbind(requestDocuments); 243 if (!result.startsWith("success")) { 244 throw new OleDocStoreException(result); 245 } 246 247 return responseDocuments; 248 } 249 250 @Override 251 public List<ResponseDocument> delete(List<RequestDocument> requestDocuments) throws Exception { 252 DocumentManager documentManager = null; 253 // Save the request documents so that they can be rolled back if necessary. 254 newStateRequestDocuments = requestDocuments; 255 List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>(); 256 for (RequestDocument requestDocument : requestDocuments) { 257 // Each document could be of different cat-type-format. 258 documentManager = BeanLocator.getDocstoreFactory().getDocumentManager(requestDocument.getCategory(), requestDocument.getType(), requestDocument.getFormat()); 259 //Store 260 responseDocuments.add(documentManager.delete(requestDocument, session)); 261 } 262 // Index 263 String result = getIndexerService(requestDocuments.get(0)).delete(requestDocuments); 264 if (!result.startsWith("success")) { 265 throw new OleDocStoreException(result); 266 } 267 return responseDocuments; 268 } 269 270 @Override 271 public List<ResponseDocument> deleteVerify(List<RequestDocument> requestDocuments) throws Exception { 272 DocumentManager documentManager = null; 273 List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>(); 274 for (RequestDocument requestDocument : requestDocuments) { 275 documentManager = BeanLocator.getDocstoreFactory().getDocumentManager(requestDocument.getCategory(), requestDocument.getType(), requestDocument.getFormat()); 276 responseDocuments.add(documentManager.deleteVerify(requestDocument, session)); 277 } 278 return responseDocuments; 279 280 } 281 282 @Override 283 public void transferInstances(List<RequestDocument> requestDocuments) throws Exception { 284 JcrWorkInstanceDocumentManager.getInstance().transferInstances(requestDocuments, session); 285 if (requestDocuments.size() > 0) 286 getIndexerService(requestDocuments.get(0)).transferInstances(requestDocuments); 287 } 288 289 @Override 290 public void transferItems(List<RequestDocument> requestDocuments) throws Exception { 291 logger.debug("TransactionManager transferItems"); 292 JcrWorkInstanceDocumentManager.getInstance().transferItems(requestDocuments, session); 293 if (requestDocuments.size() > 0) 294 getIndexerService(requestDocuments.get(0)).transferItems(requestDocuments); 295 } 296 297 @Override 298 public void batchIngest(BulkProcessRequest bulkProcessRequest, List<RequestDocument> requestDocuments) 299 throws Exception { 300 301 BulkIngestStatistics bulkIngestStatistics = bulkProcessRequest.getBulkIngestStatistics(); 302 BatchIngestStatistics batchStatistics = bulkIngestStatistics.getCurrentBatch(); 303 long commitSize = ProcessParameters.BULK_INGEST_COMMIT_SIZE; 304 newStateRequestDocuments = new ArrayList<RequestDocument>(); 305 // Save the request documents so that they can be rolled back if necessary. 306 newStateRequestDocuments.addAll(requestDocuments); 307 DocumentManager documentManager = BeanLocator.getDocstoreFactory() 308 .getDocumentManager(requestDocuments.get(0).getCategory(), requestDocuments.get(0).getType(), requestDocuments.get(0).getFormat()); 309 310 // Store 311 StopWatch createNodesTimer = new StopWatch(); 312 createNodesTimer.start(); 313 documentManager.ingest(requestDocuments, session); 314 createNodesTimer.stop(); 315 batchStatistics.setTimeToCreateNodesInJcr(createNodesTimer.getTime()); 316 batchStatistics.setCommitSize(commitSize); 317 318 StopWatch indexSolrDocsTime = new StopWatch(); 319 indexSolrDocsTime.start(); 320 321 // Index 322 String result = getIndexerService(requestDocuments.get(0)).indexDocuments(requestDocuments, false); 323 indexSolrDocsTime.stop(); 324 batchStatistics.setTimeToIndexSolrInputDocs(indexSolrDocsTime.getTime()); 325 326 if (!result.startsWith("success")) { 327 throw new OleDocStoreException(result); 328 } 329 330 } 331 332 public void commit(BatchIngestStatistics batchIngestStatistics) throws OleDocStoreException { 333 logger.info("Commit: Saving changes to index..."); 334 StopWatch solrCommitTimer = new StopWatch(); 335 try { 336 if (null != batchIngestStatistics) { 337 solrCommitTimer = new StopWatch(); 338 solrCommitTimer.start(); 339 } 340 WorkBibDocumentIndexer.getInstance().commit(); 341 if (null != batchIngestStatistics) { 342 solrCommitTimer.stop(); 343 batchIngestStatistics.setTimeToSolrCommit(solrCommitTimer.getTime()); 344 } 345 } catch (Exception e) { 346 transactionState = TransactionState.FAILED; 347 logger.error("Exception during commit: Unable to save changes to index. :", e); 348 try { 349 session.refresh(false); 350 } catch (Exception re) { 351 // Ignore this, as data would not be saved anyway. 352 logger.error("Exception during commit. Unable to rollback changes to docstore. :", re); 353 } 354 throw new OleDocStoreException("Commit failed.", e); 355 } 356 357 logger.info("Commit: Saving changes to docstore..."); 358 StopWatch sessionSaveTimer = null; 359 try { 360 if (null != batchIngestStatistics) { 361 sessionSaveTimer = new StopWatch(); 362 sessionSaveTimer.start(); 363 } 364 session.save(); 365 if (null != batchIngestStatistics) { 366 sessionSaveTimer.stop(); 367 batchIngestStatistics.setTimeToSaveJcrSession(sessionSaveTimer.getTime()); 368 } 369 } catch (Exception e) { 370 transactionState = TransactionState.FAILED; 371 logger.error("Exception during commit. Unable to save changes to docstore. :", e); 372 logger.info("Commit: Reverting changes to index..."); 373 // TODO: Implement now. Delete data saved by indexerService. 374 try { 375 rollBackDataInIndexer(oldStateRequestDocuments); 376// rollBackDataInIndexer(oldStateRequestDocuments,newStateRequestDocuments); 377 } catch (Exception ex) { 378 logger.error("error while performing roll back in Indexer"); 379 } 380 throw new OleDocStoreException("Commit failed.", e); 381 } 382 //Session logout is handled in DocumentServiceImpl.java 383 // session.logout(); 384 // We are all done!!! 385 transactionState = TransactionState.COMMITTED; 386 newStateRequestDocuments = null; 387 oldStateRequestDocuments = null; 388 } 389 390 private void rollBackDataInIndexer(List<RequestDocument> oldStateRequestDocuments) throws OleDocStoreException { 391 if (oldStateRequestDocuments != null && oldStateRequestDocuments.size() > 0) { 392 for (RequestDocument requestDocument : oldStateRequestDocuments) { 393 String result = getIndexerService(requestDocument).indexDocument(requestDocument, false); 394 if (!result.startsWith("success")) { 395 throw new OleDocStoreException(result); 396 } 397 } 398 } 399 } 400 401 402} 403