文章资料-情感.机器.认知-电子AI 游客
Netty入门之WebSocket初体验
【8651】by1 2020-03-21 最后编辑2020-03-21 09:53:28 浏览1624

说一说IO通信

BIO通信:

BIO即同步阻塞模式一请求一应答的通信模型,该模型最大的问题就是缺乏弹性伸缩能力,当客户端并发访问量增加后,服务端的线程个数和客户端并发访问数呈1:1的正比关系,由于线程是JAVA虚拟机非常宝贵的系统资源,当线程数膨胀之后,系统的性能将急剧下降,随着并发访问量的继续增大,系统会发生线程堆栈溢出、创建新线程失败等问题,并最终导致进程宕机或者僵死,不能对外提供服务。

BIO的服务端通信模型:

一个线程处理一个Socket连接,因为Java Socket是通过InputStream和OutputStream来进行网络读写操作,而这俩个的读写都是阻塞模式,所以当某个Socket链路的读写操作没有完成时,排在后面的Socket连接是无法得到处理的,长时间的等待可能会导致超时,因此在同步阻塞模式下,通常会采用一个Socket链路独占一个线程的模型。

BIO通信模型图:
Netty入门之WebSocket初体验


伪异步IO通信(BIO优化版本):

为了解决同步阻塞IO(BIO)所面临的一个链路需要一个线程处理的问题,后来有人对它的线程模型进行了优化,后端通过一个线程池来处理多个客户端的请求接入,形成客户端个数M:线程池最大线程数N的比例关系,其中M可以远远大于N,通过线程池可以灵活的调配线程资源,设置线程的最大值,防止由于海量并发接入导致线程耗尽。

伪异步IO通信特性:

当有新的客户端接入的时候,将客户端的Socket封装成一个Task(该任务实现java.lang.Runnable接口)投递到后端的线程池中进行处理,JDK的线程池维护一个消息队列和N个活跃线程对消息队列中的任务进行处理。由于线程池可以设置消息队列的大小和最大线程数,因此,它的资源占用是可控的,无论多少个客户端并发访问,都不会导致资源的耗尽和宕机。

但是伪异步IO通信也有其缺陷,当有大量客户端请求的时候,随着并发访问量的增长,伪异步IO就会造成线程池阻塞。

伪异步IO通信模型图:
Netty入门之WebSocket初体验


NIO通信:

NIO是非阻塞IO(Non-block IO),也有人称之为New IO,因为它相对于之前的IO类库是新增的,所以被称为New IO,这是它的官方叫法。它是在 JDK 1.4 中引入的。NIO 弥补了原来同步阻塞I/O 的不足,它在标准 Java 代码中提供了高速的、面向块的 I/O。通过定义包含数据的类,以及通过以块的形式处理这些数据,NIO 不用使用本机代码就可以利用底层优化,这是原来的 I/O 包所无法做到的。

NIO之缓冲区Buffer:

我们首先介绍缓冲区(Buffer)的概念,Buffer 是一个对象, 它包含一些要写入或者要读出的数据。 在 NIO类库 中加入 Buffer 对象,体现了新库与原 I/O 的一个重要区别。在面向流的 I/O 中,我们将数据直接写入或者将数据直接读到 Stream 对象中。

在 NIO 库中,所有数据都是用缓冲区进行处理的。在读取数据时,它是直接读到缓冲区中;在写入数据时,它也是写入到缓冲区中。任何时候访问 NIO 中的数据,我们都是通过缓冲区进行读写操作。

缓冲区实质上是一个数组。通常它是一个字节数组(ByteBuffer),也可以使用其它种类的数组。但是一个缓冲区不仅仅是一个数组,缓冲区提供了对数据的结构化访问,及维护读写位置(limit)等信息。

最常用的缓冲区是ByteBuffer,一个ByteBuffer提供了一组功能用于操作byte数组。除了ByteBuffer,还有其它的一些缓冲区,事实上,每一种Java基本类型(除了Boolean类型)都对应有一种缓冲区,如下所示:

  • ByteBuffer:字节缓冲区

  • CharBuffer:字符缓冲区

  • ShortBuffer:短整型缓冲区

  • IntBuffer:整型缓冲区

  • LongBuffer:长整型缓冲区

  • FloatBuffer:浮点型缓冲区

  • DoubleBuffer:双精度浮点型缓冲区

