1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.ole.docstore.process;
17
18 import org.apache.camel.CamelContext;
19 import org.apache.camel.builder.RouteBuilder;
20 import org.apache.camel.component.file.FileEndpoint;
21 import org.apache.camel.component.file.GenericFile;
22 import org.apache.camel.component.file.GenericFileFilter;
23 import org.apache.camel.model.AggregateDefinition;
24 import org.apache.camel.model.RouteDefinition;
25 import org.apache.camel.model.SplitDefinition;
26 import org.apache.camel.model.ThreadsDefinition;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 import static org.kuali.ole.docstore.process.ProcessParameters.*;
31
32
33
34
35
36
37
38 public class BulkIngestNIndexRouteBuilder
39 extends RouteBuilder {
40
41 private static Logger log = LoggerFactory.getLogger(BulkIngestNIndexProcessor.class);
42 private String folder = null;
43 private String user = null;
44 private String action = null;
45 private FileEndpoint fEPoint = null;
46 private BulkIngestNIndexProcessor bulkIngestNIndexProcessor = null;
47
48
49
50
51
52
53
54
55
56 public BulkIngestNIndexRouteBuilder(String folder, String user, String action, CamelContext context) {
57 super(context);
58 this.folder = folder;
59 this.user = user;
60 this.action = action;
61 }
62
63
64
65
66
67
68 @Override
69 public void configure() throws Exception {
70 log.debug("Loading Bulk Ingest Process: @" + folder);
71 fEPoint = endpoint(
72 "file:" + folder + "?noop=false&sortBy=file:name&move=.done&delay=" + BULK_INGEST_POLL_INTERVAL,
73 FileEndpoint.class);
74 fEPoint.setFilter(new BulkIngestFileFilter());
75 RouteDefinition route = from(fEPoint);
76 route.setId(folder);
77 SplitDefinition split = route.split().tokenizeXML("ingestDocument");
78 split.streaming();
79 AggregateDefinition aggregator = split.aggregate(constant(true), new BodyAggregator());
80 aggregator.setParallelProcessing(BULK_PROCESSOR_MULTI_THREADED);
81 aggregator.completionPredicate(new SplitPredicate(BULK_PROCESSOR_SPLIT_SIZE));
82 ThreadsDefinition threads = aggregator.threads(BULK_PROCESSOR_THREADS_MIN, BULK_PROCESSOR_THREADS_MAX);
83 bulkIngestNIndexProcessor = new BulkIngestNIndexProcessor(user, action);
84 threads.process(bulkIngestNIndexProcessor);
85 threads.setThreadName("bulkIngest");
86 route.setErrorHandlerBuilder(DocStoreCamelContext.getInstance().getErrorHandler());
87 log.info("Loaded Bulk Ingest Process: @" + folder);
88 }
89
90 protected FileEndpoint getFileEndPoint() {
91 return fEPoint;
92 }
93
94 public class BulkIngestFileFilter
95 implements GenericFileFilter {
96
97 @Override
98 public boolean accept(GenericFile file) {
99 return file.getFileName().toLowerCase().endsWith(".xml");
100 }
101
102 }
103
104 public BulkIngestNIndexProcessor getBulkIngestNIndexProcessor() {
105 return bulkIngestNIndexProcessor;
106 }
107
108 public void setBulkIngestNIndexProcessor(BulkIngestNIndexProcessor bulkIngestNIndexProcessor) {
109 this.bulkIngestNIndexProcessor = bulkIngestNIndexProcessor;
110 }
111 }