001    /**
002     * Copyright 2005-2013 The Kuali Foundation
003     *
004     * Licensed under the Educational Community License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.opensource.org/licenses/ecl2.php
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    package org.kuali.rice.kew.batch;
017    
018    import org.kuali.rice.core.api.CoreApiServiceLocator;
019    import org.kuali.rice.core.api.impex.xml.DirectoryXmlDocCollection;
020    import org.kuali.rice.core.api.impex.xml.FileXmlDocCollection;
021    import org.kuali.rice.core.api.impex.xml.XmlDocCollection;
022    import org.kuali.rice.core.api.impex.xml.ZipXmlDocCollection;
023    
024    import java.io.BufferedReader;
025    import java.io.File;
026    import java.io.FileReader;
027    import java.io.FileWriter;
028    import java.io.IOException;
029    import java.text.Format;
030    import java.text.SimpleDateFormat;
031    import java.util.ArrayList;
032    import java.util.Calendar;
033    import java.util.Collection;
034    import java.util.Date;
035    import java.util.Iterator;
036    import java.util.List;
037    
038    
039    /**
040     * Utility class responsible for polling and ingesting XML data files
041     * containing various forms of workflow engine data (e.g. document types
042     * and rules).
043     * Loaded files and problem files are placed into a subdirectory of a
044     * configured 'loaded' and 'problem' directory, respectively.
045     * "Problem-ness" is determined by inspecting a 'processed' flag on each <code>XmlDoc</code>
046     * in each collection.  If not all <code>XmlDoc</code>s are marked 'processed' an
047     * error is assumed, and the collection file (e.g. for a Zip, the Zip file) is moved
048     * to the 'problem' directory.
049     * As such, it is the <b><code>XmlIngesterService</code>'s responsibility</b> to mark
050     * any unknown or otherwise innocuous non-failure non-processed files, as 'processed'.
051     * A different mechanism can be developed if this proves to be a problem, but for now
052     * it is simple enough for the <code>XmlIngesterService</code> to determine this.
053     * @see org.kuali.rice.kew.batch.XmlPollerService
054     * @see org.kuali.rice.kew.batch.XmlIngesterServiceImpl
055     * @author Kuali Rice Team (rice.collab@kuali.org)
056     */
057    public class XmlPollerServiceImpl implements XmlPollerService {
058    
059        private static final org.apache.log4j.Logger LOG = org.apache.log4j.Logger
060                .getLogger(XmlPollerServiceImpl.class);
061        private static final Format DIR_FORMAT = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss-SSS");
062    
063        /**
064         * Specifies the polling interval that should be used with this task.
065         */
066        private int pollIntervalSecs = 5 * 60; // default to 5 minutes
067        /**
068         * Specifies the initial delay the poller should wait before starting to poll
069         */
070        private int initialDelaySecs = 30; // default to 30 seconds
071        /**
072         * Location in which to find XML files to load.
073         */
074        private String xmlPendingLocation;
075        /**
076         * Location in which to place successfully loaded XML files.
077         */
078        private String xmlCompletedLocation;
079        /**
080         * Location in which to place XML files which have failed to load.
081         */
082        private String xmlProblemLocation;
083        
084        private String xmlParentDirectory;
085        private static final String PENDING_MOVE_FAILED_ARCHIVE_FILE = "movesfailed";
086        private static final String NEW_LINE = "\n";
087    
088        public void run() {
089            // if(!directoriesWritable()){
090            //     LOG.error("Error writing to xml data directories. Stopping xmlLoader ...");
091            //     this.cancel();
092            // }
093            LOG.debug("checking for xml data files...");
094            File[] files = getXmlPendingDir().listFiles();
095            if (files == null || files.length == 0) {
096                    return;
097            }
098            LOG.info("Found " + files.length + " files to ingest.");
099            List<XmlDocCollection> collections = new ArrayList<XmlDocCollection>();
100            for (File file : files)
101            {
102                if (file.isDirectory())
103                {
104                    collections.add(new DirectoryXmlDocCollection(file));
105                } else if (file.getName().equals(PENDING_MOVE_FAILED_ARCHIVE_FILE))
106                {
107                    // the movesfailed file...ignore this
108                    continue;
109                } else if (file.getName().toLowerCase().endsWith(".zip"))
110                {
111                    try
112                    {
113                        collections.add(new ZipXmlDocCollection(file));
114                    } catch (IOException ioe)
115                    {
116                        LOG.error("Unable to load file: " + file);
117                    }
118                } else if (file.getName().endsWith(".xml"))
119                {
120                    collections.add(new FileXmlDocCollection(file));
121                } else
122                {
123                    LOG.warn("Ignoring extraneous file in xml pending directory: " + file);
124                }
125            }
126    
127            // Cull any resources which were already processed and whose moves failed
128            Iterator collectionsIt = collections.iterator();
129            Collection<XmlDocCollection> culled = new ArrayList<XmlDocCollection>();
130            while (collectionsIt.hasNext()) {
131                XmlDocCollection container = (XmlDocCollection) collectionsIt.next();
132                // if a move has already failed for this archive, ignore it
133                if (inPendingMoveFailedArchive(container.getFile())) {
134                    LOG.info("Ignoring previously processed resource: " + container);
135                    culled.add(container);
136                }
137            }
138            collections.removeAll(culled);
139    
140            if (collections.size() == 0) {
141                LOG.debug("No valid new resources found to ingest");
142                return;
143            }
144    
145            Date LOAD_TIME = Calendar.getInstance().getTime();
146            // synchronization around date format should not be an issue as this code is single-threaded
147            File completeDir = new File(getXmlCompleteDir(), DIR_FORMAT.format(LOAD_TIME));
148            File failedDir = new File(getXmlProblemDir(), DIR_FORMAT.format(LOAD_TIME));
149    
150            // now ingest the containers
151            Collection failed = null;
152            try {
153                failed = CoreApiServiceLocator.getXmlIngesterService().ingest(collections);
154            } catch (Exception e) {
155                LOG.error("Error ingesting data", e);
156                //throw new RuntimeException(e);
157            }
158        
159            // now iterate through all containers again, and move containers to approprate dir
160            LOG.info("Moving files...");
161            collectionsIt = collections.iterator();
162            while (collectionsIt.hasNext()) {
163                XmlDocCollection container = (XmlDocCollection) collectionsIt.next();
164                LOG.debug("container: " + container);
165                try {
166                    // "close" the container
167                    // this only matters for ZipFiles for now
168                    container.close();
169                } catch (IOException ioe) {
170                    LOG.warn("Error closing " + container, ioe);
171                }
172                if (failed.contains(container)) {
173                    // some docs must have failed, move the whole
174                    // container to the failed dir
175                    if (container.getFile() != null) {
176                        LOG.error("Moving " + container.getFile() + " to problem dir.");
177                        if ((!failedDir.isDirectory() && !failedDir.mkdirs())
178                            || !moveFile(failedDir, container.getFile())) {
179                            LOG.error("Could not move: " + container.getFile());
180                            recordUnmovablePendingFile(container.getFile(), LOAD_TIME);         
181                        }
182                    }
183                } else {
184                    if (container.getFile() != null) {
185                        LOG.info("Moving " + container.getFile() + " to loaded dir.");
186                        if((!completeDir.isDirectory() && !completeDir.mkdirs())
187                            || !moveFile(completeDir, container.getFile())){
188                            LOG.error("Could not move: " + container.getFile());
189                            recordUnmovablePendingFile(container.getFile(), LOAD_TIME);         
190                        }
191                    }
192                }
193            }
194        }
195    
196        private boolean inPendingMoveFailedArchive(File xmlDataFile){
197            if (xmlDataFile == null) return false;
198            BufferedReader inFile = null;
199            File movesFailedFile = new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE);
200            if (!movesFailedFile.isFile()) return false;
201            try {
202                inFile = new BufferedReader(new FileReader(movesFailedFile));
203                String line;
204                
205                while((line = inFile.readLine()) != null){
206                    String trimmedLine = line.trim();
207                    if(trimmedLine.equals(xmlDataFile.getName()) ||
208                       trimmedLine.startsWith(xmlDataFile.getName() + "=")) { 
209                        return true;
210                    }
211                }
212            } catch (IOException e){
213                LOG.warn("Error reading file " + movesFailedFile);
214                //TODO try reading the pending file or stop?
215            } finally {
216                if (inFile != null) try {
217                    inFile.close();
218                } catch (Exception e) {
219                    LOG.warn("Error closing buffered reader for " + movesFailedFile);
220                }
221            }
222          
223            return false;
224        }
225    
226        private boolean recordUnmovablePendingFile(File unMovablePendingFile, Date dateLoaded){
227            boolean recorded = false;
228            FileWriter archiveFile = null;
229            try{
230                archiveFile = new FileWriter(new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE), true);  
231                archiveFile.write(unMovablePendingFile.getName() + "=" + dateLoaded.getTime() + NEW_LINE);
232                recorded = true;
233            } catch (IOException e){
234                LOG.error("Unable to record unmovable pending file " + unMovablePendingFile.getName() + "in the archive file " + PENDING_MOVE_FAILED_ARCHIVE_FILE);
235            } finally {
236                if (archiveFile != null) {
237                    try {
238                        archiveFile.close();
239                    } catch (IOException ioe) {
240                        LOG.error("Error closing unmovable pending file", ioe);
241                    }
242                }
243            }
244            return recorded;       
245        }
246    
247        private boolean moveFile(File toDirectory, File fileToMove){
248            boolean moved = true;
249            if (!fileToMove.renameTo(new File(toDirectory.getPath(), fileToMove.getName()))){
250                LOG.error("Unable to move file " + fileToMove.getName() + " to directory " + toDirectory.getPath());
251                moved = false;
252            }
253            return moved;
254        }
255    
256        private File getXmlPendingDir() {
257            return new File(getXmlPendingLocation());
258        }
259    
260        private File getXmlCompleteDir() {
261            return new File(getXmlCompletedLocation());
262        }
263    
264        private File getXmlProblemDir() {
265            return new File(getXmlProblemLocation());
266        }
267    
268        public String getXmlCompletedLocation() {
269            return xmlCompletedLocation;
270        }
271    
272        public void setXmlCompletedLocation(String xmlCompletedLocation) {
273            this.xmlCompletedLocation = xmlCompletedLocation;
274        }
275    
276        public String getXmlPendingLocation() {
277            return xmlPendingLocation;
278        }
279    
280        /*public boolean validate(File uploadedFile) {
281            XmlDataLoaderFileFilter filter = new XmlDataLoaderFileFilter();
282            return filter.accept(uploadedFile);
283        }*/
284    
285        public void setXmlPendingLocation(String xmlPendingLocation) {
286            this.xmlPendingLocation = xmlPendingLocation;
287        }
288    
289        public String getXmlProblemLocation() {
290            return xmlProblemLocation;
291        }
292    
293        public void setXmlProblemLocation(String xmlProblemLocation) {
294            this.xmlProblemLocation = xmlProblemLocation;
295        }
296        public String getXmlParentDirectory() {
297            return xmlParentDirectory;
298        }
299        public void setXmlParentDirectory(String xmlDataParentDirectory) {
300            this.xmlParentDirectory = xmlDataParentDirectory;
301        }
302    
303        /**
304         * Sets the polling interval time in seconds
305         * @param seconds the polling interval time in seconds
306         */
307        public void setPollIntervalSecs(int seconds) {
308            this.pollIntervalSecs = seconds;
309        }
310    
311        /**
312         * Gets the polling interval time in seconds
313         * @return the polling interval time in seconds
314         */
315        public int getPollIntervalSecs() {
316            return this.pollIntervalSecs;
317        }
318    
319        /**
320         * Sets the initial delay time in seconds
321         * @param seconds the initial delay time in seconds
322         */
323        public void setInitialDelaySecs(int seconds) {
324            this.initialDelaySecs = seconds;
325        }
326    
327        /**
328         * Gets the initial delay time in seconds
329         * @return the initial delay time in seconds
330         */
331        public int getInitialDelaySecs() {
332            return this.initialDelaySecs;
333        }
334    }