java.nio.channels.IllegalBlockingModeException

异常信息

编写一个 NIO 程序 demo 时,抛出了如下异常:

java.nio.channels.IllegalBlockingModeException

相关代码:

public class NIO_Demo2 {
    public static void main(String[] args) throws IOException, InterruptedException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress("0.0.0.0", 3333), 1000);
        serverSocketChannel.configureBlocking(false);

        final Selector[] selectors = new Selector[Runtime.getRuntime().availableProcessors()];
        for (int i = 0; i < selectors.length; i++) {
            final Selector selector = Selector.open();
            selectors[i] = selector;
            new Thread(new ClientProcessor(selector)).start();

        }

        AtomicInteger id = new AtomicInteger();
        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            selector.select();
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                iterator.next();
                SocketChannel socketChannel = serverSocketChannel.accept();
                socketChannel.configureBlocking(false);
                socketChannel.register(selectors[id.getAndIncrement() % selectors.length], SelectionKey.OP_READ);
                iterator.remove();
            }
        }
    }

    /**
     * 客户端消息处理器
     */
    static class ClientProcessor implements Runnable {
        private Selector selector;

        public ClientProcessor(Selector selector) {
            this.selector = selector;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    selector.select();
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();

                    for (SelectionKey key : selectionKeys) {
                        if (!key.isValid()) {
                            continue;
                        }

                        if (key.isReadable()) {
                            SocketChannel socketChannel = (SocketChannel) key.channel();
                            ByteBuffer readBuff = (ByteBuffer) key.attachment();
                            int read = socketChannel.read(readBuff);

                            if (read == -1) {
                                //通道连接关闭,可以取消这个注册键,后续不在触发。
                                key.cancel();
                                socketChannel.close();
                            } else {
                                //翻转buffer,从写入状态切换到读取状态
                                readBuff.flip();
                                int position = readBuff.position();
                                int limit = readBuff.limit();

                                List<ByteBuffer> buffers = new ArrayList<>();
                                // 按照协议从流中分割出消息
                                /**从readBuffer确认每一个字节,发现分割符则切分出一个消息**/
                                for (int i = position; i < limit; i++) {
                                    //读取到消息结束符
                                    if (readBuff.get() == '\r') {
                                        ByteBuffer message = ByteBuffer.allocate(i - readBuff.position());
                                        readBuff.limit(i);
                                        message.put(readBuff);
                                        readBuff.limit(limit);
                                        message.flip();
                                        buffers.add(message);
                                    }
                                }

                                /**从readBuffer确认每一个字节,发现分割符则切分出一个消息**/
                                /**将所有得到的消息发送出去**/
                                for (ByteBuffer buffer : buffers) {
                                    while (buffer.hasRemaining()) {
                                        socketChannel.write(buffer);
                                    }
                                }
                                /**将所有得到的消息发送出去**/
                                // 压缩readBuffer,压缩完毕后进入写入状态。并且由于长度是256,压缩之后必然有足够的空间可以写入一条消息
                                readBuff.compact();
                            }

                        }
                    }
                    selectionKeys.clear();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

发生错误语句:

socketChannel.register(selectors[id.getAndIncrement() % selectors.length], SelectionKey.OP_READ);

错误原因:

必须设置通道为 非阻塞,才能向 Selector 注册。

解决方法:

在发生错误的语句前添加:

socketChannel.configureBlocking(false);

注意参数值,false 为 非阻塞,true 为 阻塞。

点击量:13

发表评论