1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.ole;
17
18 import org.kuali.rice.core.api.CoreApiServiceLocator;
19 import org.kuali.rice.core.api.impex.xml.*;
20 import org.kuali.rice.kew.batch.XmlPollerService;
21
22 import java.io.BufferedReader;
23 import java.io.File;
24 import java.io.FileReader;
25 import java.io.FileWriter;
26 import java.io.IOException;
27 import java.text.Format;
28 import java.text.SimpleDateFormat;
29 import java.util.ArrayList;
30 import java.util.Arrays;
31 import java.util.Calendar;
32 import java.util.Collection;
33 import java.util.Date;
34 import java.util.Iterator;
35 import java.util.List;
36
37 public abstract class OleXmlPollerServiceImpl implements OleXmlPollerService {
38
39
40
41 private static final org.apache.log4j.Logger LOG = org.apache.log4j.Logger.getLogger(OleXmlPollerServiceImpl.class);
42 private static final Format DIR_FORMAT = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss-SSS");
43
44
45
46
47 private int pollIntervalSecs = 5 * 60;
48
49
50
51 private int initialDelaySecs = 30;
52
53
54
55 private String xmlPendingLocation;
56
57
58
59 private String xmlCompletedLocation;
60
61
62
63 private String xmlProblemLocation;
64
65
66
67 private String xmlReportLocation;
68
69 private String xmlParentDirectory;
70 private static final String PENDING_MOVE_FAILED_ARCHIVE_FILE = "movesfailed";
71 private static final String NEW_LINE = "\n";
72
73 public void run() {
74
75
76
77
78 File[] files = getXmlPendingDir().listFiles();
79 Arrays.sort(files);
80 if (files == null || files.length == 0) {
81 return;
82 }
83 LOG.info("Found " + files.length + " files to ingest.");
84 for (int i = 0; i < files.length; i++) {
85 LOG.info(files[i].getAbsolutePath());
86 }
87 List<XmlDocCollection> collections = new ArrayList<XmlDocCollection>();
88 for (File file : files) {
89 if (file.isDirectory()) {
90 collections.add(new DirectoryXmlDocCollection(file));
91 } else if (file.getName().equals(PENDING_MOVE_FAILED_ARCHIVE_FILE)) {
92
93 continue;
94 } else if (file.getName().toLowerCase().endsWith(".zip")) {
95 try {
96 collections.add(new ZipXmlDocCollection(file));
97 } catch (IOException ioe) {
98 LOG.error("Unable to load file: " + file);
99 }
100 } else if (file.getName().endsWith(".xml")) {
101 collections.add(new FileXmlDocCollection(file));
102 } else {
103 LOG.warn("Ignoring extraneous file in xml pending directory: " + file);
104 }
105 }
106
107
108 Iterator collectionsIt = collections.iterator();
109 Collection<XmlDocCollection> culled = new ArrayList<XmlDocCollection>();
110 while (collectionsIt.hasNext()) {
111 XmlDocCollection container = (XmlDocCollection) collectionsIt.next();
112
113 if (inPendingMoveFailedArchive(container.getFile())) {
114 LOG.info("Ignoring previously processed resource: " + container);
115 culled.add(container);
116 }
117 }
118 collections.removeAll(culled);
119
120 if (collections.size() == 0) {
121 LOG.debug("No valid new resources found to ingest");
122 return;
123 }
124
125 Date LOAD_TIME = Calendar.getInstance().getTime();
126
127 File completeDir = new File(getXmlCompleteDir(), DIR_FORMAT.format(LOAD_TIME));
128 File failedDir = new File(getXmlProblemDir(), DIR_FORMAT.format(LOAD_TIME));
129
130
131 Collection failed = null;
132 try {
133 failed = getIngesterService().ingest(collections);
134 } catch (Exception e) {
135 LOG.error("Error ingesting data", e);
136
137 }
138
139
140 LOG.info("Moving files...");
141 collectionsIt = collections.iterator();
142 while (collectionsIt.hasNext()) {
143 XmlDocCollection container = (XmlDocCollection) collectionsIt.next();
144 LOG.debug("container: " + container);
145 try {
146
147
148 container.close();
149 } catch (IOException ioe) {
150 LOG.warn("Error closing " + container, ioe);
151 }
152 if (failed.contains(container)) {
153
154
155 if (container.getFile() != null) {
156 LOG.error("Moving " + container.getFile() + " to problem dir.");
157 if ((!failedDir.isDirectory() && !failedDir.mkdirs()) || !moveFile(failedDir, container.getFile())) {
158 LOG.error("Could not move: " + container.getFile());
159 recordUnmovablePendingFile(container.getFile(), LOAD_TIME);
160 }
161 }
162 } else {
163 if (container.getFile() != null) {
164 LOG.info("Moving " + container.getFile() + " to loaded dir.");
165 if ((!completeDir.isDirectory() && !completeDir.mkdirs()) || !moveFile(completeDir, container.getFile())) {
166 LOG.error("Could not move: " + container.getFile());
167 recordUnmovablePendingFile(container.getFile(), LOAD_TIME);
168 }
169 }
170 }
171 }
172 processReportContent(collections, DIR_FORMAT.format(LOAD_TIME));
173 }
174
175 protected void processReportContent(List<XmlDocCollection> collections, String loadTime) {
176
177 }
178
179 protected abstract XmlIngesterService getIngesterService();
180
181 private boolean inPendingMoveFailedArchive(File xmlDataFile) {
182 if (xmlDataFile == null)
183 return false;
184 BufferedReader inFile = null;
185 File movesFailedFile = new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE);
186 if (!movesFailedFile.isFile())
187 return false;
188 try {
189 inFile = new BufferedReader(new FileReader(movesFailedFile));
190 String line;
191
192 while ((line = inFile.readLine()) != null) {
193 String trimmedLine = line.trim();
194 if (trimmedLine.equals(xmlDataFile.getName()) || trimmedLine.startsWith(xmlDataFile.getName() + "=")) {
195 return true;
196 }
197 }
198 } catch (IOException e) {
199 LOG.warn("Error reading file " + movesFailedFile);
200
201 } finally {
202 if (inFile != null)
203 try {
204 inFile.close();
205 } catch (Exception e) {
206 LOG.warn("Error closing buffered reader for " + movesFailedFile);
207 }
208 }
209
210 return false;
211 }
212
213 private boolean recordUnmovablePendingFile(File unMovablePendingFile, Date dateLoaded) {
214 boolean recorded = false;
215 FileWriter archiveFile = null;
216 try {
217 archiveFile = new FileWriter(new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE), true);
218 archiveFile.write(unMovablePendingFile.getName() + "=" + dateLoaded.getTime() + NEW_LINE);
219 recorded = true;
220 } catch (IOException e) {
221 LOG.error("Unable to record unmovable pending file " + unMovablePendingFile.getName() + "in the archive file " + PENDING_MOVE_FAILED_ARCHIVE_FILE);
222 } finally {
223 if (archiveFile != null) {
224 try {
225 archiveFile.close();
226 } catch (IOException ioe) {
227 LOG.error("Error closing unmovable pending file", ioe);
228 }
229 }
230 }
231 return recorded;
232 }
233
234 private boolean moveFile(File toDirectory, File fileToMove) {
235 boolean moved = true;
236 if (!fileToMove.renameTo(new File(toDirectory.getPath(), fileToMove.getName()))) {
237 LOG.error("Unable to move file " + fileToMove.getName() + " to directory " + toDirectory.getPath());
238 moved = false;
239 }
240 return moved;
241 }
242
243 private File getXmlPendingDir() {
244 return new File(getXmlPendingLocation());
245 }
246
247 private File getXmlCompleteDir() {
248 return new File(getXmlCompletedLocation());
249 }
250
251 private File getXmlProblemDir() {
252 return new File(getXmlProblemLocation());
253 }
254
255 public String getXmlCompletedLocation() {
256 return xmlCompletedLocation;
257 }
258
259 public void setXmlCompletedLocation(String xmlCompletedLocation) {
260 this.xmlCompletedLocation = xmlCompletedLocation;
261 }
262
263 public String getXmlPendingLocation() {
264 return xmlPendingLocation;
265 }
266
267
268
269
270
271
272 public void setXmlPendingLocation(String xmlPendingLocation) {
273 this.xmlPendingLocation = xmlPendingLocation;
274 }
275
276 public String getXmlProblemLocation() {
277 return xmlProblemLocation;
278 }
279
280 public void setXmlProblemLocation(String xmlProblemLocation) {
281 this.xmlProblemLocation = xmlProblemLocation;
282 }
283
284 public String getXmlReportLocation() {
285 return xmlReportLocation;
286 }
287
288 public void setXmlReportLocation(String xmlReportLocation) {
289 this.xmlReportLocation = xmlReportLocation;
290 }
291
292 public String getXmlParentDirectory() {
293 return xmlParentDirectory;
294 }
295
296 public void setXmlParentDirectory(String xmlDataParentDirectory) {
297 this.xmlParentDirectory = xmlDataParentDirectory;
298 }
299
300
301
302
303
304
305
306 public void setPollIntervalSecs(int seconds) {
307 this.pollIntervalSecs = seconds;
308 }
309
310
311
312
313
314
315 public int getPollIntervalSecs() {
316 return this.pollIntervalSecs;
317 }
318
319
320
321
322
323
324
325 public void setInitialDelaySecs(int seconds) {
326 this.initialDelaySecs = seconds;
327 }
328
329
330
331
332
333
334 public int getInitialDelaySecs() {
335 return this.initialDelaySecs;
336 }
337 }