1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  package org.kuali.rice.kew.batch;
18  
19  import java.io.BufferedReader;
20  import java.io.File;
21  import java.io.FileReader;
22  import java.io.FileWriter;
23  import java.io.IOException;
24  import java.text.Format;
25  import java.text.SimpleDateFormat;
26  import java.util.ArrayList;
27  import java.util.Arrays;
28  import java.util.Calendar;
29  import java.util.Collection;
30  import java.util.Date;
31  import java.util.Iterator;
32  import java.util.List;
33  
34  import org.apache.commons.io.IOUtils;
35  import org.kuali.rice.kew.service.KEWServiceLocator;
36  
37  
38  
39  
40  
41  
42  
43  
44  
45  
46  
47  
48  
49  
50  
51  public class MyXMLPollerServiceImpl implements XmlPollerService {
52  
53      private static final org.apache.log4j.Logger LOG = org.apache.log4j.Logger.getLogger(XmlPollerServiceImpl.class);
54      private static final Format DIR_FORMAT = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss-SSS");
55  
56      
57  
58  
59      private int pollIntervalSecs = 5 * 60; 
60      
61  
62  
63      private int initialDelaySecs = 30; 
64      
65  
66  
67      private String xmlPendingLocation;
68      
69  
70  
71      private String xmlCompletedLocation;
72      
73  
74  
75      private String xmlProblemLocation;
76  
77      private String xmlParentDirectory;
78      private static final String PENDING_MOVE_FAILED_ARCHIVE_FILE = "movesfailed";
79      private static final String NEW_LINE = "\n";
80  
81      @Override
82      public void run() {
83          
84          
85          
86          
87          LOG.debug("checking for xml data files...");
88          File[] files = getXmlPendingDir().listFiles();
89          if (files == null || files.length == 0) {
90              return;
91          }
92          
93          Arrays.sort(files);
94  
95          LOG.info("Found " + files.length + " files to ingest.");
96  
97          List<XmlDocCollection> collections = new ArrayList<XmlDocCollection>();
98          for (File file : files) {
99              if (file.isDirectory()) {
100                 collections.add(new DirectoryXmlDocCollection(file));
101             } else if (file.getName().equals(PENDING_MOVE_FAILED_ARCHIVE_FILE)) {
102                 
103                 continue;
104             } else if (file.getName().toLowerCase().endsWith(".zip")) {
105                 try {
106                     collections.add(new ZipXmlDocCollection(file));
107                 } catch (IOException ioe) {
108                     LOG.error("Unable to load file: " + file);
109                 }
110             } else if (file.getName().endsWith(".xml")) {
111                 collections.add(new FileXmlDocCollection(file));
112             } else {
113                 LOG.warn("Ignoring extraneous file in xml pending directory: " + file);
114             }
115         }
116 
117         
118         Iterator<?> collectionsIt = collections.iterator();
119         Collection<XmlDocCollection> culled = new ArrayList<XmlDocCollection>();
120         while (collectionsIt.hasNext()) {
121             XmlDocCollection container = (XmlDocCollection) collectionsIt.next();
122             
123             if (inPendingMoveFailedArchive(container.getFile())) {
124                 LOG.info("Ignoring previously processed resource: " + container);
125                 culled.add(container);
126             }
127         }
128         collections.removeAll(culled);
129 
130         if (collections.size() == 0) {
131             LOG.debug("No valid new resources found to ingest");
132             return;
133         }
134 
135         Date LOAD_TIME = Calendar.getInstance().getTime();
136         
137         File completeDir = new File(getXmlCompleteDir(), DIR_FORMAT.format(LOAD_TIME));
138         File failedDir = new File(getXmlProblemDir(), DIR_FORMAT.format(LOAD_TIME));
139 
140         
141         Collection<?> failed = null;
142         try {
143             failed = KEWServiceLocator.getXmlIngesterService().ingest(collections);
144         } catch (Exception e) {
145             LOG.error("Error ingesting data", e);
146             
147         }
148 
149         
150         LOG.info("Moving files...");
151         collectionsIt = collections.iterator();
152         while (collectionsIt.hasNext()) {
153             XmlDocCollection container = (XmlDocCollection) collectionsIt.next();
154             LOG.debug("container: " + container);
155             try {
156                 
157                 
158                 container.close();
159             } catch (IOException ioe) {
160                 LOG.warn("Error closing " + container, ioe);
161             }
162             if (failed.contains(container)) {
163                 
164                 
165                 if (container.getFile() != null) {
166                     LOG.error("Moving " + container.getFile() + " to problem dir.");
167                     if ((!failedDir.isDirectory() && !failedDir.mkdirs()) || !moveFile(failedDir, container.getFile())) {
168                         LOG.error("Could not move: " + container.getFile());
169                         recordUnmovablePendingFile(container.getFile(), LOAD_TIME);
170                     }
171                 }
172             } else {
173                 if (container.getFile() != null) {
174                     LOG.info("Moving " + container.getFile() + " to loaded dir.");
175                     if ((!completeDir.isDirectory() && !completeDir.mkdirs())
176                             || !moveFile(completeDir, container.getFile())) {
177                         LOG.error("Could not move: " + container.getFile());
178                         recordUnmovablePendingFile(container.getFile(), LOAD_TIME);
179                     }
180                 }
181             }
182         }
183     }
184 
185     private boolean inPendingMoveFailedArchive(File xmlDataFile) {
186         if (xmlDataFile == null) {
187             return false;
188         }
189         File movesFailedFile = new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE);
190         if (!movesFailedFile.isFile()) {
191             return false;
192         }
193         BufferedReader inFile = null;
194         try {
195             inFile = new BufferedReader(new FileReader(movesFailedFile));
196             String line;
197             boolean found = false;
198             while ((line = inFile.readLine()) != null) {
199                 String trimmedLine = line.trim();
200                 if (trimmedLine.equals(xmlDataFile.getName()) || trimmedLine.startsWith(xmlDataFile.getName() + "=")) {
201                     found = true;
202                     break;
203                 }
204             }
205             return found;
206         } catch (IOException e) {
207             LOG.warn("Error reading file " + movesFailedFile);
208         } finally {
209             IOUtils.closeQuietly(inFile);
210         }
211         return false;
212     }
213 
214     private boolean recordUnmovablePendingFile(File unMovablePendingFile, Date dateLoaded) {
215         boolean recorded = false;
216         FileWriter archiveFile = null;
217         try {
218             archiveFile = new FileWriter(new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE), true);
219             archiveFile.write(unMovablePendingFile.getName() + "=" + dateLoaded.getTime() + NEW_LINE);
220             recorded = true;
221         } catch (IOException e) {
222             LOG.error("Unable to record unmovable pending file " + unMovablePendingFile.getName()
223                     + "in the archive file " + PENDING_MOVE_FAILED_ARCHIVE_FILE);
224         } finally {
225             if (archiveFile != null) {
226                 try {
227                     archiveFile.close();
228                 } catch (IOException ioe) {
229                     LOG.error("Error closing unmovable pending file", ioe);
230                 }
231             }
232         }
233         return recorded;
234     }
235 
236     private boolean moveFile(File toDirectory, File fileToMove) {
237         boolean moved = true;
238         if (!fileToMove.renameTo(new File(toDirectory.getPath(), fileToMove.getName()))) {
239             LOG.error("Unable to move file " + fileToMove.getName() + " to directory " + toDirectory.getPath());
240             moved = false;
241         }
242         return moved;
243     }
244 
245     private File getXmlPendingDir() {
246         return new File(getXmlPendingLocation());
247     }
248 
249     private File getXmlCompleteDir() {
250         return new File(getXmlCompletedLocation());
251     }
252 
253     private File getXmlProblemDir() {
254         return new File(getXmlProblemLocation());
255     }
256 
257     public String getXmlCompletedLocation() {
258         return xmlCompletedLocation;
259     }
260 
261     public void setXmlCompletedLocation(String xmlCompletedLocation) {
262         this.xmlCompletedLocation = xmlCompletedLocation;
263     }
264 
265     public String getXmlPendingLocation() {
266         return xmlPendingLocation;
267     }
268 
269     
270 
271 
272 
273 
274     public void setXmlPendingLocation(String xmlPendingLocation) {
275         this.xmlPendingLocation = xmlPendingLocation;
276     }
277 
278     public String getXmlProblemLocation() {
279         return xmlProblemLocation;
280     }
281 
282     public void setXmlProblemLocation(String xmlProblemLocation) {
283         this.xmlProblemLocation = xmlProblemLocation;
284     }
285 
286     public String getXmlParentDirectory() {
287         return xmlParentDirectory;
288     }
289 
290     public void setXmlParentDirectory(String xmlDataParentDirectory) {
291         this.xmlParentDirectory = xmlDataParentDirectory;
292     }
293 
294     
295 
296 
297 
298 
299 
300     public void setPollIntervalSecs(int seconds) {
301         this.pollIntervalSecs = seconds;
302     }
303 
304     
305 
306 
307 
308 
309     @Override
310     public int getPollIntervalSecs() {
311         return this.pollIntervalSecs;
312     }
313 
314     
315 
316 
317 
318 
319 
320     public void setInitialDelaySecs(int seconds) {
321         this.initialDelaySecs = seconds;
322     }
323 
324     
325 
326 
327 
328 
329     @Override
330     public int getInitialDelaySecs() {
331         return this.initialDelaySecs;
332     }
333 }