1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  package org.kuali.rice.kew.batch;
17  
18  import org.kuali.rice.core.api.CoreApiServiceLocator;
19  import org.kuali.rice.core.api.impex.xml.DirectoryXmlDocCollection;
20  import org.kuali.rice.core.api.impex.xml.FileXmlDocCollection;
21  import org.kuali.rice.core.api.impex.xml.XmlDocCollection;
22  import org.kuali.rice.core.api.impex.xml.ZipXmlDocCollection;
23  
24  import java.io.BufferedReader;
25  import java.io.File;
26  import java.io.FileReader;
27  import java.io.FileWriter;
28  import java.io.IOException;
29  import java.text.Format;
30  import java.text.SimpleDateFormat;
31  import java.util.ArrayList;
32  import java.util.Calendar;
33  import java.util.Collection;
34  import java.util.Date;
35  import java.util.Iterator;
36  import java.util.List;
37  
38  
39  
40  
41  
42  
43  
44  
45  
46  
47  
48  
49  
50  
51  
52  
53  
54  
55  
56  
57  public class XmlPollerServiceImpl implements XmlPollerService {
58  
59      private static final org.apache.log4j.Logger LOG = org.apache.log4j.Logger
60              .getLogger(XmlPollerServiceImpl.class);
61      private static final Format DIR_FORMAT = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss-SSS");
62  
63      
64  
65  
66      private int pollIntervalSecs = 5 * 60; 
67      
68  
69  
70      private int initialDelaySecs = 30; 
71      
72  
73  
74      private String xmlPendingLocation;
75      
76  
77  
78      private String xmlCompletedLocation;
79      
80  
81  
82      private String xmlProblemLocation;
83      
84      private String xmlParentDirectory;
85      private static final String PENDING_MOVE_FAILED_ARCHIVE_FILE = "movesfailed";
86      private static final String NEW_LINE = "\n";
87  
88      public void run() {
89          
90          
91          
92          
93          LOG.debug("checking for xml data files...");
94          File[] files = getXmlPendingDir().listFiles();
95          if (files == null || files.length == 0) {
96          	return;
97          }
98          LOG.info("Found " + files.length + " files to ingest.");
99          List<XmlDocCollection> collections = new ArrayList<XmlDocCollection>();
100         for (File file : files)
101         {
102             if (file.isDirectory())
103             {
104                 collections.add(new DirectoryXmlDocCollection(file));
105             } else if (file.getName().equals(PENDING_MOVE_FAILED_ARCHIVE_FILE))
106             {
107                 
108                 continue;
109             } else if (file.getName().toLowerCase().endsWith(".zip"))
110             {
111                 try
112                 {
113                     collections.add(new ZipXmlDocCollection(file));
114                 } catch (IOException ioe)
115                 {
116                     LOG.error("Unable to load file: " + file);
117                 }
118             } else if (file.getName().endsWith(".xml"))
119             {
120                 collections.add(new FileXmlDocCollection(file));
121             } else
122             {
123                 LOG.warn("Ignoring extraneous file in xml pending directory: " + file);
124             }
125         }
126 
127         
128         Iterator collectionsIt = collections.iterator();
129         Collection<XmlDocCollection> culled = new ArrayList<XmlDocCollection>();
130         while (collectionsIt.hasNext()) {
131             XmlDocCollection container = (XmlDocCollection) collectionsIt.next();
132             
133             if (inPendingMoveFailedArchive(container.getFile())) {
134                 LOG.info("Ignoring previously processed resource: " + container);
135                 culled.add(container);
136             }
137         }
138         collections.removeAll(culled);
139 
140         if (collections.size() == 0) {
141             LOG.debug("No valid new resources found to ingest");
142             return;
143         }
144 
145         Date LOAD_TIME = Calendar.getInstance().getTime();
146         
147         File completeDir = new File(getXmlCompleteDir(), DIR_FORMAT.format(LOAD_TIME));
148         File failedDir = new File(getXmlProblemDir(), DIR_FORMAT.format(LOAD_TIME));
149 
150         
151         Collection failed = null;
152         try {
153             failed = CoreApiServiceLocator.getXmlIngesterService().ingest(collections);
154         } catch (Exception e) {
155             LOG.error("Error ingesting data", e);
156             
157         }
158     
159         
160         LOG.info("Moving files...");
161         collectionsIt = collections.iterator();
162         while (collectionsIt.hasNext()) {
163             XmlDocCollection container = (XmlDocCollection) collectionsIt.next();
164             LOG.debug("container: " + container);
165             try {
166                 
167                 
168                 container.close();
169             } catch (IOException ioe) {
170                 LOG.warn("Error closing " + container, ioe);
171             }
172             if (failed.contains(container)) {
173                 
174                 
175                 if (container.getFile() != null) {
176                     LOG.error("Moving " + container.getFile() + " to problem dir.");
177                     if ((!failedDir.isDirectory() && !failedDir.mkdirs())
178                         || !moveFile(failedDir, container.getFile())) {
179                         LOG.error("Could not move: " + container.getFile());
180                         recordUnmovablePendingFile(container.getFile(), LOAD_TIME);         
181                     }
182                 }
183             } else {
184                 if (container.getFile() != null) {
185                     LOG.info("Moving " + container.getFile() + " to loaded dir.");
186                     if((!completeDir.isDirectory() && !completeDir.mkdirs())
187                         || !moveFile(completeDir, container.getFile())){
188                         LOG.error("Could not move: " + container.getFile());
189                         recordUnmovablePendingFile(container.getFile(), LOAD_TIME);         
190                     }
191                 }
192             }
193         }
194     }
195 
196     private boolean inPendingMoveFailedArchive(File xmlDataFile){
197         if (xmlDataFile == null) return false;
198         BufferedReader inFile = null;
199         File movesFailedFile = new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE);
200         if (!movesFailedFile.isFile()) return false;
201         try {
202             inFile = new BufferedReader(new FileReader(movesFailedFile));
203             String line;
204             
205             while((line = inFile.readLine()) != null){
206                 String trimmedLine = line.trim();
207                 if(trimmedLine.equals(xmlDataFile.getName()) ||
208                    trimmedLine.startsWith(xmlDataFile.getName() + "=")) { 
209                     return true;
210                 }
211             }
212         } catch (IOException e){
213             LOG.warn("Error reading file " + movesFailedFile);
214             
215         } finally {
216             if (inFile != null) try {
217                 inFile.close();
218             } catch (Exception e) {
219                 LOG.warn("Error closing buffered reader for " + movesFailedFile);
220             }
221         }
222       
223         return false;
224     }
225 
226     private boolean recordUnmovablePendingFile(File unMovablePendingFile, Date dateLoaded){
227         boolean recorded = false;
228         FileWriter archiveFile = null;
229         try{
230             archiveFile = new FileWriter(new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE), true);  
231             archiveFile.write(unMovablePendingFile.getName() + "=" + dateLoaded.getTime() + NEW_LINE);
232             recorded = true;
233         } catch (IOException e){
234             LOG.error("Unable to record unmovable pending file " + unMovablePendingFile.getName() + "in the archive file " + PENDING_MOVE_FAILED_ARCHIVE_FILE);
235         } finally {
236             if (archiveFile != null) {
237                 try {
238                     archiveFile.close();
239                 } catch (IOException ioe) {
240                     LOG.error("Error closing unmovable pending file", ioe);
241                 }
242             }
243         }
244         return recorded;       
245     }
246 
247     private boolean moveFile(File toDirectory, File fileToMove){
248         boolean moved = true;
249         if (!fileToMove.renameTo(new File(toDirectory.getPath(), fileToMove.getName()))){
250             LOG.error("Unable to move file " + fileToMove.getName() + " to directory " + toDirectory.getPath());
251             moved = false;
252         }
253         return moved;
254     }
255 
256     private File getXmlPendingDir() {
257         return new File(getXmlPendingLocation());
258     }
259 
260     private File getXmlCompleteDir() {
261         return new File(getXmlCompletedLocation());
262     }
263 
264     private File getXmlProblemDir() {
265         return new File(getXmlProblemLocation());
266     }
267 
268     public String getXmlCompletedLocation() {
269         return xmlCompletedLocation;
270     }
271 
272     public void setXmlCompletedLocation(String xmlCompletedLocation) {
273         this.xmlCompletedLocation = xmlCompletedLocation;
274     }
275 
276     public String getXmlPendingLocation() {
277         return xmlPendingLocation;
278     }
279 
280     
281 
282 
283 
284 
285     public void setXmlPendingLocation(String xmlPendingLocation) {
286         this.xmlPendingLocation = xmlPendingLocation;
287     }
288 
289     public String getXmlProblemLocation() {
290         return xmlProblemLocation;
291     }
292 
293     public void setXmlProblemLocation(String xmlProblemLocation) {
294         this.xmlProblemLocation = xmlProblemLocation;
295     }
296     public String getXmlParentDirectory() {
297         return xmlParentDirectory;
298     }
299     public void setXmlParentDirectory(String xmlDataParentDirectory) {
300         this.xmlParentDirectory = xmlDataParentDirectory;
301     }
302 
303     
304 
305 
306 
307     public void setPollIntervalSecs(int seconds) {
308         this.pollIntervalSecs = seconds;
309     }
310 
311     
312 
313 
314 
315     public int getPollIntervalSecs() {
316         return this.pollIntervalSecs;
317     }
318 
319     
320 
321 
322 
323     public void setInitialDelaySecs(int seconds) {
324         this.initialDelaySecs = seconds;
325     }
326 
327     
328 
329 
330 
331     public int getInitialDelaySecs() {
332         return this.initialDelaySecs;
333     }
334 }