1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.kuali.rice.kew.batch;
18
19 import java.io.BufferedReader;
20 import java.io.File;
21 import java.io.FileReader;
22 import java.io.FileWriter;
23 import java.io.IOException;
24 import java.text.Format;
25 import java.text.SimpleDateFormat;
26 import java.util.ArrayList;
27 import java.util.Calendar;
28 import java.util.Collection;
29 import java.util.Date;
30 import java.util.Iterator;
31 import java.util.List;
32
33 import org.kuali.rice.core.api.CoreApiServiceLocator;
34 import org.kuali.rice.core.api.impex.xml.DirectoryXmlDocCollection;
35 import org.kuali.rice.core.api.impex.xml.FileXmlDocCollection;
36 import org.kuali.rice.core.api.impex.xml.XmlDocCollection;
37 import org.kuali.rice.core.api.impex.xml.XmlIngesterService;
38 import org.kuali.rice.core.api.impex.xml.ZipXmlDocCollection;
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59 public class XmlPollerServiceImpl implements XmlPollerService {
60
61 private static final org.apache.log4j.Logger LOG = org.apache.log4j.Logger
62 .getLogger(XmlPollerServiceImpl.class);
63 private static final Format DIR_FORMAT = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss-SSS");
64
65
66
67
68 private int pollIntervalSecs = 5 * 60;
69
70
71
72 private int initialDelaySecs = 30;
73
74
75
76 private String xmlPendingLocation;
77
78
79
80 private String xmlCompletedLocation;
81
82
83
84 private String xmlProblemLocation;
85
86 private String xmlParentDirectory;
87 private static final String PENDING_MOVE_FAILED_ARCHIVE_FILE = "movesfailed";
88 private static final String NEW_LINE = "\n";
89
90 public void run() {
91
92
93
94
95 LOG.debug("checking for xml data files...");
96 File[] files = getXmlPendingDir().listFiles();
97 if (files == null || files.length == 0) {
98 return;
99 }
100 LOG.info("Found " + files.length + " files to ingest.");
101 List<XmlDocCollection> collections = new ArrayList<XmlDocCollection>();
102 for (File file : files)
103 {
104 if (file.isDirectory())
105 {
106 collections.add(new DirectoryXmlDocCollection(file));
107 } else if (file.getName().equals(PENDING_MOVE_FAILED_ARCHIVE_FILE))
108 {
109
110 continue;
111 } else if (file.getName().toLowerCase().endsWith(".zip"))
112 {
113 try
114 {
115 collections.add(new ZipXmlDocCollection(file));
116 } catch (IOException ioe)
117 {
118 LOG.error("Unable to load file: " + file);
119 }
120 } else if (file.getName().endsWith(".xml"))
121 {
122 collections.add(new FileXmlDocCollection(file));
123 } else
124 {
125 LOG.warn("Ignoring extraneous file in xml pending directory: " + file);
126 }
127 }
128
129
130 Iterator collectionsIt = collections.iterator();
131 Collection<XmlDocCollection> culled = new ArrayList<XmlDocCollection>();
132 while (collectionsIt.hasNext()) {
133 XmlDocCollection container = (XmlDocCollection) collectionsIt.next();
134
135 if (inPendingMoveFailedArchive(container.getFile())) {
136 LOG.info("Ignoring previously processed resource: " + container);
137 culled.add(container);
138 }
139 }
140 collections.removeAll(culled);
141
142 if (collections.size() == 0) {
143 LOG.debug("No valid new resources found to ingest");
144 return;
145 }
146
147 Date LOAD_TIME = Calendar.getInstance().getTime();
148
149 File completeDir = new File(getXmlCompleteDir(), DIR_FORMAT.format(LOAD_TIME));
150 File failedDir = new File(getXmlProblemDir(), DIR_FORMAT.format(LOAD_TIME));
151
152
153 Collection failed = null;
154 try {
155 failed = CoreApiServiceLocator.getXmlIngesterService().ingest(collections);
156 } catch (Exception e) {
157 LOG.error("Error ingesting data", e);
158
159 }
160
161
162 LOG.info("Moving files...");
163 collectionsIt = collections.iterator();
164 while (collectionsIt.hasNext()) {
165 XmlDocCollection container = (XmlDocCollection) collectionsIt.next();
166 LOG.debug("container: " + container);
167 try {
168
169
170 container.close();
171 } catch (IOException ioe) {
172 LOG.warn("Error closing " + container, ioe);
173 }
174 if (failed.contains(container)) {
175
176
177 if (container.getFile() != null) {
178 LOG.error("Moving " + container.getFile() + " to problem dir.");
179 if ((!failedDir.isDirectory() && !failedDir.mkdirs())
180 || !moveFile(failedDir, container.getFile())) {
181 LOG.error("Could not move: " + container.getFile());
182 recordUnmovablePendingFile(container.getFile(), LOAD_TIME);
183 }
184 }
185 } else {
186 if (container.getFile() != null) {
187 LOG.info("Moving " + container.getFile() + " to loaded dir.");
188 if((!completeDir.isDirectory() && !completeDir.mkdirs())
189 || !moveFile(completeDir, container.getFile())){
190 LOG.error("Could not move: " + container.getFile());
191 recordUnmovablePendingFile(container.getFile(), LOAD_TIME);
192 }
193 }
194 }
195 }
196 }
197
198 private boolean inPendingMoveFailedArchive(File xmlDataFile){
199 if (xmlDataFile == null) return false;
200 BufferedReader inFile = null;
201 File movesFailedFile = new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE);
202 if (!movesFailedFile.isFile()) return false;
203 try {
204 inFile = new BufferedReader(new FileReader(movesFailedFile));
205 String line;
206
207 while((line = inFile.readLine()) != null){
208 String trimmedLine = line.trim();
209 if(trimmedLine.equals(xmlDataFile.getName()) ||
210 trimmedLine.startsWith(xmlDataFile.getName() + "=")) {
211 return true;
212 }
213 }
214 } catch (IOException e){
215 LOG.warn("Error reading file " + movesFailedFile);
216
217 } finally {
218 if (inFile != null) try {
219 inFile.close();
220 } catch (Exception e) {
221 LOG.warn("Error closing buffered reader for " + movesFailedFile);
222 }
223 }
224
225 return false;
226 }
227
228 private boolean recordUnmovablePendingFile(File unMovablePendingFile, Date dateLoaded){
229 boolean recorded = false;
230 FileWriter archiveFile = null;
231 try{
232 archiveFile = new FileWriter(new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE), true);
233 archiveFile.write(unMovablePendingFile.getName() + "=" + dateLoaded.getTime() + NEW_LINE);
234 recorded = true;
235 } catch (IOException e){
236 LOG.error("Unable to record unmovable pending file " + unMovablePendingFile.getName() + "in the archive file " + PENDING_MOVE_FAILED_ARCHIVE_FILE);
237 } finally {
238 if (archiveFile != null) {
239 try {
240 archiveFile.close();
241 } catch (IOException ioe) {
242 LOG.error("Error closing unmovable pending file", ioe);
243 }
244 }
245 }
246 return recorded;
247 }
248
249 private boolean moveFile(File toDirectory, File fileToMove){
250 boolean moved = true;
251 if (!fileToMove.renameTo(new File(toDirectory.getPath(), fileToMove.getName()))){
252 LOG.error("Unable to move file " + fileToMove.getName() + " to directory " + toDirectory.getPath());
253 moved = false;
254 }
255 return moved;
256 }
257
258 private File getXmlPendingDir() {
259 return new File(getXmlPendingLocation());
260 }
261
262 private File getXmlCompleteDir() {
263 return new File(getXmlCompletedLocation());
264 }
265
266 private File getXmlProblemDir() {
267 return new File(getXmlProblemLocation());
268 }
269
270 public String getXmlCompletedLocation() {
271 return xmlCompletedLocation;
272 }
273
274 public void setXmlCompletedLocation(String xmlCompletedLocation) {
275 this.xmlCompletedLocation = xmlCompletedLocation;
276 }
277
278 public String getXmlPendingLocation() {
279 return xmlPendingLocation;
280 }
281
282
283
284
285
286
287 public void setXmlPendingLocation(String xmlPendingLocation) {
288 this.xmlPendingLocation = xmlPendingLocation;
289 }
290
291 public String getXmlProblemLocation() {
292 return xmlProblemLocation;
293 }
294
295 public void setXmlProblemLocation(String xmlProblemLocation) {
296 this.xmlProblemLocation = xmlProblemLocation;
297 }
298 public String getXmlParentDirectory() {
299 return xmlParentDirectory;
300 }
301 public void setXmlParentDirectory(String xmlDataParentDirectory) {
302 this.xmlParentDirectory = xmlDataParentDirectory;
303 }
304
305
306
307
308
309 public void setPollIntervalSecs(int seconds) {
310 this.pollIntervalSecs = seconds;
311 }
312
313
314
315
316
317 public int getPollIntervalSecs() {
318 return this.pollIntervalSecs;
319 }
320
321
322
323
324
325 public void setInitialDelaySecs(int seconds) {
326 this.initialDelaySecs = seconds;
327 }
328
329
330
331
332
333 public int getInitialDelaySecs() {
334 return this.initialDelaySecs;
335 }
336 }