001 /**
002 * Copyright 2010 Emmanuel Bourg
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017 package org.apache.qpid.contrib.hessian;
018
019 import java.io.IOException;
020 import java.io.InputStream;
021 import java.io.OutputStream;
022 import java.io.PrintWriter;
023 import java.lang.reflect.Proxy;
024 import java.net.MalformedURLException;
025 import java.net.URI;
026 import java.net.URISyntaxException;
027 import java.util.regex.Matcher;
028 import java.util.regex.Pattern;
029
030 import com.caucho.hessian.io.AbstractHessianInput;
031 import com.caucho.hessian.io.AbstractHessianOutput;
032 import com.caucho.hessian.io.Hessian2Input;
033 import com.caucho.hessian.io.Hessian2Output;
034 import com.caucho.hessian.io.HessianDebugInputStream;
035 import com.caucho.hessian.io.HessianInput;
036 import com.caucho.hessian.io.HessianOutput;
037 import com.caucho.hessian.io.HessianRemoteResolver;
038 import com.caucho.hessian.io.SerializerFactory;
039 import org.apache.qpid.transport.Connection;
040
041 /**
042 * Factory for creating Hessian client stubs. The returned stub will
043 * call the remote object for all methods.
044 *
045 * <pre>
046 * HelloHome hello = (HelloHome) factory.create(HelloHome.class, null);
047 * </pre>
048 *
049 * After creation, the stub can be like a regular Java class. Because
050 * it makes remote calls, it can throw more exceptions than a Java class.
051 * In particular, it may throw protocol exceptions.
052 *
053 * This class is derived from {@link com.caucho.hessian.client.HessianProxyFactory}.
054 *
055 * @author Emmanuel Bourg
056 * @author Scott Ferguson
057 */
058 public class AMQPHessianProxyFactory /*implements ServiceProxyFactory*/
059 {
060 private SerializerFactory _serializerFactory;
061 private HessianRemoteResolver _resolver;
062
063 private String user;
064 private String password;
065 private String hostname;
066 private String virtualhost;
067 private int port = 5672;
068 private boolean ssl = false;
069
070 private String queuePrefix;
071
072 private boolean isOverloadEnabled = false;
073
074 private boolean isHessian2Reply = true;
075 private boolean isHessian2Request = true;
076
077 private boolean debug = false;
078
079 private long readTimeout = -1;
080 private long connectTimeout = -1;
081
082 private boolean compressed;
083
084 /**
085 * Creates the new proxy factory.
086 */
087 public AMQPHessianProxyFactory()
088 {
089 _resolver = new HessianRemoteResolver()
090 {
091 public Object lookup(String type, String url) throws IOException
092 {
093 ClassLoader loader = Thread.currentThread().getContextClassLoader();
094
095 try
096 {
097 Class<?> api = Class.forName(type, false, loader);
098 return create(api, url);
099 }
100 catch (Exception e)
101 {
102 throw new IOException(String.valueOf(e));
103 }
104 }
105 };
106 }
107
108 /**
109 * Sets the user connecting to the AMQP server.
110 */
111 public void setUser(String user)
112 {
113 this.user = user;
114 }
115
116 /**
117 * Sets the password of the user connecting to the AMQP server.
118 */
119 public void setPassword(String password)
120 {
121 this.password = password;
122 }
123
124 /**
125 * Returns the prefix of the queue that receives the hessian requests.
126 */
127 public String getQueuePrefix()
128 {
129 return queuePrefix;
130 }
131
132 /**
133 * Sets the prefix of the queue that receives the hessian requests.
134 */
135 public void setQueuePrefix(String queuePrefix)
136 {
137 this.queuePrefix = queuePrefix;
138 }
139
140 /**
141 * Sets the debug mode.
142 */
143 public void setDebug(boolean isDebug)
144 {
145 this.debug = isDebug;
146 }
147
148 /**
149 * Gets the debug mode.
150 */
151 public boolean isDebug()
152 {
153 return debug;
154 }
155
156 /**
157 * Returns true if overloaded methods are allowed (using mangling)
158 */
159 public boolean isOverloadEnabled()
160 {
161 return isOverloadEnabled;
162 }
163
164 /**
165 * set true if overloaded methods are allowed (using mangling)
166 */
167 public void setOverloadEnabled(boolean isOverloadEnabled)
168 {
169 this.isOverloadEnabled = isOverloadEnabled;
170 }
171
172 /**
173 * Returns the socket timeout on requests in milliseconds.
174 */
175 public long getReadTimeout()
176 {
177 return readTimeout;
178 }
179
180 /**
181 * Sets the socket timeout on requests in milliseconds.
182 */
183 public void setReadTimeout(long timeout)
184 {
185 readTimeout = timeout;
186 }
187
188 /**
189 * Returns the socket timeout on connect in milliseconds.
190 */
191 public long getConnectTimeout()
192 {
193 return connectTimeout;
194 }
195
196 /**
197 * Sets the socket timeout on connect in milliseconds.
198 */
199 public void setConnectTimeout(long _connecTimeout)
200 {
201 this.connectTimeout = _connecTimeout;
202 }
203
204 /**
205 * Indicates if the requests/responses should be compressed.
206 */
207 public boolean isCompressed()
208 {
209 return compressed;
210 }
211
212 /**
213 * Specifies if the requests/responses should be compressed.
214 */
215 public void setCompressed(boolean compressed)
216 {
217 this.compressed = compressed;
218 }
219
220 /**
221 * True if the proxy can read Hessian 2 responses.
222 */
223 public void setHessian2Reply(boolean isHessian2)
224 {
225 isHessian2Reply = isHessian2;
226 }
227
228 /**
229 * True if the proxy should send Hessian 2 requests.
230 */
231 public void setHessian2Request(boolean isHessian2)
232 {
233 isHessian2Request = isHessian2;
234
235 if (isHessian2)
236 {
237 isHessian2Reply = true;
238 }
239 }
240
241 /**
242 * Returns the remote resolver.
243 */
244 public HessianRemoteResolver getRemoteResolver()
245 {
246 return _resolver;
247 }
248
249 /**
250 * Sets the serializer factory.
251 */
252 public void setSerializerFactory(SerializerFactory factory)
253 {
254 _serializerFactory = factory;
255 }
256
257 /**
258 * Gets the serializer factory.
259 */
260 public SerializerFactory getSerializerFactory()
261 {
262 if (_serializerFactory == null)
263 {
264 _serializerFactory = new SerializerFactory();
265 }
266
267 return _serializerFactory;
268 }
269
270 /**
271 * Creates the URL connection.
272 */
273 protected Connection openConnection() throws IOException
274 {
275 Connection conn = new Connection();
276 conn.connect(hostname, port, virtualhost, user, password, ssl);
277
278 return conn;
279 }
280
281 /**
282 * Creates a new proxy with the specified URL. The returned object
283 * is a proxy with the interface specified by api.
284 *
285 * <pre>
286 * String url = "amqp://user:password@localhost:5672/vhost/queue");
287 * HelloHome hello = (HelloHome) factory.create(HelloHome.class, url);
288 * </pre>
289 *
290 * @param api the interface the proxy class needs to implement
291 * @param urlName the URL where the client object is located.
292 * @return a proxy to the object with the specified interface.
293 */
294 public <T> T create(Class<T> api, String urlName) throws MalformedURLException
295 {
296 return create(api, urlName, api.getClassLoader());
297 }
298
299 /**
300 * Creates a new proxy with the specified URL. The returned object
301 * is a proxy with the interface specified by api.
302 *
303 * <pre>
304 * String url = "amqp://user:password@localhost:5672/vhost/queue");
305 * HelloHome hello = (HelloHome) factory.create(HelloHome.class, url);
306 * </pre>
307 *
308 * @param api the interface the proxy class needs to implement
309 * @param urlName the URL where the client object is located.
310 * @return a proxy to the object with the specified interface.
311 */
312 @SuppressWarnings("unchecked")
313 public <T> T create(Class<T> api, String urlName, ClassLoader loader) throws MalformedURLException
314 {
315 try
316 {
317 URI uri = new URI(urlName);
318
319 ssl = "amqps".equals(uri.getScheme());
320
321 hostname = uri.getHost();
322 if (uri.getPort() != -1)
323 {
324 port = uri.getPort();
325 }
326
327 String userinfo = uri.getUserInfo();
328 if (userinfo != null)
329 {
330 String[] parts = userinfo.split(":");
331 user = parts[0];
332 if (parts.length > 0)
333 {
334 password = parts[1];
335 }
336 }
337
338 Pattern pattern = Pattern.compile("/([^/]+)(/(.*))?");
339 Matcher matcher = pattern.matcher(uri.getPath());
340 if (matcher.matches())
341 {
342 virtualhost = matcher.group(1);
343 if (matcher.groupCount() > 1 && matcher.group(3) != null)
344 {
345 queuePrefix = matcher.group(3);
346 }
347 }
348 }
349 catch (URISyntaxException e)
350 {
351 throw (MalformedURLException) new MalformedURLException().initCause(e);
352 }
353
354 AMQPHessianProxy handler = new AMQPHessianProxy(this);
355
356 return (T) Proxy.newProxyInstance(loader, new Class[]{api}, handler);
357 }
358
359 AbstractHessianInput getHessianInput(InputStream is)
360 {
361 return getHessian2Input(is);
362 }
363
364 AbstractHessianInput getHessian1Input(InputStream is)
365 {
366 AbstractHessianInput in;
367
368 if (debug)
369 {
370 is = new HessianDebugInputStream(is, new PrintWriter(System.out));
371 }
372
373 in = new HessianInput(is);
374
375 in.setRemoteResolver(getRemoteResolver());
376
377 in.setSerializerFactory(getSerializerFactory());
378
379 return in;
380 }
381
382 AbstractHessianInput getHessian2Input(InputStream is)
383 {
384 AbstractHessianInput in;
385
386 if (debug)
387 {
388 is = new HessianDebugInputStream(is, new PrintWriter(System.out));
389 }
390
391 in = new Hessian2Input(is);
392
393 in.setRemoteResolver(getRemoteResolver());
394
395 in.setSerializerFactory(getSerializerFactory());
396
397 return in;
398 }
399
400 AbstractHessianOutput getHessianOutput(OutputStream os)
401 {
402 AbstractHessianOutput out;
403
404 if (isHessian2Request)
405 {
406 out = new Hessian2Output(os);
407 }
408 else
409 {
410 HessianOutput out1 = new HessianOutput(os);
411 out = out1;
412
413 if (isHessian2Reply)
414 {
415 out1.setVersion(2);
416 }
417 }
418
419 out.setSerializerFactory(getSerializerFactory());
420
421 return out;
422 }
423 }
424