001/* 002 * Copyright 2013 The Kuali Foundation. 003 * 004 * Licensed under the Educational Community License, Version 1.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.opensource.org/licenses/ecl1.php 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.kuali.ole; 017 018import org.kuali.rice.core.api.CoreApiServiceLocator; 019import org.kuali.rice.core.api.impex.xml.*; 020import org.kuali.rice.kew.batch.XmlPollerService; 021 022import java.io.BufferedReader; 023import java.io.File; 024import java.io.FileReader; 025import java.io.FileWriter; 026import java.io.IOException; 027import java.text.Format; 028import java.text.SimpleDateFormat; 029import java.util.ArrayList; 030import java.util.Arrays; 031import java.util.Calendar; 032import java.util.Collection; 033import java.util.Date; 034import java.util.Iterator; 035import java.util.List; 036 037public abstract class OleXmlPollerServiceImpl implements OleXmlPollerService { 038 039 040 041 private static final org.apache.log4j.Logger LOG = org.apache.log4j.Logger.getLogger(OleXmlPollerServiceImpl.class); 042 private static final Format DIR_FORMAT = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss-SSS"); 043 044 /** 045 * Specifies the polling interval that should be used with this task. 046 */ 047 private int pollIntervalSecs = 5 * 60; // default to 5 minutes 048 /** 049 * Specifies the initial delay the poller should wait before starting to poll 050 */ 051 private int initialDelaySecs = 30; // default to 30 seconds 052 /** 053 * Location in which to find XML files to load. 054 */ 055 private String xmlPendingLocation; 056 /** 057 * Location in which to place successfully loaded XML files. 058 */ 059 private String xmlCompletedLocation; 060 /** 061 * Location in which to place XML files which have failed to load. 062 */ 063 private String xmlProblemLocation; 064 065 private String xmlParentDirectory; 066 private static final String PENDING_MOVE_FAILED_ARCHIVE_FILE = "movesfailed"; 067 private static final String NEW_LINE = "\n"; 068 069 public void run() { 070 // if(!directoriesWritable()){ 071 // LOG.error("Error writing to xml data directories. Stopping xmlLoader ..."); 072 // this.cancel(); 073 // } LOG.debug("checking for xml data files..."); 074 File[] files = getXmlPendingDir().listFiles(); 075 Arrays.sort(files); 076 if (files == null || files.length == 0) { 077 return; 078 } 079 LOG.info("Found " + files.length + " files to ingest."); 080 for (int i = 0; i < files.length; i++) { 081 LOG.info(files[i].getAbsolutePath()); 082 } 083 List<XmlDocCollection> collections = new ArrayList<XmlDocCollection>(); 084 for (File file : files) { 085 if (file.isDirectory()) { 086 collections.add(new DirectoryXmlDocCollection(file)); 087 } else if (file.getName().equals(PENDING_MOVE_FAILED_ARCHIVE_FILE)) { 088 // the movesfailed file...ignore this 089 continue; 090 } else if (file.getName().toLowerCase().endsWith(".zip")) { 091 try { 092 collections.add(new ZipXmlDocCollection(file)); 093 } catch (IOException ioe) { 094 LOG.error("Unable to load file: " + file); 095 } 096 } else if (file.getName().endsWith(".xml")) { 097 collections.add(new FileXmlDocCollection(file)); 098 } else { 099 LOG.warn("Ignoring extraneous file in xml pending directory: " + file); 100 } 101 } 102 103 // Cull any resources which were already processed and whose moves failed 104 Iterator collectionsIt = collections.iterator(); 105 Collection<XmlDocCollection> culled = new ArrayList<XmlDocCollection>(); 106 while (collectionsIt.hasNext()) { 107 XmlDocCollection container = (XmlDocCollection) collectionsIt.next(); 108 // if a move has already failed for this archive, ignore it 109 if (inPendingMoveFailedArchive(container.getFile())) { 110 LOG.info("Ignoring previously processed resource: " + container); 111 culled.add(container); 112 } 113 } 114 collections.removeAll(culled); 115 116 if (collections.size() == 0) { 117 LOG.debug("No valid new resources found to ingest"); 118 return; 119 } 120 121 Date LOAD_TIME = Calendar.getInstance().getTime(); 122 // synchronization around date format should not be an issue as this code is single-threaded 123 File completeDir = new File(getXmlCompleteDir(), DIR_FORMAT.format(LOAD_TIME)); 124 File failedDir = new File(getXmlProblemDir(), DIR_FORMAT.format(LOAD_TIME)); 125 126 // now ingest the containers 127 Collection failed = null; 128 try { 129 failed = getIngesterService().ingest(collections); 130 } catch (Exception e) { 131 LOG.error("Error ingesting data", e); 132 // throw new RuntimeException(e); 133 } 134 135 // now iterate through all containers again, and move containers to approprate dir 136 LOG.info("Moving files..."); 137 collectionsIt = collections.iterator(); 138 while (collectionsIt.hasNext()) { 139 XmlDocCollection container = (XmlDocCollection) collectionsIt.next(); 140 LOG.debug("container: " + container); 141 try { 142 // "close" the container 143 // this only matters for ZipFiles for now 144 container.close(); 145 } catch (IOException ioe) { 146 LOG.warn("Error closing " + container, ioe); 147 } 148 if (failed.contains(container)) { 149 // some docs must have failed, move the whole 150 // container to the failed dir 151 if (container.getFile() != null) { 152 LOG.error("Moving " + container.getFile() + " to problem dir."); 153 if ((!failedDir.isDirectory() && !failedDir.mkdirs()) || !moveFile(failedDir, container.getFile())) { 154 LOG.error("Could not move: " + container.getFile()); 155 recordUnmovablePendingFile(container.getFile(), LOAD_TIME); 156 } 157 } 158 } else { 159 if (container.getFile() != null) { 160 LOG.info("Moving " + container.getFile() + " to loaded dir."); 161 if ((!completeDir.isDirectory() && !completeDir.mkdirs()) || !moveFile(completeDir, container.getFile())) { 162 LOG.error("Could not move: " + container.getFile()); 163 recordUnmovablePendingFile(container.getFile(), LOAD_TIME); 164 } 165 } 166 } 167 } 168 } 169 170 protected abstract XmlIngesterService getIngesterService(); 171 172 private boolean inPendingMoveFailedArchive(File xmlDataFile) { 173 if (xmlDataFile == null) 174 return false; 175 BufferedReader inFile = null; 176 File movesFailedFile = new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE); 177 if (!movesFailedFile.isFile()) 178 return false; 179 try { 180 inFile = new BufferedReader(new FileReader(movesFailedFile)); 181 String line; 182 183 while ((line = inFile.readLine()) != null) { 184 String trimmedLine = line.trim(); 185 if (trimmedLine.equals(xmlDataFile.getName()) || trimmedLine.startsWith(xmlDataFile.getName() + "=")) { 186 return true; 187 } 188 } 189 } catch (IOException e) { 190 LOG.warn("Error reading file " + movesFailedFile); 191 // TODO try reading the pending file or stop? 192 } finally { 193 if (inFile != null) 194 try { 195 inFile.close(); 196 } catch (Exception e) { 197 LOG.warn("Error closing buffered reader for " + movesFailedFile); 198 } 199 } 200 201 return false; 202 } 203 204 private boolean recordUnmovablePendingFile(File unMovablePendingFile, Date dateLoaded) { 205 boolean recorded = false; 206 FileWriter archiveFile = null; 207 try { 208 archiveFile = new FileWriter(new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE), true); 209 archiveFile.write(unMovablePendingFile.getName() + "=" + dateLoaded.getTime() + NEW_LINE); 210 recorded = true; 211 } catch (IOException e) { 212 LOG.error("Unable to record unmovable pending file " + unMovablePendingFile.getName() + "in the archive file " + PENDING_MOVE_FAILED_ARCHIVE_FILE); 213 } finally { 214 if (archiveFile != null) { 215 try { 216 archiveFile.close(); 217 } catch (IOException ioe) { 218 LOG.error("Error closing unmovable pending file", ioe); 219 } 220 } 221 } 222 return recorded; 223 } 224 225 private boolean moveFile(File toDirectory, File fileToMove) { 226 boolean moved = true; 227 if (!fileToMove.renameTo(new File(toDirectory.getPath(), fileToMove.getName()))) { 228 LOG.error("Unable to move file " + fileToMove.getName() + " to directory " + toDirectory.getPath()); 229 moved = false; 230 } 231 return moved; 232 } 233 234 private File getXmlPendingDir() { 235 return new File(getXmlPendingLocation()); 236 } 237 238 private File getXmlCompleteDir() { 239 return new File(getXmlCompletedLocation()); 240 } 241 242 private File getXmlProblemDir() { 243 return new File(getXmlProblemLocation()); 244 } 245 246 public String getXmlCompletedLocation() { 247 return xmlCompletedLocation; 248 } 249 250 public void setXmlCompletedLocation(String xmlCompletedLocation) { 251 this.xmlCompletedLocation = xmlCompletedLocation; 252 } 253 254 public String getXmlPendingLocation() { 255 return xmlPendingLocation; 256 } 257 258 /* 259 * public boolean validate(File uploadedFile) { XmlDataLoaderFileFilter filter = new XmlDataLoaderFileFilter(); return 260 * filter.accept(uploadedFile); } 261 */ 262 263 public void setXmlPendingLocation(String xmlPendingLocation) { 264 this.xmlPendingLocation = xmlPendingLocation; 265 } 266 267 public String getXmlProblemLocation() { 268 return xmlProblemLocation; 269 } 270 271 public void setXmlProblemLocation(String xmlProblemLocation) { 272 this.xmlProblemLocation = xmlProblemLocation; 273 } 274 275 public String getXmlParentDirectory() { 276 return xmlParentDirectory; 277 } 278 279 public void setXmlParentDirectory(String xmlDataParentDirectory) { 280 this.xmlParentDirectory = xmlDataParentDirectory; 281 } 282 283 /** 284 * Sets the polling interval time in seconds 285 * 286 * @param seconds 287 * the polling interval time in seconds 288 */ 289 public void setPollIntervalSecs(int seconds) { 290 this.pollIntervalSecs = seconds; 291 } 292 293 /** 294 * Gets the polling interval time in seconds 295 * 296 * @return the polling interval time in seconds 297 */ 298 public int getPollIntervalSecs() { 299 return this.pollIntervalSecs; 300 } 301 302 /** 303 * Sets the initial delay time in seconds 304 * 305 * @param seconds 306 * the initial delay time in seconds 307 */ 308 public void setInitialDelaySecs(int seconds) { 309 this.initialDelaySecs = seconds; 310 } 311 312 /** 313 * Gets the initial delay time in seconds 314 * 315 * @return the initial delay time in seconds 316 */ 317 public int getInitialDelaySecs() { 318 return this.initialDelaySecs; 319 } 320}