1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.kuali.rice.kew.batch;
18
19 import org.kuali.rice.kew.service.KEWServiceLocator;
20
21 import java.io.*;
22 import java.text.Format;
23 import java.text.SimpleDateFormat;
24 import java.util.*;
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45 public class XmlPollerServiceImpl implements XmlPollerService {
46
47 private static final org.apache.log4j.Logger LOG = org.apache.log4j.Logger
48 .getLogger(XmlPollerServiceImpl.class);
49 private static final Format DIR_FORMAT = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss-SSS");
50
51
52
53
54 private int pollIntervalSecs = 5 * 60;
55
56
57
58 private int initialDelaySecs = 30;
59
60
61
62 private String xmlPendingLocation;
63
64
65
66 private String xmlCompletedLocation;
67
68
69
70 private String xmlProblemLocation;
71
72 private String xmlParentDirectory;
73 private static final String PENDING_MOVE_FAILED_ARCHIVE_FILE = "movesfailed";
74 private static final String NEW_LINE = "\n";
75
76 public void run() {
77
78
79
80
81 LOG.debug("checking for xml data files...");
82 File[] files = getXmlPendingDir().listFiles();
83 if (files == null || files.length == 0) {
84 return;
85 }
86 LOG.info("Found " + files.length + " files to ingest.");
87 List<XmlDocCollection> collections = new ArrayList<XmlDocCollection>();
88 for (File file : files)
89 {
90 if (file.isDirectory())
91 {
92 collections.add(new DirectoryXmlDocCollection(file));
93 } else if (file.getName().equals(PENDING_MOVE_FAILED_ARCHIVE_FILE))
94 {
95
96 continue;
97 } else if (file.getName().toLowerCase().endsWith(".zip"))
98 {
99 try
100 {
101 collections.add(new ZipXmlDocCollection(file));
102 } catch (IOException ioe)
103 {
104 LOG.error("Unable to load file: " + file);
105 }
106 } else if (file.getName().endsWith(".xml"))
107 {
108 collections.add(new FileXmlDocCollection(file));
109 } else
110 {
111 LOG.warn("Ignoring extraneous file in xml pending directory: " + file);
112 }
113 }
114
115
116 Iterator collectionsIt = collections.iterator();
117 Collection<XmlDocCollection> culled = new ArrayList<XmlDocCollection>();
118 while (collectionsIt.hasNext()) {
119 XmlDocCollection container = (XmlDocCollection) collectionsIt.next();
120
121 if (inPendingMoveFailedArchive(container.getFile())) {
122 LOG.info("Ignoring previously processed resource: " + container);
123 culled.add(container);
124 }
125 }
126 collections.removeAll(culled);
127
128 if (collections.size() == 0) {
129 LOG.debug("No valid new resources found to ingest");
130 return;
131 }
132
133 Date LOAD_TIME = Calendar.getInstance().getTime();
134
135 File completeDir = new File(getXmlCompleteDir(), DIR_FORMAT.format(LOAD_TIME));
136 File failedDir = new File(getXmlProblemDir(), DIR_FORMAT.format(LOAD_TIME));
137
138
139 Collection failed = null;
140 try {
141 failed = KEWServiceLocator.getXmlIngesterService().ingest(collections);
142 } catch (Exception e) {
143 LOG.error("Error ingesting data", e);
144
145 }
146
147
148 LOG.info("Moving files...");
149 collectionsIt = collections.iterator();
150 while (collectionsIt.hasNext()) {
151 XmlDocCollection container = (XmlDocCollection) collectionsIt.next();
152 LOG.debug("container: " + container);
153 try {
154
155
156 container.close();
157 } catch (IOException ioe) {
158 LOG.warn("Error closing " + container, ioe);
159 }
160 if (failed.contains(container)) {
161
162
163 if (container.getFile() != null) {
164 LOG.error("Moving " + container.getFile() + " to problem dir.");
165 if ((!failedDir.isDirectory() && !failedDir.mkdirs())
166 || !moveFile(failedDir, container.getFile())) {
167 LOG.error("Could not move: " + container.getFile());
168 recordUnmovablePendingFile(container.getFile(), LOAD_TIME);
169 }
170 }
171 } else {
172 if (container.getFile() != null) {
173 LOG.info("Moving " + container.getFile() + " to loaded dir.");
174 if((!completeDir.isDirectory() && !completeDir.mkdirs())
175 || !moveFile(completeDir, container.getFile())){
176 LOG.error("Could not move: " + container.getFile());
177 recordUnmovablePendingFile(container.getFile(), LOAD_TIME);
178 }
179 }
180 }
181 }
182 }
183
184 private boolean inPendingMoveFailedArchive(File xmlDataFile){
185 if (xmlDataFile == null) return false;
186 BufferedReader inFile = null;
187 File movesFailedFile = new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE);
188 if (!movesFailedFile.isFile()) return false;
189 try {
190 inFile = new BufferedReader(new FileReader(movesFailedFile));
191 String line;
192
193 while((line = inFile.readLine()) != null){
194 String trimmedLine = line.trim();
195 if(trimmedLine.equals(xmlDataFile.getName()) ||
196 trimmedLine.startsWith(xmlDataFile.getName() + "=")) {
197 return true;
198 }
199 }
200 } catch (IOException e){
201 LOG.warn("Error reading file " + movesFailedFile);
202
203 } finally {
204 if (inFile != null) try {
205 inFile.close();
206 } catch (Exception e) {
207 LOG.warn("Error closing buffered reader for " + movesFailedFile);
208 }
209 }
210
211 return false;
212 }
213
214 private boolean recordUnmovablePendingFile(File unMovablePendingFile, Date dateLoaded){
215 boolean recorded = false;
216 FileWriter archiveFile = null;
217 try{
218 archiveFile = new FileWriter(new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE), true);
219 archiveFile.write(unMovablePendingFile.getName() + "=" + dateLoaded.getTime() + NEW_LINE);
220 recorded = true;
221 } catch (IOException e){
222 LOG.error("Unable to record unmovable pending file " + unMovablePendingFile.getName() + "in the archive file " + PENDING_MOVE_FAILED_ARCHIVE_FILE);
223 } finally {
224 if (archiveFile != null) {
225 try {
226 archiveFile.close();
227 } catch (IOException ioe) {
228 LOG.error("Error closing unmovable pending file", ioe);
229 }
230 }
231 }
232 return recorded;
233 }
234
235 private boolean moveFile(File toDirectory, File fileToMove){
236 boolean moved = true;
237 if (!fileToMove.renameTo(new File(toDirectory.getPath(), fileToMove.getName()))){
238 LOG.error("Unable to move file " + fileToMove.getName() + " to directory " + toDirectory.getPath());
239 moved = false;
240 }
241 return moved;
242 }
243
244 private File getXmlPendingDir() {
245 return new File(getXmlPendingLocation());
246 }
247
248 private File getXmlCompleteDir() {
249 return new File(getXmlCompletedLocation());
250 }
251
252 private File getXmlProblemDir() {
253 return new File(getXmlProblemLocation());
254 }
255
256 public String getXmlCompletedLocation() {
257 return xmlCompletedLocation;
258 }
259
260 public void setXmlCompletedLocation(String xmlCompletedLocation) {
261 this.xmlCompletedLocation = xmlCompletedLocation;
262 }
263
264 public String getXmlPendingLocation() {
265 return xmlPendingLocation;
266 }
267
268
269
270
271
272
273 public void setXmlPendingLocation(String xmlPendingLocation) {
274 this.xmlPendingLocation = xmlPendingLocation;
275 }
276
277 public String getXmlProblemLocation() {
278 return xmlProblemLocation;
279 }
280
281 public void setXmlProblemLocation(String xmlProblemLocation) {
282 this.xmlProblemLocation = xmlProblemLocation;
283 }
284 public String getXmlParentDirectory() {
285 return xmlParentDirectory;
286 }
287 public void setXmlParentDirectory(String xmlDataParentDirectory) {
288 this.xmlParentDirectory = xmlDataParentDirectory;
289 }
290
291
292
293
294
295 public void setPollIntervalSecs(int seconds) {
296 this.pollIntervalSecs = seconds;
297 }
298
299
300
301
302
303 public int getPollIntervalSecs() {
304 return this.pollIntervalSecs;
305 }
306
307
308
309
310
311 public void setInitialDelaySecs(int seconds) {
312 this.initialDelaySecs = seconds;
313 }
314
315
316
317
318
319 public int getInitialDelaySecs() {
320 return this.initialDelaySecs;
321 }
322 }