View Javadoc
1   /**
2    * Copyright 2005-2015 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.rice.ksb.messaging.serviceproxies;
17  
18  import org.apache.log4j.Logger;
19  import org.kuali.rice.core.api.exception.RiceRuntimeException;
20  import org.kuali.rice.core.api.util.ClassLoaderUtils;
21  import org.kuali.rice.core.api.util.ContextClassLoaderProxy;
22  import org.kuali.rice.core.api.util.reflect.BaseInvocationHandler;
23  import org.kuali.rice.core.api.util.reflect.TargetedInvocationHandler;
24  import org.kuali.rice.ksb.api.bus.Endpoint;
25  import org.kuali.rice.ksb.api.bus.ServiceConfiguration;
26  import org.kuali.rice.ksb.api.messaging.AsynchronousCall;
27  import org.kuali.rice.ksb.messaging.PersistedMessageBO;
28  import org.kuali.rice.ksb.messaging.quartz.MessageServiceExecutorJob;
29  import org.kuali.rice.ksb.messaging.quartz.MessageServiceExecutorJobListener;
30  import org.kuali.rice.ksb.service.KSBServiceLocator;
31  import org.quartz.JobDataMap;
32  import org.quartz.JobDetail;
33  import org.quartz.Scheduler;
34  import org.quartz.SchedulerException;
35  import org.quartz.SimpleTrigger;
36  import org.quartz.Trigger;
37  
38  import java.io.Serializable;
39  import java.lang.reflect.Method;
40  import java.lang.reflect.Proxy;
41  import java.sql.Timestamp;
42  import java.util.Calendar;
43  import java.util.List;
44  
45  
46  /**
47   * A proxy which schedules a service to be executed asynchronously after some delay period.
48   * 
49   * @author Kuali Rice Team (rice.collab@kuali.org)
50   */
51  public class DelayedAsynchronousServiceCallProxy extends BaseInvocationHandler implements TargetedInvocationHandler {
52  
53      private static final Logger LOG = Logger.getLogger(DelayedAsynchronousServiceCallProxy.class);
54  
55      List<Endpoint> endpoints;
56      private Serializable context;
57      private String value1;
58      private String value2;
59      private long delayMilliseconds;
60  
61      protected DelayedAsynchronousServiceCallProxy(List<Endpoint> endpoints, Serializable context,
62  	    String value1, String value2, long delayMilliseconds) {
63  	this.endpoints = endpoints;
64  	this.context = context;
65  	this.value1 = value1;
66  	this.value2 = value2;
67  	this.delayMilliseconds = delayMilliseconds;
68      }
69  
70      public static Object createInstance(List<Endpoint> endpoints, Serializable context, String value1,
71  	    String value2, long delayMilliseconds) {
72  	if (endpoints == null || endpoints.isEmpty()) {
73  	    throw new RuntimeException("Cannot create service proxy, no service(s) passed in.");
74  	}
75  	try {
76  	    return Proxy.newProxyInstance(ClassLoaderUtils.getDefaultClassLoader(), ContextClassLoaderProxy
77  		    .getInterfacesToProxy(endpoints.get(0).getService()),
78  		    new DelayedAsynchronousServiceCallProxy(endpoints, context, value1, value2, delayMilliseconds));
79  	} catch (Exception e) {
80  	    throw new RiceRuntimeException(e);
81  	}
82      }
83  
84      @Override
85      protected Object invokeInternal(Object proxy, Method method, Object[] arguments) throws Throwable {
86  	    // there are multiple service calls to make in the case of topics.
87  	    AsynchronousCall methodCall = null;
88  	    PersistedMessageBO message = null;
89  	    synchronized (this) {
90  	        // consider moving all this topic invocation stuff to the service
91  	        // invoker for speed reasons
92  	        for (Endpoint endpoint : this.endpoints) {
93  		        ServiceConfiguration serviceConfiguration = endpoint.getServiceConfiguration();
94  		        methodCall = new AsynchronousCall(method.getParameterTypes(), arguments, serviceConfiguration,
95                          method.getName(), null, this.context);
96  		        message = PersistedMessageBO.buildMessage(serviceConfiguration, methodCall);
97  		        message.setValue1(this.value1);
98  		        message.setValue2(this.value2);
99  		        Calendar now = Calendar.getInstance();
100 		        now.add(Calendar.MILLISECOND, (int) delayMilliseconds);
101 		        message.setQueueDate(new Timestamp(now.getTimeInMillis()));
102 		        scheduleMessage(message);
103 		        // only do one iteration if this is a queue. The load balancing
104 		        // will be handled when the service is
105 		        // fetched by the MessageServiceInvoker through the GRL (and
106 		        // then through the RemoteResourceServiceLocatorImpl)
107 		        if (serviceConfiguration.isQueue()) {
108 		            break;
109 		        }
110 	        }
111 	    }
112 	    return null;
113     }
114 
115     protected void scheduleMessage(PersistedMessageBO message) throws SchedulerException {
116 	LOG.debug("Scheduling execution of a delayed asynchronous message.");
117 	Scheduler scheduler = KSBServiceLocator.getScheduler();
118 	JobDataMap jobData = new JobDataMap();
119 	jobData.put(MessageServiceExecutorJob.MESSAGE_KEY, message);
120 	JobDetail jobDetail = new JobDetail("Delayed_Asynchronous_Call-" + Math.random(), "Delayed_Asynchronous_Call",
121 		MessageServiceExecutorJob.class);
122 	jobDetail.setJobDataMap(jobData);
123 	jobDetail.addJobListener(MessageServiceExecutorJobListener.NAME);
124 	Trigger trigger = new SimpleTrigger("Delayed_Asynchronous_Call_Trigger-" + Math.random(),
125 		"Delayed_Asynchronous_Call", message.getQueueDate());
126 	trigger.setJobDataMap(jobData);// 1.6 bug required or derby will choke
127 	scheduler.scheduleJob(jobDetail, trigger);
128     }
129 
130     /**
131          * Returns the List<RemotedServiceHolder> of asynchronous services which will be invoked by calls to this proxy.
132          * This is a List because, in the case of Topics, there can be more than one service invoked.
133          */
134     public Object getTarget() {
135 	return this.endpoints;
136     }
137     
138 }