001 /**
002 * Copyright 2005-2013 The Kuali Foundation
003 *
004 * Licensed under the Educational Community License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.opensource.org/licenses/ecl2.php
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016 package org.kuali.rice.kew.batch;
017
018 import org.kuali.rice.core.api.CoreApiServiceLocator;
019 import org.kuali.rice.core.api.impex.xml.DirectoryXmlDocCollection;
020 import org.kuali.rice.core.api.impex.xml.FileXmlDocCollection;
021 import org.kuali.rice.core.api.impex.xml.XmlDocCollection;
022 import org.kuali.rice.core.api.impex.xml.ZipXmlDocCollection;
023
024 import java.io.BufferedReader;
025 import java.io.File;
026 import java.io.FileReader;
027 import java.io.FileWriter;
028 import java.io.IOException;
029 import java.text.Format;
030 import java.text.SimpleDateFormat;
031 import java.util.ArrayList;
032 import java.util.Calendar;
033 import java.util.Collection;
034 import java.util.Date;
035 import java.util.Iterator;
036 import java.util.List;
037
038
039 /**
040 * Utility class responsible for polling and ingesting XML data files
041 * containing various forms of workflow engine data (e.g. document types
042 * and rules).
043 * Loaded files and problem files are placed into a subdirectory of a
044 * configured 'loaded' and 'problem' directory, respectively.
045 * "Problem-ness" is determined by inspecting a 'processed' flag on each <code>XmlDoc</code>
046 * in each collection. If not all <code>XmlDoc</code>s are marked 'processed' an
047 * error is assumed, and the collection file (e.g. for a Zip, the Zip file) is moved
048 * to the 'problem' directory.
049 * As such, it is the <b><code>XmlIngesterService</code>'s responsibility</b> to mark
050 * any unknown or otherwise innocuous non-failure non-processed files, as 'processed'.
051 * A different mechanism can be developed if this proves to be a problem, but for now
052 * it is simple enough for the <code>XmlIngesterService</code> to determine this.
053 * @see org.kuali.rice.kew.batch.XmlPollerService
054 * @see org.kuali.rice.kew.batch.XmlIngesterServiceImpl
055 * @author Kuali Rice Team (rice.collab@kuali.org)
056 */
057 public class XmlPollerServiceImpl implements XmlPollerService {
058
059 private static final org.apache.log4j.Logger LOG = org.apache.log4j.Logger
060 .getLogger(XmlPollerServiceImpl.class);
061 private static final Format DIR_FORMAT = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss-SSS");
062
063 /**
064 * Specifies the polling interval that should be used with this task.
065 */
066 private int pollIntervalSecs = 5 * 60; // default to 5 minutes
067 /**
068 * Specifies the initial delay the poller should wait before starting to poll
069 */
070 private int initialDelaySecs = 30; // default to 30 seconds
071 /**
072 * Location in which to find XML files to load.
073 */
074 private String xmlPendingLocation;
075 /**
076 * Location in which to place successfully loaded XML files.
077 */
078 private String xmlCompletedLocation;
079 /**
080 * Location in which to place XML files which have failed to load.
081 */
082 private String xmlProblemLocation;
083
084 private String xmlParentDirectory;
085 private static final String PENDING_MOVE_FAILED_ARCHIVE_FILE = "movesfailed";
086 private static final String NEW_LINE = "\n";
087
088 public void run() {
089 // if(!directoriesWritable()){
090 // LOG.error("Error writing to xml data directories. Stopping xmlLoader ...");
091 // this.cancel();
092 // }
093 LOG.debug("checking for xml data files...");
094 File[] files = getXmlPendingDir().listFiles();
095 if (files == null || files.length == 0) {
096 return;
097 }
098 LOG.info("Found " + files.length + " files to ingest.");
099 List<XmlDocCollection> collections = new ArrayList<XmlDocCollection>();
100 for (File file : files)
101 {
102 if (file.isDirectory())
103 {
104 collections.add(new DirectoryXmlDocCollection(file));
105 } else if (file.getName().equals(PENDING_MOVE_FAILED_ARCHIVE_FILE))
106 {
107 // the movesfailed file...ignore this
108 continue;
109 } else if (file.getName().toLowerCase().endsWith(".zip"))
110 {
111 try
112 {
113 collections.add(new ZipXmlDocCollection(file));
114 } catch (IOException ioe)
115 {
116 LOG.error("Unable to load file: " + file);
117 }
118 } else if (file.getName().endsWith(".xml"))
119 {
120 collections.add(new FileXmlDocCollection(file));
121 } else
122 {
123 LOG.warn("Ignoring extraneous file in xml pending directory: " + file);
124 }
125 }
126
127 // Cull any resources which were already processed and whose moves failed
128 Iterator collectionsIt = collections.iterator();
129 Collection<XmlDocCollection> culled = new ArrayList<XmlDocCollection>();
130 while (collectionsIt.hasNext()) {
131 XmlDocCollection container = (XmlDocCollection) collectionsIt.next();
132 // if a move has already failed for this archive, ignore it
133 if (inPendingMoveFailedArchive(container.getFile())) {
134 LOG.info("Ignoring previously processed resource: " + container);
135 culled.add(container);
136 }
137 }
138 collections.removeAll(culled);
139
140 if (collections.size() == 0) {
141 LOG.debug("No valid new resources found to ingest");
142 return;
143 }
144
145 Date LOAD_TIME = Calendar.getInstance().getTime();
146 // synchronization around date format should not be an issue as this code is single-threaded
147 File completeDir = new File(getXmlCompleteDir(), DIR_FORMAT.format(LOAD_TIME));
148 File failedDir = new File(getXmlProblemDir(), DIR_FORMAT.format(LOAD_TIME));
149
150 // now ingest the containers
151 Collection failed = null;
152 try {
153 failed = CoreApiServiceLocator.getXmlIngesterService().ingest(collections);
154 } catch (Exception e) {
155 LOG.error("Error ingesting data", e);
156 //throw new RuntimeException(e);
157 }
158
159 // now iterate through all containers again, and move containers to approprate dir
160 LOG.info("Moving files...");
161 collectionsIt = collections.iterator();
162 while (collectionsIt.hasNext()) {
163 XmlDocCollection container = (XmlDocCollection) collectionsIt.next();
164 LOG.debug("container: " + container);
165 try {
166 // "close" the container
167 // this only matters for ZipFiles for now
168 container.close();
169 } catch (IOException ioe) {
170 LOG.warn("Error closing " + container, ioe);
171 }
172 if (failed.contains(container)) {
173 // some docs must have failed, move the whole
174 // container to the failed dir
175 if (container.getFile() != null) {
176 LOG.error("Moving " + container.getFile() + " to problem dir.");
177 if ((!failedDir.isDirectory() && !failedDir.mkdirs())
178 || !moveFile(failedDir, container.getFile())) {
179 LOG.error("Could not move: " + container.getFile());
180 recordUnmovablePendingFile(container.getFile(), LOAD_TIME);
181 }
182 }
183 } else {
184 if (container.getFile() != null) {
185 LOG.info("Moving " + container.getFile() + " to loaded dir.");
186 if((!completeDir.isDirectory() && !completeDir.mkdirs())
187 || !moveFile(completeDir, container.getFile())){
188 LOG.error("Could not move: " + container.getFile());
189 recordUnmovablePendingFile(container.getFile(), LOAD_TIME);
190 }
191 }
192 }
193 }
194 }
195
196 private boolean inPendingMoveFailedArchive(File xmlDataFile){
197 if (xmlDataFile == null) return false;
198 BufferedReader inFile = null;
199 File movesFailedFile = new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE);
200 if (!movesFailedFile.isFile()) return false;
201 try {
202 inFile = new BufferedReader(new FileReader(movesFailedFile));
203 String line;
204
205 while((line = inFile.readLine()) != null){
206 String trimmedLine = line.trim();
207 if(trimmedLine.equals(xmlDataFile.getName()) ||
208 trimmedLine.startsWith(xmlDataFile.getName() + "=")) {
209 return true;
210 }
211 }
212 } catch (IOException e){
213 LOG.warn("Error reading file " + movesFailedFile);
214 //TODO try reading the pending file or stop?
215 } finally {
216 if (inFile != null) try {
217 inFile.close();
218 } catch (Exception e) {
219 LOG.warn("Error closing buffered reader for " + movesFailedFile);
220 }
221 }
222
223 return false;
224 }
225
226 private boolean recordUnmovablePendingFile(File unMovablePendingFile, Date dateLoaded){
227 boolean recorded = false;
228 FileWriter archiveFile = null;
229 try{
230 archiveFile = new FileWriter(new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE), true);
231 archiveFile.write(unMovablePendingFile.getName() + "=" + dateLoaded.getTime() + NEW_LINE);
232 recorded = true;
233 } catch (IOException e){
234 LOG.error("Unable to record unmovable pending file " + unMovablePendingFile.getName() + "in the archive file " + PENDING_MOVE_FAILED_ARCHIVE_FILE);
235 } finally {
236 if (archiveFile != null) {
237 try {
238 archiveFile.close();
239 } catch (IOException ioe) {
240 LOG.error("Error closing unmovable pending file", ioe);
241 }
242 }
243 }
244 return recorded;
245 }
246
247 private boolean moveFile(File toDirectory, File fileToMove){
248 boolean moved = true;
249 if (!fileToMove.renameTo(new File(toDirectory.getPath(), fileToMove.getName()))){
250 LOG.error("Unable to move file " + fileToMove.getName() + " to directory " + toDirectory.getPath());
251 moved = false;
252 }
253 return moved;
254 }
255
256 private File getXmlPendingDir() {
257 return new File(getXmlPendingLocation());
258 }
259
260 private File getXmlCompleteDir() {
261 return new File(getXmlCompletedLocation());
262 }
263
264 private File getXmlProblemDir() {
265 return new File(getXmlProblemLocation());
266 }
267
268 public String getXmlCompletedLocation() {
269 return xmlCompletedLocation;
270 }
271
272 public void setXmlCompletedLocation(String xmlCompletedLocation) {
273 this.xmlCompletedLocation = xmlCompletedLocation;
274 }
275
276 public String getXmlPendingLocation() {
277 return xmlPendingLocation;
278 }
279
280 /*public boolean validate(File uploadedFile) {
281 XmlDataLoaderFileFilter filter = new XmlDataLoaderFileFilter();
282 return filter.accept(uploadedFile);
283 }*/
284
285 public void setXmlPendingLocation(String xmlPendingLocation) {
286 this.xmlPendingLocation = xmlPendingLocation;
287 }
288
289 public String getXmlProblemLocation() {
290 return xmlProblemLocation;
291 }
292
293 public void setXmlProblemLocation(String xmlProblemLocation) {
294 this.xmlProblemLocation = xmlProblemLocation;
295 }
296 public String getXmlParentDirectory() {
297 return xmlParentDirectory;
298 }
299 public void setXmlParentDirectory(String xmlDataParentDirectory) {
300 this.xmlParentDirectory = xmlDataParentDirectory;
301 }
302
303 /**
304 * Sets the polling interval time in seconds
305 * @param seconds the polling interval time in seconds
306 */
307 public void setPollIntervalSecs(int seconds) {
308 this.pollIntervalSecs = seconds;
309 }
310
311 /**
312 * Gets the polling interval time in seconds
313 * @return the polling interval time in seconds
314 */
315 public int getPollIntervalSecs() {
316 return this.pollIntervalSecs;
317 }
318
319 /**
320 * Sets the initial delay time in seconds
321 * @param seconds the initial delay time in seconds
322 */
323 public void setInitialDelaySecs(int seconds) {
324 this.initialDelaySecs = seconds;
325 }
326
327 /**
328 * Gets the initial delay time in seconds
329 * @return the initial delay time in seconds
330 */
331 public int getInitialDelaySecs() {
332 return this.initialDelaySecs;
333 }
334 }