/*
 * Copyright 2005-2007 The Kuali Foundation
 *
 *
 * Licensed under the Educational Community License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.opensource.org/licenses/ecl2.php
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.kuali.rice.kew.batch;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.text.Format;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;

import org.apache.commons.io.IOUtils;
import org.kuali.rice.kew.service.KEWServiceLocator;

/**
 * Utility class responsible for polling and ingesting XML data files containing various forms of workflow engine data
 * (e.g. document types and rules). Loaded files and problem files are placed into a subdirectory of a configured
 * 'loaded' and 'problem' directory, respectively. "Problem-ness" is determined by inspecting a 'processed' flag on each
 * <code>XmlDoc</code> in each collection. If not all <code>XmlDoc</code>s are marked 'processed' an error is assumed,
 * and the collection file (e.g. for a Zip, the Zip file) is moved to the 'problem' directory. As such, it is the <b>
 * <code>XmlIngesterService</code>'s responsibility</b> to mark any unknown or otherwise innocuous non-failure
 * non-processed files, as 'processed'. A different mechanism can be developed if this proves to be a problem, but for
 * now it is simple enough for the <code>XmlIngesterService</code> to determine this.
 *
 * @see org.kuali.rice.kew.batch.XmlPollerService
 * @see org.kuali.rice.kew.batch.XmlIngesterServiceImpl
 * @author Kuali Rice Team (rice.collab@kuali.org)
 */
public class MyXMLPollerServiceImpl implements XmlPollerService {

    private static final org.apache.log4j.Logger LOG =
            org.apache.log4j.Logger.getLogger(MyXMLPollerServiceImpl.class);

    /**
     * Formats the load time into the name of the per-run subdirectory created under the 'loaded' and 'problem'
     * directories.
     */
    private static final Format DIR_FORMAT = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss-SSS");

    /**
     * Name of the file, kept in the pending directory, that lists resources which were already processed but could
     * not be moved out of the pending directory.
     */
    private static final String PENDING_MOVE_FAILED_ARCHIVE_FILE = "movesfailed";
    private static final String NEW_LINE = "\n";

    /**
     * Specifies the polling interval that should be used with this task.
     */
    private int pollIntervalSecs = 5 * 60; // default to 5 minutes
    /**
     * Specifies the initial delay the poller should wait before starting to poll
     */
    private int initialDelaySecs = 30; // default to 30 seconds
    /**
     * Location in which to find XML files to load.
     */
    private String xmlPendingLocation;
    /**
     * Location in which to place successfully loaded XML files.
     */
    private String xmlCompletedLocation;
    /**
     * Location in which to place XML files which have failed to load.
     */
    private String xmlProblemLocation;

    private String xmlParentDirectory;

    /**
     * Performs one polling pass: lists the pending directory, wraps each usable entry in an
     * <code>XmlDocCollection</code>, drops entries already recorded in the 'movesfailed' file, hands the remainder to
     * the <code>XmlIngesterService</code>, and finally moves every container into a time-stamped subdirectory of the
     * 'loaded' or 'problem' directory.
     * <p>
     * If the ingester throws, the error is logged and every container of this pass is treated as failed, so all of
     * them are moved to the 'problem' directory.
     */
    @Override
    public void run() {
        LOG.debug("checking for xml data files...");
        File[] files = getXmlPendingDir().listFiles();
        if (files == null || files.length == 0) {
            return;
        }
        // Sort them so the ordering is deterministic by file name instead of random
        Arrays.sort(files);

        LOG.info("Found " + files.length + " files to ingest.");

        List<XmlDocCollection> collections = loadCollections(files);
        removePreviouslyProcessed(collections);

        if (collections.isEmpty()) {
            LOG.debug("No valid new resources found to ingest");
            return;
        }

        Date loadTime = Calendar.getInstance().getTime();
        // synchronization around date format should not be an issue as this code is single-threaded
        String loadStamp = DIR_FORMAT.format(loadTime);
        File completeDir = new File(getXmlCompleteDir(), loadStamp);
        File failedDir = new File(getXmlProblemDir(), loadStamp);

        // now ingest the containers
        Collection<?> failed = null;
        boolean ingestAborted = false;
        try {
            failed = KEWServiceLocator.getXmlIngesterService().ingest(collections);
        } catch (Exception e) {
            LOG.error("Error ingesting data", e);
            // No per-container result is available; remember that so the move loop below
            // does not dereference the missing result.
            ingestAborted = true;
        }

        // now iterate through all containers again, and move containers to appropriate dir
        LOG.info("Moving files...");
        for (XmlDocCollection container : collections) {
            LOG.debug("container: " + container);
            try {
                // "close" the container
                // this only matters for ZipFiles for now
                container.close();
            } catch (IOException ioe) {
                LOG.warn("Error closing " + container, ioe);
            }
            // A null result without an exception is read as "no failures reported".
            boolean containerFailed = ingestAborted || (failed != null && failed.contains(container));
            archiveContainer(container, containerFailed, completeDir, failedDir, loadTime);
        }
    }

