/**
 * Copyright 2010 Emmanuel Bourg
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.qpid.contrib.hessian;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.Inflater;
import java.util.zip.InflaterInputStream;

import com.caucho.hessian.client.HessianRuntimeException;
import com.caucho.hessian.io.AbstractHessianInput;
import com.caucho.hessian.io.AbstractHessianOutput;
import com.caucho.hessian.io.HessianProtocolException;
import com.caucho.services.server.AbstractSkeleton;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
import org.apache.qpid.transport.MessageCreditUnit;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.transport.Option;
import org.apache.qpid.transport.QueueQueryResult;
import org.apache.qpid.transport.ReplyTo;
import org.apache.qpid.transport.Session;
import org.apache.qpid.transport.SessionException;
import org.apache.qpid.transport.SessionListener;

/**
 * Proxy implementation for Hessian clients. Applications will generally
 * use {@link AMQPHessianProxyFactory} to create proxy clients.
 *
 * <p>Each remote invocation opens a connection and a session through the
 * factory, publishes the Hessian-encoded call on the <code>amq.direct</code>
 * exchange, waits for the reply on a temporary exclusive queue and then
 * closes the session and its connection.
 *
 * @author Emmanuel Bourg
 * @author Scott Ferguson
 */
public class AMQPHessianProxy implements InvocationHandler
{
    private final AMQPHessianProxyFactory _factory;

    AMQPHessianProxy(AMQPHessianProxyFactory factory)
    {
        _factory = factory;
    }

    /**
     * Handles the object invocation.
     *
     * <p><code>equals</code>, <code>hashCode</code> and <code>toString</code>
     * are answered locally; every other method is sent to the remote service.
     *
     * @param proxy  the proxy object to invoke
     * @param method the method to call
     * @param args   the arguments to the proxy object
     * @return the value decoded from the reply
     * @throws HessianRuntimeException if the service queue does not exist or
     *                                 the reply is not a valid Hessian reply
     * @throws Throwable               any other failure raised while sending
     *                                 the request or decoding the reply
     */
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
    {
        String methodName = method.getName();
        Class<?>[] params = method.getParameterTypes();

        // equals and hashCode are special cased
        if (methodName.equals("equals") && params.length == 1 && params[0].equals(Object.class))
        {
            Object value = args[0];
            if (value == null || !Proxy.isProxyClass(value.getClass()))
            {
                return Boolean.FALSE;
            }

            // The other object may be a proxy backed by an unrelated handler;
            // such a proxy is simply not equal to this one.
            InvocationHandler handler = Proxy.getInvocationHandler(value);
            if (!(handler instanceof AMQPHessianProxy))
            {
                return Boolean.FALSE;
            }

            return Boolean.valueOf(_factory.equals(((AMQPHessianProxy) handler)._factory));
        }
        else if (methodName.equals("hashCode") && params.length == 0)
        {
            return Integer.valueOf(_factory.hashCode());
        }
        else if (methodName.equals("toString") && params.length == 0)
        {
            return "[HessianProxy " + proxy.getClass() + "]";
        }

        Session session = openSession();

        try
        {
            Future<MessageTransfer> response = sendRequest(session, method, args);

            // A non-positive read timeout means "wait without a time limit".
            long timeout = _factory.getReadTimeout();
            MessageTransfer message = timeout > 0 ? response.get(timeout, TimeUnit.MILLISECONDS) : response.get();

            return decodeReply(message, method.getReturnType());
        }
        catch (HessianProtocolException e)
        {
            throw new HessianRuntimeException(e);
        }
        finally
        {
            // Close the connection even when closing the session fails.
            try
            {
                session.close();
            }
            finally
            {
                session.getConnection().close();
            }
        }
    }

    /**
     * Decodes the Hessian reply carried by the specified message.
     *
     * @param message    the message received on the reply queue
     * @param returnType the return type of the invoked method
     * @return the decoded return value
     * @throws HessianProtocolException if the body is empty or does not start
     *                                  with a known Hessian reply code
     */
    private Object decodeReply(MessageTransfer message, Class<?> returnType) throws Throwable
    {
        // A reply without header or message properties is treated as uncompressed.
        Header header = message.getHeader();
        MessageProperties props = header != null ? header.get(MessageProperties.class) : null;
        boolean compressed = props != null && "deflate".equals(props.getContentEncoding());

        InputStream is = new ByteArrayInputStream(message.getBodyBytes());
        if (compressed)
        {
            // raw deflate data (no zlib wrapper), matching createRequestBody()
            is = new InflaterInputStream(is, new Inflater(true));
        }

        int code = is.read();

        if (code == 'H')
        {
            // skip the major and minor version bytes
            is.read();
            is.read();

            AbstractHessianInput in = _factory.getHessian2Input(is);

            return in.readReply(returnType);
        }
        else if (code == 'r')
        {
            // skip the major and minor version bytes
            is.read();
            is.read();

            AbstractHessianInput in = _factory.getHessianInput(is);

            in.startReplyBody();

            Object value = in.readObject(returnType);

            in.completeReply();

            return value;
        }
        else if (code < 0)
        {
            // read() returned -1: the body is empty
            throw new HessianProtocolException("unexpected end of stream while reading the reply code");
        }
        else
        {
            throw new HessianProtocolException("'" + (char) code + "' is an unknown code");
        }
    }

    /**
     * Opens a connection through the factory and creates a session on it
     * with auto-sync enabled.
     */
    private Session openSession() throws IOException
    {
        Connection conn = _factory.openConnection();

        Session session = conn.createSession(0);
        session.setAutoSync(true);

        return session;
    }

