使用Netty代理你的请求

你是否在寻找一个代理来调试你的客户端-服务端的通讯?不需要再寻觅!Netty已经通过示例来展现这个功能了!

Netty是一个NIO C/S框架,能够快速、简单的开发协议服务器和客户端等网络应用。它能够很大程度上简单化、流水线化开发网络应用,例如TCP/UDP socket服务器。你可以在这里找到Netty的指导教程。

在Netty中,实现的代码或者业务逻辑的切入点需要依赖于处理器。处理器基于拦截器模式,类似传统servlet-web应用的过滤器。处理器提供一个事件模型可以让应用监控数据的读取/发送、修改数据、转换数据、根据数据做出响应、等等。总而言之,处理器允许你将关注点完全抽象到不同的类中。

处理器依据指定的顺序添加到pipeline中。这个顺序决定处理器如何、何时被调用。如果一个处理器依赖其他的处理器(例如一个压缩处理器),那么你需要确定pipeline中的这个处理器在其他处理器前面译注:upstream类型事件中需要此顺序)。通常数据是异步读取到系统中,这些数据会被包装为一个ChannelBuffer对象。这个对象根据downstream规则从第一个handler向下传送给其他后面的处理器中(触发有一个处理器选择停止这个流程或者抛出异常)。

下面是Netty文档中的组件结构图:


下面的示例代码介绍了如何使用HexDumpProxy类代理客户端和服务器端之间的请求,示例代码如下:

package org.jboss.netty.example.proxy;
import java.net.InetSocketAddress;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

public class HexDumpProxy {
    public static void main(String[] args) throws Exception {
        // 校验命令行参数。
        if (args.length != 3) {
            System.err.println(
                    "Usage: " + HexDumpProxy.class.getSimpleName() +
                    " <local port> <remote host> <remote port>");
            return;
        }

        // 解析命令行参数。
        int localPort = Integer.parseInt(args[0]);
        String remoteHost = args[1];
        int remotePort = Integer.parseInt(args[2]);
        System.err.println(
                "Proxying *:" + localPort + " to " +
                remoteHost + ':' + remotePort + " ...");
        // 配置引导程序(bootstrap)。
        Executor executor = Executors.newCachedThreadPool();
        ServerBootstrap sb = new ServerBootstrap(
                new NioServerSocketChannelFactory(executor, executor));
        // 设置事件pipeline factory。
        ClientSocketChannelFactory cf =
                new NioClientSocketChannelFactory(executor, executor);
        sb.setPipelineFactory(
                new HexDumpProxyPipelineFactory(cf, remoteHost, remotePort));
        // 启动服务器。
        sb.bind(new InetSocketAddress(localPort));
    }
}

这个类仅仅做了3件事:

  • 启动服务器。
  • 通过ClientSocketChannelFactory创建一个自定义的socket pipeline(HexDumProxyPipeline),并且将这个pipeline放置到服务器中。
  • 将服务器绑定到一个ip上。

下面是HexDumpProxyPipelineFactory,将自己注册到Channel Pipeline的后面 :

package org.jboss.netty.example.proxy;

import static org.jboss.netty.channel.Channels.*;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;

public class HexDumpProxyPipelineFactory implements ChannelPipelineFactory {
    private final ClientSocketChannelFactory cf;
    private final String remoteHost;
    private final int remotePort;

    public HexDumpProxyPipelineFactory(
            ClientSocketChannelFactory cf, String remoteHost, int remotePort) {
        this.cf = cf;
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
    }

    public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline p = pipeline(); // 注意,这里使用了static import。
        p.addLast("handler", new HexDumpProxyInboundHandler(cf, remoteHost, remotePort));
        return p;
    }
}

HexDumpProxyInboundHandler这个处理器负责管理Channel、输出通过代理获取的16进制数据的代码逻辑  :

package org.jboss.netty.example.proxy;
import java.net.InetSocketAddress;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.buffer.*;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;

