001    /*
002     * Copyright 2005-2007 The Kuali Foundation
003     * 
004     * 
005     * Licensed under the Educational Community License, Version 2.0 (the "License");
006     * you may not use this file except in compliance with the License.
007     * You may obtain a copy of the License at
008     * 
009     * http://www.opensource.org/licenses/ecl2.php
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.kuali.rice.kew.batch;
018    
019    import java.io.BufferedReader;
020    import java.io.File;
021    import java.io.FileReader;
022    import java.io.FileWriter;
023    import java.io.IOException;
024    import java.text.Format;
025    import java.text.SimpleDateFormat;
026    import java.util.ArrayList;
027    import java.util.Arrays;
028    import java.util.Calendar;
029    import java.util.Collection;
030    import java.util.Date;
031    import java.util.Iterator;
032    import java.util.List;
033    
034    import org.apache.commons.io.IOUtils;
035    import org.kuali.rice.kew.service.KEWServiceLocator;
036    
037    /**
038     * Utility class responsible for polling and ingesting XML data files containing various forms of workflow engine data
039     * (e.g. document types and rules). Loaded files and problem files are placed into a subdirectory of a configured
040     * 'loaded' and 'problem' directory, respectively. "Problem-ness" is determined by inspecting a 'processed' flag on each
041     * <code>XmlDoc</code> in each collection. If not all <code>XmlDoc</code>s are marked 'processed' an error is assumed,
042     * and the collection file (e.g. for a Zip, the Zip file) is moved to the 'problem' directory. As such, it is the <b>
043     * <code>XmlIngesterService</code>'s responsibility</b> to mark any unknown or otherwise innocuous non-failure
044     * non-processed files, as 'processed'. A different mechanism can be developed if this proves to be a problem, but for
045     * now it is simple enough for the <code>XmlIngesterService</code> to determine this.
046     * 
047     * @see org.kuali.rice.kew.batch.XmlPollerService
048     * @see org.kuali.rice.kew.batch.XmlIngesterServiceImpl
049     * @author Kuali Rice Team (rice.collab@kuali.org)
050     */
051    public class MyXMLPollerServiceImpl implements XmlPollerService {
052    
053        private static final org.apache.log4j.Logger LOG = org.apache.log4j.Logger.getLogger(XmlPollerServiceImpl.class);
054        private static final Format DIR_FORMAT = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss-SSS");
055    
056        /**
057         * Specifies the polling interval that should be used with this task.
058         */
059        private int pollIntervalSecs = 5 * 60; // default to 5 minutes
060        /**
061         * Specifies the initial delay the poller should wait before starting to poll
062         */
063        private int initialDelaySecs = 30; // default to 30 seconds
064        /**
065         * Location in which to find XML files to load.
066         */
067        private String xmlPendingLocation;
068        /**
069         * Location in which to place successfully loaded XML files.
070         */
071        private String xmlCompletedLocation;
072        /**
073         * Location in which to place XML files which have failed to load.
074         */
075        private String xmlProblemLocation;
076    
077        private String xmlParentDirectory;
078        private static final String PENDING_MOVE_FAILED_ARCHIVE_FILE = "movesfailed";
079        private static final String NEW_LINE = "\n";
080    
081        @Override
082        public void run() {
083            // if(!directoriesWritable()){
084            // LOG.error("Error writing to xml data directories. Stopping xmlLoader ...");
085            // this.cancel();
086            // }
087            LOG.debug("checking for xml data files...");
088            File[] files = getXmlPendingDir().listFiles();
089            if (files == null || files.length == 0) {
090                return;
091            }
092            // Sort them so the ordering is deterministic by file name instead of random
093            Arrays.sort(files);
094    
095            LOG.info("Found " + files.length + " files to ingest.");
096    
097            List<XmlDocCollection> collections = new ArrayList<XmlDocCollection>();
098            for (File file : files) {
099                if (file.isDirectory()) {
100                    collections.add(new DirectoryXmlDocCollection(file));
101                } else if (file.getName().equals(PENDING_MOVE_FAILED_ARCHIVE_FILE)) {
102                    // the movesfailed file...ignore this
103                    continue;
104                } else if (file.getName().toLowerCase().endsWith(".zip")) {
105                    try {
106                        collections.add(new ZipXmlDocCollection(file));
107                    } catch (IOException ioe) {
108                        LOG.error("Unable to load file: " + file);
109                    }
110                } else if (file.getName().endsWith(".xml")) {
111                    collections.add(new FileXmlDocCollection(file));
112                } else {
113                    LOG.warn("Ignoring extraneous file in xml pending directory: " + file);
114                }
115            }
116    
117            // Cull any resources which were already processed and whose moves failed
118            Iterator<?> collectionsIt = collections.iterator();
119            Collection<XmlDocCollection> culled = new ArrayList<XmlDocCollection>();
120            while (collectionsIt.hasNext()) {
121                XmlDocCollection container = (XmlDocCollection) collectionsIt.next();
122                // if a move has already failed for this archive, ignore it
123                if (inPendingMoveFailedArchive(container.getFile())) {
124                    LOG.info("Ignoring previously processed resource: " + container);
125                    culled.add(container);
126                }
127            }
128            collections.removeAll(culled);
129    
130            if (collections.size() == 0) {
131                LOG.debug("No valid new resources found to ingest");
132                return;
133            }
134    
135            Date LOAD_TIME = Calendar.getInstance().getTime();
136            // synchronization around date format should not be an issue as this code is single-threaded
137            File completeDir = new File(getXmlCompleteDir(), DIR_FORMAT.format(LOAD_TIME));
138            File failedDir = new File(getXmlProblemDir(), DIR_FORMAT.format(LOAD_TIME));
139    
140            // now ingest the containers
141            Collection<?> failed = null;
142            try {
143                failed = KEWServiceLocator.getXmlIngesterService().ingest(collections);
144            } catch (Exception e) {
145                LOG.error("Error ingesting data", e);
146                // throw new RuntimeException(e);
147            }
148    
149            // now iterate through all containers again, and move containers to approprate dir
150            LOG.info("Moving files...");
151            collectionsIt = collections.iterator();
152            while (collectionsIt.hasNext()) {
153                XmlDocCollection container = (XmlDocCollection) collectionsIt.next();
154                LOG.debug("container: " + container);
155                try {
156                    // "close" the container
157                    // this only matters for ZipFiles for now
158                    container.close();
159                } catch (IOException ioe) {
160                    LOG.warn("Error closing " + container, ioe);
161                }
162                if (failed.contains(container)) {
163                    // some docs must have failed, move the whole
164                    // container to the failed dir
165                    if (container.getFile() != null) {
166                        LOG.error("Moving " + container.getFile() + " to problem dir.");
167                        if ((!failedDir.isDirectory() && !failedDir.mkdirs()) || !moveFile(failedDir, container.getFile())) {
168                            LOG.error("Could not move: " + container.getFile());
169                            recordUnmovablePendingFile(container.getFile(), LOAD_TIME);
170                        }
171                    }
172                } else {
173                    if (container.getFile() != null) {
174                        LOG.info("Moving " + container.getFile() + " to loaded dir.");
175                        if ((!completeDir.isDirectory() && !completeDir.mkdirs())
176                                || !moveFile(completeDir, container.getFile())) {
177                            LOG.error("Could not move: " + container.getFile());
178                            recordUnmovablePendingFile(container.getFile(), LOAD_TIME);
179                        }
180                    }
181                }
182            }
183        }
184    
185        private boolean inPendingMoveFailedArchive(File xmlDataFile) {
186            if (xmlDataFile == null) {
187                return false;
188            }
189            File movesFailedFile = new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE);
190            if (!movesFailedFile.isFile()) {
191                return false;
192            }
193            BufferedReader inFile = null;
194            try {
195                inFile = new BufferedReader(new FileReader(movesFailedFile));
196                String line;
197                boolean found = false;
198                while ((line = inFile.readLine()) != null) {
199                    String trimmedLine = line.trim();
200                    if (trimmedLine.equals(xmlDataFile.getName()) || trimmedLine.startsWith(xmlDataFile.getName() + "=")) {
201                        found = true;
202                        break;
203                    }
204                }
205                return found;
206            } catch (IOException e) {
207                LOG.warn("Error reading file " + movesFailedFile);
208            } finally {
209                IOUtils.closeQuietly(inFile);
210            }
211            return false;
212        }
213    
214        private boolean recordUnmovablePendingFile(File unMovablePendingFile, Date dateLoaded) {
215            boolean recorded = false;
216            FileWriter archiveFile = null;
217            try {
218                archiveFile = new FileWriter(new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE), true);
219                archiveFile.write(unMovablePendingFile.getName() + "=" + dateLoaded.getTime() + NEW_LINE);
220                recorded = true;
221            } catch (IOException e) {
222                LOG.error("Unable to record unmovable pending file " + unMovablePendingFile.getName()
223                        + "in the archive file " + PENDING_MOVE_FAILED_ARCHIVE_FILE);
224            } finally {
225                if (archiveFile != null) {
226                    try {
227                        archiveFile.close();
228                    } catch (IOException ioe) {
229                        LOG.error("Error closing unmovable pending file", ioe);
230                    }
231                }
232            }
233            return recorded;
234        }
235    
236        private boolean moveFile(File toDirectory, File fileToMove) {
237            boolean moved = true;
238            if (!fileToMove.renameTo(new File(toDirectory.getPath(), fileToMove.getName()))) {
239                LOG.error("Unable to move file " + fileToMove.getName() + " to directory " + toDirectory.getPath());
240                moved = false;
241            }
242            return moved;
243        }
244    
245        private File getXmlPendingDir() {
246            return new File(getXmlPendingLocation());
247        }
248    
249        private File getXmlCompleteDir() {
250            return new File(getXmlCompletedLocation());
251        }
252    
253        private File getXmlProblemDir() {
254            return new File(getXmlProblemLocation());
255        }
256    
257        public String getXmlCompletedLocation() {
258            return xmlCompletedLocation;
259        }
260    
261        public void setXmlCompletedLocation(String xmlCompletedLocation) {
262            this.xmlCompletedLocation = xmlCompletedLocation;
263        }
264    
265        public String getXmlPendingLocation() {
266            return xmlPendingLocation;
267        }
268    
269        /*
270         * public boolean validate(File uploadedFile) { XmlDataLoaderFileFilter filter = new XmlDataLoaderFileFilter();
271         * return filter.accept(uploadedFile); }
272         */
273    
274        public void setXmlPendingLocation(String xmlPendingLocation) {
275            this.xmlPendingLocation = xmlPendingLocation;
276        }
277    
278        public String getXmlProblemLocation() {
279            return xmlProblemLocation;
280        }
281    
282        public void setXmlProblemLocation(String xmlProblemLocation) {
283            this.xmlProblemLocation = xmlProblemLocation;
284        }
285    
286        public String getXmlParentDirectory() {
287            return xmlParentDirectory;
288        }
289    
290        public void setXmlParentDirectory(String xmlDataParentDirectory) {
291            this.xmlParentDirectory = xmlDataParentDirectory;
292        }
293    
294        /**
295         * Sets the polling interval time in seconds
296         * 
297         * @param seconds
298         *            the polling interval time in seconds
299         */
300        public void setPollIntervalSecs(int seconds) {
301            this.pollIntervalSecs = seconds;
302        }
303    
304        /**
305         * Gets the polling interval time in seconds
306         * 
307         * @return the polling interval time in seconds
308         */
309        @Override
310        public int getPollIntervalSecs() {
311            return this.pollIntervalSecs;
312        }
313    
314        /**
315         * Sets the initial delay time in seconds
316         * 
317         * @param seconds
318         *            the initial delay time in seconds
319         */
320        public void setInitialDelaySecs(int seconds) {
321            this.initialDelaySecs = seconds;
322        }
323    
324        /**
325         * Gets the initial delay time in seconds
326         * 
327         * @return the initial delay time in seconds
328         */
329        @Override
330        public int getInitialDelaySecs() {
331            return this.initialDelaySecs;
332        }
333    }