001    /**
002     * Copyright 2010 Emmanuel Bourg
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     *     http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    
017    package org.apache.qpid.contrib.hessian;
018    
019    import java.io.IOException;
020    import java.io.InputStream;
021    import java.io.OutputStream;
022    import java.io.PrintWriter;
023    import java.lang.reflect.Proxy;
024    import java.net.MalformedURLException;
025    import java.net.URI;
026    import java.net.URISyntaxException;
027    import java.util.regex.Matcher;
028    import java.util.regex.Pattern;
029    
030    import com.caucho.hessian.io.AbstractHessianInput;
031    import com.caucho.hessian.io.AbstractHessianOutput;
032    import com.caucho.hessian.io.Hessian2Input;
033    import com.caucho.hessian.io.Hessian2Output;
034    import com.caucho.hessian.io.HessianDebugInputStream;
035    import com.caucho.hessian.io.HessianInput;
036    import com.caucho.hessian.io.HessianOutput;
037    import com.caucho.hessian.io.HessianRemoteResolver;
038    import com.caucho.hessian.io.SerializerFactory;
039    import org.apache.qpid.transport.Connection;
040    
041    /**
042     * Factory for creating Hessian client stubs. The returned stub will
043     * call the remote object for all methods.
044     *
045     * <pre>
046     * HelloHome hello = (HelloHome) factory.create(HelloHome.class, null);
047     * </pre>
048     *
049     * After creation, the stub can be like a regular Java class. Because
050     * it makes remote calls, it can throw more exceptions than a Java class.
051     * In particular, it may throw protocol exceptions.
052     * 
053     * This class is derived from {@link com.caucho.hessian.client.HessianProxyFactory}. 
054     * 
055     * @author Emmanuel Bourg
056     * @author Scott Ferguson
057     */
058    public class AMQPHessianProxyFactory /*implements ServiceProxyFactory*/
059    {
060        private SerializerFactory _serializerFactory;
061        private HessianRemoteResolver _resolver;
062    
063        private String user;
064        private String password;
065        private String hostname;
066        private String virtualhost;
067        private int port = 5672;
068        private boolean ssl = false;
069        
070        private String queuePrefix;
071    
072        private boolean isOverloadEnabled = false;
073    
074        private boolean isHessian2Reply = true;
075        private boolean isHessian2Request = true;
076    
077        private boolean debug = false;
078    
079        private long readTimeout = -1;
080        private long connectTimeout = -1;
081        
082        private boolean compressed;
083    
084        /**
085         * Creates the new proxy factory.
086         */
087        public AMQPHessianProxyFactory()
088        {
089            _resolver = new HessianRemoteResolver()
090            {
091                public Object lookup(String type, String url) throws IOException
092                {
093                    ClassLoader loader = Thread.currentThread().getContextClassLoader();
094    
095                    try
096                    {
097                        Class<?> api = Class.forName(type, false, loader);
098                        return create(api, url);
099                    }
100                    catch (Exception e)
101                    {
102                        throw new IOException(String.valueOf(e));
103                    }
104                }
105            };
106        }
107    
108        /**
109         * Sets the user connecting to the AMQP server.
110         */
111        public void setUser(String user)
112        {
113            this.user = user;
114        }
115    
116        /**
117         * Sets the password of the user connecting to the AMQP server.
118         */
119        public void setPassword(String password)
120        {
121            this.password = password;
122        }
123    
124        /**
125         * Returns the prefix of the queue that receives the hessian requests.
126         */
127        public String getQueuePrefix()
128        {
129            return queuePrefix;
130        }
131    
132        /**
133         * Sets the prefix of the queue that receives the hessian requests.
134         */
135        public void setQueuePrefix(String queuePrefix)
136        {
137            this.queuePrefix = queuePrefix;
138        }
139    
140        /**
141         * Sets the debug mode.
142         */
143        public void setDebug(boolean isDebug)
144        {
145            this.debug = isDebug;
146        }
147    
148        /**
149         * Gets the debug mode.
150         */
151        public boolean isDebug()
152        {
153            return debug;
154        }
155    
156        /**
157         * Returns true if overloaded methods are allowed (using mangling)
158         */
159        public boolean isOverloadEnabled()
160        {
161            return isOverloadEnabled;
162        }
163    
164        /**
165         * set true if overloaded methods are allowed (using mangling)
166         */
167        public void setOverloadEnabled(boolean isOverloadEnabled)
168        {
169            this.isOverloadEnabled = isOverloadEnabled;
170        }
171    
172        /**
173         * Returns the socket timeout on requests in milliseconds.
174         */
175        public long getReadTimeout()
176        {
177            return readTimeout;
178        }
179    
180        /**
181         * Sets the socket timeout on requests in milliseconds.
182         */
183        public void setReadTimeout(long timeout)
184        {
185            readTimeout = timeout;
186        }
187    
188        /**
189         * Returns the socket timeout on connect in milliseconds.
190         */
191        public long getConnectTimeout()
192        {
193            return connectTimeout;
194        }
195    
196        /**
197         * Sets the socket timeout on connect in milliseconds.
198         */
199        public void setConnectTimeout(long _connecTimeout)
200        {
201            this.connectTimeout = _connecTimeout;
202        }
203    
204        /**
205         * Indicates if the requests/responses should be compressed.
206         */
207        public boolean isCompressed()
208        {
209            return compressed;
210        }
211    
212        /**
213         * Specifies if the requests/responses should be compressed.
214         */
215        public void setCompressed(boolean compressed) 
216        {
217            this.compressed = compressed;
218        }
219    
220        /**
221         * True if the proxy can read Hessian 2 responses.
222         */
223        public void setHessian2Reply(boolean isHessian2)
224        {
225            isHessian2Reply = isHessian2;
226        }
227    
228        /**
229         * True if the proxy should send Hessian 2 requests.
230         */
231        public void setHessian2Request(boolean isHessian2)
232        {
233            isHessian2Request = isHessian2;
234    
235            if (isHessian2)
236            {
237                isHessian2Reply = true;
238            }
239        }
240    
241        /**
242         * Returns the remote resolver.
243         */
244        public HessianRemoteResolver getRemoteResolver()
245        {
246            return _resolver;
247        }
248    
249        /**
250         * Sets the serializer factory.
251         */
252        public void setSerializerFactory(SerializerFactory factory)
253        {
254            _serializerFactory = factory;
255        }
256    
257        /**
258         * Gets the serializer factory.
259         */
260        public SerializerFactory getSerializerFactory()
261        {
262            if (_serializerFactory == null)
263            {
264                _serializerFactory = new SerializerFactory();
265            }
266    
267            return _serializerFactory;
268        }
269    
270        /**
271         * Creates the URL connection.
272         */
273        protected Connection openConnection() throws IOException
274        {
275            Connection conn = new Connection();
276            conn.connect(hostname, port, virtualhost, user, password, ssl);
277    
278            return conn;
279        }
280    
281        /**
282         * Creates a new proxy with the specified URL.  The returned object
283         * is a proxy with the interface specified by api.
284         *
285         * <pre>
286         * String url = "amqp://user:password@localhost:5672/vhost/queue");
287         * HelloHome hello = (HelloHome) factory.create(HelloHome.class, url);
288         * </pre>
289         *
290         * @param api the interface the proxy class needs to implement
291         * @param urlName the URL where the client object is located.
292         * @return a proxy to the object with the specified interface.
293         */
294        public <T> T create(Class<T> api, String urlName) throws MalformedURLException
295        {
296            return create(api, urlName, api.getClassLoader());
297        }
298    
299        /**
300         * Creates a new proxy with the specified URL.  The returned object
301         * is a proxy with the interface specified by api.
302         *
303         * <pre>
304         * String url = "amqp://user:password@localhost:5672/vhost/queue");
305         * HelloHome hello = (HelloHome) factory.create(HelloHome.class, url);
306         * </pre>
307         *
308         * @param api the interface the proxy class needs to implement
309         * @param urlName the URL where the client object is located.
310         * @return a proxy to the object with the specified interface.
311         */
312        @SuppressWarnings("unchecked")
313        public <T> T create(Class<T> api, String urlName, ClassLoader loader) throws MalformedURLException
314        {
315            try
316            {
317                URI uri = new URI(urlName);
318                
319                ssl = "amqps".equals(uri.getScheme());
320                
321                hostname = uri.getHost();
322                if (uri.getPort() != -1)
323                {
324                    port = uri.getPort();
325                }
326    
327                String userinfo = uri.getUserInfo();
328                if (userinfo != null)
329                {
330                    String[] parts = userinfo.split(":");
331                    user = parts[0];
332                    if (parts.length > 0)
333                    {
334                        password = parts[1];
335                    }
336                }
337    
338                Pattern pattern = Pattern.compile("/([^/]+)(/(.*))?");
339                Matcher matcher = pattern.matcher(uri.getPath());
340                if (matcher.matches())
341                {
342                    virtualhost = matcher.group(1);
343                    if (matcher.groupCount() > 1 && matcher.group(3) != null)
344                    {
345                        queuePrefix = matcher.group(3);
346                    }
347                }
348            }
349            catch (URISyntaxException e)
350            {
351                throw (MalformedURLException) new MalformedURLException().initCause(e);
352            }
353            
354            AMQPHessianProxy handler = new AMQPHessianProxy(this);
355    
356            return (T) Proxy.newProxyInstance(loader, new Class[]{api}, handler);
357        }
358    
359        AbstractHessianInput getHessianInput(InputStream is)
360        {
361            return getHessian2Input(is);
362        }
363    
364        AbstractHessianInput getHessian1Input(InputStream is)
365        {
366            AbstractHessianInput in;
367    
368            if (debug)
369            {
370                is = new HessianDebugInputStream(is, new PrintWriter(System.out));
371            }
372    
373            in = new HessianInput(is);
374    
375            in.setRemoteResolver(getRemoteResolver());
376    
377            in.setSerializerFactory(getSerializerFactory());
378    
379            return in;
380        }
381    
382        AbstractHessianInput getHessian2Input(InputStream is)
383        {
384            AbstractHessianInput in;
385    
386            if (debug)
387            {
388                is = new HessianDebugInputStream(is, new PrintWriter(System.out));
389            }
390    
391            in = new Hessian2Input(is);
392    
393            in.setRemoteResolver(getRemoteResolver());
394    
395            in.setSerializerFactory(getSerializerFactory());
396    
397            return in;
398        }
399    
400        AbstractHessianOutput getHessianOutput(OutputStream os)
401        {
402            AbstractHessianOutput out;
403    
404            if (isHessian2Request)
405            {
406                out = new Hessian2Output(os);
407            }
408            else
409            {
410                HessianOutput out1 = new HessianOutput(os);
411                out = out1;
412    
413                if (isHessian2Reply)
414                {
415                    out1.setVersion(2);
416                }
417            }
418    
419            out.setSerializerFactory(getSerializerFactory());
420    
421            return out;
422        }
423    }
424