likes
comments
collection
share

在 Rust 中实现 TCP : 3. TCP连接四元组

作者站长头像
站长
· 阅读数 12

连接四元组

我们的项目已经取得了很大的进展——接下来能够开始解决 TCP 协议的实现问题。下面将讨论 TCP 的一些行为及其各种状态。

在 Rust 中实现 TCP : 3. TCP连接四元组

在多任务操作系统中,各种应用程序(例如 Web 服务器、电子邮件客户端等)需要同时进行网络访问。为了区分这些不同的网络活动,每个应用程序将自己绑定到一个唯一的端口号。端口号与主机的 IP地址的组合形成(在非常基本的级别)所谓的“套接字” - 内核管理的抽象对象。为了建立连接,需要使用一对这样的套接字——一个用于发送端机器,一个用于接收端机器。而套接字对是唯一的,可以有效地标识一个连接。

与 TCP 连接端点 相关的非常重要的数据结构是 传输控制块 (TCB)。 TCB 充当与已建立或挂起的连接 相关的所有参数和变量的 存储库。 TCB 包含从套接字地址和端口号到序列号和窗口大小的所有内容,是 TCP 操作逻辑的基础。

每个 TCB 都是 某个特定连接 的 TCP 状态的封装。它存储“传输控制协议”中“可靠性”所必需的详细信息,例如未确认的数据、流量控制参数和下一个预期序列号。本质上,TCB充当TCP操作的控制中心,实时维护和更新连接状态。 TCB 充当 TCP 连接的内核账簿,存储从端口号和 IP 地址到流量控制参数和待处理数据的所有内容。本质上,TCB 是 TCP 协议栈需要维护的所有指标和变量的存储库,以便可靠有效地处理连接。

现在,考虑一台涉及多个 TCP 连接的机器。它如何区分它们并辨别哪个传入数据包属于哪个连接?这就是“TCP连接四元组”概念 ,它至关重要。连接四元组由具有四个元素组成:源 IP 地址、源端口、目标 IP 地址和目标端口。该元组充当每个 活动TCP连接 的唯一标识符。连接四元组 作为 TCB 哈希表 的索引很实用。当数据包到达时,TCP 栈使用连接四元组来查找相应的 TCB,进而查找相关的状态机。这确保了根据封装在该 TCB 中的状态机所定义的正确的规则和变量集来处理数据包。 TCP 创建并维护传输控制块 (TCB) 的哈希表来存储每个 TCP 连接的数据。每个活动连接的控制块都会添加到hash表中。连接关闭后不久,控制块就会被删除。

连接在其生命周期内会经历一系列状态。这些状态包括:LISTEN、SYN-SENT、SYN-RECEIVED、ESTABLISHED、FIN-WAIT-1、FIN-WAIT-2、CLOSE-WAIT、CLOSING、LAST-ACK、TIME-WAIT 和虚构状态 CLOSED。这些状态保存在传输控制块 (TCB) 中,充当指导每个连接点的连接行为的关键标记。从最初的握手到数据传输和最终的断开,连接的状态都会被仔细跟踪,以保证可靠和有序的数据交换。

初始状态 - CLOSEDLISTEN - 作为连接的起点。当处于 CLOSED 状态时,不存在传输控制块(TCB),本质上是连接不存在。它类似于未初始化的变量;在此状态下收到的任何数据包都将被忽略。将此与 LISTEN 状态进行对比,在该状态下系统正在主动等待传入的连接请求。一旦收到 SYN 数据包,状态就会转换为 SYN-RECEIVED ,启动 TCP 握手。连接在握手后达到 稳定的 ESTABLISHED 状态,此时发生大部分数据交换。

连接结束时,它会级联一系列 FIN-WAITCLOSE-WAIT 状态,最终达到 TIME-WAITLAST-ACK ,具体取决于拆除动作。这些终端状态确保网络中任何延迟的数据包都得到考虑,从而提供优雅的连接拆除。系统最终恢复到 CLOSED 状态,释放TCB和相关资源以用于将来的连接。

在 Rust 中实现 TCP : 3. TCP连接四元组

TCP 连接状态图 图 6。如 RFC 793 中所示。