缓冲区的类图继承关系如下所示:
Netty入门之WebSocket初体验

NIO之通道Channel:

Channel是一个通道,可以通过它读取和写入数据,它就像自来水管一样,网络数据通过Channel读取和写入。通道与流的不同之处在于通道是双向的。而流只是在一个方向上移动(一个流必须是 InputStream 或者 OutputStream 的子类),而通道可以用于读、写或者同时用于读写。

NIO之多路复用器Selector:

它是JAVA NIO编程的基础,熟练的掌握Selector对于掌握NIO编程至关重要。多路复用器提供选择已经就绪的任务的能力。简单来讲,Selector会不断的轮询注册在其上的Channel,如果某个Channel上面有新的TCP连接接入、读和写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel的集合进行后续的IO操作。

一个多路复用器Selector可以同时轮询多个Channel,由于JDK使用了epoll()代替传统的select实现,所以它并没有最大连接句柄1024/2048的限制。这也就意味着只需要一个线程负责Selector的轮询,就可以接入成千上万的客户端,这的确是一个巨大的改进。


AIO通信:

与NIO不同,aio需要一个连接注册读写事件和回调方法,当进行读写操作时,只须直接调用API的read或write方法即可。这两种方法均为异步的,对于读操作而言,当有流可读取时,操作系统会将可读的流传入read方法的缓冲区,并通知应用程序;对于写操作而言,当操作系统将write方法传递的流写入完毕时,操作系统主动通知应用程序。 即可以理解为,read/write方法都是异步的,完成后会主动调用回调函数。

AIO异步通道提供了两种方式获取操作结果:

  1. 通过java.util.concurrent.Future类来表示异步操作的结果

  2. 在执行异步操作的时候传入一个java.nio.channels.CompletionHandler接口的实现类作为操作完成的回调。 AIO的异步套接字通道是真正的异步非阻塞IO,对应于UNIX网络编程中的事件驱动IO(AIO),它不需要通过多路复用器(Selector)对注册的通道进行轮询操作即可实现异步读写,从而简化了NIO的编程模型。

AIO通信的特性:


四种IO对比:

-同步阻塞I/O(BIO)伪异步I/O非阻塞I/O(NIO)异步I/O(AIO)
客户端个数:IO线程1 : 1M : N(其中M可以大于N)M : 1(1个IO线程处理多个客户端连接)M : 0(不需要启动额外的IO线程,被动回调)
IO类型(阻塞)阻塞IO阻塞IO非阻塞IO非阻塞IO
IO类型(同步)同步IO同步IO同步IO(IO多路复用)异步IO
API使用难度简单简单非常复杂复杂
调试难度简单简单复杂复杂
可靠性非常差
吞吐量

Netty入门

在开始本节之前,我先讲一个亲身经历的故事:曾经有两个项目组同时用到了NIO编程技术,一个项目组选择自己开发NIO服务端,直接使用JDK原生的API,结果2个多月过去了,他们的NIO服务端始终无法稳定,问题频出。由于NIO通信是它们的核心组件之一,因此,项目的进度受到了严重的影响,领导对此非常恼火。另一个项目组直接使用Netty作为NIO服务端,业务的定制开发工作量非常小,测试表明,功能和性能都完全达标,项目组几乎没有在NIO服务端上花费额外的时间和精力,项目进展也非常顺利。

这两个项目组的不同遭遇提醒我们:开发出高质量的NIO程序并不是一件简单的事情,除去NIO固有的复杂性和BUG不谈,作为一个NIO服务端需要能够处理网络的闪断、客户端的重复接入、客户端的安全认证、消息的编解码、半包读写等等,如果你没有足够的NIO编程经验积累,一个NIO框架的稳定往往需要半年甚至更长的时间。更为糟糕的是一旦在生产环境中发生问题,往往会导致跨节点的服务调用中断,严重的可能会导致整个集群环境都不可用,需要重启服务器,这种非正常停机会带来巨大的损失。

