001 /*
002 * Copyright 2005-2007 The Kuali Foundation
003 *
004 *
005 * Licensed under the Educational Community License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.opensource.org/licenses/ecl2.php
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.kuali.rice.kew.batch;
018
019 import java.io.BufferedReader;
020 import java.io.File;
021 import java.io.FileReader;
022 import java.io.FileWriter;
023 import java.io.IOException;
024 import java.text.Format;
025 import java.text.SimpleDateFormat;
026 import java.util.ArrayList;
027 import java.util.Arrays;
028 import java.util.Calendar;
029 import java.util.Collection;
030 import java.util.Date;
031 import java.util.Iterator;
032 import java.util.List;
033
034 import org.apache.commons.io.IOUtils;
035 import org.kuali.rice.kew.service.KEWServiceLocator;
036
037 /**
038 * Utility class responsible for polling and ingesting XML data files containing various forms of workflow engine data
039 * (e.g. document types and rules). Loaded files and problem files are placed into a subdirectory of a configured
040 * 'loaded' and 'problem' directory, respectively. "Problem-ness" is determined by inspecting a 'processed' flag on each
 * <code>XmlDoc</code> in each collection. If not all <code>XmlDoc</code>s are marked 'processed', an error is assumed
 * and the collection file (e.g. for a Zip, the Zip file) is moved to the 'problem' directory. As such, it is the <b>
 * <code>XmlIngesterService</code>'s responsibility</b> to mark as 'processed' any unknown or otherwise innocuous files
 * that were not processed but did not fail either. A different mechanism can be developed if this proves to be a
 * problem, but for now it is simple enough for the <code>XmlIngesterService</code> to determine this.
046 *
047 * @see org.kuali.rice.kew.batch.XmlPollerService
048 * @see org.kuali.rice.kew.batch.XmlIngesterServiceImpl
049 * @author Kuali Rice Team (rice.collab@kuali.org)
050 */
051 public class MyXMLPollerServiceImpl implements XmlPollerService {
052
053 private static final org.apache.log4j.Logger LOG = org.apache.log4j.Logger.getLogger(XmlPollerServiceImpl.class);
054 private static final Format DIR_FORMAT = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss-SSS");
055
056 /**
057 * Specifies the polling interval that should be used with this task.
058 */
059 private int pollIntervalSecs = 5 * 60; // default to 5 minutes
060 /**
061 * Specifies the initial delay the poller should wait before starting to poll
062 */
063 private int initialDelaySecs = 30; // default to 30 seconds
064 /**
065 * Location in which to find XML files to load.
066 */
067 private String xmlPendingLocation;
068 /**
069 * Location in which to place successfully loaded XML files.
070 */
071 private String xmlCompletedLocation;
072 /**
073 * Location in which to place XML files which have failed to load.
074 */
075 private String xmlProblemLocation;
076
077 private String xmlParentDirectory;
078 private static final String PENDING_MOVE_FAILED_ARCHIVE_FILE = "movesfailed";
079 private static final String NEW_LINE = "\n";
080
081 @Override
082 public void run() {
083 // if(!directoriesWritable()){
084 // LOG.error("Error writing to xml data directories. Stopping xmlLoader ...");
085 // this.cancel();
086 // }
087 LOG.debug("checking for xml data files...");
088 File[] files = getXmlPendingDir().listFiles();
089 if (files == null || files.length == 0) {
090 return;
091 }
092 // Sort them so the ordering is deterministic by file name instead of random
093 Arrays.sort(files);
094
095 LOG.info("Found " + files.length + " files to ingest.");
096
097 List<XmlDocCollection> collections = new ArrayList<XmlDocCollection>();
098 for (File file : files) {
099 if (file.isDirectory()) {
100 collections.add(new DirectoryXmlDocCollection(file));
101 } else if (file.getName().equals(PENDING_MOVE_FAILED_ARCHIVE_FILE)) {
102 // the movesfailed file...ignore this
103 continue;
104 } else if (file.getName().toLowerCase().endsWith(".zip")) {
105 try {
106 collections.add(new ZipXmlDocCollection(file));
107 } catch (IOException ioe) {
108 LOG.error("Unable to load file: " + file);
109 }
110 } else if (file.getName().endsWith(".xml")) {
111 collections.add(new FileXmlDocCollection(file));
112 } else {
113 LOG.warn("Ignoring extraneous file in xml pending directory: " + file);
114 }
115 }
116
117 // Cull any resources which were already processed and whose moves failed
118 Iterator<?> collectionsIt = collections.iterator();
119 Collection<XmlDocCollection> culled = new ArrayList<XmlDocCollection>();
120 while (collectionsIt.hasNext()) {
121 XmlDocCollection container = (XmlDocCollection) collectionsIt.next();
122 // if a move has already failed for this archive, ignore it
123 if (inPendingMoveFailedArchive(container.getFile())) {
124 LOG.info("Ignoring previously processed resource: " + container);
125 culled.add(container);
126 }
127 }
128 collections.removeAll(culled);
129
130 if (collections.size() == 0) {
131 LOG.debug("No valid new resources found to ingest");
132 return;
133 }
134
135 Date LOAD_TIME = Calendar.getInstance().getTime();
136 // synchronization around date format should not be an issue as this code is single-threaded
137 File completeDir = new File(getXmlCompleteDir(), DIR_FORMAT.format(LOAD_TIME));
138 File failedDir = new File(getXmlProblemDir(), DIR_FORMAT.format(LOAD_TIME));
139
140 // now ingest the containers
141 Collection<?> failed = null;
142 try {
143 failed = KEWServiceLocator.getXmlIngesterService().ingest(collections);
144 } catch (Exception e) {
145 LOG.error("Error ingesting data", e);
146 // throw new RuntimeException(e);
147 }
148
149 // now iterate through all containers again, and move containers to approprate dir
150 LOG.info("Moving files...");
151 collectionsIt = collections.iterator();
152 while (collectionsIt.hasNext()) {
153 XmlDocCollection container = (XmlDocCollection) collectionsIt.next();
154 LOG.debug("container: " + container);
155 try {
156 // "close" the container
157 // this only matters for ZipFiles for now
158 container.close();
159 } catch (IOException ioe) {
160 LOG.warn("Error closing " + container, ioe);
161 }
162 if (failed.contains(container)) {
163 // some docs must have failed, move the whole
164 // container to the failed dir
165 if (container.getFile() != null) {
166 LOG.error("Moving " + container.getFile() + " to problem dir.");
167 if ((!failedDir.isDirectory() && !failedDir.mkdirs()) || !moveFile(failedDir, container.getFile())) {
168 LOG.error("Could not move: " + container.getFile());
169 recordUnmovablePendingFile(container.getFile(), LOAD_TIME);
170 }
171 }
172 } else {
173 if (container.getFile() != null) {
174 LOG.info("Moving " + container.getFile() + " to loaded dir.");
175 if ((!completeDir.isDirectory() && !completeDir.mkdirs())
176 || !moveFile(completeDir, container.getFile())) {
177 LOG.error("Could not move: " + container.getFile());
178 recordUnmovablePendingFile(container.getFile(), LOAD_TIME);
179 }
180 }
181 }
182 }
183 }
184
185 private boolean inPendingMoveFailedArchive(File xmlDataFile) {
186 if (xmlDataFile == null) {
187 return false;
188 }
189 File movesFailedFile = new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE);
190 if (!movesFailedFile.isFile()) {
191 return false;
192 }
193 BufferedReader inFile = null;
194 try {
195 inFile = new BufferedReader(new FileReader(movesFailedFile));
196 String line;
197 boolean found = false;
198 while ((line = inFile.readLine()) != null) {
199 String trimmedLine = line.trim();
200 if (trimmedLine.equals(xmlDataFile.getName()) || trimmedLine.startsWith(xmlDataFile.getName() + "=")) {
201 found = true;
202 break;
203 }
204 }
205 return found;
206 } catch (IOException e) {
207 LOG.warn("Error reading file " + movesFailedFile);
208 } finally {
209 IOUtils.closeQuietly(inFile);
210 }
211 return false;
212 }
213
214 private boolean recordUnmovablePendingFile(File unMovablePendingFile, Date dateLoaded) {
215 boolean recorded = false;
216 FileWriter archiveFile = null;
217 try {
218 archiveFile = new FileWriter(new File(getXmlPendingDir(), PENDING_MOVE_FAILED_ARCHIVE_FILE), true);
219 archiveFile.write(unMovablePendingFile.getName() + "=" + dateLoaded.getTime() + NEW_LINE);
220 recorded = true;
221 } catch (IOException e) {
222 LOG.error("Unable to record unmovable pending file " + unMovablePendingFile.getName()
223 + "in the archive file " + PENDING_MOVE_FAILED_ARCHIVE_FILE);
224 } finally {
225 if (archiveFile != null) {
226 try {
227 archiveFile.close();
228 } catch (IOException ioe) {
229 LOG.error("Error closing unmovable pending file", ioe);
230 }
231 }
232 }
233 return recorded;
234 }
235
236 private boolean moveFile(File toDirectory, File fileToMove) {
237 boolean moved = true;
238 if (!fileToMove.renameTo(new File(toDirectory.getPath(), fileToMove.getName()))) {
239 LOG.error("Unable to move file " + fileToMove.getName() + " to directory " + toDirectory.getPath());
240 moved = false;
241 }
242 return moved;
243 }
244
245 private File getXmlPendingDir() {
246 return new File(getXmlPendingLocation());
247 }
248
249 private File getXmlCompleteDir() {
250 return new File(getXmlCompletedLocation());
251 }
252
253 private File getXmlProblemDir() {
254 return new File(getXmlProblemLocation());
255 }
256
257 public String getXmlCompletedLocation() {
258 return xmlCompletedLocation;
259 }
260
261 public void setXmlCompletedLocation(String xmlCompletedLocation) {
262 this.xmlCompletedLocation = xmlCompletedLocation;
263 }
264
265 public String getXmlPendingLocation() {
266 return xmlPendingLocation;
267 }
268
269 /*
270 * public boolean validate(File uploadedFile) { XmlDataLoaderFileFilter filter = new XmlDataLoaderFileFilter();
271 * return filter.accept(uploadedFile); }
272 */
273
274 public void setXmlPendingLocation(String xmlPendingLocation) {
275 this.xmlPendingLocation = xmlPendingLocation;
276 }
277
278 public String getXmlProblemLocation() {
279 return xmlProblemLocation;
280 }
281
282 public void setXmlProblemLocation(String xmlProblemLocation) {
283 this.xmlProblemLocation = xmlProblemLocation;
284 }
285
286 public String getXmlParentDirectory() {
287 return xmlParentDirectory;
288 }
289
290 public void setXmlParentDirectory(String xmlDataParentDirectory) {
291 this.xmlParentDirectory = xmlDataParentDirectory;
292 }
293
294 /**
295 * Sets the polling interval time in seconds
296 *
297 * @param seconds
298 * the polling interval time in seconds
299 */
300 public void setPollIntervalSecs(int seconds) {
301 this.pollIntervalSecs = seconds;
302 }
303
304 /**
305 * Gets the polling interval time in seconds
306 *
307 * @return the polling interval time in seconds
308 */
309 @Override
310 public int getPollIntervalSecs() {
311 return this.pollIntervalSecs;
312 }
313
314 /**
315 * Sets the initial delay time in seconds
316 *
317 * @param seconds
318 * the initial delay time in seconds
319 */
320 public void setInitialDelaySecs(int seconds) {
321 this.initialDelaySecs = seconds;
322 }
323
324 /**
325 * Gets the initial delay time in seconds
326 *
327 * @return the initial delay time in seconds
328 */
329 @Override
330 public int getInitialDelaySecs() {
331 return this.initialDelaySecs;
332 }
333 }