001 /** 002 * Copyright 2004-2013 The Kuali Foundation 003 * 004 * Licensed under the Educational Community License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.opensource.org/licenses/ecl2.php 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 package org.kuali.hr.time.batch; 017 018 import java.util.List; 019 020 import org.apache.commons.lang.StringUtils; 021 import org.apache.log4j.Logger; 022 import org.kuali.hr.time.service.base.TkServiceLocator; 023 import org.kuali.hr.time.util.TKUtils; 024 import org.kuali.hr.time.util.TkConstants; 025 026 /** 027 * Runs on each worker node, schedules jobs to run on the thread pool. 028 */ 029 public class BatchJobEntryPoller extends Thread { 030 private static final Logger LOG = Logger.getLogger(BatchJobEntryPoller.class); 031 032 TkBatchManager manager; 033 int secondsToSleep; 034 int startupSleep; 035 String ipAddress; 036 037 public BatchJobEntryPoller(TkBatchManager manager, int sleepSeconds, int startupSleep) { 038 this.manager = manager; 039 this.secondsToSleep = sleepSeconds; 040 this.startupSleep = startupSleep; 041 this.ipAddress = TKUtils.getIPNumber(); 042 } 043 044 @Override 045 public void run() { 046 047 try { 048 Thread.sleep(1000 * startupSleep); 049 } catch (Exception e) { 050 LOG.error(e); 051 } 052 053 while(true) { 054 LOG.debug("Looking for BatchJobEntries to run on '" + this.ipAddress + "'"); 055 try { 056 //Find any jobs for this ip address that have a status of S 057 List<BatchJobEntry> entries = TkServiceLocator.getBatchJobEntryService().getBatchJobEntries(ipAddress, TkConstants.BATCH_JOB_ENTRY_STATUS.SCHEDULED); 058 059 //Add jobs to the manager 060 for (BatchJobEntry entry : entries) { 061 BatchJobEntryRunnable batchJobEntryRunnable = getRunnable(entry); 062 if (batchJobEntryRunnable != null) { 063 manager.pool.submit(batchJobEntryRunnable); 064 } 065 } 066 Thread.sleep(1000 * secondsToSleep); 067 } catch (Exception e) { 068 LOG.error(e); 069 } 070 } 071 } 072 073 private BatchJobEntryRunnable getRunnable(BatchJobEntry entry) { 074 BatchJobEntryRunnable bjer = null; 075 076 if (StringUtils.equals(entry.getBatchJobName(), TkConstants.BATCH_JOB_NAMES.APPROVE)) { 077 LOG.debug("Creating EmployeeApprovalBatchJobRunnable."); 078 bjer = new EmployeeApprovalBatchJobRunnable(entry); 079 } else if (StringUtils.equals(entry.getBatchJobName(), TkConstants.BATCH_JOB_NAMES.PAY_PERIOD_END)) { 080 LOG.debug("Creating PayPeriodEndBatchJobRunnable."); 081 bjer = new PayPeriodEndBatchJobRunnable(entry); 082 } else if (StringUtils.equals(entry.getBatchJobName(), TkConstants.BATCH_JOB_NAMES.SUPERVISOR_APPROVAL)) { 083 LOG.debug("Creating SupervisorApprovalBatchJobRunnable."); 084 bjer = new SupervisorApprovalBatchJobRunnable(entry); 085 } else if (StringUtils.equals(entry.getBatchJobName(), TkConstants.BATCH_JOB_NAMES.INITIATE)) { 086 LOG.debug("Creating InitiateBatchJobRunnable."); 087 bjer = new InitiateBatchJobRunnable(entry); 088 } else if(StringUtils.equals(entry.getBatchJobName(), TkConstants.BATCH_JOB_NAMES.BATCH_APPROVE_MISSED_PUNCH)) { 089 LOG.debug("BatchApproveMissedPunchJobRunnable is not run on a regular basis, skipping..."); 090 } else { 091 LOG.warn("Unknown BatchJobEntryRunnable found in BatchJobEntry table. Unable to create Runnable."); 092 } 093 094 return bjer; 095 } 096 097 }