从可维护性角度看,由于NIO采用了异步非阻塞编程模型,而且是一个IO线程处理多条链路,它的调试和跟踪非常麻烦,特别是生产环境中的问题,我们无法有效调试和跟踪,往往只能靠一些日志来辅助分析,定位难度很大。

不选择JAVA原生NIO编程的原因:

  1. NIO的类库和API繁杂,使用麻烦,你需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等

  2. 需要具备其它的额外技能做铺垫,例如熟悉Java多线程编程,因为NIO编程涉及到Reactor模式,你必须对多线程和网路编程非常熟悉,才能编写出高质量的NIO程序

  3. 可靠性能力补齐,工作量和难度都非常大。例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常码流的处理等等,NIO编程的特点是功能开发相对容易,但是可靠性能力补齐工作量和难度都非常大

  4. JDK NIO的BUG,例如臭名昭著的epoll bug,它会导致Selector空轮询,最终导致CPU占用100%。官方声称在JDK1.6版本的update18修复了该问题,但是直到JDK1.7版本该问题仍旧存在,只不过该bug发生概率降低了一些而已,它并没有被根本解决。该BUG以及与该BUG相关的问题单如下:

异常堆栈如下:

java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:210)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:65)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)
        - locked <0x0000000750928190> (a sun.nio.ch.Util$2)
        - locked <0x00000007509281a8> (a java.util.Collections$UnmodifiableSet)
        - locked <0x0000000750946098> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
        at net.spy.memcached.MemcachedConnection.handleIO(MemcachedConnection.java:217)
        at net.spy.memcached.MemcachedConnection.run(MemcachedConnection.java:836)

由于上述原因,在大多数场景下,我不建议大家直接使用JDK的NIO类库,除非你精通NIO编程或者有特殊的需求,在绝大多数的业务场景中,我们可以使用NIO框架Netty来进行NIO编程,它既可以作为客户端也可以作为服务端,同时支持UDP和异步文件传输,功能非常强大。

为什么选择Netty:

Netty是业界最流行的NIO框架之一,它的健壮性、功能、性能、可定制性和可扩展性在同类框架中都是首屈一指的,它已经得到成百上千的商用项目验证,例如Hadoop的RPC框架avro使用Netty作为底层通信框架。很多其它业界主流的RPC框架,也使用Netty来构建高性能的异步通信能力。

通过对Netty的分析,我们将它的优点总结如下:

  1. API使用简单,开发门槛低;

  2. 功能强大,预置了多种编解码功能,支持多种主流协议;

  3. 定制能力强,可以通过ChannelHandler对通信框架进行灵活的扩展;

  4. 性能高,通过与其它业界主流的NIO框架对比,Netty的综合性能最优;

  5. 成熟、稳定,Netty修复了已经发现的所有JDK NIO BUG,业务开发人员不需要再为NIO的BUG而烦恼;

  6. 社区活跃,版本迭代周期短,发现的BUG可以被及时修复,同时,更多的新功能会被加入;

  7. 经历了大规模的商业应用考验,质量已经得到验证。在互联网、大数据、网络游戏、企业应用、电信软件等众多行业得到成功商用,证明了它可以完全满足不同行业的商业应用。

正是因为这些优点,Netty逐渐成为Java NIO编程的首选框架。


WebSocket入门

WebSocket 是什么?

为什么需要 WebSocket ?

了解计算机网络协议的人,应该都知道:HTTP 协议是一种无状态的、无连接的、单向的应用层协议。它采用了请求/响应模型。通信请求只能由客户端发起,服务端对请求做出应答处理。这种通信模型有一个弊端:HTTP 协议无法实现服务器主动向客户端发起消息。

这种单向请求的特点,注定了如果服务器有连续的状态变化,客户端要获知就非常麻烦。大多数 Web 应用程序将通过频繁的异步JavaScript和XML(AJAX)请求实现长轮询。轮询的效率低,非常浪费资源(因为必须不停连接,或者 HTTP 连接始终打开)。
Netty入门之WebSocket初体验

因此,工程师们一直在思考,有没有更好的方法。WebSocket 就是这样发明的。WebSocket 连接允许客户端和服务器之间进行全双工通信,以便任一方都可以通过建立的连接将数据推送到另一端。WebSocket 只需要建立一次连接,就可以一直保持连接状态。这相比于轮询方式的不停建立连接显然效率要大大提高。
Netty入门之WebSocket初体验

