001/*
002 * Copyright 2012 The Kuali Foundation.
003 * 
004 * Licensed under the Educational Community License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 * 
008 * http://www.opensource.org/licenses/ecl2.php
009 * 
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.kuali.ole.docstore.process;
017
018import org.apache.camel.Exchange;
019import org.apache.camel.impl.DefaultExchange;
020import org.apache.camel.processor.aggregate.AggregationStrategy;
021import org.kuali.ole.docstore.utility.BulkIngestStatistics;
022import org.kuali.ole.docstore.utility.FileIngestStatistics;
023import org.slf4j.Logger;
024import org.slf4j.LoggerFactory;
025
026import java.util.ArrayList;
027import java.util.List;
028
029/**
030 * Class to Aggregate message bodies.
031 *
032 * @author Rajesh Chowdary K
033 * @created Jun 11, 2012
034 */
035public class BodyAggregator
036        implements AggregationStrategy {
037    private Logger logger = LoggerFactory.getLogger(BodyAggregator.class);
038    private static Integer start = 0;
039    private BulkIngestStatistics bulkIngestStatistics = BulkIngestStatistics.getInstance();
040    private FileIngestStatistics fileIngestStatistics = null;
041
042    public BodyAggregator() {
043    }
044
045    @SuppressWarnings("unchecked")
046    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
047
048        if (start.equals(newExchange.getProperty("CamelSplitIndex"))) {
049            fileIngestStatistics = bulkIngestStatistics.startFile();
050            bulkIngestStatistics.setFirstBatch(true);
051            bulkIngestStatistics.setFileIngestStatistics(fileIngestStatistics);
052            logger.info("Bulk ingest: File started : " + newExchange.getProperty("CamelFileExchangeFile"));
053        }
054
055        if (oldExchange == null) {
056            oldExchange = new DefaultExchange(newExchange);
057            oldExchange.getIn().setHeaders(newExchange.getIn().getHeaders());
058            List<Object> body = new ArrayList<Object>();
059            oldExchange.getIn().setBody(body);
060            oldExchange.getExchangeId();
061        }
062        oldExchange.getIn().getBody(List.class).add(newExchange.getIn().getBody());
063
064        for (String key : newExchange.getProperties().keySet()) {
065            oldExchange.setProperty(key, newExchange.getProperty(key));
066        }
067
068        return oldExchange;
069    }
070
071    public FileIngestStatistics getFileIngestStatistics() {
072        return fileIngestStatistics;
073    }
074
075    public void setFileIngestStatistics(FileIngestStatistics fileIngestStatistics) {
076        this.fileIngestStatistics = fileIngestStatistics;
077    }
078}