Netty在AI质检引擎中的实践

需求和设计方案

实时质检的复杂度在于外呼系统数据流的边界切割,为了便于和催收平台对接,我们重新设计了接口,把音频流的切割放在AI平台,外呼系统仅需要将音频流数据通过socket推送到AI质检系统即可。

尽管如此,依然无法完全解决产品需求,Socket Header不能携带更多的质检配置信息,亦不能在数据流中既推送文本配置信息(比如授权、规则集等)又推送流媒体,因此,我们又鉴于此设计了Http接口,上传需要本次催收通话中语音质检的配置信息。

时序图.png

上图,是初步设计的数据流时序图。催收平台通过HTTP接口提交质检所需的数据(appKey,appSecret,strategySign)以及一个全局唯一的会话字段session-uuid,AI质检系统将其保存在本地缓存中,为后续的质检提供服务。之后,催收系统在调用外呼系统时,将session-uuid绑到socket协议的header中,AI系统解析外呼系统推送的socket中header信息,并在本地缓存中查找与之对应的质检配置信息,然后处理音频流,切割成不同的片段质检,并将质检结果通过回调接口传给催收平台(AI平台有质检记录,可通过session-uuid查看)。

上述设计中,参考了智能调查系统。因此,如果催收需要我们回调数据,则需要提供回调接口。

Netty解决粘包和拆包

粘包/拆包

所谓的TCP粘包是指一次会话接收的数据不能完整的体现出一个完整的消息。为何TCP会出现粘包?主要因为TCP是以流的形式在网络中传递数据,在加上网络上的最大传输单元(MTU:Maximum Transmission Unit)往往小于在应用处理的消息,所以就会引发一次处理的消息无法满足消息数据的需要,导致粘包的存在。处理粘包的唯一方式就是自定义应用层的数据通讯协议,通过定制协议来控制现有接收的数据是否满足消息数据的需要。

解决方法

  • 消息定长: 报文大小固定长度,不够空格补全,发送和接收方遵循相同的约定,这样即使粘包了通过接收方编程实现获取定长报文也能区分。
  • 包尾添加特殊分隔符: 例如每条报文结束都添加回车换行符(例如FTP协议)或者指定特殊字符作为报文分隔符,接收方通过特殊分隔符切分报文区分。
  • 将消息分为消息头和消息体:消息头中包含表示信息的总长度(或者消息体长度)的字段。

自定义协议。解决TCP粘包/拆包

自定义协议

 
   SVTP(Session Voice Transport Protocol) 基于TCP协议的应用层协议
   数据包格式
  +——----——-------+——-----————----——-------+——----——+
  |协议开始标志     |  长度                  |   数据  |
  |PT(包类型) 8bit |  PL(包长度)16bit        |        |
  +——----——-------+——-----————----——-------+——----——+
  PT: 0(EMPTY)|1(INFO)|2(DATA)|3(TEXT) 的一种
  PL: 0 ~ 65535 的正整数,代表包正文部分的长度,
  DATA: 包正文(文本或二进制数据流)
 
  1.协议开始标志head_data,为int类型的数据,16进制表示为0X76
  2.传输数据的长度contentLength,int类型
  3.要传输的数据
 

协议标记

PT: 0(EMPTY)|1(INFO)|2(DATA)|3(TEXT) 的一种
定义为枚举类型:

  • EMPTY为空包。
  • INFO在语音数据流开始和结束时发送。
  • DATA为正式的语音流数据包。
  • TEXT为文本数据流。
    public enum SVTPType {
    EMPTY, INFO, DATA, TEXT
    }
    

编解码器

将应用程序的数据转换为网络格式,以及将网络格式转换为应用程序的数据的组件分别叫作编码器和解码器,同时具有这两种功能的单一组件叫作编解码器。Netty 提供了一系列用来创建所有这些编码器、解码器以及编解码器的工具,从专门为知名协议(如 HTTP 以及 Base64)预构建的类,到你可以按需定制的通用的消息转换编解码器,应有尽有。

