View Javadoc

1   /*
2    * Copyright 2006-2011 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.rice.ksb.messaging.serviceproxies;
17  
18  import org.apache.log4j.Logger;
19  import org.kuali.rice.core.api.exception.RiceRuntimeException;
20  import org.kuali.rice.core.api.util.ClassLoaderUtils;
21  import org.kuali.rice.core.api.util.reflect.BaseInvocationHandler;
22  import org.kuali.rice.core.api.util.reflect.TargetedInvocationHandler;
23  import org.kuali.rice.core.impl.resourceloader.ContextClassLoaderProxy;
24  import org.kuali.rice.ksb.api.bus.Endpoint;
25  import org.kuali.rice.ksb.api.bus.ServiceConfiguration;
26  import org.kuali.rice.ksb.api.messaging.AsynchronousCall;
27  import org.kuali.rice.ksb.api.messaging.AsynchronousCallback;
28  import org.kuali.rice.ksb.messaging.PersistedMessageBO;
29  import org.kuali.rice.ksb.service.KSBServiceLocator;
30  import org.kuali.rice.ksb.util.KSBConstants;
31  
32  import java.io.Serializable;
33  import java.lang.reflect.Method;
34  import java.lang.reflect.Proxy;
35  import java.util.List;
36  
37  
38  /**
39   * Standard default proxy used to call services asynchronously. Persists the method call to the db so call is never lost and
40   * only sent when transaction is committed.
41   * 
42   * @author Kuali Rice Team (rice.collab@kuali.org)
43   * 
44   */
45  public class AsynchronousServiceCallProxy extends BaseInvocationHandler implements TargetedInvocationHandler {
46      
47      private static final Logger LOG = Logger.getLogger(AsynchronousServiceCallProxy.class);
48  
49      private AsynchronousCallback callback;
50  
51      private List<Endpoint> endpoints;
52  
53      private Serializable context;
54  
55      private String value1;
56  
57      private String value2;
58  
59      protected AsynchronousServiceCallProxy(List<Endpoint> endpoints, AsynchronousCallback callback,
60  	    Serializable context, String value1, String value2) {
61  	this.endpoints = endpoints;
62  	this.callback = callback;
63  	this.context = context;
64  	this.value1 = value1;
65  	this.value2 = value2;
66      }
67  
68      public static Object createInstance(List<Endpoint> endpoints, AsynchronousCallback callback,
69  	    Serializable context, String value1, String value2) {
70  	if (endpoints == null || endpoints.isEmpty()) {
71  	    throw new RuntimeException("Cannot create service proxy, no service(s) passed in.");
72  	}
73  	try {
74  	    return Proxy.newProxyInstance(ClassLoaderUtils.getDefaultClassLoader(), ContextClassLoaderProxy
75  		    .getInterfacesToProxy(endpoints.get(0).getService()), new AsynchronousServiceCallProxy(
76  		    endpoints, callback, context, value1, value2));
77  	} catch (Exception e) {
78  	    throw new RiceRuntimeException(e);
79  	}
80      }
81  
82      @Override
83      protected Object invokeInternal(Object proxy, Method method, Object[] arguments) throws Throwable {
84  
85  	if (LOG.isDebugEnabled()) {
86  	    LOG.debug("creating messages for method invocation: " + method.getName());
87  	}
88  	// there are multiple service calls to make in the case of topics.
89  	AsynchronousCall methodCall = null;
90  	PersistedMessageBO message = null;
91  	synchronized (this) {
92  	    // consider moving all this topic invocation stuff to the service
93  	    // invoker for speed reasons
94  	    for (Endpoint endpoint : this.endpoints) {
95  		ServiceConfiguration serviceConfiguration = endpoint.getServiceConfiguration();
96  		methodCall = new AsynchronousCall(method.getParameterTypes(), arguments, serviceConfiguration, method.getName(),
97  			this.callback, this.context);
98  		message = KSBServiceLocator.getMessageQueueService().getMessage(serviceConfiguration, methodCall);
99  		message.setValue1(this.value1);
100 		message.setValue2(this.value2);
101 		saveMessage(message);
102 		executeMessage(message);
103 		// only do one iteration if this is a queue. The load balancing
104 		// will be handled when the service is
105 		// fetched by the MessageServiceInvoker through the GRL (and
106 		// then through the RemoteResourceServiceLocatorImpl)
107 		if (serviceConfiguration.isQueue()) {
108 		    break;
109 		}
110 	    }
111 	}
112 	if (LOG.isDebugEnabled()) {
113 	    LOG.debug("finished creating messages for method invocation: " + method.getName());
114 	}
115 	return null;
116     }
117 
118     protected void saveMessage(PersistedMessageBO message) {
119 	message.setQueueStatus(KSBConstants.ROUTE_QUEUE_ROUTING);
120 	KSBServiceLocator.getMessageQueueService().save(message);
121     }
122 
123     protected void executeMessage(PersistedMessageBO message) throws Exception {
124 	MessageSender.sendMessage(message);
125     }
126 
127     /**
128          * Returns the List<RemotedServiceHolder> of asynchronous services which will be invoked by calls to this proxy.
129          * This is a List because, in the case of Topics, there can be more than one service invoked.
130          */
131     public Object getTarget() {
132     	return this.endpoints;
133     }
134 
135     public AsynchronousCallback getCallback() {
136 	return this.callback;
137     }
138 
139     public void setCallback(AsynchronousCallback callback) {
140 	this.callback = callback;
141     }
142 
143 }