View Javadoc

1   /*
2    * Copyright 2005-2007 The Kuali Foundation
3    * 
4    * 
5    * Licensed under the Educational Community License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    * 
9    * http://www.opensource.org/licenses/ecl2.php
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.kuali.rice.kew.batch;
18  
19  import java.io.BufferedReader;
20  import java.io.File;
21  import java.io.FileReader;
22  import java.io.FileWriter;
23  import java.io.IOException;
24  import java.text.Format;
25  import java.text.SimpleDateFormat;
26  import java.util.ArrayList;
27  import java.util.Arrays;
28  import java.util.Calendar;
29  import java.util.Collection;
30  import java.util.Date;
31  import java.util.Iterator;
32  import java.util.List;
33  
34  import org.apache.commons.io.IOUtils;
35  import org.kuali.rice.kew.service.KEWServiceLocator;
36  
37  /**
38   * Utility class responsible for polling and ingesting XML data files containing various forms of workflow engine data
39   * (e.g. document types and rules). Loaded files and problem files are placed into a subdirectory of a configured
40   * 'loaded' and 'problem' directory, respectively. "Problem-ness" is determined by inspecting a 'processed' flag on each
41   * <code>XmlDoc</code> in each collection. If not all <code>XmlDoc</code>s are marked 'processed' an error is assumed,
42   * and the collection file (e.g. for a Zip, the Zip file) is moved to the 'problem' directory. As such, it is the <b>
43   * <code>XmlIngesterService</code>'s responsibility</b> to mark any unknown or otherwise innocuous non-failure
44   * non-processed files, as 'processed'. A different mechanism can be developed if this proves to be a problem, but for
45   * now it is simple enough for the <code>XmlIngesterService</code> to determine this.
46   * 
47   * @see org.kuali.rice.kew.batch.XmlPollerService
48   * @see org.kuali.rice.kew.batch.XmlIngesterServiceImpl
49   * @author Kuali Rice Team (rice.collab@kuali.org)
50   */
51  public class MyXMLPollerServiceImpl implements XmlPollerService {
52  
53      private static final org.apache.log4j.Logger LOG = org.apache.log4j.Logger.getLogger(XmlPollerServiceImpl.class);
54      private static final Format DIR_FORMAT = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss-SSS");
55  
56      /**
57       * Specifies the polling interval that should be used with this task.
58       */
59      private int pollIntervalSecs = 5 * 60; // default to 5 minutes
60      /**
61       * Specifies the initial delay the poller should wait before starting to poll
62       */
63      private int initialDelaySecs = 30; // default to 30 seconds
64      /**
65       * Location in which to find XML files to load.
66       */
67      private String xmlPendingLocation;
68      /**
69       * Location in which to place successfully loaded XML files.
70       */
71      private String xmlCompletedLocation;
72      /**
73       * Location in which to place XML files which have failed to load.
74       */
75      private String xmlProblemLocation;
76  
77      private String xmlParentDirectory;
78      private static final String PENDING_MOVE_FAILED_ARCHIVE_FILE = "movesfailed";
79      private static final String NEW_LINE = "\n";
80  
81      @Override
82      public void run() {
83          // if(!directoriesWritable()){
84          // LOG.error("Error writing to xml data directories. Stopping xmlLoader ...");
85          // this.cancel();
86          // }
87          LOG.debug("checking for xml data files...");
88          File[] files = getXmlPendingDir().listFiles();
89          if (files == null || files.length == 0) {
90              return;
91          }
92          // Sort them so the ordering is deterministic by file name instead of random
93          Arrays.sort(files);
94  
95          LOG.info("Found " + files.length + " files to ingest.");
96  
97          List<XmlDocCollection> collections = new ArrayList<XmlDocCollection>();
98          for (File file : files) {
99              if (file.isDirectory()) {
100                 collections.add(new DirectoryXmlDocCollection(file));
101             } else if (file.getName().equals(PENDING_MOVE_FAILED_ARCHIVE_FILE)) {
102                 // the movesfailed file...ignore this
103                 continue;
104             } else if (file.getName().toLowerCase().endsWith(".zip")) {
105                 try {
106                     collections.add(new ZipXmlDocCollection(file));
107                 } catch (IOException ioe) {
108                     LOG.error("Unable to load file: " + file);
109                 }
110             } else if (file.getName().endsWith(".xml")) {
111                 collections.add(new FileXmlDocCollection(file));
112             } else {
113                 LOG.warn("Ignoring extraneous file in xml pending directory: " + file);
114             }
115         }
116 
117         // Cull any resources which were already processed and whose moves failed
118         Iterator<?> collectionsIt = collections.iterator();
119         Collection<XmlDocCollection> culled = new ArrayList<XmlDocCollection>();
120         while (collectionsIt.hasNext()) {
121             XmlDocCollection container = (XmlDocCollection) collectionsIt.next();
122             // if a move has already failed for this archive, ignore it
123             if (inPendingMoveFailedArchive(container.getFile())) {
124                 LOG.info("Ignoring previously processed resource: " + container);
125                 culled.add(container);
126             }
127         }
128         collections.removeAll(culled);
129 
130         if (collections.size() == 0) {
131             LOG.debug("No valid new resources found to ingest");
132             return;
133         }
134 
135         Date LOAD_TIME = Calendar.getInstance().getTime();
136         // synchronization around date format should not be an issue as this code is single-threaded
137         File completeDir = new File(getXmlCompleteDir(), DIR_FORMAT.format(LOAD_TIME));
138         File failedDir = new File(getXmlProblemDir(), DIR_FORMAT.format(LOAD_TIME));
139 
140         // now ingest the containers
141         Collection<?> failed = null;
142         try {
143             failed = KEWServiceLocator.getXmlIngesterService().ingest(collections);
144         } catch (Exception e) {
145             LOG.error("Error ingesting data", e);
146             // throw new RuntimeException(e);
147         }
148 
149         // now iterate through all containers again, and move containers to approprate dir
150         LOG.info("Moving files...");
151         collectionsIt = collections.iterator();
152         while (collectionsIt.hasNext()) {
153             XmlDocCollection container = (XmlDocCollection) collectionsIt.next();
154             LOG.debug("container: " + container);
155             try {
156                 // "close" the container
157                 // this only matters for ZipFiles for now
158                 container.close();
159             } catch (IOException ioe) {
160                 LOG.warn("Error closing " + container, ioe);
161             }
162             if (failed.contains(container)) {
163                 // some docs must have failed, move the whole
164                 // container to the failed dir
165                 if (container.getFile() != null) {
166                     LOG.error("Moving " + container.getFile() + " to problem dir.");
167                     if ((!failedDir.isDirectory() && !failedDir.mkdirs()) || !moveFile(failedDir, container.getFile())) {
168                         LOG.error("Could not move: " + container.getFile());
169                         recordUnmovablePendingFile(container.getFile(), LOAD_TIME);
170                     }
171                 }
172             } else {
173                 if (container.getFile() != null) {
174                     LOG.info("Moving " + container.getFile() + " to loaded dir.");
175                     if ((!completeDir.isDirectory() && !completeDir.mkdirs())
176                             || !moveFile(completeDir, container.getFile())) {
177                         LOG.error("Could not move: " + container.getFile());
178                         recordUnmovablePendingFile(container.getFile(), LOAD_TIME);
179                     }
180                 }
181             }
182         }
183     }
184 
185     private boolean inPendingMoveFailedArchive(File xmlDataFile) {
186         if (xmlDataFile == null) {
187             return false;
188         }
189         File movesFailedFile = new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE);
190         if (!movesFailedFile.isFile()) {
191             return false;
192         }
193         BufferedReader inFile = null;
194         try {
195             inFile = new BufferedReader(new FileReader(movesFailedFile));
196             String line;
197             boolean found = false;
198             while ((line = inFile.readLine()) != null) {
199                 String trimmedLine = line.trim();
200                 if (trimmedLine.equals(xmlDataFile.getName()) || trimmedLine.startsWith(xmlDataFile.getName() + "=")) {
201                     found = true;
202                     break;
203                 }
204             }
205             return found;
206         } catch (IOException e) {
207             LOG.warn("Error reading file " + movesFailedFile);
208         } finally {
209             IOUtils.closeQuietly(inFile);
210         }
211         return false;
212     }
213 
214     private boolean recordUnmovablePendingFile(File unMovablePendingFile, Date dateLoaded) {
215         boolean recorded = false;
216         FileWriter archiveFile = null;
217         try {
218             archiveFile = new FileWriter(new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE), true);
219             archiveFile.write(unMovablePendingFile.getName() + "=" + dateLoaded.getTime() + NEW_LINE);
220             recorded = true;
221         } catch (IOException e) {
222             LOG.error("Unable to record unmovable pending file " + unMovablePendingFile.getName()
223                     + "in the archive file " + PENDING_MOVE_FAILED_ARCHIVE_FILE);
224         } finally {
225             if (archiveFile != null) {
226                 try {
227                     archiveFile.close();
228                 } catch (IOException ioe) {
229                     LOG.error("Error closing unmovable pending file", ioe);
230                 }
231             }
232         }
233         return recorded;
234     }
235 
236     private boolean moveFile(File toDirectory, File fileToMove) {
237         boolean moved = true;
238         if (!fileToMove.renameTo(new File(toDirectory.getPath(), fileToMove.getName()))) {
239             LOG.error("Unable to move file " + fileToMove.getName() + " to directory " + toDirectory.getPath());
240             moved = false;
241         }
242         return moved;
243     }
244 
245     private File getXmlPendingDir() {
246         return new File(getXmlPendingLocation());
247     }
248 
249     private File getXmlCompleteDir() {
250         return new File(getXmlCompletedLocation());
251     }
252 
253     private File getXmlProblemDir() {
254         return new File(getXmlProblemLocation());
255     }
256 
257     public String getXmlCompletedLocation() {
258         return xmlCompletedLocation;
259     }
260 
261     public void setXmlCompletedLocation(String xmlCompletedLocation) {
262         this.xmlCompletedLocation = xmlCompletedLocation;
263     }
264 
265     public String getXmlPendingLocation() {
266         return xmlPendingLocation;
267     }
268 
269     /*
270      * public boolean validate(File uploadedFile) { XmlDataLoaderFileFilter filter = new XmlDataLoaderFileFilter();
271      * return filter.accept(uploadedFile); }
272      */
273 
274     public void setXmlPendingLocation(String xmlPendingLocation) {
275         this.xmlPendingLocation = xmlPendingLocation;
276     }
277 
278     public String getXmlProblemLocation() {
279         return xmlProblemLocation;
280     }
281 
282     public void setXmlProblemLocation(String xmlProblemLocation) {
283         this.xmlProblemLocation = xmlProblemLocation;
284     }
285 
286     public String getXmlParentDirectory() {
287         return xmlParentDirectory;
288     }
289 
290     public void setXmlParentDirectory(String xmlDataParentDirectory) {
291         this.xmlParentDirectory = xmlDataParentDirectory;
292     }
293 
294     /**
295      * Sets the polling interval time in seconds
296      * 
297      * @param seconds
298      *            the polling interval time in seconds
299      */
300     public void setPollIntervalSecs(int seconds) {
301         this.pollIntervalSecs = seconds;
302     }
303 
304     /**
305      * Gets the polling interval time in seconds
306      * 
307      * @return the polling interval time in seconds
308      */
309     @Override
310     public int getPollIntervalSecs() {
311         return this.pollIntervalSecs;
312     }
313 
314     /**
315      * Sets the initial delay time in seconds
316      * 
317      * @param seconds
318      *            the initial delay time in seconds
319      */
320     public void setInitialDelaySecs(int seconds) {
321         this.initialDelaySecs = seconds;
322     }
323 
324     /**
325      * Gets the initial delay time in seconds
326      * 
327      * @return the initial delay time in seconds
328      */
329     @Override
330     public int getInitialDelaySecs() {
331         return this.initialDelaySecs;
332     }
333 }