View Javadoc
1   /*
2    * Copyright 2013 The Kuali Foundation.
3    * 
4    * Licensed under the Educational Community License, Version 1.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    * http://www.opensource.org/licenses/ecl1.php
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.ole;
17  
18  import org.kuali.rice.core.api.CoreApiServiceLocator;
19  import org.kuali.rice.core.api.impex.xml.*;
20  import org.kuali.rice.kew.batch.XmlPollerService;
21  
22  import java.io.BufferedReader;
23  import java.io.File;
24  import java.io.FileReader;
25  import java.io.FileWriter;
26  import java.io.IOException;
27  import java.text.Format;
28  import java.text.SimpleDateFormat;
29  import java.util.ArrayList;
30  import java.util.Arrays;
31  import java.util.Calendar;
32  import java.util.Collection;
33  import java.util.Date;
34  import java.util.Iterator;
35  import java.util.List;
36  
37  public abstract class OleXmlPollerServiceImpl implements OleXmlPollerService {
38  
39  
40  
41  	private static final org.apache.log4j.Logger LOG = org.apache.log4j.Logger.getLogger(OleXmlPollerServiceImpl.class);
42  	private static final Format DIR_FORMAT = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss-SSS");
43  
44  	/**
45  	 * Specifies the polling interval that should be used with this task.
46  	 */
47  	private int pollIntervalSecs = 5 * 60; // default to 5 minutes
48  	/**
49  	 * Specifies the initial delay the poller should wait before starting to poll
50  	 */
51  	private int initialDelaySecs = 30; // default to 30 seconds
52  	/**
53  	 * Location in which to find XML files to load.
54  	 */
55  	private String xmlPendingLocation;
56  	/**
57  	 * Location in which to place successfully loaded XML files.
58  	 */
59  	private String xmlCompletedLocation;
60  	/**
61  	 * Location in which to place XML files which have failed to load.
62  	 */
63  	private String xmlProblemLocation;
64      /**
65  	 * Location in which to place Report files.
66  	 */
67  	private String xmlReportLocation;
68  
69  	private String xmlParentDirectory;
70  	private static final String PENDING_MOVE_FAILED_ARCHIVE_FILE = "movesfailed";
71  	private static final String NEW_LINE = "\n";
72  
73  	public void run() {
74  		// if(!directoriesWritable()){
75  		// LOG.error("Error writing to xml data directories. Stopping xmlLoader ...");
76  		// this.cancel();
77  		// }		LOG.debug("checking for xml data files...");
78  		File[] files = getXmlPendingDir().listFiles();
79  		Arrays.sort(files);
80  		if (files == null || files.length == 0) {
81  			return;
82  		}
83  		LOG.info("Found " + files.length + " files to ingest.");
84  		for (int i = 0; i < files.length; i++) {
85  			LOG.info(files[i].getAbsolutePath());
86  		}
87  		List<XmlDocCollection> collections = new ArrayList<XmlDocCollection>();
88  		for (File file : files) {
89  			if (file.isDirectory()) {
90  				collections.add(new DirectoryXmlDocCollection(file));
91  			} else if (file.getName().equals(PENDING_MOVE_FAILED_ARCHIVE_FILE)) {
92  				// the movesfailed file...ignore this
93  				continue;
94  			} else if (file.getName().toLowerCase().endsWith(".zip")) {
95  				try {
96  					collections.add(new ZipXmlDocCollection(file));
97  				} catch (IOException ioe) {
98  					LOG.error("Unable to load file: " + file);
99  				}
100 			} else if (file.getName().endsWith(".xml")) {
101 				collections.add(new FileXmlDocCollection(file));
102 			} else {
103 				LOG.warn("Ignoring extraneous file in xml pending directory: " + file);
104 			}
105 		}
106 
107 		// Cull any resources which were already processed and whose moves failed
108 		Iterator collectionsIt = collections.iterator();
109 		Collection<XmlDocCollection> culled = new ArrayList<XmlDocCollection>();
110 		while (collectionsIt.hasNext()) {
111 			XmlDocCollection container = (XmlDocCollection) collectionsIt.next();
112 			// if a move has already failed for this archive, ignore it
113 			if (inPendingMoveFailedArchive(container.getFile())) {
114 				LOG.info("Ignoring previously processed resource: " + container);
115 				culled.add(container);
116 			}
117 		}
118 		collections.removeAll(culled);
119 
120 		if (collections.size() == 0) {
121 			LOG.debug("No valid new resources found to ingest");
122 			return;
123 		}
124 
125 		Date LOAD_TIME = Calendar.getInstance().getTime();
126 		// synchronization around date format should not be an issue as this code is single-threaded
127 		File completeDir = new File(getXmlCompleteDir(), DIR_FORMAT.format(LOAD_TIME));
128 		File failedDir = new File(getXmlProblemDir(), DIR_FORMAT.format(LOAD_TIME));
129 
130 		// now ingest the containers
131 		Collection failed = null;
132 		try {
133 			failed = getIngesterService().ingest(collections);
134 		} catch (Exception e) {
135 			LOG.error("Error ingesting data", e);
136 			// throw new RuntimeException(e);
137 		}
138 
139 		// now iterate through all containers again, and move containers to approprate dir
140 		LOG.info("Moving files...");
141 		collectionsIt = collections.iterator();
142 		while (collectionsIt.hasNext()) {
143 			XmlDocCollection container = (XmlDocCollection) collectionsIt.next();
144 			LOG.debug("container: " + container);
145 			try {
146 				// "close" the container
147 				// this only matters for ZipFiles for now
148 				container.close();
149 			} catch (IOException ioe) {
150 				LOG.warn("Error closing " + container, ioe);
151 			}
152 			if (failed.contains(container)) {
153 				// some docs must have failed, move the whole
154 				// container to the failed dir
155 				if (container.getFile() != null) {
156 					LOG.error("Moving " + container.getFile() + " to problem dir.");
157 					if ((!failedDir.isDirectory() && !failedDir.mkdirs()) || !moveFile(failedDir, container.getFile())) {
158 						LOG.error("Could not move: " + container.getFile());
159 						recordUnmovablePendingFile(container.getFile(), LOAD_TIME);
160 					}
161 				}
162 			} else {
163 				if (container.getFile() != null) {
164 					LOG.info("Moving " + container.getFile() + " to loaded dir.");
165 					if ((!completeDir.isDirectory() && !completeDir.mkdirs()) || !moveFile(completeDir, container.getFile())) {
166 						LOG.error("Could not move: " + container.getFile());
167 						recordUnmovablePendingFile(container.getFile(), LOAD_TIME);
168 					}
169 				}
170 			}
171             }
172         processReportContent(collections, DIR_FORMAT.format(LOAD_TIME));
173 		}
174 
175     protected void processReportContent(List<XmlDocCollection> collections, String loadTime) {
176         // Write custom process for report.
177     }
178 
179 	protected abstract XmlIngesterService getIngesterService();
180 
181     private boolean inPendingMoveFailedArchive(File xmlDataFile) {
182 		if (xmlDataFile == null)
183 			return false;
184 		BufferedReader inFile = null;
185 		File movesFailedFile = new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE);
186 		if (!movesFailedFile.isFile())
187 			return false;
188 		try {
189 			inFile = new BufferedReader(new FileReader(movesFailedFile));
190 			String line;
191 
192 			while ((line = inFile.readLine()) != null) {
193 				String trimmedLine = line.trim();
194 				if (trimmedLine.equals(xmlDataFile.getName()) || trimmedLine.startsWith(xmlDataFile.getName() + "=")) {
195 					return true;
196 				}
197 			}
198 		} catch (IOException e) {
199 			LOG.warn("Error reading file " + movesFailedFile);
200 			// TODO try reading the pending file or stop?
201 		} finally {
202 			if (inFile != null)
203 				try {
204 					inFile.close();
205 				} catch (Exception e) {
206 					LOG.warn("Error closing buffered reader for " + movesFailedFile);
207 				}
208 		}
209 
210 		return false;
211 	}
212 
213 	private boolean recordUnmovablePendingFile(File unMovablePendingFile, Date dateLoaded) {
214 		boolean recorded = false;
215 		FileWriter archiveFile = null;
216 		try {
217 			archiveFile = new FileWriter(new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE), true);
218 			archiveFile.write(unMovablePendingFile.getName() + "=" + dateLoaded.getTime() + NEW_LINE);
219 			recorded = true;
220 		} catch (IOException e) {
221 			LOG.error("Unable to record unmovable pending file " + unMovablePendingFile.getName() + "in the archive file " + PENDING_MOVE_FAILED_ARCHIVE_FILE);
222 		} finally {
223 			if (archiveFile != null) {
224 				try {
225 					archiveFile.close();
226 				} catch (IOException ioe) {
227 					LOG.error("Error closing unmovable pending file", ioe);
228 				}
229 			}
230 		}
231 		return recorded;
232 	}
233 
234 	private boolean moveFile(File toDirectory, File fileToMove) {
235 		boolean moved = true;
236 		if (!fileToMove.renameTo(new File(toDirectory.getPath(), fileToMove.getName()))) {
237 			LOG.error("Unable to move file " + fileToMove.getName() + " to directory " + toDirectory.getPath());
238 			moved = false;
239 		}
240 		return moved;
241 	}
242 
243 	private File getXmlPendingDir() {
244 		return new File(getXmlPendingLocation());
245 	}
246 
247 	private File getXmlCompleteDir() {
248 		return new File(getXmlCompletedLocation());
249 	}
250 
251 	private File getXmlProblemDir() {
252 		return new File(getXmlProblemLocation());
253 	}
254 
255 	public String getXmlCompletedLocation() {
256 		return xmlCompletedLocation;
257 	}
258 
259 	public void setXmlCompletedLocation(String xmlCompletedLocation) {
260 		this.xmlCompletedLocation = xmlCompletedLocation;
261 	}
262 
263 	public String getXmlPendingLocation() {
264 		return xmlPendingLocation;
265 	}
266 
267 	/*
268 	 * public boolean validate(File uploadedFile) { XmlDataLoaderFileFilter filter = new XmlDataLoaderFileFilter(); return
269 	 * filter.accept(uploadedFile); }
270 	 */
271 
272 	public void setXmlPendingLocation(String xmlPendingLocation) {
273 		this.xmlPendingLocation = xmlPendingLocation;
274 	}
275 
276 	public String getXmlProblemLocation() {
277 		return xmlProblemLocation;
278 	}
279 
280 	public void setXmlProblemLocation(String xmlProblemLocation) {
281 		this.xmlProblemLocation = xmlProblemLocation;
282 	}
283 
284     public String getXmlReportLocation() {
285         return xmlReportLocation;
286     }
287 
288     public void setXmlReportLocation(String xmlReportLocation) {
289         this.xmlReportLocation = xmlReportLocation;
290     }
291 
292     public String getXmlParentDirectory() {
293 		return xmlParentDirectory;
294 	}
295 
296 	public void setXmlParentDirectory(String xmlDataParentDirectory) {
297 		this.xmlParentDirectory = xmlDataParentDirectory;
298 	}
299 
300 	/**
301 	 * Sets the polling interval time in seconds
302 	 * 
303 	 * @param seconds
304 	 *            the polling interval time in seconds
305 	 */
306 	public void setPollIntervalSecs(int seconds) {
307 		this.pollIntervalSecs = seconds;
308 	}
309 
310 	/**
311 	 * Gets the polling interval time in seconds
312 	 * 
313 	 * @return the polling interval time in seconds
314 	 */
315 	public int getPollIntervalSecs() {
316 		return this.pollIntervalSecs;
317 	}
318 
319 	/**
320 	 * Sets the initial delay time in seconds
321 	 * 
322 	 * @param seconds
323 	 *            the initial delay time in seconds
324 	 */
325 	public void setInitialDelaySecs(int seconds) {
326 		this.initialDelaySecs = seconds;
327 	}
328 
329 	/**
330 	 * Gets the initial delay time in seconds
331 	 * 
332 	 * @return the initial delay time in seconds
333 	 */
334 	public int getInitialDelaySecs() {
335 		return this.initialDelaySecs;
336 	}
337 }