View Javadoc

1   /*
2    * Copyright 2005-2007 The Kuali Foundation
3    * 
4    * 
5    * Licensed under the Educational Community License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    * 
9    * http://www.opensource.org/licenses/ecl2.php
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.kuali.rice.kew.batch;
18  
19  import java.io.BufferedReader;
20  import java.io.File;
21  import java.io.FileReader;
22  import java.io.FileWriter;
23  import java.io.IOException;
24  import java.text.Format;
25  import java.text.SimpleDateFormat;
26  import java.util.ArrayList;
27  import java.util.Calendar;
28  import java.util.Collection;
29  import java.util.Date;
30  import java.util.Iterator;
31  import java.util.List;
32  
33  import org.kuali.rice.core.api.CoreApiServiceLocator;
34  import org.kuali.rice.core.api.impex.xml.DirectoryXmlDocCollection;
35  import org.kuali.rice.core.api.impex.xml.FileXmlDocCollection;
36  import org.kuali.rice.core.api.impex.xml.XmlDocCollection;
37  import org.kuali.rice.core.api.impex.xml.XmlIngesterService;
38  import org.kuali.rice.core.api.impex.xml.ZipXmlDocCollection;
39  
40  
41  /**
42   * Utility class responsible for polling and ingesting XML data files
43   * containing various forms of workflow engine data (e.g. document types
44   * and rules).
45   * Loaded files and problem files are placed into a subdirectory of a
46   * configured 'loaded' and 'problem' directory, respectively.
47   * "Problem-ness" is determined by inspecting a 'processed' flag on each <code>XmlDoc</code>
48   * in each collection.  If not all <code>XmlDoc</code>s are marked 'processed' an
49   * error is assumed, and the collection file (e.g. for a Zip, the Zip file) is moved
50   * to the 'problem' directory.
51   * As such, it is the <b><code>XmlIngesterService</code>'s responsibility</b> to mark
52   * any unknown or otherwise innocuous non-failure non-processed files, as 'processed'.
53   * A different mechanism can be developed if this proves to be a problem, but for now
54   * it is simple enough for the <code>XmlIngesterService</code> to determine this.
55   * @see org.kuali.rice.kew.batch.XmlPollerService
56   * @see org.kuali.rice.kew.batch.XmlIngesterServiceImpl
57   * @author Kuali Rice Team (rice.collab@kuali.org)
58   */
59  public class XmlPollerServiceImpl implements XmlPollerService {
60  
61      private static final org.apache.log4j.Logger LOG = org.apache.log4j.Logger
62              .getLogger(XmlPollerServiceImpl.class);
63      private static final Format DIR_FORMAT = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss-SSS");
64  
65      /**
66       * Specifies the polling interval that should be used with this task.
67       */
68      private int pollIntervalSecs = 5 * 60; // default to 5 minutes
69      /**
70       * Specifies the initial delay the poller should wait before starting to poll
71       */
72      private int initialDelaySecs = 30; // default to 30 seconds
73      /**
74       * Location in which to find XML files to load.
75       */
76      private String xmlPendingLocation;
77      /**
78       * Location in which to place successfully loaded XML files.
79       */
80      private String xmlCompletedLocation;
81      /**
82       * Location in which to place XML files which have failed to load.
83       */
84      private String xmlProblemLocation;
85      
86      private String xmlParentDirectory;
87      private static final String PENDING_MOVE_FAILED_ARCHIVE_FILE = "movesfailed";
88      private static final String NEW_LINE = "\n";
89  
90      public void run() {
91          // if(!directoriesWritable()){
92          //     LOG.error("Error writing to xml data directories. Stopping xmlLoader ...");
93          //     this.cancel();
94          // }
95          LOG.debug("checking for xml data files...");
96          File[] files = getXmlPendingDir().listFiles();
97          if (files == null || files.length == 0) {
98          	return;
99          }
100         LOG.info("Found " + files.length + " files to ingest.");
101         List<XmlDocCollection> collections = new ArrayList<XmlDocCollection>();
102         for (File file : files)
103         {
104             if (file.isDirectory())
105             {
106                 collections.add(new DirectoryXmlDocCollection(file));
107             } else if (file.getName().equals(PENDING_MOVE_FAILED_ARCHIVE_FILE))
108             {
109                 // the movesfailed file...ignore this
110                 continue;
111             } else if (file.getName().toLowerCase().endsWith(".zip"))
112             {
113                 try
114                 {
115                     collections.add(new ZipXmlDocCollection(file));
116                 } catch (IOException ioe)
117                 {
118                     LOG.error("Unable to load file: " + file);
119                 }
120             } else if (file.getName().endsWith(".xml"))
121             {
122                 collections.add(new FileXmlDocCollection(file));
123             } else
124             {
125                 LOG.warn("Ignoring extraneous file in xml pending directory: " + file);
126             }
127         }
128 
129         // Cull any resources which were already processed and whose moves failed
130         Iterator collectionsIt = collections.iterator();
131         Collection<XmlDocCollection> culled = new ArrayList<XmlDocCollection>();
132         while (collectionsIt.hasNext()) {
133             XmlDocCollection container = (XmlDocCollection) collectionsIt.next();
134             // if a move has already failed for this archive, ignore it
135             if (inPendingMoveFailedArchive(container.getFile())) {
136                 LOG.info("Ignoring previously processed resource: " + container);
137                 culled.add(container);
138             }
139         }
140         collections.removeAll(culled);
141 
142         if (collections.size() == 0) {
143             LOG.debug("No valid new resources found to ingest");
144             return;
145         }
146 
147         Date LOAD_TIME = Calendar.getInstance().getTime();
148         // synchronization around date format should not be an issue as this code is single-threaded
149         File completeDir = new File(getXmlCompleteDir(), DIR_FORMAT.format(LOAD_TIME));
150         File failedDir = new File(getXmlProblemDir(), DIR_FORMAT.format(LOAD_TIME));
151 
152         // now ingest the containers
153         Collection failed = null;
154         try {
155             failed = CoreApiServiceLocator.getXmlIngesterService().ingest(collections);
156         } catch (Exception e) {
157             LOG.error("Error ingesting data", e);
158             //throw new RuntimeException(e);
159         }
160     
161         // now iterate through all containers again, and move containers to approprate dir
162         LOG.info("Moving files...");
163         collectionsIt = collections.iterator();
164         while (collectionsIt.hasNext()) {
165             XmlDocCollection container = (XmlDocCollection) collectionsIt.next();
166             LOG.debug("container: " + container);
167             try {
168                 // "close" the container
169                 // this only matters for ZipFiles for now
170                 container.close();
171             } catch (IOException ioe) {
172                 LOG.warn("Error closing " + container, ioe);
173             }
174             if (failed.contains(container)) {
175                 // some docs must have failed, move the whole
176                 // container to the failed dir
177                 if (container.getFile() != null) {
178                     LOG.error("Moving " + container.getFile() + " to problem dir.");
179                     if ((!failedDir.isDirectory() && !failedDir.mkdirs())
180                         || !moveFile(failedDir, container.getFile())) {
181                         LOG.error("Could not move: " + container.getFile());
182                         recordUnmovablePendingFile(container.getFile(), LOAD_TIME);         
183                     }
184                 }
185             } else {
186                 if (container.getFile() != null) {
187                     LOG.info("Moving " + container.getFile() + " to loaded dir.");
188                     if((!completeDir.isDirectory() && !completeDir.mkdirs())
189                         || !moveFile(completeDir, container.getFile())){
190                         LOG.error("Could not move: " + container.getFile());
191                         recordUnmovablePendingFile(container.getFile(), LOAD_TIME);         
192                     }
193                 }
194             }
195         }
196     }
197 
198     private boolean inPendingMoveFailedArchive(File xmlDataFile){
199         if (xmlDataFile == null) return false;
200         BufferedReader inFile = null;
201         File movesFailedFile = new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE);
202         if (!movesFailedFile.isFile()) return false;
203         try {
204             inFile = new BufferedReader(new FileReader(movesFailedFile));
205             String line;
206             
207             while((line = inFile.readLine()) != null){
208                 String trimmedLine = line.trim();
209                 if(trimmedLine.equals(xmlDataFile.getName()) ||
210                    trimmedLine.startsWith(xmlDataFile.getName() + "=")) { 
211                     return true;
212                 }
213             }
214         } catch (IOException e){
215             LOG.warn("Error reading file " + movesFailedFile);
216             //TODO try reading the pending file or stop?
217         } finally {
218             if (inFile != null) try {
219                 inFile.close();
220             } catch (Exception e) {
221                 LOG.warn("Error closing buffered reader for " + movesFailedFile);
222             }
223         }
224       
225         return false;
226     }
227 
228     private boolean recordUnmovablePendingFile(File unMovablePendingFile, Date dateLoaded){
229         boolean recorded = false;
230         FileWriter archiveFile = null;
231         try{
232             archiveFile = new FileWriter(new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE), true);  
233             archiveFile.write(unMovablePendingFile.getName() + "=" + dateLoaded.getTime() + NEW_LINE);
234             recorded = true;
235         } catch (IOException e){
236             LOG.error("Unable to record unmovable pending file " + unMovablePendingFile.getName() + "in the archive file " + PENDING_MOVE_FAILED_ARCHIVE_FILE);
237         } finally {
238             if (archiveFile != null) {
239                 try {
240                     archiveFile.close();
241                 } catch (IOException ioe) {
242                     LOG.error("Error closing unmovable pending file", ioe);
243                 }
244             }
245         }
246         return recorded;       
247     }
248 
249     private boolean moveFile(File toDirectory, File fileToMove){
250         boolean moved = true;
251         if (!fileToMove.renameTo(new File(toDirectory.getPath(), fileToMove.getName()))){
252             LOG.error("Unable to move file " + fileToMove.getName() + " to directory " + toDirectory.getPath());
253             moved = false;
254         }
255         return moved;
256     }
257 
258     private File getXmlPendingDir() {
259         return new File(getXmlPendingLocation());
260     }
261 
262     private File getXmlCompleteDir() {
263         return new File(getXmlCompletedLocation());
264     }
265 
266     private File getXmlProblemDir() {
267         return new File(getXmlProblemLocation());
268     }
269 
270     public String getXmlCompletedLocation() {
271         return xmlCompletedLocation;
272     }
273 
274     public void setXmlCompletedLocation(String xmlCompletedLocation) {
275         this.xmlCompletedLocation = xmlCompletedLocation;
276     }
277 
278     public String getXmlPendingLocation() {
279         return xmlPendingLocation;
280     }
281 
282     /*public boolean validate(File uploadedFile) {
283         XmlDataLoaderFileFilter filter = new XmlDataLoaderFileFilter();
284         return filter.accept(uploadedFile);
285     }*/
286 
287     public void setXmlPendingLocation(String xmlPendingLocation) {
288         this.xmlPendingLocation = xmlPendingLocation;
289     }
290 
291     public String getXmlProblemLocation() {
292         return xmlProblemLocation;
293     }
294 
295     public void setXmlProblemLocation(String xmlProblemLocation) {
296         this.xmlProblemLocation = xmlProblemLocation;
297     }
298     public String getXmlParentDirectory() {
299         return xmlParentDirectory;
300     }
301     public void setXmlParentDirectory(String xmlDataParentDirectory) {
302         this.xmlParentDirectory = xmlDataParentDirectory;
303     }
304 
305     /**
306      * Sets the polling interval time in seconds
307      * @param seconds the polling interval time in seconds
308      */
309     public void setPollIntervalSecs(int seconds) {
310         this.pollIntervalSecs = seconds;
311     }
312 
313     /**
314      * Gets the polling interval time in seconds
315      * @return the polling interval time in seconds
316      */
317     public int getPollIntervalSecs() {
318         return this.pollIntervalSecs;
319     }
320 
321     /**
322      * Sets the initial delay time in seconds
323      * @param seconds the initial delay time in seconds
324      */
325     public void setInitialDelaySecs(int seconds) {
326         this.initialDelaySecs = seconds;
327     }
328 
329     /**
330      * Gets the initial delay time in seconds
331      * @return the initial delay time in seconds
332      */
333     public int getInitialDelaySecs() {
334         return this.initialDelaySecs;
335     }
336 }