001/**
002 * Copyright 2005-2012 The Kuali Foundation
003 *
004 * Licensed under the Educational Community License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.opensource.org/licenses/ecl2.php
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.kuali.rice.ksb.messaging.serviceproxies;
017
018import org.apache.log4j.Logger;
019import org.kuali.rice.core.api.exception.RiceRuntimeException;
020import org.kuali.rice.core.api.util.ClassLoaderUtils;
021import org.kuali.rice.core.api.util.ContextClassLoaderProxy;
022import org.kuali.rice.core.api.util.reflect.BaseInvocationHandler;
023import org.kuali.rice.core.api.util.reflect.TargetedInvocationHandler;
024import org.kuali.rice.ksb.api.bus.Endpoint;
025import org.kuali.rice.ksb.api.bus.ServiceConfiguration;
026import org.kuali.rice.ksb.api.messaging.AsynchronousCall;
027import org.kuali.rice.ksb.messaging.PersistedMessageBO;
028import org.kuali.rice.ksb.messaging.quartz.MessageServiceExecutorJob;
029import org.kuali.rice.ksb.messaging.quartz.MessageServiceExecutorJobListener;
030import org.kuali.rice.ksb.service.KSBServiceLocator;
031import org.quartz.JobDataMap;
032import org.quartz.JobDetail;
033import org.quartz.Scheduler;
034import org.quartz.SchedulerException;
035import org.quartz.SimpleTrigger;
036import org.quartz.Trigger;
037
038import java.io.Serializable;
039import java.lang.reflect.Method;
040import java.lang.reflect.Proxy;
041import java.sql.Timestamp;
042import java.util.Calendar;
043import java.util.List;
044
045
046/**
047 * A proxy which schedules a service to be executed asynchronously after some delay period.
048 * 
049 * @author Kuali Rice Team (rice.collab@kuali.org)
050 */
051public class DelayedAsynchronousServiceCallProxy extends BaseInvocationHandler implements TargetedInvocationHandler {
052
053    private static final Logger LOG = Logger.getLogger(DelayedAsynchronousServiceCallProxy.class);
054
055    List<Endpoint> endpoints;
056    private Serializable context;
057    private String value1;
058    private String value2;
059    private long delayMilliseconds;
060
061    protected DelayedAsynchronousServiceCallProxy(List<Endpoint> endpoints, Serializable context,
062            String value1, String value2, long delayMilliseconds) {
063        this.endpoints = endpoints;
064        this.context = context;
065        this.value1 = value1;
066        this.value2 = value2;
067        this.delayMilliseconds = delayMilliseconds;
068    }
069
070    public static Object createInstance(List<Endpoint> endpoints, Serializable context, String value1,
071            String value2, long delayMilliseconds) {
072        if (endpoints == null || endpoints.isEmpty()) {
073            throw new RuntimeException("Cannot create service proxy, no service(s) passed in.");
074        }
075        try {
076            return Proxy.newProxyInstance(ClassLoaderUtils.getDefaultClassLoader(), ContextClassLoaderProxy
077                    .getInterfacesToProxy(endpoints.get(0).getService()),
078                    new DelayedAsynchronousServiceCallProxy(endpoints, context, value1, value2, delayMilliseconds));
079        } catch (Exception e) {
080            throw new RiceRuntimeException(e);
081        }
082    }
083
084    @Override
085    protected Object invokeInternal(Object proxy, Method method, Object[] arguments) throws Throwable {
086            // there are multiple service calls to make in the case of topics.
087            AsynchronousCall methodCall = null;
088            PersistedMessageBO message = null;
089            synchronized (this) {
090                // consider moving all this topic invocation stuff to the service
091                // invoker for speed reasons
092                for (Endpoint endpoint : this.endpoints) {
093                        ServiceConfiguration serviceConfiguration = endpoint.getServiceConfiguration();
094                        methodCall = new AsynchronousCall(method.getParameterTypes(), arguments, serviceConfiguration,
095                        method.getName(), null, this.context);
096                        message = PersistedMessageBO.buildMessage(serviceConfiguration, methodCall);
097                        message.setValue1(this.value1);
098                        message.setValue2(this.value2);
099                        Calendar now = Calendar.getInstance();
100                        now.add(Calendar.MILLISECOND, (int) delayMilliseconds);
101                        message.setQueueDate(new Timestamp(now.getTimeInMillis()));
102                        scheduleMessage(message);
103                        // only do one iteration if this is a queue. The load balancing
104                        // will be handled when the service is
105                        // fetched by the MessageServiceInvoker through the GRL (and
106                        // then through the RemoteResourceServiceLocatorImpl)
107                        if (serviceConfiguration.isQueue()) {
108                            break;
109                        }
110                }
111            }
112            return null;
113    }
114
115    protected void scheduleMessage(PersistedMessageBO message) throws SchedulerException {
116        LOG.debug("Scheduling execution of a delayed asynchronous message.");
117        Scheduler scheduler = KSBServiceLocator.getScheduler();
118        JobDataMap jobData = new JobDataMap();
119        jobData.put(MessageServiceExecutorJob.MESSAGE_KEY, message);
120        JobDetail jobDetail = new JobDetail("Delayed_Asynchronous_Call-" + Math.random(), "Delayed_Asynchronous_Call",
121                MessageServiceExecutorJob.class);
122        jobDetail.setJobDataMap(jobData);
123        jobDetail.addJobListener(MessageServiceExecutorJobListener.NAME);
124        Trigger trigger = new SimpleTrigger("Delayed_Asynchronous_Call_Trigger-" + Math.random(),
125                "Delayed_Asynchronous_Call", message.getQueueDate());
126        trigger.setJobDataMap(jobData);// 1.6 bug required or derby will choke
127        scheduler.scheduleJob(jobDetail, trigger);
128    }
129
130    /**
131         * Returns the List<RemotedServiceHolder> of asynchronous services which will be invoked by calls to this proxy.
132         * This is a List because, in the case of Topics, there can be more than one service invoked.
133         */
134    public Object getTarget() {
135        return this.endpoints;
136    }
137    
138}