说得够多了,让我们写一些代码吧!首先,将使用枚举对 状态转换图 进行编码。为了使事情更加模块化,我们可以创建一个名为 TCP 的新模块,并在 TCP.rs 中放置一些代码。与 TCP 状态机相关的 Enum 和其他逻辑将保留在此处。

// tcp.rs
// Defining possible TCP states. 
// Each state represents a specific stage in the TCP connection.
pub enum State { 
/// The connection is closed and no active connection exists. 
Closed, 
/// The endpoint is waiting for a connection attempt from a remote endpoint. 
Listen, 
/// The endpoint has received a SYN (synchronize) segment and has sent a SYN-ACK /// (synchronize-acknowledgment) segment in response. It is awaiting an ACK (acknowledgment) /// segment from the remote endpoint. 
SynRcvd, 
/// The connection is established, and both endpoints can send and receive data. Estab, 
}


// Implementing the Default trait for State. // Sets the default TCP state to 'Listen'. 
impl Default for State { 
	fn default() -> Self { 
		State::Listen 
	} 
}

// Implementing methods for State. 
impl State {
// Method to handle incoming TCP packets. 
// 'iph' contains the parsed IPv4 header, 'tcph' contains the parsed TCP header, and 'data' contains the TCP payload. 
pub fn on_packet<'a>( 
	&mut self, 
	iph: etherparse::Ipv4HeaderSlice<'a>, 
	tcph: etherparse::TcpHeaderSlice<'a>, 
	data: &'a [u8], 
	) { 
	// Log the source and destination IP addresses and ports, as well as the payload length. 
		eprintln!( 
			"{}:{} -> {}:{} {}b of TCP", 
			iph.source_addr(), 
			tcph.source_port(), 
			iph.destination_addr(), 
			tcph.destination_port(), 
			data.len() 
		); 
	} 
}
//main.rs
// Importing necessary modules and packages.
use std::io;
use std::collections::HashMap;
use std::net::Ipv4Addr;

// Defining the Quad struct that holds information about both the source and destination IP address and port.
// This uniquely identifies a TCP connection and will be used as a key in the HashMap.
#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)]
struct Quad {
    src: (Ipv4Addr, u16),
    dst: (Ipv4Addr, u16),
}

fn main() -> io::Result<()> {
    // Initialize a HashMap to store the TCP connection state against the Quad.
    // Quad is the key and the TCP state (from tcp.rs) is the value.
    let mut connections: HashMap<Quad, tcp::State> = Default::default();
    
    // Initialize the network interface.
    let nic = tun_tap::Iface::new("tun0", tun_tap::Mode::Tun)?;
    
    // Buffer to store the incoming packets.
    let mut buf = [0u8; 1504];

    loop {
    
			-----SKIP-----

        // Attempt to parse the IPv4 header.
        match etherparse::Ipv4HeaderSlice::from_slice(&buf[4..nbytes]) {
            Ok(iph) => {
					-----SKIP-----

                // Attempt to parse the TCP header.
                match etherparse::TcpHeaderSlice::from_slice(&buf[4 + iph.slice().len()..nbytes]) {
                    Ok(tcph) => {
                        // Calculate the start index of the actual data in the packet.
                        let datai = 4 + iph.slice().len() + tcph.slice().len();
                        
                        // Look for or create a new entry in the HashMap for this connection.
  
			// Check if the connection already exists in the HashMap, otherwise create a new entry 
			match connections.entry(Quad { 
			src: (src, tcph.source_port()), 
			dst: (dst, tcph.destination_port()), 
			}) { 
			Entry::Occupied(mut c) => { 
			c.get_mut().on_packet(&mut nic, iph, tcph, &buf[datai..nbytes])?; 
			} 
			Entry::Vacant(e) => { 
			if let Some(c) = tcp::Connection::on_accept(&mut nic, iph, tcph, &buf[datai..nbytes])? { 
			e.insert(c); 
			} } } }
                    }
                    Err(e) => {
                        // Handle TCP header parsing errors.
                        eprintln!("An error occurred while parsing the TCP packet: {:?}", e);
                    }
                }
            }
            Err(e) => {
                // Handle IPv4 header parsing errors.
                eprintln!("An error occurred while parsing the IP packet: {:?}", e);
            }
        }
    }
}

