View Javadoc
1   /**
2    * Copyright 2005-2014 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.rice.ksb.messaging.serviceproxies;
17  
18  import org.apache.commons.collections.CollectionUtils;
19  import org.apache.log4j.Logger;
20  import org.kuali.rice.core.api.exception.RiceRuntimeException;
21  import org.kuali.rice.core.api.util.ClassLoaderUtils;
22  import org.kuali.rice.core.api.util.ContextClassLoaderProxy;
23  import org.kuali.rice.core.api.util.reflect.BaseInvocationHandler;
24  import org.kuali.rice.core.api.util.reflect.TargetedInvocationHandler;
25  import org.kuali.rice.ksb.api.bus.Endpoint;
26  import org.kuali.rice.ksb.api.bus.ServiceConfiguration;
27  import org.kuali.rice.ksb.api.messaging.AsynchronousCall;
28  import org.kuali.rice.ksb.api.messaging.AsynchronousCallback;
29  import org.kuali.rice.ksb.messaging.PersistedMessageBO;
30  import org.kuali.rice.ksb.service.KSBServiceLocator;
31  import org.kuali.rice.ksb.util.KSBConstants;
32  
33  import java.io.Serializable;
34  import java.lang.reflect.Method;
35  import java.lang.reflect.Proxy;
36  import java.util.List;
37  
38  /**
39   * Standard default proxy used to call services asynchronously. Persists the method call to the db so call is never
40   * lost and only sent when transaction is committed.
41   *
42   * @author Kuali Rice Team (rice.collab@kuali.org)
43   */
44  public class AsynchronousServiceCallProxy extends BaseInvocationHandler implements TargetedInvocationHandler {
45  
46      private static final Logger LOG = Logger.getLogger(AsynchronousServiceCallProxy.class);
47  
48      private AsynchronousCallback callback;
49  
50      private List<Endpoint> endpoints;
51  
52      private Serializable context;
53  
54      private String value1;
55  
56      private String value2;
57  
58      protected AsynchronousServiceCallProxy(List<Endpoint> endpoints, AsynchronousCallback callback,
59              Serializable context, String value1, String value2) {
60          this.endpoints = endpoints;
61          this.callback = callback;
62          this.context = context;
63          this.value1 = value1;
64          this.value2 = value2;
65      }
66  
67      public static Object createInstance(List<Endpoint> endpoints, AsynchronousCallback callback, Serializable context,
68              String value1, String value2) {
69          if (endpoints == null || endpoints.isEmpty()) {
70              throw new RuntimeException("Cannot create service proxy, no service(s) passed in.");
71          }
72          try {
73              return Proxy.newProxyInstance(ClassLoaderUtils.getDefaultClassLoader(),
74                      ContextClassLoaderProxy.getInterfacesToProxy(endpoints.get(0).getService()),
75                      new AsynchronousServiceCallProxy(endpoints, callback, context, value1, value2));
76          } catch (Exception e) {
77              throw new RiceRuntimeException(e);
78          }
79      }
80  
81      @Override
82      protected Object invokeInternal(Object proxy, Method method, Object[] arguments) throws Throwable {
83  
84          if (LOG.isDebugEnabled()) {
85              LOG.debug("creating messages for method invocation: " + method.getName());
86          }
87          // there are multiple service calls to make in the case of topics.
88          AsynchronousCall methodCall = null;
89          PersistedMessageBO message = null;
90          synchronized (this) {
91              // consider moving all this topic invocation stuff to the service
92              // invoker for speed reasons
93              for (Endpoint endpoint : this.endpoints) {
94                  ServiceConfiguration serviceConfiguration = endpoint.getServiceConfiguration();
95                  methodCall = new AsynchronousCall(method.getParameterTypes(), arguments, serviceConfiguration,
96                          method.getName(), this.callback, this.context);
97                  message = PersistedMessageBO.buildMessage(serviceConfiguration, methodCall);
98                  message.setValue1(this.value1);
99                  message.setValue2(this.value2);
100                 message = saveMessage(message);
101                 executeMessage(message);
102                 // only do one iteration if this is a queue. The load balancing
103                 // will be handled when the service is
104                 // fetched by the MessageServiceInvoker through the GRL (and
105                 // then through the RemoteResourceServiceLocatorImpl)
106                 if (serviceConfiguration.isQueue()) {
107                     break;
108                 }
109             }
110         }
111         if (LOG.isDebugEnabled()) {
112             LOG.debug("finished creating messages for method invocation: " + method.getName());
113         }
114         return null;
115     }
116 
117     @Override
118     protected String proxyToString(Object proxy) {
119         StringBuilder builder = new StringBuilder();
120         builder.append("Service call proxy (" + getClass().getName() + ") - for endpoints with service name: ");
121         if (CollectionUtils.isNotEmpty(this.endpoints)) {
122             builder.append(this.endpoints.get(0).getServiceConfiguration().getServiceName().toString());
123         } else {
124             builder.append("<< no endpoints on this proxy!!! >>");
125         }
126         return builder.toString();
127     }
128 
129     protected PersistedMessageBO saveMessage(PersistedMessageBO message) {
130         message.setQueueStatus(KSBConstants.ROUTE_QUEUE_ROUTING);
131         return KSBServiceLocator.getMessageQueueService().save(message);
132     }
133 
134     protected void executeMessage(PersistedMessageBO message) throws Exception {
135         MessageSender.sendMessage(message);
136     }
137 
138     /**
139      * Returns the List<RemotedServiceHolder> of asynchronous services which will be invoked by calls to this proxy.
140      * This is a List because, in the case of Topics, there can be more than one service invoked.
141      */
142     public Object getTarget() {
143         return this.endpoints;
144     }
145 
146     public AsynchronousCallback getCallback() {
147         return this.callback;
148     }
149 
150     public void setCallback(AsynchronousCallback callback) {
151         this.callback = callback;
152     }
153 
154 }