View Javadoc

1   /*
2    * Copyright 2011 The Kuali Foundation.
3    * 
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    * http://www.opensource.org/licenses/ecl2.php
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.ole.docstore.service;
17  
18  import org.apache.commons.lang.time.StopWatch;
19  import org.kuali.ole.RepositoryManager;
20  import org.kuali.ole.docstore.model.enums.DocCategory;
21  import org.kuali.ole.docstore.model.enums.DocFormat;
22  import org.kuali.ole.docstore.model.enums.DocType;
23  import org.kuali.ole.docstore.model.xmlpojo.ingest.Request;
24  import org.kuali.ole.docstore.model.xmlpojo.ingest.RequestDocument;
25  import org.kuali.ole.docstore.model.xmlpojo.ingest.Response;
26  import org.kuali.ole.docstore.model.xmlpojo.ingest.ResponseDocument;
27  import org.kuali.ole.docstore.model.xmlpojo.work.instance.oleml.Instance;
28  import org.kuali.ole.docstore.model.xmlpojo.work.instance.oleml.InstanceCollection;
29  import org.kuali.ole.docstore.model.xmlpojo.work.instance.oleml.Item;
30  import org.kuali.ole.docstore.model.xstream.ingest.RequestHandler;
31  import org.kuali.ole.docstore.model.xstream.ingest.ResponseHandler;
32  import org.kuali.ole.docstore.process.BulkIngestTimeManager;
33  import org.kuali.ole.docstore.process.ProcessParameters;
34  import org.kuali.ole.docstore.utility.BatchIngestStatistics;
35  import org.kuali.ole.docstore.utility.BulkIngestStatistics;
36  import org.slf4j.Logger;
37  import org.slf4j.LoggerFactory;
38  import org.springframework.beans.factory.annotation.Required;
39  
40  import javax.jcr.Session;
41  import java.util.ArrayList;
42  import java.util.Iterator;
43  import java.util.List;
44  
45  /**
 * Service that ingests request documents into the document store and indexes them.
47   *
48   * @author Rajesh Chowdary K
49   * @created Feb 15, 2012
50   * <p/>
51   * Singleton instance of this class is created by Spring.
52   */
53  public class IngestNIndexHandlerService {
54  
55      private static Logger logger = LoggerFactory.getLogger(IngestNIndexHandlerService.class);
56  
57      /**
58       * Singleton instance of  RequestHandler initialized by Spring DI.
59       */
60      private RequestHandler   requestHandler;
61      /**
62       * Singleton instance of  DocumentIngester initialized by Spring DI.
63       */
64      private DocumentIngester documentIngester;
65      /**
66       * Singleton instance of  DocumentIndexer initialized by Spring DI.
67       */
68      private DocumentIndexer  documentIndexer;
69      private static long                  docCount           = 0;
70      private        BulkIngestStatistics  bulkLoadStatistics = BulkIngestStatistics.getInstance();
71      private static List<RequestDocument> prevRequestDocs    = null;
72  
73      @Required
74      public void setDocumentIngester(DocumentIngester documentIngester) {
75          this.documentIngester = documentIngester;
76      }
77  
78      @Required
79      public void setDocumentIndexer(DocumentIndexer documentIndexer) {
80          this.documentIndexer = documentIndexer;
81      }
82  
83      @Required
84      public void setRequestHandler(RequestHandler requestHandler) {
85          this.requestHandler = requestHandler;
86      }
87  
88      /**
89       * Method to ingest & index xml String Request Document
90       *
91       * @param xmlRequestString
92       * @return
93       * @throws Exception
94       */
95      public String ingestNIndexRequestDocuments(String xmlRequestString) throws Exception {
96          Request request = null;
97          request = requestHandler.toObject(xmlRequestString);
98          Response response = ingestNIndexRequestDocuments(request);
99          String xmlResponse = new ResponseHandler().toXML(response);
100         return xmlResponse;
101     }
102 
103     /**
104      * `
105      * <p/>
106      * Method to ingest & index xml String Request Document
107      *
108      * @param request
109      * @return
110      * @throws Exception
111      */
112     public Response ingestNIndexRequestDocuments(Request request) throws Exception {
113 
114         for (RequestDocument doc : request.getRequestDocuments()) {
115             doc.setUser(request.getUser());
116         }
117         Session session = null;
118         List<String> docUUIDs = new ArrayList<String>();
119         try {
120             session = RepositoryManager.getRepositoryManager().getSession(request.getUser(), request.getOperation());
121 
122             // Ingest & check for any unsupported Category/Type/Formats
123             for (RequestDocument reqDoc : request.getRequestDocuments()) {
124                 if (DocCategory.WORK.isEqualTo(reqDoc.getCategory())) {
125                     if (DocType.BIB.isEqualTo(reqDoc.getType())) { // Biblographic
126                         if (DocFormat.MARC.isEqualTo(reqDoc.getFormat())
127                             || DocFormat.DUBLIN_CORE.isEqualTo(reqDoc.getFormat()) || DocFormat.DUBLIN_UNQUALIFIED
128                                 .isEqualTo(reqDoc.getFormat())) {
129                             docUUIDs.addAll(documentIngester.ingestBibNLinkedInstanceRequestDocuments(reqDoc, session));
130                             documentIndexer.indexDocument(reqDoc);
131                         }
132                         else {
133                             logger.error("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
134                             throw new Exception("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
135                         }
136                     }
137                     else if (DocType.INSTANCE.isEqualTo(reqDoc.getType())) { // Instace
138                         if (DocFormat.OLEML.isEqualTo(reqDoc.getFormat())) { // OLE-ML
139                             documentIngester.ingestInstanceDocument(reqDoc, session, docUUIDs, null, null);
140                             documentIndexer.indexDocument(reqDoc);
141                         }
142                         else {
143                             logger.error("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
144                             throw new Exception("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
145                         }
146                     }
147                     else if (DocType.LICENSE.isEqualTo(reqDoc.getType())) { // License
148                         if (DocFormat.ONIXPL.isEqualTo(reqDoc.getFormat())
149                             || DocFormat.PDF.isEqualTo(reqDoc.getFormat())
150                             || DocFormat.DOC.isEqualTo(reqDoc.getFormat()) || DocFormat.XSLT
151                                 .isEqualTo(reqDoc.getFormat())) { //Onixpl, pdf, doc, xslt.
152                             documentIngester.ingestWorkLicenseOnixplRequestDocument(reqDoc, session, docUUIDs);
153                             documentIndexer.indexDocument(reqDoc);
154                         }
155                         else {
156                             logger.error("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
157                             throw new Exception("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
158                         }
159                     }
160                     else {
161                         logger.error("Unsupported Document Type : " + reqDoc.getType() + " Called.");
162                         throw new Exception("Unsupported Document Type : " + reqDoc.getType() + " Called.");
163                     }
164                 }
165                 else if (DocCategory.SECURITY.isEqualTo(reqDoc.getCategory())) { // Security
166                     if (DocType.PATRON.isEqualTo(reqDoc.getType())) { // Patron
167                         if (DocFormat.OLEML.isEqualTo(reqDoc.getFormat())) { // oleml
168                             docUUIDs.addAll(documentIngester.ingestPatronRequestDocument(reqDoc, session, null));
169                             documentIndexer.indexDocument(reqDoc);
170                         }
171                         else {
172                             logger.error("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
173                             throw new Exception("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
174                         }
175                     }
176                     else {
177                         logger.error("Unsupported Document Type : " + reqDoc.getType() + " Called.");
178                         throw new Exception("Unsupported Document Type : " + reqDoc.getType() + " Called.");
179                     }
180                 }
181                 else {
182                     logger.error("Unsupported Category : " + reqDoc.getCategory() + " Called.");
183                     throw new Exception("Unsupported Document Category : " + reqDoc.getCategory() + " Called.");
184                 }
185             }
186 
187             // Commit: DocStore
188             session.save();
189 
190         }
191         catch (Exception e) {
192             logger.error("Document Ingest & Index Failed, Cause: " + e.getMessage(), e);
193             documentIngester.rollbackDocStoreIngestedData(session, request.getRequestDocuments());
194             documentIndexer.rollbackIndexedData(request.getRequestDocuments());
195             throw e;
196         }
197         finally {
198             if (session != null) {
199                 RepositoryManager.getRepositoryManager().logout(session);
200             }
201         }
202         Response response = buildResponse(request);
203         return response;
204     }
205 
206     /**
207      * Method to ingest and index bulk Request.
208      *
209      * @param request
210      * @return
211      */
212     public List<String> bulkIngestNIndex(Request request, Session session) {
213         //RequestDocument requestDocument = request.getRequestDocuments().get(0);
214         //DocumentManager documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument);
215         BatchIngestStatistics batchStatistics = BulkIngestStatistics.getInstance().getCurrentBatch();
216         BulkIngestStatistics bulkLoadStatistics = BulkIngestStatistics.getInstance();
217         long commitSize = ProcessParameters.BULK_INGEST_COMMIT_SIZE;
218         logger.debug("commitSize = " + commitSize);
219         logger.debug("bulkIngestNIndex(" + request.getRequestDocuments().size() + ") START");
220         logger.debug("BULK_INGEST_IS_LINKING_ENABLED=" + ProcessParameters.BULK_INGEST_IS_LINKING_ENABLED);
221         //Session session = null;
222         List<String> docUUIDs = new ArrayList<String>();
223         StopWatch ingestTimer = new StopWatch();
224         StopWatch indexTimer = new StopWatch();
225         StopWatch totalTimer = new StopWatch();
226         StopWatch createNodesTimer = new StopWatch();
227         StopWatch sessionSaveTimer = new StopWatch();
228         StopWatch solrOptimizeTimer = new StopWatch();
229         long recCount = request.getRequestDocuments().size();
230         boolean isCommit = false;
231         totalTimer.start();
232         try {
233             ingestTimer.start();
234             createNodesTimer.start();
235             //session = RepositoryManager.getRepositoryManager().getSession(request.getUser(), request.getOperation());
236             List<RequestDocument> reqDocs = request.getRequestDocuments();
237             if (prevRequestDocs == null) {
238                 prevRequestDocs = new ArrayList<RequestDocument>();
239             }
240             prevRequestDocs.addAll(request.getRequestDocuments());
241             logger.info("prevRequestDocs" + prevRequestDocs.size());
242             docUUIDs.addAll(documentIngester.ingestRequestDocumentsForBulk(reqDocs, session));
243             //docUUIDs.addAll(documentIngester.ingestRequestDocumentsForBulkUsingBTreeMgr(reqDocs, session));
244             //documentManager.store(reqDocs,session);
245             createNodesTimer.stop();
246             try {
247                 ingestTimer.suspend();
248                 indexTimer.start();
249             }
250             catch (Exception e2) {
251             }
252             bulkLoadStatistics.setCommitRecCount(bulkLoadStatistics.getCommitRecCount() + recCount);
253             if (bulkLoadStatistics.getCommitRecCount() == commitSize || bulkLoadStatistics.isLastBatch()) {
254                 isCommit = true;
255             }
256             documentIndexer.indexDocumentsForBulk(reqDocs, isCommit);
257             //documentManager.index(reqDocs,isCommit);
258             try {
259                 indexTimer.suspend();
260                 ingestTimer.resume();
261             }
262             catch (Exception e2) {
263             }
264             if (isCommit) {
265                 sessionSaveTimer.start();
266                 logger.info("Bulk ingest: Repository commit started. Number of records being committed : "
267                             + bulkLoadStatistics.getCommitRecCount());
268                 session.save();
269                 bulkLoadStatistics.setCommitRecCount(0);
270                 prevRequestDocs = null;
271                 sessionSaveTimer.stop();
272             }
273 
274             try {
275                 ingestTimer.stop();
276             }
277             catch (Exception e2) {
278             }
279             // Documents processed can be different from records processed as in the case of Instance data.
280             logger.debug("Documents processed:" + recCount);
281             bulkLoadStatistics.setFileRecCount(bulkLoadStatistics.getFileRecCount() + recCount);
282             logger.info("Bulk ingest: Records processed in the current file :" + bulkLoadStatistics.getFileRecCount());
283         }
284         catch (Exception e) {
285             bulkLoadStatistics.setCommitRecCount(0);
286             try {
287                 ingestTimer.resume();
288             }
289             catch (Exception e2) {
290             }
291             //documentIngester.rollbackDocStoreIngestedData(session, request.getRequestDocuments());
292             documentIngester.rollbackDocStoreIngestedData(session, prevRequestDocs);
293             ingestTimer.stop();
294             try {
295                 indexTimer.resume();
296             }
297             catch (Exception e2) {
298             }
299             //documentIndexer.rollbackIndexedData(request.getRequestDocuments());
300             //prevRequestDocs = prevRequestDocs.subList(0, prevRequestDocs.size() - request.getRequestDocuments().size());
301             //logger.info("prevRequestDocs before remove INDEXES = " + prevRequestDocs.size());
302             documentIndexer.rollbackIndexedData(prevRequestDocs);
303             prevRequestDocs = null;
304             try {
305                 indexTimer.stop();
306             }
307             catch (Exception e2) {
308             }
309             logger.error("Document Ingest & Index Failed, Cause: " + e.getMessage(), e);
310             try {
311                 totalTimer.stop();
312             }
313             catch (Exception e2) {
314             }
315             logger.debug("Time Consumptions...:\tcreatingNodes(" + docUUIDs.size() + "):" + createNodesTimer
316                          + "\tSessionSave(" + docUUIDs.size() + "):" + sessionSaveTimer + "\tIngest(" + docUUIDs.size()
317                          + "):" + ingestTimer + "\tIndexing(" + docUUIDs.size() + "):" + indexTimer + "\tTotal Time: "
318                          + totalTimer);
319             docUUIDs.clear();
320         }
321         finally {
322             /*if (session != null) {
323                 try {
324                     RepositoryManager.getRepositoryManager().logout(session);
325                 } catch (OleException e) {
326                 }
327             } */
328         }
329         try {
330             totalTimer.stop();
331         }
332         catch (Exception exe) {
333         }
334         logger.debug(
335                 "Time Consumptions...:\tcreatingNodes(" + docUUIDs.size() + "):" + createNodesTimer + "\tSessionSave("
336                 + docUUIDs.size() + "):" + sessionSaveTimer + "\tIngest(" + docUUIDs.size() + "):" + ingestTimer
337                 + "\tIndexing(" + docUUIDs.size() + "):" + indexTimer + "\tTotal Time: " + totalTimer);
338         logger.debug("bulkIngestNIndex(" + request.getRequestDocuments().size() + ") END");
339         batchStatistics.setTimeToCreateNodesInJcr(createNodesTimer.getTime());
340         batchStatistics.setTimeToSaveJcrSession(sessionSaveTimer.getTime());
341         batchStatistics.setIngestingTime(ingestTimer.getTime());
342         batchStatistics.setIndexingTime(indexTimer.getTime());
343         batchStatistics.setIngestNIndexTotalTime(totalTimer.getTime());
344         updateProcessTimer(docUUIDs.size(), ingestTimer, indexTimer, totalTimer);
345         solrOptimizeTimer.start();
346         optimizeSolr(docUUIDs.size());
347         solrOptimizeTimer.stop();
348         batchStatistics.setTimeToSolrOptimize(solrOptimizeTimer.getTime());
349         return docUUIDs;
350     }
351 
352     private void updateProcessTimer(int recordsProcessed, StopWatch ingest, StopWatch index, StopWatch total) {
353         BulkIngestTimeManager timer = ProcessParameters.BULK_PROCESSOR_TIME_MANAGER;
354         synchronized (timer) {
355             timer.setRecordsCount(timer.getRecordsCount() + recordsProcessed);
356             timer.setIngestingTimer(timer.getIngestingTimer() + ingest.getTime());
357             timer.setIndexingTimer(timer.getIndexingTimer() + index.getTime());
358             timer.setProcessTimer(timer.getProcessTimer() + total.getTime());
359             if (timer.getRecordsCount() >= ProcessParameters.BULK_PROCESSOR_TIMER_DISPLAY) {
360                 logger.debug(
361                         "----------------------------------------------------------------------------------------------------------------------");
362                 logger.debug(timer.toString());
363                 logger.debug(
364                         "----------------------------------------------------------------------------------------------------------------------");
365                 timer.reset();
366             }
367         }
368     }
369 
370     private void optimizeSolr(long recordsProcessed) {
371         docCount += recordsProcessed;
372         logger.debug("BULK_INGEST_OPTIMIZE_SIZE=" + ProcessParameters.BULK_INGEST_OPTIMIZE_SIZE
373                      + ". Records processed till now=" + docCount);
374         logger.info("Bulk ingest: Records processed in the bulk ingest " + docCount);
375         if (docCount >= ProcessParameters.BULK_INGEST_OPTIMIZE_SIZE) {
376             docCount = 0;
377             try {
378                 logger.debug("Solr Optimization: START");
379                 documentIndexer.optimizeSolr(false, false);
380                 logger.debug("Solr Optimization: END");
381             }
382             catch (Exception e) {
383                 logger.warn("Solr Optimization Failed: ", e);
384             }
385         }
386     }
387 
388     public Response buildResponse(Request request) {
389         Response docStoreResponse = new Response();
390         docStoreResponse.setUser(request.getUser());
391         docStoreResponse.setOperation(request.getOperation());
392         docStoreResponse.setMessage("Documents ingested");
393         docStoreResponse.setStatus("Success");
394         docStoreResponse.setStatusMessage("Documents Ingested Successfully");
395         List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
396         ResponseDocument linkedDocument = null;
397         ResponseDocument responseDocument = null;
398         ResponseDocument linkedInstanceDocument = null;
399         ResponseDocument linkedInstanceItemDocument = null;
400         ResponseDocument linkedInstanceSrHoldingDoc = null;
401         // documents
402         for (Iterator<RequestDocument> iterator = request.getRequestDocuments().iterator(); iterator.hasNext(); ) {
403             RequestDocument docStoreDocument = iterator.next();
404             docStoreDocument.getContent().setContent("");
405             responseDocument = new ResponseDocument();
406             setResponseParameters(responseDocument, docStoreDocument);
407             responseDocuments.add(responseDocument);
408             if (docStoreDocument.getLinkedRequestDocuments() != null
409                 && docStoreDocument.getLinkedRequestDocuments().size() > 0 && request != null
410                 && request.getOperation() != null && !request.getOperation().equalsIgnoreCase("checkIn")) {
411                 List<ResponseDocument> linkResponseDos = new ArrayList<ResponseDocument>();
412                 // linked instance documents
413                 for (Iterator<RequestDocument> linkIterator = docStoreDocument.getLinkedRequestDocuments()
414                                                                               .iterator(); linkIterator.hasNext(); ) {
415                     RequestDocument linkedRequestDocument = linkIterator.next();
416                     linkedRequestDocument.getContent().setContent("");
417                     linkedDocument = new ResponseDocument();
418                     setResponseParameters(linkedDocument, linkedRequestDocument);
419                     linkResponseDos.add(linkedDocument);
420                     List<ResponseDocument> linkInstanceDocs = new ArrayList<ResponseDocument>();
421                     InstanceCollection instanceCollection = (InstanceCollection) linkedRequestDocument.getContent()
422                                                                                                       .getContentObject();
423                     for (Instance oleInstance : instanceCollection.getInstance()) {
424                          // holding from instance
425                         linkedInstanceDocument = new ResponseDocument();
426                         setResponseParameters(linkedInstanceDocument, linkedRequestDocument);
427                         linkedInstanceDocument.setUuid(oleInstance.getOleHoldings().getHoldingsIdentifier());
428                         linkedInstanceDocument.setType("holdings");
429                         linkInstanceDocs.add(linkedInstanceDocument);
430 
431                         //SourceHolding from Instance
432                         linkedInstanceSrHoldingDoc = new ResponseDocument();
433                         setResponseParameters(linkedInstanceSrHoldingDoc, linkedRequestDocument);
434                         if(oleInstance.getSourceHoldings() != null &&
435                                                     oleInstance.getSourceHoldings().getHoldingsIdentifier() != null ){
436                         linkedInstanceSrHoldingDoc.setUuid(oleInstance.getSourceHoldings().getHoldingsIdentifier());
437                         linkedInstanceSrHoldingDoc.setType("sourceHoldings");
438                         linkInstanceDocs.add(linkedInstanceSrHoldingDoc);
439                         }
440 
441 
442                         // item from instance
443                         for (Iterator<Item> itemIterator = oleInstance.getItems().getItem().iterator(); itemIterator
444                                 .hasNext(); ) {
445                             Item oleItem = itemIterator.next();
446                             linkedInstanceItemDocument = new ResponseDocument();
447                             setResponseParameters(linkedInstanceItemDocument, linkedRequestDocument);
448                             linkedInstanceItemDocument.setUuid(oleItem.getItemIdentifier());
449                             linkedInstanceItemDocument.setType("item");
450                             linkInstanceDocs.add(linkedInstanceItemDocument);
451                         }
452                     }
453                     responseDocument.setLinkedInstanceDocuments(linkInstanceDocs);
454                 }
455                 responseDocument.setLinkedDocuments(linkResponseDos);
456             }
457         }
458         docStoreResponse.setDocuments(responseDocuments);
459         return docStoreResponse;
460     }
461 
462     private void setResponseParameters(ResponseDocument responseDocument, RequestDocument docStoreDocument) {
463         responseDocument.setId(docStoreDocument.getId());
464         responseDocument.setCategory(docStoreDocument.getCategory());
465         responseDocument.setType(docStoreDocument.getType());
466         responseDocument.setFormat(docStoreDocument.getFormat());
467         responseDocument.setContent(docStoreDocument.getContent());
468         responseDocument.setUuid(docStoreDocument.getUuid());
469     }
470 
471 }