1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.ksb.messaging;
17
18 import java.io.Serializable;
19 import java.lang.reflect.Method;
20 import java.sql.Timestamp;
21
22 import javax.xml.namespace.QName;
23
24 import org.apache.log4j.Logger;
25 import org.kuali.rice.core.api.config.property.ConfigContext;
26 import org.kuali.rice.core.api.exception.RiceRuntimeException;
27 import org.kuali.rice.ksb.api.KsbApiServiceLocator;
28 import org.kuali.rice.ksb.api.bus.Endpoint;
29 import org.kuali.rice.ksb.api.bus.ServiceBus;
30 import org.kuali.rice.ksb.api.bus.ServiceConfiguration;
31 import org.kuali.rice.ksb.api.messaging.AsynchronousCall;
32 import org.kuali.rice.ksb.api.messaging.AsynchronousCallback;
33 import org.kuali.rice.ksb.service.KSBServiceLocator;
34 import org.kuali.rice.ksb.util.KSBConstants;
35 import org.springframework.transaction.TransactionStatus;
36 import org.springframework.transaction.support.TransactionCallback;
37
38
39
40
41
42
43 public class MessageServiceInvoker implements Runnable {
44
45 protected static final Logger LOG = Logger.getLogger(MessageServiceInvoker.class);
46
47 private PersistedMessageBO message;
48 private Object service;
49 private AsynchronousCall methodCall;
50
51 public MessageServiceInvoker(PersistedMessageBO message) {
52 this.message = message;
53 }
54
55 public void run() {
56 LOG.debug("calling service from persisted message " + getMessage().getRouteQueueId());
57 Object result = null;
58 try {
59 result = KSBServiceLocator.getTransactionTemplate().execute(new TransactionCallback<Object>() {
60 public Object doInTransaction(TransactionStatus status) {
61 AsynchronousCall methodCall = getMessage().getPayload().getMethodCall();
62 Object result = null;
63 try {
64 result = invokeService(methodCall);
65 KSBServiceLocator.getMessageQueueService().delete(getMessage());
66 } catch (Throwable t) {
67 LOG.warn("Caught throwable making async service call " + methodCall, t);
68 throw new MessageProcessingException(t);
69 }
70 return result;
71 }
72 });
73 } catch (Throwable t) {
74
75
76 boolean allowSyncExceptionRouting = new Boolean(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.ALLOW_SYNC_EXCEPTION_ROUTING));
77 if (!allowSyncExceptionRouting && KSBConstants.MESSAGING_SYNCHRONOUS.equals(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_DELIVERY))) {
78 if (t instanceof RuntimeException) {
79 throw (RuntimeException)t;
80 }
81 throw new RiceRuntimeException(t);
82 } else {
83 placeInExceptionRouting(t, getMethodCall(), getService());
84 }
85 } finally {
86 try {
87 notifyOnCallback(methodCall, result);
88 } catch (Exception e) {
89 LOG.warn("Exception caught notifying callback", e);
90 }
91 try {
92 notifyGlobalCallbacks(methodCall, result);
93 } catch (Exception e) {
94 LOG.warn("Exception caught notifying callback", e);
95 }
96 }
97 }
98
99
100
101
102
103
104
105 protected void placeInExceptionRouting(Throwable t, AsynchronousCall call, Object service) {
106 LOG.error("Error processing message: " + this.message, t);
107 final Throwable throwable;
108 if (t instanceof MessageProcessingException) {
109 throwable = t.getCause();
110 } else {
111 throwable = t;
112 }
113 try {
114 try {
115 KSBServiceLocator.getExceptionRoutingService().placeInExceptionRouting(throwable, this.message, service);
116 } catch (Throwable t1) {
117 KSBServiceLocator.getExceptionRoutingService().placeInExceptionRoutingLastDitchEffort(throwable, this.message, service);
118 }
119 } catch (Throwable t2) {
120 LOG.error("An error was encountered when invoking exception handler for message. Attempting to change message status to EXCEPTION.", t2);
121 message.setQueueStatus(KSBConstants.ROUTE_QUEUE_EXCEPTION);
122 message.setQueueDate(new Timestamp(System.currentTimeMillis()));
123 try {
124 KSBServiceLocator.getMessageQueueService().save(message);
125 } catch (Throwable t3) {
126 LOG.fatal("Failed to flip status of message to EXCEPTION!!!", t3);
127 }
128 }
129 }
130
131
132
133
134
135
136 protected Object invokeService(AsynchronousCall methodCall) throws Exception {
137 this.methodCall = methodCall;
138 ServiceConfiguration serviceConfiguration = methodCall.getServiceConfiguration();
139 QName serviceName = serviceConfiguration.getServiceName();
140 if (LOG.isDebugEnabled()) {
141 LOG.debug("Attempting to call service " + serviceName);
142 }
143
144 Object service = getService(serviceConfiguration);
145 if (service == null) {
146 throw new RiceRuntimeException("Failed to locate service endpoint for message: " + serviceConfiguration);
147 }
148 Method method = service.getClass().getMethod(methodCall.getMethodName(), methodCall.getParamTypes());
149 return method.invoke(service, methodCall.getArguments());
150 }
151
152 protected Object getService(ServiceConfiguration serviceConfiguration) {
153 Object service;
154 if (serviceConfiguration.isQueue()) {
155 service = getQueueService(serviceConfiguration);
156 } else {
157 service = getTopicService(serviceConfiguration);
158 }
159 return service;
160 }
161
162
163
164
165
166
167
168
169
170
171
172 protected Object getTopicService(ServiceConfiguration serviceConfiguration) {
173
174
175 ServiceBus serviceBus = KsbApiServiceLocator.getServiceBus();
176 Endpoint endpoint = serviceBus.getConfiguredEndpoint(serviceConfiguration);
177 if (endpoint == null) {
178 return null;
179 }
180 return endpoint.getService();
181 }
182
183
184
185
186
187
188
189 protected Object getQueueService(ServiceConfiguration serviceConfiguration) {
190 ServiceBus serviceBus = KsbApiServiceLocator.getServiceBus();
191 return serviceBus.getService(serviceConfiguration.getServiceName());
192 }
193
194
195
196
197
198
199
200 protected void notifyOnCallback(AsynchronousCall methodCall, Object callResult) {
201 AsynchronousCallback callback = methodCall.getCallback();
202 notifyOnCallback(methodCall, callback, callResult);
203 }
204
205 protected void notifyGlobalCallbacks(AsynchronousCall methodCall, Object callResult) {
206 if (LOG.isDebugEnabled()) {
207 LOG.debug("Notifying global callbacks");
208 }
209 for (AsynchronousCallback globalCallBack : GlobalCallbackRegistry.getCallbacks()) {
210 notifyOnCallback(methodCall, globalCallBack, callResult);
211 }
212 }
213
214 protected void notifyOnCallback(AsynchronousCall methodCall, AsynchronousCallback callback, Object callResult) {
215 if (callback != null) {
216 try {
217 synchronized (callback) {
218 if (LOG.isDebugEnabled()) {
219 LOG.debug("Notifying callback " + callback + " with callResult " + callResult);
220 }
221 callback.notifyAll();
222 if (callResult instanceof Serializable || callResult == null) {
223 callback.callback((Serializable) callResult, methodCall);
224 } else {
225
226 LOG.warn("Attempted to call callback with non-serializable object.");
227 }
228 }
229 } catch (Throwable t) {
230 LOG.error("Caught throwable from callback object " + callback.getClass(), t);
231 }
232 }
233 }
234
235 public PersistedMessageBO getMessage() {
236 return this.message;
237 }
238
239 public void setMessage(PersistedMessageBO message) {
240 this.message = message;
241 }
242
243 public Object getService() {
244 return this.service;
245 }
246
247 public AsynchronousCall getMethodCall() {
248 return this.methodCall;
249 }
250
251 public void setMethodCall(AsynchronousCall methodCall) {
252 this.methodCall = methodCall;
253 }
254
255 public void setService(Object service) {
256 this.service = service;
257 }
258 }