View Javadoc
1   /**
2    * Copyright 2005-2015 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.rice.kew.batch;
17  
18  import org.kuali.rice.core.api.CoreApiServiceLocator;
19  import org.kuali.rice.core.api.impex.xml.DirectoryXmlDocCollection;
20  import org.kuali.rice.core.api.impex.xml.FileXmlDocCollection;
21  import org.kuali.rice.core.api.impex.xml.XmlDocCollection;
22  import org.kuali.rice.core.api.impex.xml.ZipXmlDocCollection;
23  
24  import java.io.BufferedReader;
25  import java.io.File;
26  import java.io.FileReader;
27  import java.io.FileWriter;
28  import java.io.IOException;
29  import java.text.Format;
30  import java.text.SimpleDateFormat;
31  import java.util.ArrayList;
32  import java.util.Calendar;
33  import java.util.Collection;
34  import java.util.Date;
35  import java.util.Iterator;
36  import java.util.List;
37  
38  
39  /**
40   * Utility class responsible for polling and ingesting XML data files
41   * containing various forms of workflow engine data (e.g. document types
42   * and rules).
43   * Loaded files and problem files are placed into a subdirectory of a
44   * configured 'loaded' and 'problem' directory, respectively.
45   * "Problem-ness" is determined by inspecting a 'processed' flag on each <code>XmlDoc</code>
46   * in each collection.  If not all <code>XmlDoc</code>s are marked 'processed' an
47   * error is assumed, and the collection file (e.g. for a Zip, the Zip file) is moved
48   * to the 'problem' directory.
49   * As such, it is the <b><code>XmlIngesterService</code>'s responsibility</b> to mark
50   * any unknown or otherwise innocuous non-failure non-processed files, as 'processed'.
51   * A different mechanism can be developed if this proves to be a problem, but for now
52   * it is simple enough for the <code>XmlIngesterService</code> to determine this.
53   * @see org.kuali.rice.kew.batch.XmlPollerService
54   * @see org.kuali.rice.kew.batch.XmlIngesterServiceImpl
55   * @author Kuali Rice Team (rice.collab@kuali.org)
56   */
57  public class XmlPollerServiceImpl implements XmlPollerService {
58  
59      private static final org.apache.log4j.Logger LOG = org.apache.log4j.Logger
60              .getLogger(XmlPollerServiceImpl.class);
61      private static final Format DIR_FORMAT = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss-SSS");
62  
63      /**
64       * Specifies the polling interval that should be used with this task.
65       */
66      private int pollIntervalSecs = 5 * 60; // default to 5 minutes
67      /**
68       * Specifies the initial delay the poller should wait before starting to poll
69       */
70      private int initialDelaySecs = 30; // default to 30 seconds
71      /**
72       * Location in which to find XML files to load.
73       */
74      private String xmlPendingLocation;
75      /**
76       * Location in which to place successfully loaded XML files.
77       */
78      private String xmlCompletedLocation;
79      /**
80       * Location in which to place XML files which have failed to load.
81       */
82      private String xmlProblemLocation;
83      
84      private String xmlParentDirectory;
85      private static final String PENDING_MOVE_FAILED_ARCHIVE_FILE = "movesfailed";
86      private static final String NEW_LINE = "\n";
87  
88      public void run() {
89          // if(!directoriesWritable()){
90          //     LOG.error("Error writing to xml data directories. Stopping xmlLoader ...");
91          //     this.cancel();
92          // }
93          LOG.debug("checking for xml data files...");
94          File[] files = getXmlPendingDir().listFiles();
95          if (files == null || files.length == 0) {
96          	return;
97          }
98          LOG.info("Found " + files.length + " files to ingest.");
99          List<XmlDocCollection> collections = new ArrayList<XmlDocCollection>();
100         for (File file : files)
101         {
102             if (file.isDirectory())
103             {
104                 collections.add(new DirectoryXmlDocCollection(file));
105             } else if (file.getName().equals(PENDING_MOVE_FAILED_ARCHIVE_FILE))
106             {
107                 // the movesfailed file...ignore this
108                 continue;
109             } else if (file.getName().toLowerCase().endsWith(".zip"))
110             {
111                 try
112                 {
113                     collections.add(new ZipXmlDocCollection(file));
114                 } catch (IOException ioe)
115                 {
116                     LOG.error("Unable to load file: " + file);
117                 }
118             } else if (file.getName().endsWith(".xml"))
119             {
120                 collections.add(new FileXmlDocCollection(file));
121             } else
122             {
123                 LOG.warn("Ignoring extraneous file in xml pending directory: " + file);
124             }
125         }
126 
127         // Cull any resources which were already processed and whose moves failed
128         Iterator collectionsIt = collections.iterator();
129         Collection<XmlDocCollection> culled = new ArrayList<XmlDocCollection>();
130         while (collectionsIt.hasNext()) {
131             XmlDocCollection container = (XmlDocCollection) collectionsIt.next();
132             // if a move has already failed for this archive, ignore it
133             if (inPendingMoveFailedArchive(container.getFile())) {
134                 LOG.info("Ignoring previously processed resource: " + container);
135                 culled.add(container);
136             }
137         }
138         collections.removeAll(culled);
139 
140         if (collections.size() == 0) {
141             LOG.debug("No valid new resources found to ingest");
142             return;
143         }
144 
145         Date LOAD_TIME = Calendar.getInstance().getTime();
146         // synchronization around date format should not be an issue as this code is single-threaded
147         File completeDir = new File(getXmlCompleteDir(), DIR_FORMAT.format(LOAD_TIME));
148         File failedDir = new File(getXmlProblemDir(), DIR_FORMAT.format(LOAD_TIME));
149 
150         // now ingest the containers
151         Collection failed = null;
152         try {
153             failed = CoreApiServiceLocator.getXmlIngesterService().ingest(collections);
154         } catch (Exception e) {
155             LOG.error("Error ingesting data", e);
156             //throw new RuntimeException(e);
157         }
158     
159         // now iterate through all containers again, and move containers to approprate dir
160         LOG.info("Moving files...");
161         collectionsIt = collections.iterator();
162         while (collectionsIt.hasNext()) {
163             XmlDocCollection container = (XmlDocCollection) collectionsIt.next();
164             LOG.debug("container: " + container);
165             try {
166                 // "close" the container
167                 // this only matters for ZipFiles for now
168                 container.close();
169             } catch (IOException ioe) {
170                 LOG.warn("Error closing " + container, ioe);
171             }
172             if (failed.contains(container)) {
173                 // some docs must have failed, move the whole
174                 // container to the failed dir
175                 if (container.getFile() != null) {
176                     LOG.error("Moving " + container.getFile() + " to problem dir.");
177                     if ((!failedDir.isDirectory() && !failedDir.mkdirs())
178                         || !moveFile(failedDir, container.getFile())) {
179                         LOG.error("Could not move: " + container.getFile());
180                         recordUnmovablePendingFile(container.getFile(), LOAD_TIME);         
181                     }
182                 }
183             } else {
184                 if (container.getFile() != null) {
185                     LOG.info("Moving " + container.getFile() + " to loaded dir.");
186                     if((!completeDir.isDirectory() && !completeDir.mkdirs())
187                         || !moveFile(completeDir, container.getFile())){
188                         LOG.error("Could not move: " + container.getFile());
189                         recordUnmovablePendingFile(container.getFile(), LOAD_TIME);         
190                     }
191                 }
192             }
193         }
194     }
195 
196     private boolean inPendingMoveFailedArchive(File xmlDataFile){
197         if (xmlDataFile == null) return false;
198         BufferedReader inFile = null;
199         File movesFailedFile = new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE);
200         if (!movesFailedFile.isFile()) return false;
201         try {
202             inFile = new BufferedReader(new FileReader(movesFailedFile));
203             String line;
204             
205             while((line = inFile.readLine()) != null){
206                 String trimmedLine = line.trim();
207                 if(trimmedLine.equals(xmlDataFile.getName()) ||
208                    trimmedLine.startsWith(xmlDataFile.getName() + "=")) { 
209                     return true;
210                 }
211             }
212         } catch (IOException e){
213             LOG.warn("Error reading file " + movesFailedFile);
214             //TODO try reading the pending file or stop?
215         } finally {
216             if (inFile != null) try {
217                 inFile.close();
218             } catch (Exception e) {
219                 LOG.warn("Error closing buffered reader for " + movesFailedFile);
220             }
221         }
222       
223         return false;
224     }
225 
226     private boolean recordUnmovablePendingFile(File unMovablePendingFile, Date dateLoaded){
227         boolean recorded = false;
228         FileWriter archiveFile = null;
229         try{
230             archiveFile = new FileWriter(new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE), true);  
231             archiveFile.write(unMovablePendingFile.getName() + "=" + dateLoaded.getTime() + NEW_LINE);
232             recorded = true;
233         } catch (IOException e){
234             LOG.error("Unable to record unmovable pending file " + unMovablePendingFile.getName() + "in the archive file " + PENDING_MOVE_FAILED_ARCHIVE_FILE);
235         } finally {
236             if (archiveFile != null) {
237                 try {
238                     archiveFile.close();
239                 } catch (IOException ioe) {
240                     LOG.error("Error closing unmovable pending file", ioe);
241                 }
242             }
243         }
244         return recorded;       
245     }
246 
247     private boolean moveFile(File toDirectory, File fileToMove){
248         boolean moved = true;
249         if (!fileToMove.renameTo(new File(toDirectory.getPath(), fileToMove.getName()))){
250             LOG.error("Unable to move file " + fileToMove.getName() + " to directory " + toDirectory.getPath());
251             moved = false;
252         }
253         return moved;
254     }
255 
256     private File getXmlPendingDir() {
257         return new File(getXmlPendingLocation());
258     }
259 
260     private File getXmlCompleteDir() {
261         return new File(getXmlCompletedLocation());
262     }
263 
264     private File getXmlProblemDir() {
265         return new File(getXmlProblemLocation());
266     }
267 
268     public String getXmlCompletedLocation() {
269         return xmlCompletedLocation;
270     }
271 
272     public void setXmlCompletedLocation(String xmlCompletedLocation) {
273         this.xmlCompletedLocation = xmlCompletedLocation;
274     }
275 
276     public String getXmlPendingLocation() {
277         return xmlPendingLocation;
278     }
279 
280     /*public boolean validate(File uploadedFile) {
281         XmlDataLoaderFileFilter filter = new XmlDataLoaderFileFilter();
282         return filter.accept(uploadedFile);
283     }*/
284 
285     public void setXmlPendingLocation(String xmlPendingLocation) {
286         this.xmlPendingLocation = xmlPendingLocation;
287     }
288 
289     public String getXmlProblemLocation() {
290         return xmlProblemLocation;
291     }
292 
293     public void setXmlProblemLocation(String xmlProblemLocation) {
294         this.xmlProblemLocation = xmlProblemLocation;
295     }
296     public String getXmlParentDirectory() {
297         return xmlParentDirectory;
298     }
299     public void setXmlParentDirectory(String xmlDataParentDirectory) {
300         this.xmlParentDirectory = xmlDataParentDirectory;
301     }
302 
303     /**
304      * Sets the polling interval time in seconds
305      * @param seconds the polling interval time in seconds
306      */
307     public void setPollIntervalSecs(int seconds) {
308         this.pollIntervalSecs = seconds;
309     }
310 
311     /**
312      * Gets the polling interval time in seconds
313      * @return the polling interval time in seconds
314      */
315     public int getPollIntervalSecs() {
316         return this.pollIntervalSecs;
317     }
318 
319     /**
320      * Sets the initial delay time in seconds
321      * @param seconds the initial delay time in seconds
322      */
323     public void setInitialDelaySecs(int seconds) {
324         this.initialDelaySecs = seconds;
325     }
326 
327     /**
328      * Gets the initial delay time in seconds
329      * @return the initial delay time in seconds
330      */
331     public int getInitialDelaySecs() {
332         return this.initialDelaySecs;
333     }
334 }