1 package org.kuali.ole.docstore.service;
2
3 import org.apache.commons.lang.time.StopWatch;
4 import org.kuali.ole.RepositoryManager;
5 import org.kuali.ole.docstore.OleDocStoreException;
6 import org.kuali.ole.docstore.model.xmlpojo.ingest.Request;
7 import org.kuali.ole.docstore.model.xmlpojo.ingest.RequestDocument;
8 import org.kuali.ole.docstore.model.xmlpojo.ingest.Response;
9 import org.kuali.ole.docstore.model.xmlpojo.ingest.ResponseDocument;
10 import org.kuali.ole.docstore.process.BulkIngestNIndexProcessor;
11 import org.kuali.ole.docstore.process.BulkLoadHandler;
12 import org.kuali.ole.docstore.process.DocStoreCamelContext;
13 import org.kuali.ole.docstore.process.ProcessParameters;
14 import org.kuali.ole.docstore.process.batch.BulkProcessRequest;
15 import org.kuali.ole.docstore.transaction.TransactionManager;
16 import org.kuali.ole.docstore.utility.BatchIngestStatistics;
17 import org.kuali.ole.docstore.utility.BulkIngestStatistics;
18 import org.kuali.ole.pojo.OleException;
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
21
22 import java.util.ArrayList;
23 import java.util.List;
24
25
26
27
28
29
30 public class DocumentServiceImpl
31 implements DocumentService {
32 private static Logger logger = LoggerFactory.getLogger(BulkIngestNIndexProcessor.class);
33 private static DocumentServiceImpl documentService = new DocumentServiceImpl();
34 private BulkProcessRequest bulkIngestRequest = null;
35
36
37
38
39
40
41 protected RepositoryManager repositoryManager;
42
43 public static DocumentServiceImpl getInstance() {
44 return documentService;
45 }
46
47 private DocumentServiceImpl() {
48 try {
49 this.repositoryManager = RepositoryManager.getRepositoryManager();
50 }
51 catch (OleException oe) {
52
53
54 }
55 }
56
57
58
59
60 @Override
61 public Response process(Request request) throws OleDocStoreException {
62 authenticateAndAuthorizeUser(request);
63 validateInput(request);
64
65 TransactionManager transactionManager = new TransactionManager();
66 transactionManager.startTransaction(request.getUser(), request.getOperation());
67 List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
68 Response response = new Response();
69 response.setOperation(request.getOperation());
70 response.setUser(request.getUser());
71 try {
72 if (request.getOperation().equals(Request.Operation.ingest.toString())) {
73 responseDocuments = transactionManager.ingest(request.getRequestDocuments());
74 transactionManager.commit();
75
76 response.setStatus("Success");
77 response.setMessage("Documents ingested.");
78 response.setStatusMessage("Documents ingested successfully.");
79 response.setDocuments(responseDocuments);
80 }
81
82 else if (request.getOperation().equals(Request.Operation.checkIn.toString())) {
83 responseDocuments = transactionManager.checkIn(request.getRequestDocuments(), request.getOperation());
84 transactionManager.commit();
85
86 response.setStatus("Success");
87 response.setMessage("Documents Checked In .");
88 response.setStatusMessage("Documents Checked In successfully.");
89 response.setDocuments(responseDocuments);
90 }
91 else if (request.getOperation().equals(Request.Operation.checkOut.toString())) {
92 responseDocuments = transactionManager.checkOut(request.getRequestDocuments(), request.getUser());
93
94 response.setStatus("Success");
95 response.setMessage("Documents Checked Out.");
96 response.setStatusMessage("Documents Checked Out successfully.");
97 response.setDocuments(responseDocuments);
98 }
99 else if (request.getOperation().contains(Request.Operation.delete.toString())) {
100 responseDocuments = transactionManager.delete(request.getRequestDocuments());
101 transactionManager.commit();
102 response.setStatus("Success");
103 response.setMessage("Documents deleted");
104 response.setStatusMessage("Documents deleted successfully");
105 response.setDocuments(responseDocuments);
106 }
107 }
108 catch (OleDocStoreException ode) {
109 logger.error("", ode);
110 transactionManager.abort();
111 response.setStatus("Failure");
112 response.setMessage("Operation failed.");
113 response.setStatusMessage("Operation failed.");
114 response.setDocuments(responseDocuments);
115 }
116
117 return response;
118 }
119
120
121
122
123 @Override
124 public void bulkProcess(BulkProcessRequest request) throws Exception {
125 authenticateAndAuthorizeUser(request);
126 validateBulkProcessInput(request);
127
128 Response response = null;
129 if (request.getOperation().equals(BulkProcessRequest.BulkProcessOperation.INGEST)) {
130 response = bulkIngestManage(request);
131 }
132 }
133
134
135
136
137 private Response bulkIngestManage(BulkProcessRequest request) throws Exception {
138 if ((request.getAction()).equals(BulkProcessRequest.BulkProcessAction.START)) {
139 if (request.getDataFormat().equals(BulkProcessRequest.BulkIngestDataFormat.DOCSTORE)) {
140 if (bulkIngestRequest != null) {
141
142 throw new OleDocStoreException("Bulk ingest already running!");
143 }
144 else {
145 BeanLocator.getBulkIngestProcessHandlerService().startBulkIngestForDocStoreRequestFormat(null);
146 BulkLoadHandler bulkLoadHandler = BeanLocator.getBulkIngestProcessHandlerService().getLoadHandler();
147 BulkIngestNIndexProcessor bulkIngestNIndexProcessor = bulkLoadHandler.getBulkRoute()
148 .getBulkIngestNIndexProcessor();
149 request.setBulkIngestStatistics(BulkIngestStatistics.getInstance());
150 bulkIngestNIndexProcessor.setBulkProcessRequest(request);
151
152 bulkIngestRequest = request;
153 DocStoreCamelContext.getInstance().resume();
154 }
155 }
156 else if (request.getDataFormat().equals(BulkProcessRequest.BulkIngestDataFormat.STANDARD)) {
157 String folder = request.getDataFolder();
158 if (folder != null && folder.trim().length() != 0) {
159 BeanLocator.getBulkIngestProcessHandlerService()
160 .startBulkIngestForStandardXMLFormat(request.getDataFolder(), request.getDocCategory(),
161 request.getDocType(), request.getDocFormat(),null);
162 }
163 }
164 }
165 else if ((request.getAction()).equals(BulkProcessRequest.BulkProcessAction.STATUS)) {
166 logger.info(bulkIngestRequest.getBulkIngestStatistics().getJsonString());
167 }
168 else if ((request.getAction()).equals(BulkProcessRequest.BulkProcessAction.STOP)) {
169 DocStoreCamelContext.getInstance().suspend();
170 }
171 else if ((request.getAction()).equals(BulkProcessRequest.BulkProcessAction.CLEAR)) {
172 bulkIngestRequest.getBulkIngestStatistics().clearBulkIngestStatistics();
173 }
174 return null;
175 }
176
177 public void bulkIngest(BulkProcessRequest bulkProcessRequest, List<RequestDocument> requestDocuments)
178 throws OleDocStoreException {
179 TransactionManager transactionManager = bulkProcessRequest.getTransactionManager();
180 if (null == transactionManager) {
181 transactionManager = new TransactionManager();
182 transactionManager.startTransaction(bulkProcessRequest.getUser(),
183 "bulk" + bulkProcessRequest.getOperation().toString());
184 bulkProcessRequest.setTransactionManager(transactionManager);
185 }
186
187
188
189
190
191
192 batchIngest(bulkProcessRequest, requestDocuments);
193
194 }
195
196 public void batchIngest(BulkProcessRequest bulkProcessRequest, List<RequestDocument> requestDocuments) {
197 BulkIngestStatistics bulkIngestStatistics = bulkProcessRequest.getBulkIngestStatistics();
198 BatchIngestStatistics batchStatistics = bulkIngestStatistics.getCurrentBatch();
199 long commitSize = ProcessParameters.BULK_INGEST_COMMIT_SIZE;
200 logger.debug("commitSize = " + commitSize);
201 logger.debug("bulkIngestNIndex(" + requestDocuments.size() + ") START");
202 logger.debug("BULK_INGEST_IS_LINKING_ENABLED=" + ProcessParameters.BULK_INGEST_IS_LINKING_ENABLED);
203 StopWatch totalTimer = new StopWatch();
204 long recCount = requestDocuments.size();
205 boolean isCommit = false;
206 totalTimer.start();
207 TransactionManager transactionManager = bulkProcessRequest.getTransactionManager();
208 try {
209 transactionManager.batchIngest(bulkProcessRequest, requestDocuments);
210
211 bulkIngestStatistics.setCommitRecCount(bulkIngestStatistics.getCommitRecCount() + recCount);
212 if (bulkIngestStatistics.getCommitRecCount() == commitSize || bulkIngestStatistics.isLastBatch()) {
213 isCommit = true;
214 }
215 if (isCommit) {
216 logger.info("Bulk ingest: Commit started. Number of records being committed : " + bulkIngestStatistics
217 .getCommitRecCount());
218 transactionManager.commit();
219 bulkIngestStatistics.setCommitRecCount(0);
220 }
221
222
223 logger.debug("Documents processed: " + recCount);
224 bulkIngestStatistics.setFileRecCount(bulkIngestStatistics.getFileRecCount() + recCount);
225 logger.info(
226 "Bulk ingest: Records processed in the current file :" + bulkIngestStatistics.getFileRecCount());
227 }
228 catch (Exception e) {
229 transactionManager.abort();
230 bulkIngestStatistics.setCommitRecCount(0);
231 logger.error("Document Ingest & Index Failed, Cause: " + e.getMessage(), e);
232 }
233 totalTimer.stop();
234 batchStatistics.setIngestingTime(
235 batchStatistics.getTimeToCreateNodesInJcr() + batchStatistics.getTimeToSaveJcrSession());
236 batchStatistics
237 .setIndexingTime(batchStatistics.getTimeToIndexSolrInputDocs() + batchStatistics.getTimeToSolrCommit());
238 batchStatistics.setIngestNIndexTotalTime(totalTimer.getTime());
239 }
240
241
242 public void setRepositoryManager(RepositoryManager repositoryManager) {
243 this.repositoryManager = repositoryManager;
244 }
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259 private void validateInput(Request request) throws OleDocStoreException {
260
261 }
262
263
264
265
266
267
268
269 private void authenticateAndAuthorizeUser(Request request) throws OleDocStoreException {
270
271 }
272
273 private void authenticateAndAuthorizeUser(BulkProcessRequest request) throws OleDocStoreException {
274
275 }
276
277
278
279
280
281
282
283
284
285 private void validateBulkProcessInput(BulkProcessRequest request) throws OleDocStoreException {
286
287 }
288
289 }