Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

6: 基于SACK的高效确认

功能描述:

协议采用了基于SACK(Selective Acknowledgment,选择性确认)的高效确认机制,取代了传统的累积确认(Cumulative ACK)。这使得接收方可以精确地告知发送方自己收到了哪些离散的数据块,极大地提高了在乱序和丢包网络环境下的确认效率和重传精度。

实现位置:

  • SACK统一管理: src/core/reliability/retransmission/sack_manager.rs - 集中管理所有SACK相关逻辑
  • SACK范围生成: src/core/reliability/recv_buffer.rs - 从接收缓冲区生成SACK范围
  • SACK序列化: src/packet/sack.rs - SACK数据的编解码
  • 可靠性层集成: src/core/reliability.rs - 通过 RetransmissionManager 协调SACK功能

1. SACK的工作原理

当网络发生乱序或丢包时,接收方的缓冲区中会形成不连续的数据块“空洞”。

  • 传统ACK: 只能告知发送方“我已连续收到了X号之前的所有包”,无法表达“我收到了X+2,但没收到X+1”。
  • SACK: ACK包的载荷中可以携带一个或多个SackRange(如{start: 10, end: 20}),明确告知发送方“我收到了序号从10到20的所有包”。

2. SACK管理器架构

2.1 统一的SACK管理器

SackManager 是SACK功能的核心,它被包含在 RetransmissionManager 中,集中管理所有SACK相关的状态和逻辑:

#![allow(unused)]
fn main() {
// 位于 src/core/reliability/retransmission/sack_manager.rs
pub struct SackManager {
    fast_retx_threshold: u16,
    in_flight_packets: BTreeMap<u32, SackInFlightPacket>,
    ack_threshold: u32,
    ack_eliciting_packets_count: u32,
}

impl SackManager {
    /// 处理接收到的ACK和SACK信息
    pub fn process_ack(
        &mut self,
        recv_next_seq: u32,
        sack_ranges: &[SackRange],
        now: Instant,
    ) -> SackProcessResult {
        // 1. 处理累积ACK
        // 2. 处理SACK范围
        // 3. 检查快速重传
        // 4. 计算RTT样本
    }
    
    /// 添加在途数据包
    pub fn add_in_flight_packet(&mut self, frame: Frame, now: Instant);
    
    /// 检查RTO重传
    pub fn check_for_rto(&mut self, rto: Duration, now: Instant) -> Vec<Frame>;
    
    /// 判断是否应发送独立ACK
    pub fn should_send_standalone_ack(&self, sack_ranges: &[SackRange]) -> bool;
}
}

2.2 SACK范围生成

ReceiveBuffer 负责从乱序接收的数据包中生成SACK范围,此部分职责不变:

#![allow(unused)]
fn main() {
// 位于 src/core/reliability/recv_buffer.rs
pub fn get_sack_ranges(&self) -> Vec<SackRange> {
    let mut ranges = Vec::new();
    let mut current_range: Option<SackRange> = None;

    for &seq in self.received.keys() {
        match current_range.as_mut() {
            Some(range) => {
                if seq == range.end + 1 {
                    // 序号是连续的,扩展当前范围
                    range.end = seq;
                } else {
                    // 出现空洞,完成当前范围,开始新范围
                    ranges.push(range.clone());
                    current_range = Some(SackRange { start: seq, end: seq });
                }
            }
            None => { // 开始第一个范围
                current_range = Some(SackRange { start: seq, end: seq });
            }
        }
    }
    // ... 添加最后一个范围 ...
    ranges
}
}

3. SACK信息的编码与发送

3.1 SACK编码

RetransmissionManager 将SACK编解码的职责委托给底层的 SackManager

#![allow(unused)]
fn main() {
// 位于 src/core/reliability/retransmission.rs
impl RetransmissionManager {
    pub fn encode_sack_ranges(&self, ranges: &[SackRange]) -> bytes::Bytes {
        self.sack_manager.encode_sack_ranges(ranges)
    }

    pub fn decode_sack_ranges(&self, payload: bytes::Bytes) -> Vec<SackRange> {
        self.sack_manager.decode_sack_ranges(payload)
    }
}
}

3.2 ACK帧构造

发送方在构造 ACK 包时,通过可靠性层获取SACK信息,此流程基本不变,但内部实现已更新:

#![allow(unused)]
fn main() {
// 位于 src/core/endpoint/frame_factory.rs
pub fn create_ack_frame(
    peer_cid: u32,
    reliability: &mut ReliabilityLayer,
    start_time: Instant,
) -> Frame {
    let (sack_ranges, recv_next, window_size) = reliability.get_ack_info();
    let timestamp = Instant::now().duration_since(start_time).as_millis() as u32;
    
    // 编码SACK范围
    let sack_payload = reliability.retransmission_manager.encode_sack_ranges(&sack_ranges);

    Frame::new_ack(
        peer_cid,
        recv_next,
        window_size,
        sack_payload,
        timestamp,
    )
}
}

4. SACK信息的处理

4.1 统一的ACK处理流程

