View Javadoc
1   /*
2    * Copyright 2012 The Kuali Foundation.
3    * 
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    * http://www.opensource.org/licenses/ecl2.php
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.ole.docstore.process;
17  
18  import org.apache.camel.Exchange;
19  import org.apache.camel.impl.DefaultExchange;
20  import org.apache.camel.processor.aggregate.AggregationStrategy;
21  import org.kuali.ole.docstore.utility.BulkIngestStatistics;
22  import org.kuali.ole.docstore.utility.FileIngestStatistics;
23  import org.slf4j.Logger;
24  import org.slf4j.LoggerFactory;
25  
26  import java.util.ArrayList;
27  import java.util.List;
28  
29  /**
30   * Class to Aggregate message bodies.
31   *
32   * @author Rajesh Chowdary K
33   * @created Jun 11, 2012
34   */
35  public class BodyAggregator
36          implements AggregationStrategy {
37      private Logger logger = LoggerFactory.getLogger(BodyAggregator.class);
38      private static Integer start = 0;
39      private BulkIngestStatistics bulkIngestStatistics = BulkIngestStatistics.getInstance();
40      private FileIngestStatistics fileIngestStatistics = null;
41  
42      public BodyAggregator() {
43      }
44  
45      @SuppressWarnings("unchecked")
46      public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
47  
48          if (start.equals(newExchange.getProperty("CamelSplitIndex"))) {
49              fileIngestStatistics = bulkIngestStatistics.startFile();
50              bulkIngestStatistics.setFirstBatch(true);
51              bulkIngestStatistics.setFileIngestStatistics(fileIngestStatistics);
52              logger.info("Bulk ingest: File started : " + newExchange.getProperty("CamelFileExchangeFile"));
53          }
54  
55          if (oldExchange == null) {
56              oldExchange = new DefaultExchange(newExchange);
57              oldExchange.getIn().setHeaders(newExchange.getIn().getHeaders());
58              List<Object> body = new ArrayList<Object>();
59              oldExchange.getIn().setBody(body);
60              oldExchange.getExchangeId();
61          }
62          oldExchange.getIn().getBody(List.class).add(newExchange.getIn().getBody());
63  
64          for (String key : newExchange.getProperties().keySet()) {
65              oldExchange.setProperty(key, newExchange.getProperty(key));
66          }
67  
68          return oldExchange;
69      }
70  
71      public FileIngestStatistics getFileIngestStatistics() {
72          return fileIngestStatistics;
73      }
74  
75      public void setFileIngestStatistics(FileIngestStatistics fileIngestStatistics) {
76          this.fileIngestStatistics = fileIngestStatistics;
77      }
78  }