    /**
     * Check if the specified queue exists.
     *
     * @param session the session used to query the broker
     * @param name    the name of the queue
     * @return <code>true</code> if the query result reports a queue
     */
    private boolean checkQueue(Session session, String name)
    {
        org.apache.qpid.transport.Future<QueueQueryResult> future = session.queueQuery(name);
        QueueQueryResult result = future.get();
        return result.hasQueue();
    }

    /**
     * Publishes the Hessian-encoded call on the request queue of the service
     * and returns a future completed with the first message received by the session.
     *
     * @param session the session used for the exchange
     * @param method  the invoked method
     * @param args    the arguments of the invocation
     * @return the pending reply
     * @throws HessianRuntimeException if the request queue does not exist
     */
    private Future<MessageTransfer> sendRequest(Session session, Method method, Object[] args) throws IOException
    {
        // check if the request queue exists
        String requestQueue = getRequestQueue(method.getDeclaringClass());
        if (!checkQueue(session, requestQueue))
        {
            throw new HessianRuntimeException("Service queue not found: " + requestQueue);
        }

        // create the temporary queue for the response
        String replyQueue = "temp." + UUID.randomUUID();
        createQueue(session, replyQueue);

        byte[] payload = createRequestBody(method, args);

        DeliveryProperties deliveryProps = new DeliveryProperties();
        deliveryProps.setRoutingKey(requestQueue);

        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setReplyTo(new ReplyTo("amq.direct", replyQueue));
        messageProperties.setContentType("x-application/hessian");
        if (_factory.isCompressed())
        {
            messageProperties.setContentEncoding("deflate");
        }

        // the listener must be registered before the request is published
        ResponseListener listener = new ResponseListener();
        session.setSessionListener(listener);

        session.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, new Header(deliveryProps, messageProperties), payload);
        session.sync();

        return listener.getResponse();
    }

    /**
     * Return the name of the request queue for the service: the simple name
     * of the interface, prefixed with the factory's queue prefix and a dot
     * when a prefix is configured.
     */
    private String getRequestQueue(Class<?> cls)
    {
        String requestQueue = cls.getSimpleName();
        if (_factory.getQueuePrefix() != null)
        {
            requestQueue = _factory.getQueuePrefix() + "." + requestQueue;
        }

        return requestQueue;
    }

    /**
     * Create an exclusive, auto-deleted queue bound to <code>amq.direct</code>
     * under its own name, and subscribe the session to it.
     *
     * @param session the session owning the queue
     * @param name    the name of the queue
     */
    private void createQueue(Session session, String name)
    {
        session.queueDeclare(name, null, null, Option.EXCLUSIVE, Option.AUTO_DELETE);
        session.exchangeBind(name, "amq.direct", name, null);
        session.messageSubscribe(name, name, MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, null, 0, null);

        // issue credits
        session.messageFlow(name, MessageCreditUnit.BYTE, Session.UNLIMITED_CREDIT);
        session.messageFlow(name, MessageCreditUnit.MESSAGE, Session.UNLIMITED_CREDIT);

        session.sync();
    }

    /**
     * Session listener handing the first message it receives to the pending
     * response.
     */
    private static class ResponseListener implements SessionListener
    {
        private final AsyncResponse<MessageTransfer> response = new AsyncResponse<MessageTransfer>();

        public Future<MessageTransfer> getResponse()
        {
            return response;
        }

        public void opened(Session session) { }
        public void resumed(Session session) { }

        // NOTE(review): session failures and closures are not propagated to the
        // pending response, so a caller without a read timeout presumably keeps
        // waiting - confirm whether AsyncResponse can be completed with an error.
        public void exception(Session session, SessionException exception) { }
        public void closed(Session session) { }

        public void message(Session session, MessageTransfer xfr)
        {
            if (!response.isDone())
            {
                // only the first message is of interest, stop listening
                session.setSessionListener(null);
                response.set(xfr);
            }

            session.processed(xfr);
        }
    }

    /**
     * Encodes the call as a Hessian request, deflating it when compression
     * is enabled on the factory.
     *
     * @param method the invoked method
     * @param args   the arguments of the invocation
     * @return the body of the request message
     */
    private byte[] createRequestBody(Method method, Object[] args) throws IOException
    {
        String methodName = method.getName();

        if (_factory.isOverloadEnabled() && args != null && args.length > 0)
        {
            methodName = AbstractSkeleton.mangleName(method, false);
        }

        ByteArrayOutputStream payload = new ByteArrayOutputStream(256);

        Deflater deflater = null;
        DeflaterOutputStream deflaterStream = null;
        OutputStream os = payload;
        if (_factory.isCompressed())
        {
            // raw deflate data (no zlib wrapper)
            deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true);
            deflaterStream = new DeflaterOutputStream(payload, deflater);
            os = deflaterStream;
        }

        try
        {
            AbstractHessianOutput out = _factory.getHessianOutput(os);

            out.call(methodName, args);

            // Push everything the Hessian output may still hold into the
            // stream before finishing the deflater: nothing can be written
            // to a DeflaterOutputStream once it is finished.
            out.flush();
            if (deflaterStream != null)
            {
                deflaterStream.finish();
            }
        }
        finally
        {
            // an explicitly supplied Deflater is not released by the stream
            if (deflater != null)
            {
                deflater.end();
            }
        }

        return payload.toByteArray();
    }
}