001    /**
002     * Copyright 2010 Emmanuel Bourg
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     *     http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    
017    package org.apache.qpid.contrib.hessian;
018    
019    import java.io.ByteArrayInputStream;
020    import java.io.ByteArrayOutputStream;
021    import java.io.IOException;
022    import java.io.InputStream;
023    import java.io.OutputStream;
024    import java.util.zip.Deflater;
025    import java.util.zip.DeflaterOutputStream;
026    import java.util.zip.Inflater;
027    import java.util.zip.InflaterInputStream;
028    
029    import com.caucho.hessian.io.AbstractHessianOutput;
030    import com.caucho.hessian.io.HessianFactory;
031    import com.caucho.hessian.io.HessianInputFactory;
032    import com.caucho.hessian.io.SerializerFactory;
033    import com.caucho.hessian.server.HessianSkeleton;
034    import org.apache.qpid.transport.Connection;
035    import org.apache.qpid.transport.DeliveryProperties;
036    import org.apache.qpid.transport.Header;
037    import org.apache.qpid.transport.MessageAcceptMode;
038    import org.apache.qpid.transport.MessageAcquireMode;
039    import org.apache.qpid.transport.MessageCreditUnit;
040    import org.apache.qpid.transport.MessageProperties;
041    import org.apache.qpid.transport.MessageTransfer;
042    import org.apache.qpid.transport.Option;
043    import org.apache.qpid.transport.ReplyTo;
044    import org.apache.qpid.transport.Session;
045    import org.apache.qpid.transport.SessionException;
046    import org.apache.qpid.transport.SessionListener;
047    
048    /**
049     * Endpoint for serving Hessian services.
050     * 
051     * This class is derived from {@link com.caucho.hessian.server.HessianServlet}. 
052     * 
053     * @author Emmanuel Bourg
054     */
055    public class HessianEndpoint
056    {
057        private Class serviceAPI;
058        private Object serviceImpl;
059        private SerializerFactory serializerFactory;
060    
061        /** The prefix of the queue created to receive the hessian requests */
062        private String queuePrefix;
063    
064        /**
065         * Creates an hessian endpoint.
066         */
067        public HessianEndpoint()
068        {
069            // Initialize the service
070            setServiceAPI(findRemoteAPI(getClass()));
071            setServiceImpl(this);
072        }
073    
074        /**
075         * Creates an hessian endpoint for the specified service.
076         * 
077         * @param serviceImpl The remote object to be exposed by the endpoint
078         */
079        public HessianEndpoint(Object serviceImpl)
080        {
081            // Initialize the service
082            setServiceAPI(findRemoteAPI(serviceImpl.getClass()));
083            setServiceImpl(serviceImpl);
084        }
085    
086        /**
087         * Specifies the interface of the service.
088         */
089        public void setServiceAPI(Class serviceAPI)
090        {
091            this.serviceAPI = serviceAPI;
092        }
093    
094        /**
095         * Specifies the object implementing the service.
096         */
097        public void setServiceImpl(Object serviceImpl)
098        {
099            this.serviceImpl = serviceImpl;
100            
101        }
102    
103        /**
104         * Sets the serializer factory.
105         */
106        public void setSerializerFactory(SerializerFactory factory)
107        {
108            serializerFactory = factory;
109        }
110    
111        /**
112         * Gets the serializer factory.
113         */
114        public SerializerFactory getSerializerFactory()
115        {
116            if (serializerFactory == null)
117            {
118                serializerFactory = new SerializerFactory();
119            }
120    
121            return serializerFactory;
122        }
123    
124        /**
125         * Returns the prefix of the queue created to receive the hessian requests.
126         */
127        public String getQueuePrefix()
128        {
129            return queuePrefix;
130        }
131    
132        /**
133         * Sets the prefix of the queue created to receive the hessian requests.
134         */
135        public void setQueuePrefix(String prefix)
136        {
137            queuePrefix = prefix;
138        }
139    
140        /**
141         * Sets the serializer send collection java type.
142         */
143        public void setSendCollectionType(boolean sendType)
144        {
145            getSerializerFactory().setSendCollectionType(sendType);
146        }
147    
148        private Class findRemoteAPI(Class implClass)
149        {
150            if (implClass == null)
151            {
152                return null;
153            }
154            
155            Class[] interfaces = implClass.getInterfaces();
156    
157            if (interfaces.length == 1)
158            {
159                return interfaces[0];
160            }
161    
162            return findRemoteAPI(implClass.getSuperclass());
163        }
164    
165        /**
166         * Return the name of the request queue for the service.
167         * The queue name is based on the class of the API implemented.
168         */
169        private String getRequestQueue(Class cls)
170        {
171            String requestQueue = cls.getSimpleName();
172            if (queuePrefix != null)
173            {
174                requestQueue = queuePrefix + "." + requestQueue;
175            }
176            
177            return requestQueue;
178        }
179    
180        /**
181         * Create an exclusive queue.
182         * 
183         * @param session
184         * @param name    the name of the queue
185         */
186        private void createQueue(Session session, String name)
187        {
188            session.queueDeclare(name, null, null, Option.EXCLUSIVE, Option.AUTO_DELETE);
189            session.exchangeBind(name, "amq.direct", name, null);
190            session.messageSubscribe(name, name, MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, null, 0, null);
191            
192            // issue credits
193            session.messageFlow(name, MessageCreditUnit.BYTE, Session.UNLIMITED_CREDIT);
194            session.messageFlow(name, MessageCreditUnit.MESSAGE, Session.UNLIMITED_CREDIT);
195            
196            session.sync();
197        }
198    
199        /**
200         * Starts the endpoint on the connection specified. A session bound to a
201         * dedicated queue is created on the connection and a listener is installed
202         * to respond to hessian requests. The endpoint is stopped by closing the
203         * session returned.
204         * 
205         * @param conn The AMQP connection
206         * @return the AMQP session handling the requests for the endpoint
207         */
208        public Session run(Connection conn)
209        {
210            Session session = conn.createSession(0);
211            
212            // create the queue receiving the requests 
213            createQueue(session, getRequestQueue(serviceAPI));
214            
215            session.setSessionListener(new SessionListener()
216            {
217                public void opened(Session session) {}
218                public void exception(Session session, SessionException exception) {}
219                public void closed(Session session) {}
220    
221                public void resumed(final Session session)
222                {
223                    new Thread("Hessian/AMQP Resume Handler [" + getRequestQueue(serviceAPI) + "]")
224                    {
225                        public void run()
226                        {
227                            // recreate the queue
228                            createQueue(session, getRequestQueue(serviceAPI));
229                        }
230                    }.start();
231                }
232    
233                public void message(final Session session, final MessageTransfer xfr)
234                {
235                    // send the response in a separate thread, otherwise the call to session.messageTransfer() blocks
236                    new Thread("Hessian/AMQP Responder [" + getRequestQueue(serviceAPI) + "]")
237                    {
238                        public void run()
239                        {
240                            try
241                            {
242                                sendReponse(session, xfr);
243                            }
244                            catch (IOException e)
245                            {
246                                e.printStackTrace();
247                            }
248                            finally
249                            {
250                                session.processed(xfr);
251                            }
252                        }
253                    }.start();
254                }
255    
256                private void sendReponse(Session session, MessageTransfer xfr) throws IOException
257                {
258                    MessageProperties props = xfr.getHeader().get(MessageProperties.class);
259                    ReplyTo from = props.getReplyTo();
260                    boolean compressed = "deflate".equals(props.getContentEncoding());
261    
262                    DeliveryProperties deliveryProps = new DeliveryProperties();
263                    deliveryProps.setRoutingKey(from.getRoutingKey());
264                    
265                    byte[] response;
266                    try
267                    {
268                        response = createResponseBody(xfr.getBodyBytes(), compressed);
269                    }
270                    catch (Exception e)
271                    {
272                        e.printStackTrace();
273                        compressed = false;
274                        response = createFaultBody(xfr.getBodyBytes(), e);
275                    }
276                    
277                    MessageProperties messageProperties = new MessageProperties();
278                    messageProperties.setContentType("x-application/hessian");
279                    if (compressed)
280                    {
281                        messageProperties.setContentEncoding("deflate");
282                    }
283    
284                    session.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, new Header(deliveryProps, messageProperties), response);
285                    session.sync();
286                }
287            });
288            
289            return session;
290        }
291    
292        /**
293         * Execute a request.
294         */
295        private byte[] createResponseBody(byte[] request, boolean compressed) throws Exception
296        {
297            InputStream in = new ByteArrayInputStream(request);
298            if (compressed)
299            {
300                in = new InflaterInputStream(new ByteArrayInputStream(request), new Inflater(true));
301            }
302            
303            ByteArrayOutputStream bout = new ByteArrayOutputStream();
304            OutputStream out;
305            if (compressed)
306            {
307                Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true);
308                out = new DeflaterOutputStream(bout, deflater);
309            }
310            else
311            {
312                out = bout;
313            }
314            
315            HessianSkeleton skeleton = new HessianSkeleton(serviceImpl, serviceAPI);
316            skeleton.invoke(in, out, getSerializerFactory());
317            
318            if (out instanceof DeflaterOutputStream)
319            {
320                ((DeflaterOutputStream) out).finish();
321            }
322            out.flush();
323            out.close();
324    
325            return bout.toByteArray();
326        }
327    
328        private byte[] createFaultBody(byte[] request, Throwable cause)
329        {
330            try
331            {
332                ByteArrayInputStream is = new ByteArrayInputStream(request);
333                ByteArrayOutputStream os = new ByteArrayOutputStream();
334    
335                AbstractHessianOutput out = createHessianOutput(new HessianInputFactory().readHeader(is), os);
336    
337                out.writeFault(cause.getClass().getSimpleName(), cause.getMessage(), cause);
338                out.close();
339    
340                return os.toByteArray();
341            }
342            catch (IOException e)
343            {
344                throw new RuntimeException(e);
345            }
346        }
347    
348        private AbstractHessianOutput createHessianOutput(HessianInputFactory.HeaderType header, OutputStream os)
349        {
350            AbstractHessianOutput out;
351            
352            HessianFactory hessianfactory = new HessianFactory();
353            switch (header)
354            {
355                case CALL_1_REPLY_1:
356                    out = hessianfactory.createHessianOutput(os);
357                    break;
358    
359                case CALL_1_REPLY_2:
360                case HESSIAN_2:
361                    out = hessianfactory.createHessian2Output(os);
362                    break;
363    
364                default:
365                    throw new IllegalStateException(header + " is an unknown Hessian call");
366            }
367            
368            return out;
369        }
370    }