在上面代码中,通过定义 TCP 连接所需的关键数据结构和函数来建立 TCP 协议栈的基础。该架构的核心是 Quad 结构,它充当名为 connectionsHashMap 中每个条目的 唯一标识符。此 HashMap 充当 活动或正在建立的 TCP 连接的内存中注册表。 HashMap 中的每个条目都包含映射到其当前 TCP 连接的 Quad 实例。该状态由 tcp.rs 中定义的枚举类型表示,包含四种 TCP 连接状态之一: ClosedListenSynRcvd ,或 Estab

我们引入两个主要的处理数据包方法: on_accepton_packeton_accept 方法负责处理发起 新连接的传入数据包。相反, on_packet 方法管理 现有连接的数据包。这两种方法都会记录基本信息,例如源和目标 IP 地址和端口以及有效负载长度。最后,在 main.rs 中,我们根据传入数据包利用模式匹配来区分新连接和现有连接。

我们正在稳步取得进展。到目前为止,已经确保接收到正确的 IPv4 数据包,并且已经实现了一种机制,将传入数据包与其各自的状态相关联,并由唯一的连接四元组作为键。我们的下一个目标是专注于实现 TCP 握手,这是保证客户端和服务器之间建立可靠连接的关键步骤。客户端通过发送 SYN(同步)数据包来启动此过程,而服务器则监听这些传入请求。此握手是一个 三阶段过程,涉及 SYN 数据包、随后的 SYN-ACK(同步确认)数据包和最后的 ACK(确认)数据包。在此阶段,我们将增强 accept 方法来管理这些不同类型的握手数据包。

// main.rs
// Required imports
use std::io;
mod tcp;  // Importing the tcp module (defined below)
use std::collections::HashMap;
use std::net::Ipv4Addr;

// Define the Quad struct to represent the 4-tuple of source IP, source port, destination IP, and destination port.
#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)]
struct Quad {
    src: (Ipv4Addr, u16),  // Source IP and port
    dst: (Ipv4Addr, u16),  // Destination IP and port
}

fn main() -> io::Result<()> {
    // Create a hashmap to manage TCP connections
    let mut connections: HashMap<Quad, tcp::Connection> = Default::default();

    // Create a new virtual interface in TUN mode.
    // TUN mode allows to work with IP packets directly, while TAP mode works at Ethernet frame level.
    let mut nic = tun_tap::Iface::without_packet_info("tun0", tun_tap::Mode::Tun)?;
    
    // Buffer to hold the packet data
    let mut buf = [0u8; 1504];
    
    // Main processing loop to handle incoming packets
    loop {
        // Read the packet into the buffer
        let nbytes = nic.recv(&mut buf[..])?;

        // Parse the IP header from the packet. If successful, it gives a slice of the IP header.
        match etherparse::Ipv4HeaderSlice::from_slice(&buf[..nbytes]) {
            Ok(iph) => {
                // Parsing of the IP header was successful. Further processing happens here.
                // -----SKIP----- (omitting some IP processing steps as indicated)

                // Attempt to parse the TCP header from the packet.
                match etherparse::TcpHeaderSlice::from_slice(&buf[iph.slice().len()..nbytes]) {
                    Ok(tcph) => {
                        // Parsing of the TCP header was successful. Continue processing.
                        // Determine the index where the actual data starts in the packet (after IP and TCP headers).
                        let datai = iph.slice().len() + tcph.slice().len();
                        
                        // Lookup the connection using the Quad (4-tuple) as the key.
                        match connections.entry(Quad {
                            src: (src, tcph.source_port()),
                            dst: (dst, tcph.destination_port()),
                        }) {
                            // If a connection already exists for this Quad:
                            Entry::Occupied(mut c) => {
                                c.get_mut()
                                    .on_packet(&mut nic, iph, tcph, &buf[datai..nbytes])?;
                            }
                            // If there's no connection yet for this Quad:
                            Entry::Vacant(mut e) => {
                                // Attempt to establish a new connection.
                                if let Some(c) = tcp::Connection::accept(
                                    &mut nic,
                                    iph,
                                    tcph,
                                    &buf[datai..nbytes],
                                )? {
                                    e.insert(c);
                                }
                            }
                        }
                    }
                    Err(e) => {
                        // Handle TCP parsing errors.
                        eprintln!("An error occurred while parsing TCP packet {:?}", e);
                    }
                }
            }
            Err(e) => {
                // Handle IP parsing errors.
                eprintln!("An error occurred while parsing IP packet {:?}", e);
            }
        }
    }
}

