001 /** 002 * Copyright 2010 Emmanuel Bourg 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017 package org.apache.qpid.contrib.hessian; 018 019 import java.io.ByteArrayInputStream; 020 import java.io.ByteArrayOutputStream; 021 import java.io.IOException; 022 import java.io.InputStream; 023 import java.io.OutputStream; 024 import java.util.zip.Deflater; 025 import java.util.zip.DeflaterOutputStream; 026 import java.util.zip.Inflater; 027 import java.util.zip.InflaterInputStream; 028 029 import com.caucho.hessian.io.AbstractHessianOutput; 030 import com.caucho.hessian.io.HessianFactory; 031 import com.caucho.hessian.io.HessianInputFactory; 032 import com.caucho.hessian.io.SerializerFactory; 033 import com.caucho.hessian.server.HessianSkeleton; 034 import org.apache.qpid.transport.Connection; 035 import org.apache.qpid.transport.DeliveryProperties; 036 import org.apache.qpid.transport.Header; 037 import org.apache.qpid.transport.MessageAcceptMode; 038 import org.apache.qpid.transport.MessageAcquireMode; 039 import org.apache.qpid.transport.MessageCreditUnit; 040 import org.apache.qpid.transport.MessageProperties; 041 import org.apache.qpid.transport.MessageTransfer; 042 import org.apache.qpid.transport.Option; 043 import org.apache.qpid.transport.ReplyTo; 044 import org.apache.qpid.transport.Session; 045 import org.apache.qpid.transport.SessionException; 046 import org.apache.qpid.transport.SessionListener; 047 048 /** 049 * Endpoint for serving Hessian services. 050 * 051 * This class is derived from {@link com.caucho.hessian.server.HessianServlet}. 052 * 053 * @author Emmanuel Bourg 054 */ 055 public class HessianEndpoint 056 { 057 private Class serviceAPI; 058 private Object serviceImpl; 059 private SerializerFactory serializerFactory; 060 061 /** The prefix of the queue created to receive the hessian requests */ 062 private String queuePrefix; 063 064 /** 065 * Creates an hessian endpoint. 066 */ 067 public HessianEndpoint() 068 { 069 // Initialize the service 070 setServiceAPI(findRemoteAPI(getClass())); 071 setServiceImpl(this); 072 } 073 074 /** 075 * Creates an hessian endpoint for the specified service. 076 * 077 * @param serviceImpl The remote object to be exposed by the endpoint 078 */ 079 public HessianEndpoint(Object serviceImpl) 080 { 081 // Initialize the service 082 setServiceAPI(findRemoteAPI(serviceImpl.getClass())); 083 setServiceImpl(serviceImpl); 084 } 085 086 /** 087 * Specifies the interface of the service. 088 */ 089 public void setServiceAPI(Class serviceAPI) 090 { 091 this.serviceAPI = serviceAPI; 092 } 093 094 /** 095 * Specifies the object implementing the service. 096 */ 097 public void setServiceImpl(Object serviceImpl) 098 { 099 this.serviceImpl = serviceImpl; 100 101 } 102 103 /** 104 * Sets the serializer factory. 105 */ 106 public void setSerializerFactory(SerializerFactory factory) 107 { 108 serializerFactory = factory; 109 } 110 111 /** 112 * Gets the serializer factory. 113 */ 114 public SerializerFactory getSerializerFactory() 115 { 116 if (serializerFactory == null) 117 { 118 serializerFactory = new SerializerFactory(); 119 } 120 121 return serializerFactory; 122 } 123 124 /** 125 * Returns the prefix of the queue created to receive the hessian requests. 126 */ 127 public String getQueuePrefix() 128 { 129 return queuePrefix; 130 } 131 132 /** 133 * Sets the prefix of the queue created to receive the hessian requests. 134 */ 135 public void setQueuePrefix(String prefix) 136 { 137 queuePrefix = prefix; 138 } 139 140 /** 141 * Sets the serializer send collection java type. 142 */ 143 public void setSendCollectionType(boolean sendType) 144 { 145 getSerializerFactory().setSendCollectionType(sendType); 146 } 147 148 private Class findRemoteAPI(Class implClass) 149 { 150 if (implClass == null) 151 { 152 return null; 153 } 154 155 Class[] interfaces = implClass.getInterfaces(); 156 157 if (interfaces.length == 1) 158 { 159 return interfaces[0]; 160 } 161 162 return findRemoteAPI(implClass.getSuperclass()); 163 } 164 165 /** 166 * Return the name of the request queue for the service. 167 * The queue name is based on the class of the API implemented. 168 */ 169 private String getRequestQueue(Class cls) 170 { 171 String requestQueue = cls.getSimpleName(); 172 if (queuePrefix != null) 173 { 174 requestQueue = queuePrefix + "." + requestQueue; 175 } 176 177 return requestQueue; 178 } 179 180 /** 181 * Create an exclusive queue. 182 * 183 * @param session 184 * @param name the name of the queue 185 */ 186 private void createQueue(Session session, String name) 187 { 188 session.queueDeclare(name, null, null, Option.EXCLUSIVE, Option.AUTO_DELETE); 189 session.exchangeBind(name, "amq.direct", name, null); 190 session.messageSubscribe(name, name, MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, null, 0, null); 191 192 // issue credits 193 session.messageFlow(name, MessageCreditUnit.BYTE, Session.UNLIMITED_CREDIT); 194 session.messageFlow(name, MessageCreditUnit.MESSAGE, Session.UNLIMITED_CREDIT); 195 196 session.sync(); 197 } 198 199 /** 200 * Starts the endpoint on the connection specified. A session bound to a 201 * dedicated queue is created on the connection and a listener is installed 202 * to respond to hessian requests. The endpoint is stopped by closing the 203 * session returned. 204 * 205 * @param conn The AMQP connection 206 * @return the AMQP session handling the requests for the endpoint 207 */ 208 public Session run(Connection conn) 209 { 210 Session session = conn.createSession(0); 211 212 // create the queue receiving the requests 213 createQueue(session, getRequestQueue(serviceAPI)); 214 215 session.setSessionListener(new SessionListener() 216 { 217 public void opened(Session session) {} 218 public void exception(Session session, SessionException exception) {} 219 public void closed(Session session) {} 220 221 public void resumed(final Session session) 222 { 223 new Thread("Hessian/AMQP Resume Handler [" + getRequestQueue(serviceAPI) + "]") 224 { 225 public void run() 226 { 227 // recreate the queue 228 createQueue(session, getRequestQueue(serviceAPI)); 229 } 230 }.start(); 231 } 232 233 public void message(final Session session, final MessageTransfer xfr) 234 { 235 // send the response in a separate thread, otherwise the call to session.messageTransfer() blocks 236 new Thread("Hessian/AMQP Responder [" + getRequestQueue(serviceAPI) + "]") 237 { 238 public void run() 239 { 240 try 241 { 242 sendReponse(session, xfr); 243 } 244 catch (IOException e) 245 { 246 e.printStackTrace(); 247 } 248 finally 249 { 250 session.processed(xfr); 251 } 252 } 253 }.start(); 254 } 255 256 private void sendReponse(Session session, MessageTransfer xfr) throws IOException 257 { 258 MessageProperties props = xfr.getHeader().get(MessageProperties.class); 259 ReplyTo from = props.getReplyTo(); 260 boolean compressed = "deflate".equals(props.getContentEncoding()); 261 262 DeliveryProperties deliveryProps = new DeliveryProperties(); 263 deliveryProps.setRoutingKey(from.getRoutingKey()); 264 265 byte[] response; 266 try 267 { 268 response = createResponseBody(xfr.getBodyBytes(), compressed); 269 } 270 catch (Exception e) 271 { 272 e.printStackTrace(); 273 compressed = false; 274 response = createFaultBody(xfr.getBodyBytes(), e); 275 } 276 277 MessageProperties messageProperties = new MessageProperties(); 278 messageProperties.setContentType("x-application/hessian"); 279 if (compressed) 280 { 281 messageProperties.setContentEncoding("deflate"); 282 } 283 284 session.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, new Header(deliveryProps, messageProperties), response); 285 session.sync(); 286 } 287 }); 288 289 return session; 290 } 291 292 /** 293 * Execute a request. 294 */ 295 private byte[] createResponseBody(byte[] request, boolean compressed) throws Exception 296 { 297 InputStream in = new ByteArrayInputStream(request); 298 if (compressed) 299 { 300 in = new InflaterInputStream(new ByteArrayInputStream(request), new Inflater(true)); 301 } 302 303 ByteArrayOutputStream bout = new ByteArrayOutputStream(); 304 OutputStream out; 305 if (compressed) 306 { 307 Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true); 308 out = new DeflaterOutputStream(bout, deflater); 309 } 310 else 311 { 312 out = bout; 313 } 314 315 HessianSkeleton skeleton = new HessianSkeleton(serviceImpl, serviceAPI); 316 skeleton.invoke(in, out, getSerializerFactory()); 317 318 if (out instanceof DeflaterOutputStream) 319 { 320 ((DeflaterOutputStream) out).finish(); 321 } 322 out.flush(); 323 out.close(); 324 325 return bout.toByteArray(); 326 } 327 328 private byte[] createFaultBody(byte[] request, Throwable cause) 329 { 330 try 331 { 332 ByteArrayInputStream is = new ByteArrayInputStream(request); 333 ByteArrayOutputStream os = new ByteArrayOutputStream(); 334 335 AbstractHessianOutput out = createHessianOutput(new HessianInputFactory().readHeader(is), os); 336 337 out.writeFault(cause.getClass().getSimpleName(), cause.getMessage(), cause); 338 out.close(); 339 340 return os.toByteArray(); 341 } 342 catch (IOException e) 343 { 344 throw new RuntimeException(e); 345 } 346 } 347 348 private AbstractHessianOutput createHessianOutput(HessianInputFactory.HeaderType header, OutputStream os) 349 { 350 AbstractHessianOutput out; 351 352 HessianFactory hessianfactory = new HessianFactory(); 353 switch (header) 354 { 355 case CALL_1_REPLY_1: 356 out = hessianfactory.createHessianOutput(os); 357 break; 358 359 case CALL_1_REPLY_2: 360 case HESSIAN_2: 361 out = hessianfactory.createHessian2Output(os); 362 break; 363 364 default: 365 throw new IllegalStateException(header + " is an unknown Hessian call"); 366 } 367 368 return out; 369 } 370 }