View Javadoc
1   /*
2    * Copyright 2011 The Kuali Foundation.
3    * 
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    * http://www.opensource.org/licenses/ecl2.php
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.ole.docstore.service;
17  
18  import org.apache.commons.lang.time.StopWatch;
19  import org.kuali.ole.RepositoryManager;
20  import org.kuali.ole.docstore.common.document.content.instance.Instance;
21  import org.kuali.ole.docstore.common.document.content.instance.InstanceCollection;
22  import org.kuali.ole.docstore.common.document.content.instance.Item;
23  import org.kuali.ole.docstore.model.enums.DocCategory;
24  import org.kuali.ole.docstore.model.enums.DocFormat;
25  import org.kuali.ole.docstore.model.enums.DocType;
26  import org.kuali.ole.docstore.model.xmlpojo.ingest.Request;
27  import org.kuali.ole.docstore.model.xmlpojo.ingest.RequestDocument;
28  import org.kuali.ole.docstore.model.xmlpojo.ingest.Response;
29  import org.kuali.ole.docstore.model.xmlpojo.ingest.ResponseDocument;
30  import org.kuali.ole.docstore.model.xstream.ingest.RequestHandler;
31  import org.kuali.ole.docstore.model.xstream.ingest.ResponseHandler;
32  import org.kuali.ole.docstore.process.BulkIngestTimeManager;
33  import org.kuali.ole.docstore.process.ProcessParameters;
34  import org.kuali.ole.docstore.utility.BatchIngestStatistics;
35  import org.kuali.ole.docstore.utility.BulkIngestStatistics;
36  import org.kuali.ole.pojo.OleException;
37  import org.slf4j.Logger;
38  import org.slf4j.LoggerFactory;
39  import org.springframework.beans.factory.annotation.Required;
40  
41  import javax.jcr.Session;
42  import java.util.ArrayList;
43  import java.util.Iterator;
44  import java.util.List;
45  
46  /**
47   * Class to IngestNIndexHandlerService.
48   *
49   * @author Rajesh Chowdary K
50   * @created Feb 15, 2012
51   * <p/>
52   * Singleton instance of this class is created by Spring.
53   */
54  public class IngestNIndexHandlerService {
55  
56      private static Logger logger = LoggerFactory.getLogger(IngestNIndexHandlerService.class);
57  
58      /**
59       * Singleton instance of  RequestHandler initialized by Spring DI.
60       */
61      private RequestHandler requestHandler;
62      /**
63       * Singleton instance of  DocumentIngester initialized by Spring DI.
64       */
65      private DocumentIngester documentIngester;
66      /**
67       * Singleton instance of  DocumentIndexer initialized by Spring DI.
68       */
69      private DocumentIndexer documentIndexer;
70      private static long docCount = 0;
71      private BulkIngestStatistics bulkLoadStatistics = BulkIngestStatistics.getInstance();
72      private static List<RequestDocument> prevRequestDocs = null;
73      private RepositoryManager repositoryManager;
74  
75      @Required
76      public void setDocumentIngester(DocumentIngester documentIngester) {
77          this.documentIngester = documentIngester;
78      }
79  
80      @Required
81      public void setDocumentIndexer(DocumentIndexer documentIndexer) {
82          this.documentIndexer = documentIndexer;
83      }
84  
85      @Required
86      public void setRequestHandler(RequestHandler requestHandler) {
87          this.requestHandler = requestHandler;
88      }
89  
90      /**
91       * Method to ingest & index xml String Request Document
92       *
93       * @param xmlRequestString
94       * @return
95       * @throws Exception
96       */
97      public String ingestNIndexRequestDocuments(String xmlRequestString) throws Exception {
98          Request request = null;
99          request = requestHandler.toObject(xmlRequestString);
100         Response response = ingestNIndexRequestDocuments(request);
101         String xmlResponse = new ResponseHandler().toXML(response);
102         return xmlResponse;
103     }
104 
105     /**
106      * `
107      * <p/>
108      * Method to ingest & index xml String Request Document
109      *
110      * @param request
111      * @return
112      * @throws Exception
113      */
114     public Response ingestNIndexRequestDocuments(Request request) throws Exception {
115 
116         for (RequestDocument doc : request.getRequestDocuments()) {
117             doc.setUser(request.getUser());
118         }
119         Session session = null;
120         List<String> docUUIDs = new ArrayList<String>();
121         try {
122             session = getRepositoryManager().getSession(request.getUser(), request.getOperation());
123 
124             // Ingest & check for any unsupported Category/Type/Formats
125             for (RequestDocument reqDoc : request.getRequestDocuments()) {
126                 if (DocCategory.WORK.isEqualTo(reqDoc.getCategory())) {
127                     if (DocType.BIB.isEqualTo(reqDoc.getType())) { // Biblographic
128                         if (DocFormat.MARC.isEqualTo(reqDoc.getFormat())
129                                 || DocFormat.DUBLIN_CORE.isEqualTo(reqDoc.getFormat()) || DocFormat.DUBLIN_UNQUALIFIED
130                                 .isEqualTo(reqDoc.getFormat())) {
131                             docUUIDs.addAll(documentIngester.ingestBibNLinkedInstanceRequestDocuments(reqDoc, session));
132                             documentIndexer.indexDocument(reqDoc);
133                         } else {
134                             logger.error("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
135                             throw new Exception("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
136                         }
137                     } else if (DocType.INSTANCE.isEqualTo(reqDoc.getType())) { // Instace
138                         if (DocFormat.OLEML.isEqualTo(reqDoc.getFormat())) { // OLE-ML
139                             documentIngester.ingestInstanceDocument(reqDoc, session, docUUIDs, null, null);
140                             documentIndexer.indexDocument(reqDoc);
141                         } else {
142                             logger.error("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
143                             throw new Exception("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
144                         }
145                     } else if (DocType.LICENSE.isEqualTo(reqDoc.getType())) { // License
146                         if (DocFormat.ONIXPL.isEqualTo(reqDoc.getFormat())
147                                 || DocFormat.PDF.isEqualTo(reqDoc.getFormat())
148                                 || DocFormat.DOC.isEqualTo(reqDoc.getFormat()) || DocFormat.XSLT
149                                 .isEqualTo(reqDoc.getFormat())) { //Onixpl, pdf, doc, xslt.
150                             documentIngester.ingestWorkLicenseOnixplRequestDocument(reqDoc, session, docUUIDs);
151                             documentIndexer.indexDocument(reqDoc);
152                         } else {
153                             logger.error("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
154                             throw new Exception("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
155                         }
156                     } else {
157                         logger.error("Unsupported Document Type : " + reqDoc.getType() + " Called.");
158                         throw new Exception("Unsupported Document Type : " + reqDoc.getType() + " Called.");
159                     }
160                 } else if (DocCategory.SECURITY.isEqualTo(reqDoc.getCategory())) { // Security
161                     if (DocType.PATRON.isEqualTo(reqDoc.getType())) { // Patron
162                         if (DocFormat.OLEML.isEqualTo(reqDoc.getFormat())) { // oleml
163                             docUUIDs.addAll(documentIngester.ingestPatronRequestDocument(reqDoc, session, null));
164                             documentIndexer.indexDocument(reqDoc);
165                         } else {
166                             logger.error("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
167                             throw new Exception("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
168                         }
169                     } else {
170                         logger.error("Unsupported Document Type : " + reqDoc.getType() + " Called.");
171                         throw new Exception("Unsupported Document Type : " + reqDoc.getType() + " Called.");
172                     }
173                 } else {
174                     logger.error("Unsupported Category : " + reqDoc.getCategory() + " Called.");
175                     throw new Exception("Unsupported Document Category : " + reqDoc.getCategory() + " Called.");
176                 }
177             }
178 
179             // Commit: DocStore
180             session.save();
181 
182         } catch (Exception e) {
183             logger.error("Document Ingest & Index Failed, Cause: " + e.getMessage(), e);
184             documentIngester.rollbackDocStoreIngestedData(session, request.getRequestDocuments());
185             documentIndexer.rollbackIndexedData(request.getRequestDocuments());
186             throw e;
187         } finally {
188             if (session != null) {
189                 getRepositoryManager().logout(session);
190             }
191         }
192         Response response = buildResponse(request);
193         return response;
194     }
195 
196     private RepositoryManager getRepositoryManager() throws OleException {
197         if (null == repositoryManager) {
198             repositoryManager = RepositoryManager.getRepositoryManager();
199         }
200         return repositoryManager;
201     }
202 
203     /**
204      * Method to ingest and index bulk Request.
205      *
206      * @param request
207      * @return
208      */
209     public List<String> bulkIngestNIndex(Request request, Session session) {
210         //RequestDocument requestDocument = request.getRequestDocuments().get(0);
211         //DocumentManager documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument);
212         BatchIngestStatistics batchStatistics = BulkIngestStatistics.getInstance().getCurrentBatch();
213         BulkIngestStatistics bulkLoadStatistics = BulkIngestStatistics.getInstance();
214         long commitSize = ProcessParameters.BULK_INGEST_COMMIT_SIZE;
215         logger.debug("commitSize = " + commitSize);
216         logger.debug("bulkIngestNIndex(" + request.getRequestDocuments().size() + ") START");
217         logger.debug("BULK_INGEST_IS_LINKING_ENABLED=" + ProcessParameters.BULK_INGEST_IS_LINKING_ENABLED);
218         //Session session = null;
219         List<String> docUUIDs = new ArrayList<String>();
220         StopWatch ingestTimer = new StopWatch();
221         StopWatch indexTimer = new StopWatch();
222         StopWatch totalTimer = new StopWatch();
223         StopWatch createNodesTimer = new StopWatch();
224         StopWatch sessionSaveTimer = new StopWatch();
225         StopWatch solrOptimizeTimer = new StopWatch();
226         long recCount = request.getRequestDocuments().size();
227         boolean isCommit = false;
228         totalTimer.start();
229         try {
230             ingestTimer.start();
231             createNodesTimer.start();
232             //session = RepositoryManager.getRepositoryManager().getSession(request.getUser(), request.getOperation());
233             List<RequestDocument> reqDocs = request.getRequestDocuments();
234             if (prevRequestDocs == null) {
235                 prevRequestDocs = new ArrayList<RequestDocument>();
236             }
237             prevRequestDocs.addAll(request.getRequestDocuments());
238             logger.info("prevRequestDocs" + prevRequestDocs.size());
239             docUUIDs.addAll(documentIngester.ingestRequestDocumentsForBulk(reqDocs, session));
240             //docUUIDs.addAll(documentIngester.ingestRequestDocumentsForBulkUsingBTreeMgr(reqDocs, session));
241             //documentManager.store(reqDocs,session);
242             createNodesTimer.stop();
243             try {
244                 ingestTimer.suspend();
245                 indexTimer.start();
246             } catch (Exception e2) {
247                 logger.error(e2.getMessage() , e2 );
248             }
249             bulkLoadStatistics.setCommitRecCount(bulkLoadStatistics.getCommitRecCount() + recCount);
250             if (bulkLoadStatistics.getCommitRecCount() == commitSize || bulkLoadStatistics.isLastBatch()) {
251                 isCommit = true;
252             }
253             documentIndexer.indexDocumentsForBulk(reqDocs, isCommit);
254             //documentManager.index(reqDocs,isCommit);
255             try {
256                 indexTimer.suspend();
257                 ingestTimer.resume();
258             } catch (Exception e2) {
259                 logger.error(e2.getMessage() , e2 );
260             }
261             if (isCommit) {
262                 sessionSaveTimer.start();
263                 logger.info("Bulk ingest: Repository commit started. Number of records being committed : "
264                         + bulkLoadStatistics.getCommitRecCount());
265                 session.save();
266                 bulkLoadStatistics.setCommitRecCount(0);
267                 prevRequestDocs = null;
268                 sessionSaveTimer.stop();
269             }
270 
271             try {
272                 ingestTimer.stop();
273             } catch (Exception e2) {
274                 logger.error(e2.getMessage() , e2 );
275             }
276             // Documents processed can be different from records processed as in the case of Instance data.
277             logger.debug("Documents processed:" + recCount);
278             bulkLoadStatistics.setFileRecCount(bulkLoadStatistics.getFileRecCount() + recCount);
279             logger.info("Bulk ingest: Records processed in the current file :" + bulkLoadStatistics.getFileRecCount());
280         } catch (Exception e) {
281             bulkLoadStatistics.setCommitRecCount(0);
282             try {
283                 ingestTimer.resume();
284             } catch (Exception e2) {
285                 logger.error(e2.getMessage() , e2 );
286             }
287             //documentIngester.rollbackDocStoreIngestedData(session, request.getRequestDocuments());
288             documentIngester.rollbackDocStoreIngestedData(session, prevRequestDocs);
289             ingestTimer.stop();
290             try {
291                 indexTimer.resume();
292             } catch (Exception e2) {
293                 logger.error(e2.getMessage() , e2 );
294             }
295             //documentIndexer.rollbackIndexedData(request.getRequestDocuments());
296             //prevRequestDocs = prevRequestDocs.subList(0, prevRequestDocs.size() - request.getRequestDocuments().size());
297             //logger.info("prevRequestDocs before remove INDEXES = " + prevRequestDocs.size());
298             documentIndexer.rollbackIndexedData(prevRequestDocs);
299             prevRequestDocs = null;
300             try {
301                 indexTimer.stop();
302             } catch (Exception e2) {
303                 logger.error(e2.getMessage() , e2 );
304             }
305             logger.error("Document Ingest & Index Failed, Cause: " + e.getMessage(), e);
306             try {
307                 totalTimer.stop();
308             } catch (Exception e2) {
309                 logger.error(e2.getMessage() , e2 );
310             }
311             logger.debug("Time Consumptions...:\tcreatingNodes(" + docUUIDs.size() + "):" + createNodesTimer
312                     + "\tSessionSave(" + docUUIDs.size() + "):" + sessionSaveTimer + "\tIngest(" + docUUIDs.size()
313                     + "):" + ingestTimer + "\tIndexing(" + docUUIDs.size() + "):" + indexTimer + "\tTotal Time: "
314                     + totalTimer);
315             docUUIDs.clear();
316         } finally {
317             /*if (session != null) {
318                 try {
319                     RepositoryManager.getRepositoryManager().logout(session);
320                 } catch (OleException e) {
321                 }
322             } */
323         }
324         try {
325             totalTimer.stop();
326         } catch (Exception exe) {
327             logger.error(exe.getMessage() , exe );
328         }
329         logger.debug(
330                 "Time Consumptions...:\tcreatingNodes(" + docUUIDs.size() + "):" + createNodesTimer + "\tSessionSave("
331                         + docUUIDs.size() + "):" + sessionSaveTimer + "\tIngest(" + docUUIDs.size() + "):" + ingestTimer
332                         + "\tIndexing(" + docUUIDs.size() + "):" + indexTimer + "\tTotal Time: " + totalTimer);
333         logger.debug("bulkIngestNIndex(" + request.getRequestDocuments().size() + ") END");
334         batchStatistics.setTimeToCreateNodesInJcr(createNodesTimer.getTime());
335         batchStatistics.setTimeToSaveJcrSession(sessionSaveTimer.getTime());
336         batchStatistics.setIngestingTime(ingestTimer.getTime());
337         batchStatistics.setIndexingTime(indexTimer.getTime());
338         batchStatistics.setIngestNIndexTotalTime(totalTimer.getTime());
339         updateProcessTimer(docUUIDs.size(), ingestTimer, indexTimer, totalTimer);
340         solrOptimizeTimer.start();
341         optimizeSolr(docUUIDs.size());
342         solrOptimizeTimer.stop();
343         batchStatistics.setTimeToSolrOptimize(solrOptimizeTimer.getTime());
344         return docUUIDs;
345     }
346 
347     private void updateProcessTimer(int recordsProcessed, StopWatch ingest, StopWatch index, StopWatch total) {
348         BulkIngestTimeManager timer = ProcessParameters.BULK_PROCESSOR_TIME_MANAGER;
349         synchronized (timer) {
350             timer.setRecordsCount(timer.getRecordsCount() + recordsProcessed);
351             timer.setIngestingTimer(timer.getIngestingTimer() + ingest.getTime());
352             timer.setIndexingTimer(timer.getIndexingTimer() + index.getTime());
353             timer.setProcessTimer(timer.getProcessTimer() + total.getTime());
354             if (timer.getRecordsCount() >= ProcessParameters.BULK_PROCESSOR_TIMER_DISPLAY) {
355                 logger.debug(
356                         "----------------------------------------------------------------------------------------------------------------------");
357                 logger.debug(timer.toString());
358                 logger.debug(
359                         "----------------------------------------------------------------------------------------------------------------------");
360                 timer.reset();
361             }
362         }
363     }
364 
365     private void optimizeSolr(long recordsProcessed) {
366         docCount += recordsProcessed;
367         logger.debug("BULK_INGEST_OPTIMIZE_SIZE=" + ProcessParameters.BULK_INGEST_OPTIMIZE_SIZE
368                 + ". Records processed till now=" + docCount);
369         logger.info("Bulk ingest: Records processed in the bulk ingest " + docCount);
370         if (docCount >= ProcessParameters.BULK_INGEST_OPTIMIZE_SIZE) {
371             docCount = 0;
372             try {
373                 logger.debug("Solr Optimization: START");
374                 documentIndexer.optimizeSolr(false, false);
375                 logger.debug("Solr Optimization: END");
376             } catch (Exception e) {
377                 logger.warn("Solr Optimization Failed: ", e);
378             }
379         }
380     }
381 
382     public Response buildResponse(Request request) {
383         Response docStoreResponse = new Response();
384         docStoreResponse.setUser(request.getUser());
385         docStoreResponse.setOperation(request.getOperation());
386         docStoreResponse.setMessage("Documents ingested");
387         docStoreResponse.setStatus("Success");
388         docStoreResponse.setStatusMessage("Documents Ingested Successfully");
389         List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
390         ResponseDocument linkedDocument = null;
391         ResponseDocument responseDocument = null;
392         ResponseDocument linkedInstanceDocument = null;
393         ResponseDocument linkedInstanceItemDocument = null;
394         ResponseDocument linkedInstanceSrHoldingDoc = null;
395         // documents
396         for (Iterator<RequestDocument> iterator = request.getRequestDocuments().iterator(); iterator.hasNext(); ) {
397             RequestDocument docStoreDocument = iterator.next();
398             docStoreDocument.getContent().setContent("");
399             responseDocument = new ResponseDocument();
400             setResponseParameters(responseDocument, docStoreDocument);
401             responseDocuments.add(responseDocument);
402             if (docStoreDocument.getLinkedRequestDocuments() != null
403                     && docStoreDocument.getLinkedRequestDocuments().size() > 0 && request != null
404                     && request.getOperation() != null && !request.getOperation().equalsIgnoreCase("checkIn")) {
405                 List<ResponseDocument> linkResponseDos = new ArrayList<ResponseDocument>();
406                 // linked instance documents
407                 for (Iterator<RequestDocument> linkIterator = docStoreDocument.getLinkedRequestDocuments()
408                         .iterator(); linkIterator.hasNext(); ) {
409                     RequestDocument linkedRequestDocument = linkIterator.next();
410                     linkedRequestDocument.getContent().setContent("");
411                     linkedDocument = new ResponseDocument();
412                     setResponseParameters(linkedDocument, linkedRequestDocument);
413                     linkResponseDos.add(linkedDocument);
414                     List<ResponseDocument> linkInstanceDocs = new ArrayList<ResponseDocument>();
415                     InstanceCollection instanceCollection = (InstanceCollection) linkedRequestDocument.getContent()
416                             .getContentObject();
417                     for (Instance oleInstance : instanceCollection.getInstance()) {
418                         // holding from instance
419                         linkedInstanceDocument = new ResponseDocument();
420                         setResponseParameters(linkedInstanceDocument, linkedRequestDocument);
421                         linkedInstanceDocument.setUuid(oleInstance.getOleHoldings().getHoldingsIdentifier());
422                         linkedInstanceDocument.setType("holdings");
423                         linkInstanceDocs.add(linkedInstanceDocument);
424 
425                         //SourceHolding from Instance
426                         linkedInstanceSrHoldingDoc = new ResponseDocument();
427                         setResponseParameters(linkedInstanceSrHoldingDoc, linkedRequestDocument);
428                         if (oleInstance.getSourceHoldings() != null &&
429                                 oleInstance.getSourceHoldings().getHoldingsIdentifier() != null) {
430                             linkedInstanceSrHoldingDoc.setUuid(oleInstance.getSourceHoldings().getHoldingsIdentifier());
431                             linkedInstanceSrHoldingDoc.setType("sourceHoldings");
432                             linkInstanceDocs.add(linkedInstanceSrHoldingDoc);
433                         }
434 
435 
436                         // item from instance
437                         for (Iterator<Item> itemIterator = oleInstance.getItems().getItem().iterator(); itemIterator
438                                 .hasNext(); ) {
439                             Item oleItem = itemIterator.next();
440                             linkedInstanceItemDocument = new ResponseDocument();
441                             setResponseParameters(linkedInstanceItemDocument, linkedRequestDocument);
442                             linkedInstanceItemDocument.setUuid(oleItem.getItemIdentifier());
443                             linkedInstanceItemDocument.setType("item");
444                             linkInstanceDocs.add(linkedInstanceItemDocument);
445                         }
446                     }
447                     responseDocument.setLinkedInstanceDocuments(linkInstanceDocs);
448                 }
449                 responseDocument.setLinkedDocuments(linkResponseDos);
450             }
451         }
452         docStoreResponse.setDocuments(responseDocuments);
453         return docStoreResponse;
454     }
455 
456     private void setResponseParameters(ResponseDocument responseDocument, RequestDocument docStoreDocument) {
457         responseDocument.setId(docStoreDocument.getId());
458         responseDocument.setCategory(docStoreDocument.getCategory());
459         responseDocument.setType(docStoreDocument.getType());
460         responseDocument.setFormat(docStoreDocument.getFormat());
461         responseDocument.setContent(docStoreDocument.getContent());
462         responseDocument.setUuid(docStoreDocument.getUuid());
463     }
464 
465     public void setRepositoryManager(RepositoryManager repositoryManager) {
466         this.repositoryManager = repositoryManager;
467     }
468 
469     public DocumentIngester getDocumentIngester() {
470         return documentIngester;
471     }
472 }