1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.ole;
17
18 import org.kuali.rice.core.api.CoreApiServiceLocator;
19 import org.kuali.rice.core.api.impex.xml.DirectoryXmlDocCollection;
20 import org.kuali.rice.core.api.impex.xml.FileXmlDocCollection;
21 import org.kuali.rice.core.api.impex.xml.XmlDocCollection;
22 import org.kuali.rice.core.api.impex.xml.ZipXmlDocCollection;
23 import org.kuali.rice.kew.batch.XmlPollerService;
24
25 import java.io.BufferedReader;
26 import java.io.File;
27 import java.io.FileReader;
28 import java.io.FileWriter;
29 import java.io.IOException;
30 import java.text.Format;
31 import java.text.SimpleDateFormat;
32 import java.util.ArrayList;
33 import java.util.Arrays;
34 import java.util.Calendar;
35 import java.util.Collection;
36 import java.util.Date;
37 import java.util.Iterator;
38 import java.util.List;
39
40 public class OleXmlPollerServiceImpl implements XmlPollerService {
41
42 private static final org.apache.log4j.Logger LOG = org.apache.log4j.Logger.getLogger(OleXmlPollerServiceImpl.class);
43 private static final Format DIR_FORMAT = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss-SSS");
44
45
46
47
48 private int pollIntervalSecs = 5 * 60;
49
50
51
52 private int initialDelaySecs = 30;
53
54
55
56 private String xmlPendingLocation;
57
58
59
60 private String xmlCompletedLocation;
61
62
63
64 private String xmlProblemLocation;
65
66 private String xmlParentDirectory;
67 private static final String PENDING_MOVE_FAILED_ARCHIVE_FILE = "movesfailed";
68 private static final String NEW_LINE = "\n";
69
70 public void run() {
71
72
73
74
75 LOG.debug("checking for xml data files...");
76 File[] files = getXmlPendingDir().listFiles();
77 Arrays.sort(files);
78 if (files == null || files.length == 0) {
79 return;
80 }
81 LOG.info("Found " + files.length + " files to ingest.");
82 for (int i = 0; i < files.length; i++) {
83 LOG.info(files[i].getAbsolutePath());
84 }
85 List<XmlDocCollection> collections = new ArrayList<XmlDocCollection>();
86 for (File file : files) {
87 if (file.isDirectory()) {
88 collections.add(new DirectoryXmlDocCollection(file));
89 } else if (file.getName().equals(PENDING_MOVE_FAILED_ARCHIVE_FILE)) {
90
91 continue;
92 } else if (file.getName().toLowerCase().endsWith(".zip")) {
93 try {
94 collections.add(new ZipXmlDocCollection(file));
95 } catch (IOException ioe) {
96 LOG.error("Unable to load file: " + file);
97 }
98 } else if (file.getName().endsWith(".xml")) {
99 collections.add(new FileXmlDocCollection(file));
100 } else {
101 LOG.warn("Ignoring extraneous file in xml pending directory: " + file);
102 }
103 }
104
105
106 Iterator collectionsIt = collections.iterator();
107 Collection<XmlDocCollection> culled = new ArrayList<XmlDocCollection>();
108 while (collectionsIt.hasNext()) {
109 XmlDocCollection container = (XmlDocCollection) collectionsIt.next();
110
111 if (inPendingMoveFailedArchive(container.getFile())) {
112 LOG.info("Ignoring previously processed resource: " + container);
113 culled.add(container);
114 }
115 }
116 collections.removeAll(culled);
117
118 if (collections.size() == 0) {
119 LOG.debug("No valid new resources found to ingest");
120 return;
121 }
122
123 Date LOAD_TIME = Calendar.getInstance().getTime();
124
125 File completeDir = new File(getXmlCompleteDir(), DIR_FORMAT.format(LOAD_TIME));
126 File failedDir = new File(getXmlProblemDir(), DIR_FORMAT.format(LOAD_TIME));
127
128
129 Collection failed = null;
130 try {
131 failed = CoreApiServiceLocator.getXmlIngesterService().ingest(collections);
132 } catch (Exception e) {
133 LOG.error("Error ingesting data", e);
134
135 }
136
137
138 LOG.info("Moving files...");
139 collectionsIt = collections.iterator();
140 while (collectionsIt.hasNext()) {
141 XmlDocCollection container = (XmlDocCollection) collectionsIt.next();
142 LOG.debug("container: " + container);
143 try {
144
145
146 container.close();
147 } catch (IOException ioe) {
148 LOG.warn("Error closing " + container, ioe);
149 }
150 if (failed.contains(container)) {
151
152
153 if (container.getFile() != null) {
154 LOG.error("Moving " + container.getFile() + " to problem dir.");
155 if ((!failedDir.isDirectory() && !failedDir.mkdirs()) || !moveFile(failedDir, container.getFile())) {
156 LOG.error("Could not move: " + container.getFile());
157 recordUnmovablePendingFile(container.getFile(), LOAD_TIME);
158 }
159 }
160 } else {
161 if (container.getFile() != null) {
162 LOG.info("Moving " + container.getFile() + " to loaded dir.");
163 if ((!completeDir.isDirectory() && !completeDir.mkdirs()) || !moveFile(completeDir, container.getFile())) {
164 LOG.error("Could not move: " + container.getFile());
165 recordUnmovablePendingFile(container.getFile(), LOAD_TIME);
166 }
167 }
168 }
169 }
170 }
171
172 private boolean inPendingMoveFailedArchive(File xmlDataFile) {
173 if (xmlDataFile == null)
174 return false;
175 BufferedReader inFile = null;
176 File movesFailedFile = new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE);
177 if (!movesFailedFile.isFile())
178 return false;
179 try {
180 inFile = new BufferedReader(new FileReader(movesFailedFile));
181 String line;
182
183 while ((line = inFile.readLine()) != null) {
184 String trimmedLine = line.trim();
185 if (trimmedLine.equals(xmlDataFile.getName()) || trimmedLine.startsWith(xmlDataFile.getName() + "=")) {
186 return true;
187 }
188 }
189 } catch (IOException e) {
190 LOG.warn("Error reading file " + movesFailedFile);
191
192 } finally {
193 if (inFile != null)
194 try {
195 inFile.close();
196 } catch (Exception e) {
197 LOG.warn("Error closing buffered reader for " + movesFailedFile);
198 }
199 }
200
201 return false;
202 }
203
204 private boolean recordUnmovablePendingFile(File unMovablePendingFile, Date dateLoaded) {
205 boolean recorded = false;
206 FileWriter archiveFile = null;
207 try {
208 archiveFile = new FileWriter(new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE), true);
209 archiveFile.write(unMovablePendingFile.getName() + "=" + dateLoaded.getTime() + NEW_LINE);
210 recorded = true;
211 } catch (IOException e) {
212 LOG.error("Unable to record unmovable pending file " + unMovablePendingFile.getName() + "in the archive file " + PENDING_MOVE_FAILED_ARCHIVE_FILE);
213 } finally {
214 if (archiveFile != null) {
215 try {
216 archiveFile.close();
217 } catch (IOException ioe) {
218 LOG.error("Error closing unmovable pending file", ioe);
219 }
220 }
221 }
222 return recorded;
223 }
224
225 private boolean moveFile(File toDirectory, File fileToMove) {
226 boolean moved = true;
227 if (!fileToMove.renameTo(new File(toDirectory.getPath(), fileToMove.getName()))) {
228 LOG.error("Unable to move file " + fileToMove.getName() + " to directory " + toDirectory.getPath());
229 moved = false;
230 }
231 return moved;
232 }
233
234 private File getXmlPendingDir() {
235 return new File(getXmlPendingLocation());
236 }
237
238 private File getXmlCompleteDir() {
239 return new File(getXmlCompletedLocation());
240 }
241
242 private File getXmlProblemDir() {
243 return new File(getXmlProblemLocation());
244 }
245
246 public String getXmlCompletedLocation() {
247 return xmlCompletedLocation;
248 }
249
250 public void setXmlCompletedLocation(String xmlCompletedLocation) {
251 this.xmlCompletedLocation = xmlCompletedLocation;
252 }
253
254 public String getXmlPendingLocation() {
255 return xmlPendingLocation;
256 }
257
258
259
260
261
262
263 public void setXmlPendingLocation(String xmlPendingLocation) {
264 this.xmlPendingLocation = xmlPendingLocation;
265 }
266
267 public String getXmlProblemLocation() {
268 return xmlProblemLocation;
269 }
270
271 public void setXmlProblemLocation(String xmlProblemLocation) {
272 this.xmlProblemLocation = xmlProblemLocation;
273 }
274
275 public String getXmlParentDirectory() {
276 return xmlParentDirectory;
277 }
278
279 public void setXmlParentDirectory(String xmlDataParentDirectory) {
280 this.xmlParentDirectory = xmlDataParentDirectory;
281 }
282
283
284
285
286
287
288
289 public void setPollIntervalSecs(int seconds) {
290 this.pollIntervalSecs = seconds;
291 }
292
293
294
295
296
297
298 public int getPollIntervalSecs() {
299 return this.pollIntervalSecs;
300 }
301
302
303
304
305
306
307
308 public void setInitialDelaySecs(int seconds) {
309 this.initialDelaySecs = seconds;
310 }
311
312
313
314
315
316
317 public int getInitialDelaySecs() {
318 return this.initialDelaySecs;
319 }
320 }