//tcp.rs
use std::io;
use std::io::prelude::*;

/// The possible states a TCP connection can be in, based on the TCP state machine.
/// It's a subset of the states available in the full TCP state diagram.
pub enum State {
    Closed,
    Listen,
    SynRcvd,
    Estab,
}

pub struct Connection {
    /// The current state of the TCP connection.
    state: State,
    /// The sequence space for sent data. It keeps track of various sequence numbers for data we've sent.
    send: SendSequenceSpace,
    /// The sequence space for received data. It keeps track of sequence numbers for data we're receiving.
    recv: RecvSequenceSpace,
    ip: etherparse::Ipv4Header
    tcp: etherparse::TcpHeader
}

/// Representation of the Send Sequence Space as described in RFC 793 Section 3.2 Figure 4.
/// It provides a visual representation of various sequence numbers associated with data that's being sent.
///
/// ```
///            1         2          3          4
///       ----------|----------|----------|----------
///              SND.UNA    SND.NXT    SND.UNA
///                                   +SND.WND
///
/// 1 - Old sequence numbers which have been acknowledged by the receiver.
/// 2 - Sequence numbers of unacknowledged data that has been sent.
/// 3 - Sequence numbers allowed for transmitting new data.
/// 4 - Future sequence numbers that are not allowed for transmission yet.
/// ```
struct SendSequenceSpace {
    /// SND.UNA: Oldest sequence number not yet acknowledged by the receiver.
    una: u32,
    /// SND.NXT: Next sequence number to be used for new data for transmission.
    nxt: u32,
    /// SND.WND: The window size or the number of bytes that are allowed to be outstanding (unacknowledged).
    wnd: u16,
    /// Indicates if the URG control bit is set. If true, then the sequence number in the urgent pointer field is in effect.
    up: bool,
    /// Sequence number of the segment used for the last window update. 
    wl1: usize,
    /// Acknowledgment number used for the last window update.
    wl2: usize,
    /// Initial send sequence number. It's the first sequence number used when the connection was established.
    iss: u32,
}

/// Representation of the Receive Sequence Space as described in RFC 793 Section 3.2 Figure 5.
/// It provides a visual representation of sequence numbers associated with data that's being received.
///
/// ```
///                1          2          3
///            ----------|----------|----------
///                   RCV.NXT    RCV.NXT
///                             +RCV.WND
///
/// 1 - Old sequence numbers which have been acknowledged.
/// 2 - Sequence numbers allowed for receiving new data.
/// 3 - Future sequence numbers which are not allowed for reception yet.
/// ```
struct RecvSequenceSpace {
    /// RCV.NXT: Next expected sequence number that the receiver is expecting.
    nxt: u32,
    /// RCV.WND: The number of bytes that the receiver is willing to accept.
    wnd: u16,
    /// Indicates if the URG control bit is set on received data.
    up: bool,
    /// Initial receive sequence number. The sequence number of the first byte received.
    irs: u32,
}

/// Default state for a TCP connection is set to `Listen`.
impl Default for State {
    fn default() -> Self {
        State::Listen
    }
}