如果将消息看作是对于特定的应用程序具有具体含义的结构化的字节序列——它的数据。那么编码器是将消息转换为适合于传输的格式(最有可能的就是字节流);而对应的解码器则是将网络字节流转换回应用程序的消息格式。因此,编码器操作出站数据,而解码器处理入站数据。

——《Netty实战》第二部分编解码器
  • 将字节解码为消息——ByteToMessageDecoder和ReplayingDecoder;
  • 将一种消息类型解码为另一种——MessageToMessageDecoder。

Netty解码器示例图

CODE实现

SOCKET服务:

package cn.socket.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * @author zhoujunwen
 * @date 2019-03-26 11:06:31
 */
@Slf4j
@Component
public class AsrTcpServer {
    @Value("${socket.port:8888}")
    private int port;

    private void startServer() {
        // 创建两个线程组, boss处理客户端连接  work进行客服端连接之后的处理
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup work = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();

            // 服务器配置
            bootstrap.group(boss, work).channel(NioServerSocketChannel.class)
                    .childHandler(new SocketChannelInitializer())
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            //绑定端口  开启事件驱动
            log.info("【服务器启动成功========端口:" + port + "】");
            Channel channel = bootstrap.bind(port).sync().channel();
            channel.closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //关闭资源
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }

    }

    @PostConstruct()
    public void init() {
        //需要开启一个新的线程来执行netty server 服务器
        new Thread(() -> startServer()).start();
    }
}

Handler:

package cn.socket.server;

import cn.socket.decoder.OutboundDecoder;
import cn.socket.encoder.OutboundEncoder;
import cn.socket.handler.SocketServerHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

public class SocketChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();

        pipeline
                //HttpServerCodec: 针对http协议进行编解码
//                .addLast("http-codec", new HttpClientCodec())
//                .addLast("http-encoder", new HttpResponseEncoder())
//                .addLast("http-decoder", new HttpRequestDecoder())
                //解析方式以\n作为标记
//                .addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()))
//                .addLast("encode", new StringEncoder())
//                .addLast("decoder", new StringDecoder())
                //  HttpObjectAggregator:对httpMessage进行聚合,聚合成FullHttpRequest或FullHttpResponse
                // 几乎在netty中的编程 ,都会使用到此handler
//                .addLast("aggregator", new HttpObjectAggregator(65536))
                // ChunkedWriteHandler:对写大数据流的支持
                .addLast("chunked", new ChunkedWriteHandler())
                // 进行设置心跳检测
                .addLast(new IdleStateHandler(60, 30, 60 * 30, TimeUnit.SECONDS))
                // 配置通道处理  来进行业务处理
                /*
                 * websocket 服务器处理的协议 ,用于指定给客户端连接访问的路由 :/ws
                 * 本handler 会帮你处理一些繁重的复杂的事
                 * 会帮你处理握手动作 :handshaking (close,ping,pong)ping+pong=心跳
                 * 对于websocket来讲, 都是以frames进行传输的,不同的数据类型对应的frames也不同
                 * */
//                .addLast(new WebSocketServerProtocolHandler("/ws", null, true))
                // SVTP编码解码
                .addLast("svtp-encode", new OutboundEncoder())
                .addLast("svtp-decoder", new OutboundDecoder())
                // 自定义handler
                .addLast(new SocketServerHandler("/", null, false));
    }
}

自定义协议:

package cn.socket.protocol;

import cn.socket.server.SVTPPacket;
import com.google.common.base.Charsets;
import com.google.common.collect.Maps;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.util.Map;