WebSocket建立连接步骤:

  1. 客户端发起握手请求

  2. 服务端响应请求

  3. 连接建立

WebSocket的优点:

WebSocket生命周期:

  1. 打开事件:@OnOpen 此事件发生在端点上建立新连接时并且在任何其他事件发生之前

  2. 消息事件:@OnMessage 此事件接收WebSocket对话中另一端发送的消息。

  3. 错误事件:@OnError 此事件在WebSocket连接或者端点发生错误时产生

  4. 关闭事件:@OnClose 此事件表示WebSocket端点的连接目前部分地关闭,它可以由参与连接的任意一个端点发出

WebSocket关闭连接的两种方式:

  1. 服务器关闭底层TCP连接

  2. 客户端发起TCP Close

参考:

https://www.cnblogs.com/jingmoxukong/p/7755643.html
http://www.ruanyifeng.com/blog/2017/05/websocket.html
https://www.cnblogs.com/fuqiang88/p/5956363.html


使用Netty实现WebSocket服务端

功能介绍:

pom.xml文件配置的依赖项如下:

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.zero.netty</groupId>
    <artifactId>websocket</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- netty -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.31.Final</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.22</version>
        </dependency>

        <!-- logger -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.2.3</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>
    </dependencies></project>

定义一个全局配置类:

package org.zero.netty.websocket.config;import io.netty.channel.group.ChannelGroup;import io.netty.channel.group.DefaultChannelGroup;import io.netty.util.concurrent.GlobalEventExecutor;/**
 * @program: Netty-WebSocket
 * @description: 工程的全局配置类
 * @author: 01
 * @create: 2018-11-03 17:28
 **/public class NettyConfig {    /**
     * 存储每一个客户端接入进来时的channel对象
     */
    public final static ChannelGroup GROUP =            new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}

创建一个接口,用作项目中的全局常量定义:

package org.zero.netty.websocket.config;/**
 * 项目中的全局常量定义
 *
 * @author 01
 */public interface Constants {
    String WEB_SOCKET_URL = "ws://localhost:8080/websocket";
    String WEBSOCKET_STR = "websocket";
    String UPGRADE_STR = "Upgrade";    int OK_CODE = 200;

    String HTTP_CODEC = "http-codec";
    String AGGREGATOR = "aggregator";
    String HTTP_CHUNKED = "http-chunked";
    String HANDLER = "handler";    int MAX_CONTENT_LENGTH = 65536;    int PORT = 8080;
}

编写接收处理并响应客户端WebSocket请求的核心业务处理类,代码如下:

