1 package org.kuali.ole.docstore.service;
2
3 import org.apache.commons.lang.time.StopWatch;
4 import org.apache.cxf.common.util.StringUtils;
5 import org.kuali.ole.RepositoryManager;
6 import org.kuali.ole.docstore.OleDocStoreException;
7 import org.kuali.ole.docstore.factory.DocstoreFactory;
8 import org.kuali.ole.docstore.model.xmlpojo.ingest.Request;
9 import org.kuali.ole.docstore.model.xmlpojo.ingest.RequestDocument;
10 import org.kuali.ole.docstore.model.xmlpojo.ingest.Response;
11 import org.kuali.ole.docstore.model.xmlpojo.ingest.ResponseDocument;
12 import org.kuali.ole.docstore.model.xstream.ingest.ResponseHandler;
13 import org.kuali.ole.docstore.process.BulkIngestNIndexProcessor;
14 import org.kuali.ole.docstore.process.BulkLoadHandler;
15 import org.kuali.ole.docstore.process.DocStoreCamelContext;
16 import org.kuali.ole.docstore.process.ProcessParameters;
17 import org.kuali.ole.docstore.process.batch.BulkProcessRequest;
18 import org.kuali.ole.docstore.transaction.TransactionManager;
19 import org.kuali.ole.docstore.util.ItemExistsException;
20 import org.kuali.ole.docstore.utility.BatchIngestStatistics;
21 import org.kuali.ole.docstore.utility.BulkIngestStatistics;
22 import org.kuali.ole.pojo.OleException;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 import javax.jcr.RepositoryException;
27 import java.io.FileNotFoundException;
28 import java.util.ArrayList;
29 import java.util.List;
30 import java.util.Set;
31
32
33
34
35
36
37 public class DocumentServiceImpl
38 implements DocumentService {
39 private static Logger logger = LoggerFactory.getLogger(BulkIngestNIndexProcessor.class);
40 private static DocumentServiceImpl documentService = new DocumentServiceImpl();
41 private BulkProcessRequest bulkIngestRequest = null;
42
43
44
45
46
47
48 protected RepositoryManager repositoryManager;
49 private TransactionManager transactionManager;
50
51 public static DocumentServiceImpl getInstance() {
52 return documentService;
53 }
54
/**
 * Creates the service and looks up the {@link RepositoryManager}.
 * <p>
 * A failed lookup does not prevent construction: {@code repositoryManager} is left
 * {@code null} and can be supplied later through
 * {@link #setRepositoryManager(RepositoryManager)}. The failure is logged rather than
 * silently discarded so that the cause is not lost.
 */
private DocumentServiceImpl() {
    try {
        this.repositoryManager = RepositoryManager.getRepositoryManager();
    } catch (OleException oe) {
        // Best effort: keep the instance usable, but leave a trace of why the lookup failed.
        logger.error("Unable to obtain the RepositoryManager: " + oe.getMessage(), oe);
    }
}
63
64
65
66
67 @Override
68 public Response process(Request request) throws OleDocStoreException, RepositoryException, OleException, FileNotFoundException {
69 authenticateAndAuthorizeUser(request);
70 validateInput(request);
71 DocstoreFactory docstoreFactory = BeanLocator.getDocstoreFactory();
72
73 List<RequestDocument> requestDocuments = request.getRequestDocuments();
74 if (transactionManager==null){
75 transactionManager = docstoreFactory.getTransactionManager(requestDocuments.get(0).getCategory(), requestDocuments.get(0).getType(), requestDocuments.get(0).getFormat());
76 }
77
78
79 List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
80 Response response = new Response();
81 response.setOperation(request.getOperation());
82 response.setUser(request.getUser());
83 try {
84 if (request.getOperation().equals(Request.Operation.ingest.toString())) {
85 transactionManager.startTransaction(request.getUser(), request.getOperation());
86 try {
87 responseDocuments = transactionManager.ingest(requestDocuments);
88 transactionManager.commit();
89 transactionManager.closeSession();
90 } catch (OleDocStoreException e) {
91 transactionManager.abort();
92 transactionManager.closeSession();
93 throw e;
94 }
95 response.setStatus("Success");
96 response.setMessage("Documents ingested.");
97 response.setStatusMessage("Documents ingested successfully.");
98 response.setDocuments(responseDocuments);
99 } else if (request.getOperation().equals(Request.Operation.checkIn.toString())) {
100 transactionManager.startTransaction(request.getUser(), request.getOperation());
101 try {
102 responseDocuments = transactionManager.checkIn(request.getRequestDocuments());
103 transactionManager.commit();
104 transactionManager.closeSession();
105 } catch (OleDocStoreException e) {
106 transactionManager.abort();
107 transactionManager.closeSession();
108 throw e;
109 }
110 response.setStatus("Success");
111 response.setMessage("Documents Checked In .");
112 response.setStatusMessage("Documents Checked In successfully.");
113 response.setDocuments(responseDocuments);
114 } else if (request.getOperation().equals(Request.Operation.checkOut.toString())) {
115 transactionManager.startSession(request.getUser(), request.getOperation());
116 responseDocuments = transactionManager.checkOut(request.getRequestDocuments(), request.getUser());
117 transactionManager.closeSession();
118 response.setStatus("Success");
119 response.setMessage("Documents Checked Out.");
120 response.setStatusMessage("Documents Checked Out successfully.");
121 response.setDocuments(responseDocuments);
122 } else if (request.getOperation().equalsIgnoreCase(Request.Operation.deleteVerify.toString())) {
123 transactionManager.startTransaction(request.getUser(), request.getOperation());
124 try {
125 responseDocuments = transactionManager.deleteVerify(request.getRequestDocuments());
126 response.setStatus("Success");
127 response.setMessage("Delete Verify success");
128 response.setStatusMessage("Delete Verify success");
129 response.setDocuments(responseDocuments);
130
131
132
133
134
135 logger.debug("deleteVerify toXML " + new ResponseHandler().toXML(response));
136
137 } catch (OleDocStoreException e) {
138 transactionManager.abort();
139 transactionManager.closeSession();
140 throw e;
141 }
142 } else if (request.getOperation().contains(Request.Operation.delete.toString())) {
143 transactionManager.startTransaction(request.getUser(), request.getOperation());
144 try {
145 responseDocuments = transactionManager.delete(request.getRequestDocuments());
146 transactionManager.commit();
147 transactionManager.closeSession();
148 } catch (OleDocStoreException e) {
149 transactionManager.abort();
150 transactionManager.closeSession();
151 throw e;
152 }
153 response.setStatus("Success");
154 response.setMessage("Documents deleted");
155 response.setStatusMessage("Documents deleted successfully");
156 response.setDocuments(responseDocuments);
157 } else if (request.getOperation().equalsIgnoreCase(Request.Operation.bind.toString())) {
158 transactionManager.startTransaction(request.getUser(), request.getOperation());
159 try {
160 responseDocuments = transactionManager.bind(request.getRequestDocuments(), request.getOperation());
161 transactionManager.commit();
162 transactionManager.closeSession();
163 } catch (Exception e) {
164 transactionManager.abort();
165 transactionManager.closeSession();
166 logger.error(e.getMessage() , e);
167 }
168 response.setStatus("Success");
169 response.setMessage("Documents bounded In .");
170 response.setStatusMessage("Documents bounded In successfully.");
171 response.setDocuments(responseDocuments);
172
173
174
175
176
177
178
179
180
181 } else if (request.getOperation().contains(Request.Operation.transferInstances.toString())) {
182 try {
183
184 transactionManager.startTransaction(request.getUser(), request.getOperation());
185 transactionManager.transferInstances(request.getRequestDocuments());
186 transactionManager.commit();
187 transactionManager.closeSession();
188 } catch (Exception e) {
189 transactionManager.abort();
190 transactionManager.closeSession();
191 logger.error(e.getMessage() , e);
192 }
193 } else if (request.getOperation().contains(Request.Operation.transferItems.toString())) {
194 try {
195 transactionManager.startTransaction(request.getUser(), request.getOperation());
196 transactionManager.transferItems(request.getRequestDocuments());
197 transactionManager.commit();
198 transactionManager.closeSession();
199 response.setStatus("success");
200 response.setMessage("Transfer Items success ");
201 response.setStatusMessage("Status : Transfer of items success ");
202 response.setDocuments(responseDocuments);
203 } catch (ItemExistsException itemExistsException) {
204 System.out.println("itemExistsException" + itemExistsException);
205 response.setStatus("failure");
206 response.setMessage("Transfer Items Failed " + itemExistsException.getMessage());
207 response.setStatusMessage("Status : Transfer of items failed " + itemExistsException.getMessage());
208 response.setDocuments(responseDocuments);
209 transactionManager.abort();
210 transactionManager.closeSession();
211 } catch (Exception e) {
212 logger.error(e.getMessage() , e);
213 transactionManager.abort();
214 transactionManager.closeSession();
215 }
216 }
217
218 } catch (OleDocStoreException ode) {
219 logger.error("", ode);
220 response.setStatus("Failed");
221 response.setMessage("Operation failed.");
222 response.setStatusMessage(ode.getMessage());
223 response.setDocuments(responseDocuments);
224 } catch (Exception e) {
225 logger.error(e.getMessage() , e);
226 }
227 return response;
228 }
229
230
231
232
233 @Override
234 public void bulkProcess(BulkProcessRequest request) throws Exception {
235 authenticateAndAuthorizeUser(request);
236
237
238
239 Response response = null;
240 if (request.getOperation().equals(BulkProcessRequest.BulkProcessOperation.INGEST)) {
241 response = bulkIngestManage(request);
242 }
243 }
244
245
246
247
248 private Response bulkIngestManage(BulkProcessRequest request) throws Exception {
249 if ((request.getAction()).equals(BulkProcessRequest.BulkProcessAction.START)) {
250 if (request.getDataFormat().equals(BulkProcessRequest.BulkIngestDataFormat.DOCSTORE)) {
251 if (bulkIngestRequest != null) {
252
253 throw new OleDocStoreException("Bulk ingest already running!");
254 } else {
255 BeanLocator.getBulkIngestProcessHandlerService().startBulkIngestForDocStoreRequestFormat(request.getBulkIngestFolder());
256 BulkLoadHandler bulkLoadHandler = BeanLocator.getBulkIngestProcessHandlerService().getLoadHandler();
257 BulkIngestNIndexProcessor bulkIngestNIndexProcessor = bulkLoadHandler.getBulkRoute()
258 .getBulkIngestNIndexProcessor();
259 request.setBulkIngestStatistics(BulkIngestStatistics.getInstance());
260 bulkIngestNIndexProcessor.setBulkProcessRequest(request);
261
262 bulkIngestRequest = request;
263 DocStoreCamelContext.getInstance().resume();
264 }
265 } else if (request.getDataFormat().equals(BulkProcessRequest.BulkIngestDataFormat.STANDARD)) {
266 String folder = request.getDataFolder();
267 if (folder != null && folder.trim().length() != 0) {
268 BeanLocator.getBulkIngestProcessHandlerService()
269 .startBulkIngestForStandardXMLFormat(request.getDataFolder(), request.getDocCategory(),
270 request.getDocType(), request.getDocFormat(), request.getBulkIngestFolder());
271 }
272 }
273 } else if ((request.getAction()).equals(BulkProcessRequest.BulkProcessAction.STATUS)) {
274 logger.info(bulkIngestRequest.getBulkIngestStatistics().getJsonString());
275 } else if ((request.getAction()).equals(BulkProcessRequest.BulkProcessAction.STOP)) {
276 DocStoreCamelContext.getInstance().suspend();
277 } else if ((request.getAction()).equals(BulkProcessRequest.BulkProcessAction.CLEAR)) {
278 bulkIngestRequest.getBulkIngestStatistics().clearBulkIngestStatistics();
279 }
280 return null;
281 }
282
283 public void bulkIngest(BulkProcessRequest bulkProcessRequest, List<RequestDocument> requestDocuments)
284 throws Exception {
285 TransactionManager transactionManager = bulkProcessRequest.getTransactionManager();
286 validateBulkProcessInput(bulkProcessRequest, requestDocuments);
287 if (null == transactionManager) {
288 transactionManager = BeanLocator.getDocstoreFactory().getTransactionManager(requestDocuments.get(0).getCategory(), requestDocuments.get(0).getType(), requestDocuments.get(0).getFormat());
289 transactionManager.startTransaction(bulkProcessRequest.getUser(),
290 "bulk" + bulkProcessRequest.getOperation().toString());
291 bulkProcessRequest.setTransactionManager(transactionManager);
292 }
293
294
295
296
297
298
299 batchIngest(bulkProcessRequest, requestDocuments);
300
301 }
302
303 public void batchIngest(BulkProcessRequest bulkProcessRequest, List<RequestDocument> requestDocuments) {
304 BulkIngestStatistics bulkIngestStatistics = bulkProcessRequest.getBulkIngestStatistics();
305 BatchIngestStatistics batchStatistics = bulkIngestStatistics.getCurrentBatch();
306 long commitSize = ProcessParameters.BULK_INGEST_COMMIT_SIZE;
307 logger.debug("commitSize = " + commitSize);
308 logger.debug("bulkIngestNIndex(" + requestDocuments.size() + ") START");
309 logger.debug("BULK_INGEST_IS_LINKING_ENABLED=" + ProcessParameters.BULK_INGEST_IS_LINKING_ENABLED);
310 StopWatch totalTimer = new StopWatch();
311 StopWatch sessionSaveTimer = new StopWatch();
312 long recCount = requestDocuments.size();
313 batchStatistics.setRecCount(recCount);
314
315 boolean isCommit = false;
316 totalTimer.start();
317 TransactionManager transactionManager = bulkProcessRequest.getTransactionManager();
318 try {
319 transactionManager.batchIngest(bulkProcessRequest, requestDocuments);
320 bulkIngestStatistics.setCommitRecCount(bulkIngestStatistics.getCommitRecCount() + recCount);
321 if (bulkIngestStatistics.getCommitRecCount() == commitSize || bulkIngestStatistics.isLastBatch()) {
322 isCommit = true;
323 }
324 if (isCommit) {
325 sessionSaveTimer.start();
326 logger.info("Bulk ingest: Commit started. Number of records being committed : " + bulkIngestStatistics
327 .getCommitRecCount());
328 transactionManager.commit();
329 bulkIngestStatistics.setCommitRecCount(0);
330 sessionSaveTimer.stop();
331 }
332
333
334 logger.debug("Documents processed: " + recCount);
335 bulkIngestStatistics.setFileRecCount(bulkIngestStatistics.getFileRecCount() + recCount);
336 logger.info(
337 "Bulk ingest: Records processed in the current file :" + bulkIngestStatistics.getFileRecCount());
338 } catch (Exception e) {
339 transactionManager.abort();
340 bulkIngestStatistics.setCommitRecCount(0);
341 logger.error("Document Ingest & Index Failed, Cause: " + e.getMessage(), e);
342 }
343 totalTimer.stop();
344 batchStatistics.setTimeToSaveJcrSession(sessionSaveTimer.getTime());
345 batchStatistics.setIngestingTime(
346 batchStatistics.getTimeToCreateNodesInJcr() + batchStatistics.getTimeToSaveJcrSession());
347 batchStatistics
348 .setIndexingTime(batchStatistics.getTimeToIndexSolrInputDocs() + batchStatistics.getTimeToSolrCommit());
349 batchStatistics.setIngestNIndexTotalTime(totalTimer.getTime());
350 }
351
352
353 public void setRepositoryManager(RepositoryManager repositoryManager) {
354 this.repositoryManager = repositoryManager;
355 }
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370 private void validateInput(Request request) throws OleDocStoreException {
371 Set<String> validOperationSet = Request.validOperationSet;
372 if (StringUtils.isEmpty(request.getUser())) {
373 throw new OleDocStoreException("User cannot be null or empty. Please verify input file.");
374 }
375
376 if (StringUtils.isEmpty(request.getOperation())) {
377 throw new OleDocStoreException("Operation cannot be null or empty. Please verify input file");
378 } else {
379
380 if (validOperationSet.contains(request.getOperation())) {
381 for (RequestDocument requestDocument : request.getRequestDocuments()) {
382 requestDocument.setUser(request.getUser());
383 requestDocument.setOperation(request.getOperation());
384 }
385 } else {
386 throw new OleDocStoreException("Not a valid Docstore operation:" + request.getOperation());
387 }
388 }
389 }
390
391
392
393
394
395
396
397 private void authenticateAndAuthorizeUser(Request request) throws OleDocStoreException {
398
399 }
400
401 private void authenticateAndAuthorizeUser(BulkProcessRequest request) throws OleDocStoreException {
402
403 }
404
405
406
407
408
409
410
411
412
413 private void validateBulkProcessInput(BulkProcessRequest request, List<RequestDocument> requestDocuments)
414 throws OleDocStoreException {
415
416 Set<String> validOperationSet = BulkProcessRequest.validOperationSet;
417 if (StringUtils.isEmpty(request.getUser())) {
418 request.setUser("BulkIngest-User");
419 }
420
421 if (StringUtils.isEmpty(request.getOperation().toString())) {
422 throw new OleDocStoreException("Operation cannot be null or empty. Please verify input file");
423 } else {
424
425 if (validOperationSet.contains(request.getOperation().toString())) {
426 for (RequestDocument requestDocument : requestDocuments) {
427 requestDocument.setUser(request.getUser());
428 requestDocument.setOperation(request.getOperation().toString());
429 }
430 } else {
431 throw new OleDocStoreException("Not a valid Docstore operation:" + request.getOperation());
432 }
433 }
434 }
435
436 public BulkProcessRequest getBulkIngestRequest() {
437 return bulkIngestRequest;
438 }
439
440 public void setBulkIngestRequest(BulkProcessRequest bulkIngestRequest) {
441 this.bulkIngestRequest = bulkIngestRequest;
442 }
443
444 public void setTransactionManager(TransactionManager transactionManager) {
445 this.transactionManager = transactionManager;
446 }
447
448 }