/**
 * <pre>
 * SVTP(Session Voice Transport Protocol) 基于TCP协议的应用层协议
 *  数据包格式
 * +——----——-------+——-----————----——-------+——----——+
 * |协议开始标志     |  长度                  |   数据  |
 * |PT(包类型) 8bit |  PL(包长度)16bit        |        |
 * +——----——-------+——-----————----——-------+——----——+
 * PT: 0(EMPTY)|1(INFO)|2(DATA)|3(TEXT) 的一种
 * PL: 0 ~ 65535 的正整数,代表包正文部分的长度,
 * DATA: 包正文(文本或二进制数据流)
 *
 * 1.协议开始标志head_data,为int类型的数据,16进制表示为0X76
 * 2.传输数据的长度contentLength,int类型
 * 3.要传输的数据
 * </pre>
 */
@Slf4j
@Getter
public class OutboundProtocol {
    public enum SVTPType {
        EMPTY, INFO, DATA, TEXT
    }

    /**
     * 协议开始标记
     */
    private SVTPType type;
    /**
     * 数据长度
     */
    private int length;
    /**
     * 数据
     */
    private byte[] data;

    private Map<String, String> infoMap;

    public OutboundProtocol(SVTPType type, byte[] data) {
        this.type = type;
        this.data = data;
        this.length = data.length;

        parseInfo();
    }


    /**
     * 转byte
     *
     * @return
     */
    public byte[] toBytes() {
        byte[] bytes = new byte[data.length + 3];
        bytes[0] = (byte) type.ordinal();
        bytes[1] = (byte) (length >> 8 & 0xFF);
        bytes[2] = (byte) (length & 0xFF);
        System.arraycopy(data, 0, bytes, 3, length);
        return bytes;
    }

    /**
     * 获取辅助信息中的数据
     *
     * @param key 信息key
     * @return
     */
    public String getInfo(String key) {
        if (infoMap == null || infoMap.isEmpty() || key == null) {
            return null;
        }

        String val = infoMap.get(key.toLowerCase());
        if (StringUtils.isBlank(val)) {
            return null;
        }
        return val;
    }

    /**
     * 解析解析辅助信息
     */
    private void parseInfo() {
        if (SVTPPacket.SVTPType.INFO.equals(type)) {
            String infoStr = new String(data, Charsets.UTF_8);
            log.debug("Package content: " + infoStr);
            String[] lines = StringUtils.split(infoStr, "\n");
            infoMap = Maps.newHashMap();

            for (String line : lines) {
                line = StringUtils.trimToEmpty(line);
                String[] infoParts = StringUtils.split(line, ":", 2);
                if (infoParts.length == 2) {
                    String key = StringUtils.trimToNull(infoParts[0]);
                    String val = StringUtils.trimToNull(infoParts[1]);
                    if (key != null) {
                        infoMap.put(key.toLowerCase(), val);
                    }
                }
            }
        }
    }

}

编码器

package cn.socket.encoder;

import cn.socket.protocol.OutboundProtocol;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class OutboundEncoder extends MessageToByteEncoder<OutboundProtocol> {
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, OutboundProtocol msg, ByteBuf byteBuf) throws Exception {
        // 写入消息SmartCar的具体内容
        // 1.写入消息的开头的信息标志(byte类型)
        byteBuf.writeByte(msg.getType().ordinal());

        // 2.写入消息的长度(2个byte类型)
        int length = msg.getLength() & 0x0000FFFF;
        byte lowEndian = (byte) (length & 0x00FF);
        byte highEndian = (byte) ((length >> 8) & 0xFF);
        byteBuf.writeByte(highEndian);
        byteBuf.writeByte(lowEndian);

        // 3.写入消息的内容(byte[]类型)
        byteBuf.writeBytes(msg.getData());
    }
}

解码器

package cn.socket.decoder;

import cn.socket.protocol.OutboundProtocol;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

public class OutboundDecoder extends ByteToMessageDecoder {

