001 /** 002 * Copyright 2004-2012 The Kuali Foundation 003 * 004 * Licensed under the Educational Community License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.opensource.org/licenses/ecl2.php 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 package org.kuali.hr.time.batch; 017 018 import java.util.List; 019 020 import org.apache.commons.lang.StringUtils; 021 import org.apache.log4j.Logger; 022 import org.kuali.hr.time.service.base.TkServiceLocator; 023 import org.kuali.hr.time.util.TKUtils; 024 import org.kuali.hr.time.util.TkConstants; 025 026 /** 027 * Runs on each worker node, schedules jobs to run on the thread pool. 028 */ 029 public class BatchJobEntryPoller extends Thread { 030 private static final Logger LOG = Logger.getLogger(BatchJobEntryPoller.class); 031 032 TkBatchManager manager; 033 int secondsToSleep; 034 int startupSleep; 035 String ipAddress; 036 037 public BatchJobEntryPoller(TkBatchManager manager, int sleepSeconds, int startupSleep) { 038 this.manager = manager; 039 this.secondsToSleep = sleepSeconds; 040 this.startupSleep = startupSleep; 041 this.ipAddress = TKUtils.getIPNumber(); 042 } 043 044 @Override 045 public void run() { 046 047 try { 048 Thread.sleep(1000 * startupSleep); 049 } catch (Exception e) { 050 LOG.error(e); 051 } 052 053 while(true) { 054 LOG.debug("Looking for BatchJobEntries to run on '" + this.ipAddress + "'"); 055 try { 056 //Find any jobs for this ip address that have a status of S 057 List<BatchJobEntry> entries = TkServiceLocator.getBatchJobEntryService().getBatchJobEntries(ipAddress, TkConstants.BATCH_JOB_ENTRY_STATUS.SCHEDULED); 058 059 //Add jobs to the manager 060 for (BatchJobEntry entry : entries) { 061 manager.pool.submit(getRunnable(entry)); 062 } 063 Thread.sleep(1000 * secondsToSleep); 064 } catch (Exception e) { 065 LOG.error(e); 066 } 067 } 068 } 069 070 private BatchJobEntryRunnable getRunnable(BatchJobEntry entry) { 071 BatchJobEntryRunnable bjer = null; 072 073 if (StringUtils.equals(entry.getBatchJobName(), TkConstants.BATCH_JOB_NAMES.APPROVE)) { 074 LOG.debug("Creating EmployeeApprovalBatchJobRunnable."); 075 bjer = new EmployeeApprovalBatchJobRunnable(entry); 076 } else if (StringUtils.equals(entry.getBatchJobName(), TkConstants.BATCH_JOB_NAMES.PAY_PERIOD_END)) { 077 LOG.debug("Creating PayPeriodEndBatchJobRunnable."); 078 bjer = new PayPeriodEndBatchJobRunnable(entry); 079 } else if (StringUtils.equals(entry.getBatchJobName(), TkConstants.BATCH_JOB_NAMES.SUPERVISOR_APPROVAL)) { 080 LOG.debug("Creating SupervisorApprovalBatchJobRunnabble."); 081 bjer = new SupervisorApprovalBatchJobRunnable(entry); 082 } else if (StringUtils.equals(entry.getBatchJobName(), TkConstants.BATCH_JOB_NAMES.INITIATE)) { 083 LOG.debug("Creating InitiateBatchJobRunnable."); 084 bjer = new InitiateBatchJobRunnable(entry); 085 } else if(StringUtils.equals(entry.getBatchJobName(), TkConstants.BATCH_JOB_NAMES.BATCH_APPROVE_MISSED_PUNCH)) { 086 LOG.debug("Creating BatchApproveMissedPunchJobRunnable."); 087 bjer = new BatchApproveMissedPunchJobRunnable(entry); 088 } else { 089 LOG.warn("Unknown BatchJobEntryRunnable found in BatchJobEntry table. Unable to create Runnable."); 090 } 091 092 return bjer; 093 } 094 095 }