View Javadoc

1   /*
2    * Copyright 2005-2007 The Kuali Foundation
3    * 
4    * 
5    * Licensed under the Educational Community License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    * 
9    * http://www.opensource.org/licenses/ecl2.php
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.kuali.rice.kew.batch;
18  
19  import org.kuali.rice.kew.service.KEWServiceLocator;
20  
21  import java.io.*;
22  import java.text.Format;
23  import java.text.SimpleDateFormat;
24  import java.util.*;
25  
26  
27  /**
28   * Utility class responsible for polling and ingesting XML data files
29   * containing various forms of workflow engine data (e.g. document types
30   * and rules).
31   * Loaded files and problem files are placed into a subdirectory of a
32   * configured 'loaded' and 'problem' directory, respectively.
33   * "Problem-ness" is determined by inspecting a 'processed' flag on each <code>XmlDoc</code>
34   * in each collection.  If not all <code>XmlDoc</code>s are marked 'processed' an
35   * error is assumed, and the collection file (e.g. for a Zip, the Zip file) is moved
36   * to the 'problem' directory.
37   * As such, it is the <b><code>XmlIngesterService</code>'s responsibility</b> to mark
38   * any unknown or otherwise innocuous non-failure non-processed files, as 'processed'.
39   * A different mechanism can be developed if this proves to be a problem, but for now
40   * it is simple enough for the <code>XmlIngesterService</code> to determine this.
41   * @see org.kuali.rice.kew.batch.XmlPollerService
42   * @see org.kuali.rice.kew.batch.XmlIngesterServiceImpl
43   * @author Kuali Rice Team (rice.collab@kuali.org)
44   */
45  public class XmlPollerServiceImpl implements XmlPollerService {
46  
47      private static final org.apache.log4j.Logger LOG = org.apache.log4j.Logger
48              .getLogger(XmlPollerServiceImpl.class);
49      private static final Format DIR_FORMAT = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss-SSS");
50  
51      /**
52       * Specifies the polling interval that should be used with this task.
53       */
54      private int pollIntervalSecs = 5 * 60; // default to 5 minutes
55      /**
56       * Specifies the initial delay the poller should wait before starting to poll
57       */
58      private int initialDelaySecs = 30; // default to 30 seconds
59      /**
60       * Location in which to find XML files to load.
61       */
62      private String xmlPendingLocation;
63      /**
64       * Location in which to place successfully loaded XML files.
65       */
66      private String xmlCompletedLocation;
67      /**
68       * Location in which to place XML files which have failed to load.
69       */
70      private String xmlProblemLocation;
71      
72      private String xmlParentDirectory;
73      private static final String PENDING_MOVE_FAILED_ARCHIVE_FILE = "movesfailed";
74      private static final String NEW_LINE = "\n";
75  
76      public void run() {
77          // if(!directoriesWritable()){
78          //     LOG.error("Error writing to xml data directories. Stopping xmlLoader ...");
79          //     this.cancel();
80          // }
81          LOG.debug("checking for xml data files...");
82          File[] files = getXmlPendingDir().listFiles();
83          if (files == null || files.length == 0) {
84          	return;
85          }
86          LOG.info("Found " + files.length + " files to ingest.");
87          List<XmlDocCollection> collections = new ArrayList<XmlDocCollection>();
88          for (File file : files)
89          {
90              if (file.isDirectory())
91              {
92                  collections.add(new DirectoryXmlDocCollection(file));
93              } else if (file.getName().equals(PENDING_MOVE_FAILED_ARCHIVE_FILE))
94              {
95                  // the movesfailed file...ignore this
96                  continue;
97              } else if (file.getName().toLowerCase().endsWith(".zip"))
98              {
99                  try
100                 {
101                     collections.add(new ZipXmlDocCollection(file));
102                 } catch (IOException ioe)
103                 {
104                     LOG.error("Unable to load file: " + file);
105                 }
106             } else if (file.getName().endsWith(".xml"))
107             {
108                 collections.add(new FileXmlDocCollection(file));
109             } else
110             {
111                 LOG.warn("Ignoring extraneous file in xml pending directory: " + file);
112             }
113         }
114 
115         // Cull any resources which were already processed and whose moves failed
116         Iterator collectionsIt = collections.iterator();
117         Collection<XmlDocCollection> culled = new ArrayList<XmlDocCollection>();
118         while (collectionsIt.hasNext()) {
119             XmlDocCollection container = (XmlDocCollection) collectionsIt.next();
120             // if a move has already failed for this archive, ignore it
121             if (inPendingMoveFailedArchive(container.getFile())) {
122                 LOG.info("Ignoring previously processed resource: " + container);
123                 culled.add(container);
124             }
125         }
126         collections.removeAll(culled);
127 
128         if (collections.size() == 0) {
129             LOG.debug("No valid new resources found to ingest");
130             return;
131         }
132 
133         Date LOAD_TIME = Calendar.getInstance().getTime();
134         // synchronization around date format should not be an issue as this code is single-threaded
135         File completeDir = new File(getXmlCompleteDir(), DIR_FORMAT.format(LOAD_TIME));
136         File failedDir = new File(getXmlProblemDir(), DIR_FORMAT.format(LOAD_TIME));
137 
138         // now ingest the containers
139         Collection failed = null;
140         try {
141             failed = KEWServiceLocator.getXmlIngesterService().ingest(collections);
142         } catch (Exception e) {
143             LOG.error("Error ingesting data", e);
144             //throw new RuntimeException(e);
145         }
146     
147         // now iterate through all containers again, and move containers to approprate dir
148         LOG.info("Moving files...");
149         collectionsIt = collections.iterator();
150         while (collectionsIt.hasNext()) {
151             XmlDocCollection container = (XmlDocCollection) collectionsIt.next();
152             LOG.debug("container: " + container);
153             try {
154                 // "close" the container
155                 // this only matters for ZipFiles for now
156                 container.close();
157             } catch (IOException ioe) {
158                 LOG.warn("Error closing " + container, ioe);
159             }
160             if (failed.contains(container)) {
161                 // some docs must have failed, move the whole
162                 // container to the failed dir
163                 if (container.getFile() != null) {
164                     LOG.error("Moving " + container.getFile() + " to problem dir.");
165                     if ((!failedDir.isDirectory() && !failedDir.mkdirs())
166                         || !moveFile(failedDir, container.getFile())) {
167                         LOG.error("Could not move: " + container.getFile());
168                         recordUnmovablePendingFile(container.getFile(), LOAD_TIME);         
169                     }
170                 }
171             } else {
172                 if (container.getFile() != null) {
173                     LOG.info("Moving " + container.getFile() + " to loaded dir.");
174                     if((!completeDir.isDirectory() && !completeDir.mkdirs())
175                         || !moveFile(completeDir, container.getFile())){
176                         LOG.error("Could not move: " + container.getFile());
177                         recordUnmovablePendingFile(container.getFile(), LOAD_TIME);         
178                     }
179                 }
180             }
181         }
182     }
183 
184     private boolean inPendingMoveFailedArchive(File xmlDataFile){
185         if (xmlDataFile == null) return false;
186         BufferedReader inFile = null;
187         File movesFailedFile = new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE);
188         if (!movesFailedFile.isFile()) return false;
189         try {
190             inFile = new BufferedReader(new FileReader(movesFailedFile));
191             String line;
192             
193             while((line = inFile.readLine()) != null){
194                 String trimmedLine = line.trim();
195                 if(trimmedLine.equals(xmlDataFile.getName()) ||
196                    trimmedLine.startsWith(xmlDataFile.getName() + "=")) { 
197                     return true;
198                 }
199             }
200         } catch (IOException e){
201             LOG.warn("Error reading file " + movesFailedFile);
202             //TODO try reading the pending file or stop?
203         } finally {
204             if (inFile != null) try {
205                 inFile.close();
206             } catch (Exception e) {
207                 LOG.warn("Error closing buffered reader for " + movesFailedFile);
208             }
209         }
210       
211         return false;
212     }
213 
214     private boolean recordUnmovablePendingFile(File unMovablePendingFile, Date dateLoaded){
215         boolean recorded = false;
216         FileWriter archiveFile = null;
217         try{
218             archiveFile = new FileWriter(new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE), true);  
219             archiveFile.write(unMovablePendingFile.getName() + "=" + dateLoaded.getTime() + NEW_LINE);
220             recorded = true;
221         } catch (IOException e){
222             LOG.error("Unable to record unmovable pending file " + unMovablePendingFile.getName() + "in the archive file " + PENDING_MOVE_FAILED_ARCHIVE_FILE);
223         } finally {
224             if (archiveFile != null) {
225                 try {
226                     archiveFile.close();
227                 } catch (IOException ioe) {
228                     LOG.error("Error closing unmovable pending file", ioe);
229                 }
230             }
231         }
232         return recorded;       
233     }
234 
235     private boolean moveFile(File toDirectory, File fileToMove){
236         boolean moved = true;
237         if (!fileToMove.renameTo(new File(toDirectory.getPath(), fileToMove.getName()))){
238             LOG.error("Unable to move file " + fileToMove.getName() + " to directory " + toDirectory.getPath());
239             moved = false;
240         }
241         return moved;
242     }
243 
244     private File getXmlPendingDir() {
245         return new File(getXmlPendingLocation());
246     }
247 
248     private File getXmlCompleteDir() {
249         return new File(getXmlCompletedLocation());
250     }
251 
252     private File getXmlProblemDir() {
253         return new File(getXmlProblemLocation());
254     }
255 
256     public String getXmlCompletedLocation() {
257         return xmlCompletedLocation;
258     }
259 
260     public void setXmlCompletedLocation(String xmlCompletedLocation) {
261         this.xmlCompletedLocation = xmlCompletedLocation;
262     }
263 
264     public String getXmlPendingLocation() {
265         return xmlPendingLocation;
266     }
267 
268     /*public boolean validate(File uploadedFile) {
269         XmlDataLoaderFileFilter filter = new XmlDataLoaderFileFilter();
270         return filter.accept(uploadedFile);
271     }*/
272 
273     public void setXmlPendingLocation(String xmlPendingLocation) {
274         this.xmlPendingLocation = xmlPendingLocation;
275     }
276 
277     public String getXmlProblemLocation() {
278         return xmlProblemLocation;
279     }
280 
281     public void setXmlProblemLocation(String xmlProblemLocation) {
282         this.xmlProblemLocation = xmlProblemLocation;
283     }
284     public String getXmlParentDirectory() {
285         return xmlParentDirectory;
286     }
287     public void setXmlParentDirectory(String xmlDataParentDirectory) {
288         this.xmlParentDirectory = xmlDataParentDirectory;
289     }
290 
291     /**
292      * Sets the polling interval time in seconds
293      * @param seconds the polling interval time in seconds
294      */
295     public void setPollIntervalSecs(int seconds) {
296         this.pollIntervalSecs = seconds;
297     }
298 
299     /**
300      * Gets the polling interval time in seconds
301      * @return the polling interval time in seconds
302      */
303     public int getPollIntervalSecs() {
304         return this.pollIntervalSecs;
305     }
306 
307     /**
308      * Sets the initial delay time in seconds
309      * @param seconds the initial delay time in seconds
310      */
311     public void setInitialDelaySecs(int seconds) {
312         this.initialDelaySecs = seconds;
313     }
314 
315     /**
316      * Gets the initial delay time in seconds
317      * @return the initial delay time in seconds
318      */
319     public int getInitialDelaySecs() {
320         return this.initialDelaySecs;
321     }
322 }