1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.ole.docstore.process;
17
18 import org.apache.camel.Exchange;
19 import org.apache.camel.impl.DefaultExchange;
20 import org.apache.camel.processor.aggregate.AggregationStrategy;
21 import org.kuali.ole.docstore.utility.BulkIngestStatistics;
22 import org.kuali.ole.docstore.utility.FileIngestStatistics;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 import java.util.ArrayList;
27 import java.util.List;
28
29
30
31
32
33
34
35 public class BodyAggregator
36 implements AggregationStrategy {
37 private Logger logger = LoggerFactory.getLogger(BodyAggregator.class);
38 private static Integer start = 0;
39 private BulkIngestStatistics bulkIngestStatistics = BulkIngestStatistics.getInstance();
40 private FileIngestStatistics fileIngestStatistics = null;
41
42 public BodyAggregator() {
43 }
44
45 @SuppressWarnings("unchecked")
46 public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
47
48 if (start.equals(newExchange.getProperty("CamelSplitIndex"))) {
49 fileIngestStatistics = bulkIngestStatistics.startFile();
50 bulkIngestStatistics.setFirstBatch(true);
51 bulkIngestStatistics.setFileIngestStatistics(fileIngestStatistics);
52 logger.info("Bulk ingest: File started : " + newExchange.getProperty("CamelFileExchangeFile"));
53 }
54
55 if (oldExchange == null) {
56 oldExchange = new DefaultExchange(newExchange);
57 oldExchange.getIn().setHeaders(newExchange.getIn().getHeaders());
58 List<Object> body = new ArrayList<Object>();
59 oldExchange.getIn().setBody(body);
60 oldExchange.getExchangeId();
61 }
62 oldExchange.getIn().getBody(List.class).add(newExchange.getIn().getBody());
63
64 for (String key : newExchange.getProperties().keySet()) {
65 oldExchange.setProperty(key, newExchange.getProperty(key));
66 }
67
68 return oldExchange;
69 }
70
71 public FileIngestStatistics getFileIngestStatistics() {
72 return fileIngestStatistics;
73 }
74
75 public void setFileIngestStatistics(FileIngestStatistics fileIngestStatistics) {
76 this.fileIngestStatistics = fileIngestStatistics;
77 }
78 }