1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.ksb.messaging;
17
18 import org.apache.commons.lang.StringUtils;
19 import org.kuali.rice.core.api.config.property.ConfigContext;
20 import org.kuali.rice.ksb.api.KsbApiServiceLocator;
21 import org.kuali.rice.ksb.api.bus.Endpoint;
22 import org.kuali.rice.ksb.api.messaging.AsynchronousCallback;
23 import org.kuali.rice.ksb.api.messaging.MessageHelper;
24 import org.kuali.rice.ksb.messaging.serviceproxies.AsynchronousServiceCallProxy;
25 import org.kuali.rice.ksb.messaging.serviceproxies.DelayedAsynchronousServiceCallProxy;
26 import org.kuali.rice.ksb.messaging.serviceproxies.SynchronousServiceCallProxy;
27 import org.kuali.rice.ksb.util.KSBConstants;
28
29 import javax.xml.namespace.QName;
30 import java.io.Serializable;
31 import java.util.ArrayList;
32 import java.util.List;
33
34
35 public class MessageHelperImpl implements MessageHelper {
36
37 @Override
38 public <T> T getServiceAsynchronously(QName qname) {
39 return (T) getServiceAsynchronously(qname, null, null, null, null, null);
40 }
41
42 @Override
43 public <T> T getServiceAsynchronously(QName qname, String applicationId) {
44 return (T) getServiceAsynchronously(qname, applicationId, null, null, null, null);
45 }
46
47 @Override
48 public <T> T getServiceAsynchronously(QName qname, AsynchronousCallback callback) {
49 return (T) getServiceAsynchronously(qname, null, callback, null, null, null);
50 }
51
52 @Override
53 public <T> T getServiceAsynchronously(QName qname, AsynchronousCallback callback, Serializable context) {
54 return (T) getServiceAsynchronously(qname, null, callback, context, null, null);
55 }
56
57 @Override
58 public <T> T getServiceAsynchronously(QName qname, AsynchronousCallback callback, Serializable context, String value1, String value2) {
59 return (T) getServiceAsynchronously(qname, null, callback, context, value1, value2);
60 }
61
62 @Override
63 public <T> T getServiceAsynchronously(QName qname, String applicationId, AsynchronousCallback callback,
64 Serializable context, String value1, String value2) {
65
66 List<Endpoint> endpoints = KsbApiServiceLocator.getServiceBus().getEndpoints(qname);
67 endpoints = filterEndpointsByApplicationId(endpoints, applicationId);
68 if (endpoints.isEmpty()) {
69 throw new RuntimeException("Cannot create service proxy, failed to locate any endpoints with the given service name: " + qname + (applicationId != null ? ", and application id: " + applicationId : ""));
70 }
71 return (T) createProxy(syncMode(), endpoints, callback, context, value1, value2);
72 }
73
74 public <T> T getDelayedAsynchronousServiceCallProxy(QName qname, String applicationId, Serializable context, String value1, String value2, long delayMilliseconds) {
75 List<Endpoint> endpoints = KsbApiServiceLocator.getServiceBus().getEndpoints(qname);
76 endpoints = filterEndpointsByApplicationId(endpoints, applicationId);
77 if (endpoints.isEmpty()) {
78 throw new RuntimeException("Cannot create service proxy, failed to locate any endpoints with the given service name: " + qname);
79 }
80 return (T) createProxyDelayed(syncMode(), endpoints, context, value1, value2, delayMilliseconds);
81 }
82
83 @Override
84 public <T> T getServiceAsynchronously(QName qname, Serializable context, String value1, String value2, long delayMilliseconds) {
85 return (T) getDelayedAsynchronousServiceCallProxy(qname, null, context, value1, value2, delayMilliseconds);
86 }
87
88 public <T> T getServiceAsynchronously(QName qname, String applicationId, Serializable context, String value1, String value2, long delayMilliseconds) {
89 return (T) getDelayedAsynchronousServiceCallProxy(qname, applicationId, context, value1, value2, delayMilliseconds);
90 }
91
92 @Override
93 public <T> List<T> getAllRemoteServicesAsynchronously(QName qname) {
94 List<Endpoint> endpoints = KsbApiServiceLocator.getServiceBus().getRemoteEndpoints(qname);
95 if (endpoints.isEmpty()) {
96 throw new RuntimeException("Cannot create service proxy, failed to locate any endpoints with the given service name: " + qname);
97 }
98 final List<T> proxies = new ArrayList<T>();
99 final boolean syncMode = syncMode();
100 final String instanceId = KsbApiServiceLocator.getServiceBus().getInstanceId();
101 for (Endpoint e : endpoints) {
102 if (!e.getServiceConfiguration().getInstanceId().equals(instanceId)) {
103 proxies.add(MessageHelperImpl.<T>createProxy(syncMode, endpoints, null, null, null, null));
104 }
105 }
106 return proxies;
107 }
108
109 public static <T> T createProxy(boolean sync, List<Endpoint> endpoints, AsynchronousCallback callback, Serializable context, String value1, String value2) {
110 return sync ? (T) SynchronousServiceCallProxy.createInstance(endpoints, callback, context, value1, value2)
111 : (T) AsynchronousServiceCallProxy.createInstance(endpoints, callback, context, value1, value2);
112 }
113
114 public static <T> T createProxyDelayed(boolean sync, List<Endpoint> endpoints,Serializable context, String value1, String value2, long delayMilliseconds) {
115 return sync ? (T) SynchronousServiceCallProxy.createInstance(endpoints, null, context, value1, value2)
116 : (T) DelayedAsynchronousServiceCallProxy.createInstance(endpoints, context, value1, value2, delayMilliseconds);
117 }
118
119 private static boolean syncMode() {
120 return KSBConstants.MESSAGING_SYNCHRONOUS.equals(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_DELIVERY));
121 }
122
123 private List<Endpoint> filterEndpointsByApplicationId(List<Endpoint> endpoints, String applicationId) {
124 if (StringUtils.isBlank(applicationId)) {
125 return endpoints;
126 }
127 List<Endpoint> filteredEndpoints = new ArrayList<Endpoint>();
128 for (Endpoint endpoint : endpoints) {
129 if (endpoint.getServiceConfiguration().getApplicationId().equals(applicationId)) {
130 filteredEndpoints.add(endpoint);
131 }
132 }
133 return filteredEndpoints;
134 }
135
136 }