View Javadoc

1   /*
2    * Copyright 2011 The Kuali Foundation.
3    * 
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    * http://www.opensource.org/licenses/ecl2.php
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.ole.docstore.discovery.service;
17  
18  import java.io.BufferedReader;
19  import java.io.File;
20  import java.io.FileReader;
21  import java.io.FilenameFilter;
22  import java.io.IOException;
23  import java.io.InputStreamReader;
24  import java.io.OutputStreamWriter;
25  import java.io.StringWriter;
26  import java.net.HttpURLConnection;
27  import java.net.MalformedURLException;
28  import java.net.URL;
29  import java.util.ArrayList;
30  import java.util.Date;
31  import java.util.HashMap;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.UUID;
35  import javax.xml.stream.XMLInputFactory;
36  import javax.xml.stream.XMLStreamConstants;
37  import javax.xml.stream.XMLStreamReader;
38  import javax.xml.transform.OutputKeys;
39  import javax.xml.transform.Transformer;
40  import javax.xml.transform.TransformerFactory;
41  import javax.xml.transform.stax.StAXSource;
42  import javax.xml.transform.stream.StreamResult;
43  
44  import org.apache.commons.io.FileUtils;
45  import org.apache.commons.lang.time.StopWatch;
46  import org.apache.solr.client.solrj.SolrQuery;
47  import org.apache.solr.client.solrj.SolrServer;
48  import org.apache.solr.client.solrj.SolrServerException;
49  import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
50  import org.apache.solr.client.solrj.response.QueryResponse;
51  import org.apache.solr.client.solrj.response.UpdateResponse;
52  import org.apache.solr.client.solrj.util.ClientUtils;
53  import org.apache.solr.common.SolrDocument;
54  import org.apache.solr.common.SolrInputDocument;
55  import org.apache.solr.common.SolrInputField;
56  import org.kuali.ole.docstore.discovery.solr.security.patron.oleml.SecurityPatronOlemlDocBuilder;
57  import org.kuali.ole.docstore.discovery.solr.work.bib.WorkBibCommonFields;
58  import org.kuali.ole.docstore.discovery.solr.work.bib.dublin.WorkBibDublinDocBuilder;
59  import org.kuali.ole.docstore.discovery.solr.work.bib.dublin.unqualified.WorkBibDublinUnQualifiedDocBuilder;
60  import org.kuali.ole.docstore.discovery.solr.work.bib.marc.WorkBibMarcDocBuilder;
61  import org.kuali.ole.docstore.discovery.solr.work.instance.oleml.WorkInstanceOlemlDocBuilder;
62  import org.kuali.ole.docstore.discovery.solr.work.license.binary.WorkLicenseBinaryDocBuilder;
63  import org.kuali.ole.docstore.discovery.solr.work.license.onixpl.WorkLicenseOnixplDocBuilder;
64  import org.kuali.ole.docstore.discovery.util.PropertyUtil;
65  import org.kuali.ole.docstore.model.enums.DocCategory;
66  import org.kuali.ole.docstore.model.enums.DocFormat;
67  import org.kuali.ole.docstore.model.enums.DocType;
68  import org.kuali.ole.docstore.model.xmlpojo.ingest.RequestDocument;
69  import org.kuali.ole.docstore.model.xmlpojo.work.bib.dublin.WorkBibDublinRecord;
70  import org.kuali.ole.docstore.model.xmlpojo.work.instance.oleml.Instance;
71  import org.kuali.ole.docstore.model.xmlpojo.work.instance.oleml.InstanceCollection;
72  import org.kuali.ole.docstore.model.xstream.work.bib.dublin.WorkBibDublinRecordProcessor;
73  import org.kuali.ole.docstore.model.xstream.work.bib.dublin.unqualified.WorkBibDublinUnQualifiedRecordProcessor;
74  import org.kuali.ole.docstore.model.xstream.work.bib.marc.WorkBibMarcRecordProcessor;
75  import org.kuali.ole.docstore.utility.BatchIngestStatistics;
76  import org.kuali.ole.docstore.utility.BulkIngestStatistics;
77  import org.slf4j.Logger;
78  import org.slf4j.LoggerFactory;
79  
80  /**
81   * This class implements the {@link IndexerService} methods.
82   * <p>
83   * All updates to the Solr indexes should ideally take place through this class.
84   * Some important notes about solr:
85   * Unlike a database, there are no distinct sessions (i.e. transactions) between each client,
86   * and instead there is in-effect one global modification state.
87   * There should be only one process for updating solr.
88   * It is recommended to explicitly optimize the Solr index at an opportune time
89   * like after a bulk load of data and/or a daily interval in off-peak hours.
90   * </p>
91   */
92  public class IndexerServiceImpl
93          implements IndexerService {
94      private static final Logger LOG                   = LoggerFactory.getLogger(IndexerServiceImpl.class);
95      public static final  String UUID_FILE_NAME_SUFFIX = "_UUID_.xml";
96  
97      //    private static      String         docSearchUrl            = null;
98      private static      IndexerService indexerService      = null;
99      public static final String         ID_FIELD_PREFIX     = "id_disc_";
100     public static final int            BATCH_SIZE          = 10000;
101     private final       String         BIBLIOGRAPHIC       = "bibliographic";
102     private final       String         DOC_TYPE            = "DocType";
103     private final       String         DOC_FORMAT          = "DocFormat";
104     private final       String         HOLDINGS_IDENTIFIER = "holdingsIdentifier";
105     private final       String         ITEM_IDENTIFIER     = "itemIdentifier";
106     private final       String         INSTANCE            = "instance";
107 
108 
109     private IndexerServiceImpl() {
110         init();
111     }
112 
113     public static IndexerService getInstance() {
114         if (null == indexerService) {
115             indexerService = new IndexerServiceImpl();
116         }
117         return indexerService;
118     }
119 
120     protected void init() {
121         LOG.debug("IndexerServiceImpl init ");
122         //        docSearchUrl = PropertyUtil.getPropertyUtil().getProperty("docSearchURL");
123         //        if ((null != docSearchUrl) && !docSearchUrl.endsWith("/")) {
124         //            docSearchUrl = docSearchUrl + "/";
125         //        }
126     }
127 
128     public String deleteDocuments(String docCategory, List<String> uuidList)
129             throws MalformedURLException, SolrServerException {
130         String result = deleteDocumentsByUUIDList(uuidList, docCategory);
131         return result;
132     }
133 
134     public String deleteDocument(String docCategory, String uuid) {
135         String result = deleteDocumentByUUID(uuid, docCategory);
136         return result;
137     }
138 
139     public String indexSolrDocuments(List<SolrInputDocument> solrDocs) {
140         return indexSolrDocuments(solrDocs, true);
141     }
142 
143     public String indexSolrDocuments(List<SolrInputDocument> solrDocs, boolean commit) {
144         String result = null;
145         StopWatch timer = new StopWatch();
146         timer.start();
147         try {
148             result = indexSolrDocuments(solrDocs, commit, false);
149             timer.stop();
150             LOG.info("Time taken for indexing " + solrDocs.size() + " Solr docs:" + timer.toString());
151         }
152         catch (Exception e) {
153             result = buildFailureMsg(null, "Indexing failed. " + e.getMessage());
154             LOG.error(result, e);
155         }
156         return result;
157     }
158 
159     @Override
160     public String indexDocumentsFromDirBySolrDoc(String docCategory, String docType, String docFormat, String dataDir) {
161         String result = null;
162         String xmlContent = "";
163         // get the files from the dir.
164         File srcDir = new File(dataDir);
165         if ((null == srcDir) || !srcDir.isDirectory()) {
166             result = buildFailureMsg(null, "Invalid data directory:" + dataDir);
167             return result;
168         }
169         FilenameFilter filter = new FilenameFilter() {
170             public boolean accept(File dir, String name) {
171                 return (!name.startsWith(".") && (name.endsWith(".xml")));
172             }
173         };
174 
175         String[] srcFileNames = srcDir.list(filter);
176         if ((null == srcFileNames) || (srcFileNames.length == 0)) {
177             result = buildFailureMsg(null, "No data files found in data dir:" + dataDir);
178             return result;
179         }
180         List<File> fileList = new ArrayList<File>(srcFileNames.length);
181         for (int i = 0; i < srcFileNames.length; i++) {
182             File srcFile = new File(dataDir + File.separator + srcFileNames[i]);
183             fileList.add(srcFile);
184         }
185         return indexDocumentsFromFiles(docCategory, docType, docFormat, fileList);
186     }
187 
188     @Override
189     public String indexDocumentsFromStringBySolrDoc(String docCategory, String docType, String docFormat, String data)
190             throws IOException {
191 
192         File file = File.createTempFile("marc.xml", ".tmp");
193         FileUtils.writeStringToFile(file, data, "UTF-8");
194         String filePath = file.getAbsolutePath();
195         return indexDocumentsFromFileBySolrDoc(docCategory, docType, docFormat,
196                                                filePath);  //To change body of implemented methods use File | Settings | File Templates.
197     }
198 
199     @Override
200     public String indexDocumentsFromFileBySolrDoc(String docCategory, String docType, String docFormat,
201                                                   String filePath) {
202         List<File> fileList = new ArrayList<File>(0);
203         fileList.add(new File(filePath));
204         return indexDocumentsFromFiles(docCategory, docType, docFormat, fileList);
205     }
206 
207 
208     /**
209      * Indexes the records (of the given docCategory, docType and docFormat) from the files in the given data directory.
210      * <p>
211      * This is a utility method to use Discovery separately from DocStore.
212      * </p>
213      *
214      * @param docCategory category of the documents expected in the input files
215      * @param docType     type of the documents expected in the input files
216      * @param docFormat   format of the documents expected in the input files
217      * @param fileList    list of files to be indexed
218      * @return SUCCESS or FAILURE
219      */
220     @Override
221     public String indexDocumentsFromFiles(String docCategory, String docType, String docFormat, List<File> fileList) {
222         // TODO: Modify this method so that if dataDir is a file, it should be indexed.
223         String result = null;
224         String xmlContent = "";
225         try {
226             StopWatch indexingTimer = new StopWatch();
227             StopWatch conversionTimer = new StopWatch();
228             StopWatch fileIOTimer = new StopWatch();
229             StopWatch totalTimer = new StopWatch();
230             totalTimer.start();
231             fileIOTimer.start();
232             fileIOTimer.suspend();
233 
234             if ((null == fileList) || (fileList.size() == 0)) {
235                 result = buildFailureMsg(null, "No  files found in data dir:" + fileList);
236                 return result;
237             }
238             int numFiles = fileList.size();
239             int numDocs = 0;
240             SolrServer solr = SolrServerManager.getInstance().getSolrServer();
241             TransformerFactory tf = new com.sun.org.apache.xalan.internal.xsltc.trax.TransformerFactoryImpl();
242             Transformer t = tf.newTransformer();
243             t.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
244             t.setOutputProperty(OutputKeys.INDENT, "yes");
245             conversionTimer.start();
246             conversionTimer.suspend();
247             indexingTimer.start();
248             indexingTimer.suspend();
249             for (int i = 0; i < fileList.size(); i++) {
250                 File srcFile = fileList.get(i);
251                 LOG.info("Processing File: " + srcFile.getAbsolutePath());
252                 String srcFileName = srcFile.getName();
253 
254                 // Get the id of the doc from the file name if Exists.
255                 String idFromFileName = null;
256                 List<String> idFromFileNameList = null;
257                 int suffixIndex = srcFileName.indexOf(UUID_FILE_NAME_SUFFIX);
258                 if (suffixIndex > 0) {
259                     idFromFileName = srcFileName.substring(0, suffixIndex);
260                     idFromFileNameList = new ArrayList<String>(1);
261                     idFromFileNameList.add(idFromFileName);
262                 }
263 
264                 int recordsProcessedInFile = 0;
265                 try {
266                     XMLInputFactory xif = XMLInputFactory.newInstance();
267                     XMLStreamReader xsr = xif.createXMLStreamReader(new FileReader(srcFile));
268                     xsr.nextTag();
269                     recordsProcessedInFile = 0;
270                     List<SolrInputDocument> solrDocsToAdd = new ArrayList<SolrInputDocument>();
271                     List<SolrInputDocument> solrDocs = null;
272                     while (xsr.hasNext()) {
273                         int eventType = xsr.next();
274                         if (eventType == XMLStreamConstants.START_ELEMENT) {
275                             if (DocFormat.MARC.isEqualTo(docFormat)) {
276                                 recordsProcessedInFile++;
277                                 LOG.debug("Processing Record(" + recordsProcessedInFile + ") of File: " + srcFileName);
278                                 fileIOTimer.resume();
279                                 StringWriter str = new StringWriter();
280                                 str.append("<collection>");
281                                 t.transform(new StAXSource(xsr), new StreamResult(str));
282                                 str.append("</collection>");
283                                 xmlContent = str.getBuffer().toString();
284                                 fileIOTimer.suspend();
285                                 conversionTimer.resume();
286                                 solrDocs = convertToSolrDocs(docCategory, docType, docFormat, xmlContent);
287                                 if ((null == solrDocs) || (solrDocs.size() == 0)) {
288                                     continue;
289                                 }
290                                 if (idFromFileName == null) {
291                                     assignUUIDs(solrDocs, null);
292                                 }
293                                 else {
294                                     assignUUIDs(solrDocs.subList(0, 1), idFromFileNameList);
295                                 }
296                                 conversionTimer.suspend();
297                                 numDocs += solrDocs.size();
298                             }
299                             else if (DocFormat.DUBLIN_CORE.isEqualTo(docFormat)) {
300                                 // TODO: May be moved out of while loop?
301                                 conversionTimer.resume();
302                                 solrDocs = convertToSolrDocs(docCategory, docType, docFormat,
303                                                              FileUtils.readFileToString(srcFile, "UTF-8"));
304                                 assignUUIDs(solrDocs, null);
305                                 conversionTimer.suspend();
306                                 solrDocsToAdd.addAll(solrDocs);
307                                 numDocs += solrDocs.size();
308                                 break;
309                             }
310                             else if (DocFormat.DUBLIN_UNQUALIFIED.isEqualTo(docFormat)) {
311                                 if (xsr.getName().getLocalPart().equalsIgnoreCase("record")) {
312                                     conversionTimer.resume();
313                                     solrDocs = new ArrayList<SolrInputDocument>();
314                                     StringWriter str = new StringWriter();
315                                     str.append("<OAI-PMH><ListRecords>");
316                                     t.transform(new StAXSource(xsr), new StreamResult(str));
317                                     str.append("</ListRecords></OAI-PMH>");
318                                     str.close();
319                                     xmlContent = str.getBuffer().toString();
320                                     solrDocs = convertToSolrDocs(docCategory, docType, docFormat, xmlContent);
321                                     str.flush();
322                                     assignUUIDs(solrDocs, null);
323                                     conversionTimer.suspend();
324                                     numDocs += solrDocs.size();
325                                 }
326                             }
327                             else {
328                                 throw new Exception("Unsupported Document Format: " + docFormat);
329                             }
330                         }
331                         else {
332                             continue;
333                         }
334 
335                         if (solrDocs != null) {
336                             solrDocsToAdd.addAll(solrDocs);
337                         }
338                         if (solrDocsToAdd.size() < 500) {
339                             // TODO: Handle the case when the size of the batch is too high. Do a check on the size.
340                             continue;
341                         }
342                         indexingTimer.resume();
343                         solr.add(solrDocsToAdd);
344                         indexingTimer.suspend();
345                         solrDocsToAdd.clear();
346                         if (recordsProcessedInFile % 10000 == 0) {
347                             totalTimer.split();
348                             LOG.info("Records processed in file " + srcFileName + ":" + recordsProcessedInFile
349                                      + "; Time elapsed:" + totalTimer.toSplitString());
350                         }
351                         if (idFromFileName != null || DocFormat.DUBLIN_CORE.isEqualTo(docFormat)) {
352                             break;
353                         }
354                     }
355                     if (solrDocsToAdd.size() > 0) {
356                         indexingTimer.resume();
357                         solr.add(solrDocsToAdd);
358                         indexingTimer.suspend();
359                         solrDocsToAdd.clear();
360                     }
361                 }
362                 catch (Exception ex) {
363                     String message = "Failure while processing file '" + srcFile.getAbsolutePath() + "' \nat Record: "
364                                      + recordsProcessedInFile + "\n" + xmlContent;
365                     ex.printStackTrace();
366                     LOG.error(message);
367                     solr.rollback();
368                     throw ex;
369                 }
370                 totalTimer.split();
371                 if (recordsProcessedInFile > 0) {
372                     // Do not log this message if a file has only one record.
373                     LOG.info("Records processed in file " + srcFileName + ":" + recordsProcessedInFile
374                              + "; Time elapsed:" + totalTimer.toSplitString());
375                 }
376             }
377             // commit after all docs are added.
378             if (numDocs > 0) {
379                 indexingTimer.resume();
380                 solr.commit();
381                 indexingTimer.suspend();
382             }
383 
384             conversionTimer.stop();
385             fileIOTimer.stop();
386             indexingTimer.stop();
387             totalTimer.stop();
388             LOG.info("Num of files processed:" + numFiles + "; Num of documents processed:" + numDocs);
389             LOG.info("Time taken for reading files:" + fileIOTimer.toString()
390                      + "; Time taken for parsing and converting to Solr Docs:" + conversionTimer.toString());
391             LOG.info(
392                     "Time taken for indexing Solr docs:" + indexingTimer.toString() + "; Total time taken:" + totalTimer
393                             .toString());
394             result = SUCCESS + "-" + numDocs;
395         }
396         catch (Exception e) {
397             result = buildFailureMsg(null, "Indexing failed. " + e.getMessage());
398             LOG.error(result, e);
399         }
400         return result;
401     }
402 
403     //    public String indexDocuments(List<RequestDocument> requestDocuments) {
404     //        for (RequestDocument requestDocument : requestDocuments) {
405     //            indexDocument(requestDocument);
406     //        }
407     //        return null;
408     //    }
409 
410     public String indexDocument(RequestDocument requestDocument) {
411         return indexDocument(requestDocument, true);
412     }
413 
414     public String indexDocument(RequestDocument requestDocument, boolean commit) {
415         List<RequestDocument> requestDocuments = null;
416         if (requestDocument != null) {
417             requestDocuments = new ArrayList<RequestDocument>(1);
418             requestDocuments.add(requestDocument);
419         }
420         return indexDocuments(requestDocuments, commit);
421     }
422 
423     @Override
424     public String indexDocuments(List<RequestDocument> requestDocuments) {
425         return indexDocuments(requestDocuments, true);
426     }
427 
428     @Override
429     public String indexDocuments(List<RequestDocument> requestDocuments, boolean commit) {
430         String result = null;
431         StopWatch timer = new StopWatch();
432         StopWatch buildSolrInputDocTime = new StopWatch();
433         StopWatch xmlToObjTime = new StopWatch();
434         buildSolrInputDocTime.start();
435         buildSolrInputDocTime.suspend();
436         xmlToObjTime.start();
437         xmlToObjTime.suspend();
438         timer.start();
439         List<SolrInputDocument> solrInputDocuments = new ArrayList<SolrInputDocument>();
440         try {
441             for (RequestDocument requestDocument : requestDocuments) {
442                 if (requestDocument == null) {
443                     continue;
444                 }
445                 if (DocCategory.WORK.isEqualTo(requestDocument.getCategory())) {
446                     if (DocType.BIB.isEqualTo(requestDocument.getType())) { // Biblographic
447                         if (DocFormat.MARC.isEqualTo(requestDocument.getFormat())) {
448                             new WorkBibMarcDocBuilder()
449                                     .buildSolrInputDocument(requestDocument, solrInputDocuments, buildSolrInputDocTime,
450                                                             xmlToObjTime);
451                         }
452                         else if (DocFormat.DUBLIN_CORE.isEqualTo(requestDocument.getFormat())) {
453                             new WorkBibDublinDocBuilder().buildSolrInputDocument(requestDocument, solrInputDocuments);
454                         }
455                         else if (DocFormat.DUBLIN_UNQUALIFIED.isEqualTo(requestDocument.getFormat())) {
456                             new WorkBibDublinUnQualifiedDocBuilder()
457                                     .buildSolrInputDocument(requestDocument, solrInputDocuments);
458 
459                         }
460                         else {
461                             throw new Exception(
462                                     "Unsupported Document Format : " + requestDocument.getFormat() + " Called.");
463                         }
464                     }
465                     else if (DocType.LICENSE.isEqualTo(requestDocument.getType())) { //License
466                         if (DocFormat.ONIXPL.isEqualTo((requestDocument.getFormat()))) { //onixpl
467                             new WorkLicenseOnixplDocBuilder()
468                                     .buildSolrInputDocument(requestDocument, solrInputDocuments);
469                         }
470                         else if ((DocFormat.PDF.isEqualTo((requestDocument.getFormat()))) || DocFormat.DOC.isEqualTo(
471                                 requestDocument.getFormat()) || DocFormat.XSLT.isEqualTo(
472                                 requestDocument.getFormat())) { //pdf
473                             new WorkLicenseBinaryDocBuilder()
474                                     .buildSolrInputDocument(requestDocument, solrInputDocuments);
475                         }
476                         else {
477                             throw new Exception(
478                                     "Unsupported Document Format : " + requestDocument.getFormat() + " Called.");
479                         }
480                     }
481 
482                     else if (DocType.INSTANCE.isEqualTo(requestDocument.getType())) { // Instance
483                         if (DocFormat.OLEML.isEqualTo(requestDocument.getFormat())) {
484                             new WorkInstanceOlemlDocBuilder()
485                                     .buildSolrInputDocument(requestDocument, solrInputDocuments);
486                         }
487                         else {
488                             throw new Exception(
489                                     "Unsupported Document Format : " + requestDocument.getFormat() + " Called.");
490                         }
491                     }
492                     else if (DocType.HOLDINGS.isEqualTo(requestDocument.getType())) { // Holdings
493                         if (DocFormat.OLEML.isEqualTo(requestDocument.getFormat())) {
494                             new WorkInstanceOlemlDocBuilder()
495                                     .buildSolrInputDocument(requestDocument, solrInputDocuments);
496                         }
497                         else {
498                             throw new Exception(
499                                     "Unsupported Document Format : " + requestDocument.getFormat() + " Called.");
500                         }
501                     }
502                     else if (DocType.ITEM.isEqualTo(requestDocument.getType())) { // Item
503                         if (DocFormat.OLEML.isEqualTo(requestDocument.getFormat())) {
504                             new WorkInstanceOlemlDocBuilder()
505                                     .buildSolrInputDocument(requestDocument, solrInputDocuments);
506                         }
507                         else {
508                             throw new Exception(
509                                     "Unsupported Document Format : " + requestDocument.getFormat() + " Called.");
510                         }
511                     }
512                     else if (DocType.SOURCEHOLDINGS.isEqualTo(requestDocument.getType())) { // Item
513                         if (DocFormat.OLEML.isEqualTo(requestDocument.getFormat())) {
514                             return "success";
515                         }
516                         else {
517                             throw new Exception(
518                                     "Unsupported Document Format : " + requestDocument.getFormat() + " Called.");
519                         }
520                     }
521                     else {
522                         throw new Exception("Unsupported Document Type : " + requestDocument.getType() + " Called.");
523                     }
524                 }
525                 else if (DocCategory.SECURITY.isEqualTo(requestDocument.getCategory())) {
526                     if (DocType.PATRON.isEqualTo(requestDocument.getType())) {
527                         if (DocFormat.OLEML.isEqualTo(requestDocument.getFormat())) {
528                             new SecurityPatronOlemlDocBuilder()
529                                     .buildSolrInputDocument(requestDocument, solrInputDocuments);
530                         }
531                         else {
532                             throw new Exception(
533                                     "Unsupported Document Format : " + requestDocument.getFormat() + " Called.");
534                         }
535                     }
536                     else {
537                         throw new Exception("Unsupported Document Type : " + requestDocument.getType() + " Called.");
538                     }
539                 }
540                 else {
541                     //                        logger.error("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
542                     throw new Exception(
543                             "Unsupported Document Category : " + requestDocument.getCategory() + " Called.");
544                 }
545                 assignUUIDs(solrInputDocuments, null);
546             }
547         }
548         catch (Exception e1) {
549             result = buildFailureMsg(null, "Indexing failed. " + e1.getMessage());
550             LOG.error(result, e1);
551         }
552         timer.stop();
553         if ((null == solrInputDocuments) || (solrInputDocuments.isEmpty())) {
554             result = buildFailureMsg(null, "No valid documents found in input.");
555             return result;
556         }
557         int numDocs = solrInputDocuments.size();
558         LOG.info("Conversion to Solr docs- Num:" + numDocs + ": Time taken:" + timer.toString());
559         result = indexSolrDocuments(solrInputDocuments, commit);
560         return result;
561     }
562 
563     public String bulkIndexDocuments(List<RequestDocument> requestDocuments, boolean isCommit) {
564         String result = "success";
565         Map<String, SolrInputDocument> bibIdToDocMap = new HashMap<String, SolrInputDocument>();
566         BatchIngestStatistics batchStatistics = BulkIngestStatistics.getInstance().getCurrentBatch();
567         if (requestDocuments != null && requestDocuments.size() > 0) {
568             StopWatch timer = new StopWatch();
569             StopWatch buildSolrInputDocTimer = new StopWatch();
570             StopWatch xmlToPojoTimer = new StopWatch();
571             timer.start();
572             buildSolrInputDocTimer.start();
573             buildSolrInputDocTimer.suspend();
574             xmlToPojoTimer.start();
575             xmlToPojoTimer.suspend();
576 
577             List<SolrInputDocument> solrInputDocuments = new ArrayList<SolrInputDocument>();
578             try {
579                 if (DocCategory.WORK.isEqualTo(requestDocuments.get(0).getCategory())) {
580                     if (DocType.BIB.isEqualTo(requestDocuments.get(0).getType())) {
581                         if (DocFormat.MARC.isEqualTo(requestDocuments.get(0).getFormat())) {
582                             WorkBibMarcDocBuilder marcBuilder = new WorkBibMarcDocBuilder();
583                             for (RequestDocument requestDocument : requestDocuments) {
584                                 marcBuilder.buildSolrInputDocument(requestDocument, solrInputDocuments,
585                                                                    buildSolrInputDocTimer, xmlToPojoTimer);
586                             }
587                         }
588                         else if (DocFormat.DUBLIN_CORE.isEqualTo(requestDocuments.get(0).getFormat())) {
589                             WorkBibDublinDocBuilder dublinBuilder = new WorkBibDublinDocBuilder();
590                             for (RequestDocument requestDocument : requestDocuments) {
591                                 dublinBuilder.buildSolrInputDocument(requestDocument, solrInputDocuments);
592                             }
593                         }
594                         else if (DocFormat.DUBLIN_UNQUALIFIED.isEqualTo(requestDocuments.get(0).getFormat())) {
595                             WorkBibDublinUnQualifiedDocBuilder dublinUnqBuilder
596                                     = new WorkBibDublinUnQualifiedDocBuilder();
597                             for (RequestDocument requestDocument : requestDocuments) {
598                                 dublinUnqBuilder.buildSolrInputDocument(requestDocument, solrInputDocuments);
599                             }
600                         }
601                     }
602                     else if (DocType.INSTANCE.isEqualTo(requestDocuments.get(0).getType())) {
603                         WorkInstanceOlemlDocBuilder oleMlDocBuilder = new WorkInstanceOlemlDocBuilder();
604                         for (RequestDocument requestDocument : requestDocuments) {
605                             Instance instance = ((InstanceCollection) requestDocument.getContent().getContentObject())
606                                     .getInstance().get(0);
607                             for (String rId : instance.getResourceIdentifier()) {
608                                 List<SolrDocument> docs = getSolrDocumentBySolrId(rId);
609                                 for (SolrDocument solrDoc : docs) {
610                                     SolrInputDocument bibSolrIDoc = ClientUtils.toSolrInputDocument(solrDoc);
611                                     String bibId = bibSolrIDoc.getFieldValue(WorkBibCommonFields.UNIQUE_ID).toString();
612                                     if (bibIdToDocMap.get(bibId) == null) {
613                                         bibIdToDocMap.put(bibId, bibSolrIDoc);
614                                     }
615                                     bibIdToDocMap.get(bibId)
616                                                  .addField("instanceIdentifier", instance.getInstanceIdentifier());
617                                 }
618                             }
619                             oleMlDocBuilder.buildSolrInputDocuments(requestDocument, solrInputDocuments);
620                         }
621                     }
622                 }
623                 if (DocCategory.SECURITY.isEqualTo(requestDocuments.get(0).getCategory())) {
624                     if (DocType.PATRON.isEqualTo(requestDocuments.get(0).getType())) {
625                         if (DocFormat.OLEML.isEqualTo(requestDocuments.get(0).getFormat())) {
626                             SecurityPatronOlemlDocBuilder patronBuilder = new SecurityPatronOlemlDocBuilder();
627                             for (RequestDocument requestDocument : requestDocuments) {
628                                 patronBuilder.buildSolrInputDocument(requestDocument, solrInputDocuments);
629                             }
630                         }
631                     }
632                 }
633                 assignUUIDs(solrInputDocuments, null);
634                 solrInputDocuments.addAll(bibIdToDocMap.values());
635             }
636             catch (Exception e1) {
637                 result = buildFailureMsg(null, "Bulk Indexing failed. " + e1.getMessage());
638                 LOG.error(result, e1);
639                 return result;
640             }
641             timer.stop();
642             if (solrInputDocuments.isEmpty()) {
643                 result = buildFailureMsg(null, "No valid documents found in input.");
644                 return result;
645             }
646             int numDocs = solrInputDocuments.size();
647             batchStatistics.setTimeToConvertXmlToPojo(xmlToPojoTimer.getTime());
648             batchStatistics.setTimeToConvertToSolrInputDocs(buildSolrInputDocTimer.getTime());
649             StopWatch indexingTimer = new StopWatch();
650             indexingTimer.start();
651             try {
652                 result = indexSolrDocuments(solrInputDocuments, isCommit, false, false, false);
653                 indexingTimer.stop();
654                 //                batchStatistics.setTimeToIndexSolrInputDocs(indexingTimer.toString());
655             }
656             catch (Exception e) {
657                 result = buildFailureMsg(null, "Indexing failed. " + e.getMessage());
658                 LOG.error(result, e);
659             }
660             LOG.debug("Time Consumptions...:\txmlToObj(" + numDocs + "):" + xmlToPojoTimer + "\tbuildSolrInputDoc("
661                       + numDocs + "):" + buildSolrInputDocTimer + "\tTotal(" + numDocs + "):" + timer.toString()
662                       + "\t indexingTime(" + solrInputDocuments.size() + "):" + indexingTimer.toString());
663         }
664         return result;
665     }
666 
667     public List<SolrDocument> getSolrDocumentBySolrId(String uniqueId) {
668         QueryResponse response = null;
669         String result = null;
670         try {
671             String args = "(" + WorkBibCommonFields.UNIQUE_ID + ":" + uniqueId + ")";
672             SolrServer solr = SolrServerManager.getInstance().getSolrServer();
673             SolrQuery query = new SolrQuery();
674             query.setQuery(args);
675             response = solr.query(query);
676         }
677         catch (Exception e) {
678             result = buildFailureMsg();
679             LOG.error(result, e);
680         }
681         return response.getResults();
682     }
683 
684     public List<SolrDocument> getSolrDocument(String fieldName, String fieldValue) {
685         QueryResponse response = null;
686         String result = null;
687         try {
688             String args = "(" + fieldName + ":" + fieldValue + ")";
689             SolrServer solr = SolrServerManager.getInstance().getSolrServer();
690             SolrQuery query = new SolrQuery();
691             query.setQuery(args);
692             response = solr.query(query);
693         }
694         catch (Exception e) {
695             result = buildFailureMsg();
696             LOG.error(result, e);
697         }
698         return response.getResults();
699     }
700 
701     /**
702      * Assigns UUIDs for each document (that does not have an "id" field) in the given list.
703      * Also makes sure "uniqueId" field is present. The UUIDs generated by this method start
704      * with ID_FIELD_PREFIX for easy identification. Optionally takes a list
705      * of UUIDs to be used to set/override the "id" field values of the documents.
706      *
707      * @param solrDocs
708      * @param ids      List of id values (optional) to be used for the given documents.
709      */
710     protected void assignUUIDs(List<SolrInputDocument> solrDocs, List<String> ids) throws Exception {
711         if ((null == solrDocs) || (solrDocs.size() == 0)) {
712             return;
713         }
714         if ((null != ids) && (ids.size() < solrDocs.size())) {
715             throw new Exception(
716                     "Insufficient UUIDs(" + ids.size() + ") specified for documents(" + solrDocs.size() + ".");
717         }
718         for (int i = 0; i < solrDocs.size(); i++) {
719             SolrInputDocument solrInputDocument = solrDocs.get(i);
720             SolrInputField idField = solrInputDocument.getField("id");
721             String uuid = null;
722             if (null != ids) {
723                 // Get the supplied UUID.
724                 uuid = ids.get(i);
725             }
726             if (null == idField) {
727                 if (null == uuid) {
728                     // Generate UUID.
729                     uuid = UUID.randomUUID().toString();
730                     uuid = ID_FIELD_PREFIX + uuid; // identifies the uuid generated by discovery module.
731                 }
732                 solrInputDocument.addField("id", uuid);
733                 solrInputDocument.addField("uniqueId", uuid);
734             }
735             else {
736                 if (null != uuid) {
737                     // Use the supplied UUID.
738                     solrInputDocument.setField("id", uuid);
739                     solrInputDocument.setField("uniqueId", uuid);
740                 }
741                 else {
742                     // Leave the existing id value and make sure uniqueId is set.
743                     //                    uuid = (String) idField.getValue();
744                     if (idField.getValue() instanceof List) {
745                         List<String> uuidList = (List<String>) idField.getValue();
746                         uuid = uuidList.get(0);
747                     }
748                     else if (idField.getValue() instanceof String) {
749                         uuid = (String) idField.getValue();
750                     }
751                     if (null == uuid) {
752                         // Generate UUID.
753                         uuid = UUID.randomUUID().toString();
754                         uuid = ID_FIELD_PREFIX + uuid; // identifies the uuid generated by discovery module.
755                         idField.setValue(uuid, 1.0f);
756                     }
757                     SolrInputField uniqueIdField = solrInputDocument.getField("uniqueId");
758                     if (null == uniqueIdField) {
759                         solrInputDocument.addField("uniqueId", uuid);
760                     }
761                     else {
762                         solrInputDocument.setField("uniqueId", uuid);
763                     }
764                 }
765             }
766         }
767     }
768 
769     @Override
770     public void commit() throws Exception {
771         boolean waitFlush = false;
772         boolean waitSearcher = false;
773         SolrServer solr = SolrServerManager.getInstance().getSolrServer();
774         solr.commit(waitFlush, waitSearcher);
775     }
776 
777     @Override
778     public void rollback() throws Exception {
779         SolrServer solr = SolrServerManager.getInstance().getSolrServer();
780         solr.rollback();
781     }
782 
783     protected String indexSolrDocuments(List<SolrInputDocument> solrDocs, boolean commit, boolean optimize,
784                                         boolean waitFlush, boolean waitSearcher) throws Exception {
785         BulkIngestStatistics bulkLoadStatistics = BulkIngestStatistics.getInstance();
786         BatchIngestStatistics batchStatistics = bulkLoadStatistics.getCurrentBatch();
787         StopWatch indexSolrDocsTime = new StopWatch();
788         StopWatch solrCommitTime = new StopWatch();
789         indexSolrDocsTime.start();
790         SolrServer solr = null;
791         if ((null == solrDocs) || (solrDocs.isEmpty())) {
792             return SUCCESS + "-0";
793         }
794         solr = SolrServerManager.getInstance().getSolrServer();
795         if (solrDocs.size() > BATCH_SIZE) {
796             int numSolrDocs = solrDocs.size();
797             for (int fromIndex = 0; fromIndex < numSolrDocs; fromIndex += BATCH_SIZE) {
798                 int toIndex = fromIndex + BATCH_SIZE;
799                 if (toIndex > numSolrDocs) {
800                     toIndex = numSolrDocs;
801                 }
802                 List batchSolrDocs = solrDocs.subList(fromIndex, toIndex);
803                 if ((null != batchSolrDocs) && (!batchSolrDocs.isEmpty())) {
804                     LOG.info("Indexing records. fromIndex=" + fromIndex + ", toIndex=" + toIndex);
805                     UpdateResponse response = solr.add(solrDocs);
806                 }
807             }
808         }
809         else {
810             LOG.debug("Indexing records. size=" + solrDocs.size());
811             UpdateResponse response = solr.add(solrDocs);
812         }
813         indexSolrDocsTime.stop();
814         solrCommitTime.start();
815         if (commit) {
816             LOG.info("Bulk ingest: Index commit started. Number of records being committed: " + bulkLoadStatistics
817                     .getCommitRecCount());
818             solr.commit(waitFlush, waitSearcher);
819         }
820         solrCommitTime.stop();
821         if (optimize) {
822             solr.optimize(waitFlush, waitSearcher);
823         }
824 
825 
826         LOG.debug("Time Consumptions...: Solr input docs of size ..." + solrDocs.size()
827                   + "\t time taken to index solr Input Docs" + indexSolrDocsTime + "solrcommit & Optimize"
828                   + solrCommitTime);
829         batchStatistics.setTimeToIndexSolrInputDocs(indexSolrDocsTime.getTime());
830         batchStatistics.setTimeToSolrCommit(solrCommitTime.getTime());
831         return SUCCESS + "-" + solrDocs.size();
832     }
833 
834     protected String indexSolrDocuments(List<SolrInputDocument> solrDocs, boolean commit, boolean optimize)
835             throws Exception {
836         String result = indexSolrDocuments(solrDocs, commit, optimize, true, true);
837         return result;
838     }
839 
840     protected List<SolrInputDocument> convertToSolrDocs(String docCategory, String docType, String docFormat,
841                                                         String docContent) throws Exception {
842         List<SolrInputDocument> solrDocs = null;
843         if (DocCategory.WORK.isEqualTo(docCategory) && DocType.BIB.isEqualTo(docType) && DocFormat.MARC.isEqualTo(
844                 docFormat)) {
845             try {
846                 WorkBibMarcRecordProcessor recordProcessor = new WorkBibMarcRecordProcessor();
847                 solrDocs = new WorkBibMarcDocBuilder()
848                         .buildSolrInputDocuments(recordProcessor.fromXML(docContent).getRecords());
849             }
850             catch (Exception e) {
851                 e.printStackTrace();
852                 throw new Exception("Exception while converting given XML Document: ", e);
853             }
854         }
855         else if (DocCategory.WORK.isEqualTo(docCategory) && DocType.BIB.isEqualTo(docType) && DocFormat.DUBLIN_CORE
856                                                                                                        .isEqualTo(
857                                                                                                                docFormat)) {
858             WorkBibDublinRecordProcessor processor = new WorkBibDublinRecordProcessor();
859             WorkBibDublinRecord record = processor.fromXML(docContent);
860             solrDocs = new ArrayList<SolrInputDocument>();
861             solrDocs.add(new WorkBibDublinDocBuilder().buildSolrInputDocument(record));
862         }
863         else if (DocCategory.WORK.isEqualTo(docCategory) && DocType.BIB.isEqualTo(docType) && DocFormat
864                 .DUBLIN_UNQUALIFIED.isEqualTo(docFormat)) {
865             solrDocs = new WorkBibDublinUnQualifiedDocBuilder()
866                     .buildSolrInputDocuments(new WorkBibDublinUnQualifiedRecordProcessor().fromXML(docContent));
867         }
868         else {
869             throw new Exception("UnSupported Document Format: " + docCategory + ", " + docType + ", " + docFormat);
870         }
871         return solrDocs;
872     }
873 
874     protected String deleteDocumentByUUID(String uuid, String category, boolean commit) {
875         String result = SUCCESS;
876         try {
877             SolrServer solr = SolrServerManager.getInstance().getSolrServer();
878             solr.deleteById(uuid);
879             if (commit) {
880                 solr.commit();
881             }
882         }
883         catch (Exception e) {
884             result = buildFailureMsg();
885             LOG.error(result, e);
886         }
887         return result;
888     }
889 
890     protected String deleteDocumentByUUID(String uuid, String category) {
891         return deleteDocumentByUUID(uuid, category, true);
892     }
893 
894     protected String deleteDocumentsByUUIDList(List<String> uuidList, String category, boolean commit) {
895         String result = SUCCESS;
896         try {
897             SolrServer solr = SolrServerManager.getInstance().getSolrServer();
898             List<String> uuidList1 = new ArrayList<String>();
899             if (uuidList.size() > 0) {
900                 for (String id : uuidList) {
901                     if (id != null) {
902                         uuidList1.add(id);
903                     }
904                 }
905             }
906             if (uuidList1.size() > 0) {
907                 solr.deleteById(uuidList1);
908             }
909             if (commit) {
910                 solr.commit();
911             }
912         }
913         catch (Exception e) {
914             result = buildFailureMsg();
915             LOG.error(result, e);
916         }
917         return result;
918     }
919 
920     protected String deleteDocumentsByUUIDList(List<String> uuidsList, String category)
921             throws SolrServerException, MalformedURLException {
922         List<String> deleteUuidsList = new ArrayList<String>();
923         List<String> holdingsIdentifierList = new ArrayList<String>();
924         List<String> itemIdentifierList = new ArrayList<String>();
925         SolrServer solr = SolrServerManager.getInstance().getSolrServer();
926         SolrQuery query = new SolrQuery();
927         deleteUuidsList.addAll(uuidsList);
928         for (int i = 0; i < uuidsList.size(); i++) {
929             query.setQuery("id:" + uuidsList.get(i));
930             QueryResponse response = solr.query(query);
931             LOG.debug("query-->" + query);
932             for (SolrDocument doc : response.getResults()) {
933                 LOG.debug("doc" + doc.toString());
934                 String docFormat = (String) doc.getFieldValue(DOC_FORMAT);
935                 String docType = (String) doc.getFieldValue(DOC_TYPE);
936                 if (docType.equalsIgnoreCase(BIBLIOGRAPHIC)) {
937                 }
938                 else if (docType.equalsIgnoreCase(INSTANCE)) {
939                     if (doc.getFieldValue(ITEM_IDENTIFIER) instanceof List) {
940                         itemIdentifierList = (List<String>) doc.getFieldValue(ITEM_IDENTIFIER);
941                     }
942                     else {
943                         itemIdentifierList.add((String) doc.getFieldValue(ITEM_IDENTIFIER));
944                     }
945                     if (doc.getFieldValue(HOLDINGS_IDENTIFIER) instanceof String) {
946                         holdingsIdentifierList.add((String) doc.getFieldValue(HOLDINGS_IDENTIFIER));
947                     }
948                     else {
949                         holdingsIdentifierList = (List<String>) doc.getFieldValue(HOLDINGS_IDENTIFIER);
950                     }
951                     if (holdingsIdentifierList != null && holdingsIdentifierList.size() > 0) {
952                         deleteUuidsList.addAll(holdingsIdentifierList);
953                     }
954                     if (itemIdentifierList != null && itemIdentifierList.size() > 0) {
955                         deleteUuidsList.addAll(itemIdentifierList);
956 
957                     }
958                 }
959             }
960         }
961         return deleteDocumentsByUUIDList(deleteUuidsList, category, true);
962     }
963 
964     protected String buildDeleteQueryParamsForDeleteUrl(List<String> uuidList, boolean commit) {
965         StringBuffer deleteQueryBuffer = new StringBuffer("");
966         deleteQueryBuffer.append("stream.body=");
967         deleteQueryBuffer.append("<delete>");
968         for (int i = 0; i < uuidList.size(); i++) {
969             deleteQueryBuffer.append("<query>");
970             deleteQueryBuffer.append("id:");
971             deleteQueryBuffer.append(uuidList.get(i));
972             deleteQueryBuffer.append("</query>");
973         }
974         deleteQueryBuffer.append("</delete>");
975         if (commit) {
976             deleteQueryBuffer.append("&stream.body=<commit/>");
977         }
978         return deleteQueryBuffer.toString();
979 
980     }
981 
982     protected String buildDeleteQuery(String uuid, String category, boolean commit) {
983         StringBuffer deleteQueryUrl = new StringBuffer("");
984         if (commit) {
985             deleteQueryUrl.append(SolrServerManager.getInstance().getSolrCoreURL());
986             deleteQueryUrl.append("/update?stream.body=<delete><query>id:" + uuid
987                                   + "</query></delete>&stream.body=<commit/>");
988         }
989         else {
990             deleteQueryUrl.append(SolrServerManager.getInstance().getSolrCoreURL());
991             deleteQueryUrl.append("/update?stream.body=<delete><query>id:" + uuid + "</query></delete>");
992         }
993         return deleteQueryUrl.toString();
994     }
995 
996     /**
997      * @param inputURL
998      * @throws Exception
999      */
1000     protected void openConnection(URL inputURL) throws Exception {
1001         HttpURLConnection urlConnection = (HttpURLConnection) inputURL.openConnection();
1002         urlConnection.setDoOutput(true);
1003         urlConnection.connect();
1004         OutputStreamWriter streamWriter = new OutputStreamWriter(urlConnection.getOutputStream());
1005         streamWriter.flush();
1006         // Get the response from inputURL
1007         BufferedReader bufferReader = new BufferedReader(new InputStreamReader(urlConnection.getInputStream()));
1008         String xmlResponse;
1009         while ((xmlResponse = bufferReader.readLine()) != null) {
1010             if (LOG.isDebugEnabled()) {
1011                 LOG.debug("XmlResponse->" + xmlResponse);
1012             }
1013         }
1014     }
1015 
1016     protected String getErrorID() {
1017         return String.valueOf(new Date().getTime());
1018     }
1019 
1020     protected String buildFailureMsg(String id, String msg) {
1021         StringBuilder sb = new StringBuilder();
1022         sb.append(FAILURE).append("-ErrorID:");
1023         if (null != id) {
1024             sb.append(id);
1025         }
1026         else {
1027             sb.append(getErrorID());
1028         }
1029         if (null != msg) {
1030             sb.append("-ErrorMsg:").append(msg);
1031         }
1032         return sb.toString();
1033     }
1034 
1035     protected String buildFailureMsg() {
1036         return FAILURE + "-ErrorID:" + getErrorID();
1037     }
1038 
1039     public QueryResponse searchBibRecord(String docCat, String docType, String docFormat, String fieldName,
1040                                          String fieldValue, String fieldList) {
1041         QueryResponse response = null;
1042         String result = null;
1043         try {
1044             String identifier_args = "(" + fieldName + ":" + fieldValue + ")";
1045             String docCategory_args = "(DocCategory" + ":" + docCat + ")";
1046             String docType_args = "(DocType" + ":" + docType + ")";
1047             String docFormat_args = "(DocFormat" + ":" + docFormat + ")";
1048             String args = identifier_args + "AND" + docCategory_args + "AND" + docType_args + "AND" + docFormat_args;
1049             SolrServer solr = new CommonsHttpSolrServer(
1050                     PropertyUtil.getPropertyUtil().getProperty("docSearchURL") + "bib");
1051             SolrQuery query = new SolrQuery();
1052             query.addField(fieldList);
1053             query.setQuery(args);
1054             response = solr.query(query);
1055         }
1056         catch (Exception e) {
1057             result = buildFailureMsg();
1058             LOG.error(result, e);
1059         }
1060         return response;
1061     }
1062 
1063     @Override
1064     public void cleanupDiscoveryData() throws IOException, SolrServerException {
1065         SolrServer server = null;
1066         try {
1067             server = SolrServerManager.getInstance().getSolrServer();
1068         }
1069         catch (SolrServerException e) {
1070             e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
1071         }
1072         server.deleteByQuery("*:*");
1073         server.commit();
1074 
1075     }
1076 }