impl Connection {
    /// Handles an incoming TCP packet for establishing a connection.
    /// If the incoming packet is a SYN packet, it prepares and sends a SYN-ACK packet in response.
    /// Otherwise, it ignores the packet.
    ///
    /// Parameters:
    /// - `nic`: The network interface to use for sending the SYN-ACK packet.
    /// - `iph`: The IPv4 header of the incoming packet.
    /// - `tcph`: The TCP header of the incoming packet.
    /// - `data`: The payload of the incoming packet.
    ///
    /// Returns:
    /// A new `Connection` in the `SynRcvd` state if the incoming packet was a SYN packet.
    /// Otherwise, returns `None`.
    pub fn accept<'a>(
        nic: &mut tun_tap::Iface,
        iph: etherparse::Ipv4HeaderSlice<'a>,
        tcph: etherparse::TcpHeaderSlice<'a>,
        data: &'a [u8],
    ) -> io::Result<Option<Self>> {
        let mut buf = [0u8; 1500];
        if !tcph.syn() {
            // Ignore packets that aren't SYN packets.
            return Ok(None);
        }
        let iss = 0;
        let wnd = 10;
        let mut c = Connection {
            state: State::SynRcvd,
            send: SendSequenceSpace {
                iss,
                una: iss,
                nxt: 1,
                wnd: wnd,
                up: false,
                wl1: 0,
                wl2: 0,
            },
            recv: RecvSequenceSpace {
                // Initialize the receive sequence number to the incoming sequence number.
                irs: tcph.sequence_number(),
                // Expect the next byte after the incoming sequence number.
                nxt: tcph.sequence_number() + 1,
                // Use the incoming packet's window size for our receive window.
                wnd: tcph.window_size(),
                up: false,
            },
        // TODO: Consider keeping track of sender info for future use.
        // Prepare a SYN-ACK packet in response to the SYN packet.
        tcp: etherparse::TcpHeader::new(
            tcph.destination_port(),
            tcph.source_port(),
            iss,
            wnd,
		),
            ip: etherparse::Ipv4Header::new(
            syn_ack.header_len(),
            64,
            etherparse::IpNumber::Tcp as u8,
            [
                iph.destination()[0],
                iph.destination()[1],
                iph.destination()[2],
                iph.destination()[3],
            ],
            [
                iph.source()[0],
                iph.source()[1],
                iph.source()[2],
                iph.source()[3],
            ],
        )
        };

        c.tcp.acknowledgment_number = c.recv.nxt;
        c.tcp.syn = true;
        c.tcp.ack = true;
        
        c.ip.set_payload_len(c.tcp.header_len() as usize + 0);

        // Calculate and set the checksum for the SYN-ACK packet.
        c.tcp.checksum = c.tcp
            .calc_checksum_ipv4(&c.ip, &[])
            .expect("Failed to compute checksum");
        
        // Write out the TCP and IP headers to a buffer to be sent.
        let unwritten = {
            let mut unwritten = &mut buf[..];
            ip.write(&mut unwritten);
            syn_ack.write(&mut unwritten);
            unwritten.len()
        };

        // Send the SYN-ACK packet.
        nic.send(&buf[..unwritten])?;
        Ok(Some(c))
    }

    /// TODO: Implement a function to handle incoming packets for an established connection.


    // Function to handle incoming packets once a connection is established.
    pub fn on_packet<'a>(
        &mut self, 
        nic: &mut tun_tap::Iface,                  // The network interface
        iph: etherparse::Ipv4HeaderSlice<'a>,       // The parsed IP header
        tcph: etherparse::TcpHeaderSlice<'a>,       // The parsed TCP header
        data: &'a [u8],                            // The actual data from the packet
    ) -> io::Result<()> {
        // Process the packet based on its flags and the current state of the connection.
        // The code is omitted, but would involve handling the different possible states and flags 
        // (e.g., ACK, FIN, RST) as per the TCP state machine.
        // ----SKIP----
        Ok(())
    }

    // Function to send a SYN-ACK packet.

}

在从包含实现框架的代码过渡到建立 TCP 握手的充实版本的过程中,经历了重大的架构演变。该架构的核心是 State 枚举和 tcp.rs 中的 Connection 结构体的扩展。之前, State 枚举是四种可能的 TCP 状态的简单表示。在新的实现中,重点转移到建立更强大的连接状态表示。 Connection 结构已扩展为现在将 SendSequenceSpaceRecvSequenceSpace 的实例作为其属性。这些结构对于 TCP 序列号和确认号的登记 很有帮助,这对于数据的可靠传输至关重要。让我们深入了解一下具体情况。

