/**
 * Copyright 2005-2016 The Kuali Foundation
 *
 * Licensed under the Educational Community License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.opensource.org/licenses/ecl2.php
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
016package org.kuali.rice.ksb.messaging;
017
018import org.apache.commons.lang.StringUtils;
019import org.kuali.rice.core.api.config.property.ConfigContext;
020import org.kuali.rice.ksb.api.KsbApiServiceLocator;
021import org.kuali.rice.ksb.api.bus.Endpoint;
022import org.kuali.rice.ksb.api.messaging.AsynchronousCallback;
023import org.kuali.rice.ksb.api.messaging.MessageHelper;
024import org.kuali.rice.ksb.messaging.serviceproxies.AsynchronousServiceCallProxy;
025import org.kuali.rice.ksb.messaging.serviceproxies.DelayedAsynchronousServiceCallProxy;
026import org.kuali.rice.ksb.messaging.serviceproxies.SynchronousServiceCallProxy;
027import org.kuali.rice.ksb.util.KSBConstants;
028
029import javax.xml.namespace.QName;
030import java.io.Serializable;
031import java.util.ArrayList;
032import java.util.List;
033
034
035public class MessageHelperImpl implements MessageHelper {
036
037    @Override
038    public <T> T getServiceAsynchronously(QName qname) {
039        return (T) getServiceAsynchronously(qname, null, null, null, null, null);
040    }
041
042    @Override
043    public <T> T getServiceAsynchronously(QName qname, String applicationId) {
044        return (T) getServiceAsynchronously(qname, applicationId, null, null, null, null);
045    }
046
047    @Override
048    public <T> T getServiceAsynchronously(QName qname, AsynchronousCallback callback) {
049        return (T) getServiceAsynchronously(qname, null, callback, null, null, null);
050    }
051
052    @Override
053    public <T> T getServiceAsynchronously(QName qname, AsynchronousCallback callback, Serializable context) {
054        return (T) getServiceAsynchronously(qname, null, callback, context, null, null);
055    }
056
057    @Override
058    public <T> T getServiceAsynchronously(QName qname, AsynchronousCallback callback, Serializable context, String value1, String value2) {
059        return (T) getServiceAsynchronously(qname, null, callback, context, value1, value2);
060    }
061
062    @Override
063    public <T> T getServiceAsynchronously(QName qname, String applicationId, AsynchronousCallback callback,
064            Serializable context, String value1, String value2) {
065
066        List<Endpoint> endpoints = KsbApiServiceLocator.getServiceBus().getEndpoints(qname);
067        endpoints = filterEndpointsByApplicationId(endpoints, applicationId);
068        if (endpoints.isEmpty()) {
069                throw new RuntimeException("Cannot create service proxy, failed to locate any endpoints with the given service name: " + qname + (applicationId != null ? ", and application id: " + applicationId : ""));
070        }
071        return (T) createProxy(syncMode(), endpoints, callback, context, value1, value2);
072    }
073
074    public <T> T getDelayedAsynchronousServiceCallProxy(QName qname, String applicationId, Serializable context, String value1, String value2, long delayMilliseconds) {
075        List<Endpoint> endpoints = KsbApiServiceLocator.getServiceBus().getEndpoints(qname);
076        endpoints = filterEndpointsByApplicationId(endpoints, applicationId);
077        if (endpoints.isEmpty()) {
078                throw new RuntimeException("Cannot create service proxy, failed to locate any endpoints with the given service name: " + qname);
079        }
080        return (T) createProxyDelayed(syncMode(), endpoints, context, value1, value2, delayMilliseconds);
081    }
082
083    @Override
084    public <T> T getServiceAsynchronously(QName qname, Serializable context, String value1, String value2, long delayMilliseconds) {
085        return (T) getDelayedAsynchronousServiceCallProxy(qname, null, context, value1, value2, delayMilliseconds);
086    }
087
088    public <T> T getServiceAsynchronously(QName qname, String applicationId, Serializable context, String value1, String value2, long delayMilliseconds) {
089        return (T) getDelayedAsynchronousServiceCallProxy(qname, applicationId, context, value1, value2, delayMilliseconds);
090    }
091
092    @Override
093    public <T> List<T> getAllRemoteServicesAsynchronously(QName qname) {
094        List<Endpoint> endpoints = KsbApiServiceLocator.getServiceBus().getRemoteEndpoints(qname);
095        if (endpoints.isEmpty()) {
096                throw new RuntimeException("Cannot create service proxy, failed to locate any endpoints with the given service name: " + qname);
097        }
098        final List<T> proxies = new ArrayList<T>();
099        final boolean syncMode = syncMode();
100        final String instanceId = KsbApiServiceLocator.getServiceBus().getInstanceId();
101        for (Endpoint e : endpoints) {
102            if (!e.getServiceConfiguration().getInstanceId().equals(instanceId)) {
103                proxies.add(MessageHelperImpl.<T>createProxy(syncMode, endpoints, null, null, null, null));
104            }
105        }
106        return proxies;
107    }
108
109    public static <T> T createProxy(boolean sync, List<Endpoint> endpoints, AsynchronousCallback callback, Serializable context, String value1, String value2) {
110        return sync ? (T) SynchronousServiceCallProxy.createInstance(endpoints, callback, context, value1, value2)
111                : (T) AsynchronousServiceCallProxy.createInstance(endpoints, callback, context, value1, value2);
112    }
113
114    public static <T> T createProxyDelayed(boolean sync, List<Endpoint> endpoints,Serializable context, String value1, String value2, long delayMilliseconds) {
115        return sync ? (T) SynchronousServiceCallProxy.createInstance(endpoints, null, context, value1, value2)
116                : (T) DelayedAsynchronousServiceCallProxy.createInstance(endpoints, context, value1, value2, delayMilliseconds);
117    }
118
119    private static boolean syncMode() {
120        return KSBConstants.MESSAGING_SYNCHRONOUS.equals(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_DELIVERY));
121    }
122
123    private List<Endpoint> filterEndpointsByApplicationId(List<Endpoint> endpoints, String applicationId) {
124        if (StringUtils.isBlank(applicationId)) {
125            return endpoints;
126        }
127        List<Endpoint> filteredEndpoints = new ArrayList<Endpoint>();
128        for (Endpoint endpoint : endpoints) {
129            if (endpoint.getServiceConfiguration().getApplicationId().equals(applicationId)) {
130                filteredEndpoints.add(endpoint);
131            }
132        }
133        return filteredEndpoints;
134    }
135
136}