package org.zero.netty.websocket.core;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.*;import io.netty.handler.codec.http.*;import io.netty.handler.codec.http.websocketx.*;import io.netty.util.CharsetUtil;import lombok.extern.slf4j.Slf4j;import org.zero.netty.websocket.config.Constants;import org.zero.netty.websocket.config.NettyConfig;import java.util.Date;/**
 * @program: Netty-WebSocket
 * @description: 接收处理并响应客户端WebSocket请求的核心业务处理类
 * @author: 01
 * @create: 2018-11-03 17:34
 **/@Slf4j@ChannelHandler.Sharablepublic class MyWebsocketHandler extends SimpleChannelInboundHandler<Object> {    private WebSocketServerHandshaker handshaker;    /**
     * 服务端处理客户端WebSocket请求的核心方法
     *
     * @param ctx ctx
     * @param msg msg
     * @throws Exception Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {        // 处理客户端向服务端发起http握手请求的业务
        if (msg instanceof FullHttpRequest) {
            handHttpRequest(ctx, (FullHttpRequest) msg);
        }        // 处理websocket连接
        else if (msg instanceof WebSocketFrame) {
            handWebsocketFrame(ctx, (WebSocketFrame) msg);
        }
    }    /**
     * 处理客户端与服务端之间的websocket业务
     *
     * @param ctx   ctx
     * @param frame frame
     */
    private void handWebsocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {        // 判断是否是关闭websocket的指令
        if (frame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.channel(), ((CloseWebSocketFrame) frame).retain());
            log.debug("接收到关闭websocket的指令");
        }        // 判断是否是ping消息
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            log.debug("接收到ping消息");            return;
        }        // 判断是否是二进制消息,如果是二进制消息,则抛出异常
        if (!(frame instanceof TextWebSocketFrame)) {
            log.error("目前不支持二进制消息");            throw new UnsupportedOperationException("【" + this.getClass().getName() + "】不支持的消息");
        }        // 获取客户端向服务端发送的消息
        String requestStr = ((TextWebSocketFrame) frame).text();
        log.debug("服务端收到客户端的消息: {}", requestStr);        // 返回应答消息
        String responseStr = new Date().toString()
                + ctx.channel().id() +                " ===>>> " + requestStr;
        TextWebSocketFrame tws = new TextWebSocketFrame(responseStr);        // 群发,服务端向每个连接上来的客户端群发消息
        NettyConfig.GROUP.writeAndFlush(tws);
        log.debug("群发消息完成. 群发的消息为: {}", responseStr);
    }    /**
     * 处理客户端向服务端发起http握手请求的业务
     *
     * @param ctx     ctx
     * @param request request
     */
    private void handHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) {
        String upgrade = request.headers().get(Constants.UPGRADE_STR);        // 非websocket的http握手请求处理
        if (!request.decoderResult().isSuccess() || !Constants.WEBSOCKET_STR.equals(upgrade)) {
            sendHttpResponse(ctx, request,                    new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            log.warn("非websocket的http握手请求");            return;
        }

        WebSocketServerHandshakerFactory wsFactory =                new WebSocketServerHandshakerFactory(Constants.WEB_SOCKET_URL, null, false);
        handshaker = wsFactory.newHandshaker(request);        if (handshaker == null) {            // 响应不支持的请求
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
            log.warn("不支持的请求");
        } else {
            handshaker.handshake(ctx.channel(), request);
            log.debug("正常处理");
        }
    }    /**
     * 服务端主动向客户端发送消息
     *
     * @param ctx      ctx
     * @param request  request
     * @param response response
     */
    private void sendHttpResponse(ChannelHandlerContext ctx,
                                  FullHttpRequest request,
                                  DefaultFullHttpResponse response) {        // 不成功的响应
        if (response.status().code() != Constants.OK_CODE) {
            ByteBuf buf = Unpooled.copiedBuffer(response.status().toString(), CharsetUtil.UTF_8);
            response.content().writeBytes(buf);
            buf.release();
            log.warn("不成功的响应");
        }        // 服务端向客户端发送数据
        ChannelFuture channelFuture = ctx.channel().writeAndFlush(response);        if (!HttpUtil.isKeepAlive(request) ||
                response.status().code() != Constants.OK_CODE) {            // 如果是非Keep-Alive,或不成功都关闭连接
            channelFuture.addListener(ChannelFutureListener.CLOSE);
            log.info("websocket连接关闭");
        }
    }    /**
     * 客户端与服务端创建连接的时候调用
     *
     * @param ctx ctx
     * @throws Exception Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {        // 将channel添加到channel group中
        NettyConfig.GROUP.add(ctx.channel());
        log.info("客户端与服务端连接开启...");
    }    /**
     * 客户端与服务端断开连接的时候调用
     *
     * @param ctx ctx
     * @throws Exception Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {        // 从channel group中移除这个channel
        NettyConfig.GROUP.remove(ctx.channel());
        log.info("客户端与服务端关闭连接...");
    }    /**
     * 服务端接收客户端发送过来的数据结束之后调用
     *
     * @param ctx ctx
     * @throws Exception Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {        // 清空数据
        ctx.flush();

        log.info("flush数据 {}", ctx.name());
    }    /**
     * 工程出现异常的时候调用
     *
     * @param ctx   ctx
     * @param cause cause
     * @throws Exception Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        // 打印异常堆栈
        cause.printStackTrace();        // 主动关闭连接
        ctx.close();
        log.error("WebSocket连接异常");
    }
}

定义一个初始化类,用于初始化连接时的各个组件。代码如下:

package org.zero.netty.websocket.core;import io.netty.channel.ChannelInitializer;import io.netty.channel.socket.SocketChannel;import io.netty.handler.codec.http.*;import io.netty.handler.stream.ChunkedWriteHandler;import org.zero.netty.websocket.config.Constants;/**
 * @program: Netty-WebSocket
 * @description: 初始化连接时的各个组件
 * @author: 01
 * @create: 2018-11-03 21:53
 **/public class MyWebsocketChannelHandler extends ChannelInitializer<SocketChannel> {    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(Constants.HTTP_CODEC, new HttpServerCodec());
        ch.pipeline().addLast(Constants.AGGREGATOR, new HttpObjectAggregator(Constants.MAX_CONTENT_LENGTH));
        ch.pipeline().addLast(Constants.HTTP_CHUNKED, new ChunkedWriteHandler());
        ch.pipeline().addLast(Constants.HANDLER, new MyWebsocketHandler());
    }
}