SendSequenceSpace 结构体封装了几个对于 TCP 连接发送端至关重要的变量。其中关键是:

  • una (send unacknowledged): una (发送未确认):发送缓冲区中尚未收到确认的最早序列号。

  • nxt (send next): nxt (发送下一个):用于新数据的下一个序列号。

  • wnd (send window): wnd (发送窗口):指定接收方可接受的序列号范围。这些字段对于管理流量控制、确保可靠的数据传输以及实现滑动窗口等功能至关重要。

    作为补充, RecvSequenceSpace 结构处理接收端。它拥有以下字段:

  • nxt (receive next): nxt (接收下一个):下一个传入数据包的预期序列号。

  • wnd (receive window): wnd (接收窗口):通告发送方可接受的序列号范围的窗口大小。

    值得注意的是, SendSequenceSpace 和 RecvSequenceSpace 都包含初始序列号(分别为 iss 和 irs )。这些在连接建立阶段非常关键,特别是在 TCP 三向握手过程中。

我们开始实现 accept 方法,该方法现已增强以处理 SYN 数据包,从而通过将 SYN-ACK 数据包分派回客户端来启动三向握手。为了实现这一目标,使用 etherparse 库函数来构建和发送 TCP/IP 头,使我们的实现与协议规范紧密结合。

顺便说一句 - 值得注意的是,在当前的实现中, connections HashMap 容易受到 SYN 洪水攻击。在此类攻击中,攻击者可以发送大量 TCP SYN(同步)数据包,每个数据包具有不同的源地址和端口,但都针对相同的目的地。由于 connections HashMap 自动为每个唯一的 Connection 创建一个新条目,因此攻击者可以通过使用大量虚假条目填充 HashMap 来轻松耗尽系统内存。这可能会导致资源耗尽,并最终导致拒绝服务 (DoS)。

在生产级 TCP 实施中,通常会采取其他措施来减轻此类风险。这可以包括使用 syn cookie - 一种无状态方法,其中服务器在握手完成之前不会为 SYN 请求分配资源。然而,对于这个项目,我们不会采取任何措施来防止 syn 洪水攻击。

此时,您可能会好奇并看到我们编写的这么长的代码的实际效果。我们应该继续测试应用程序。首先,通过运行 run.sh 脚本来启动程序,该脚本将构建并执行二进制文件并为其提供必要的提升网络访问权限,接下来,将使用 Netcat 尝试与应用程序建立 TCP 连接,最后为了可视化,将使用 tshark 通过运行 tshark -I tun0 来监视和捕获 tun0 接口上的数据包。

1   0.000000   fe80::a2b3:c4d5:e6f7 -> ff02::2  ICMPv6 110 Router Solicitation from a2:b3:c4:d5:e6:f7
2   0.002123   fe80::1:1 -> fe80::a2b3:c4d5:e6f7  ICMPv6 150 Router Advertisement from 00:11:22:33:44:55 (MTU: 1500)
3   0.004567   fe80::a2b3:c4d5:e6f7 -> fe80::1:1   TCP 86 51234->80 [SYN] Seq=0 Win=42800 Len=0 MSS=1440 WS=16 SACK_PERM=1
4   0.006789   fe80::1:1 -> fe80::a2b3:c4d5:e6f7   TCP 86 80->51234 [SYN, ACK] Seq=0 Ack=1 Win=64000 Len=0 MSS=1440 SACK_PERM=1 WS=16
5   0.006999   fe80::a2b3:c4d5:e6f7 -> fe80::1:1   TCP 66 51234->80 [ACK] Seq=1 Ack=1 Win=43000 Len=0

如果一切顺利,应该会看到与于上面看到的类似的内容。这表明主机首先请求路由器信息,并从路由器获得答复。之后,将看到建立 TCP 连接的步骤。

下一步计划是什么?如果您留意的话,您会注意到我们已经解决了 TCP 三向握手的最初两个步骤。当来自客户端的 SYN 到达我们这里时,服务器会立即用 SYN-ACK 进行回复。发送 SYN-ACK 后,服务器优雅地进入 SynRcvd 状态,准备好等待客户端的 ACK 来 完成握手协议。当捕获并处理这个 ACK​​ 时,理想情况下会将连接转换到 Established 状态,标志着一个成熟的 TCP 连接的诞生。然而,这里缺少一块拼图:我们的代码仍在等待中,尚未处理客户端的 ACK。这是我们下一个目标。

参考

原文地址

转载自:https://juejin.cn/post/7336093196354273299
评论
请登录