001    /*
002     * Copyright 2005-2007 The Kuali Foundation
003     * 
004     * 
005     * Licensed under the Educational Community License, Version 2.0 (the "License");
006     * you may not use this file except in compliance with the License.
007     * You may obtain a copy of the License at
008     * 
009     * http://www.opensource.org/licenses/ecl2.php
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.kuali.rice.kew.batch;
018    
019    import org.kuali.rice.kew.service.KEWServiceLocator;
020    
021    import java.io.*;
022    import java.text.Format;
023    import java.text.SimpleDateFormat;
024    import java.util.*;
025    
026    
027    /**
028     * Utility class responsible for polling and ingesting XML data files
029     * containing various forms of workflow engine data (e.g. document types
030     * and rules).
031     * Loaded files and problem files are placed into a subdirectory of a
032     * configured 'loaded' and 'problem' directory, respectively.
033     * "Problem-ness" is determined by inspecting a 'processed' flag on each <code>XmlDoc</code>
034     * in each collection.  If not all <code>XmlDoc</code>s are marked 'processed' an
035     * error is assumed, and the collection file (e.g. for a Zip, the Zip file) is moved
036     * to the 'problem' directory.
037     * As such, it is the <b><code>XmlIngesterService</code>'s responsibility</b> to mark
038     * any unknown or otherwise innocuous non-failure non-processed files, as 'processed'.
039     * A different mechanism can be developed if this proves to be a problem, but for now
040     * it is simple enough for the <code>XmlIngesterService</code> to determine this.
041     * @see org.kuali.rice.kew.batch.XmlPollerService
042     * @see org.kuali.rice.kew.batch.XmlIngesterServiceImpl
043     * @author Kuali Rice Team (rice.collab@kuali.org)
044     */
045    public class XmlPollerServiceImpl implements XmlPollerService {
046    
047        private static final org.apache.log4j.Logger LOG = org.apache.log4j.Logger
048                .getLogger(XmlPollerServiceImpl.class);
049        private static final Format DIR_FORMAT = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss-SSS");
050    
051        /**
052         * Specifies the polling interval that should be used with this task.
053         */
054        private int pollIntervalSecs = 5 * 60; // default to 5 minutes
055        /**
056         * Specifies the initial delay the poller should wait before starting to poll
057         */
058        private int initialDelaySecs = 30; // default to 30 seconds
059        /**
060         * Location in which to find XML files to load.
061         */
062        private String xmlPendingLocation;
063        /**
064         * Location in which to place successfully loaded XML files.
065         */
066        private String xmlCompletedLocation;
067        /**
068         * Location in which to place XML files which have failed to load.
069         */
070        private String xmlProblemLocation;
071        
072        private String xmlParentDirectory;
073        private static final String PENDING_MOVE_FAILED_ARCHIVE_FILE = "movesfailed";
074        private static final String NEW_LINE = "\n";
075    
076        public void run() {
077            // if(!directoriesWritable()){
078            //     LOG.error("Error writing to xml data directories. Stopping xmlLoader ...");
079            //     this.cancel();
080            // }
081            LOG.debug("checking for xml data files...");
082            File[] files = getXmlPendingDir().listFiles();
083            if (files == null || files.length == 0) {
084                    return;
085            }
086            LOG.info("Found " + files.length + " files to ingest.");
087            List<XmlDocCollection> collections = new ArrayList<XmlDocCollection>();
088            for (File file : files)
089            {
090                if (file.isDirectory())
091                {
092                    collections.add(new DirectoryXmlDocCollection(file));
093                } else if (file.getName().equals(PENDING_MOVE_FAILED_ARCHIVE_FILE))
094                {
095                    // the movesfailed file...ignore this
096                    continue;
097                } else if (file.getName().toLowerCase().endsWith(".zip"))
098                {
099                    try
100                    {
101                        collections.add(new ZipXmlDocCollection(file));
102                    } catch (IOException ioe)
103                    {
104                        LOG.error("Unable to load file: " + file);
105                    }
106                } else if (file.getName().endsWith(".xml"))
107                {
108                    collections.add(new FileXmlDocCollection(file));
109                } else
110                {
111                    LOG.warn("Ignoring extraneous file in xml pending directory: " + file);
112                }
113            }
114    
115            // Cull any resources which were already processed and whose moves failed
116            Iterator collectionsIt = collections.iterator();
117            Collection<XmlDocCollection> culled = new ArrayList<XmlDocCollection>();
118            while (collectionsIt.hasNext()) {
119                XmlDocCollection container = (XmlDocCollection) collectionsIt.next();
120                // if a move has already failed for this archive, ignore it
121                if (inPendingMoveFailedArchive(container.getFile())) {
122                    LOG.info("Ignoring previously processed resource: " + container);
123                    culled.add(container);
124                }
125            }
126            collections.removeAll(culled);
127    
128            if (collections.size() == 0) {
129                LOG.debug("No valid new resources found to ingest");
130                return;
131            }
132    
133            Date LOAD_TIME = Calendar.getInstance().getTime();
134            // synchronization around date format should not be an issue as this code is single-threaded
135            File completeDir = new File(getXmlCompleteDir(), DIR_FORMAT.format(LOAD_TIME));
136            File failedDir = new File(getXmlProblemDir(), DIR_FORMAT.format(LOAD_TIME));
137    
138            // now ingest the containers
139            Collection failed = null;
140            try {
141                failed = KEWServiceLocator.getXmlIngesterService().ingest(collections);
142            } catch (Exception e) {
143                LOG.error("Error ingesting data", e);
144                //throw new RuntimeException(e);
145            }
146        
147            // now iterate through all containers again, and move containers to approprate dir
148            LOG.info("Moving files...");
149            collectionsIt = collections.iterator();
150            while (collectionsIt.hasNext()) {
151                XmlDocCollection container = (XmlDocCollection) collectionsIt.next();
152                LOG.debug("container: " + container);
153                try {
154                    // "close" the container
155                    // this only matters for ZipFiles for now
156                    container.close();
157                } catch (IOException ioe) {
158                    LOG.warn("Error closing " + container, ioe);
159                }
160                if (failed.contains(container)) {
161                    // some docs must have failed, move the whole
162                    // container to the failed dir
163                    if (container.getFile() != null) {
164                        LOG.error("Moving " + container.getFile() + " to problem dir.");
165                        if ((!failedDir.isDirectory() && !failedDir.mkdirs())
166                            || !moveFile(failedDir, container.getFile())) {
167                            LOG.error("Could not move: " + container.getFile());
168                            recordUnmovablePendingFile(container.getFile(), LOAD_TIME);         
169                        }
170                    }
171                } else {
172                    if (container.getFile() != null) {
173                        LOG.info("Moving " + container.getFile() + " to loaded dir.");
174                        if((!completeDir.isDirectory() && !completeDir.mkdirs())
175                            || !moveFile(completeDir, container.getFile())){
176                            LOG.error("Could not move: " + container.getFile());
177                            recordUnmovablePendingFile(container.getFile(), LOAD_TIME);         
178                        }
179                    }
180                }
181            }
182        }
183    
184        private boolean inPendingMoveFailedArchive(File xmlDataFile){
185            if (xmlDataFile == null) return false;
186            BufferedReader inFile = null;
187            File movesFailedFile = new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE);
188            if (!movesFailedFile.isFile()) return false;
189            try {
190                inFile = new BufferedReader(new FileReader(movesFailedFile));
191                String line;
192                
193                while((line = inFile.readLine()) != null){
194                    String trimmedLine = line.trim();
195                    if(trimmedLine.equals(xmlDataFile.getName()) ||
196                       trimmedLine.startsWith(xmlDataFile.getName() + "=")) { 
197                        return true;
198                    }
199                }
200            } catch (IOException e){
201                LOG.warn("Error reading file " + movesFailedFile);
202                //TODO try reading the pending file or stop?
203            } finally {
204                if (inFile != null) try {
205                    inFile.close();
206                } catch (Exception e) {
207                    LOG.warn("Error closing buffered reader for " + movesFailedFile);
208                }
209            }
210          
211            return false;
212        }
213    
214        private boolean recordUnmovablePendingFile(File unMovablePendingFile, Date dateLoaded){
215            boolean recorded = false;
216            FileWriter archiveFile = null;
217            try{
218                archiveFile = new FileWriter(new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE), true);  
219                archiveFile.write(unMovablePendingFile.getName() + "=" + dateLoaded.getTime() + NEW_LINE);
220                recorded = true;
221            } catch (IOException e){
222                LOG.error("Unable to record unmovable pending file " + unMovablePendingFile.getName() + "in the archive file " + PENDING_MOVE_FAILED_ARCHIVE_FILE);
223            } finally {
224                if (archiveFile != null) {
225                    try {
226                        archiveFile.close();
227                    } catch (IOException ioe) {
228                        LOG.error("Error closing unmovable pending file", ioe);
229                    }
230                }
231            }
232            return recorded;       
233        }
234    
235        private boolean moveFile(File toDirectory, File fileToMove){
236            boolean moved = true;
237            if (!fileToMove.renameTo(new File(toDirectory.getPath(), fileToMove.getName()))){
238                LOG.error("Unable to move file " + fileToMove.getName() + " to directory " + toDirectory.getPath());
239                moved = false;
240            }
241            return moved;
242        }
243    
244        private File getXmlPendingDir() {
245            return new File(getXmlPendingLocation());
246        }
247    
248        private File getXmlCompleteDir() {
249            return new File(getXmlCompletedLocation());
250        }
251    
252        private File getXmlProblemDir() {
253            return new File(getXmlProblemLocation());
254        }
255    
256        public String getXmlCompletedLocation() {
257            return xmlCompletedLocation;
258        }
259    
260        public void setXmlCompletedLocation(String xmlCompletedLocation) {
261            this.xmlCompletedLocation = xmlCompletedLocation;
262        }
263    
264        public String getXmlPendingLocation() {
265            return xmlPendingLocation;
266        }
267    
268        /*public boolean validate(File uploadedFile) {
269            XmlDataLoaderFileFilter filter = new XmlDataLoaderFileFilter();
270            return filter.accept(uploadedFile);
271        }*/
272    
273        public void setXmlPendingLocation(String xmlPendingLocation) {
274            this.xmlPendingLocation = xmlPendingLocation;
275        }
276    
277        public String getXmlProblemLocation() {
278            return xmlProblemLocation;
279        }
280    
281        public void setXmlProblemLocation(String xmlProblemLocation) {
282            this.xmlProblemLocation = xmlProblemLocation;
283        }
284        public String getXmlParentDirectory() {
285            return xmlParentDirectory;
286        }
287        public void setXmlParentDirectory(String xmlDataParentDirectory) {
288            this.xmlParentDirectory = xmlDataParentDirectory;
289        }
290    
291        /**
292         * Sets the polling interval time in seconds
293         * @param seconds the polling interval time in seconds
294         */
295        public void setPollIntervalSecs(int seconds) {
296            this.pollIntervalSecs = seconds;
297        }
298    
299        /**
300         * Gets the polling interval time in seconds
301         * @return the polling interval time in seconds
302         */
303        public int getPollIntervalSecs() {
304            return this.pollIntervalSecs;
305        }
306    
307        /**
308         * Sets the initial delay time in seconds
309         * @param seconds the initial delay time in seconds
310         */
311        public void setInitialDelaySecs(int seconds) {
312            this.initialDelaySecs = seconds;
313        }
314    
315        /**
316         * Gets the initial delay time in seconds
317         * @return the initial delay time in seconds
318         */
319        public int getInitialDelaySecs() {
320            return this.initialDelaySecs;
321        }
322    }