/**
 * Copyright 2005-2016 The Kuali Foundation
 *
 * Licensed under the Educational Community License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.opensource.org/licenses/ecl2.php
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.kuali.rice.ksb.messaging;

import org.apache.commons.lang.StringUtils;
import org.kuali.rice.core.api.config.property.ConfigContext;
import org.kuali.rice.ksb.api.KsbApiServiceLocator;
import org.kuali.rice.ksb.api.bus.Endpoint;
import org.kuali.rice.ksb.api.messaging.AsynchronousCallback;
import org.kuali.rice.ksb.api.messaging.MessageHelper;
import org.kuali.rice.ksb.messaging.serviceproxies.AsynchronousServiceCallProxy;
import org.kuali.rice.ksb.messaging.serviceproxies.DelayedAsynchronousServiceCallProxy;
import org.kuali.rice.ksb.messaging.serviceproxies.SynchronousServiceCallProxy;
import org.kuali.rice.ksb.util.KSBConstants;

import javax.xml.namespace.QName;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * {@link MessageHelper} implementation that looks up endpoints on the service bus and wraps them
 * in service-call proxies.
 *
 * <p>Whether a synchronous or an asynchronous proxy is created is decided on every call by
 * comparing the {@code KSBConstants.Config.MESSAGE_DELIVERY} configuration property with
 * {@code KSBConstants.MESSAGING_SYNCHRONOUS} (see {@link #syncMode()}).</p>
 */
public class MessageHelperImpl implements MessageHelper {

    /**
     * Creates a proxy for the named service with no application id filter, callback, context or values.
     */
    @Override
    public <T> T getServiceAsynchronously(QName qname) {
        return this.<T>getServiceAsynchronously(qname, null, null, null, null, null);
    }

    /**
     * Creates a proxy for the named service, restricted to endpoints of the given application id.
     */
    @Override
    public <T> T getServiceAsynchronously(QName qname, String applicationId) {
        return this.<T>getServiceAsynchronously(qname, applicationId, null, null, null, null);
    }

    /**
     * Creates a proxy for the named service that is handed the given callback.
     */
    @Override
    public <T> T getServiceAsynchronously(QName qname, AsynchronousCallback callback) {
        return this.<T>getServiceAsynchronously(qname, null, callback, null, null, null);
    }

    /**
     * Creates a proxy for the named service that is handed the given callback and context.
     */
    @Override
    public <T> T getServiceAsynchronously(QName qname, AsynchronousCallback callback, Serializable context) {
        return this.<T>getServiceAsynchronously(qname, null, callback, context, null, null);
    }

    /**
     * Creates a proxy for the named service that is handed the given callback, context and values.
     */
    @Override
    public <T> T getServiceAsynchronously(QName qname, AsynchronousCallback callback, Serializable context,
            String value1, String value2) {
        return this.<T>getServiceAsynchronously(qname, null, callback, context, value1, value2);
    }

    /**
     * Creates a proxy over every endpoint registered for {@code qname}, optionally narrowed to a
     * single application id.
     *
     * @param qname         name of the service to look up on the service bus
     * @param applicationId application id the endpoints must belong to; blank means "no filter"
     * @param callback      callback passed through to the proxy, may be {@code null}
     * @param context       context passed through to the proxy, may be {@code null}
     * @param value1        first value passed through to the proxy, may be {@code null}
     * @param value2        second value passed through to the proxy, may be {@code null}
     * @return the proxy created by {@link #createProxy}
     * @throws RuntimeException if no endpoint matches the name (and application id)
     */
    @Override
    public <T> T getServiceAsynchronously(QName qname, String applicationId, AsynchronousCallback callback,
            Serializable context, String value1, String value2) {
        List<Endpoint> endpoints = locateEndpoints(qname, applicationId);
        return MessageHelperImpl.<T>createProxy(syncMode(), endpoints, callback, context, value1, value2);
    }

    /**
     * Creates a delayed proxy over every endpoint registered for {@code qname}, optionally
     * narrowed to a single application id.
     *
     * @param qname             name of the service to look up on the service bus
     * @param applicationId     application id the endpoints must belong to; blank means "no filter"
     * @param context           context passed through to the proxy, may be {@code null}
     * @param value1            first value passed through to the proxy, may be {@code null}
     * @param value2            second value passed through to the proxy, may be {@code null}
     * @param delayMilliseconds delay handed to the delayed asynchronous proxy; not used when the
     *                          bus is configured for synchronous delivery
     * @return the proxy created by {@link #createProxyDelayed}
     * @throws RuntimeException if no endpoint matches the name (and application id)
     */
    public <T> T getDelayedAsynchronousServiceCallProxy(QName qname, String applicationId, Serializable context,
            String value1, String value2, long delayMilliseconds) {
        List<Endpoint> endpoints = locateEndpoints(qname, applicationId);
        return MessageHelperImpl.<T>createProxyDelayed(syncMode(), endpoints, context, value1, value2,
                delayMilliseconds);
    }

    /**
     * Creates a delayed proxy for the named service with no application id filter.
     */
    @Override
    public <T> T getServiceAsynchronously(QName qname, Serializable context, String value1, String value2,
            long delayMilliseconds) {
        return this.<T>getDelayedAsynchronousServiceCallProxy(qname, null, context, value1, value2,
                delayMilliseconds);
    }

    /**
     * Creates a delayed proxy for the named service, restricted to endpoints of the given application id.
     */
    public <T> T getServiceAsynchronously(QName qname, String applicationId, Serializable context, String value1,
            String value2, long delayMilliseconds) {
        return this.<T>getDelayedAsynchronousServiceCallProxy(qname, applicationId, context, value1, value2,
                delayMilliseconds);
    }

