001    /**
002     * Copyright 2010 Emmanuel Bourg
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     *     http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    
017    package org.apache.qpid.contrib.hessian;
018    
019    import java.io.ByteArrayInputStream;
020    import java.io.ByteArrayOutputStream;
021    import java.io.IOException;
022    import java.io.InputStream;
023    import java.io.OutputStream;
024    import java.lang.reflect.InvocationHandler;
025    import java.lang.reflect.Method;
026    import java.lang.reflect.Proxy;
027    import java.util.UUID;
028    import java.util.concurrent.Future;
029    import java.util.concurrent.TimeUnit;
030    import java.util.zip.Deflater;
031    import java.util.zip.DeflaterOutputStream;
032    import java.util.zip.Inflater;
033    import java.util.zip.InflaterInputStream;
034    
035    import com.caucho.hessian.client.HessianRuntimeException;
036    import com.caucho.hessian.io.AbstractHessianInput;
037    import com.caucho.hessian.io.AbstractHessianOutput;
038    import com.caucho.hessian.io.HessianProtocolException;
039    import com.caucho.services.server.AbstractSkeleton;
040    import org.apache.qpid.transport.Connection;
041    import org.apache.qpid.transport.DeliveryProperties;
042    import org.apache.qpid.transport.Header;
043    import org.apache.qpid.transport.MessageAcceptMode;
044    import org.apache.qpid.transport.MessageAcquireMode;
045    import org.apache.qpid.transport.MessageCreditUnit;
046    import org.apache.qpid.transport.MessageProperties;
047    import org.apache.qpid.transport.MessageTransfer;
048    import org.apache.qpid.transport.Option;
049    import org.apache.qpid.transport.QueueQueryResult;
050    import org.apache.qpid.transport.ReplyTo;
051    import org.apache.qpid.transport.Session;
052    import org.apache.qpid.transport.SessionException;
053    import org.apache.qpid.transport.SessionListener;
054    
055    /**
056     * Proxy implementation for Hessian clients. Applications will generally
057     * use {@link AMQPHessianProxyFactory} to create proxy clients.
058     * 
059     * @author Emmanuel Bourg
060     * @author Scott Ferguson
061     */
062    public class AMQPHessianProxy implements InvocationHandler
063    {
064        private AMQPHessianProxyFactory _factory;
065    
066        AMQPHessianProxy(AMQPHessianProxyFactory factory)
067        {
068            _factory = factory;
069        }
070    
071        /**
072         * Handles the object invocation.
073         *
074         * @param proxy  the proxy object to invoke
075         * @param method the method to call
076         * @param args   the arguments to the proxy object
077         */
078        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
079        {
080            String methodName = method.getName();
081            Class[] params = method.getParameterTypes();
082    
083            // equals and hashCode are special cased
084            if (methodName.equals("equals") && params.length == 1 && params[0].equals(Object.class))
085            {
086                Object value = args[0];
087                if (value == null || !Proxy.isProxyClass(value.getClass()))
088                {
089                    return Boolean.FALSE;
090                }
091    
092                AMQPHessianProxy handler = (AMQPHessianProxy) Proxy.getInvocationHandler(value);
093    
094                return _factory.equals(handler._factory);
095            }
096            else if (methodName.equals("hashCode") && params.length == 0)
097            {
098                return _factory.hashCode();
099            }
100            else if (methodName.equals("toString") && params.length == 0)
101            {
102                return "[HessianProxy " + proxy.getClass() + "]";
103            }
104    
105            Session session = openSession();
106    
107            try
108            {
109                Future<MessageTransfer> response = sendRequest(session, method, args);
110                
111                MessageTransfer message = _factory.getReadTimeout() > 0 ? response.get(_factory.getReadTimeout(), TimeUnit.MILLISECONDS) : response.get();
112                MessageProperties props = message.getHeader().get(MessageProperties.class);
113                boolean compressed = "deflate".equals(props.getContentEncoding());
114                
115                AbstractHessianInput in;
116                
117                InputStream is = new ByteArrayInputStream(message.getBodyBytes());
118                if (compressed) {
119                    is = new InflaterInputStream(is, new Inflater(true));
120                }
121                
122                int code = is.read();
123    
124                if (code == 'H')
125                {
126                    int major = is.read();
127                    int minor = is.read();
128    
129                    in = _factory.getHessian2Input(is);
130    
131                    return in.readReply(method.getReturnType());
132                }
133                else if (code == 'r')
134                {
135                    int major = is.read();
136                    int minor = is.read();
137    
138                    in = _factory.getHessianInput(is);
139    
140                    in.startReplyBody();
141    
142                    Object value = in.readObject(method.getReturnType());
143    
144                    in.completeReply();
145    
146                    return value;
147                }
148                else
149                {
150                    throw new HessianProtocolException("'" + (char) code + "' is an unknown code");
151                }
152            }
153            catch (HessianProtocolException e)
154            {
155                throw new HessianRuntimeException(e);
156            }
157            finally
158            {
159                session.close();
160                session.getConnection().close();
161            }
162        }
163    
164        private Session openSession() throws IOException
165        {
166            Connection conn = _factory.openConnection();
167    
168            Session session = conn.createSession(0);
169            session.setAutoSync(true);
170    
171            return session;
172        }
173    
174        /**
175         * Check if the specified queue exists.
176         * 
177         * @param session
178         * @param name
179         */
180        private boolean checkQueue(Session session, String name)
181        {
182            org.apache.qpid.transport.Future<QueueQueryResult> future = session.queueQuery(name);
183            QueueQueryResult result = future.get();
184            return result.hasQueue();
185        }
186    
187        private Future<MessageTransfer> sendRequest(Session session, Method method, Object[] args) throws IOException
188        {
189            // check if the request queue exists
190            String requestQueue = getRequestQueue(method.getDeclaringClass());
191            org.apache.qpid.transport.Future<QueueQueryResult> future = session.queueQuery(requestQueue);
192            QueueQueryResult result = future.get();
193    
194            if (!checkQueue(session, getRequestQueue(method.getDeclaringClass())))
195            {
196                throw new HessianRuntimeException("Service queue not found: " + requestQueue);
197            }
198            
199            // create the temporary queue for the response
200            String replyQueue = "temp." + UUID.randomUUID();
201            createQueue(session, replyQueue);
202    
203            byte[] payload = createRequestBody(method, args);
204    
205            DeliveryProperties deliveryProps = new DeliveryProperties();
206            deliveryProps.setRoutingKey(requestQueue);
207    
208            MessageProperties messageProperties = new MessageProperties();
209            messageProperties.setReplyTo(new ReplyTo("amq.direct", replyQueue));
210            messageProperties.setContentType("x-application/hessian");
211            if (_factory.isCompressed())
212            {
213                messageProperties.setContentEncoding("deflate");
214            }
215    
216            ResponseListener listener = new ResponseListener();
217            session.setSessionListener(listener);
218            
219            session.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, new Header(deliveryProps, messageProperties), payload);
220            session.sync();
221    
222            return listener.getResponse();
223        }
224    
225        /**
226         * Return the name of the request queue for the service.
227         */
228        private String getRequestQueue(Class cls)
229        {
230            String requestQueue = cls.getSimpleName();
231            if (_factory.getQueuePrefix() != null)
232            {
233                requestQueue = _factory.getQueuePrefix() + "." + requestQueue;
234            }
235            
236            return requestQueue;
237        }
238    
239        /**
240         * Create an exclusive queue.
241         * 
242         * @param session
243         * @param name    the name of the queue
244         */
245        private void createQueue(Session session, String name)
246        {
247            session.queueDeclare(name, null, null, Option.EXCLUSIVE, Option.AUTO_DELETE);
248            session.exchangeBind(name, "amq.direct", name, null);
249            session.messageSubscribe(name, name, MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, null, 0, null);
250    
251            // issue credits
252            session.messageFlow(name, MessageCreditUnit.BYTE, Session.UNLIMITED_CREDIT);
253            session.messageFlow(name, MessageCreditUnit.MESSAGE, Session.UNLIMITED_CREDIT);
254    
255            session.sync();
256        }
257    
258        private static class ResponseListener implements SessionListener
259        {
260            boolean done = false;
261    
262            private AsyncResponse<MessageTransfer> response = new AsyncResponse<MessageTransfer>();
263    
264            public Future<MessageTransfer> getResponse()
265            {
266                return response;
267            }
268    
269            public void opened(Session session) { }
270            public void resumed(Session session) { }
271            public void exception(Session session, SessionException exception) { }
272            public void closed(Session session) { }
273    
274            public void message(Session session, MessageTransfer xfr)
275            {
276                if (!response.isDone())
277                {
278                    session.setSessionListener(null);
279                    response.set(xfr);
280                    done = true;
281                }
282                
283                session.processed(xfr);
284            }
285        }
286    
287        private byte[] createRequestBody(Method method, Object[] args) throws IOException
288        {
289            String methodName = method.getName();
290            
291            if (_factory.isOverloadEnabled() && args != null && args.length > 0)
292            {
293                methodName = AbstractSkeleton.mangleName(method, false);
294            }
295            
296            ByteArrayOutputStream payload = new ByteArrayOutputStream(256);
297            OutputStream os;
298            if (_factory.isCompressed())
299            {
300                Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true);
301                os = new DeflaterOutputStream(payload, deflater);
302            }
303            else
304            {
305                os = payload;
306            }
307            
308            AbstractHessianOutput out = _factory.getHessianOutput(os);
309            
310            out.call(methodName, args);
311            if (os instanceof DeflaterOutputStream)
312            {
313                ((DeflaterOutputStream) os).finish();
314            }
315            out.flush();
316            
317            return payload.toByteArray();
318        }
319    }