001/**
002 * Copyright 2005-2015 The Kuali Foundation
003 *
004 * Licensed under the Educational Community License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.opensource.org/licenses/ecl2.php
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.kuali.rice.kew.batch;
017
018import org.kuali.rice.core.api.CoreApiServiceLocator;
019import org.kuali.rice.core.api.impex.xml.DirectoryXmlDocCollection;
020import org.kuali.rice.core.api.impex.xml.FileXmlDocCollection;
021import org.kuali.rice.core.api.impex.xml.XmlDocCollection;
022import org.kuali.rice.core.api.impex.xml.ZipXmlDocCollection;
023
024import java.io.BufferedReader;
025import java.io.File;
026import java.io.FileReader;
027import java.io.FileWriter;
028import java.io.IOException;
029import java.text.Format;
030import java.text.SimpleDateFormat;
031import java.util.ArrayList;
032import java.util.Calendar;
033import java.util.Collection;
034import java.util.Date;
035import java.util.Iterator;
036import java.util.List;
037
038
039/**
040 * Utility class responsible for polling and ingesting XML data files
041 * containing various forms of workflow engine data (e.g. document types
042 * and rules).
043 * Loaded files and problem files are placed into a subdirectory of a
044 * configured 'loaded' and 'problem' directory, respectively.
045 * "Problem-ness" is determined by inspecting a 'processed' flag on each <code>XmlDoc</code>
046 * in each collection.  If not all <code>XmlDoc</code>s are marked 'processed' an
047 * error is assumed, and the collection file (e.g. for a Zip, the Zip file) is moved
048 * to the 'problem' directory.
049 * As such, it is the <b><code>XmlIngesterService</code>'s responsibility</b> to mark
050 * any unknown or otherwise innocuous non-failure non-processed files, as 'processed'.
051 * A different mechanism can be developed if this proves to be a problem, but for now
052 * it is simple enough for the <code>XmlIngesterService</code> to determine this.
053 * @see org.kuali.rice.kew.batch.XmlPollerService
054 * @see org.kuali.rice.kew.batch.XmlIngesterServiceImpl
055 * @author Kuali Rice Team (rice.collab@kuali.org)
056 */
057public class XmlPollerServiceImpl implements XmlPollerService {
058
059    private static final org.apache.log4j.Logger LOG = org.apache.log4j.Logger
060            .getLogger(XmlPollerServiceImpl.class);
061    private static final Format DIR_FORMAT = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss-SSS");
062
063    /**
064     * Specifies the polling interval that should be used with this task.
065     */
066    private int pollIntervalSecs = 5 * 60; // default to 5 minutes
067    /**
068     * Specifies the initial delay the poller should wait before starting to poll
069     */
070    private int initialDelaySecs = 30; // default to 30 seconds
071    /**
072     * Location in which to find XML files to load.
073     */
074    private String xmlPendingLocation;
075    /**
076     * Location in which to place successfully loaded XML files.
077     */
078    private String xmlCompletedLocation;
079    /**
080     * Location in which to place XML files which have failed to load.
081     */
082    private String xmlProblemLocation;
083    
084    private String xmlParentDirectory;
085    private static final String PENDING_MOVE_FAILED_ARCHIVE_FILE = "movesfailed";
086    private static final String NEW_LINE = "\n";
087
088    public void run() {
089        // if(!directoriesWritable()){
090        //     LOG.error("Error writing to xml data directories. Stopping xmlLoader ...");
091        //     this.cancel();
092        // }
093        LOG.debug("checking for xml data files...");
094        File[] files = getXmlPendingDir().listFiles();
095        if (files == null || files.length == 0) {
096                return;
097        }
098        LOG.info("Found " + files.length + " files to ingest.");
099        List<XmlDocCollection> collections = new ArrayList<XmlDocCollection>();
100        for (File file : files)
101        {
102            if (file.isDirectory())
103            {
104                collections.add(new DirectoryXmlDocCollection(file));
105            } else if (file.getName().equals(PENDING_MOVE_FAILED_ARCHIVE_FILE))
106            {
107                // the movesfailed file...ignore this
108                continue;
109            } else if (file.getName().toLowerCase().endsWith(".zip"))
110            {
111                try
112                {
113                    collections.add(new ZipXmlDocCollection(file));
114                } catch (IOException ioe)
115                {
116                    LOG.error("Unable to load file: " + file);
117                }
118            } else if (file.getName().endsWith(".xml"))
119            {
120                collections.add(new FileXmlDocCollection(file));
121            } else
122            {
123                LOG.warn("Ignoring extraneous file in xml pending directory: " + file);
124            }
125        }
126
127        // Cull any resources which were already processed and whose moves failed
128        Iterator collectionsIt = collections.iterator();
129        Collection<XmlDocCollection> culled = new ArrayList<XmlDocCollection>();
130        while (collectionsIt.hasNext()) {
131            XmlDocCollection container = (XmlDocCollection) collectionsIt.next();
132            // if a move has already failed for this archive, ignore it
133            if (inPendingMoveFailedArchive(container.getFile())) {
134                LOG.info("Ignoring previously processed resource: " + container);
135                culled.add(container);
136            }
137        }
138        collections.removeAll(culled);
139
140        if (collections.size() == 0) {
141            LOG.debug("No valid new resources found to ingest");
142            return;
143        }
144
145        Date LOAD_TIME = Calendar.getInstance().getTime();
146        // synchronization around date format should not be an issue as this code is single-threaded
147        File completeDir = new File(getXmlCompleteDir(), DIR_FORMAT.format(LOAD_TIME));
148        File failedDir = new File(getXmlProblemDir(), DIR_FORMAT.format(LOAD_TIME));
149
150        // now ingest the containers
151        Collection failed = null;
152        try {
153            failed = CoreApiServiceLocator.getXmlIngesterService().ingest(collections);
154        } catch (Exception e) {
155            LOG.error("Error ingesting data", e);
156            //throw new RuntimeException(e);
157        }
158    
159        // now iterate through all containers again, and move containers to approprate dir
160        LOG.info("Moving files...");
161        collectionsIt = collections.iterator();
162        while (collectionsIt.hasNext()) {
163            XmlDocCollection container = (XmlDocCollection) collectionsIt.next();
164            LOG.debug("container: " + container);
165            try {
166                // "close" the container
167                // this only matters for ZipFiles for now
168                container.close();
169            } catch (IOException ioe) {
170                LOG.warn("Error closing " + container, ioe);
171            }
172            if (failed.contains(container)) {
173                // some docs must have failed, move the whole
174                // container to the failed dir
175                if (container.getFile() != null) {
176                    LOG.error("Moving " + container.getFile() + " to problem dir.");
177                    if ((!failedDir.isDirectory() && !failedDir.mkdirs())
178                        || !moveFile(failedDir, container.getFile())) {
179                        LOG.error("Could not move: " + container.getFile());
180                        recordUnmovablePendingFile(container.getFile(), LOAD_TIME);         
181                    }
182                }
183            } else {
184                if (container.getFile() != null) {
185                    LOG.info("Moving " + container.getFile() + " to loaded dir.");
186                    if((!completeDir.isDirectory() && !completeDir.mkdirs())
187                        || !moveFile(completeDir, container.getFile())){
188                        LOG.error("Could not move: " + container.getFile());
189                        recordUnmovablePendingFile(container.getFile(), LOAD_TIME);         
190                    }
191                }
192            }
193        }
194    }
195
196    private boolean inPendingMoveFailedArchive(File xmlDataFile){
197        if (xmlDataFile == null) return false;
198        BufferedReader inFile = null;
199        File movesFailedFile = new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE);
200        if (!movesFailedFile.isFile()) return false;
201        try {
202            inFile = new BufferedReader(new FileReader(movesFailedFile));
203            String line;
204            
205            while((line = inFile.readLine()) != null){
206                String trimmedLine = line.trim();
207                if(trimmedLine.equals(xmlDataFile.getName()) ||
208                   trimmedLine.startsWith(xmlDataFile.getName() + "=")) { 
209                    return true;
210                }
211            }
212        } catch (IOException e){
213            LOG.warn("Error reading file " + movesFailedFile);
214            //TODO try reading the pending file or stop?
215        } finally {
216            if (inFile != null) try {
217                inFile.close();
218            } catch (Exception e) {
219                LOG.warn("Error closing buffered reader for " + movesFailedFile);
220            }
221        }
222      
223        return false;
224    }
225
226    private boolean recordUnmovablePendingFile(File unMovablePendingFile, Date dateLoaded){
227        boolean recorded = false;
228        FileWriter archiveFile = null;
229        try{
230            archiveFile = new FileWriter(new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE), true);  
231            archiveFile.write(unMovablePendingFile.getName() + "=" + dateLoaded.getTime() + NEW_LINE);
232            recorded = true;
233        } catch (IOException e){
234            LOG.error("Unable to record unmovable pending file " + unMovablePendingFile.getName() + "in the archive file " + PENDING_MOVE_FAILED_ARCHIVE_FILE);
235        } finally {
236            if (archiveFile != null) {
237                try {
238                    archiveFile.close();
239                } catch (IOException ioe) {
240                    LOG.error("Error closing unmovable pending file", ioe);
241                }
242            }
243        }
244        return recorded;       
245    }
246
247    private boolean moveFile(File toDirectory, File fileToMove){
248        boolean moved = true;
249        if (!fileToMove.renameTo(new File(toDirectory.getPath(), fileToMove.getName()))){
250            LOG.error("Unable to move file " + fileToMove.getName() + " to directory " + toDirectory.getPath());
251            moved = false;
252        }
253        return moved;
254    }
255
256    private File getXmlPendingDir() {
257        return new File(getXmlPendingLocation());
258    }
259
260    private File getXmlCompleteDir() {
261        return new File(getXmlCompletedLocation());
262    }
263
264    private File getXmlProblemDir() {
265        return new File(getXmlProblemLocation());
266    }
267
268    public String getXmlCompletedLocation() {
269        return xmlCompletedLocation;
270    }
271
272    public void setXmlCompletedLocation(String xmlCompletedLocation) {
273        this.xmlCompletedLocation = xmlCompletedLocation;
274    }
275
276    public String getXmlPendingLocation() {
277        return xmlPendingLocation;
278    }
279
280    /*public boolean validate(File uploadedFile) {
281        XmlDataLoaderFileFilter filter = new XmlDataLoaderFileFilter();
282        return filter.accept(uploadedFile);
283    }*/
284
285    public void setXmlPendingLocation(String xmlPendingLocation) {
286        this.xmlPendingLocation = xmlPendingLocation;
287    }
288
289    public String getXmlProblemLocation() {
290        return xmlProblemLocation;
291    }
292
293    public void setXmlProblemLocation(String xmlProblemLocation) {
294        this.xmlProblemLocation = xmlProblemLocation;
295    }
296    public String getXmlParentDirectory() {
297        return xmlParentDirectory;
298    }
299    public void setXmlParentDirectory(String xmlDataParentDirectory) {
300        this.xmlParentDirectory = xmlDataParentDirectory;
301    }
302
303    /**
304     * Sets the polling interval time in seconds
305     * @param seconds the polling interval time in seconds
306     */
307    public void setPollIntervalSecs(int seconds) {
308        this.pollIntervalSecs = seconds;
309    }
310
311    /**
312     * Gets the polling interval time in seconds
313     * @return the polling interval time in seconds
314     */
315    public int getPollIntervalSecs() {
316        return this.pollIntervalSecs;
317    }
318
319    /**
320     * Sets the initial delay time in seconds
321     * @param seconds the initial delay time in seconds
322     */
323    public void setInitialDelaySecs(int seconds) {
324        this.initialDelaySecs = seconds;
325    }
326
327    /**
328     * Gets the initial delay time in seconds
329     * @return the initial delay time in seconds
330     */
331    public int getInitialDelaySecs() {
332        return this.initialDelaySecs;
333    }
334}