    /**
     * Wraps each entry of the pending directory in the matching <code>XmlDocCollection</code> implementation.
     * Directories, ".zip" files (case-insensitive) and ".xml" files are accepted; the 'movesfailed' file is skipped
     * silently and anything else is skipped with a warning.
     *
     * @param files the entries found in the pending directory
     * @return the collections that could be created, in the order of <code>files</code>
     */
    private List<XmlDocCollection> loadCollections(File[] files) {
        List<XmlDocCollection> collections = new ArrayList<XmlDocCollection>();
        for (File file : files) {
            String name = file.getName();
            if (file.isDirectory()) {
                collections.add(new DirectoryXmlDocCollection(file));
            } else if (name.equals(PENDING_MOVE_FAILED_ARCHIVE_FILE)) {
                // the movesfailed file...ignore this
                continue;
            } else if (name.toLowerCase(Locale.ROOT).endsWith(".zip")) {
                // Locale.ROOT keeps the extension check independent of the JVM default locale
                try {
                    collections.add(new ZipXmlDocCollection(file));
                } catch (IOException ioe) {
                    LOG.error("Unable to load file: " + file, ioe);
                }
            } else if (name.endsWith(".xml")) {
                collections.add(new FileXmlDocCollection(file));
            } else {
                LOG.warn("Ignoring extraneous file in xml pending directory: " + file);
            }
        }
        return collections;
    }

    /**
     * Removes, in place, every collection whose file is already listed in the 'movesfailed' file, i.e. resources
     * which were processed earlier but could not be moved out of the pending directory.
     *
     * @param collections the candidate collections; modified by this call
     */
    private void removePreviouslyProcessed(List<XmlDocCollection> collections) {
        Iterator<XmlDocCollection> it = collections.iterator();
        while (it.hasNext()) {
            XmlDocCollection container = it.next();
            // if a move has already failed for this archive, ignore it
            if (inPendingMoveFailedArchive(container.getFile())) {
                LOG.info("Ignoring previously processed resource: " + container);
                it.remove();
            }
        }
    }

    /**
     * Moves the file backing a container into the 'problem' or 'loaded' directory of the current pass, creating the
     * directory if needed. When the move is not possible the file is recorded in the 'movesfailed' file. Containers
     * without a backing file are left alone.
     *
     * @param container the container to archive
     * @param containerFailed whether the container is to be treated as failed
     * @param completeDir target directory for successfully loaded containers
     * @param failedDir target directory for failed containers
     * @param loadTime the time stamp of the current pass, stored alongside unmovable files
     */
    private void archiveContainer(XmlDocCollection container, boolean containerFailed, File completeDir,
            File failedDir, Date loadTime) {
        File source = container.getFile();
        if (source == null) {
            return;
        }
        File targetDir;
        if (containerFailed) {
            // some docs must have failed, move the whole
            // container to the failed dir
            LOG.error("Moving " + source + " to problem dir.");
            targetDir = failedDir;
        } else {
            LOG.info("Moving " + source + " to loaded dir.");
            targetDir = completeDir;
        }
        if (!ensureDirectory(targetDir) || !moveFile(targetDir, source)) {
            LOG.error("Could not move: " + source);
            recordUnmovablePendingFile(source, loadTime);
        }
    }

    /**
     * Makes sure the given directory exists, creating it (and missing parents) when necessary.
     *
     * @param dir the directory to check or create
     * @return <code>true</code> if the directory exists after the call
     */
    private boolean ensureDirectory(File dir) {
        return dir.isDirectory() || dir.mkdirs();
    }