    /**
     * PT(包类型) 8bit +  PL(包长度)16bit
     */
    public final int BASE_LENGTH = 1 + 2;

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        // 可读长度必须大于基本长度
        if (byteBuf.readableBytes() > BASE_LENGTH) {
            // 防止socket字节流攻击
            // 防止,客户端传来的数据过大
            // 因为,太大的数据,是不合理的
           /* if (byteBuf.readableBytes() > 2048) {
                byteBuf.skipBytes(byteBuf.readableBytes());
            }*/

            // 记录包头开始的index
            int beginReader;
            OutboundProtocol.SVTPType type;

            while (true) {
                // 获取包头开始的index
                beginReader = byteBuf.readerIndex();
                // 标记包头开始的index
                byteBuf.markReaderIndex();
                // 读到了协议的开始标志,结束while循环
                int pktType = byteBuf.readByte();
                if (pktType >= 0 && pktType < OutboundProtocol.SVTPType.values().length) {
                    type = OutboundProtocol.SVTPType.values()[pktType];
                    break;
                }

                // 未读到包头,略过一个字节
                // 每次略过,一个字节,去读取,包头信息的开始标记
                byteBuf.resetReaderIndex();
                byteBuf.readByte();

                // 当略过,一个字节之后,
                // 数据包的长度,又变得不满足
                // 此时,应该结束。等待后面的数据到达
                if (byteBuf.readableBytes() < BASE_LENGTH) {
                    return;
                }
            }
            // 消息的长度
            int ll = byteBuf.readByte();
            int lr = byteBuf.readByte();
            int length = (0xFF00 & (ll << 8)) | (0x00FF & lr);

            // 判断请求数据包数据是否到齐
            if (byteBuf.readableBytes() < length) {
                // 还原读指针
                byteBuf.readerIndex(beginReader);
                return;
            }

            // 读取data数据
            byte[] data = new byte[length];
            byteBuf.readBytes(data);

            OutboundProtocol protocol = new OutboundProtocol(type, data);
            list.add(protocol);

        }
    }
}

Netty中处理TCP会话session

在同步阻塞的网络编程中,代码都是按照TCP操作顺序编写的,即创建连接、多次读写、关闭连接,这样很容易判断这一系列操作是否是同一个TCP连接。而在事件驱动的异步编程网络模型中,IO操作都会触发一个事件Event调用事件函数处理该事件,例如接收到客户端的新数据,Mina会调用messageReceived,Netty会调用channelRead, Twisted会调用dataReceived,同一个TCP连接的多次请求和多个客户端的请求是一样的。

传统阻塞网络编程中,每个TCP连接都是独立的一个线程,同一个连接的数据流不会发送到其他的TCP连接。

public class CServer extends Thread {

    private static final Logger LOGGER = LoggerFactory.getLogger(CServer.class);

    private ServerSocket serverSocket;

    private int port = 4004;
    private int maxConnections = 1024;
    private String host = "0.0.0.0";
    private int clientNumber = 400;

    private void startSVTPServer() {
        while (Boolean.TRUE) {
            try {
                new CNetWorker(serverSocket.accept(), clientNumber++).start();
                LOGGER.info("accept socket, clientNumber: {}", clientNumber);
            } catch (IOException ignore) {
            }
        }
    }

