View Javadoc

1   /*
2    * Copyright 2006-2011 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.kuali.rice.ksb.messaging;
18  
19  import java.io.ByteArrayInputStream;
20  import java.io.ByteArrayOutputStream;
21  import java.io.IOException;
22  import java.io.ObjectInputStream;
23  import java.io.ObjectOutput;
24  import java.io.ObjectOutputStream;
25  import java.io.Serializable;
26  import java.util.List;
27  
28  import javax.xml.namespace.QName;
29  
30  import org.apache.commons.codec.binary.Base64;
31  import org.apache.log4j.Logger;
32  import org.kuali.rice.core.api.config.property.ConfigContext;
33  import org.kuali.rice.core.api.exception.RiceRuntimeException;
34  import org.kuali.rice.ksb.api.KsbApiServiceLocator;
35  import org.kuali.rice.ksb.api.bus.Endpoint;
36  import org.kuali.rice.ksb.api.messaging.AsynchronousCallback;
37  import org.kuali.rice.ksb.api.messaging.MessageHelper;
38  import org.kuali.rice.ksb.messaging.serviceproxies.AsynchronousServiceCallProxy;
39  import org.kuali.rice.ksb.messaging.serviceproxies.DelayedAsynchronousServiceCallProxy;
40  import org.kuali.rice.ksb.messaging.serviceproxies.SynchronousServiceCallProxy;
41  import org.kuali.rice.ksb.util.KSBConstants;
42  
43  
44  public class MessageHelperImpl implements MessageHelper {
45  
46      private static final Logger LOG = Logger.getLogger(MessageHelperImpl.class);
47  
48      public String serializeObject(Serializable object) {
49          ByteArrayOutputStream bos = new ByteArrayOutputStream();
50          ObjectOutput out = null;
51          try {
52              out = new ObjectOutputStream(bos);
53              out.writeObject(object);
54          } catch (IOException e) {
55              throw new RiceRuntimeException(e);
56          } finally {
57              try {
58                  out.close();
59              } catch (IOException e) {
60                  LOG.error("Failed to close ObjectOutputStream", e);
61              }
62          }
63          byte[] buf = bos.toByteArray();
64          Base64 b64 = new Base64();
65          byte[] encodedObj = b64.encode(buf);
66          return new String(encodedObj);
67      }
68  
69      public Object deserializeObject(String serializedObject) {
70          if (serializedObject == null) {
71              return serializedObject;
72          }
73          Base64 b64 = new Base64();
74          byte[] result = b64.decode(serializedObject.getBytes());
75          Object payload = null;
76          ObjectInputStream ois = null;
77          try {
78              ois = new ObjectInputStream(new ByteArrayInputStream(result));
79              payload = ois.readObject();
80          } catch (Exception e) {
81              // may want to move this loggging up
82              LOG.error("Caught Error de-serializing message payload", e);
83              // throw new RiceRuntimeException(e);
84          } finally {
85              try {
86                  ois.close();
87              } catch (IOException e) {
88                  LOG.error("Failed to close de-serialization stream", e);
89              }
90          }
91          return payload;
92      }
93  
94      public Object getServiceAsynchronously(QName qname) {
95          return getAsynchronousServiceCallProxy(qname, null, null, null, null);
96      }
97  
98      public Object getServiceAsynchronously(QName qname, AsynchronousCallback callback) {
99          return getAsynchronousServiceCallProxy(qname, callback, null, null, null);
100     }
101 
102     public Object getServiceAsynchronously(QName qname, AsynchronousCallback callback, Serializable context) {
103         return getAsynchronousServiceCallProxy(qname, callback, context, null, null);
104     }
105 
106     public Object getServiceAsynchronously(QName qname, AsynchronousCallback callback, Serializable context, String value1, String value2) {
107         return getAsynchronousServiceCallProxy(qname, callback, context, value1, value2);
108     }
109 
110     public Object getAsynchronousServiceCallProxy(QName qname, AsynchronousCallback callback, Serializable context, String value1, String value2) {
111 
112     	List<Endpoint> endpoints = KsbApiServiceLocator.getServiceBus().getEndpoints(qname);
113     	if (endpoints.isEmpty()) {
114     		throw new RuntimeException("Cannot create service proxy, failed to locate any endpoints with the given service name: " + qname);
115     	}
116         if (KSBConstants.MESSAGING_SYNCHRONOUS.equals(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_DELIVERY))) {
117             return SynchronousServiceCallProxy.createInstance(endpoints, callback, context, value1, value2);
118         }
119 
120         return AsynchronousServiceCallProxy.createInstance(endpoints, callback, context, value1, value2);
121 
122     }
123 
124     public Object getDelayedAsynchronousServiceCallProxy(QName qname, Serializable context, String value1, String value2, long delayMilliseconds) {
125     	List<Endpoint> endpoints = KsbApiServiceLocator.getServiceBus().getEndpoints(qname);
126     	if (endpoints.isEmpty()) {
127     		throw new RuntimeException("Cannot create service proxy, failed to locate any endpoints with the given service name: " + qname);
128     	}
129         if (KSBConstants.MESSAGING_SYNCHRONOUS.equals(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_DELIVERY))) {
130             LOG.warn("Executing a delayed service call for " + qname + " with delay of " + delayMilliseconds + " in synchronous mode.  Service will be invoked immediately.");
131             return SynchronousServiceCallProxy.createInstance(endpoints, null, context, value1, value2);
132         }
133         return DelayedAsynchronousServiceCallProxy.createInstance(endpoints, context, value1, value2, delayMilliseconds);
134     }
135 
136     public Object getServiceAsynchronously(QName qname, Serializable context, String value1, String value2, long delayMilliseconds) {
137         return getDelayedAsynchronousServiceCallProxy(qname, context, value1, value2, delayMilliseconds);
138     }
139 
140 }