View Javadoc

1   /*
2    * Copyright 2013 The Kuali Foundation.
3    * 
4    * Licensed under the Educational Community License, Version 1.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    * http://www.opensource.org/licenses/ecl1.php
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.ole;
17  
18  import org.kuali.rice.core.api.CoreApiServiceLocator;
19  import org.kuali.rice.core.api.impex.xml.DirectoryXmlDocCollection;
20  import org.kuali.rice.core.api.impex.xml.FileXmlDocCollection;
21  import org.kuali.rice.core.api.impex.xml.XmlDocCollection;
22  import org.kuali.rice.core.api.impex.xml.ZipXmlDocCollection;
23  import org.kuali.rice.kew.batch.XmlPollerService;
24  
25  import java.io.BufferedReader;
26  import java.io.File;
27  import java.io.FileReader;
28  import java.io.FileWriter;
29  import java.io.IOException;
30  import java.text.Format;
31  import java.text.SimpleDateFormat;
32  import java.util.ArrayList;
33  import java.util.Arrays;
34  import java.util.Calendar;
35  import java.util.Collection;
36  import java.util.Date;
37  import java.util.Iterator;
38  import java.util.List;
39  
40  public class OleXmlPollerServiceImpl implements XmlPollerService {
41  
42  	private static final org.apache.log4j.Logger LOG = org.apache.log4j.Logger.getLogger(OleXmlPollerServiceImpl.class);
43  	private static final Format DIR_FORMAT = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss-SSS");
44  
45  	/**
46  	 * Specifies the polling interval that should be used with this task.
47  	 */
48  	private int pollIntervalSecs = 5 * 60; // default to 5 minutes
49  	/**
50  	 * Specifies the initial delay the poller should wait before starting to poll
51  	 */
52  	private int initialDelaySecs = 30; // default to 30 seconds
53  	/**
54  	 * Location in which to find XML files to load.
55  	 */
56  	private String xmlPendingLocation;
57  	/**
58  	 * Location in which to place successfully loaded XML files.
59  	 */
60  	private String xmlCompletedLocation;
61  	/**
62  	 * Location in which to place XML files which have failed to load.
63  	 */
64  	private String xmlProblemLocation;
65  
66  	private String xmlParentDirectory;
67  	private static final String PENDING_MOVE_FAILED_ARCHIVE_FILE = "movesfailed";
68  	private static final String NEW_LINE = "\n";
69  
70  	public void run() {
71  		// if(!directoriesWritable()){
72  		// LOG.error("Error writing to xml data directories. Stopping xmlLoader ...");
73  		// this.cancel();
74  		// }
75  		LOG.debug("checking for xml data files...");
76  		File[] files = getXmlPendingDir().listFiles();
77  		Arrays.sort(files);
78  		if (files == null || files.length == 0) {
79  			return;
80  		}
81  		LOG.info("Found " + files.length + " files to ingest.");
82  		for (int i = 0; i < files.length; i++) {
83  			LOG.info(files[i].getAbsolutePath());
84  		}
85  		List<XmlDocCollection> collections = new ArrayList<XmlDocCollection>();
86  		for (File file : files) {
87  			if (file.isDirectory()) {
88  				collections.add(new DirectoryXmlDocCollection(file));
89  			} else if (file.getName().equals(PENDING_MOVE_FAILED_ARCHIVE_FILE)) {
90  				// the movesfailed file...ignore this
91  				continue;
92  			} else if (file.getName().toLowerCase().endsWith(".zip")) {
93  				try {
94  					collections.add(new ZipXmlDocCollection(file));
95  				} catch (IOException ioe) {
96  					LOG.error("Unable to load file: " + file);
97  				}
98  			} else if (file.getName().endsWith(".xml")) {
99  				collections.add(new FileXmlDocCollection(file));
100 			} else {
101 				LOG.warn("Ignoring extraneous file in xml pending directory: " + file);
102 			}
103 		}
104 
105 		// Cull any resources which were already processed and whose moves failed
106 		Iterator collectionsIt = collections.iterator();
107 		Collection<XmlDocCollection> culled = new ArrayList<XmlDocCollection>();
108 		while (collectionsIt.hasNext()) {
109 			XmlDocCollection container = (XmlDocCollection) collectionsIt.next();
110 			// if a move has already failed for this archive, ignore it
111 			if (inPendingMoveFailedArchive(container.getFile())) {
112 				LOG.info("Ignoring previously processed resource: " + container);
113 				culled.add(container);
114 			}
115 		}
116 		collections.removeAll(culled);
117 
118 		if (collections.size() == 0) {
119 			LOG.debug("No valid new resources found to ingest");
120 			return;
121 		}
122 
123 		Date LOAD_TIME = Calendar.getInstance().getTime();
124 		// synchronization around date format should not be an issue as this code is single-threaded
125 		File completeDir = new File(getXmlCompleteDir(), DIR_FORMAT.format(LOAD_TIME));
126 		File failedDir = new File(getXmlProblemDir(), DIR_FORMAT.format(LOAD_TIME));
127 
128 		// now ingest the containers
129 		Collection failed = null;
130 		try {
131 			failed = CoreApiServiceLocator.getXmlIngesterService().ingest(collections);
132 		} catch (Exception e) {
133 			LOG.error("Error ingesting data", e);
134 			// throw new RuntimeException(e);
135 		}
136 
137 		// now iterate through all containers again, and move containers to approprate dir
138 		LOG.info("Moving files...");
139 		collectionsIt = collections.iterator();
140 		while (collectionsIt.hasNext()) {
141 			XmlDocCollection container = (XmlDocCollection) collectionsIt.next();
142 			LOG.debug("container: " + container);
143 			try {
144 				// "close" the container
145 				// this only matters for ZipFiles for now
146 				container.close();
147 			} catch (IOException ioe) {
148 				LOG.warn("Error closing " + container, ioe);
149 			}
150 			if (failed.contains(container)) {
151 				// some docs must have failed, move the whole
152 				// container to the failed dir
153 				if (container.getFile() != null) {
154 					LOG.error("Moving " + container.getFile() + " to problem dir.");
155 					if ((!failedDir.isDirectory() && !failedDir.mkdirs()) || !moveFile(failedDir, container.getFile())) {
156 						LOG.error("Could not move: " + container.getFile());
157 						recordUnmovablePendingFile(container.getFile(), LOAD_TIME);
158 					}
159 				}
160 			} else {
161 				if (container.getFile() != null) {
162 					LOG.info("Moving " + container.getFile() + " to loaded dir.");
163 					if ((!completeDir.isDirectory() && !completeDir.mkdirs()) || !moveFile(completeDir, container.getFile())) {
164 						LOG.error("Could not move: " + container.getFile());
165 						recordUnmovablePendingFile(container.getFile(), LOAD_TIME);
166 					}
167 				}
168 			}
169 		}
170 	}
171 
172 	private boolean inPendingMoveFailedArchive(File xmlDataFile) {
173 		if (xmlDataFile == null)
174 			return false;
175 		BufferedReader inFile = null;
176 		File movesFailedFile = new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE);
177 		if (!movesFailedFile.isFile())
178 			return false;
179 		try {
180 			inFile = new BufferedReader(new FileReader(movesFailedFile));
181 			String line;
182 
183 			while ((line = inFile.readLine()) != null) {
184 				String trimmedLine = line.trim();
185 				if (trimmedLine.equals(xmlDataFile.getName()) || trimmedLine.startsWith(xmlDataFile.getName() + "=")) {
186 					return true;
187 				}
188 			}
189 		} catch (IOException e) {
190 			LOG.warn("Error reading file " + movesFailedFile);
191 			// TODO try reading the pending file or stop?
192 		} finally {
193 			if (inFile != null)
194 				try {
195 					inFile.close();
196 				} catch (Exception e) {
197 					LOG.warn("Error closing buffered reader for " + movesFailedFile);
198 				}
199 		}
200 
201 		return false;
202 	}
203 
204 	private boolean recordUnmovablePendingFile(File unMovablePendingFile, Date dateLoaded) {
205 		boolean recorded = false;
206 		FileWriter archiveFile = null;
207 		try {
208 			archiveFile = new FileWriter(new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE), true);
209 			archiveFile.write(unMovablePendingFile.getName() + "=" + dateLoaded.getTime() + NEW_LINE);
210 			recorded = true;
211 		} catch (IOException e) {
212 			LOG.error("Unable to record unmovable pending file " + unMovablePendingFile.getName() + "in the archive file " + PENDING_MOVE_FAILED_ARCHIVE_FILE);
213 		} finally {
214 			if (archiveFile != null) {
215 				try {
216 					archiveFile.close();
217 				} catch (IOException ioe) {
218 					LOG.error("Error closing unmovable pending file", ioe);
219 				}
220 			}
221 		}
222 		return recorded;
223 	}
224 
225 	private boolean moveFile(File toDirectory, File fileToMove) {
226 		boolean moved = true;
227 		if (!fileToMove.renameTo(new File(toDirectory.getPath(), fileToMove.getName()))) {
228 			LOG.error("Unable to move file " + fileToMove.getName() + " to directory " + toDirectory.getPath());
229 			moved = false;
230 		}
231 		return moved;
232 	}
233 
234 	private File getXmlPendingDir() {
235 		return new File(getXmlPendingLocation());
236 	}
237 
238 	private File getXmlCompleteDir() {
239 		return new File(getXmlCompletedLocation());
240 	}
241 
242 	private File getXmlProblemDir() {
243 		return new File(getXmlProblemLocation());
244 	}
245 
246 	public String getXmlCompletedLocation() {
247 		return xmlCompletedLocation;
248 	}
249 
250 	public void setXmlCompletedLocation(String xmlCompletedLocation) {
251 		this.xmlCompletedLocation = xmlCompletedLocation;
252 	}
253 
254 	public String getXmlPendingLocation() {
255 		return xmlPendingLocation;
256 	}
257 
258 	/*
259 	 * public boolean validate(File uploadedFile) { XmlDataLoaderFileFilter filter = new XmlDataLoaderFileFilter(); return
260 	 * filter.accept(uploadedFile); }
261 	 */
262 
263 	public void setXmlPendingLocation(String xmlPendingLocation) {
264 		this.xmlPendingLocation = xmlPendingLocation;
265 	}
266 
267 	public String getXmlProblemLocation() {
268 		return xmlProblemLocation;
269 	}
270 
271 	public void setXmlProblemLocation(String xmlProblemLocation) {
272 		this.xmlProblemLocation = xmlProblemLocation;
273 	}
274 
275 	public String getXmlParentDirectory() {
276 		return xmlParentDirectory;
277 	}
278 
279 	public void setXmlParentDirectory(String xmlDataParentDirectory) {
280 		this.xmlParentDirectory = xmlDataParentDirectory;
281 	}
282 
283 	/**
284 	 * Sets the polling interval time in seconds
285 	 * 
286 	 * @param seconds
287 	 *            the polling interval time in seconds
288 	 */
289 	public void setPollIntervalSecs(int seconds) {
290 		this.pollIntervalSecs = seconds;
291 	}
292 
293 	/**
294 	 * Gets the polling interval time in seconds
295 	 * 
296 	 * @return the polling interval time in seconds
297 	 */
298 	public int getPollIntervalSecs() {
299 		return this.pollIntervalSecs;
300 	}
301 
302 	/**
303 	 * Sets the initial delay time in seconds
304 	 * 
305 	 * @param seconds
306 	 *            the initial delay time in seconds
307 	 */
308 	public void setInitialDelaySecs(int seconds) {
309 		this.initialDelaySecs = seconds;
310 	}
311 
312 	/**
313 	 * Gets the initial delay time in seconds
314 	 * 
315 	 * @return the initial delay time in seconds
316 	 */
317 	public int getInitialDelaySecs() {
318 		return this.initialDelaySecs;
319 	}
320 }