View Javadoc
1   /**
2    * Copyright 2005-2015 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.rice.ksb.messaging;
17  
18  import org.apache.commons.lang.StringUtils;
19  import org.kuali.rice.core.api.config.property.ConfigContext;
20  import org.kuali.rice.ksb.api.KsbApiServiceLocator;
21  import org.kuali.rice.ksb.api.bus.Endpoint;
22  import org.kuali.rice.ksb.api.messaging.AsynchronousCallback;
23  import org.kuali.rice.ksb.api.messaging.MessageHelper;
24  import org.kuali.rice.ksb.messaging.serviceproxies.AsynchronousServiceCallProxy;
25  import org.kuali.rice.ksb.messaging.serviceproxies.DelayedAsynchronousServiceCallProxy;
26  import org.kuali.rice.ksb.messaging.serviceproxies.SynchronousServiceCallProxy;
27  import org.kuali.rice.ksb.util.KSBConstants;
28  
29  import javax.xml.namespace.QName;
30  import java.io.Serializable;
31  import java.util.ArrayList;
32  import java.util.List;
33  
34  
35  public class MessageHelperImpl implements MessageHelper {
36  
37      @Override
38      public <T> T getServiceAsynchronously(QName qname) {
39          return (T) getServiceAsynchronously(qname, null, null, null, null, null);
40      }
41  
42      @Override
43      public <T> T getServiceAsynchronously(QName qname, String applicationId) {
44          return (T) getServiceAsynchronously(qname, applicationId, null, null, null, null);
45      }
46  
47      @Override
48      public <T> T getServiceAsynchronously(QName qname, AsynchronousCallback callback) {
49          return (T) getServiceAsynchronously(qname, null, callback, null, null, null);
50      }
51  
52      @Override
53      public <T> T getServiceAsynchronously(QName qname, AsynchronousCallback callback, Serializable context) {
54          return (T) getServiceAsynchronously(qname, null, callback, context, null, null);
55      }
56  
57      @Override
58      public <T> T getServiceAsynchronously(QName qname, AsynchronousCallback callback, Serializable context, String value1, String value2) {
59          return (T) getServiceAsynchronously(qname, null, callback, context, value1, value2);
60      }
61  
62      @Override
63      public <T> T getServiceAsynchronously(QName qname, String applicationId, AsynchronousCallback callback,
64              Serializable context, String value1, String value2) {
65  
66      	List<Endpoint> endpoints = KsbApiServiceLocator.getServiceBus().getEndpoints(qname);
67          endpoints = filterEndpointsByApplicationId(endpoints, applicationId);
68      	if (endpoints.isEmpty()) {
69      		throw new RuntimeException("Cannot create service proxy, failed to locate any endpoints with the given service name: " + qname + (applicationId != null ? ", and application id: " + applicationId : ""));
70      	}
71          return (T) createProxy(syncMode(), endpoints, callback, context, value1, value2);
72      }
73  
74      public <T> T getDelayedAsynchronousServiceCallProxy(QName qname, String applicationId, Serializable context, String value1, String value2, long delayMilliseconds) {
75      	List<Endpoint> endpoints = KsbApiServiceLocator.getServiceBus().getEndpoints(qname);
76          endpoints = filterEndpointsByApplicationId(endpoints, applicationId);
77      	if (endpoints.isEmpty()) {
78      		throw new RuntimeException("Cannot create service proxy, failed to locate any endpoints with the given service name: " + qname);
79      	}
80          return (T) createProxyDelayed(syncMode(), endpoints, context, value1, value2, delayMilliseconds);
81      }
82  
83      @Override
84      public <T> T getServiceAsynchronously(QName qname, Serializable context, String value1, String value2, long delayMilliseconds) {
85          return (T) getDelayedAsynchronousServiceCallProxy(qname, null, context, value1, value2, delayMilliseconds);
86      }
87  
88      public <T> T getServiceAsynchronously(QName qname, String applicationId, Serializable context, String value1, String value2, long delayMilliseconds) {
89          return (T) getDelayedAsynchronousServiceCallProxy(qname, applicationId, context, value1, value2, delayMilliseconds);
90      }
91  
92      @Override
93      public <T> List<T> getAllRemoteServicesAsynchronously(QName qname) {
94          List<Endpoint> endpoints = KsbApiServiceLocator.getServiceBus().getRemoteEndpoints(qname);
95      	if (endpoints.isEmpty()) {
96      		throw new RuntimeException("Cannot create service proxy, failed to locate any endpoints with the given service name: " + qname);
97      	}
98          final List<T> proxies = new ArrayList<T>();
99          final boolean syncMode = syncMode();
100         final String instanceId = KsbApiServiceLocator.getServiceBus().getInstanceId();
101         for (Endpoint e : endpoints) {
102             if (!e.getServiceConfiguration().getInstanceId().equals(instanceId)) {
103                 proxies.add(MessageHelperImpl.<T>createProxy(syncMode, endpoints, null, null, null, null));
104             }
105         }
106         return proxies;
107     }
108 
109     public static <T> T createProxy(boolean sync, List<Endpoint> endpoints, AsynchronousCallback callback, Serializable context, String value1, String value2) {
110         return sync ? (T) SynchronousServiceCallProxy.createInstance(endpoints, callback, context, value1, value2)
111                 : (T) AsynchronousServiceCallProxy.createInstance(endpoints, callback, context, value1, value2);
112     }
113 
114     public static <T> T createProxyDelayed(boolean sync, List<Endpoint> endpoints,Serializable context, String value1, String value2, long delayMilliseconds) {
115         return sync ? (T) SynchronousServiceCallProxy.createInstance(endpoints, null, context, value1, value2)
116                 : (T) DelayedAsynchronousServiceCallProxy.createInstance(endpoints, context, value1, value2, delayMilliseconds);
117     }
118 
119     private static boolean syncMode() {
120         return KSBConstants.MESSAGING_SYNCHRONOUS.equals(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_DELIVERY));
121     }
122 
123     private List<Endpoint> filterEndpointsByApplicationId(List<Endpoint> endpoints, String applicationId) {
124         if (StringUtils.isBlank(applicationId)) {
125             return endpoints;
126         }
127         List<Endpoint> filteredEndpoints = new ArrayList<Endpoint>();
128         for (Endpoint endpoint : endpoints) {
129             if (endpoint.getServiceConfiguration().getApplicationId().equals(applicationId)) {
130                 filteredEndpoints.add(endpoint);
131             }
132         }
133         return filteredEndpoints;
134     }
135 
136 }