001 /* 002 * Copyright 2005-2007 The Kuali Foundation 003 * 004 * 005 * Licensed under the Educational Community License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.opensource.org/licenses/ecl2.php 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.kuali.rice.kew.batch; 018 019 import org.kuali.rice.kew.service.KEWServiceLocator; 020 021 import java.io.*; 022 import java.text.Format; 023 import java.text.SimpleDateFormat; 024 import java.util.*; 025 026 027 /** 028 * Utility class responsible for polling and ingesting XML data files 029 * containing various forms of workflow engine data (e.g. document types 030 * and rules). 031 * Loaded files and problem files are placed into a subdirectory of a 032 * configured 'loaded' and 'problem' directory, respectively. 033 * "Problem-ness" is determined by inspecting a 'processed' flag on each <code>XmlDoc</code> 034 * in each collection. If not all <code>XmlDoc</code>s are marked 'processed' an 035 * error is assumed, and the collection file (e.g. for a Zip, the Zip file) is moved 036 * to the 'problem' directory. 037 * As such, it is the <b><code>XmlIngesterService</code>'s responsibility</b> to mark 038 * any unknown or otherwise innocuous non-failure non-processed files, as 'processed'. 039 * A different mechanism can be developed if this proves to be a problem, but for now 040 * it is simple enough for the <code>XmlIngesterService</code> to determine this. 041 * @see org.kuali.rice.kew.batch.XmlPollerService 042 * @see org.kuali.rice.kew.batch.XmlIngesterServiceImpl 043 * @author Kuali Rice Team (rice.collab@kuali.org) 044 */ 045 public class XmlPollerServiceImpl implements XmlPollerService { 046 047 private static final org.apache.log4j.Logger LOG = org.apache.log4j.Logger 048 .getLogger(XmlPollerServiceImpl.class); 049 private static final Format DIR_FORMAT = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss-SSS"); 050 051 /** 052 * Specifies the polling interval that should be used with this task. 053 */ 054 private int pollIntervalSecs = 5 * 60; // default to 5 minutes 055 /** 056 * Specifies the initial delay the poller should wait before starting to poll 057 */ 058 private int initialDelaySecs = 30; // default to 30 seconds 059 /** 060 * Location in which to find XML files to load. 061 */ 062 private String xmlPendingLocation; 063 /** 064 * Location in which to place successfully loaded XML files. 065 */ 066 private String xmlCompletedLocation; 067 /** 068 * Location in which to place XML files which have failed to load. 069 */ 070 private String xmlProblemLocation; 071 072 private String xmlParentDirectory; 073 private static final String PENDING_MOVE_FAILED_ARCHIVE_FILE = "movesfailed"; 074 private static final String NEW_LINE = "\n"; 075 076 public void run() { 077 // if(!directoriesWritable()){ 078 // LOG.error("Error writing to xml data directories. Stopping xmlLoader ..."); 079 // this.cancel(); 080 // } 081 LOG.debug("checking for xml data files..."); 082 File[] files = getXmlPendingDir().listFiles(); 083 if (files == null || files.length == 0) { 084 return; 085 } 086 LOG.info("Found " + files.length + " files to ingest."); 087 List<XmlDocCollection> collections = new ArrayList<XmlDocCollection>(); 088 for (File file : files) 089 { 090 if (file.isDirectory()) 091 { 092 collections.add(new DirectoryXmlDocCollection(file)); 093 } else if (file.getName().equals(PENDING_MOVE_FAILED_ARCHIVE_FILE)) 094 { 095 // the movesfailed file...ignore this 096 continue; 097 } else if (file.getName().toLowerCase().endsWith(".zip")) 098 { 099 try 100 { 101 collections.add(new ZipXmlDocCollection(file)); 102 } catch (IOException ioe) 103 { 104 LOG.error("Unable to load file: " + file); 105 } 106 } else if (file.getName().endsWith(".xml")) 107 { 108 collections.add(new FileXmlDocCollection(file)); 109 } else 110 { 111 LOG.warn("Ignoring extraneous file in xml pending directory: " + file); 112 } 113 } 114 115 // Cull any resources which were already processed and whose moves failed 116 Iterator collectionsIt = collections.iterator(); 117 Collection<XmlDocCollection> culled = new ArrayList<XmlDocCollection>(); 118 while (collectionsIt.hasNext()) { 119 XmlDocCollection container = (XmlDocCollection) collectionsIt.next(); 120 // if a move has already failed for this archive, ignore it 121 if (inPendingMoveFailedArchive(container.getFile())) { 122 LOG.info("Ignoring previously processed resource: " + container); 123 culled.add(container); 124 } 125 } 126 collections.removeAll(culled); 127 128 if (collections.size() == 0) { 129 LOG.debug("No valid new resources found to ingest"); 130 return; 131 } 132 133 Date LOAD_TIME = Calendar.getInstance().getTime(); 134 // synchronization around date format should not be an issue as this code is single-threaded 135 File completeDir = new File(getXmlCompleteDir(), DIR_FORMAT.format(LOAD_TIME)); 136 File failedDir = new File(getXmlProblemDir(), DIR_FORMAT.format(LOAD_TIME)); 137 138 // now ingest the containers 139 Collection failed = null; 140 try { 141 failed = KEWServiceLocator.getXmlIngesterService().ingest(collections); 142 } catch (Exception e) { 143 LOG.error("Error ingesting data", e); 144 //throw new RuntimeException(e); 145 } 146 147 // now iterate through all containers again, and move containers to approprate dir 148 LOG.info("Moving files..."); 149 collectionsIt = collections.iterator(); 150 while (collectionsIt.hasNext()) { 151 XmlDocCollection container = (XmlDocCollection) collectionsIt.next(); 152 LOG.debug("container: " + container); 153 try { 154 // "close" the container 155 // this only matters for ZipFiles for now 156 container.close(); 157 } catch (IOException ioe) { 158 LOG.warn("Error closing " + container, ioe); 159 } 160 if (failed.contains(container)) { 161 // some docs must have failed, move the whole 162 // container to the failed dir 163 if (container.getFile() != null) { 164 LOG.error("Moving " + container.getFile() + " to problem dir."); 165 if ((!failedDir.isDirectory() && !failedDir.mkdirs()) 166 || !moveFile(failedDir, container.getFile())) { 167 LOG.error("Could not move: " + container.getFile()); 168 recordUnmovablePendingFile(container.getFile(), LOAD_TIME); 169 } 170 } 171 } else { 172 if (container.getFile() != null) { 173 LOG.info("Moving " + container.getFile() + " to loaded dir."); 174 if((!completeDir.isDirectory() && !completeDir.mkdirs()) 175 || !moveFile(completeDir, container.getFile())){ 176 LOG.error("Could not move: " + container.getFile()); 177 recordUnmovablePendingFile(container.getFile(), LOAD_TIME); 178 } 179 } 180 } 181 } 182 } 183 184 private boolean inPendingMoveFailedArchive(File xmlDataFile){ 185 if (xmlDataFile == null) return false; 186 BufferedReader inFile = null; 187 File movesFailedFile = new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE); 188 if (!movesFailedFile.isFile()) return false; 189 try { 190 inFile = new BufferedReader(new FileReader(movesFailedFile)); 191 String line; 192 193 while((line = inFile.readLine()) != null){ 194 String trimmedLine = line.trim(); 195 if(trimmedLine.equals(xmlDataFile.getName()) || 196 trimmedLine.startsWith(xmlDataFile.getName() + "=")) { 197 return true; 198 } 199 } 200 } catch (IOException e){ 201 LOG.warn("Error reading file " + movesFailedFile); 202 //TODO try reading the pending file or stop? 203 } finally { 204 if (inFile != null) try { 205 inFile.close(); 206 } catch (Exception e) { 207 LOG.warn("Error closing buffered reader for " + movesFailedFile); 208 } 209 } 210 211 return false; 212 } 213 214 private boolean recordUnmovablePendingFile(File unMovablePendingFile, Date dateLoaded){ 215 boolean recorded = false; 216 FileWriter archiveFile = null; 217 try{ 218 archiveFile = new FileWriter(new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE), true); 219 archiveFile.write(unMovablePendingFile.getName() + "=" + dateLoaded.getTime() + NEW_LINE); 220 recorded = true; 221 } catch (IOException e){ 222 LOG.error("Unable to record unmovable pending file " + unMovablePendingFile.getName() + "in the archive file " + PENDING_MOVE_FAILED_ARCHIVE_FILE); 223 } finally { 224 if (archiveFile != null) { 225 try { 226 archiveFile.close(); 227 } catch (IOException ioe) { 228 LOG.error("Error closing unmovable pending file", ioe); 229 } 230 } 231 } 232 return recorded; 233 } 234 235 private boolean moveFile(File toDirectory, File fileToMove){ 236 boolean moved = true; 237 if (!fileToMove.renameTo(new File(toDirectory.getPath(), fileToMove.getName()))){ 238 LOG.error("Unable to move file " + fileToMove.getName() + " to directory " + toDirectory.getPath()); 239 moved = false; 240 } 241 return moved; 242 } 243 244 private File getXmlPendingDir() { 245 return new File(getXmlPendingLocation()); 246 } 247 248 private File getXmlCompleteDir() { 249 return new File(getXmlCompletedLocation()); 250 } 251 252 private File getXmlProblemDir() { 253 return new File(getXmlProblemLocation()); 254 } 255 256 public String getXmlCompletedLocation() { 257 return xmlCompletedLocation; 258 } 259 260 public void setXmlCompletedLocation(String xmlCompletedLocation) { 261 this.xmlCompletedLocation = xmlCompletedLocation; 262 } 263 264 public String getXmlPendingLocation() { 265 return xmlPendingLocation; 266 } 267 268 /*public boolean validate(File uploadedFile) { 269 XmlDataLoaderFileFilter filter = new XmlDataLoaderFileFilter(); 270 return filter.accept(uploadedFile); 271 }*/ 272 273 public void setXmlPendingLocation(String xmlPendingLocation) { 274 this.xmlPendingLocation = xmlPendingLocation; 275 } 276 277 public String getXmlProblemLocation() { 278 return xmlProblemLocation; 279 } 280 281 public void setXmlProblemLocation(String xmlProblemLocation) { 282 this.xmlProblemLocation = xmlProblemLocation; 283 } 284 public String getXmlParentDirectory() { 285 return xmlParentDirectory; 286 } 287 public void setXmlParentDirectory(String xmlDataParentDirectory) { 288 this.xmlParentDirectory = xmlDataParentDirectory; 289 } 290 291 /** 292 * Sets the polling interval time in seconds 293 * @param seconds the polling interval time in seconds 294 */ 295 public void setPollIntervalSecs(int seconds) { 296 this.pollIntervalSecs = seconds; 297 } 298 299 /** 300 * Gets the polling interval time in seconds 301 * @return the polling interval time in seconds 302 */ 303 public int getPollIntervalSecs() { 304 return this.pollIntervalSecs; 305 } 306 307 /** 308 * Sets the initial delay time in seconds 309 * @param seconds the initial delay time in seconds 310 */ 311 public void setInitialDelaySecs(int seconds) { 312 this.initialDelaySecs = seconds; 313 } 314 315 /** 316 * Gets the initial delay time in seconds 317 * @return the initial delay time in seconds 318 */ 319 public int getInitialDelaySecs() { 320 return this.initialDelaySecs; 321 } 322 }