1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.ole.docstore.process;
17
18 import org.apache.camel.CamelContext;
19 import org.apache.camel.builder.RouteBuilder;
20 import org.apache.camel.component.file.FileEndpoint;
21 import org.apache.camel.component.file.GenericFile;
22 import org.apache.camel.component.file.GenericFileFilter;
23 import org.apache.camel.model.AggregateDefinition;
24 import org.apache.camel.model.RouteDefinition;
25 import org.apache.camel.model.SplitDefinition;
26 import org.apache.camel.model.ThreadsDefinition;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 import static org.kuali.ole.docstore.process.ProcessParameters.*;
31
32
33
34
35
36
37
38 public class BulkIngestNIndexRouteBuilder extends RouteBuilder {
39
40 private static Logger log = LoggerFactory.getLogger(BulkIngestNIndexProcessor.class);
41 private String folder = null;
42 private String user = null;
43 private String action = null;
44 private FileEndpoint fEPoint = null;
45
46
47
48
49
50
51
52
53
54 public BulkIngestNIndexRouteBuilder(String folder, String user, String action, CamelContext context) {
55 super(context);
56 this.folder = folder;
57 this.user = user;
58 this.action = action;
59 }
60
61
62
63
64
65
66 @Override
67 public void configure() throws Exception {
68 log.debug("Loading Bulk Ingest Process: @" + folder);
69 fEPoint = endpoint("file:" + folder + "?noop=false&move=.done&delay=" + BULK_INGEST_POLL_INTERVAL, FileEndpoint.class);
70 fEPoint.setFilter(new BulkIngestFileFilter());
71 RouteDefinition route = from(fEPoint);
72 route.setId(folder);
73 SplitDefinition split = route.split().tokenizeXML("ingestDocument");
74 split.streaming();
75 AggregateDefinition aggregator = split.aggregate(constant(true), new BodyAggregator());
76 aggregator.setParallelProcessing(BULK_PROCESSOR_MULTI_THREADED);
77 aggregator.completionPredicate(new SplitPredicate(BULK_PROCESSOR_SPLIT_SIZE));
78 ThreadsDefinition threads = aggregator.threads(BULK_PROCESSOR_THREADS_MIN, BULK_PROCESSOR_THREADS_MAX);
79 threads.process(new BulkIngestNIndexProcessor(user, action));
80 threads.setThreadName("bulkIngest");
81 route.setErrorHandlerBuilder(DocStoreCamelContext.getInstance().getErrorHandler());
82 log.info("Loaded Bulk Ingest Process: @" + folder);
83 }
84
85 protected FileEndpoint getFileEndPoint() {
86 return fEPoint;
87 }
88
89 public class BulkIngestFileFilter implements GenericFileFilter {
90
91 @Override
92 public boolean accept(GenericFile file) {
93 return file.getFileName().toLowerCase().endsWith(".xml");
94 }
95
96 }
97 }