1
//! UDP chunked-message framing (SUPER_PLAN_2 P8).
2
//!
3
//! Splits a large payload (a video keyframe) into sequenced datagrams and
4
//! reassembles them on the far side, tolerating reorder + loss: a message whose
5
//! chunks never all arrive is dropped (bounded memory), never retransmitted -
6
//! the fault-tolerant model realtime A/V wants. This is pure (no socket), so
7
//! the dll's `Udp` handle builds on it and the logic is unit-testable here.
8
//!
9
//! Wire format per datagram: an 8-byte little-endian header
10
//! (`msg_id: u32`, `chunk_idx: u16`, `chunk_count: u16`) followed by the chunk
11
//! payload.
12

            
13
use alloc::collections::BTreeMap;
14
use alloc::vec::Vec;
15

            
16
/// Size in bytes of the per-datagram header: one `u32` (`msg_id`) followed by
/// two `u16` fields (`chunk_idx`, `chunk_count`).
pub const CHUNK_HEADER_LEN: usize =
    core::mem::size_of::<u32>() + 2 * core::mem::size_of::<u16>();
/// Default chunk payload size, chosen conservatively so that header + payload
/// stays below a ~1400-byte path MTU.
pub const DEFAULT_CHUNK_PAYLOAD: usize = 1200;
/// Upper bound on partially-received messages held at once. Beyond this the
/// oldest one is evicted, so a chunk that never arrives cannot leak memory.
const MAX_PARTIAL_MESSAGES: usize = 256;
23

            
24
/// Split `data` into chunk datagrams for `msg_id`. Each datagram is
25
/// `CHUNK_HEADER_LEN + <= max_payload` bytes. An empty payload still produces
26
/// one (header-only) chunk, so a zero-length message round-trips.
27
6
pub fn chunk_message(msg_id: u32, data: &[u8], max_payload: usize) -> Vec<Vec<u8>> {
28
6
    let max_payload = max_payload.max(1);
29
6
    let count = if data.is_empty() {
30
1
        1
31
    } else {
32
5
        data.len().div_ceil(max_payload)
33
    };
34
6
    let count_u16 = count.min(u16::MAX as usize) as u16;
35
6
    let mut out = Vec::with_capacity(count_u16 as usize);
36
13
    for idx in 0..count_u16 {
37
13
        let start = idx as usize * max_payload;
38
13
        let end = (start + max_payload).min(data.len());
39
13
        let mut p = Vec::with_capacity(CHUNK_HEADER_LEN + end.saturating_sub(start));
40
13
        p.extend_from_slice(&msg_id.to_le_bytes());
41
13
        p.extend_from_slice(&idx.to_le_bytes());
42
13
        p.extend_from_slice(&count_u16.to_le_bytes());
43
13
        if start < end {
44
12
            p.extend_from_slice(&data[start..end]);
45
12
        }
46
13
        out.push(p);
47
    }
48
6
    out
49
6
}
50

            
51
/// Bookkeeping for one message whose chunks have not all arrived yet.
struct PartialMessage {
    /// Total number of chunks the message consists of, as announced in the
    /// datagram header.
    count: u16,
    /// Chunk payloads received so far, keyed by chunk index so that iteration
    /// yields them in message order.
    chunks: BTreeMap<u16, Vec<u8>>,
}
55

            
56
/// Reassembles chunk datagrams into complete messages, tolerating out-of-order
57
/// delivery and dropping incomplete messages once too many pile up.
58
#[derive(Default)]
59
pub struct UdpReassembler {
60
    partial: BTreeMap<u32, PartialMessage>,
61
}
62

            
63
impl UdpReassembler {
64
5
    pub fn new() -> Self {
65
5
        Self::default()
66
5
    }
67

            
68
    /// Ingest one datagram. Returns the fully-reassembled message if this
69
    /// datagram completed one, else `None`. Malformed datagrams are ignored.
70
12
    pub fn ingest(&mut self, datagram: &[u8]) -> Option<Vec<u8>> {
71
12
        if datagram.len() < CHUNK_HEADER_LEN {
72
            return None;
73
12
        }
74
12
        let msg_id = u32::from_le_bytes([datagram[0], datagram[1], datagram[2], datagram[3]]);
75
12
        let idx = u16::from_le_bytes([datagram[4], datagram[5]]);
76
12
        let count = u16::from_le_bytes([datagram[6], datagram[7]]);
77
12
        if count == 0 || idx >= count {
78
            return None;
79
12
        }
80

            
81
12
        let entry = self
82
12
            .partial
83
12
            .entry(msg_id)
84
12
            .or_insert_with(|| PartialMessage {
85
6
                count,
86
6
                chunks: BTreeMap::new(),
87
6
            });
88
12
        entry.chunks.insert(idx, datagram[CHUNK_HEADER_LEN..].to_vec());
89

            
90
12
        if entry.chunks.len() == entry.count as usize {
91
5
            let msg = self.partial.remove(&msg_id).unwrap();
92
5
            let mut out = Vec::new();
93
16
            for (_, chunk) in msg.chunks {
94
11
                out.extend_from_slice(&chunk);
95
11
            }
96
5
            return Some(out);
97
7
        }
98

            
99
        // Bound memory: evict the oldest partial message if too many pile up.
100
7
        if self.partial.len() > MAX_PARTIAL_MESSAGES {
101
            if let Some((&oldest, _)) = self.partial.iter().next() {
102
                self.partial.remove(&oldest);
103
            }
104
7
        }
105
7
        None
106
12
    }
107
}
108

            
109
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::vec;

    /// Feeds every datagram, in order, to a fresh reassembler and returns the
    /// last message that completed (if any).
    fn reassemble_all(datagrams: &[Vec<u8>]) -> Option<Vec<u8>> {
        let mut reassembler = UdpReassembler::new();
        datagrams
            .iter()
            .fold(None, |done, datagram| reassembler.ingest(datagram).or(done))
    }

    #[test]
    fn chunk_reassemble_roundtrip() {
        // 3000 bytes counting 0..=255 over and over.
        let data: Vec<u8> = (0..=255u8).cycle().take(3000).collect();
        let chunks = chunk_message(7, &data, DEFAULT_CHUNK_PAYLOAD);
        assert_eq!(chunks.len(), 3, "3000 bytes / 1200 = 3 chunks");

        let done = reassemble_all(&chunks);
        assert_eq!(done.expect("message completes"), data);
    }

    #[test]
    fn reassembles_out_of_order() {
        let data: Vec<u8> = (0..7u8).cycle().take(2500).collect();
        // Deliver the chunks last-first.
        let chunks: Vec<Vec<u8>> = chunk_message(1, &data, DEFAULT_CHUNK_PAYLOAD)
            .into_iter()
            .rev()
            .collect();

        let done = reassemble_all(&chunks);
        assert_eq!(done.expect("reorder-tolerant"), data);
    }

    #[test]
    fn incomplete_message_yields_nothing() {
        // 2000 bytes at the default payload size split into two chunks.
        let chunks = chunk_message(2, &vec![9u8; 2000], DEFAULT_CHUNK_PAYLOAD);
        let mut reassembler = UdpReassembler::new();
        let first = reassembler.ingest(&chunks[0]);
        assert!(
            first.is_none(),
            "one of two chunks is not a complete message"
        );
    }

    #[test]
    fn empty_message_roundtrips() {
        let chunks = chunk_message(3, &[], DEFAULT_CHUNK_PAYLOAD);
        assert_eq!(chunks.len(), 1, "empty payload still sends one chunk");

        let mut reassembler = UdpReassembler::new();
        let message = reassembler.ingest(&chunks[0]);
        assert_eq!(message.expect("completes"), Vec::<u8>::new());
    }

    #[test]
    fn two_interleaved_messages() {
        // Each message is 1500 bytes, i.e. two chunks.
        let a: Vec<u8> = vec![1u8; 1500];
        let b: Vec<u8> = vec![2u8; 1500];
        let chunks_a = chunk_message(10, &a, DEFAULT_CHUNK_PAYLOAD);
        let chunks_b = chunk_message(11, &b, DEFAULT_CHUNK_PAYLOAD);

        let mut reassembler = UdpReassembler::new();
        // Arrival order: a0, b0, a1 (completes a), b1 (completes b).
        assert!(reassembler.ingest(&chunks_a[0]).is_none());
        assert!(reassembler.ingest(&chunks_b[0]).is_none());
        assert_eq!(reassembler.ingest(&chunks_a[1]).expect("a done"), a);
        assert_eq!(reassembler.ingest(&chunks_b[1]).expect("b done"), b);
    }
}