001/* 002 * Copyright 2012 The Kuali Foundation. 003 * 004 * Licensed under the Educational Community License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.opensource.org/licenses/ecl2.php 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.kuali.ole.docstore.process; 017 018import org.apache.camel.Exchange; 019import org.apache.camel.impl.DefaultExchange; 020import org.apache.camel.processor.aggregate.AggregationStrategy; 021import org.kuali.ole.docstore.utility.BulkIngestStatistics; 022import org.kuali.ole.docstore.utility.FileIngestStatistics; 023import org.slf4j.Logger; 024import org.slf4j.LoggerFactory; 025 026import java.util.ArrayList; 027import java.util.List; 028 029/** 030 * Class to Aggregate message bodies. 031 * 032 * @author Rajesh Chowdary K 033 * @created Jun 11, 2012 034 */ 035public class BodyAggregator 036 implements AggregationStrategy { 037 private Logger logger = LoggerFactory.getLogger(BodyAggregator.class); 038 private static Integer start = 0; 039 private BulkIngestStatistics bulkIngestStatistics = BulkIngestStatistics.getInstance(); 040 private FileIngestStatistics fileIngestStatistics = null; 041 042 public BodyAggregator() { 043 } 044 045 @SuppressWarnings("unchecked") 046 public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { 047 048 if (start.equals(newExchange.getProperty("CamelSplitIndex"))) { 049 fileIngestStatistics = bulkIngestStatistics.startFile(); 050 bulkIngestStatistics.setFirstBatch(true); 051 bulkIngestStatistics.setFileIngestStatistics(fileIngestStatistics); 052 logger.info("Bulk ingest: File started : " + newExchange.getProperty("CamelFileExchangeFile")); 053 } 054 055 if (oldExchange == null) { 056 oldExchange = new DefaultExchange(newExchange); 057 oldExchange.getIn().setHeaders(newExchange.getIn().getHeaders()); 058 List<Object> body = new ArrayList<Object>(); 059 oldExchange.getIn().setBody(body); 060 oldExchange.getExchangeId(); 061 } 062 oldExchange.getIn().getBody(List.class).add(newExchange.getIn().getBody()); 063 064 for (String key : newExchange.getProperties().keySet()) { 065 oldExchange.setProperty(key, newExchange.getProperty(key)); 066 } 067 068 return oldExchange; 069 } 070 071 public FileIngestStatistics getFileIngestStatistics() { 072 return fileIngestStatistics; 073 } 074 075 public void setFileIngestStatistics(FileIngestStatistics fileIngestStatistics) { 076 this.fileIngestStatistics = fileIngestStatistics; 077 } 078}