001/*
002 * Copyright 2011 The Kuali Foundation.
003 * 
004 * Licensed under the Educational Community License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 * 
008 * http://www.opensource.org/licenses/ecl2.php
009 * 
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.kuali.ole.docstore.service;
017
018import org.apache.commons.lang.time.StopWatch;
019import org.kuali.ole.RepositoryManager;
020import org.kuali.ole.docstore.common.document.content.instance.Instance;
021import org.kuali.ole.docstore.common.document.content.instance.InstanceCollection;
022import org.kuali.ole.docstore.common.document.content.instance.Item;
023import org.kuali.ole.docstore.model.enums.DocCategory;
024import org.kuali.ole.docstore.model.enums.DocFormat;
025import org.kuali.ole.docstore.model.enums.DocType;
026import org.kuali.ole.docstore.model.xmlpojo.ingest.Request;
027import org.kuali.ole.docstore.model.xmlpojo.ingest.RequestDocument;
028import org.kuali.ole.docstore.model.xmlpojo.ingest.Response;
029import org.kuali.ole.docstore.model.xmlpojo.ingest.ResponseDocument;
030import org.kuali.ole.docstore.model.xstream.ingest.RequestHandler;
031import org.kuali.ole.docstore.model.xstream.ingest.ResponseHandler;
032import org.kuali.ole.docstore.process.BulkIngestTimeManager;
033import org.kuali.ole.docstore.process.ProcessParameters;
034import org.kuali.ole.docstore.utility.BatchIngestStatistics;
035import org.kuali.ole.docstore.utility.BulkIngestStatistics;
036import org.kuali.ole.pojo.OleException;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039import org.springframework.beans.factory.annotation.Required;
040
041import javax.jcr.Session;
042import java.util.ArrayList;
043import java.util.Iterator;
044import java.util.List;
045
046/**
047 * Class to IngestNIndexHandlerService.
048 *
049 * @author Rajesh Chowdary K
050 * @created Feb 15, 2012
051 * <p/>
052 * Singleton instance of this class is created by Spring.
053 */
054public class IngestNIndexHandlerService {
055
056    private static Logger logger = LoggerFactory.getLogger(IngestNIndexHandlerService.class);
057
058    /**
059     * Singleton instance of  RequestHandler initialized by Spring DI.
060     */
061    private RequestHandler requestHandler;
062    /**
063     * Singleton instance of  DocumentIngester initialized by Spring DI.
064     */
065    private DocumentIngester documentIngester;
066    /**
067     * Singleton instance of  DocumentIndexer initialized by Spring DI.
068     */
069    private DocumentIndexer documentIndexer;
070    private static long docCount = 0;
071    private BulkIngestStatistics bulkLoadStatistics = BulkIngestStatistics.getInstance();
072    private static List<RequestDocument> prevRequestDocs = null;
073    private RepositoryManager repositoryManager;
074
075    @Required
076    public void setDocumentIngester(DocumentIngester documentIngester) {
077        this.documentIngester = documentIngester;
078    }
079
080    @Required
081    public void setDocumentIndexer(DocumentIndexer documentIndexer) {
082        this.documentIndexer = documentIndexer;
083    }
084
085    @Required
086    public void setRequestHandler(RequestHandler requestHandler) {
087        this.requestHandler = requestHandler;
088    }
089
090    /**
091     * Method to ingest & index xml String Request Document
092     *
093     * @param xmlRequestString
094     * @return
095     * @throws Exception
096     */
097    public String ingestNIndexRequestDocuments(String xmlRequestString) throws Exception {
098        Request request = null;
099        request = requestHandler.toObject(xmlRequestString);
100        Response response = ingestNIndexRequestDocuments(request);
101        String xmlResponse = new ResponseHandler().toXML(response);
102        return xmlResponse;
103    }
104
105    /**
106     * `
107     * <p/>
108     * Method to ingest & index xml String Request Document
109     *
110     * @param request
111     * @return
112     * @throws Exception
113     */
114    public Response ingestNIndexRequestDocuments(Request request) throws Exception {
115
116        for (RequestDocument doc : request.getRequestDocuments()) {
117            doc.setUser(request.getUser());
118        }
119        Session session = null;
120        List<String> docUUIDs = new ArrayList<String>();
121        try {
122            session = getRepositoryManager().getSession(request.getUser(), request.getOperation());
123
124            // Ingest & check for any unsupported Category/Type/Formats
125            for (RequestDocument reqDoc : request.getRequestDocuments()) {
126                if (DocCategory.WORK.isEqualTo(reqDoc.getCategory())) {
127                    if (DocType.BIB.isEqualTo(reqDoc.getType())) { // Biblographic
128                        if (DocFormat.MARC.isEqualTo(reqDoc.getFormat())
129                                || DocFormat.DUBLIN_CORE.isEqualTo(reqDoc.getFormat()) || DocFormat.DUBLIN_UNQUALIFIED
130                                .isEqualTo(reqDoc.getFormat())) {
131                            docUUIDs.addAll(documentIngester.ingestBibNLinkedInstanceRequestDocuments(reqDoc, session));
132                            documentIndexer.indexDocument(reqDoc);
133                        } else {
134                            logger.error("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
135                            throw new Exception("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
136                        }
137                    } else if (DocType.INSTANCE.isEqualTo(reqDoc.getType())) { // Instace
138                        if (DocFormat.OLEML.isEqualTo(reqDoc.getFormat())) { // OLE-ML
139                            documentIngester.ingestInstanceDocument(reqDoc, session, docUUIDs, null, null);
140                            documentIndexer.indexDocument(reqDoc);
141                        } else {
142                            logger.error("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
143                            throw new Exception("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
144                        }
145                    } else if (DocType.LICENSE.isEqualTo(reqDoc.getType())) { // License
146                        if (DocFormat.ONIXPL.isEqualTo(reqDoc.getFormat())
147                                || DocFormat.PDF.isEqualTo(reqDoc.getFormat())
148                                || DocFormat.DOC.isEqualTo(reqDoc.getFormat()) || DocFormat.XSLT
149                                .isEqualTo(reqDoc.getFormat())) { //Onixpl, pdf, doc, xslt.
150                            documentIngester.ingestWorkLicenseOnixplRequestDocument(reqDoc, session, docUUIDs);
151                            documentIndexer.indexDocument(reqDoc);
152                        } else {
153                            logger.error("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
154                            throw new Exception("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
155                        }
156                    } else {
157                        logger.error("Unsupported Document Type : " + reqDoc.getType() + " Called.");
158                        throw new Exception("Unsupported Document Type : " + reqDoc.getType() + " Called.");
159                    }
160                } else if (DocCategory.SECURITY.isEqualTo(reqDoc.getCategory())) { // Security
161                    if (DocType.PATRON.isEqualTo(reqDoc.getType())) { // Patron
162                        if (DocFormat.OLEML.isEqualTo(reqDoc.getFormat())) { // oleml
163                            docUUIDs.addAll(documentIngester.ingestPatronRequestDocument(reqDoc, session, null));
164                            documentIndexer.indexDocument(reqDoc);
165                        } else {
166                            logger.error("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
167                            throw new Exception("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
168                        }
169                    } else {
170                        logger.error("Unsupported Document Type : " + reqDoc.getType() + " Called.");
171                        throw new Exception("Unsupported Document Type : " + reqDoc.getType() + " Called.");
172                    }
173                } else {
174                    logger.error("Unsupported Category : " + reqDoc.getCategory() + " Called.");
175                    throw new Exception("Unsupported Document Category : " + reqDoc.getCategory() + " Called.");
176                }
177            }
178
179            // Commit: DocStore
180            session.save();
181
182        } catch (Exception e) {
183            logger.error("Document Ingest & Index Failed, Cause: " + e.getMessage(), e);
184            documentIngester.rollbackDocStoreIngestedData(session, request.getRequestDocuments());
185            documentIndexer.rollbackIndexedData(request.getRequestDocuments());
186            throw e;
187        } finally {
188            if (session != null) {
189                getRepositoryManager().logout(session);
190            }
191        }
192        Response response = buildResponse(request);
193        return response;
194    }
195
196    private RepositoryManager getRepositoryManager() throws OleException {
197        if (null == repositoryManager) {
198            repositoryManager = RepositoryManager.getRepositoryManager();
199        }
200        return repositoryManager;
201    }
202
203    /**
204     * Method to ingest and index bulk Request.
205     *
206     * @param request
207     * @return
208     */
209    public List<String> bulkIngestNIndex(Request request, Session session) {
210        //RequestDocument requestDocument = request.getRequestDocuments().get(0);
211        //DocumentManager documentManager = BeanLocator.getDocumentManagerFactory().getDocumentManager(requestDocument);
212        BatchIngestStatistics batchStatistics = BulkIngestStatistics.getInstance().getCurrentBatch();
213        BulkIngestStatistics bulkLoadStatistics = BulkIngestStatistics.getInstance();
214        long commitSize = ProcessParameters.BULK_INGEST_COMMIT_SIZE;
215        logger.debug("commitSize = " + commitSize);
216        logger.debug("bulkIngestNIndex(" + request.getRequestDocuments().size() + ") START");
217        logger.debug("BULK_INGEST_IS_LINKING_ENABLED=" + ProcessParameters.BULK_INGEST_IS_LINKING_ENABLED);
218        //Session session = null;
219        List<String> docUUIDs = new ArrayList<String>();
220        StopWatch ingestTimer = new StopWatch();
221        StopWatch indexTimer = new StopWatch();
222        StopWatch totalTimer = new StopWatch();
223        StopWatch createNodesTimer = new StopWatch();
224        StopWatch sessionSaveTimer = new StopWatch();
225        StopWatch solrOptimizeTimer = new StopWatch();
226        long recCount = request.getRequestDocuments().size();
227        boolean isCommit = false;
228        totalTimer.start();
229        try {
230            ingestTimer.start();
231            createNodesTimer.start();
232            //session = RepositoryManager.getRepositoryManager().getSession(request.getUser(), request.getOperation());
233            List<RequestDocument> reqDocs = request.getRequestDocuments();
234            if (prevRequestDocs == null) {
235                prevRequestDocs = new ArrayList<RequestDocument>();
236            }
237            prevRequestDocs.addAll(request.getRequestDocuments());
238            logger.info("prevRequestDocs" + prevRequestDocs.size());
239            docUUIDs.addAll(documentIngester.ingestRequestDocumentsForBulk(reqDocs, session));
240            //docUUIDs.addAll(documentIngester.ingestRequestDocumentsForBulkUsingBTreeMgr(reqDocs, session));
241            //documentManager.store(reqDocs,session);
242            createNodesTimer.stop();
243            try {
244                ingestTimer.suspend();
245                indexTimer.start();
246            } catch (Exception e2) {
247                logger.error(e2.getMessage() , e2 );
248            }
249            bulkLoadStatistics.setCommitRecCount(bulkLoadStatistics.getCommitRecCount() + recCount);
250            if (bulkLoadStatistics.getCommitRecCount() == commitSize || bulkLoadStatistics.isLastBatch()) {
251                isCommit = true;
252            }
253            documentIndexer.indexDocumentsForBulk(reqDocs, isCommit);
254            //documentManager.index(reqDocs,isCommit);
255            try {
256                indexTimer.suspend();
257                ingestTimer.resume();
258            } catch (Exception e2) {
259                logger.error(e2.getMessage() , e2 );
260            }
261            if (isCommit) {
262                sessionSaveTimer.start();
263                logger.info("Bulk ingest: Repository commit started. Number of records being committed : "
264                        + bulkLoadStatistics.getCommitRecCount());
265                session.save();
266                bulkLoadStatistics.setCommitRecCount(0);
267                prevRequestDocs = null;
268                sessionSaveTimer.stop();
269            }
270
271            try {
272                ingestTimer.stop();
273            } catch (Exception e2) {
274                logger.error(e2.getMessage() , e2 );
275            }
276            // Documents processed can be different from records processed as in the case of Instance data.
277            logger.debug("Documents processed:" + recCount);
278            bulkLoadStatistics.setFileRecCount(bulkLoadStatistics.getFileRecCount() + recCount);
279            logger.info("Bulk ingest: Records processed in the current file :" + bulkLoadStatistics.getFileRecCount());
280        } catch (Exception e) {
281            bulkLoadStatistics.setCommitRecCount(0);
282            try {
283                ingestTimer.resume();
284            } catch (Exception e2) {
285                logger.error(e2.getMessage() , e2 );
286            }
287            //documentIngester.rollbackDocStoreIngestedData(session, request.getRequestDocuments());
288            documentIngester.rollbackDocStoreIngestedData(session, prevRequestDocs);
289            ingestTimer.stop();
290            try {
291                indexTimer.resume();
292            } catch (Exception e2) {
293                logger.error(e2.getMessage() , e2 );
294            }
295            //documentIndexer.rollbackIndexedData(request.getRequestDocuments());
296            //prevRequestDocs = prevRequestDocs.subList(0, prevRequestDocs.size() - request.getRequestDocuments().size());
297            //logger.info("prevRequestDocs before remove INDEXES = " + prevRequestDocs.size());
298            documentIndexer.rollbackIndexedData(prevRequestDocs);
299            prevRequestDocs = null;
300            try {
301                indexTimer.stop();
302            } catch (Exception e2) {
303                logger.error(e2.getMessage() , e2 );
304            }
305            logger.error("Document Ingest & Index Failed, Cause: " + e.getMessage(), e);
306            try {
307                totalTimer.stop();
308            } catch (Exception e2) {
309                logger.error(e2.getMessage() , e2 );
310            }
311            logger.debug("Time Consumptions...:\tcreatingNodes(" + docUUIDs.size() + "):" + createNodesTimer
312                    + "\tSessionSave(" + docUUIDs.size() + "):" + sessionSaveTimer + "\tIngest(" + docUUIDs.size()
313                    + "):" + ingestTimer + "\tIndexing(" + docUUIDs.size() + "):" + indexTimer + "\tTotal Time: "
314                    + totalTimer);
315            docUUIDs.clear();
316        } finally {
317            /*if (session != null) {
318                try {
319                    RepositoryManager.getRepositoryManager().logout(session);
320                } catch (OleException e) {
321                }
322            } */
323        }
324        try {
325            totalTimer.stop();
326        } catch (Exception exe) {
327            logger.error(exe.getMessage() , exe );
328        }
329        logger.debug(
330                "Time Consumptions...:\tcreatingNodes(" + docUUIDs.size() + "):" + createNodesTimer + "\tSessionSave("
331                        + docUUIDs.size() + "):" + sessionSaveTimer + "\tIngest(" + docUUIDs.size() + "):" + ingestTimer
332                        + "\tIndexing(" + docUUIDs.size() + "):" + indexTimer + "\tTotal Time: " + totalTimer);
333        logger.debug("bulkIngestNIndex(" + request.getRequestDocuments().size() + ") END");
334        batchStatistics.setTimeToCreateNodesInJcr(createNodesTimer.getTime());
335        batchStatistics.setTimeToSaveJcrSession(sessionSaveTimer.getTime());
336        batchStatistics.setIngestingTime(ingestTimer.getTime());
337        batchStatistics.setIndexingTime(indexTimer.getTime());
338        batchStatistics.setIngestNIndexTotalTime(totalTimer.getTime());
339        updateProcessTimer(docUUIDs.size(), ingestTimer, indexTimer, totalTimer);
340        solrOptimizeTimer.start();
341        optimizeSolr(docUUIDs.size());
342        solrOptimizeTimer.stop();
343        batchStatistics.setTimeToSolrOptimize(solrOptimizeTimer.getTime());
344        return docUUIDs;
345    }
346
347    private void updateProcessTimer(int recordsProcessed, StopWatch ingest, StopWatch index, StopWatch total) {
348        BulkIngestTimeManager timer = ProcessParameters.BULK_PROCESSOR_TIME_MANAGER;
349        synchronized (timer) {
350            timer.setRecordsCount(timer.getRecordsCount() + recordsProcessed);
351            timer.setIngestingTimer(timer.getIngestingTimer() + ingest.getTime());
352            timer.setIndexingTimer(timer.getIndexingTimer() + index.getTime());
353            timer.setProcessTimer(timer.getProcessTimer() + total.getTime());
354            if (timer.getRecordsCount() >= ProcessParameters.BULK_PROCESSOR_TIMER_DISPLAY) {
355                logger.debug(
356                        "----------------------------------------------------------------------------------------------------------------------");
357                logger.debug(timer.toString());
358                logger.debug(
359                        "----------------------------------------------------------------------------------------------------------------------");
360                timer.reset();
361            }
362        }
363    }
364
365    private void optimizeSolr(long recordsProcessed) {
366        docCount += recordsProcessed;
367        logger.debug("BULK_INGEST_OPTIMIZE_SIZE=" + ProcessParameters.BULK_INGEST_OPTIMIZE_SIZE
368                + ". Records processed till now=" + docCount);
369        logger.info("Bulk ingest: Records processed in the bulk ingest " + docCount);
370        if (docCount >= ProcessParameters.BULK_INGEST_OPTIMIZE_SIZE) {
371            docCount = 0;
372            try {
373                logger.debug("Solr Optimization: START");
374                documentIndexer.optimizeSolr(false, false);
375                logger.debug("Solr Optimization: END");
376            } catch (Exception e) {
377                logger.warn("Solr Optimization Failed: ", e);
378            }
379        }
380    }
381
382    public Response buildResponse(Request request) {
383        Response docStoreResponse = new Response();
384        docStoreResponse.setUser(request.getUser());
385        docStoreResponse.setOperation(request.getOperation());
386        docStoreResponse.setMessage("Documents ingested");
387        docStoreResponse.setStatus("Success");
388        docStoreResponse.setStatusMessage("Documents Ingested Successfully");
389        List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
390        ResponseDocument linkedDocument = null;
391        ResponseDocument responseDocument = null;
392        ResponseDocument linkedInstanceDocument = null;
393        ResponseDocument linkedInstanceItemDocument = null;
394        ResponseDocument linkedInstanceSrHoldingDoc = null;
395        // documents
396        for (Iterator<RequestDocument> iterator = request.getRequestDocuments().iterator(); iterator.hasNext(); ) {
397            RequestDocument docStoreDocument = iterator.next();
398            docStoreDocument.getContent().setContent("");
399            responseDocument = new ResponseDocument();
400            setResponseParameters(responseDocument, docStoreDocument);
401            responseDocuments.add(responseDocument);
402            if (docStoreDocument.getLinkedRequestDocuments() != null
403                    && docStoreDocument.getLinkedRequestDocuments().size() > 0 && request != null
404                    && request.getOperation() != null && !request.getOperation().equalsIgnoreCase("checkIn")) {
405                List<ResponseDocument> linkResponseDos = new ArrayList<ResponseDocument>();
406                // linked instance documents
407                for (Iterator<RequestDocument> linkIterator = docStoreDocument.getLinkedRequestDocuments()
408                        .iterator(); linkIterator.hasNext(); ) {
409                    RequestDocument linkedRequestDocument = linkIterator.next();
410                    linkedRequestDocument.getContent().setContent("");
411                    linkedDocument = new ResponseDocument();
412                    setResponseParameters(linkedDocument, linkedRequestDocument);
413                    linkResponseDos.add(linkedDocument);
414                    List<ResponseDocument> linkInstanceDocs = new ArrayList<ResponseDocument>();
415                    InstanceCollection instanceCollection = (InstanceCollection) linkedRequestDocument.getContent()
416                            .getContentObject();
417                    for (Instance oleInstance : instanceCollection.getInstance()) {
418                        // holding from instance
419                        linkedInstanceDocument = new ResponseDocument();
420                        setResponseParameters(linkedInstanceDocument, linkedRequestDocument);
421                        linkedInstanceDocument.setUuid(oleInstance.getOleHoldings().getHoldingsIdentifier());
422                        linkedInstanceDocument.setType("holdings");
423                        linkInstanceDocs.add(linkedInstanceDocument);
424
425                        //SourceHolding from Instance
426                        linkedInstanceSrHoldingDoc = new ResponseDocument();
427                        setResponseParameters(linkedInstanceSrHoldingDoc, linkedRequestDocument);
428                        if (oleInstance.getSourceHoldings() != null &&
429                                oleInstance.getSourceHoldings().getHoldingsIdentifier() != null) {
430                            linkedInstanceSrHoldingDoc.setUuid(oleInstance.getSourceHoldings().getHoldingsIdentifier());
431                            linkedInstanceSrHoldingDoc.setType("sourceHoldings");
432                            linkInstanceDocs.add(linkedInstanceSrHoldingDoc);
433                        }
434
435
436                        // item from instance
437                        for (Iterator<Item> itemIterator = oleInstance.getItems().getItem().iterator(); itemIterator
438                                .hasNext(); ) {
439                            Item oleItem = itemIterator.next();
440                            linkedInstanceItemDocument = new ResponseDocument();
441                            setResponseParameters(linkedInstanceItemDocument, linkedRequestDocument);
442                            linkedInstanceItemDocument.setUuid(oleItem.getItemIdentifier());
443                            linkedInstanceItemDocument.setType("item");
444                            linkInstanceDocs.add(linkedInstanceItemDocument);
445                        }
446                    }
447                    responseDocument.setLinkedInstanceDocuments(linkInstanceDocs);
448                }
449                responseDocument.setLinkedDocuments(linkResponseDos);
450            }
451        }
452        docStoreResponse.setDocuments(responseDocuments);
453        return docStoreResponse;
454    }
455
456    private void setResponseParameters(ResponseDocument responseDocument, RequestDocument docStoreDocument) {
457        responseDocument.setId(docStoreDocument.getId());
458        responseDocument.setCategory(docStoreDocument.getCategory());
459        responseDocument.setType(docStoreDocument.getType());
460        responseDocument.setFormat(docStoreDocument.getFormat());
461        responseDocument.setContent(docStoreDocument.getContent());
462        responseDocument.setUuid(docStoreDocument.getUuid());
463    }
464
465    public void setRepositoryManager(RepositoryManager repositoryManager) {
466        this.repositoryManager = repositoryManager;
467    }
468
469    public DocumentIngester getDocumentIngester() {
470        return documentIngester;
471    }
472}