Lines
94.29 %
Functions
57.89 %
Branches
100 %
//! UDP chunked-message framing (SUPER_PLAN_2 P8).
//!
//! Splits a large payload (a video keyframe) into sequenced datagrams and
//! reassembles them on the far side, tolerating reorder + loss: a message whose
//! chunks never all arrive is dropped (bounded memory), never retransmitted -
//! the fault-tolerant model realtime A/V wants. This is pure (no socket), so
//! the dll's `Udp` handle builds on it and the logic is unit-testable here.
//! Wire format per datagram: an 8-byte little-endian header
//! (`msg_id: u32`, `chunk_idx: u16`, `chunk_count: u16`) followed by the chunk
//! payload.
use alloc::collections::BTreeMap;
use alloc::vec::Vec;
/// Datagram header length: msg_id (u32) + chunk_idx (u16) + chunk_count (u16).
pub const CHUNK_HEADER_LEN: usize = 8;
/// Conservative per-datagram payload (leaves room under a ~1400-byte path MTU).
pub const DEFAULT_CHUNK_PAYLOAD: usize = 1200;
/// Cap on in-flight partial messages; the oldest is evicted past this so a lost
/// chunk can never leak memory.
const MAX_PARTIAL_MESSAGES: usize = 256;
/// Split `data` into chunk datagrams for `msg_id`. Each datagram is
/// `CHUNK_HEADER_LEN + <= max_payload` bytes. An empty payload still produces
/// one (header-only) chunk, so a zero-length message round-trips.
pub fn chunk_message(msg_id: u32, data: &[u8], max_payload: usize) -> Vec<Vec<u8>> {
let max_payload = max_payload.max(1);
let count = if data.is_empty() {
1
} else {
data.len().div_ceil(max_payload)
};
let count_u16 = count.min(u16::MAX as usize) as u16;
let mut out = Vec::with_capacity(count_u16 as usize);
for idx in 0..count_u16 {
let start = idx as usize * max_payload;
let end = (start + max_payload).min(data.len());
let mut p = Vec::with_capacity(CHUNK_HEADER_LEN + end.saturating_sub(start));
p.extend_from_slice(&msg_id.to_le_bytes());
p.extend_from_slice(&idx.to_le_bytes());
p.extend_from_slice(&count_u16.to_le_bytes());
if start < end {
p.extend_from_slice(&data[start..end]);
}
out.push(p);
out
struct PartialMessage {
count: u16,
chunks: BTreeMap<u16, Vec<u8>>,
/// Reassembles chunk datagrams into complete messages, tolerating out-of-order
/// delivery and dropping incomplete messages once too many pile up.
#[derive(Default)]
pub struct UdpReassembler {
partial: BTreeMap<u32, PartialMessage>,
impl UdpReassembler {
pub fn new() -> Self {
Self::default()
/// Ingest one datagram. Returns the fully-reassembled message if this
/// datagram completed one, else `None`. Malformed datagrams are ignored.
pub fn ingest(&mut self, datagram: &[u8]) -> Option<Vec<u8>> {
if datagram.len() < CHUNK_HEADER_LEN {
return None;
let msg_id = u32::from_le_bytes([datagram[0], datagram[1], datagram[2], datagram[3]]);
let idx = u16::from_le_bytes([datagram[4], datagram[5]]);
let count = u16::from_le_bytes([datagram[6], datagram[7]]);
if count == 0 || idx >= count {
let entry = self
.partial
.entry(msg_id)
.or_insert_with(|| PartialMessage {
count,
chunks: BTreeMap::new(),
});
entry.chunks.insert(idx, datagram[CHUNK_HEADER_LEN..].to_vec());
if entry.chunks.len() == entry.count as usize {
let msg = self.partial.remove(&msg_id).unwrap();
let mut out = Vec::new();
for (_, chunk) in msg.chunks {
out.extend_from_slice(&chunk);
return Some(out);
// Bound memory: evict the oldest partial message if too many pile up.
if self.partial.len() > MAX_PARTIAL_MESSAGES {
if let Some((&oldest, _)) = self.partial.iter().next() {
self.partial.remove(&oldest);
None
#[cfg(test)]
mod tests {
use super::*;
use alloc::vec;
#[test]
fn chunk_reassemble_roundtrip() {
let data: Vec<u8> = (0..3000u32).map(|i| (i % 256) as u8).collect();
let chunks = chunk_message(7, &data, DEFAULT_CHUNK_PAYLOAD);
assert_eq!(chunks.len(), 3, "3000 bytes / 1200 = 3 chunks");
let mut r = UdpReassembler::new();
let mut done = None;
for c in &chunks {
if let Some(m) = r.ingest(c) {
done = Some(m);
assert_eq!(done.expect("message completes"), data);
fn reassembles_out_of_order() {
let data: Vec<u8> = (0..2500u32).map(|i| (i % 7) as u8).collect();
let mut chunks = chunk_message(1, &data, DEFAULT_CHUNK_PAYLOAD);
chunks.reverse(); // deliver last-chunk-first
assert_eq!(done.expect("reorder-tolerant"), data);
fn incomplete_message_yields_nothing() {
let data: Vec<u8> = vec![9u8; 2000]; // 2 chunks
let chunks = chunk_message(2, &data, DEFAULT_CHUNK_PAYLOAD);
assert!(
r.ingest(&chunks[0]).is_none(),
"one of two chunks is not a complete message"
);
fn empty_message_roundtrips() {
let chunks = chunk_message(3, &[], DEFAULT_CHUNK_PAYLOAD);
assert_eq!(chunks.len(), 1, "empty payload still sends one chunk");
assert_eq!(r.ingest(&chunks[0]).expect("completes"), Vec::<u8>::new());
fn two_interleaved_messages() {
let a: Vec<u8> = vec![1u8; 1500]; // 2 chunks
let b: Vec<u8> = vec![2u8; 1500]; // 2 chunks
let ca = chunk_message(10, &a, DEFAULT_CHUNK_PAYLOAD);
let cb = chunk_message(11, &b, DEFAULT_CHUNK_PAYLOAD);
// Interleave: a0, b0, a1 (-> a done), b1 (-> b done)
assert!(r.ingest(&ca[0]).is_none());
assert!(r.ingest(&cb[0]).is_none());
assert_eq!(r.ingest(&ca[1]).expect("a done"), a);
assert_eq!(r.ingest(&cb[1]).expect("b done"), b);