1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  package org.kuali.ole.docstore.process;
17  
18  import org.apache.camel.Exchange;
19  import org.apache.camel.impl.DefaultExchange;
20  import org.apache.camel.processor.aggregate.AggregationStrategy;
21  import org.kuali.ole.docstore.utility.BulkIngestStatistics;
22  import org.kuali.ole.docstore.utility.FileIngestStatistics;
23  import org.slf4j.Logger;
24  import org.slf4j.LoggerFactory;
25  
26  import java.util.ArrayList;
27  import java.util.List;
28  
29  
30  
31  
32  
33  
34  
35  public class BodyAggregator
36          implements AggregationStrategy {
37      private Logger logger = LoggerFactory.getLogger(BodyAggregator.class);
38      private static Integer start = 0;
39      private BulkIngestStatistics bulkIngestStatistics = BulkIngestStatistics.getInstance();
40      private FileIngestStatistics fileIngestStatistics = null;
41  
42      public BodyAggregator() {
43      }
44  
45      @SuppressWarnings("unchecked")
46      public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
47  
48          if (start.equals(newExchange.getProperty("CamelSplitIndex"))) {
49              fileIngestStatistics = bulkIngestStatistics.startFile();
50              bulkIngestStatistics.setFirstBatch(true);
51              bulkIngestStatistics.setFileIngestStatistics(fileIngestStatistics);
52              logger.info("Bulk ingest: File started : " + newExchange.getProperty("CamelFileExchangeFile"));
53          }
54  
55          if (oldExchange == null) {
56              oldExchange = new DefaultExchange(newExchange);
57              oldExchange.getIn().setHeaders(newExchange.getIn().getHeaders());
58              List<Object> body = new ArrayList<Object>();
59              oldExchange.getIn().setBody(body);
60              oldExchange.getExchangeId();
61          }
62          oldExchange.getIn().getBody(List.class).add(newExchange.getIn().getBody());
63  
64          for (String key : newExchange.getProperties().keySet()) {
65              oldExchange.setProperty(key, newExchange.getProperty(key));
66          }
67  
68          return oldExchange;
69      }
70  
71      public FileIngestStatistics getFileIngestStatistics() {
72          return fileIngestStatistics;
73      }
74  
75      public void setFileIngestStatistics(FileIngestStatistics fileIngestStatistics) {
76          this.fileIngestStatistics = fileIngestStatistics;
77      }
78  }