    /**
     * Creates one proxy per remote endpoint of the named service, skipping any endpoint whose
     * instance id equals the instance id of the local service bus.
     *
     * @param qname name of the service to look up on the service bus
     * @return one proxy for each endpoint that passed the instance id check; may be empty if every
     *         remote endpoint carried the local instance id
     * @throws RuntimeException if the service bus reports no remote endpoints for {@code qname}
     */
    @Override
    public <T> List<T> getAllRemoteServicesAsynchronously(QName qname) {
        List<Endpoint> endpoints = KsbApiServiceLocator.getServiceBus().getRemoteEndpoints(qname);
        if (endpoints.isEmpty()) {
            throw new RuntimeException(
                    "Cannot create service proxy, failed to locate any endpoints with the given service name: "
                            + qname);
        }
        final List<T> proxies = new ArrayList<T>();
        final boolean syncMode = syncMode();
        final String instanceId = KsbApiServiceLocator.getServiceBus().getInstanceId();
        for (Endpoint e : endpoints) {
            // Null-safe comparison: an endpoint without an instance id must not abort the whole lookup.
            if (!StringUtils.equals(e.getServiceConfiguration().getInstanceId(), instanceId)) {
                // Bind each proxy to the single endpoint it was created for. Passing the complete
                // list here would make every proxy identical and bypass the instance id check above.
                proxies.add(MessageHelperImpl.<T>createProxy(syncMode, Collections.singletonList(e),
                        null, null, null, null));
            }
        }
        return proxies;
    }

    /**
     * Creates a service-call proxy over the given endpoints.
     *
     * @param sync      {@code true} to build a {@link SynchronousServiceCallProxy}, {@code false}
     *                  to build an {@link AsynchronousServiceCallProxy}
     * @param endpoints endpoints handed to the proxy factory
     * @param callback  callback handed to the proxy factory, may be {@code null}
     * @param context   context handed to the proxy factory, may be {@code null}
     * @param value1    first value handed to the proxy factory, may be {@code null}
     * @param value2    second value handed to the proxy factory, may be {@code null}
     * @return the created proxy, cast to the caller's expected service type (unchecked)
     */
    public static <T> T createProxy(boolean sync, List<Endpoint> endpoints, AsynchronousCallback callback,
            Serializable context, String value1, String value2) {
        if (sync) {
            return (T) SynchronousServiceCallProxy.createInstance(endpoints, callback, context, value1, value2);
        }
        return (T) AsynchronousServiceCallProxy.createInstance(endpoints, callback, context, value1, value2);
    }

    /**
     * Creates a delayed service-call proxy over the given endpoints.
     *
     * @param sync              {@code true} to build a {@link SynchronousServiceCallProxy} (with a
     *                          {@code null} callback and without using the delay), {@code false}
     *                          to build a {@link DelayedAsynchronousServiceCallProxy}
     * @param endpoints         endpoints handed to the proxy factory
     * @param context           context handed to the proxy factory, may be {@code null}
     * @param value1            first value handed to the proxy factory, may be {@code null}
     * @param value2            second value handed to the proxy factory, may be {@code null}
     * @param delayMilliseconds delay handed to the delayed asynchronous proxy factory
     * @return the created proxy, cast to the caller's expected service type (unchecked)
     */
    public static <T> T createProxyDelayed(boolean sync, List<Endpoint> endpoints, Serializable context,
            String value1, String value2, long delayMilliseconds) {
        if (sync) {
            return (T) SynchronousServiceCallProxy.createInstance(endpoints, null, context, value1, value2);
        }
        return (T) DelayedAsynchronousServiceCallProxy.createInstance(endpoints, context, value1, value2,
                delayMilliseconds);
    }

    /**
     * Tells whether the current context configuration sets the message delivery property to the
     * synchronous messaging constant.
     */
    private static boolean syncMode() {
        String delivery = ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_DELIVERY);
        return KSBConstants.MESSAGING_SYNCHRONOUS.equals(delivery);
    }

    /**
     * Fetches the endpoints for {@code qname} from the service bus, applies the application id
     * filter and fails if nothing is left.
     *
     * @throws RuntimeException if no endpoint matches; the message names the application id
     *                          whenever one was supplied
     */
    private List<Endpoint> locateEndpoints(QName qname, String applicationId) {
        List<Endpoint> endpoints = KsbApiServiceLocator.getServiceBus().getEndpoints(qname);
        endpoints = filterEndpointsByApplicationId(endpoints, applicationId);
        if (endpoints.isEmpty()) {
            throw new RuntimeException(
                    "Cannot create service proxy, failed to locate any endpoints with the given service name: "
                            + qname + (applicationId != null ? ", and application id: " + applicationId : ""));
        }
        return endpoints;
    }

    /**
     * Keeps only the endpoints whose service configuration carries the given application id.
     *
     * @param endpoints     endpoints to filter
     * @param applicationId application id to match; when blank the input list is returned as is
     * @return the input list itself when no filter applies, otherwise a new list of matches
     */
    private List<Endpoint> filterEndpointsByApplicationId(List<Endpoint> endpoints, String applicationId) {
        if (StringUtils.isBlank(applicationId)) {
            return endpoints;
        }
        List<Endpoint> filteredEndpoints = new ArrayList<Endpoint>();
        for (Endpoint endpoint : endpoints) {
            // Null-safe: an endpoint whose configuration has no application id simply does not match.
            if (StringUtils.equals(applicationId, endpoint.getServiceConfiguration().getApplicationId())) {
                filteredEndpoints.add(endpoint);
            }
        }
        return filteredEndpoints;
    }

}