001 package org.kuali.ole.docstore.transaction;
002
003 import java.util.ArrayList;
004 import java.util.List;
005 import javax.jcr.Node;
006 import javax.jcr.Session;
007
008 import org.apache.commons.lang.time.StopWatch;
009 import org.kuali.ole.RepositoryManager;
010 import org.kuali.ole.docstore.OleDocStoreException;
011 import org.kuali.ole.docstore.discovery.service.IndexerService;
012 import org.kuali.ole.docstore.document.DocumentManager;
013 import org.kuali.ole.docstore.model.xmlpojo.ingest.RequestDocument;
014 import org.kuali.ole.docstore.model.xmlpojo.ingest.ResponseDocument;
015 import org.kuali.ole.docstore.process.batch.BulkProcessRequest;
016 import org.kuali.ole.docstore.repository.CustomNodeManager;
017 import org.kuali.ole.docstore.service.BeanLocator;
018 import org.kuali.ole.docstore.service.ServiceLocator;
019 import org.kuali.ole.docstore.utility.BatchIngestStatistics;
020 import org.kuali.ole.docstore.utility.BulkIngestStatistics;
021 import org.kuali.ole.repository.NodeHandler;
022 import org.slf4j.Logger;
023 import org.slf4j.LoggerFactory;
024
025 /**
026 * Encapsulates reusable common logic to support ACID transactions in DocStore.
027 * User: tirumalesh.b
028 * Date: 26/9/12 Time: 10:40 AM
029 */
030 public class TransactionManager {
031 private static final Logger logger = LoggerFactory.getLogger(NodeHandler.class);
032 private IndexerService indexerService;
033 private Session session;
034 private TransactionState transactionState = TransactionState.IDLE;
035 private List<RequestDocument> newStateRequestDocuments = null;
036 private List<RequestDocument> oldStateRequestDocuments = new ArrayList<RequestDocument>();
037
038 public enum TransactionState {
039 IDLE, STARTED, COMMITTED, ABORTED, FAILED
040 }
041
042 public void startTransaction(String user, String operation) throws OleDocStoreException {
043 if (transactionState == TransactionState.STARTED) {
044 throw new OleDocStoreException("Transaction already started.");
045 }
046 try {
047 if (null == session) {
048 session = RepositoryManager.getRepositoryManager().getSession(user, operation);
049 }
050 if (null == indexerService) {
051 indexerService = ServiceLocator.getIndexerService();
052 }
053 }
054 catch (Exception e) {
055 throw new OleDocStoreException(e);
056 }
057 transactionState = TransactionState.STARTED;
058 }
059
060 public void commit() throws OleDocStoreException {
061 commit(null);
062 }
063
064 public void commit(BatchIngestStatistics batchIngestStatistics) throws OleDocStoreException {
065 logger.info("Commit: Saving changes to index...");
066 StopWatch solrCommitTimer = new StopWatch();
067 try {
068 if (null != batchIngestStatistics) {
069 solrCommitTimer = new StopWatch();
070 solrCommitTimer.start();
071 }
072 indexerService.commit();
073 if (null != batchIngestStatistics) {
074 solrCommitTimer.stop();
075 batchIngestStatistics.setTimeToSolrCommit(solrCommitTimer.getTime());
076 }
077 }
078 catch (Exception e) {
079 transactionState = TransactionState.FAILED;
080 logger.error("Exception during commit: Unable to save changes to index. :", e);
081 try {
082 session.refresh(false);
083 }
084 catch (Exception re) {
085 // Ignore this, as data would not be saved anyway.
086 logger.error("Exception during commit. Unable to rollback changes to docstore. :", re);
087 }
088 throw new OleDocStoreException("Commit failed.", e);
089 }
090
091 logger.info("Commit: Saving changes to docstore...");
092 StopWatch sessionSaveTimer = null;
093 new StopWatch();
094 try {
095 if (null != batchIngestStatistics) {
096 sessionSaveTimer = new StopWatch();
097 sessionSaveTimer.start();
098 }
099 session.save();
100 if (null != batchIngestStatistics) {
101 sessionSaveTimer.stop();
102 batchIngestStatistics.setTimeToSaveJcrSession(sessionSaveTimer.getTime());
103 }
104 }
105 catch (Exception e) {
106 transactionState = TransactionState.FAILED;
107 logger.error("Exception during commit. Unable to save changes to docstore. :", e);
108 logger.info("Commit: Reverting changes to index...");
109 // TODO: Implement now. Delete data saved by indexerService.
110 try {
111 rollBackDataInIndexer(oldStateRequestDocuments);
112 }
113 catch (Exception ex) {
114 logger.error("error while performing roll back in Indexer");
115 }
116 throw new OleDocStoreException("Commit failed.", e);
117 }
118 // We are all done!!!
119 transactionState = TransactionState.COMMITTED;
120 newStateRequestDocuments = null;
121 oldStateRequestDocuments = null;
122 }
123
124 private void rollBackDataInIndexer(List<RequestDocument> oldStateRequestDocuments) throws OleDocStoreException {
125 if (oldStateRequestDocuments != null && oldStateRequestDocuments.size() > 0) {
126 String result = indexerService.indexDocuments(oldStateRequestDocuments, false);
127 if (!result.startsWith("success")) {
128 throw new OleDocStoreException(result);
129 }
130 }
131 }
132
133 public void abort() {
134 try {
135 session.refresh(false);
136 }
137 catch (Exception re) {
138 // Ignore this, as we want to rollback anyway.
139 logger.error("Exception during abort. Unable to rollback changes to docstore. :", re);
140
141 }
142 try {
143 indexerService.rollback();
144 }
145 catch (Exception e) {
146 // Ignore this, as we want to rollback anyway.
147 logger.error("Exception during abort. Unable to rollback changes to index. :", e);
148 }
149 transactionState = TransactionState.ABORTED;
150 newStateRequestDocuments = null;
151 oldStateRequestDocuments = null;
152 }
153
154 public void close() {
155 session.logout();
156 session = null;
157 transactionState = TransactionState.IDLE;
158 newStateRequestDocuments = null;
159 oldStateRequestDocuments = null;
160 setIndexerService(null);
161 }
162
163 public List<ResponseDocument> ingest(List<RequestDocument> requestDocuments) throws OleDocStoreException {
164 DocumentManager documentManager = null;
165 // Save the request documents so that they can be rolled back if necessary.
166 newStateRequestDocuments = requestDocuments;
167 List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
168 for (RequestDocument requestDocument : requestDocuments) {
169 // Each document could be of different cat-type-format.
170 documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument);
171 // Store
172 responseDocuments.add(documentManager.ingest(requestDocument, session));
173 }
174 // Index
175 String result = indexerService.indexDocuments(requestDocuments, false);
176 if (!result.startsWith("success")) {
177 throw new OleDocStoreException(result);
178 }
179
180 return responseDocuments;
181 }
182
183 public ResponseDocument ingest(RequestDocument requestDocument) throws OleDocStoreException {
184 DocumentManager documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument);
185 // Save the request documents so that they can be rolled back if necessary.
186 newStateRequestDocuments = new ArrayList<RequestDocument>(1);
187 newStateRequestDocuments.add(requestDocument);
188
189 ResponseDocument responseDocument = documentManager.ingest(requestDocument, session);
190 String result = indexerService.indexDocument(requestDocument, false);
191 if (!result.startsWith("success")) {
192 throw new OleDocStoreException(result);
193 }
194 return responseDocument;
195 }
196
197
198 public List<ResponseDocument> checkIn(List<RequestDocument> requestDocuments, String operation)
199 throws OleDocStoreException {
200 DocumentManager documentManager = null;
201 // get the previous content from docStore to rollback the records if saving the session fails.
202
203 List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
204 for (RequestDocument requestDocument : requestDocuments) {
205 if (requestDocument.getId() != null || requestDocument.getId().trim().length() > 0) {
206 requestDocument.setUuid(requestDocument.getId());
207 }
208
209 /* try {
210
211 Node nodeByUUID = CustomNodeManager.getInstance().getNodeByUUID(session, requestDocument.getId());
212 RequestDocument prevRequestDoc = new RequestDocument();
213 prevRequestDoc = requestDocument;
214 String prevContent = nodeByUUID.getNode("jcr:content").getProperty("jcr:data").getValue().getString();
215 prevRequestDoc.getContent().setContent(prevContent);
216 oldStateRequestDocuments.add(prevRequestDoc);
217 }
218 catch (Exception ex) {
219 String failMsg = "Check in failed." + "Unable to get the record from docstore. Record UUID "
220 + requestDocument.getId();
221 logger.info(failMsg, ex);
222 throw new OleDocStoreException(failMsg, ex);
223 }
224 */
225 requestDocument.setOperation(operation);
226 // Each document could be of different cat-type-format.
227 documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument);
228
229 // update Docstore
230 responseDocuments.add(documentManager.checkin(requestDocument, session));
231 }
232 // Index to solr
233 String result = indexerService.indexDocuments(requestDocuments, false);
234 if (!result.startsWith("success")) {
235 throw new OleDocStoreException(result);
236 }
237
238 return responseDocuments;
239 }
240
241
242 public List<ResponseDocument> checkOut(List<RequestDocument> requestDocuments, String user)
243 throws OleDocStoreException {
244 DocumentManager documentManager = null;
245 // Save the request documents so that they can be rolled back if necessary.
246 newStateRequestDocuments = requestDocuments;
247 List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
248 for (RequestDocument requestDocument : requestDocuments) {
249 // Each document could be of different cat-type-format.
250 documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument);
251 // To differentiate between restfull API and Normal CheckOut.
252 requestDocument.setUser(user);
253 // checkOut
254 responseDocuments.add(documentManager.checkout(requestDocument, session));
255 }
256 return responseDocuments;
257 }
258
259
260 public ResponseDocument checkIn(RequestDocument requestDocument) throws OleDocStoreException {
261 DocumentManager documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument);
262 // Save the request documents so that they can be rolled back if necessary.
263 newStateRequestDocuments = new ArrayList<RequestDocument>(1);
264 newStateRequestDocuments.add(requestDocument);
265 // update Docstore
266 ResponseDocument responseDocument = documentManager.checkin(requestDocument, session);
267 //Index to solr
268 String result = indexerService.indexDocument(requestDocument, false);
269 if (!result.startsWith("success")) {
270 throw new OleDocStoreException(result);
271 }
272 return responseDocument;
273 }
274
275 public void batchIngest(BulkProcessRequest bulkProcessRequest, List<RequestDocument> requestDocuments)
276 throws OleDocStoreException {
277 BulkIngestStatistics bulkIngestStatistics = bulkProcessRequest.getBulkIngestStatistics();
278 BatchIngestStatistics batchStatistics = bulkIngestStatistics.getCurrentBatch();
279 newStateRequestDocuments = new ArrayList<RequestDocument>();
280 // Save the request documents so that they can be rolled back if necessary.
281 newStateRequestDocuments.addAll(requestDocuments);
282 DocumentManager documentManager = BeanLocator.getDocumentManagerFactory()
283 .getDocumentManager(requestDocuments.get(0));
284
285 // Store
286 StopWatch createNodesTimer = new StopWatch();
287 createNodesTimer.start();
288 documentManager.ingest(requestDocuments, session);
289 createNodesTimer.stop();
290 batchStatistics.setTimeToCreateNodesInJcr(createNodesTimer.getTime());
291
292 StopWatch indexSolrDocsTime = new StopWatch();
293 indexSolrDocsTime.start();
294
295 // Index
296 String result = indexerService.indexDocuments(requestDocuments, false);
297 indexSolrDocsTime.stop();
298 batchStatistics.setTimeToIndexSolrInputDocs(indexSolrDocsTime.getTime());
299
300 if (!result.startsWith("success")) {
301 throw new OleDocStoreException(result);
302 }
303 }
304
305 public List<ResponseDocument> delete(List<RequestDocument> requestDocuments) throws OleDocStoreException {
306 DocumentManager documentManager = null;
307 List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
308 for (RequestDocument requestDocument : requestDocuments) {
309 documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument);
310 responseDocuments.add(documentManager.delete(requestDocument, session));
311 }
312 return responseDocuments;
313 }
314
315 public IndexerService getIndexerService() {
316 return indexerService;
317 }
318
319 public void setIndexerService(IndexerService indexerService) {
320 this.indexerService = indexerService;
321 }
322 }