1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.ksb.messaging.serviceproxies;
17
18 import org.apache.log4j.Logger;
19 import org.kuali.rice.core.api.exception.RiceRuntimeException;
20 import org.kuali.rice.core.api.util.ClassLoaderUtils;
21 import org.kuali.rice.core.api.util.reflect.BaseInvocationHandler;
22 import org.kuali.rice.core.api.util.reflect.TargetedInvocationHandler;
23 import org.kuali.rice.core.impl.resourceloader.ContextClassLoaderProxy;
24 import org.kuali.rice.ksb.api.bus.Endpoint;
25 import org.kuali.rice.ksb.api.bus.ServiceConfiguration;
26 import org.kuali.rice.ksb.api.messaging.AsynchronousCall;
27 import org.kuali.rice.ksb.messaging.PersistedMessageBO;
28 import org.kuali.rice.ksb.messaging.quartz.MessageServiceExecutorJob;
29 import org.kuali.rice.ksb.messaging.quartz.MessageServiceExecutorJobListener;
30 import org.kuali.rice.ksb.service.KSBServiceLocator;
31 import org.quartz.JobDataMap;
32 import org.quartz.JobDetail;
33 import org.quartz.Scheduler;
34 import org.quartz.SchedulerException;
35 import org.quartz.SimpleTrigger;
36 import org.quartz.Trigger;
37
38 import java.io.Serializable;
39 import java.lang.reflect.Method;
40 import java.lang.reflect.Proxy;
41 import java.sql.Timestamp;
42 import java.util.Calendar;
43 import java.util.List;
44
45
46
47
48
49
50
51 public class DelayedAsynchronousServiceCallProxy extends BaseInvocationHandler implements TargetedInvocationHandler {
52
53 private static final Logger LOG = Logger.getLogger(DelayedAsynchronousServiceCallProxy.class);
54
55 List<Endpoint> endpoints;
56 private Serializable context;
57 private String value1;
58 private String value2;
59 private long delayMilliseconds;
60
61 protected DelayedAsynchronousServiceCallProxy(List<Endpoint> endpoints, Serializable context,
62 String value1, String value2, long delayMilliseconds) {
63 this.endpoints = endpoints;
64 this.context = context;
65 this.value1 = value1;
66 this.value2 = value2;
67 this.delayMilliseconds = delayMilliseconds;
68 }
69
70 public static Object createInstance(List<Endpoint> endpoints, Serializable context, String value1,
71 String value2, long delayMilliseconds) {
72 if (endpoints == null || endpoints.isEmpty()) {
73 throw new RuntimeException("Cannot create service proxy, no service(s) passed in.");
74 }
75 try {
76 return Proxy.newProxyInstance(ClassLoaderUtils.getDefaultClassLoader(), ContextClassLoaderProxy
77 .getInterfacesToProxy(endpoints.get(0).getService()),
78 new DelayedAsynchronousServiceCallProxy(endpoints, context, value1, value2, delayMilliseconds));
79 } catch (Exception e) {
80 throw new RiceRuntimeException(e);
81 }
82 }
83
84 @Override
85 protected Object invokeInternal(Object proxy, Method method, Object[] arguments) throws Throwable {
86
87 AsynchronousCall methodCall = null;
88 PersistedMessageBO message = null;
89 synchronized (this) {
90
91
92 for (Endpoint endpoint : this.endpoints) {
93 ServiceConfiguration serviceConfiguration = endpoint.getServiceConfiguration();
94 methodCall = new AsynchronousCall(method.getParameterTypes(), arguments, serviceConfiguration, method.getName(),
95 null, this.context);
96 message = KSBServiceLocator.getMessageQueueService().getMessage(serviceConfiguration, methodCall);
97 message.setValue1(this.value1);
98 message.setValue2(this.value2);
99 Calendar now = Calendar.getInstance();
100 now.add(Calendar.MILLISECOND, (int) delayMilliseconds);
101 message.setQueueDate(new Timestamp(now.getTimeInMillis()));
102 scheduleMessage(message);
103
104
105
106
107 if (serviceConfiguration.isQueue()) {
108 break;
109 }
110 }
111 }
112 return null;
113 }
114
115 protected void scheduleMessage(PersistedMessageBO message) throws SchedulerException {
116 LOG.debug("Scheduling execution of a delayed asynchronous message.");
117 Scheduler scheduler = KSBServiceLocator.getScheduler();
118 JobDataMap jobData = new JobDataMap();
119 jobData.put(MessageServiceExecutorJob.MESSAGE_KEY, message);
120 JobDetail jobDetail = new JobDetail("Delayed_Asynchronous_Call-" + Math.random(), "Delayed_Asynchronous_Call",
121 MessageServiceExecutorJob.class);
122 jobDetail.setJobDataMap(jobData);
123 jobDetail.addJobListener(MessageServiceExecutorJobListener.NAME);
124 Trigger trigger = new SimpleTrigger("Delayed_Asynchronous_Call_Trigger-" + Math.random(),
125 "Delayed_Asynchronous_Call", message.getQueueDate());
126 trigger.setJobDataMap(jobData);
127 scheduler.scheduleJob(jobDetail, trigger);
128 }
129
130
131
132
133
134 public Object getTarget() {
135 return this.endpoints;
136 }
137
138 }