public class HexDumpProxyInboundHandler extends SimpleChannelUpstreamHandler {
    private final ClientSocketChannelFactory cf;
    private final String remoteHost;
    private final int remotePort;
    private volatile Channel outboundChannel;
    public HexDumpProxyInboundHandler(
            ClientSocketChannelFactory cf, String remoteHost, int remotePort) {
        this.cf = cf;
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
    }
    @Override
    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
            throws Exception {
        // 挂起输入信息,直至连接到远程服务器。
        final Channel inboundChannel = e.getChannel();
        inboundChannel.setReadable(false);
        // 开始尝试连接。
        ClientBootstrap cb = new ClientBootstrap(cf);
        cb.getPipeline().addLast("handler", new OutboundHandler(e.getChannel()));
        ChannelFuture f = cb.connect(new InetSocketAddress(remoteHost, remotePort));
        outboundChannel = f.getChannel();
        f.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    // 尝试连接成功;
                    // 开始接收输入信息。
                    inboundChannel.setReadable(true);
                } else {
                    // 如果尝试连接失败,则关闭连接。
                    inboundChannel.close();
                }
            }
        });
    }
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
            throws Exception {
        ChannelBuffer msg = (ChannelBuffer) e.getMessage();
        System.out.println(">>> " + ChannelBuffers.hexDump(msg));
        outboundChannel.write(msg);
    }
    @Override
    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
            throws Exception {
        if (outboundChannel != null) {
            closeOnFlush(outboundChannel);
        }
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
            throws Exception {
        e.getCause().printStackTrace();
        closeOnFlush(e.getChannel());
    }
    private static class OutboundHandler extends SimpleChannelUpstreamHandler {
        private final Channel inboundChannel;
        OutboundHandler(Channel inboundChannel) {
            this.inboundChannel = inboundChannel;
        }
        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
                throws Exception {
            ChannelBuffer msg = (ChannelBuffer) e.getMessage();
            System.out.println("<<< " + ChannelBuffers.hexDump(msg));
            inboundChannel.write(msg);
        }
        @Override
        public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
                throws Exception {
            closeOnFlush(inboundChannel);
        }
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
                throws Exception {
            e.getCause().printStackTrace();
            closeOnFlush(e.getChannel());
        }
    }
    /**
     * 在所有队列写请求完成(flush)后,关闭指定channel。
     */
    static void closeOnFlush(Channel ch) {
        if (ch.isConnected()) {
            ch.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
        }
    }
}

编译工程

为了编译这些class,需要将JBoss AS 7发行版中的Netty包添加到classpath库中。

如果工程是maven管理的,可以添加如下依赖:

<dependency>
    <groupId>org.jboss.netty</groupId>
    <artifactId>netty</artifactId>
    <version>3.2.0.Final</version>
</dependency>

如果工程是eclipse格式的,需要添加到jar路径中:


传入必需的参数执行main类-例如为了调试JBoss’ HTTP请求(监听8080端口),可以传入参数:8888 127.0.0.1 8080. 现在让浏览器访问127.0.0.1:8888就可以了。


代理文本数据

调试某种类型网络协议,使用代理16进制的数据是很有用的。然而你仅仅需要以文本类型来查看代理数据,你需要调整一下代码,例如传入默认的编码类型,将信息转为字符串。重写inboundChannel和outboundChannel的messageReceived方法,如下:

public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
                throws Exception {
            ChannelBuffer msg = (ChannelBuffer) e.getMessage();
            int len = msg.readUnsignedByte();
            int whichClient = msg.readUnsignedShort();
            assert len == msg.readableBytes();
            System.out.println(whichClient +" &amp;lt;&amp;lt;&amp;lt; " + msg.toString(Charset.defaultCharset()));
            inboundChannel.write(msg);
}
. . . .
  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
            throws Exception {
        ChannelBuffer msg = (ChannelBuffer) e.getMessage();
        int len = msg.readUnsignedByte();
        int whichClient = msg.readUnsignedShort();
        assert len == msg.readableBytes();
        System.out.println(whichClient +" &amp;lt;&amp;lt;&amp;lt; " + msg.toString(Charset.defaultCharset()));
       outboundChannel.write(msg);
}

<a href="http://www.importnew.com/?attachment_id=7505" rel="attachment wp-att-7505"><img class="aligncenter size-medium wp-image-7505" title="debug_netty2" src="http://www.importnew.com/wp-content/uploads/2013/12/debug_netty2-300x68.png" alt="" width="300" height="68" /></a>
原文链接: mastertheboss 翻译: ImportNew.com - 刘海波
译文链接: http://www.importnew.com/7496.html
[ 转载请保留原文出处、译者和译文链接。]



相关文章

发表评论

Comment form

(*) 表示必填项

还没有评论。

跳到底部
返回顶部