    @Override
    public void run() {
        try {
            InetAddress address = InetAddress.getByName(host);
            serverSocket = new ServerSocket(port, maxConnections, address);
            LOGGER.info("Server started at {}:{}", host, port);
            startSVTPServer();
        } catch (UnknownHostException e) {
            LOGGER.error("Bind wrong ip {}.", host, e);
        } catch (IOException e) {
            LOGGER.error("IO Exception happend: ", e);
        } finally {
            if (serverSocket != null) {
                try {
                    serverSocket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }


    private static class CNetWorker extends Thread {
        enum State {
            CONNECTED, ESTABLISHED, CLOSED
        }

        private Socket socket;
        private int socketTimeout = 100000;
        private int clientNumber;
        private State state;
        private ByteArrayOutputStream dataBuffer;
        private OutputStream wavFile;
        private String uuid;
        private int rate;
        private InputStream socketIn;
        private OutputStream socketOut;

        public CNetWorker(Socket socket, int clientNumber) {
            this.socket = socket;
            try {
                socket.setSoTimeout(socketTimeout);
            } catch (SocketException ignore) {
            }
            this.clientNumber = clientNumber;
            state = State.CONNECTED;
            dataBuffer = new ByteArrayOutputStream();
        }

        public void run() {
            try {
                socketIn = new BufferedInputStream(socket.getInputStream());
                socketOut = socket.getOutputStream();
                while (Boolean.TRUE) {
                    SVTPPacket pkt = new SVTPPacket(socketIn);
                    if (State.CONNECTED.equals(state)) {
                        if (SVTPPacket.SVTPType.INFO.equals(pkt.getType())) {
                            String action = pkt.getInfo("action");
                            if ("start".equalsIgnoreCase(action)) {
                                String samplerate = pkt.getInfo("sample-rate");
                                this.rate = 8000;
                                if (rate == 8000 || rate == 16000) {
                                    this.rate = rate;
                                }
                                String uuid = pkt.getInfo("session-uuid");
                                this.uuid = uuid;
                                LOGGER.info("Session-uuid: {} started with samplerate: {}", uuid, rate);
                                state = State.ESTABLISHED;
                            }
                        }
                    } else if (State.ESTABLISHED.equals(state)) {
                        if (SVTPPacket.SVTPType.INFO.equals(pkt.getType())) {
                            if ("stop".equalsIgnoreCase(pkt.getInfo("action"))) {
                                state = State.CLOSED;
                                LOGGER.info("Session-uuid: {}, stoped.", uuid);
                                break;
                            }
                        } else if (SVTPPacket.SVTPType.DATA.equals(pkt.getType())) {
//                            asrFeed(pkt.getData());

                        }
                    }
                }
            } catch (IOException e) {
            }
        }
    }

}

在异步网络编程中,多个连接使用同一个ChannelHandler,如果同一个会话中的多个完整的消息数据流才能聚集成一个最终的数据包,比如语音流,需要处理session问题。

Mina定义了IoSession接口存储session信息。


public interface IoSession {     getAttribute(Object key)     getAttribute(Object key, Object defaultValue)     setAttribute(Object key)     setAttribute(Object key, Object value)     setAttributeIfAbsent(Object key)     setAttributeIfAbsent(Object key, Object value)     replaceAttribute(Object key, Object oldValue, Object newValue)     removeAttribute(Object key)     removeAttribute(Object key, Object value)     containsAttribute(Object key)     getAttributeKeys() }

Netty将这种看似Map的功能进一步抽象,形成了AttributeMap接口:


public interface AttributeMap { <T> Attribute<T> attr(AttributeKey<T> key); }

AttributeMap接口只有一个attr()方法,接收一个AttributeKey类型的key,返回一个Attribute类型的value。按照Javadoc,AttributeMap实现必须是线程安全的。

Netty中,所有的Channle和ChannelHandlerContext都实现了AttributeMap接口。

上图来源:https://blog.csdn.net/zxhoo/article/details/17719333

AttributeMap的key的类型全部是AttributeKey,AttributeKey是个泛型类,灵活方便。同时,AttributeKey继承了UniqueName类,内部使用ConcurrentHashMap来保证name的唯一性。


public interface Attribute<T> { AttributeKey<T> key(); T get(); void set(T value); T getAndSet(T value);     T setIfAbsent(T value);     T getAndRemove();     boolean compareAndSet(T oldValue, T newValue);     void remove(); }

参考文章/书籍

[1] Netty 实战

[2] Netty之解决TCP粘包拆包(自定义协议)

[3] Netty处理TCP连接的session

[4] Netty4学习笔记(7)– AttributeMap

点击量:14

发表评论