001/** 002 * Copyright 2005-2015 The Kuali Foundation 003 * 004 * Licensed under the Educational Community License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.opensource.org/licenses/ecl2.php 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.kuali.rice.kew.batch; 017 018import org.kuali.rice.core.api.CoreApiServiceLocator; 019import org.kuali.rice.core.api.impex.xml.DirectoryXmlDocCollection; 020import org.kuali.rice.core.api.impex.xml.FileXmlDocCollection; 021import org.kuali.rice.core.api.impex.xml.XmlDocCollection; 022import org.kuali.rice.core.api.impex.xml.ZipXmlDocCollection; 023 024import java.io.BufferedReader; 025import java.io.File; 026import java.io.FileReader; 027import java.io.FileWriter; 028import java.io.IOException; 029import java.text.Format; 030import java.text.SimpleDateFormat; 031import java.util.ArrayList; 032import java.util.Calendar; 033import java.util.Collection; 034import java.util.Date; 035import java.util.Iterator; 036import java.util.List; 037 038 039/** 040 * Utility class responsible for polling and ingesting XML data files 041 * containing various forms of workflow engine data (e.g. document types 042 * and rules). 043 * Loaded files and problem files are placed into a subdirectory of a 044 * configured 'loaded' and 'problem' directory, respectively. 045 * "Problem-ness" is determined by inspecting a 'processed' flag on each <code>XmlDoc</code> 046 * in each collection. If not all <code>XmlDoc</code>s are marked 'processed' an 047 * error is assumed, and the collection file (e.g. for a Zip, the Zip file) is moved 048 * to the 'problem' directory. 049 * As such, it is the <b><code>XmlIngesterService</code>'s responsibility</b> to mark 050 * any unknown or otherwise innocuous non-failure non-processed files, as 'processed'. 051 * A different mechanism can be developed if this proves to be a problem, but for now 052 * it is simple enough for the <code>XmlIngesterService</code> to determine this. 053 * @see org.kuali.rice.kew.batch.XmlPollerService 054 * @see org.kuali.rice.kew.batch.XmlIngesterServiceImpl 055 * @author Kuali Rice Team (rice.collab@kuali.org) 056 */ 057public class XmlPollerServiceImpl implements XmlPollerService { 058 059 private static final org.apache.log4j.Logger LOG = org.apache.log4j.Logger 060 .getLogger(XmlPollerServiceImpl.class); 061 private static final Format DIR_FORMAT = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss-SSS"); 062 063 /** 064 * Specifies the polling interval that should be used with this task. 065 */ 066 private int pollIntervalSecs = 5 * 60; // default to 5 minutes 067 /** 068 * Specifies the initial delay the poller should wait before starting to poll 069 */ 070 private int initialDelaySecs = 30; // default to 30 seconds 071 /** 072 * Location in which to find XML files to load. 073 */ 074 private String xmlPendingLocation; 075 /** 076 * Location in which to place successfully loaded XML files. 077 */ 078 private String xmlCompletedLocation; 079 /** 080 * Location in which to place XML files which have failed to load. 081 */ 082 private String xmlProblemLocation; 083 084 private String xmlParentDirectory; 085 private static final String PENDING_MOVE_FAILED_ARCHIVE_FILE = "movesfailed"; 086 private static final String NEW_LINE = "\n"; 087 088 public void run() { 089 // if(!directoriesWritable()){ 090 // LOG.error("Error writing to xml data directories. Stopping xmlLoader ..."); 091 // this.cancel(); 092 // } 093 LOG.debug("checking for xml data files..."); 094 File[] files = getXmlPendingDir().listFiles(); 095 if (files == null || files.length == 0) { 096 return; 097 } 098 LOG.info("Found " + files.length + " files to ingest."); 099 List<XmlDocCollection> collections = new ArrayList<XmlDocCollection>(); 100 for (File file : files) 101 { 102 if (file.isDirectory()) 103 { 104 collections.add(new DirectoryXmlDocCollection(file)); 105 } else if (file.getName().equals(PENDING_MOVE_FAILED_ARCHIVE_FILE)) 106 { 107 // the movesfailed file...ignore this 108 continue; 109 } else if (file.getName().toLowerCase().endsWith(".zip")) 110 { 111 try 112 { 113 collections.add(new ZipXmlDocCollection(file)); 114 } catch (IOException ioe) 115 { 116 LOG.error("Unable to load file: " + file); 117 } 118 } else if (file.getName().endsWith(".xml")) 119 { 120 collections.add(new FileXmlDocCollection(file)); 121 } else 122 { 123 LOG.warn("Ignoring extraneous file in xml pending directory: " + file); 124 } 125 } 126 127 // Cull any resources which were already processed and whose moves failed 128 Iterator collectionsIt = collections.iterator(); 129 Collection<XmlDocCollection> culled = new ArrayList<XmlDocCollection>(); 130 while (collectionsIt.hasNext()) { 131 XmlDocCollection container = (XmlDocCollection) collectionsIt.next(); 132 // if a move has already failed for this archive, ignore it 133 if (inPendingMoveFailedArchive(container.getFile())) { 134 LOG.info("Ignoring previously processed resource: " + container); 135 culled.add(container); 136 } 137 } 138 collections.removeAll(culled); 139 140 if (collections.size() == 0) { 141 LOG.debug("No valid new resources found to ingest"); 142 return; 143 } 144 145 Date LOAD_TIME = Calendar.getInstance().getTime(); 146 // synchronization around date format should not be an issue as this code is single-threaded 147 File completeDir = new File(getXmlCompleteDir(), DIR_FORMAT.format(LOAD_TIME)); 148 File failedDir = new File(getXmlProblemDir(), DIR_FORMAT.format(LOAD_TIME)); 149 150 // now ingest the containers 151 Collection failed = null; 152 try { 153 failed = CoreApiServiceLocator.getXmlIngesterService().ingest(collections); 154 } catch (Exception e) { 155 LOG.error("Error ingesting data", e); 156 //throw new RuntimeException(e); 157 } 158 159 // now iterate through all containers again, and move containers to approprate dir 160 LOG.info("Moving files..."); 161 collectionsIt = collections.iterator(); 162 while (collectionsIt.hasNext()) { 163 XmlDocCollection container = (XmlDocCollection) collectionsIt.next(); 164 LOG.debug("container: " + container); 165 try { 166 // "close" the container 167 // this only matters for ZipFiles for now 168 container.close(); 169 } catch (IOException ioe) { 170 LOG.warn("Error closing " + container, ioe); 171 } 172 if (failed.contains(container)) { 173 // some docs must have failed, move the whole 174 // container to the failed dir 175 if (container.getFile() != null) { 176 LOG.error("Moving " + container.getFile() + " to problem dir."); 177 if ((!failedDir.isDirectory() && !failedDir.mkdirs()) 178 || !moveFile(failedDir, container.getFile())) { 179 LOG.error("Could not move: " + container.getFile()); 180 recordUnmovablePendingFile(container.getFile(), LOAD_TIME); 181 } 182 } 183 } else { 184 if (container.getFile() != null) { 185 LOG.info("Moving " + container.getFile() + " to loaded dir."); 186 if((!completeDir.isDirectory() && !completeDir.mkdirs()) 187 || !moveFile(completeDir, container.getFile())){ 188 LOG.error("Could not move: " + container.getFile()); 189 recordUnmovablePendingFile(container.getFile(), LOAD_TIME); 190 } 191 } 192 } 193 } 194 } 195 196 private boolean inPendingMoveFailedArchive(File xmlDataFile){ 197 if (xmlDataFile == null) return false; 198 BufferedReader inFile = null; 199 File movesFailedFile = new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE); 200 if (!movesFailedFile.isFile()) return false; 201 try { 202 inFile = new BufferedReader(new FileReader(movesFailedFile)); 203 String line; 204 205 while((line = inFile.readLine()) != null){ 206 String trimmedLine = line.trim(); 207 if(trimmedLine.equals(xmlDataFile.getName()) || 208 trimmedLine.startsWith(xmlDataFile.getName() + "=")) { 209 return true; 210 } 211 } 212 } catch (IOException e){ 213 LOG.warn("Error reading file " + movesFailedFile); 214 //TODO try reading the pending file or stop? 215 } finally { 216 if (inFile != null) try { 217 inFile.close(); 218 } catch (Exception e) { 219 LOG.warn("Error closing buffered reader for " + movesFailedFile); 220 } 221 } 222 223 return false; 224 } 225 226 private boolean recordUnmovablePendingFile(File unMovablePendingFile, Date dateLoaded){ 227 boolean recorded = false; 228 FileWriter archiveFile = null; 229 try{ 230 archiveFile = new FileWriter(new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE), true); 231 archiveFile.write(unMovablePendingFile.getName() + "=" + dateLoaded.getTime() + NEW_LINE); 232 recorded = true; 233 } catch (IOException e){ 234 LOG.error("Unable to record unmovable pending file " + unMovablePendingFile.getName() + "in the archive file " + PENDING_MOVE_FAILED_ARCHIVE_FILE); 235 } finally { 236 if (archiveFile != null) { 237 try { 238 archiveFile.close(); 239 } catch (IOException ioe) { 240 LOG.error("Error closing unmovable pending file", ioe); 241 } 242 } 243 } 244 return recorded; 245 } 246 247 private boolean moveFile(File toDirectory, File fileToMove){ 248 boolean moved = true; 249 if (!fileToMove.renameTo(new File(toDirectory.getPath(), fileToMove.getName()))){ 250 LOG.error("Unable to move file " + fileToMove.getName() + " to directory " + toDirectory.getPath()); 251 moved = false; 252 } 253 return moved; 254 } 255 256 private File getXmlPendingDir() { 257 return new File(getXmlPendingLocation()); 258 } 259 260 private File getXmlCompleteDir() { 261 return new File(getXmlCompletedLocation()); 262 } 263 264 private File getXmlProblemDir() { 265 return new File(getXmlProblemLocation()); 266 } 267 268 public String getXmlCompletedLocation() { 269 return xmlCompletedLocation; 270 } 271 272 public void setXmlCompletedLocation(String xmlCompletedLocation) { 273 this.xmlCompletedLocation = xmlCompletedLocation; 274 } 275 276 public String getXmlPendingLocation() { 277 return xmlPendingLocation; 278 } 279 280 /*public boolean validate(File uploadedFile) { 281 XmlDataLoaderFileFilter filter = new XmlDataLoaderFileFilter(); 282 return filter.accept(uploadedFile); 283 }*/ 284 285 public void setXmlPendingLocation(String xmlPendingLocation) { 286 this.xmlPendingLocation = xmlPendingLocation; 287 } 288 289 public String getXmlProblemLocation() { 290 return xmlProblemLocation; 291 } 292 293 public void setXmlProblemLocation(String xmlProblemLocation) { 294 this.xmlProblemLocation = xmlProblemLocation; 295 } 296 public String getXmlParentDirectory() { 297 return xmlParentDirectory; 298 } 299 public void setXmlParentDirectory(String xmlDataParentDirectory) { 300 this.xmlParentDirectory = xmlDataParentDirectory; 301 } 302 303 /** 304 * Sets the polling interval time in seconds 305 * @param seconds the polling interval time in seconds 306 */ 307 public void setPollIntervalSecs(int seconds) { 308 this.pollIntervalSecs = seconds; 309 } 310 311 /** 312 * Gets the polling interval time in seconds 313 * @return the polling interval time in seconds 314 */ 315 public int getPollIntervalSecs() { 316 return this.pollIntervalSecs; 317 } 318 319 /** 320 * Sets the initial delay time in seconds 321 * @param seconds the initial delay time in seconds 322 */ 323 public void setInitialDelaySecs(int seconds) { 324 this.initialDelaySecs = seconds; 325 } 326 327 /** 328 * Gets the initial delay time in seconds 329 * @return the initial delay time in seconds 330 */ 331 public int getInitialDelaySecs() { 332 return this.initialDelaySecs; 333 } 334}