    /**
     * Tells whether the given file is listed in the 'movesfailed' file of the pending directory. A line matches when,
     * after trimming, it equals the file name or starts with the file name followed by '='.
     *
     * @param xmlDataFile the file to look up; may be <code>null</code>
     * @return <code>true</code> if a matching line was found; <code>false</code> if the argument is
     *         <code>null</code>, the 'movesfailed' file does not exist, no line matches, or reading fails
     */
    private boolean inPendingMoveFailedArchive(File xmlDataFile) {
        if (xmlDataFile == null) {
            return false;
        }
        File movesFailedFile = new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE);
        if (!movesFailedFile.isFile()) {
            return false;
        }
        String fileName = xmlDataFile.getName();
        String entryPrefix = fileName + "=";
        BufferedReader inFile = null;
        try {
            inFile = new BufferedReader(new FileReader(movesFailedFile));
            String line;
            while ((line = inFile.readLine()) != null) {
                String trimmedLine = line.trim();
                if (trimmedLine.equals(fileName) || trimmedLine.startsWith(entryPrefix)) {
                    return true;
                }
            }
        } catch (IOException e) {
            LOG.warn("Error reading file " + movesFailedFile, e);
        } finally {
            IOUtils.closeQuietly(inFile);
        }
        return false;
    }

    /**
     * Appends a <code>name=timestamp</code> line for the given file to the 'movesfailed' file in the pending
     * directory, so that later polling passes skip it.
     *
     * @param unMovablePendingFile the file that could not be moved out of the pending directory
     * @param dateLoaded the load time of the pass in which the move failed
     * @return <code>true</code> if the line was written without an <code>IOException</code>
     */
    private boolean recordUnmovablePendingFile(File unMovablePendingFile, Date dateLoaded) {
        boolean recorded = false;
        FileWriter archiveFile = null;
        try {
            // second constructor argument 'true' opens the file in append mode
            archiveFile = new FileWriter(new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE), true);
            archiveFile.write(unMovablePendingFile.getName() + "=" + dateLoaded.getTime() + NEW_LINE);
            recorded = true;
        } catch (IOException e) {
            LOG.error("Unable to record unmovable pending file " + unMovablePendingFile.getName()
                    + " in the archive file " + PENDING_MOVE_FAILED_ARCHIVE_FILE, e);
        } finally {
            if (archiveFile != null) {
                try {
                    archiveFile.close();
                } catch (IOException ioe) {
                    LOG.error("Error closing unmovable pending file", ioe);
                }
            }
        }
        return recorded;
    }

    /**
     * Renames the given file into the given directory, keeping its simple name.
     *
     * @param toDirectory the destination directory
     * @param fileToMove the file to move
     * @return <code>true</code> if the rename succeeded
     */
    private boolean moveFile(File toDirectory, File fileToMove) {
        File destination = new File(toDirectory.getPath(), fileToMove.getName());
        if (fileToMove.renameTo(destination)) {
            return true;
        }
        LOG.error("Unable to move file " + fileToMove.getName() + " to directory " + toDirectory.getPath());
        return false;
    }

    private File getXmlPendingDir() {
        return new File(getXmlPendingLocation());
    }

    private File getXmlCompleteDir() {
        return new File(getXmlCompletedLocation());
    }

    private File getXmlProblemDir() {
        return new File(getXmlProblemLocation());
    }

    public String getXmlCompletedLocation() {
        return xmlCompletedLocation;
    }

    public void setXmlCompletedLocation(String xmlCompletedLocation) {
        this.xmlCompletedLocation = xmlCompletedLocation;
    }

    public String getXmlPendingLocation() {
        return xmlPendingLocation;
    }

    public void setXmlPendingLocation(String xmlPendingLocation) {
        this.xmlPendingLocation = xmlPendingLocation;
    }

    public String getXmlProblemLocation() {
        return xmlProblemLocation;
    }

    public void setXmlProblemLocation(String xmlProblemLocation) {
        this.xmlProblemLocation = xmlProblemLocation;
    }

    public String getXmlParentDirectory() {
        return xmlParentDirectory;
    }

    public void setXmlParentDirectory(String xmlDataParentDirectory) {
        this.xmlParentDirectory = xmlDataParentDirectory;
    }

    /**
     * Sets the polling interval time in seconds
     *
     * @param seconds
     *            the polling interval time in seconds
     */
    public void setPollIntervalSecs(int seconds) {
        this.pollIntervalSecs = seconds;
    }

    /**
     * Gets the polling interval time in seconds
     *
     * @return the polling interval time in seconds
     */
    @Override
    public int getPollIntervalSecs() {
        return this.pollIntervalSecs;
    }

    /**
     * Sets the initial delay time in seconds
     *
     * @param seconds
     *            the initial delay time in seconds
     */
    public void setInitialDelaySecs(int seconds) {
        this.initialDelaySecs = seconds;
    }

    /**
     * Gets the initial delay time in seconds
     *
     * @return the initial delay time in seconds
     */
    @Override
    public int getInitialDelaySecs() {
        return this.initialDelaySecs;
    }
}