View Javadoc
1   package org.kuali.ole.docstore.service;
2   
3   import org.apache.commons.lang.time.StopWatch;
4   import org.apache.cxf.common.util.StringUtils;
5   import org.kuali.ole.RepositoryManager;
6   import org.kuali.ole.docstore.OleDocStoreException;
7   import org.kuali.ole.docstore.factory.DocstoreFactory;
8   import org.kuali.ole.docstore.model.xmlpojo.ingest.Request;
9   import org.kuali.ole.docstore.model.xmlpojo.ingest.RequestDocument;
10  import org.kuali.ole.docstore.model.xmlpojo.ingest.Response;
11  import org.kuali.ole.docstore.model.xmlpojo.ingest.ResponseDocument;
12  import org.kuali.ole.docstore.model.xstream.ingest.ResponseHandler;
13  import org.kuali.ole.docstore.process.BulkIngestNIndexProcessor;
14  import org.kuali.ole.docstore.process.BulkLoadHandler;
15  import org.kuali.ole.docstore.process.DocStoreCamelContext;
16  import org.kuali.ole.docstore.process.ProcessParameters;
17  import org.kuali.ole.docstore.process.batch.BulkProcessRequest;
18  import org.kuali.ole.docstore.transaction.TransactionManager;
19  import org.kuali.ole.docstore.util.ItemExistsException;
20  import org.kuali.ole.docstore.utility.BatchIngestStatistics;
21  import org.kuali.ole.docstore.utility.BulkIngestStatistics;
22  import org.kuali.ole.pojo.OleException;
23  import org.slf4j.Logger;
24  import org.slf4j.LoggerFactory;
25  
26  import javax.jcr.RepositoryException;
27  import java.io.FileNotFoundException;
28  import java.util.ArrayList;
29  import java.util.List;
30  import java.util.Set;
31  
32  /**
33   * Implements DocumentService interface.
34   * User: tirumalesh.b
35   * Date: 28/8/12 Time: 12:09 PM
36   */
37  public class DocumentServiceImpl
38          implements DocumentService {
39      private static Logger logger = LoggerFactory.getLogger(BulkIngestNIndexProcessor.class);
40      private static DocumentServiceImpl documentService = new DocumentServiceImpl();
41      private BulkProcessRequest bulkIngestRequest = null;
42      //    private        BulkLoadHandler              bulkLoadHandler              = null;
43      //    private        BulkIngestNIndexRouteBuilder bulkIngestNIndexRouteBuilder = null;
44      //    private        BulkIngestNIndexProcessor    bulkIngestNIndexProcessor    = null;
45      //    private        BulkIngestStatistics         bulkLoadStatistics           = null;
46      //    private BulkIngestProcessHandlerService bIService        = (BulkIngestProcessHandlerService) BeanLocator
47      //                                                                     .getBean("bulkIngestProcessHandlerService");
48      protected RepositoryManager repositoryManager;
49      private TransactionManager transactionManager;
50  
51      public static DocumentServiceImpl getInstance() {
52          return documentService;
53      }
54  
55      private DocumentServiceImpl() {
56          try {
57              this.repositoryManager = RepositoryManager.getRepositoryManager();
58          } catch (OleException oe) {
59              //throw new OleDocStoreException(oe);
60              // TODO: log the exception
61          }
62      }
63  
64      /**
65       * @inheritDoc
66       */
67      @Override
68      public Response process(Request request) throws OleDocStoreException, RepositoryException, OleException, FileNotFoundException {
69          authenticateAndAuthorizeUser(request);
70          validateInput(request);
71          DocstoreFactory docstoreFactory = BeanLocator.getDocstoreFactory();
72  
73          List<RequestDocument> requestDocuments = request.getRequestDocuments();
74          if (transactionManager==null){
75              transactionManager = docstoreFactory.getTransactionManager(requestDocuments.get(0).getCategory(), requestDocuments.get(0).getType(), requestDocuments.get(0).getFormat());
76          }
77  
78  
79          List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
80          Response response = new Response();
81          response.setOperation(request.getOperation());
82          response.setUser(request.getUser());
83          try {
84              if (request.getOperation().equals(Request.Operation.ingest.toString())) {
85                  transactionManager.startTransaction(request.getUser(), request.getOperation());
86                  try {
87                      responseDocuments = transactionManager.ingest(requestDocuments);
88                      transactionManager.commit();
89                      transactionManager.closeSession();
90                  } catch (OleDocStoreException e) {
91                      transactionManager.abort();
92                      transactionManager.closeSession();
93                      throw e;
94                  }
95                  response.setStatus("Success");
96                  response.setMessage("Documents ingested.");
97                  response.setStatusMessage("Documents ingested successfully.");
98                  response.setDocuments(responseDocuments);
99              } else if (request.getOperation().equals(Request.Operation.checkIn.toString())) {
100                 transactionManager.startTransaction(request.getUser(), request.getOperation());
101                 try {
102                     responseDocuments = transactionManager.checkIn(request.getRequestDocuments());
103                     transactionManager.commit();
104                     transactionManager.closeSession();
105                 } catch (OleDocStoreException e) {
106                     transactionManager.abort();
107                     transactionManager.closeSession();
108                     throw e;
109                 }
110                 response.setStatus("Success");
111                 response.setMessage("Documents Checked In .");
112                 response.setStatusMessage("Documents Checked In successfully.");
113                 response.setDocuments(responseDocuments);
114             } else if (request.getOperation().equals(Request.Operation.checkOut.toString())) {
115                 transactionManager.startSession(request.getUser(), request.getOperation());
116                 responseDocuments = transactionManager.checkOut(request.getRequestDocuments(), request.getUser());
117                 transactionManager.closeSession();
118                 response.setStatus("Success");
119                 response.setMessage("Documents Checked Out.");
120                 response.setStatusMessage("Documents Checked Out successfully.");
121                 response.setDocuments(responseDocuments);
122             } else if (request.getOperation().equalsIgnoreCase(Request.Operation.deleteVerify.toString())) {
123                 transactionManager.startTransaction(request.getUser(), request.getOperation());
124                 try {
125                     responseDocuments = transactionManager.deleteVerify(request.getRequestDocuments());
126                     response.setStatus("Success");
127                     response.setMessage("Delete Verify success");
128                     response.setStatusMessage("Delete Verify success");
129                     response.setDocuments(responseDocuments);
130                     //                response.setOperation("deleteVerify");
131                     //                response.setMessage("success");
132                     //                response.setStatus("success");
133                     //                response.setUser("kuali");
134                     //                response.setDocuments(responseDocuments);
135                     logger.debug("deleteVerify toXML " + new ResponseHandler().toXML(response));
136 
137                 } catch (OleDocStoreException e) {
138                     transactionManager.abort();
139                     transactionManager.closeSession();
140                     throw e;
141                 }
142             } else if (request.getOperation().contains(Request.Operation.delete.toString())) {
143                 transactionManager.startTransaction(request.getUser(), request.getOperation());
144                 try {
145                     responseDocuments = transactionManager.delete(request.getRequestDocuments());
146                     transactionManager.commit();
147                     transactionManager.closeSession();
148                 } catch (OleDocStoreException e) {
149                     transactionManager.abort();
150                     transactionManager.closeSession();
151                     throw e;
152                 }
153                 response.setStatus("Success");
154                 response.setMessage("Documents deleted");
155                 response.setStatusMessage("Documents deleted successfully");
156                 response.setDocuments(responseDocuments);
157             } else if (request.getOperation().equalsIgnoreCase(Request.Operation.bind.toString())) {
158                 transactionManager.startTransaction(request.getUser(), request.getOperation());
159                 try {
160                     responseDocuments = transactionManager.bind(request.getRequestDocuments(), request.getOperation());
161                     transactionManager.commit();
162                     transactionManager.closeSession();
163                 } catch (Exception e) {
164                     transactionManager.abort();
165                     transactionManager.closeSession();
166                     logger.error(e.getMessage() , e);
167                 }
168                 response.setStatus("Success");
169                 response.setMessage("Documents bounded In .");
170                 response.setStatusMessage("Documents bounded In successfully.");
171                 response.setDocuments(responseDocuments);
172 //            } else if (request.getOperation().equalsIgnoreCase(Request.Operation.unbind.toString())) {
173 //                transactionManager.startTransaction(request.getUser(), request.getOperation());
174 //                responseDocuments = transactionManager.unbind(request.getRequestDocuments(), request.getOperation());
175 //                transactionManager.commit();
176 //                transactionManager.closeSession();
177 //                response.setStatus("Success");
178 //                response.setMessage("Documents unbounded.");
179 //                response.setStatusMessage("Documents unbounded successfully.");
180 //                response.setDocuments(responseDocuments);
181             } else if (request.getOperation().contains(Request.Operation.transferInstances.toString())) {
182                 try {
183 
184                     transactionManager.startTransaction(request.getUser(), request.getOperation());
185                     transactionManager.transferInstances(request.getRequestDocuments());
186                     transactionManager.commit();
187                     transactionManager.closeSession();
188                 } catch (Exception e) {
189                     transactionManager.abort();
190                     transactionManager.closeSession();
191                     logger.error(e.getMessage() , e);
192                 }
193             } else if (request.getOperation().contains(Request.Operation.transferItems.toString())) {
194                 try {
195                     transactionManager.startTransaction(request.getUser(), request.getOperation());
196                     transactionManager.transferItems(request.getRequestDocuments());
197                     transactionManager.commit();
198                     transactionManager.closeSession();
199                     response.setStatus("success");
200                     response.setMessage("Transfer Items success ");
201                     response.setStatusMessage("Status : Transfer of items success ");
202                     response.setDocuments(responseDocuments);
203                 } catch (ItemExistsException itemExistsException) {
204                     System.out.println("itemExistsException" + itemExistsException);
205                     response.setStatus("failure");
206                     response.setMessage("Transfer Items Failed " + itemExistsException.getMessage());
207                     response.setStatusMessage("Status : Transfer of items failed " + itemExistsException.getMessage());
208                     response.setDocuments(responseDocuments);
209                     transactionManager.abort();
210                     transactionManager.closeSession();
211                 } catch (Exception e) {
212                     logger.error(e.getMessage() , e);
213                     transactionManager.abort();
214                     transactionManager.closeSession();
215                 }
216             }
217 
218         } catch (OleDocStoreException ode) {
219             logger.error("", ode);
220             response.setStatus("Failed");
221             response.setMessage("Operation failed.");
222             response.setStatusMessage(ode.getMessage());
223             response.setDocuments(responseDocuments);
224         } catch (Exception e) {
225             logger.error(e.getMessage() , e);
226         }
227         return response;
228     }
229 
230     /**
231      * @inheritDoc
232      */
233     @Override
234     public void bulkProcess(BulkProcessRequest request) throws Exception {
235         authenticateAndAuthorizeUser(request);
236         //The following line is commented out as it was handled in method bulkIngest()
237         // validateBulkProcessInput(request, null);
238 
239         Response response = null;
240         if (request.getOperation().equals(BulkProcessRequest.BulkProcessOperation.INGEST)) {
241             response = bulkIngestManage(request);
242         }
243     }
244 
245     /**
246      * Manages the bulk ingest process.
247      */
248     private Response bulkIngestManage(BulkProcessRequest request) throws Exception {
249         if ((request.getAction()).equals(BulkProcessRequest.BulkProcessAction.START)) {
250             if (request.getDataFormat().equals(BulkProcessRequest.BulkIngestDataFormat.DOCSTORE)) {
251                 if (bulkIngestRequest != null) {
252                     //DocStoreCamelContext.getInstance().resume();
253                     throw new OleDocStoreException("Bulk ingest already running!");
254                 } else {
255                     BeanLocator.getBulkIngestProcessHandlerService().startBulkIngestForDocStoreRequestFormat(request.getBulkIngestFolder());
256                     BulkLoadHandler bulkLoadHandler = BeanLocator.getBulkIngestProcessHandlerService().getLoadHandler();
257                     BulkIngestNIndexProcessor bulkIngestNIndexProcessor = bulkLoadHandler.getBulkRoute()
258                             .getBulkIngestNIndexProcessor();
259                     request.setBulkIngestStatistics(BulkIngestStatistics.getInstance());
260                     bulkIngestNIndexProcessor.setBulkProcessRequest(request);
261                     //bulkLoadStatistics = bulkIngestNIndexProcessor.getBulkLoadStatistics();
262                     bulkIngestRequest = request;
263                     DocStoreCamelContext.getInstance().resume();
264                 }
265             } else if (request.getDataFormat().equals(BulkProcessRequest.BulkIngestDataFormat.STANDARD)) {
266                 String folder = request.getDataFolder();
267                 if (folder != null && folder.trim().length() != 0) {
268                     BeanLocator.getBulkIngestProcessHandlerService()
269                             .startBulkIngestForStandardXMLFormat(request.getDataFolder(), request.getDocCategory(),
270                                     request.getDocType(), request.getDocFormat(), request.getBulkIngestFolder());
271                 }
272             }
273         } else if ((request.getAction()).equals(BulkProcessRequest.BulkProcessAction.STATUS)) {
274             logger.info(bulkIngestRequest.getBulkIngestStatistics().getJsonString());
275         } else if ((request.getAction()).equals(BulkProcessRequest.BulkProcessAction.STOP)) {
276             DocStoreCamelContext.getInstance().suspend();
277         } else if ((request.getAction()).equals(BulkProcessRequest.BulkProcessAction.CLEAR)) {
278             bulkIngestRequest.getBulkIngestStatistics().clearBulkIngestStatistics();
279         }
280         return null;
281     }
282 
283     public void bulkIngest(BulkProcessRequest bulkProcessRequest, List<RequestDocument> requestDocuments)
284             throws Exception {
285         TransactionManager transactionManager = bulkProcessRequest.getTransactionManager();
286         validateBulkProcessInput(bulkProcessRequest, requestDocuments);
287         if (null == transactionManager) {
288             transactionManager = BeanLocator.getDocstoreFactory().getTransactionManager(requestDocuments.get(0).getCategory(), requestDocuments.get(0).getType(), requestDocuments.get(0).getFormat());
289             transactionManager.startTransaction(bulkProcessRequest.getUser(),
290                     "bulk" + bulkProcessRequest.getOperation().toString());
291             bulkProcessRequest.setTransactionManager(transactionManager);
292         }
293         //        if (bulkProcessRequest.getPreviousBatchDocuments() == null) {
294         //            List<RequestDocument> newList = new ArrayList<RequestDocument>();
295         //            bulkProcessRequest.setPreviousBatchDocuments(newList);
296         //        }
297         //        bulkProcessRequest.getPreviousBatchDocuments().addAll(requestDocuments);
298 
299         batchIngest(bulkProcessRequest, requestDocuments);
300 
301     }
302 
303     public void batchIngest(BulkProcessRequest bulkProcessRequest, List<RequestDocument> requestDocuments) {
304         BulkIngestStatistics bulkIngestStatistics = bulkProcessRequest.getBulkIngestStatistics();
305         BatchIngestStatistics batchStatistics = bulkIngestStatistics.getCurrentBatch();
306         long commitSize = ProcessParameters.BULK_INGEST_COMMIT_SIZE;
307         logger.debug("commitSize = " + commitSize);
308         logger.debug("bulkIngestNIndex(" + requestDocuments.size() + ") START");
309         logger.debug("BULK_INGEST_IS_LINKING_ENABLED=" + ProcessParameters.BULK_INGEST_IS_LINKING_ENABLED);
310         StopWatch totalTimer = new StopWatch();
311         StopWatch sessionSaveTimer = new StopWatch();
312         long recCount = requestDocuments.size();
313         batchStatistics.setRecCount(recCount);
314 
315         boolean isCommit = false;
316         totalTimer.start();
317         TransactionManager transactionManager = bulkProcessRequest.getTransactionManager();
318         try {
319             transactionManager.batchIngest(bulkProcessRequest, requestDocuments);
320             bulkIngestStatistics.setCommitRecCount(bulkIngestStatistics.getCommitRecCount() + recCount);
321             if (bulkIngestStatistics.getCommitRecCount() == commitSize || bulkIngestStatistics.isLastBatch()) {
322                 isCommit = true;
323             }
324             if (isCommit) {
325                 sessionSaveTimer.start();
326                 logger.info("Bulk ingest: Commit started. Number of records being committed : " + bulkIngestStatistics
327                         .getCommitRecCount());
328                 transactionManager.commit();
329                 bulkIngestStatistics.setCommitRecCount(0);
330                 sessionSaveTimer.stop();
331             }
332 
333             // Documents processed can be different from records processed as in the case of Instance data.
334             logger.debug("Documents processed: " + recCount);
335             bulkIngestStatistics.setFileRecCount(bulkIngestStatistics.getFileRecCount() + recCount);
336             logger.info(
337                     "Bulk ingest: Records processed in the current file :" + bulkIngestStatistics.getFileRecCount());
338         } catch (Exception e) {
339             transactionManager.abort();
340             bulkIngestStatistics.setCommitRecCount(0);
341             logger.error("Document Ingest & Index Failed, Cause: " + e.getMessage(), e);
342         }
343         totalTimer.stop();
344         batchStatistics.setTimeToSaveJcrSession(sessionSaveTimer.getTime());
345         batchStatistics.setIngestingTime(
346                 batchStatistics.getTimeToCreateNodesInJcr() + batchStatistics.getTimeToSaveJcrSession());
347         batchStatistics
348                 .setIndexingTime(batchStatistics.getTimeToIndexSolrInputDocs() + batchStatistics.getTimeToSolrCommit());
349         batchStatistics.setIngestNIndexTotalTime(totalTimer.getTime());
350     }
351 
352 
353     public void setRepositoryManager(RepositoryManager repositoryManager) {
354         this.repositoryManager = repositoryManager;
355     }
356 
357     /*
358      *
359      *
360      * @param request
361      */
362 
363     /**
364      * Validates the given request for normal processing.
365      *
366      * @param request
367      * @throws OleDocStoreException - if the operation is invalid
368      *                              - if no documents are specified
369      */
370     private void validateInput(Request request) throws OleDocStoreException {
371         Set<String> validOperationSet = Request.validOperationSet;
372         if (StringUtils.isEmpty(request.getUser())) {
373             throw new OleDocStoreException("User cannot be null or empty. Please verify input file.");
374         }
375 
376         if (StringUtils.isEmpty(request.getOperation())) {
377             throw new OleDocStoreException("Operation cannot be null or empty. Please verify input file");
378         } else {
379             //verify for valid docstore operation
380             if (validOperationSet.contains(request.getOperation())) {
381                 for (RequestDocument requestDocument : request.getRequestDocuments()) {
382                     requestDocument.setUser(request.getUser());
383                     requestDocument.setOperation(request.getOperation());
384                 }
385             } else {
386                 throw new OleDocStoreException("Not a valid Docstore operation:" + request.getOperation());
387             }
388         }
389     }
390 
391     /**
392      * Verifies whether the user is authenticated and authorized to execute the request.
393      *
394      * @param request
395      * @throws OleDocStoreException
396      */
397     private void authenticateAndAuthorizeUser(Request request) throws OleDocStoreException {
398         // TODO: Implement later.
399     }
400 
401     private void authenticateAndAuthorizeUser(BulkProcessRequest request) throws OleDocStoreException {
402         // TODO: Implement later.
403     }
404 
405     /**
406      * Validates the given request for bulk processing.
407      *
408      * @param request
409      * @throws OleDocStoreException - if the operation is invalid
410      *                              - if no documents are specified
411      *                              - if all documents are not of same [cat-type-format]
412      */
413     private void validateBulkProcessInput(BulkProcessRequest request, List<RequestDocument> requestDocuments)
414             throws OleDocStoreException {
415 
416         Set<String> validOperationSet = BulkProcessRequest.validOperationSet;
417         if (StringUtils.isEmpty(request.getUser())) {
418             request.setUser("BulkIngest-User");
419         }
420 
421         if (StringUtils.isEmpty(request.getOperation().toString())) {
422             throw new OleDocStoreException("Operation cannot be null or empty. Please verify input file");
423         } else {
424             //verify for valid docstore operation
425             if (validOperationSet.contains(request.getOperation().toString())) {
426                 for (RequestDocument requestDocument : requestDocuments) {
427                     requestDocument.setUser(request.getUser());
428                     requestDocument.setOperation(request.getOperation().toString());
429                 }
430             } else {
431                 throw new OleDocStoreException("Not a valid Docstore operation:" + request.getOperation());
432             }
433         }
434     }
435 
436     public BulkProcessRequest getBulkIngestRequest() {
437         return bulkIngestRequest;
438     }
439 
440     public void setBulkIngestRequest(BulkProcessRequest bulkIngestRequest) {
441         this.bulkIngestRequest = bulkIngestRequest;
442     }
443 
444     public void setTransactionManager(TransactionManager transactionManager) {
445         this.transactionManager = transactionManager;
446     }
447 
448 }