当发送方收到一个 ACK 包时,ReliabilityLayer 通过 RetransmissionManager 进行统一处理:

#![allow(unused)]
fn main() {
// 位于 src/core/reliability.rs
pub fn handle_ack(
    &mut self,
    recv_next_seq: u32,
    sack_ranges: Vec<SackRange>,
    now: Instant,
) -> Vec<Frame> {
    // 使用统一重传管理器处理ACK
    let result = self.retransmission_manager.process_ack(recv_next_seq, &sack_ranges, now);

    // 更新RTT和拥塞控制
    for rtt_sample in result.rtt_samples {
        self.rto_estimator.update(rtt_sample, self.config.min_rto);
        self.congestion_control.on_ack(rtt_sample);
    }

    // 处理丢包事件
    if !result.frames_to_retransmit.is_empty() {
        self.congestion_control.on_packet_loss(now);
    }

    result.frames_to_retransmit
}
}

4.2 SACK处理的核心逻辑

SackManager::process_ack 实现了完整的SACK处理流程,这部分的核心逻辑保持不变,但现在它位于 sack_manager.rs 中,职责更加清晰。

#![allow(unused)]
fn main() {
// 位于 src/core/reliability/retransmission/sack_manager.rs
pub fn process_ack(
    &mut self,
    recv_next_seq: u32,
    sack_ranges: &[SackRange],
    now: Instant,
) -> SackProcessResult {
    let mut rtt_samples = Vec::new();
    let mut newly_acked_sequences = Vec::new();

    // 步骤1: 处理累积ACK
    let mut cumulative_acked_keys = Vec::new();
    for (&seq, packet) in self.in_flight_packets.iter() {
        if seq < recv_next_seq {
            cumulative_acked_keys.push(seq);
            rtt_samples.push(now.saturating_duration_since(packet.last_sent_at));
            newly_acked_sequences.push(seq);
        } else {
            break; // BTreeMap是有序的
        }
    }

    for key in cumulative_acked_keys {
        self.in_flight_packets.remove(&key);
    }

    // 步骤2: 处理SACK范围
    let mut sack_acked_sequences = Vec::new();
    for range in sack_ranges {
        for seq in range.start..=range.end {
            if let Some(packet) = self.in_flight_packets.remove(&seq) {
                rtt_samples.push(now.saturating_duration_since(packet.last_sent_at));
                newly_acked_sequences.push(seq);
                sack_acked_sequences.push(seq);
            }
        }
    }

    // 步骤3: 检查快速重传
    let frames_to_retransmit = self.check_fast_retransmission(
        &sack_acked_sequences,
        now,
    );

    SackProcessResult {
        frames_to_retransmit,
        rtt_samples,
        newly_acked_sequences,
    }
}
}

4.3 快速重传逻辑

快速重传逻辑完全封装在 SackManager 中,细节与之前相同。

#![allow(unused)]
fn main() {
// 位于 src/core/reliability/retransmission/sack_manager.rs
fn check_fast_retransmission(
    &mut self,
    sack_acked_sequences: &[u32],
    now: Instant,
) -> Vec<Frame> {
    let mut frames_to_retransmit = Vec::new();

    // 找到本次ACK中SACK确认的最高序列号
    if let Some(highest_sacked_seq) = sack_acked_sequences.iter().max().copied() {
        // 检查所有序列号小于最高SACK序列号的在途包
        let mut keys_to_modify = Vec::new();
        for (&seq, _) in self.in_flight_packets.range(..highest_sacked_seq) {
            keys_to_modify.push(seq);
        }

        // 增加快速重传计数器,达到阈值时触发重传
        for seq in keys_to_modify {
            if let Some(packet) = self.in_flight_packets.get_mut(&seq) {
                packet.fast_retx_count += 1;
                
                if packet.fast_retx_count >= self.fast_retx_threshold {
                    frames_to_retransmit.push(packet.frame.clone());
                    packet.last_sent_at = now;
                    packet.fast_retx_count = 0; // 重传后重置计数器
                }
            }
        }
    }

    frames_to_retransmit
}
}

5. 架构优势

5.1 集中化管理

  • 所有SACK相关逻辑(在途包跟踪、ACK处理、快速重传)集中在 SackManager 中,便于维护和测试。
  • RetransmissionManager 统一了接口,对上层屏蔽了SACK和简单重传的差异。

5.2 清晰的职责分离

  • ReceiveBuffer: 负责SACK范围生成。
  • SackManager: 负责SACK确认和重传的核心逻辑。
  • SimpleRetransmissionManager: 负责非SACK帧的简单重传。
  • ReliabilityLayer: 负责协调各组件。
  • SendBuffer: 职责被极大简化,仅作为待发送数据的流缓冲区。

5.3 高效的处理流程

  • 双重确认机制(累积ACK + SACK)最大化信息利用。
  • 精确的快速重传检测,减少不必要的重传。
  • 统一的RTT计算和拥塞控制集成。

这种重构后的架构确保了SACK功能的高效性和可维护性,同时为未来的扩展提供了良好的基础。