001 /*
002 * Copyright 2005-2007 The Kuali Foundation
003 *
004 *
005 * Licensed under the Educational Community License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.opensource.org/licenses/ecl2.php
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.kuali.rice.kew.batch;
018
019 import org.kuali.rice.kew.service.KEWServiceLocator;
020
021 import java.io.*;
022 import java.text.Format;
023 import java.text.SimpleDateFormat;
024 import java.util.*;
025
026
027 /**
028 * Utility class responsible for polling and ingesting XML data files
029 * containing various forms of workflow engine data (e.g. document types
030 * and rules).
031 * Loaded files and problem files are placed into a subdirectory of a
032 * configured 'loaded' and 'problem' directory, respectively.
033 * "Problem-ness" is determined by inspecting a 'processed' flag on each <code>XmlDoc</code>
034 * in each collection. If not all <code>XmlDoc</code>s are marked 'processed' an
035 * error is assumed, and the collection file (e.g. for a Zip, the Zip file) is moved
036 * to the 'problem' directory.
037 * As such, it is the <b><code>XmlIngesterService</code>'s responsibility</b> to mark
038 * any unknown or otherwise innocuous non-failure non-processed files, as 'processed'.
039 * A different mechanism can be developed if this proves to be a problem, but for now
040 * it is simple enough for the <code>XmlIngesterService</code> to determine this.
041 * @see org.kuali.rice.kew.batch.XmlPollerService
042 * @see org.kuali.rice.kew.batch.XmlIngesterServiceImpl
043 * @author Kuali Rice Team (rice.collab@kuali.org)
044 */
045 public class XmlPollerServiceImpl implements XmlPollerService {
046
047 private static final org.apache.log4j.Logger LOG = org.apache.log4j.Logger
048 .getLogger(XmlPollerServiceImpl.class);
049 private static final Format DIR_FORMAT = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss-SSS");
050
051 /**
052 * Specifies the polling interval that should be used with this task.
053 */
054 private int pollIntervalSecs = 5 * 60; // default to 5 minutes
055 /**
056 * Specifies the initial delay the poller should wait before starting to poll
057 */
058 private int initialDelaySecs = 30; // default to 30 seconds
059 /**
060 * Location in which to find XML files to load.
061 */
062 private String xmlPendingLocation;
063 /**
064 * Location in which to place successfully loaded XML files.
065 */
066 private String xmlCompletedLocation;
067 /**
068 * Location in which to place XML files which have failed to load.
069 */
070 private String xmlProblemLocation;
071
072 private String xmlParentDirectory;
073 private static final String PENDING_MOVE_FAILED_ARCHIVE_FILE = "movesfailed";
074 private static final String NEW_LINE = "\n";
075
076 public void run() {
077 // if(!directoriesWritable()){
078 // LOG.error("Error writing to xml data directories. Stopping xmlLoader ...");
079 // this.cancel();
080 // }
081 LOG.debug("checking for xml data files...");
082 File[] files = getXmlPendingDir().listFiles();
083 if (files == null || files.length == 0) {
084 return;
085 }
086 LOG.info("Found " + files.length + " files to ingest.");
087 List<XmlDocCollection> collections = new ArrayList<XmlDocCollection>();
088 for (File file : files)
089 {
090 if (file.isDirectory())
091 {
092 collections.add(new DirectoryXmlDocCollection(file));
093 } else if (file.getName().equals(PENDING_MOVE_FAILED_ARCHIVE_FILE))
094 {
095 // the movesfailed file...ignore this
096 continue;
097 } else if (file.getName().toLowerCase().endsWith(".zip"))
098 {
099 try
100 {
101 collections.add(new ZipXmlDocCollection(file));
102 } catch (IOException ioe)
103 {
104 LOG.error("Unable to load file: " + file);
105 }
106 } else if (file.getName().endsWith(".xml"))
107 {
108 collections.add(new FileXmlDocCollection(file));
109 } else
110 {
111 LOG.warn("Ignoring extraneous file in xml pending directory: " + file);
112 }
113 }
114
115 // Cull any resources which were already processed and whose moves failed
116 Iterator collectionsIt = collections.iterator();
117 Collection<XmlDocCollection> culled = new ArrayList<XmlDocCollection>();
118 while (collectionsIt.hasNext()) {
119 XmlDocCollection container = (XmlDocCollection) collectionsIt.next();
120 // if a move has already failed for this archive, ignore it
121 if (inPendingMoveFailedArchive(container.getFile())) {
122 LOG.info("Ignoring previously processed resource: " + container);
123 culled.add(container);
124 }
125 }
126 collections.removeAll(culled);
127
128 if (collections.size() == 0) {
129 LOG.debug("No valid new resources found to ingest");
130 return;
131 }
132
133 Date LOAD_TIME = Calendar.getInstance().getTime();
134 // synchronization around date format should not be an issue as this code is single-threaded
135 File completeDir = new File(getXmlCompleteDir(), DIR_FORMAT.format(LOAD_TIME));
136 File failedDir = new File(getXmlProblemDir(), DIR_FORMAT.format(LOAD_TIME));
137
138 // now ingest the containers
139 Collection failed = null;
140 try {
141 failed = KEWServiceLocator.getXmlIngesterService().ingest(collections);
142 } catch (Exception e) {
143 LOG.error("Error ingesting data", e);
144 //throw new RuntimeException(e);
145 }
146
147 // now iterate through all containers again, and move containers to approprate dir
148 LOG.info("Moving files...");
149 collectionsIt = collections.iterator();
150 while (collectionsIt.hasNext()) {
151 XmlDocCollection container = (XmlDocCollection) collectionsIt.next();
152 LOG.debug("container: " + container);
153 try {
154 // "close" the container
155 // this only matters for ZipFiles for now
156 container.close();
157 } catch (IOException ioe) {
158 LOG.warn("Error closing " + container, ioe);
159 }
160 if (failed.contains(container)) {
161 // some docs must have failed, move the whole
162 // container to the failed dir
163 if (container.getFile() != null) {
164 LOG.error("Moving " + container.getFile() + " to problem dir.");
165 if ((!failedDir.isDirectory() && !failedDir.mkdirs())
166 || !moveFile(failedDir, container.getFile())) {
167 LOG.error("Could not move: " + container.getFile());
168 recordUnmovablePendingFile(container.getFile(), LOAD_TIME);
169 }
170 }
171 } else {
172 if (container.getFile() != null) {
173 LOG.info("Moving " + container.getFile() + " to loaded dir.");
174 if((!completeDir.isDirectory() && !completeDir.mkdirs())
175 || !moveFile(completeDir, container.getFile())){
176 LOG.error("Could not move: " + container.getFile());
177 recordUnmovablePendingFile(container.getFile(), LOAD_TIME);
178 }
179 }
180 }
181 }
182 }
183
184 private boolean inPendingMoveFailedArchive(File xmlDataFile){
185 if (xmlDataFile == null) return false;
186 BufferedReader inFile = null;
187 File movesFailedFile = new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE);
188 if (!movesFailedFile.isFile()) return false;
189 try {
190 inFile = new BufferedReader(new FileReader(movesFailedFile));
191 String line;
192
193 while((line = inFile.readLine()) != null){
194 String trimmedLine = line.trim();
195 if(trimmedLine.equals(xmlDataFile.getName()) ||
196 trimmedLine.startsWith(xmlDataFile.getName() + "=")) {
197 return true;
198 }
199 }
200 } catch (IOException e){
201 LOG.warn("Error reading file " + movesFailedFile);
202 //TODO try reading the pending file or stop?
203 } finally {
204 if (inFile != null) try {
205 inFile.close();
206 } catch (Exception e) {
207 LOG.warn("Error closing buffered reader for " + movesFailedFile);
208 }
209 }
210
211 return false;
212 }
213
214 private boolean recordUnmovablePendingFile(File unMovablePendingFile, Date dateLoaded){
215 boolean recorded = false;
216 FileWriter archiveFile = null;
217 try{
218 archiveFile = new FileWriter(new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE), true);
219 archiveFile.write(unMovablePendingFile.getName() + "=" + dateLoaded.getTime() + NEW_LINE);
220 recorded = true;
221 } catch (IOException e){
222 LOG.error("Unable to record unmovable pending file " + unMovablePendingFile.getName() + "in the archive file " + PENDING_MOVE_FAILED_ARCHIVE_FILE);
223 } finally {
224 if (archiveFile != null) {
225 try {
226 archiveFile.close();
227 } catch (IOException ioe) {
228 LOG.error("Error closing unmovable pending file", ioe);
229 }
230 }
231 }
232 return recorded;
233 }
234
235 private boolean moveFile(File toDirectory, File fileToMove){
236 boolean moved = true;
237 if (!fileToMove.renameTo(new File(toDirectory.getPath(), fileToMove.getName()))){
238 LOG.error("Unable to move file " + fileToMove.getName() + " to directory " + toDirectory.getPath());
239 moved = false;
240 }
241 return moved;
242 }
243
244 private File getXmlPendingDir() {
245 return new File(getXmlPendingLocation());
246 }
247
248 private File getXmlCompleteDir() {
249 return new File(getXmlCompletedLocation());
250 }
251
252 private File getXmlProblemDir() {
253 return new File(getXmlProblemLocation());
254 }
255
256 public String getXmlCompletedLocation() {
257 return xmlCompletedLocation;
258 }
259
260 public void setXmlCompletedLocation(String xmlCompletedLocation) {
261 this.xmlCompletedLocation = xmlCompletedLocation;
262 }
263
264 public String getXmlPendingLocation() {
265 return xmlPendingLocation;
266 }
267
268 /*public boolean validate(File uploadedFile) {
269 XmlDataLoaderFileFilter filter = new XmlDataLoaderFileFilter();
270 return filter.accept(uploadedFile);
271 }*/
272
273 public void setXmlPendingLocation(String xmlPendingLocation) {
274 this.xmlPendingLocation = xmlPendingLocation;
275 }
276
277 public String getXmlProblemLocation() {
278 return xmlProblemLocation;
279 }
280
281 public void setXmlProblemLocation(String xmlProblemLocation) {
282 this.xmlProblemLocation = xmlProblemLocation;
283 }
284 public String getXmlParentDirectory() {
285 return xmlParentDirectory;
286 }
287 public void setXmlParentDirectory(String xmlDataParentDirectory) {
288 this.xmlParentDirectory = xmlDataParentDirectory;
289 }
290
291 /**
292 * Sets the polling interval time in seconds
293 * @param seconds the polling interval time in seconds
294 */
295 public void setPollIntervalSecs(int seconds) {
296 this.pollIntervalSecs = seconds;
297 }
298
299 /**
300 * Gets the polling interval time in seconds
301 * @return the polling interval time in seconds
302 */
303 public int getPollIntervalSecs() {
304 return this.pollIntervalSecs;
305 }
306
307 /**
308 * Sets the initial delay time in seconds
309 * @param seconds the initial delay time in seconds
310 */
311 public void setInitialDelaySecs(int seconds) {
312 this.initialDelaySecs = seconds;
313 }
314
315 /**
316 * Gets the initial delay time in seconds
317 * @return the initial delay time in seconds
318 */
319 public int getInitialDelaySecs() {
320 return this.initialDelaySecs;
321 }
322 }