001 /**
002 * Copyright 2010 Emmanuel Bourg
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017 package org.apache.qpid.contrib.hessian;
018
019 import java.io.ByteArrayInputStream;
020 import java.io.ByteArrayOutputStream;
021 import java.io.IOException;
022 import java.io.InputStream;
023 import java.io.OutputStream;
024 import java.lang.reflect.InvocationHandler;
025 import java.lang.reflect.Method;
026 import java.lang.reflect.Proxy;
027 import java.util.UUID;
028 import java.util.concurrent.Future;
029 import java.util.concurrent.TimeUnit;
030 import java.util.zip.Deflater;
031 import java.util.zip.DeflaterOutputStream;
032 import java.util.zip.Inflater;
033 import java.util.zip.InflaterInputStream;
034
035 import com.caucho.hessian.client.HessianRuntimeException;
036 import com.caucho.hessian.io.AbstractHessianInput;
037 import com.caucho.hessian.io.AbstractHessianOutput;
038 import com.caucho.hessian.io.HessianProtocolException;
039 import com.caucho.services.server.AbstractSkeleton;
040 import org.apache.qpid.transport.Connection;
041 import org.apache.qpid.transport.DeliveryProperties;
042 import org.apache.qpid.transport.Header;
043 import org.apache.qpid.transport.MessageAcceptMode;
044 import org.apache.qpid.transport.MessageAcquireMode;
045 import org.apache.qpid.transport.MessageCreditUnit;
046 import org.apache.qpid.transport.MessageProperties;
047 import org.apache.qpid.transport.MessageTransfer;
048 import org.apache.qpid.transport.Option;
049 import org.apache.qpid.transport.QueueQueryResult;
050 import org.apache.qpid.transport.ReplyTo;
051 import org.apache.qpid.transport.Session;
052 import org.apache.qpid.transport.SessionException;
053 import org.apache.qpid.transport.SessionListener;
054
055 /**
056 * Proxy implementation for Hessian clients. Applications will generally
057 * use {@link AMQPHessianProxyFactory} to create proxy clients.
058 *
059 * @author Emmanuel Bourg
060 * @author Scott Ferguson
061 */
062 public class AMQPHessianProxy implements InvocationHandler
063 {
064 private AMQPHessianProxyFactory _factory;
065
066 AMQPHessianProxy(AMQPHessianProxyFactory factory)
067 {
068 _factory = factory;
069 }
070
071 /**
072 * Handles the object invocation.
073 *
074 * @param proxy the proxy object to invoke
075 * @param method the method to call
076 * @param args the arguments to the proxy object
077 */
078 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
079 {
080 String methodName = method.getName();
081 Class[] params = method.getParameterTypes();
082
083 // equals and hashCode are special cased
084 if (methodName.equals("equals") && params.length == 1 && params[0].equals(Object.class))
085 {
086 Object value = args[0];
087 if (value == null || !Proxy.isProxyClass(value.getClass()))
088 {
089 return Boolean.FALSE;
090 }
091
092 AMQPHessianProxy handler = (AMQPHessianProxy) Proxy.getInvocationHandler(value);
093
094 return _factory.equals(handler._factory);
095 }
096 else if (methodName.equals("hashCode") && params.length == 0)
097 {
098 return _factory.hashCode();
099 }
100 else if (methodName.equals("toString") && params.length == 0)
101 {
102 return "[HessianProxy " + proxy.getClass() + "]";
103 }
104
105 Session session = openSession();
106
107 try
108 {
109 Future<MessageTransfer> response = sendRequest(session, method, args);
110
111 MessageTransfer message = _factory.getReadTimeout() > 0 ? response.get(_factory.getReadTimeout(), TimeUnit.MILLISECONDS) : response.get();
112 MessageProperties props = message.getHeader().get(MessageProperties.class);
113 boolean compressed = "deflate".equals(props.getContentEncoding());
114
115 AbstractHessianInput in;
116
117 InputStream is = new ByteArrayInputStream(message.getBodyBytes());
118 if (compressed) {
119 is = new InflaterInputStream(is, new Inflater(true));
120 }
121
122 int code = is.read();
123
124 if (code == 'H')
125 {
126 int major = is.read();
127 int minor = is.read();
128
129 in = _factory.getHessian2Input(is);
130
131 return in.readReply(method.getReturnType());
132 }
133 else if (code == 'r')
134 {
135 int major = is.read();
136 int minor = is.read();
137
138 in = _factory.getHessianInput(is);
139
140 in.startReplyBody();
141
142 Object value = in.readObject(method.getReturnType());
143
144 in.completeReply();
145
146 return value;
147 }
148 else
149 {
150 throw new HessianProtocolException("'" + (char) code + "' is an unknown code");
151 }
152 }
153 catch (HessianProtocolException e)
154 {
155 throw new HessianRuntimeException(e);
156 }
157 finally
158 {
159 session.close();
160 session.getConnection().close();
161 }
162 }
163
164 private Session openSession() throws IOException
165 {
166 Connection conn = _factory.openConnection();
167
168 Session session = conn.createSession(0);
169 session.setAutoSync(true);
170
171 return session;
172 }
173
174 /**
175 * Check if the specified queue exists.
176 *
177 * @param session
178 * @param name
179 */
180 private boolean checkQueue(Session session, String name)
181 {
182 org.apache.qpid.transport.Future<QueueQueryResult> future = session.queueQuery(name);
183 QueueQueryResult result = future.get();
184 return result.hasQueue();
185 }
186
187 private Future<MessageTransfer> sendRequest(Session session, Method method, Object[] args) throws IOException
188 {
189 // check if the request queue exists
190 String requestQueue = getRequestQueue(method.getDeclaringClass());
191 org.apache.qpid.transport.Future<QueueQueryResult> future = session.queueQuery(requestQueue);
192 QueueQueryResult result = future.get();
193
194 if (!checkQueue(session, getRequestQueue(method.getDeclaringClass())))
195 {
196 throw new HessianRuntimeException("Service queue not found: " + requestQueue);
197 }
198
199 // create the temporary queue for the response
200 String replyQueue = "temp." + UUID.randomUUID();
201 createQueue(session, replyQueue);
202
203 byte[] payload = createRequestBody(method, args);
204
205 DeliveryProperties deliveryProps = new DeliveryProperties();
206 deliveryProps.setRoutingKey(requestQueue);
207
208 MessageProperties messageProperties = new MessageProperties();
209 messageProperties.setReplyTo(new ReplyTo("amq.direct", replyQueue));
210 messageProperties.setContentType("x-application/hessian");
211 if (_factory.isCompressed())
212 {
213 messageProperties.setContentEncoding("deflate");
214 }
215
216 ResponseListener listener = new ResponseListener();
217 session.setSessionListener(listener);
218
219 session.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, new Header(deliveryProps, messageProperties), payload);
220 session.sync();
221
222 return listener.getResponse();
223 }
224
225 /**
226 * Return the name of the request queue for the service.
227 */
228 private String getRequestQueue(Class cls)
229 {
230 String requestQueue = cls.getSimpleName();
231 if (_factory.getQueuePrefix() != null)
232 {
233 requestQueue = _factory.getQueuePrefix() + "." + requestQueue;
234 }
235
236 return requestQueue;
237 }
238
239 /**
240 * Create an exclusive queue.
241 *
242 * @param session
243 * @param name the name of the queue
244 */
245 private void createQueue(Session session, String name)
246 {
247 session.queueDeclare(name, null, null, Option.EXCLUSIVE, Option.AUTO_DELETE);
248 session.exchangeBind(name, "amq.direct", name, null);
249 session.messageSubscribe(name, name, MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, null, 0, null);
250
251 // issue credits
252 session.messageFlow(name, MessageCreditUnit.BYTE, Session.UNLIMITED_CREDIT);
253 session.messageFlow(name, MessageCreditUnit.MESSAGE, Session.UNLIMITED_CREDIT);
254
255 session.sync();
256 }
257
258 private static class ResponseListener implements SessionListener
259 {
260 boolean done = false;
261
262 private AsyncResponse<MessageTransfer> response = new AsyncResponse<MessageTransfer>();
263
264 public Future<MessageTransfer> getResponse()
265 {
266 return response;
267 }
268
269 public void opened(Session session) { }
270 public void resumed(Session session) { }
271 public void exception(Session session, SessionException exception) { }
272 public void closed(Session session) { }
273
274 public void message(Session session, MessageTransfer xfr)
275 {
276 if (!response.isDone())
277 {
278 session.setSessionListener(null);
279 response.set(xfr);
280 done = true;
281 }
282
283 session.processed(xfr);
284 }
285 }
286
287 private byte[] createRequestBody(Method method, Object[] args) throws IOException
288 {
289 String methodName = method.getName();
290
291 if (_factory.isOverloadEnabled() && args != null && args.length > 0)
292 {
293 methodName = AbstractSkeleton.mangleName(method, false);
294 }
295
296 ByteArrayOutputStream payload = new ByteArrayOutputStream(256);
297 OutputStream os;
298 if (_factory.isCompressed())
299 {
300 Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true);
301 os = new DeflaterOutputStream(payload, deflater);
302 }
303 else
304 {
305 os = payload;
306 }
307
308 AbstractHessianOutput out = _factory.getHessianOutput(os);
309
310 out.call(methodName, args);
311 if (os instanceof DeflaterOutputStream)
312 {
313 ((DeflaterOutputStream) os).finish();
314 }
315 out.flush();
316
317 return payload.toByteArray();
318 }
319 }