001 /**
002 * Copyright 2010 Emmanuel Bourg
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017 package org.apache.qpid.contrib.hessian;
018
019 import java.io.ByteArrayInputStream;
020 import java.io.ByteArrayOutputStream;
021 import java.io.IOException;
022 import java.io.InputStream;
023 import java.io.OutputStream;
024 import java.util.zip.Deflater;
025 import java.util.zip.DeflaterOutputStream;
026 import java.util.zip.Inflater;
027 import java.util.zip.InflaterInputStream;
028
029 import com.caucho.hessian.io.AbstractHessianOutput;
030 import com.caucho.hessian.io.HessianFactory;
031 import com.caucho.hessian.io.HessianInputFactory;
032 import com.caucho.hessian.io.SerializerFactory;
033 import com.caucho.hessian.server.HessianSkeleton;
034 import org.apache.qpid.transport.Connection;
035 import org.apache.qpid.transport.DeliveryProperties;
036 import org.apache.qpid.transport.Header;
037 import org.apache.qpid.transport.MessageAcceptMode;
038 import org.apache.qpid.transport.MessageAcquireMode;
039 import org.apache.qpid.transport.MessageCreditUnit;
040 import org.apache.qpid.transport.MessageProperties;
041 import org.apache.qpid.transport.MessageTransfer;
042 import org.apache.qpid.transport.Option;
043 import org.apache.qpid.transport.ReplyTo;
044 import org.apache.qpid.transport.Session;
045 import org.apache.qpid.transport.SessionException;
046 import org.apache.qpid.transport.SessionListener;
047
048 /**
049 * Endpoint for serving Hessian services.
050 *
051 * This class is derived from {@link com.caucho.hessian.server.HessianServlet}.
052 *
053 * @author Emmanuel Bourg
054 */
055 public class HessianEndpoint
056 {
057 private Class serviceAPI;
058 private Object serviceImpl;
059 private SerializerFactory serializerFactory;
060
061 /** The prefix of the queue created to receive the hessian requests */
062 private String queuePrefix;
063
064 /**
065 * Creates an hessian endpoint.
066 */
067 public HessianEndpoint()
068 {
069 // Initialize the service
070 setServiceAPI(findRemoteAPI(getClass()));
071 setServiceImpl(this);
072 }
073
074 /**
075 * Creates an hessian endpoint for the specified service.
076 *
077 * @param serviceImpl The remote object to be exposed by the endpoint
078 */
079 public HessianEndpoint(Object serviceImpl)
080 {
081 // Initialize the service
082 setServiceAPI(findRemoteAPI(serviceImpl.getClass()));
083 setServiceImpl(serviceImpl);
084 }
085
086 /**
087 * Specifies the interface of the service.
088 */
089 public void setServiceAPI(Class serviceAPI)
090 {
091 this.serviceAPI = serviceAPI;
092 }
093
094 /**
095 * Specifies the object implementing the service.
096 */
097 public void setServiceImpl(Object serviceImpl)
098 {
099 this.serviceImpl = serviceImpl;
100
101 }
102
103 /**
104 * Sets the serializer factory.
105 */
106 public void setSerializerFactory(SerializerFactory factory)
107 {
108 serializerFactory = factory;
109 }
110
111 /**
112 * Gets the serializer factory.
113 */
114 public SerializerFactory getSerializerFactory()
115 {
116 if (serializerFactory == null)
117 {
118 serializerFactory = new SerializerFactory();
119 }
120
121 return serializerFactory;
122 }
123
124 /**
125 * Returns the prefix of the queue created to receive the hessian requests.
126 */
127 public String getQueuePrefix()
128 {
129 return queuePrefix;
130 }
131
132 /**
133 * Sets the prefix of the queue created to receive the hessian requests.
134 */
135 public void setQueuePrefix(String prefix)
136 {
137 queuePrefix = prefix;
138 }
139
140 /**
141 * Sets the serializer send collection java type.
142 */
143 public void setSendCollectionType(boolean sendType)
144 {
145 getSerializerFactory().setSendCollectionType(sendType);
146 }
147
148 private Class findRemoteAPI(Class implClass)
149 {
150 if (implClass == null)
151 {
152 return null;
153 }
154
155 Class[] interfaces = implClass.getInterfaces();
156
157 if (interfaces.length == 1)
158 {
159 return interfaces[0];
160 }
161
162 return findRemoteAPI(implClass.getSuperclass());
163 }
164
165 /**
166 * Return the name of the request queue for the service.
167 * The queue name is based on the class of the API implemented.
168 */
169 private String getRequestQueue(Class cls)
170 {
171 String requestQueue = cls.getSimpleName();
172 if (queuePrefix != null)
173 {
174 requestQueue = queuePrefix + "." + requestQueue;
175 }
176
177 return requestQueue;
178 }
179
180 /**
181 * Create an exclusive queue.
182 *
183 * @param session
184 * @param name the name of the queue
185 */
186 private void createQueue(Session session, String name)
187 {
188 session.queueDeclare(name, null, null, Option.EXCLUSIVE, Option.AUTO_DELETE);
189 session.exchangeBind(name, "amq.direct", name, null);
190 session.messageSubscribe(name, name, MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, null, 0, null);
191
192 // issue credits
193 session.messageFlow(name, MessageCreditUnit.BYTE, Session.UNLIMITED_CREDIT);
194 session.messageFlow(name, MessageCreditUnit.MESSAGE, Session.UNLIMITED_CREDIT);
195
196 session.sync();
197 }
198
199 /**
200 * Starts the endpoint on the connection specified. A session bound to a
201 * dedicated queue is created on the connection and a listener is installed
202 * to respond to hessian requests. The endpoint is stopped by closing the
203 * session returned.
204 *
205 * @param conn The AMQP connection
206 * @return the AMQP session handling the requests for the endpoint
207 */
208 public Session run(Connection conn)
209 {
210 Session session = conn.createSession(0);
211
212 // create the queue receiving the requests
213 createQueue(session, getRequestQueue(serviceAPI));
214
215 session.setSessionListener(new SessionListener()
216 {
217 public void opened(Session session) {}
218 public void exception(Session session, SessionException exception) {}
219 public void closed(Session session) {}
220
221 public void resumed(final Session session)
222 {
223 new Thread("Hessian/AMQP Resume Handler [" + getRequestQueue(serviceAPI) + "]")
224 {
225 public void run()
226 {
227 // recreate the queue
228 createQueue(session, getRequestQueue(serviceAPI));
229 }
230 }.start();
231 }
232
233 public void message(final Session session, final MessageTransfer xfr)
234 {
235 // send the response in a separate thread, otherwise the call to session.messageTransfer() blocks
236 new Thread("Hessian/AMQP Responder [" + getRequestQueue(serviceAPI) + "]")
237 {
238 public void run()
239 {
240 try
241 {
242 sendReponse(session, xfr);
243 }
244 catch (IOException e)
245 {
246 e.printStackTrace();
247 }
248 finally
249 {
250 session.processed(xfr);
251 }
252 }
253 }.start();
254 }
255
256 private void sendReponse(Session session, MessageTransfer xfr) throws IOException
257 {
258 MessageProperties props = xfr.getHeader().get(MessageProperties.class);
259 ReplyTo from = props.getReplyTo();
260 boolean compressed = "deflate".equals(props.getContentEncoding());
261
262 DeliveryProperties deliveryProps = new DeliveryProperties();
263 deliveryProps.setRoutingKey(from.getRoutingKey());
264
265 byte[] response;
266 try
267 {
268 response = createResponseBody(xfr.getBodyBytes(), compressed);
269 }
270 catch (Exception e)
271 {
272 e.printStackTrace();
273 compressed = false;
274 response = createFaultBody(xfr.getBodyBytes(), e);
275 }
276
277 MessageProperties messageProperties = new MessageProperties();
278 messageProperties.setContentType("x-application/hessian");
279 if (compressed)
280 {
281 messageProperties.setContentEncoding("deflate");
282 }
283
284 session.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, new Header(deliveryProps, messageProperties), response);
285 session.sync();
286 }
287 });
288
289 return session;
290 }
291
292 /**
293 * Execute a request.
294 */
295 private byte[] createResponseBody(byte[] request, boolean compressed) throws Exception
296 {
297 InputStream in = new ByteArrayInputStream(request);
298 if (compressed)
299 {
300 in = new InflaterInputStream(new ByteArrayInputStream(request), new Inflater(true));
301 }
302
303 ByteArrayOutputStream bout = new ByteArrayOutputStream();
304 OutputStream out;
305 if (compressed)
306 {
307 Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true);
308 out = new DeflaterOutputStream(bout, deflater);
309 }
310 else
311 {
312 out = bout;
313 }
314
315 HessianSkeleton skeleton = new HessianSkeleton(serviceImpl, serviceAPI);
316 skeleton.invoke(in, out, getSerializerFactory());
317
318 if (out instanceof DeflaterOutputStream)
319 {
320 ((DeflaterOutputStream) out).finish();
321 }
322 out.flush();
323 out.close();
324
325 return bout.toByteArray();
326 }
327
328 private byte[] createFaultBody(byte[] request, Throwable cause)
329 {
330 try
331 {
332 ByteArrayInputStream is = new ByteArrayInputStream(request);
333 ByteArrayOutputStream os = new ByteArrayOutputStream();
334
335 AbstractHessianOutput out = createHessianOutput(new HessianInputFactory().readHeader(is), os);
336
337 out.writeFault(cause.getClass().getSimpleName(), cause.getMessage(), cause);
338 out.close();
339
340 return os.toByteArray();
341 }
342 catch (IOException e)
343 {
344 throw new RuntimeException(e);
345 }
346 }
347
348 private AbstractHessianOutput createHessianOutput(HessianInputFactory.HeaderType header, OutputStream os)
349 {
350 AbstractHessianOutput out;
351
352 HessianFactory hessianfactory = new HessianFactory();
353 switch (header)
354 {
355 case CALL_1_REPLY_1:
356 out = hessianfactory.createHessianOutput(os);
357 break;
358
359 case CALL_1_REPLY_2:
360 case HESSIAN_2:
361 out = hessianfactory.createHessian2Output(os);
362 break;
363
364 default:
365 throw new IllegalStateException(header + " is an unknown Hessian call");
366 }
367
368 return out;
369 }
370 }