View Javadoc

1   package org.kuali.ole.docstore.service;
2   
3   import org.apache.commons.lang.time.StopWatch;
4   import org.kuali.ole.RepositoryManager;
5   import org.kuali.ole.docstore.OleDocStoreException;
6   import org.kuali.ole.docstore.model.xmlpojo.ingest.Request;
7   import org.kuali.ole.docstore.model.xmlpojo.ingest.RequestDocument;
8   import org.kuali.ole.docstore.model.xmlpojo.ingest.Response;
9   import org.kuali.ole.docstore.model.xmlpojo.ingest.ResponseDocument;
10  import org.kuali.ole.docstore.process.BulkIngestNIndexProcessor;
11  import org.kuali.ole.docstore.process.BulkLoadHandler;
12  import org.kuali.ole.docstore.process.DocStoreCamelContext;
13  import org.kuali.ole.docstore.process.ProcessParameters;
14  import org.kuali.ole.docstore.process.batch.BulkProcessRequest;
15  import org.kuali.ole.docstore.transaction.TransactionManager;
16  import org.kuali.ole.docstore.utility.BatchIngestStatistics;
17  import org.kuali.ole.docstore.utility.BulkIngestStatistics;
18  import org.kuali.ole.pojo.OleException;
19  import org.slf4j.Logger;
20  import org.slf4j.LoggerFactory;
21  
22  import java.util.ArrayList;
23  import java.util.List;
24  
25  /**
26   * Implements DocumentService interface.
27   * User: tirumalesh.b
28   * Date: 28/8/12 Time: 12:09 PM
29   */
30  public class DocumentServiceImpl
31          implements DocumentService {
32      private static Logger              logger            = LoggerFactory.getLogger(BulkIngestNIndexProcessor.class);
33      private static DocumentServiceImpl documentService   = new DocumentServiceImpl();
34      private        BulkProcessRequest  bulkIngestRequest = null;
35      //    private        BulkLoadHandler              bulkLoadHandler              = null;
36      //    private        BulkIngestNIndexRouteBuilder bulkIngestNIndexRouteBuilder = null;
37      //    private        BulkIngestNIndexProcessor    bulkIngestNIndexProcessor    = null;
38      //    private        BulkIngestStatistics         bulkLoadStatistics           = null;
39      //    private BulkIngestProcessHandlerService bIService        = (BulkIngestProcessHandlerService) BeanLocator
40      //                                                                     .getBean("bulkIngestProcessHandlerService");
41      protected RepositoryManager repositoryManager;
42  
43      public static DocumentServiceImpl getInstance() {
44          return documentService;
45      }
46  
47      private DocumentServiceImpl() {
48          try {
49              this.repositoryManager = RepositoryManager.getRepositoryManager();
50          }
51          catch (OleException oe) {
52              //throw new OleDocStoreException(oe);
53              // TODO: log the exception
54          }
55      }
56  
57      /**
58       * @inheritDoc
59       */
60      @Override
61      public Response process(Request request) throws OleDocStoreException {
62          authenticateAndAuthorizeUser(request);
63          validateInput(request);
64  
65          TransactionManager transactionManager = new TransactionManager();
66          transactionManager.startTransaction(request.getUser(), request.getOperation());
67          List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
68          Response response = new Response();
69          response.setOperation(request.getOperation());
70          response.setUser(request.getUser());
71          try {
72              if (request.getOperation().equals(Request.Operation.ingest.toString())) {
73                  responseDocuments = transactionManager.ingest(request.getRequestDocuments());
74                  transactionManager.commit();
75  
76                  response.setStatus("Success");
77                  response.setMessage("Documents ingested.");
78                  response.setStatusMessage("Documents ingested successfully.");
79                  response.setDocuments(responseDocuments);
80              }
81  
82              else if (request.getOperation().equals(Request.Operation.checkIn.toString())) {
83                  responseDocuments = transactionManager.checkIn(request.getRequestDocuments(), request.getOperation());
84                  transactionManager.commit();
85  
86                  response.setStatus("Success");
87                  response.setMessage("Documents Checked In .");
88                  response.setStatusMessage("Documents Checked In successfully.");
89                  response.setDocuments(responseDocuments);
90              }
91              else if (request.getOperation().equals(Request.Operation.checkOut.toString())) {
92                  responseDocuments = transactionManager.checkOut(request.getRequestDocuments(), request.getUser());
93  
94                  response.setStatus("Success");
95                  response.setMessage("Documents Checked Out.");
96                  response.setStatusMessage("Documents Checked Out successfully.");
97                  response.setDocuments(responseDocuments);
98              }
99              else if (request.getOperation().contains(Request.Operation.delete.toString())) {
100                 responseDocuments = transactionManager.delete(request.getRequestDocuments());
101                 transactionManager.commit();
102                 response.setStatus("Success");
103                 response.setMessage("Documents deleted");
104                 response.setStatusMessage("Documents deleted successfully");
105                 response.setDocuments(responseDocuments);
106             }
107         }
108         catch (OleDocStoreException ode) {
109             logger.error("", ode);
110             transactionManager.abort();
111             response.setStatus("Failure");
112             response.setMessage("Operation failed.");
113             response.setStatusMessage("Operation failed.");
114             response.setDocuments(responseDocuments);
115         }
116 
117         return response;
118     }
119 
120     /**
121      * @inheritDoc
122      */
123     @Override
124     public void bulkProcess(BulkProcessRequest request) throws Exception {
125         authenticateAndAuthorizeUser(request);
126         validateBulkProcessInput(request);
127 
128         Response response = null;
129         if (request.getOperation().equals(BulkProcessRequest.BulkProcessOperation.INGEST)) {
130             response = bulkIngestManage(request);
131         }
132     }
133 
134     /**
135      * Manages the bulk ingest process.
136      */
137     private Response bulkIngestManage(BulkProcessRequest request) throws Exception {
138         if ((request.getAction()).equals(BulkProcessRequest.BulkProcessAction.START)) {
139             if (request.getDataFormat().equals(BulkProcessRequest.BulkIngestDataFormat.DOCSTORE)) {
140                 if (bulkIngestRequest != null) {
141                     //DocStoreCamelContext.getInstance().resume();
142                     throw new OleDocStoreException("Bulk ingest already running!");
143                 }
144                 else {
145                     BeanLocator.getBulkIngestProcessHandlerService().startBulkIngestForDocStoreRequestFormat(null);
146                     BulkLoadHandler bulkLoadHandler = BeanLocator.getBulkIngestProcessHandlerService().getLoadHandler();
147                     BulkIngestNIndexProcessor bulkIngestNIndexProcessor = bulkLoadHandler.getBulkRoute()
148                                                                                          .getBulkIngestNIndexProcessor();
149                     request.setBulkIngestStatistics(BulkIngestStatistics.getInstance());
150                     bulkIngestNIndexProcessor.setBulkProcessRequest(request);
151                     //bulkLoadStatistics = bulkIngestNIndexProcessor.getBulkLoadStatistics();
152                     bulkIngestRequest = request;
153                     DocStoreCamelContext.getInstance().resume();
154                 }
155             }
156             else if (request.getDataFormat().equals(BulkProcessRequest.BulkIngestDataFormat.STANDARD)) {
157                 String folder = request.getDataFolder();
158                 if (folder != null && folder.trim().length() != 0) {
159                     BeanLocator.getBulkIngestProcessHandlerService()
160                                .startBulkIngestForStandardXMLFormat(request.getDataFolder(), request.getDocCategory(),
161                                                                     request.getDocType(), request.getDocFormat(),null);
162                 }
163             }
164         }
165         else if ((request.getAction()).equals(BulkProcessRequest.BulkProcessAction.STATUS)) {
166             logger.info(bulkIngestRequest.getBulkIngestStatistics().getJsonString());
167         }
168         else if ((request.getAction()).equals(BulkProcessRequest.BulkProcessAction.STOP)) {
169             DocStoreCamelContext.getInstance().suspend();
170         }
171         else if ((request.getAction()).equals(BulkProcessRequest.BulkProcessAction.CLEAR)) {
172             bulkIngestRequest.getBulkIngestStatistics().clearBulkIngestStatistics();
173         }
174         return null;
175     }
176 
177     public void bulkIngest(BulkProcessRequest bulkProcessRequest, List<RequestDocument> requestDocuments)
178             throws OleDocStoreException {
179         TransactionManager transactionManager = bulkProcessRequest.getTransactionManager();
180         if (null == transactionManager) {
181             transactionManager = new TransactionManager();
182             transactionManager.startTransaction(bulkProcessRequest.getUser(),
183                                                 "bulk" + bulkProcessRequest.getOperation().toString());
184             bulkProcessRequest.setTransactionManager(transactionManager);
185         }
186         //        if (bulkProcessRequest.getPreviousBatchDocuments() == null) {
187         //            List<RequestDocument> newList = new ArrayList<RequestDocument>();
188         //            bulkProcessRequest.setPreviousBatchDocuments(newList);
189         //        }
190         //        bulkProcessRequest.getPreviousBatchDocuments().addAll(requestDocuments);
191 
192         batchIngest(bulkProcessRequest, requestDocuments);
193 
194     }
195 
196     public void batchIngest(BulkProcessRequest bulkProcessRequest, List<RequestDocument> requestDocuments) {
197         BulkIngestStatistics bulkIngestStatistics = bulkProcessRequest.getBulkIngestStatistics();
198         BatchIngestStatistics batchStatistics = bulkIngestStatistics.getCurrentBatch();
199         long commitSize = ProcessParameters.BULK_INGEST_COMMIT_SIZE;
200         logger.debug("commitSize = " + commitSize);
201         logger.debug("bulkIngestNIndex(" + requestDocuments.size() + ") START");
202         logger.debug("BULK_INGEST_IS_LINKING_ENABLED=" + ProcessParameters.BULK_INGEST_IS_LINKING_ENABLED);
203         StopWatch totalTimer = new StopWatch();
204         long recCount = requestDocuments.size();
205         boolean isCommit = false;
206         totalTimer.start();
207         TransactionManager transactionManager = bulkProcessRequest.getTransactionManager();
208         try {
209             transactionManager.batchIngest(bulkProcessRequest, requestDocuments);
210 
211             bulkIngestStatistics.setCommitRecCount(bulkIngestStatistics.getCommitRecCount() + recCount);
212             if (bulkIngestStatistics.getCommitRecCount() == commitSize || bulkIngestStatistics.isLastBatch()) {
213                 isCommit = true;
214             }
215             if (isCommit) {
216                 logger.info("Bulk ingest: Commit started. Number of records being committed : " + bulkIngestStatistics
217                         .getCommitRecCount());
218                 transactionManager.commit();
219                 bulkIngestStatistics.setCommitRecCount(0);
220             }
221 
222             // Documents processed can be different from records processed as in the case of Instance data.
223             logger.debug("Documents processed: " + recCount);
224             bulkIngestStatistics.setFileRecCount(bulkIngestStatistics.getFileRecCount() + recCount);
225             logger.info(
226                     "Bulk ingest: Records processed in the current file :" + bulkIngestStatistics.getFileRecCount());
227         }
228         catch (Exception e) {
229             transactionManager.abort();
230             bulkIngestStatistics.setCommitRecCount(0);
231             logger.error("Document Ingest & Index Failed, Cause: " + e.getMessage(), e);
232         }
233         totalTimer.stop();
234         batchStatistics.setIngestingTime(
235                 batchStatistics.getTimeToCreateNodesInJcr() + batchStatistics.getTimeToSaveJcrSession());
236         batchStatistics
237                 .setIndexingTime(batchStatistics.getTimeToIndexSolrInputDocs() + batchStatistics.getTimeToSolrCommit());
238         batchStatistics.setIngestNIndexTotalTime(totalTimer.getTime());
239     }
240 
241 
242     public void setRepositoryManager(RepositoryManager repositoryManager) {
243         this.repositoryManager = repositoryManager;
244     }
245 
246     /*
247      *
248      *
249      * @param request
250      */
251 
252     /**
253      * Validates the given request for normal processing.
254      *
255      * @param request
256      * @throws OleDocStoreException - if the operation is invalid
257      *                              - if no documents are specified
258      */
259     private void validateInput(Request request) throws OleDocStoreException {
260         // TODO: Implement now.
261     }
262 
263     /**
264      * Verifies whether the user is authenticated and authorized to execute the request.
265      *
266      * @param request
267      * @throws OleDocStoreException
268      */
269     private void authenticateAndAuthorizeUser(Request request) throws OleDocStoreException {
270         // TODO: Implement later.
271     }
272 
273     private void authenticateAndAuthorizeUser(BulkProcessRequest request) throws OleDocStoreException {
274         // TODO: Implement later.
275     }
276 
277     /**
278      * Validates the given request for bulk processing.
279      *
280      * @param request
281      * @throws OleDocStoreException - if the operation is invalid
282      *                              - if no documents are specified
283      *                              - if all documents are not of same [cat-type-format]
284      */
285     private void validateBulkProcessInput(BulkProcessRequest request) throws OleDocStoreException {
286         // TODO: Implement now.
287     }
288 
289 }