最后我们还需要编写程序的启动类,负责启动应用。代码如下:

package org.zero.netty.websocket;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.Channel;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import lombok.extern.slf4j.Slf4j;import org.zero.netty.websocket.config.Constants;import org.zero.netty.websocket.core.MyWebsocketChannelHandler;/**
 * @program: Netty-WebSocket
 * @description: 程序的入口,负责启动应用
 * @author: 01
 * @create: 2018-11-03 22:06
 **/@Slf4jpublic class Main {    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workGroup);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.childHandler(new MyWebsocketChannelHandler());
            log.info("服务端开启等待客户端连接...");

            Channel channel = bootstrap.bind(Constants.PORT).sync().channel();
            channel.closeFuture().sync();
        } catch (Exception e) {
            log.error("服务端启动失败", e);
        } finally {            // 退出程序
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
            log.info("服务端已关闭");
        }
    }
}

HTML实现客户端

在上一小节中,我们完成了服务端的开发,这一小节我们来编写一个简单的html网页作为我们的客户端。代码如下:

<!DOCTYPE html><html lang="en"><head>
    <meta charset="UTF-8" http-equiv="Content-Type" content="text/html;charset=utf-8">
    <title>WebSocket客户端</title>
    <script type="text/javascript">
        var socket;        if (!window.WebSocket) {            window.WebSocket = window.MozWebSocket;
        }        if (window.WebSocket) {
            socket = new WebSocket("ws://localhost:8080/websocket");
            socket.onmessage = function (ev) {                var ta = document.getElementById('responseContent');
                ta.value += ev.data + "\r\n";
            };

            socket.onopen = function (ev) {                var ta = document.getElementById('responseContent');
                ta.value = "您当前的浏览器支持WebSocket, 请进行后续操作\r\n";
            };

            socket.onclose = function (ev) {                var ta = document.getElementById('responseContent');
                ta.value = "WebSocket连接已经关闭\r\n";
            };

            socket.onerror = function (ev) {                var ta = document.getElementById('responseContent');
                ta.value = ev.data + "WebSocket连接异常\r\n";
            };
        } else {
            alert("您的浏览器不支持WebSocket");
        }        function send(message) {            if (!window.WebSocket) {                return;
            }            if (socket.readyState === WebSocket.OPEN) {
                socket.send(message);
            } else {
                alert("WebSocket连接建立失败, 请重试");                console.log(socket.readyState)
            }
        }    </script></head><body><form onsubmit="return false;">
    <label for="message">
        <input type="text" id="message" name="message" value=""/>
    </label>
    <br><br>
    <input type="button" value="发送WebSocket请求消息" onclick="send(this.form.message.value)"/>
    <br><br>
    <hr color="red" size="5">
    <h2>客户端接收到服务端返回的应答消息: </h2>
    <label for="responseContent">
        <textarea id="responseContent" style="width: 1024px;height: 300px"></textarea>
    </label></form></body></html>

在浏览器中打开,效果如下:
Netty入门之WebSocket初体验

启动服务端,刷新网页:
Netty入门之WebSocket初体验

发送消息:
Netty入门之WebSocket初体验

至此我们就成功使用netty完成了一个websocket通信的demo,该demo源码的GitHub地址如下:

https://github.com/Binary-ZeroOne/netty-websocket-demo

©著作权归作者所有:来自51CTO博客作者ZeroOne01的原创作品