001/* 002 * Copyright 2011 The Kuali Foundation. 003 * 004 * Licensed under the Educational Community License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.opensource.org/licenses/ecl2.php 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.kuali.ole.docstore.service; 017 018import org.apache.commons.lang.time.StopWatch; 019import org.kuali.ole.RepositoryManager; 020import org.kuali.ole.docstore.common.document.content.instance.Instance; 021import org.kuali.ole.docstore.common.document.content.instance.InstanceCollection; 022import org.kuali.ole.docstore.common.document.content.instance.Item; 023import org.kuali.ole.docstore.model.enums.DocCategory; 024import org.kuali.ole.docstore.model.enums.DocFormat; 025import org.kuali.ole.docstore.model.enums.DocType; 026import org.kuali.ole.docstore.model.xmlpojo.ingest.Request; 027import org.kuali.ole.docstore.model.xmlpojo.ingest.RequestDocument; 028import org.kuali.ole.docstore.model.xmlpojo.ingest.Response; 029import org.kuali.ole.docstore.model.xmlpojo.ingest.ResponseDocument; 030import org.kuali.ole.docstore.model.xstream.ingest.RequestHandler; 031import org.kuali.ole.docstore.model.xstream.ingest.ResponseHandler; 032import org.kuali.ole.docstore.process.BulkIngestTimeManager; 033import org.kuali.ole.docstore.process.ProcessParameters; 034import org.kuali.ole.docstore.utility.BatchIngestStatistics; 035import org.kuali.ole.docstore.utility.BulkIngestStatistics; 036import org.kuali.ole.pojo.OleException; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039import org.springframework.beans.factory.annotation.Required; 040 041import javax.jcr.Session; 042import java.util.ArrayList; 043import java.util.Iterator; 044import java.util.List; 045 046/** 047 * Class to IngestNIndexHandlerService. 048 * 049 * @author Rajesh Chowdary K 050 * @created Feb 15, 2012 051 * <p/> 052 * Singleton instance of this class is created by Spring. 053 */ 054public class IngestNIndexHandlerService { 055 056 private static Logger logger = LoggerFactory.getLogger(IngestNIndexHandlerService.class); 057 058 /** 059 * Singleton instance of RequestHandler initialized by Spring DI. 060 */ 061 private RequestHandler requestHandler; 062 /** 063 * Singleton instance of DocumentIngester initialized by Spring DI. 064 */ 065 private DocumentIngester documentIngester; 066 /** 067 * Singleton instance of DocumentIndexer initialized by Spring DI. 068 */ 069 private DocumentIndexer documentIndexer; 070 private static long docCount = 0; 071 private BulkIngestStatistics bulkLoadStatistics = BulkIngestStatistics.getInstance(); 072 private static List<RequestDocument> prevRequestDocs = null; 073 private RepositoryManager repositoryManager; 074 075 @Required 076 public void setDocumentIngester(DocumentIngester documentIngester) { 077 this.documentIngester = documentIngester; 078 } 079 080 @Required 081 public void setDocumentIndexer(DocumentIndexer documentIndexer) { 082 this.documentIndexer = documentIndexer; 083 } 084 085 @Required 086 public void setRequestHandler(RequestHandler requestHandler) { 087 this.requestHandler = requestHandler; 088 } 089 090 /** 091 * Method to ingest & index xml String Request Document 092 * 093 * @param xmlRequestString 094 * @return 095 * @throws Exception 096 */ 097 public String ingestNIndexRequestDocuments(String xmlRequestString) throws Exception { 098 Request request = null; 099 request = requestHandler.toObject(xmlRequestString); 100 Response response = ingestNIndexRequestDocuments(request); 101 String xmlResponse = new ResponseHandler().toXML(response); 102 return xmlResponse; 103 } 104 105 /** 106 * ` 107 * <p/> 108 * Method to ingest & index xml String Request Document 109 * 110 * @param request 111 * @return 112 * @throws Exception 113 */ 114 public Response ingestNIndexRequestDocuments(Request request) throws Exception { 115 116 for (RequestDocument doc : request.getRequestDocuments()) { 117 doc.setUser(request.getUser()); 118 } 119 Session session = null; 120 List<String> docUUIDs = new ArrayList<String>(); 121 try { 122 session = getRepositoryManager().getSession(request.getUser(), request.getOperation()); 123 124 // Ingest & check for any unsupported Category/Type/Formats 125 for (RequestDocument reqDoc : request.getRequestDocuments()) { 126 if (DocCategory.WORK.isEqualTo(reqDoc.getCategory())) { 127 if (DocType.BIB.isEqualTo(reqDoc.getType())) { // Biblographic 128 if (DocFormat.MARC.isEqualTo(reqDoc.getFormat()) 129 || DocFormat.DUBLIN_CORE.isEqualTo(reqDoc.getFormat()) || DocFormat.DUBLIN_UNQUALIFIED 130 .isEqualTo(reqDoc.getFormat())) { 131 docUUIDs.addAll(documentIngester.ingestBibNLinkedInstanceRequestDocuments(reqDoc, session)); 132 documentIndexer.indexDocument(reqDoc); 133 } else { 134 logger.error("Unsupported Document Format : " + reqDoc.getFormat() + " Called."); 135 throw new Exception("Unsupported Document Format : " + reqDoc.getFormat() + " Called."); 136 } 137 } else if (DocType.INSTANCE.isEqualTo(reqDoc.getType())) { // Instace 138 if (DocFormat.OLEML.isEqualTo(reqDoc.getFormat())) { // OLE-ML 139 documentIngester.ingestInstanceDocument(reqDoc, session, docUUIDs, null, null); 140 documentIndexer.indexDocument(reqDoc); 141 } else { 142 logger.error("Unsupported Document Format : " + reqDoc.getFormat() + " Called."); 143 throw new Exception("Unsupported Document Format : " + reqDoc.getFormat() + " Called."); 144 } 145 } else if (DocType.LICENSE.isEqualTo(reqDoc.getType())) { // License 146 if (DocFormat.ONIXPL.isEqualTo(reqDoc.getFormat()) 147 || DocFormat.PDF.isEqualTo(reqDoc.getFormat()) 148 || DocFormat.DOC.isEqualTo(reqDoc.getFormat()) || DocFormat.XSLT 149 .isEqualTo(reqDoc.getFormat())) { //Onixpl, pdf, doc, xslt. 150 documentIngester.ingestWorkLicenseOnixplRequestDocument(reqDoc, session, docUUIDs); 151 documentIndexer.indexDocument(reqDoc); 152 } else { 153 logger.error("Unsupported Document Format : " + reqDoc.getFormat() + " Called."); 154 throw new Exception("Unsupported Document Format : " + reqDoc.getFormat() + " Called."); 155 } 156 } else { 157 logger.error("Unsupported Document Type : " + reqDoc.getType() + " Called."); 158 throw new Exception("Unsupported Document Type : " + reqDoc.getType() + " Called."); 159 } 160 } else if (DocCategory.SECURITY.isEqualTo(reqDoc.getCategory())) { // Security 161 if (DocType.PATRON.isEqualTo(reqDoc.getType())) { // Patron 162 if (DocFormat.OLEML.isEqualTo(reqDoc.getFormat())) { // oleml 163 docUUIDs.addAll(documentIngester.ingestPatronRequestDocument(reqDoc, session, null)); 164 documentIndexer.indexDocument(reqDoc); 165 } else { 166 logger.error("Unsupported Document Format : " + reqDoc.getFormat() + " Called."); 167 throw new Exception("Unsupported Document Format : " + reqDoc.getFormat() + " Called."); 168 } 169 } else { 170 logger.error("Unsupported Document Type : " + reqDoc.getType() + " Called."); 171 throw new Exception("Unsupported Document Type : " + reqDoc.getType() + " Called."); 172 } 173 } else { 174 logger.error("Unsupported Category : " + reqDoc.getCategory() + " Called."); 175 throw new Exception("Unsupported Document Category : " + reqDoc.getCategory() + " Called."); 176 } 177 } 178 179 // Commit: DocStore 180 session.save(); 181 182 } catch (Exception e) { 183 logger.error("Document Ingest & Index Failed, Cause: " + e.getMessage(), e); 184 documentIngester.rollbackDocStoreIngestedData(session, request.getRequestDocuments()); 185 documentIndexer.rollbackIndexedData(request.getRequestDocuments()); 186 throw e; 187 } finally { 188 if (session != null) { 189 getRepositoryManager().logout(session); 190 } 191 } 192 Response response = buildResponse(request); 193 return response; 194 } 195 196 private RepositoryManager getRepositoryManager() throws OleException { 197 if (null == repositoryManager) { 198 repositoryManager = RepositoryManager.getRepositoryManager(); 199 } 200 return repositoryManager; 201 } 202 203 /** 204 * Method to ingest and index bulk Request. 205 * 206 * @param request 207 * @return 208 */ 209 public List<String> bulkIngestNIndex(Request request, Session session) { 210 //RequestDocument requestDocument = request.getRequestDocuments().get(0); 211 //DocumentManager documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument); 212 BatchIngestStatistics batchStatistics = BulkIngestStatistics.getInstance().getCurrentBatch(); 213 BulkIngestStatistics bulkLoadStatistics = BulkIngestStatistics.getInstance(); 214 long commitSize = ProcessParameters.BULK_INGEST_COMMIT_SIZE; 215 logger.debug("commitSize = " + commitSize); 216 logger.debug("bulkIngestNIndex(" + request.getRequestDocuments().size() + ") START"); 217 logger.debug("BULK_INGEST_IS_LINKING_ENABLED=" + ProcessParameters.BULK_INGEST_IS_LINKING_ENABLED); 218 //Session session = null; 219 List<String> docUUIDs = new ArrayList<String>(); 220 StopWatch ingestTimer = new StopWatch(); 221 StopWatch indexTimer = new StopWatch(); 222 StopWatch totalTimer = new StopWatch(); 223 StopWatch createNodesTimer = new StopWatch(); 224 StopWatch sessionSaveTimer = new StopWatch(); 225 StopWatch solrOptimizeTimer = new StopWatch(); 226 long recCount = request.getRequestDocuments().size(); 227 boolean isCommit = false; 228 totalTimer.start(); 229 try { 230 ingestTimer.start(); 231 createNodesTimer.start(); 232 //session = RepositoryManager.getRepositoryManager().getSession(request.getUser(), request.getOperation()); 233 List<RequestDocument> reqDocs = request.getRequestDocuments(); 234 if (prevRequestDocs == null) { 235 prevRequestDocs = new ArrayList<RequestDocument>(); 236 } 237 prevRequestDocs.addAll(request.getRequestDocuments()); 238 logger.info("prevRequestDocs" + prevRequestDocs.size()); 239 docUUIDs.addAll(documentIngester.ingestRequestDocumentsForBulk(reqDocs, session)); 240 //docUUIDs.addAll(documentIngester.ingestRequestDocumentsForBulkUsingBTreeMgr(reqDocs, session)); 241 //documentManager.store(reqDocs,session); 242 createNodesTimer.stop(); 243 try { 244 ingestTimer.suspend(); 245 indexTimer.start(); 246 } catch (Exception e2) { 247 logger.error(e2.getMessage() , e2 ); 248 } 249 bulkLoadStatistics.setCommitRecCount(bulkLoadStatistics.getCommitRecCount() + recCount); 250 if (bulkLoadStatistics.getCommitRecCount() == commitSize || bulkLoadStatistics.isLastBatch()) { 251 isCommit = true; 252 } 253 documentIndexer.indexDocumentsForBulk(reqDocs, isCommit); 254 //documentManager.index(reqDocs,isCommit); 255 try { 256 indexTimer.suspend(); 257 ingestTimer.resume(); 258 } catch (Exception e2) { 259 logger.error(e2.getMessage() , e2 ); 260 } 261 if (isCommit) { 262 sessionSaveTimer.start(); 263 logger.info("Bulk ingest: Repository commit started. Number of records being committed : " 264 + bulkLoadStatistics.getCommitRecCount()); 265 session.save(); 266 bulkLoadStatistics.setCommitRecCount(0); 267 prevRequestDocs = null; 268 sessionSaveTimer.stop(); 269 } 270 271 try { 272 ingestTimer.stop(); 273 } catch (Exception e2) { 274 logger.error(e2.getMessage() , e2 ); 275 } 276 // Documents processed can be different from records processed as in the case of Instance data. 277 logger.debug("Documents processed:" + recCount); 278 bulkLoadStatistics.setFileRecCount(bulkLoadStatistics.getFileRecCount() + recCount); 279 logger.info("Bulk ingest: Records processed in the current file :" + bulkLoadStatistics.getFileRecCount()); 280 } catch (Exception e) { 281 bulkLoadStatistics.setCommitRecCount(0); 282 try { 283 ingestTimer.resume(); 284 } catch (Exception e2) { 285 logger.error(e2.getMessage() , e2 ); 286 } 287 //documentIngester.rollbackDocStoreIngestedData(session, request.getRequestDocuments()); 288 documentIngester.rollbackDocStoreIngestedData(session, prevRequestDocs); 289 ingestTimer.stop(); 290 try { 291 indexTimer.resume(); 292 } catch (Exception e2) { 293 logger.error(e2.getMessage() , e2 ); 294 } 295 //documentIndexer.rollbackIndexedData(request.getRequestDocuments()); 296 //prevRequestDocs = prevRequestDocs.subList(0, prevRequestDocs.size() - request.getRequestDocuments().size()); 297 //logger.info("prevRequestDocs before remove INDEXES = " + prevRequestDocs.size()); 298 documentIndexer.rollbackIndexedData(prevRequestDocs); 299 prevRequestDocs = null; 300 try { 301 indexTimer.stop(); 302 } catch (Exception e2) { 303 logger.error(e2.getMessage() , e2 ); 304 } 305 logger.error("Document Ingest & Index Failed, Cause: " + e.getMessage(), e); 306 try { 307 totalTimer.stop(); 308 } catch (Exception e2) { 309 logger.error(e2.getMessage() , e2 ); 310 } 311 logger.debug("Time Consumptions...:\tcreatingNodes(" + docUUIDs.size() + "):" + createNodesTimer 312 + "\tSessionSave(" + docUUIDs.size() + "):" + sessionSaveTimer + "\tIngest(" + docUUIDs.size() 313 + "):" + ingestTimer + "\tIndexing(" + docUUIDs.size() + "):" + indexTimer + "\tTotal Time: " 314 + totalTimer); 315 docUUIDs.clear(); 316 } finally { 317 /*if (session != null) { 318 try { 319 RepositoryManager.getRepositoryManager().logout(session); 320 } catch (OleException e) { 321 } 322 } */ 323 } 324 try { 325 totalTimer.stop(); 326 } catch (Exception exe) { 327 logger.error(exe.getMessage() , exe ); 328 } 329 logger.debug( 330 "Time Consumptions...:\tcreatingNodes(" + docUUIDs.size() + "):" + createNodesTimer + "\tSessionSave(" 331 + docUUIDs.size() + "):" + sessionSaveTimer + "\tIngest(" + docUUIDs.size() + "):" + ingestTimer 332 + "\tIndexing(" + docUUIDs.size() + "):" + indexTimer + "\tTotal Time: " + totalTimer); 333 logger.debug("bulkIngestNIndex(" + request.getRequestDocuments().size() + ") END"); 334 batchStatistics.setTimeToCreateNodesInJcr(createNodesTimer.getTime()); 335 batchStatistics.setTimeToSaveJcrSession(sessionSaveTimer.getTime()); 336 batchStatistics.setIngestingTime(ingestTimer.getTime()); 337 batchStatistics.setIndexingTime(indexTimer.getTime()); 338 batchStatistics.setIngestNIndexTotalTime(totalTimer.getTime()); 339 updateProcessTimer(docUUIDs.size(), ingestTimer, indexTimer, totalTimer); 340 solrOptimizeTimer.start(); 341 optimizeSolr(docUUIDs.size()); 342 solrOptimizeTimer.stop(); 343 batchStatistics.setTimeToSolrOptimize(solrOptimizeTimer.getTime()); 344 return docUUIDs; 345 } 346 347 private void updateProcessTimer(int recordsProcessed, StopWatch ingest, StopWatch index, StopWatch total) { 348 BulkIngestTimeManager timer = ProcessParameters.BULK_PROCESSOR_TIME_MANAGER; 349 synchronized (timer) { 350 timer.setRecordsCount(timer.getRecordsCount() + recordsProcessed); 351 timer.setIngestingTimer(timer.getIngestingTimer() + ingest.getTime()); 352 timer.setIndexingTimer(timer.getIndexingTimer() + index.getTime()); 353 timer.setProcessTimer(timer.getProcessTimer() + total.getTime()); 354 if (timer.getRecordsCount() >= ProcessParameters.BULK_PROCESSOR_TIMER_DISPLAY) { 355 logger.debug( 356 "----------------------------------------------------------------------------------------------------------------------"); 357 logger.debug(timer.toString()); 358 logger.debug( 359 "----------------------------------------------------------------------------------------------------------------------"); 360 timer.reset(); 361 } 362 } 363 } 364 365 private void optimizeSolr(long recordsProcessed) { 366 docCount += recordsProcessed; 367 logger.debug("BULK_INGEST_OPTIMIZE_SIZE=" + ProcessParameters.BULK_INGEST_OPTIMIZE_SIZE 368 + ". Records processed till now=" + docCount); 369 logger.info("Bulk ingest: Records processed in the bulk ingest " + docCount); 370 if (docCount >= ProcessParameters.BULK_INGEST_OPTIMIZE_SIZE) { 371 docCount = 0; 372 try { 373 logger.debug("Solr Optimization: START"); 374 documentIndexer.optimizeSolr(false, false); 375 logger.debug("Solr Optimization: END"); 376 } catch (Exception e) { 377 logger.warn("Solr Optimization Failed: ", e); 378 } 379 } 380 } 381 382 public Response buildResponse(Request request) { 383 Response docStoreResponse = new Response(); 384 docStoreResponse.setUser(request.getUser()); 385 docStoreResponse.setOperation(request.getOperation()); 386 docStoreResponse.setMessage("Documents ingested"); 387 docStoreResponse.setStatus("Success"); 388 docStoreResponse.setStatusMessage("Documents Ingested Successfully"); 389 List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>(); 390 ResponseDocument linkedDocument = null; 391 ResponseDocument responseDocument = null; 392 ResponseDocument linkedInstanceDocument = null; 393 ResponseDocument linkedInstanceItemDocument = null; 394 ResponseDocument linkedInstanceSrHoldingDoc = null; 395 // documents 396 for (Iterator<RequestDocument> iterator = request.getRequestDocuments().iterator(); iterator.hasNext(); ) { 397 RequestDocument docStoreDocument = iterator.next(); 398 docStoreDocument.getContent().setContent(""); 399 responseDocument = new ResponseDocument(); 400 setResponseParameters(responseDocument, docStoreDocument); 401 responseDocuments.add(responseDocument); 402 if (docStoreDocument.getLinkedRequestDocuments() != null 403 && docStoreDocument.getLinkedRequestDocuments().size() > 0 && request != null 404 && request.getOperation() != null && !request.getOperation().equalsIgnoreCase("checkIn")) { 405 List<ResponseDocument> linkResponseDos = new ArrayList<ResponseDocument>(); 406 // linked instance documents 407 for (Iterator<RequestDocument> linkIterator = docStoreDocument.getLinkedRequestDocuments() 408 .iterator(); linkIterator.hasNext(); ) { 409 RequestDocument linkedRequestDocument = linkIterator.next(); 410 linkedRequestDocument.getContent().setContent(""); 411 linkedDocument = new ResponseDocument(); 412 setResponseParameters(linkedDocument, linkedRequestDocument); 413 linkResponseDos.add(linkedDocument); 414 List<ResponseDocument> linkInstanceDocs = new ArrayList<ResponseDocument>(); 415 InstanceCollection instanceCollection = (InstanceCollection) linkedRequestDocument.getContent() 416 .getContentObject(); 417 for (Instance oleInstance : instanceCollection.getInstance()) { 418 // holding from instance 419 linkedInstanceDocument = new ResponseDocument(); 420 setResponseParameters(linkedInstanceDocument, linkedRequestDocument); 421 linkedInstanceDocument.setUuid(oleInstance.getOleHoldings().getHoldingsIdentifier()); 422 linkedInstanceDocument.setType("holdings"); 423 linkInstanceDocs.add(linkedInstanceDocument); 424 425 //SourceHolding from Instance 426 linkedInstanceSrHoldingDoc = new ResponseDocument(); 427 setResponseParameters(linkedInstanceSrHoldingDoc, linkedRequestDocument); 428 if (oleInstance.getSourceHoldings() != null && 429 oleInstance.getSourceHoldings().getHoldingsIdentifier() != null) { 430 linkedInstanceSrHoldingDoc.setUuid(oleInstance.getSourceHoldings().getHoldingsIdentifier()); 431 linkedInstanceSrHoldingDoc.setType("sourceHoldings"); 432 linkInstanceDocs.add(linkedInstanceSrHoldingDoc); 433 } 434 435 436 // item from instance 437 for (Iterator<Item> itemIterator = oleInstance.getItems().getItem().iterator(); itemIterator 438 .hasNext(); ) { 439 Item oleItem = itemIterator.next(); 440 linkedInstanceItemDocument = new ResponseDocument(); 441 setResponseParameters(linkedInstanceItemDocument, linkedRequestDocument); 442 linkedInstanceItemDocument.setUuid(oleItem.getItemIdentifier()); 443 linkedInstanceItemDocument.setType("item"); 444 linkInstanceDocs.add(linkedInstanceItemDocument); 445 } 446 } 447 responseDocument.setLinkedInstanceDocuments(linkInstanceDocs); 448 } 449 responseDocument.setLinkedDocuments(linkResponseDos); 450 } 451 } 452 docStoreResponse.setDocuments(responseDocuments); 453 return docStoreResponse; 454 } 455 456 private void setResponseParameters(ResponseDocument responseDocument, RequestDocument docStoreDocument) { 457 responseDocument.setId(docStoreDocument.getId()); 458 responseDocument.setCategory(docStoreDocument.getCategory()); 459 responseDocument.setType(docStoreDocument.getType()); 460 responseDocument.setFormat(docStoreDocument.getFormat()); 461 responseDocument.setContent(docStoreDocument.getContent()); 462 responseDocument.setUuid(docStoreDocument.getUuid()); 463 } 464 465 public void setRepositoryManager(RepositoryManager repositoryManager) { 466 this.repositoryManager = repositoryManager; 467 } 468 469 public DocumentIngester getDocumentIngester() { 470 return documentIngester; 471 } 472}