1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.kuali.rice.ksb.messaging;
18
19 import java.io.ByteArrayInputStream;
20 import java.io.ByteArrayOutputStream;
21 import java.io.IOException;
22 import java.io.ObjectInputStream;
23 import java.io.ObjectOutput;
24 import java.io.ObjectOutputStream;
25 import java.io.Serializable;
26 import java.util.List;
27
28 import javax.xml.namespace.QName;
29
30 import org.apache.commons.codec.binary.Base64;
31 import org.apache.log4j.Logger;
32 import org.kuali.rice.core.api.config.property.ConfigContext;
33 import org.kuali.rice.core.api.exception.RiceRuntimeException;
34 import org.kuali.rice.ksb.api.KsbApiServiceLocator;
35 import org.kuali.rice.ksb.api.bus.Endpoint;
36 import org.kuali.rice.ksb.api.messaging.AsynchronousCallback;
37 import org.kuali.rice.ksb.api.messaging.MessageHelper;
38 import org.kuali.rice.ksb.messaging.serviceproxies.AsynchronousServiceCallProxy;
39 import org.kuali.rice.ksb.messaging.serviceproxies.DelayedAsynchronousServiceCallProxy;
40 import org.kuali.rice.ksb.messaging.serviceproxies.SynchronousServiceCallProxy;
41 import org.kuali.rice.ksb.util.KSBConstants;
42
43
44 public class MessageHelperImpl implements MessageHelper {
45
46 private static final Logger LOG = Logger.getLogger(MessageHelperImpl.class);
47
48 public String serializeObject(Serializable object) {
49 ByteArrayOutputStream bos = new ByteArrayOutputStream();
50 ObjectOutput out = null;
51 try {
52 out = new ObjectOutputStream(bos);
53 out.writeObject(object);
54 } catch (IOException e) {
55 throw new RiceRuntimeException(e);
56 } finally {
57 try {
58 out.close();
59 } catch (IOException e) {
60 LOG.error("Failed to close ObjectOutputStream", e);
61 }
62 }
63 byte[] buf = bos.toByteArray();
64 Base64 b64 = new Base64();
65 byte[] encodedObj = b64.encode(buf);
66 return new String(encodedObj);
67 }
68
69 public Object deserializeObject(String serializedObject) {
70 if (serializedObject == null) {
71 return serializedObject;
72 }
73 Base64 b64 = new Base64();
74 byte[] result = b64.decode(serializedObject.getBytes());
75 Object payload = null;
76 ObjectInputStream ois = null;
77 try {
78 ois = new ObjectInputStream(new ByteArrayInputStream(result));
79 payload = ois.readObject();
80 } catch (Exception e) {
81
82 LOG.error("Caught Error de-serializing message payload", e);
83
84 } finally {
85 try {
86 ois.close();
87 } catch (IOException e) {
88 LOG.error("Failed to close de-serialization stream", e);
89 }
90 }
91 return payload;
92 }
93
94 public Object getServiceAsynchronously(QName qname) {
95 return getAsynchronousServiceCallProxy(qname, null, null, null, null);
96 }
97
98 public Object getServiceAsynchronously(QName qname, AsynchronousCallback callback) {
99 return getAsynchronousServiceCallProxy(qname, callback, null, null, null);
100 }
101
102 public Object getServiceAsynchronously(QName qname, AsynchronousCallback callback, Serializable context) {
103 return getAsynchronousServiceCallProxy(qname, callback, context, null, null);
104 }
105
106 public Object getServiceAsynchronously(QName qname, AsynchronousCallback callback, Serializable context, String value1, String value2) {
107 return getAsynchronousServiceCallProxy(qname, callback, context, value1, value2);
108 }
109
110 public Object getAsynchronousServiceCallProxy(QName qname, AsynchronousCallback callback, Serializable context, String value1, String value2) {
111
112 List<Endpoint> endpoints = KsbApiServiceLocator.getServiceBus().getEndpoints(qname);
113 if (endpoints.isEmpty()) {
114 throw new RuntimeException("Cannot create service proxy, failed to locate any endpoints with the given service name: " + qname);
115 }
116 if (KSBConstants.MESSAGING_SYNCHRONOUS.equals(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_DELIVERY))) {
117 return SynchronousServiceCallProxy.createInstance(endpoints, callback, context, value1, value2);
118 }
119
120 return AsynchronousServiceCallProxy.createInstance(endpoints, callback, context, value1, value2);
121
122 }
123
124 public Object getDelayedAsynchronousServiceCallProxy(QName qname, Serializable context, String value1, String value2, long delayMilliseconds) {
125 List<Endpoint> endpoints = KsbApiServiceLocator.getServiceBus().getEndpoints(qname);
126 if (endpoints.isEmpty()) {
127 throw new RuntimeException("Cannot create service proxy, failed to locate any endpoints with the given service name: " + qname);
128 }
129 if (KSBConstants.MESSAGING_SYNCHRONOUS.equals(ConfigContext.getCurrentContextConfig().getProperty(KSBConstants.Config.MESSAGE_DELIVERY))) {
130 LOG.warn("Executing a delayed service call for " + qname + " with delay of " + delayMilliseconds + " in synchronous mode. Service will be invoked immediately.");
131 return SynchronousServiceCallProxy.createInstance(endpoints, null, context, value1, value2);
132 }
133 return DelayedAsynchronousServiceCallProxy.createInstance(endpoints, context, value1, value2, delayMilliseconds);
134 }
135
136 public Object getServiceAsynchronously(QName qname, Serializable context, String value1, String value2, long delayMilliseconds) {
137 return getDelayedAsynchronousServiceCallProxy(qname, context, value1, value2, delayMilliseconds);
138 }
139
140 }