Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

用户接口 (Stream) - 熟悉的字节流API

概述

stream模块是整个可靠UDP协议栈最顶层的用户接口。它将底层所有复杂的、面向数据包的异步操作(如连接管理、重传、拥塞控制)封装成一个简洁、易于使用的Stream结构体。这个结构体实现了tokio::io::AsyncReadtokio::io::AsyncWrite两个核心trait,为开发者提供了与tokio::net::TcpStream几乎完全相同的编程体验。

核心使命:

  • 抽象与封装: 向用户完全隐藏底层的EndpointFrame、ACK和重传逻辑。
  • 提供标准接口: 实现AsyncReadAsyncWrite,无缝集成到Tokio的I/O生态系统中。
  • 连接应用层与协议栈: 作为用户应用程序与Endpoint任务之间的桥梁,通过异步通道双向传递数据和命令。
  • 提供高级功能: 暴露如migrate()等协议特有的高级功能。

架构实现:

  • 流结构体: src/core/stream.rs - Stream结构体的定义及其AsyncRead/AsyncWrite的实现。

设计原则

1. 最小意外原则 (Principle of Least Astonishment)

  • API一致性: Stream的API设计刻意模仿了TcpStream,用户可以使用熟悉的read(), write_all(), shutdown()等方法,极大地降低了学习成本。
  • 行为可预测: read()在连接关闭时返回Ok(0)write()在连接断开时返回ErrorKind::BrokenPipe,这些行为都符合标准库I/O对象的惯例。

2. 异步解耦

  • Actor模型交互: Stream本身不包含任何协议的核心逻辑。它是一个轻量级的“句柄”(Handle),其所有操作都被转换成StreamCommand消息,通过mpsc通道发送给在独立任务中运行的Endpoint Actor。
  • 双向通道:
    • 写路径 (Stream -> Endpoint): 用户调用write()时,数据被封装成StreamCommand::SendData发送出去。
    • 读路径 (Endpoint -> Stream): Endpoint将从网络接收并重组好的有序数据通过另一个mpsc通道发送给Stream
  • 无阻塞IO: poll_write通过try_send实现,如果通道已满,则返回Poll::Pending,将背压(Backpressure)自然地传递给调用者。poll_read在内部缓冲区为空时,会异步地poll_recv等待Endpoint传来新数据。

整体架构与数据流

Stream是连接用户代码和Endpoint任务的桥梁。

graph TD
    subgraph "用户应用"
        A["用户代码"] -- "调用 read()/write()" --> B(Stream)
    end

    subgraph "Stream句柄"
        B -- "发送 StreamCommand" --> C(tx_to_endpoint)
        B -- "接收 Vec<Bytes>" --> D(rx_from_endpoint)
        B -- "缓冲数据至" --> E["read_buffer: VecDeque<Bytes>"]
    end

    subgraph "Endpoint任务"
        F["Endpoint事件循环"] -- "接收" --> C
        F -- "发送" --> D
    end

    A -- "读取自" --> E

    style A fill:#333,color:#fff
    style B fill:#333,color:#fff
    style C fill:#333,color:#fff
    style D fill:#333,color:#fff
    style E fill:#333,color:#fff
    style F fill:#333,color:#fff
	

数据流解读:

  • 写入流程:

    1. 用户代码调用stream.write(buf)
    2. poll_write方法被调用,它尝试将buf封装成StreamCommand::SendData并通过tx_to_endpoint通道发送。
    3. 如果通道未满,发送成功,返回Poll::Ready(Ok(buf.len()))
    4. 如果通道已满(表示Endpoint处理不过来),try_send失败,poll_write返回Poll::Pending,用户的write调用会异步地等待。
  • 读取流程:

    1. 用户代码调用stream.read(buf)
    2. poll_read方法被调用,它首先检查内部的read_buffer
    3. 如果read_buffer中有数据: 从read_buffer的第一个Bytes块中拷贝数据到用户的buf中,并返回Poll::Ready(Ok(()))
    4. 如果read_buffer为空: 它会调用rx_from_endpoint.poll_recv(cx)来异步地等待Endpoint任务发送新的数据过来。
    5. 收到新数据: 当poll_recv返回Poll::Ready(Some(data))时,Stream将收到的Vec<Bytes>存入read_buffer,然后循环回到第3步,从刚填充的缓冲区中读取数据。
    6. 通道关闭: 当poll_recv返回Poll::Ready(None)时,表示Endpoint已经终止,连接已关闭。poll_read返回Poll::Ready(Ok(())),用户的read调用会得到Ok(0),表示EOF。
    7. 无新数据: 当poll_recv返回Poll::Pending时,表示当前既没有缓冲数据,也没有新数据到达,poll_read返回Poll::Pending,用户的read调用会异步地等待。

核心实现解析

AsyncWrite 实现

poll_write的实现非常简洁,它是一个非阻塞的“尽力而为”发送。

#![allow(unused)]
fn main() {
// In src/core/stream.rs
fn poll_write(
    self: Pin<&mut Self>,
    _cx: &mut Context<'_>,
    buf: &[u8],
) -> Poll<std::io::Result<usize>> {
    match self.tx_to_endpoint.try_send(...) {
        Ok(_) => Poll::Ready(Ok(buf.len())),
        Err(mpsc::error::TrySendError::Full(_)) => Poll::Pending, // 背压
        Err(mpsc::error::TrySendError::Closed(_)) => {
            Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into()))
        }
    }
}
}

poll_shutdown同样是将一个StreamCommand::Close消息发送给Endpoint,由Endpoint来执行实际的四次挥手关闭逻辑。

AsyncRead 实现

poll_read的实现略微复杂,因为它需要管理一个内部的read_buffer来处理Endpoint一次性发送多个数据块(Vec<Bytes>)和用户read缓冲区大小不匹配的情况。

#![allow(unused)]
fn main() {
// In src/core/stream.rs
fn poll_read(
    mut self: Pin<&mut Self>,
    cx: &mut Context<'_>,
    buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
    loop {
        // 1. 优先从内部缓冲区读取
        if !self.read_buffer.is_empty() {
            // ... copy data from self.read_buffer to buf ...
            if bytes_copied > 0 {
                return Poll::Ready(Ok(())); // 只要读到了数据,就必须立即返回
            }
        }

        // 2. 缓冲区为空,尝试从通道拉取新数据
        match self.rx_from_endpoint.poll_recv(cx) {
            Poll::Ready(Some(data_vec)) => {
                self.read_buffer.extend(data_vec);
                continue; // 收到新数据,回到循环开始处处理
            }
            Poll::Ready(None) => return Poll::Ready(Ok(())), // EOF
            Poll::Pending => return Poll::Pending, // 没数据,等待
        }
    }
}
}

这个loop + continue的模式是AsyncRead实现中的一个关键点,它确保了只要从底层通道收到了新数据,就会立即尝试用这些新数据来满足当前的read请求,而不是错误地返回Pending

总结

Stream模块是协议栈人性化设计的体现。它成功地将底层复杂的、基于消息和数据包的协议核心,转换成了广大Rust开发者所熟悉和喜爱的标准异步字节流接口。通过这种方式,它极大地降低了协议的使用门槛,使得开发者可以像使用TCP一样,轻松地构建基于此可靠UDP协议的应用程序。