dev

KCP 源码剖析

Posted by Disenone on December 17, 2019

阅读本文之前,如果没听说过 KCP ,或者一点都不了解 KCP,麻烦抽一点时间先看看 KCP 项目的说明文档:传送门。本文的目的是深入 KCP 的实现细节去理解 KCP 。

什么是 KCP

KCP 是一个快速可靠协议,能够以比 TCP 更低的延迟来传送数据,数据重传更快,等待时间更短。

TCP是为流量设计的(每秒内可以传输多少KB的数据),讲究的是充分利用带宽。而 KCP是为流速设计的(单个数据包从一端发送到一端需要多少时间),以10%-20%带宽浪费的代价换取了比 TCP快30%-40%的传输速度。TCP信道是一条流速很慢,但每秒流量很大的大运河,而KCP是水流湍急的小激流

以上是 KCP 文档上面写的,关键词是带宽流速,KCP 会损耗带宽,带来的好处是更大更均衡的传输速率。更多的说明参考 KCP 自身的文档。

KCP 数据结构

KCP 源码在 ikcp.hikcp.c 里面,ikcp.h 核心的是数据结构的声明,首先是 SEGMENT 数据包,是 KCP 协议处理数据的最小单位:

SEGMENT 结构
//=====================================================================
// SEGMENT 一个 SETMENT 就是一个数据包
//=====================================================================
struct IKCPSEG
{
    // 链表节点,发送和接受队列都是这里的链表的结构
    struct IQUEUEHEAD node;

    // 会话编号,同一个会话编号相同
    IUINT32 conv;

    // 数据包类型,譬如 DATA 或者 ACK
    IUINT32 cmd;

    // 由于 MTU 的限制,大数据包会拆分成多个小数据包,这个是小数据包的编号
    IUINT32 frg

    // 每个数据包,都会附带上发送方的接受窗口大小
    IUINT32 wnd;

    // 发送时间,如果是 ACK 包,会设置为源数据包的 ts
    IUINT32 ts;

    // 唯一标识数据包的编号
    IUINT32 sn;

    // 代表小于 una 的数据包都接收成功,跟 TCP 含义一致:oldest unacknowledged sequence number SND
    IUINT32 una;

    // 数据长度
    IUINT32 len;

    // 超时重传时间
    IUINT32 resendts;

    // 下次超时等待时间
    IUINT32 rto;

    // 快速重传,收到本数据包之后的数据包的数量,大于一定数量就触发快速重传
    IUINT32 fastack;

    // 发送次数
    IUINT32 xmit;

    // 数据
    char data[1];
};

看完 SEGMENT 的注释,大致能看出 KCP 的核心也是一个 ARQ 协议,通过自动超时重传来保证数据的送达。接着再来看看 KCP 结构 KCPCB 的定义:

KCP 结构
//---------------------------------------------------------------------
// IKCPCB
//---------------------------------------------------------------------
struct IKCPCB
{
    // conv: 会话编号
    // mtu, mss: 最大传输单元,最大报文段大小
    // state: 会话状态,0 有效,-1 断开
    IUINT32 conv, mtu, mss, state;

    // snd_una: 等待 ACK 的包编号
    // snd_nxt: 下一个等待发送的数据包编号
    // rcv_nxt: 下一个等待接收的数据包编号
    IUINT32 snd_una, snd_nxt, rcv_nxt;

    // ts_recent, ts_lastack: 未用到
    // ssthresh: 拥塞控制慢启动阈值
    IUINT32 ts_recent, ts_lastack, ssthresh;

    // rx_rto: rto (retransmission timeout),超时重传时间
    // rx_rttval, rx_srtt, rx_minrto: 计算 rto 的中间变量
    IINT32 rx_rttval, rx_srtt, rx_rto, rx_minrto;

    // snd_wnd, rcv_wnd: 最大发送和接收窗口大小
    // rmt_wnd: remote wnd ,对端剩余接受窗口大小
    // cwnd: 可发送窗口大小
    // probe: 是否要发送控制报文的标志
    IUINT32 snd_wnd, rcv_wnd, rmt_wnd, cwnd, probe;

    // current: 当前时间
    // interval: 更新间隔
    // ts_flush: 下次需要更新的时间
    // xmit: 发送失败次数
    IUINT32 current, interval, ts_flush, xmit;

    // 对应链表的长度
    IUINT32 nrcv_buf, nsnd_buf;
    IUINT32 nrcv_que, nsnd_que;

    // nodelay: 控制超时重传的 rto 增长速度
    // updated: 是否调用过 ikcp_update
    IUINT32 nodelay, updated;

    // ts_probe, probe_wait: 对端接收窗口长时间为 0 时主动定期发起询问
    IUINT32 ts_probe, probe_wait;

    // deal_link: 对端长时间无应答
    // incr: 参与计算发送窗口大小
    IUINT32 dead_link, incr;

    // queue: 跟用户层接触的数据包
    // buf: 协议缓存的数据包
    struct IQUEUEHEAD snd_queue;
    struct IQUEUEHEAD rcv_queue;
    struct IQUEUEHEAD snd_buf;
    struct IQUEUEHEAD rcv_buf;

    // 需要发送 ack 的数据包信息
    IUINT32 *acklist;

    // 需要 ack 的包数量
    IUINT32 ackcount;

    // acklist 内存大小
    IUINT32 ackblock;

    // 用户层传进来的数据
    void *user;

    // 存放一个 kcp 包的空间
    char *buffer;

    // 触发快速重传的 fastack 次数
    int fastresend;

    // 快速重传最大次数
    int fastlimit;

    // nocwnd: 不考虑慢启动的发送窗口大小
    // stream: 流模式
    int nocwnd, stream;

    // debug log
    int logmask;

    // 发送数据接口
    int (*output)(const char *buf, int len, struct IKCPCB *kcp, void *user);

    void (*writelog)(const char *log, struct IKCPCB *kcp, void *user);
};

逐一把 KCP 结构里面的字段注释上,可以初步感觉到,整套 KCP 的协议不太复杂,细细去分析代码,你我都能读懂并理解 KCP 协议 :smile:

KCP 的 ARQ 实现

KCP 本质上是一个 ARQ (Auto Repeat-reQuest,自动重传) 协议,最基本的是要保证可靠的传输。那么我们可以先来关注 KCP 的基本 ARQ 部分,KCP 是怎么实现可靠传输的。

ARQ 顾名思义,当我们认为对端接收数据包失败时,自动重新发送对应的数据包,它是通过确认接收和超时重传两个机制,来实现可靠传输。具体的代码实现上, KCP 给每个数据包(就是上一节提到的 SEGMENT ) 分配唯一的 sn 标识符,一旦对端接收到数据包,会回复一个 ACK 包(同样是 SEGMENT),ACK 包的 sn 跟接收到的数据包 sn 相同,通知接收到此数据包已经接收成功。SEGMENT 上还有一个 una 字段,表示下一个期待接收的数据包的编号,换句话说,即是所有在该编号之前的数据包都已经接收完,相当于一个全量的 ACK 包,发送端可以更快的更新发送缓冲和发送窗口。

我们可以通过跟踪 KCP 包的发送和接受代码,来理解最基本的 ARQ 实现:

发送

发送的过程是 ikcp_send -> ikcp_update -> ikcp_output,上层调用 ikcp_send 把数据传给 KCP,KCP 在 ikcp_update 中处理数据的发送。

ikcp_send
//---------------------------------------------------------------------
// 发送数据接口,用户调用 ikcp_send 来让 kcp 发送数据
// user/upper level send, returns below zero for error
//---------------------------------------------------------------------
int ikcp_send(ikcpcb *kcp, const char *buffer, int len)
{
    IKCPSEG *seg;
    int count, i;

    // mss 不能小于1
    assert(kcp->mss > 0);
    if (len < 0) return -1;

    // append to previous segment in streaming mode (if possible)
    if (kcp->stream != 0) {
        // 处理流模式
        // ......
    }

    // 计算分包,如果数据长度 len 大于 mss,需要分成多个包发送,对端接受到之后再拼起来
    if (len <= (int)kcp->mss) count = 1;
    else count = (len + kcp->mss - 1) / kcp->mss;

    if (count >= (int)IKCP_WND_RCV) return -2;

    if (count == 0) count = 1;

    // 分包
    for (i = 0; i < count; i++) {
        // 计算包的数据长度,并分配对应的 seg 结构
        int size = len > (int)kcp->mss ? (int)kcp->mss : len;
        seg = ikcp_segment_new(kcp, size);
        assert(seg);
        if (seg == NULL) {
            return -2;
        }

        // 设置 seg 的 数据信息,frg 表示分包编号
        if (buffer && len > 0) {
            memcpy(seg->data, buffer, size);
        }
        seg->len = size;
        seg->frg = (kcp->stream == 0)? (count - i - 1) : 0;

        // 加到 snd_queue 的末尾,nsnd_qua 加一
        iqueue_init(&seg->node);
        iqueue_add_tail(&seg->node, &kcp->snd_queue);
        kcp->nsnd_que++;
        if (buffer) {
            buffer += size;
        }
        len -= size;
    }

    return 0;
}

ikcp_send 是由 KCP 的上层来调用的发送数据接口,所有让 KCP 发送的数据,都应该通过这个接口。 ikcp_send 做的事情很简单,主要就是把数据,根据 kcp->mss (一个包最大数据长度)来分成多个包,并设置分包编号,最后放到发送链表 snd_queue 的末尾。流模式就是把多次调用 ikcp_send 的数据都看成一个流,会先自动填充未满的 SEGMENT 再分配新的,详细实现本文不讨论,感兴趣的,相信看完本文,再对应看看代码就能理解。

ikcp_send 调用完成之后,数据放在的 KCP 的 snd_queue 中,那么后面 KCP 需要找个时机,把待发送的数据发送出去,这块代码都放在 ikcp_updateikcp_flush 里面:

ikcp_update
//---------------------------------------------------------------------
// ikcp_update 是给上层定期调用的接口,用来更新 kcp 的状态,发送数据
// update state (call it repeatedly, every 10ms-100ms), or you can ask 
// ikcp_check when to call it again (without ikcp_input/_send calling).
// 'current' - current timestamp in millisec. 
//---------------------------------------------------------------------
void ikcp_update(ikcpcb *kcp, IUINT32 current)
{
    IINT32 slap;

    kcp->current = current;

    // ikcp_flush 会检查这个,上层必须调用过 ikcp_update 才能调用 ikcp_flush,建议只使用 ikcp_update
    if (kcp->updated == 0) {
        kcp->updated = 1;
        kcp->ts_flush = kcp->current;
    }

    slap = _itimediff(kcp->current, kcp->ts_flush);

    if (slap >= 10000 || slap < -10000) {
        kcp->ts_flush = kcp->current;
        slap = 0;
    }

    if (slap >= 0) {
        // 下次 flush 的时间
        kcp->ts_flush += kcp->interval;
        if (_itimediff(kcp->current, kcp->ts_flush) >= 0) {
            kcp->ts_flush = kcp->current + kcp->interval;
        }
        ikcp_flush(kcp);
    }
}

ikcp_update 做的事情很简单,判断一下 ts_flush 的时间,符合条件则调用 ikcp_flush,主要的处理逻辑都在 ikcp_flush 里面了,因为 ikcp_flush 内容复杂一点,我们目前只关注跟 ARQ 发送相关的部分:

发送数据
//---------------------------------------------------------------------
// ikcp_flush
//---------------------------------------------------------------------
void ikcp_flush(ikcpcb *kcp)
{
    IUINT32 current = kcp->current;

    // buffer 是要传给 ikcp_output 的数据,初始化为 3 倍数据包大小
    char *buffer = kcp->buffer;
    char *ptr = buffer;
    int count, size, i;
    IUINT32 resent, cwnd;
    IUINT32 rtomin;
    struct IQUEUEHEAD *p;
    int change = 0;
    int lost = 0;
    IKCPSEG seg;

    // 'ikcp_update' haven't been called.
    if (kcp->updated == 0) return;

    seg.conv = kcp->conv;
    seg.cmd = IKCP_CMD_ACK;
    seg.frg = 0;

    // seg.wnd 是表示当前可接收窗口大小
    seg.wnd = ikcp_wnd_unused(kcp);
    seg.una = kcp->rcv_nxt;
    seg.len = 0;
    seg.sn = 0;
    seg.ts = 0;

    // 发送 ack
    // 计算 发送窗口
    //...

    // 把数据包从 snd_queue 移动到 snd_buf
    // 移动是需要满足 发送窗口 大小,发送窗口满了,就停止移动
    // 放在 snd_buf 的里面的数据,就是可以直接调用 ikcp_output 给对端发送的数据
    while (_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0) {
        IKCPSEG *newseg;
        if (iqueue_is_empty(&kcp->snd_queue)) break;

        newseg = iqueue_entry(kcp->snd_queue.next, IKCPSEG, node);

        iqueue_del(&newseg->node);
        iqueue_add_tail(&newseg->node, &kcp->snd_buf);
        kcp->nsnd_que--;
        kcp->nsnd_buf++;

        newseg->conv = kcp->conv;
        newseg->cmd = IKCP_CMD_PUSH;
        newseg->wnd = seg.wnd;
        newseg->ts = current;

        // seg 唯一序号,其实就是一个递增的 kcp->snd_nxt
        newseg->sn = kcp->snd_nxt++;

        // una 在这里设置,通知对端下一个等待接收的包序号
        newseg->una = kcp->rcv_nxt;
        newseg->resendts = current;
        newseg->rto = kcp->rx_rto;
        newseg->fastack = 0;
        newseg->xmit = 0;
    }

    // 计算快速重传标志,超时等待时间
    // ...

    // 发送 snd_buf
    for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) {
        IKCPSEG *segment = iqueue_entry(p, IKCPSEG, node);
        int needsend = 0;
        if (segment->xmit == 0) {
            // 首次发送
            // set->xmit 表示发送次数
            // resendts 超时重传的等待时间
            needsend = 1;
            segment->xmit++;
            segment->rto = kcp->rx_rto;
            segment->resendts = current + segment->rto + rtomin;
        }
        else if (_itimediff(current, segment->resendts) >= 0) {
            // 超时重传
            // ...
        }
        else if (segment->fastack >= resent) {
            // 快速重传
            // ...
        }

        if (needsend) {
            int need;
            segment->ts = current;
            segment->wnd = seg.wnd;
            segment->una = kcp->rcv_nxt;

            size = (int)(ptr - buffer);
            need = IKCP_OVERHEAD + segment->len;

            // 每当 buffer 中的数据超过 mtu ,那就先发出去,尽量避免底层再分包
            if (size + need > (int)kcp->mtu) {
                ikcp_output(kcp, buffer, size);
                ptr = buffer;
            }

            // 把 seg 控制数据复制到 buffer 上,kcp 自己来处理大小端问题
            ptr = ikcp_encode_seg(ptr, segment);

            // 再复制数据
            if (segment->len > 0) {
                memcpy(ptr, segment->data, segment->len);
                ptr += segment->len;
            }


            if (segment->xmit >= kcp->dead_link) {
                kcp->state = (IUINT32)-1;
            }
        }
    }

    // flash remain segments
    size = (int)(ptr - buffer);
    if (size > 0) {
        ikcp_output(kcp, buffer, size);
    }

    // 计算 ssthresh,更新慢启动窗口
    // ...
}

我们目前只关注 ikcp_flush 里面有关发送数据的逻辑:

  • 首先 KCP 会根据对端的接收窗口大小,把 snd_queue 上的数据移动到 snd_buf 上面,计算移动数量的公式是 num = snd_nxt - (snd_una + cwnd),也就是:已发送成功的最大包序号 snd_una 加上 滑动窗口大小 cwnd 大于 下个待发送的包序号snd_nxt,则可以继续再发送新的数据包。移动 SEG 的同时,设置控制字段。

  • 遍历 snd_buf,如果需要发送数据包,则把数据复制到 buffer 上,复制的同时用 ikcp_encode_seg 处理控制字段数据的大小端问题。

  • 最后调用 ikcp_outputbuffer 上的数据发送出去

至此, KCP 完成数据的发送。

接收

接收的过程是跟发送相反的:ikcp_input -> ikcp_update -> ikcp_recv,用户接收到网络上的数据之后,需要调用 ikcp_input 传给 KCP 解析,调用 ikcp_update 的时候会给发送端回复 ACK 包,上层通过调用 ikcp_recv 来接收 KCP 解析之后的数据。

接收数据
//---------------------------------------------------------------------
// input data
//---------------------------------------------------------------------
int ikcp_input(ikcpcb *kcp, const char *data, long size)
{
    IUINT32 prev_una = kcp->snd_una;
    IUINT32 maxack = 0, latest_ts = 0;
    int flag = 0;

    // 合法性检查
    if (data == NULL || (int)size < (int)IKCP_OVERHEAD) return -1;

    // data 可能是多个 KCP 包,循环处理
    while (1) {
        IUINT32 ts, sn, len, una, conv;
        IUINT16 wnd;
        IUINT8 cmd, frg;
        IKCPSEG *seg;

        // 不够一个 KCP 包,退出
        if (size < (int)IKCP_OVERHEAD) break;

        // 先把控制字段解析出来
        data = ikcp_decode32u(data, &conv);
        if (conv != kcp->conv) return -1;

        data = ikcp_decode8u(data, &cmd);
        data = ikcp_decode8u(data, &frg);
        data = ikcp_decode16u(data, &wnd);
        data = ikcp_decode32u(data, &ts);
        data = ikcp_decode32u(data, &sn);
        data = ikcp_decode32u(data, &una);
        data = ikcp_decode32u(data, &len);

        size -= IKCP_OVERHEAD;

        if ((long)size < (long)len || (int)len < 0) return -2;

        // 数据包类型检查
        if (cmd != IKCP_CMD_PUSH && cmd != IKCP_CMD_ACK &&
            cmd != IKCP_CMD_WASK && cmd != IKCP_CMD_WINS) 
            return -3;

        kcp->rmt_wnd = wnd;

        // 这里的 una 是发送方的 kcp->rcv_nxt,根据这个数据,可以去掉已确认接收的数据包
        ikcp_parse_una(kcp, una);
        // 去掉已确认接收的包后,更新 snd_una 下一个要发送的序号
        ikcp_shrink_buf(kcp);

        if (cmd == IKCP_CMD_ACK) {
            // ack 包
            // ...
        }
        else if (cmd == IKCP_CMD_PUSH) {
            // 数据包
            // 如果接收到的数据包序号 sn,在接收窗口内,则正常处理,否则直接丢弃,等重传
            if (_itimediff(sn, kcp->rcv_nxt + kcp->rcv_wnd) < 0) {

                // 接收到的每个数据包,都要回一个 ack 包,记录下来
                ikcp_ack_push(kcp, sn, ts);

                // 接收的数据调用 ikcp_parse_data 处理
                if (_itimediff(sn, kcp->rcv_nxt) >= 0) {
                    seg = ikcp_segment_new(kcp, len);
                    seg->conv = conv;
                    seg->cmd = cmd;
                    seg->frg = frg;
                    seg->wnd = wnd;
                    seg->ts = ts;
                    seg->sn = sn;
                    seg->una = una;
                    seg->len = len;

                    if (len > 0) {
                        memcpy(seg->data, data, len);
                    }

                    ikcp_parse_data(kcp, seg);
                }
            }
        }
        else if (cmd == IKCP_CMD_WASK) {
            // 查询窗口包
            // ...
        }
        else if (cmd == IKCP_CMD_WINS) {
            // 查询窗口的回复包
            // ...
        }
        else {
            return -3;
        }

        data += len;
        size -= len;
    }

    // 处理快速重传逻辑
    // ...

    // 更新发送窗口
    // ...

    return 0;
}

ikcp_input 循环处理每一个 SEG 包,先检查数据包的合法性和类型,因为每个数据包都会带上 una,存放的是发送端等待接收的包序号,需要小于 una 的包对端都已经接受成功,所以可以把 snd_buff 中需要小于 una 的都删掉,并更新 snd_nxt,这一部分由 ikcp_parse_unaikcp_shrink_buf 来处理。接收到的每个数据包,都需要回复 ACK 包,由 ikcp_ack_push 记录下来,最后调用 ikcp_parse_data 处理数据。

解析数据
void ikcp_parse_data(ikcpcb *kcp, IKCPSEG *newseg)
{
    struct IQUEUEHEAD *p, *prev;
    IUINT32 sn = newseg->sn;
    int repeat = 0;

    // 序号检查
    if (_itimediff(sn, kcp->rcv_nxt + kcp->rcv_wnd) >= 0 ||
        _itimediff(sn, kcp->rcv_nxt) < 0) {
        ikcp_segment_delete(kcp, newseg);
        return;
    }

    // 找出 newseg 应该放置的位置,因为接收到的 seg 可能是乱序的
    for (p = kcp->rcv_buf.prev; p != &kcp->rcv_buf; p = prev) {
        IKCPSEG *seg = iqueue_entry(p, IKCPSEG, node);
        prev = p->prev;
        if (seg->sn == sn) {
            // 重复收到
            repeat = 1;
            break;
        }
        if (_itimediff(sn, seg->sn) > 0) {
            break;
        }
    }

    // 把 newseg 放到 rcv_buf 正确的位置上
    if (repeat == 0) {
        iqueue_init(&newseg->node);
        iqueue_add(&newseg->node, p);
        kcp->nrcv_buf++;
    }    else {
        ikcp_segment_delete(kcp, newseg);
    }

    // 把数据从 rcv_buf 移动到 rcv_queue
    while (! iqueue_is_empty(&kcp->rcv_buf)) {
        IKCPSEG *seg = iqueue_entry(kcp->rcv_buf.next, IKCPSEG, node);
        // 如果 seg 序号是等待接收的序号,移动到 rcv_queue
        if (seg->sn == kcp->rcv_nxt && kcp->nrcv_que < kcp->rcv_wnd) {
            iqueue_del(&seg->node);
            kcp->nrcv_buf--;
            iqueue_add_tail(&seg->node, &kcp->rcv_queue);
            kcp->nrcv_que++;
            kcp->rcv_nxt++;
        }    else {
            break;
        }
    }
}

ikcp_parse_data 主要的工作就是把 newseg 放置到 kcp->rcv_buf 合适的位置上,并把数据从 rcv_buf 移动到 rcv_queuercv_buf 合适的位置的意思是,rcv_buf 是按照 sn 的递增顺序排列的,newseg 需要根据自己的 sn 大小查找合适的位置。rcv_buf 上的数据要移动到 rcv_queue,条件是 rcv_buf 上的数据包序号,等于 KCP 在等待接收的包序号 kcp->rcv_nxt ,移动一个数据包之后,需要更新 kcp->rvc_nxt,再处理下一个数据包。

ikcp_input 之后,上层调用 ikcp_update 时候会发送 ACK 包,调用 ikcp_recv 会给上层返回有效数据。ikcp_updateikcp_recv 互相独立,没有调用顺序要求,视上层的调用时机而定。我们先来看 ikcp_update 里面有关 ACK 发送的部分:

回复 ACK
// 前面说过,ikcp_update 最终是调用 ikcp_flush
void ikcp_flush(ikcpcb *kcp, IUINT32 current)
{
    // ...

    // 回复 ACK 包
    count = kcp->ackcount;
    for (i = 0; i < count; i++) {
        size = (int)(ptr - buffer);
        if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
            ikcp_output(kcp, buffer, size);
            ptr = buffer;
        }
        ikcp_ack_get(kcp, i, &seg.sn, &seg.ts);
        ptr = ikcp_encode_seg(ptr, &seg);
    }

    kcp->ackcount = 0;

    // ...
}

ACK 包的之前已经由 ikcp_ack_push 保存起来了,所以这里只需要 ikcp_ack_get 获取每个 ACK 包的信息,发送给对方。上层可以使用 ikcp_recv 从 KCP 获取数据:

ikcp_recv
//---------------------------------------------------------------------
// user/upper level recv: returns size, returns below zero for EAGAIN
//---------------------------------------------------------------------
int ikcp_recv(ikcpcb *kcp, char *buffer, int len)
{
    struct IQUEUEHEAD *p;
    int ispeek = (len < 0)? 1 : 0;
    int peeksize;
    int recover = 0;
    IKCPSEG *seg;
    assert(kcp);

    // 一些有效性检查
    if (iqueue_is_empty(&kcp->rcv_queue))
        return -1;
    if (len < 0) len = -len;

    // 计算能返回的数据长度
    peeksize = ikcp_peeksize(kcp);

    if (peeksize < 0)
        return -2;
    if (peeksize > len)
        return -3;

    // 判断下接收窗口
    if (kcp->nrcv_que >= kcp->rcv_wnd)
        recover = 1;

    // 遍历 rcv_queue,把数据复制到 buffer 上
    for (len = 0, p = kcp->rcv_queue.next; p != &kcp->rcv_queue; ) {
        int fragment;
        seg = iqueue_entry(p, IKCPSEG, node);
        p = p->next;

        if (buffer) {
            memcpy(buffer, seg->data, seg->len);
            buffer += seg->len;
        }

        len += seg->len;

        // 判断分包
        fragment = seg->frg;

        // 移除数据包
        if (ispeek == 0) {
            iqueue_del(&seg->node);
            ikcp_segment_delete(kcp, seg);
            kcp->nrcv_que--;
        }

        // 所有分包都复制完,退出循环
        if (fragment == 0)
            break;
    }

    assert(len == peeksize);

    // rcv_queue 又空了一些,尝试继续从 rcv_buf 移动到 rcv_queue
    while (! iqueue_is_empty(&kcp->rcv_buf)) {
        seg = iqueue_entry(kcp->rcv_buf.next, IKCPSEG, node);
        if (seg->sn == kcp->rcv_nxt && kcp->nrcv_que < kcp->rcv_wnd) {
            iqueue_del(&seg->node);
            kcp->nrcv_buf--;
            iqueue_add_tail(&seg->node, &kcp->rcv_queue);
            kcp->nrcv_que++;
            kcp->rcv_nxt++;
        }    else {
            break;
        }
    }

    return len;
}

ikcp_recv 一次调用只会返回一个完整的数据包,上层可以循环调用直到没有数据返回为止,函数的逻辑比较简单,就是从 rcv_queue 中复制数据到上层传进来的 buffer 里面,至此接收方对于接收到的数据包已经处理完毕。

接收方处理数据包的时候,给发送方发送了 ACK 包,我们再来看看发送方接受 ACK 包的处理:

处理 ACK 包
int ikcp_input(ikcpcb *kcp, const char *data, long size)
{
    // ...
    IUINT32 maxack = 0, latest_ts = 0;
    // ...
    while (1) {
        // ...
        // ts 是对端的 kcp-> current
        data = ikcp_decode32u(data, &ts);
        data = ikcp_decode32u(data, &sn);

        if (cmd == IKCP_CMD_ACK) {
            // 更新 rot
            if (_itimediff(kcp->current, ts) >= 0) {
                ikcp_update_ack(kcp, _itimediff(kcp->current, ts));
            }
            // 更新 snd_buf
            ikcp_parse_ack(kcp, sn);
            ikcp_shrink_buf(kcp);

            // maxack = 这次 input 的所有 ACK 包中最大的 sn
            if (flag == 0) {
                flag = 1;
                maxack = sn;
                latest_ts = ts;
            }    else {
                if (_itimediff(sn, maxack) > 0) {
                #ifndef IKCP_FASTACK_CONSERVE
                    maxack = sn;
                    latest_ts = ts;
                #else
                    if (_itimediff(ts, latest_ts) > 0) {
                        maxack = sn;
                        latest_ts = ts;
                    }
                #endif
                }
            }
        }
        // ...
    }

    // 如果有收到 ACK 包,记录用来做快速重传
    if (flag != 0) {
        ikcp_parse_fastack(kcp, maxack, latest_ts);
    }
}

可以看到接收到 ACK 包后同样会需要 ikcp_parse_ackikcp_shrink_buf 来更新 snd_buf,另外还需要调用 ikcp_update_ack 来计算更新 rto (retransmission timeout,超时重传时间)。ikcp_input 计算收到的 ACK 包中最大序号,来记录做快速重传用。就这样,发送方收到 ACK 包,把发送数据从 snd_buf 中移除,该数据包可靠地送达到了接收方,一次完整的 ARQ 确认接收过程结束。

超时重传

前面介绍的是 KCP 实现的 ARQ 中的 确认接收机制,ARQ 还需要一个超时重传来保证可靠性,下面我们来看看 KCP 是怎么做超时重传的。

让我们回到 ikcp_flush 函数:

超时重传
void ikcp_flush(ikcpcb *kcp)
{
    // ...
    // 发送 snd_buf
    for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) {
        IKCPSEG *segment = iqueue_entry(p, IKCPSEG, node);
        int needsend = 0;
        if (segment->xmit == 0) {
            // 首次发送
            needsend = 1;
            segment->xmit++;
            // 设置 segment->rto
            // 通过 segment->rto 计算 segment->resendts 超时重传时间
            segment->rto = kcp->rx_rto;
            segment->resendts = current + segment->rto + rtomin;
        }
        else if (_itimediff(current, segment->resendts) >= 0) {
            // 超时重传
            needsend = 1;
            segment->xmit++;
            kcp->xmit++;
            // nodelay 控制下一次超时重传时间的计算
            if (kcp->nodelay == 0) {
                segment->rto += kcp->rx_rto;
            }    else {
                segment->rto += kcp->rx_rto / 2;
            }
            segment->resendts = current + segment->rto;
            lost = 1;
        }
        else if (segment->fastack >= resent) {
            // 快速重传
            // ...
        }
        if (needsend) {
            // 发送数据
            // ...
        }
    // ...
}

一旦当前时间 current 大于 segment->resendts 超时重传时间,说明在这段时间内,都没有收到接收方的 ACK 包,触发超时重传机制,needsend = 1,重新发送数据。

有了确认接收和超时重传机制,KCP 就可以保证基础的可靠数据传输了。但是为了能够保持更稳定的数据流速,KCP 还做了更多的事情,下面我们一起看看 KCP 还做了那些优化。

KCP 提升流速的策略

快速重传

发送方发送了序号为 snsn + 1 两个数据包,如果只收到了 sn + 1 的 ACK 包,那可能是因为 sn 的 ACK 包在网路中还没到达,又或者 ACK 包丢了,又或者 sn 数据包丢了,如果此时还没到超时重传的时间,网络也还不太拥堵,只是因为某种原因而突发丢包,那么发送方主动提前发送 sn 数据包,可以帮助接收方更快地接收数据,提高流速。

KCP 里面也相应实现了快速重传机制,也在 ikcp_flush 里面:

快速重传
void ikcp_flush(ikcpcb *kcp)
{
    // ...
    resent = (kcp->fastresend > 0)? (IUINT32)kcp->fastresend : 0xffffffff;

    // 发送 snd_buf
    for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) {
        IKCPSEG *segment = iqueue_entry(p, IKCPSEG, node);
        int needsend = 0;
        if (segment->xmit == 0) {
            // ...
        }
        else if (_itimediff(current, segment->resendts) >= 0) {
            // ...
        }
        else if (segment->fastack >= resent) {
            // 快速重传
            if ((int)segment->xmit <= kcp->fastlimit ||
                kcp->fastlimit <= 0) {
                needsend = 1;
                segment->xmit++;
                segment->fastack = 0;
                segment->resendts = current + segment->rto;
                change++;
            }
        }
        if (needsend) {
            // 发送数据
            // ...
        }
    // ...
}

要出发快速重传,有两个条件:

  • segment->fastack >= resent,resent 是可配置的参数 kcp->fastresend,配置为 0 会关闭快速重传。segment->fastack 是在函数 ikcp_parse_fastack 里面设置的,这个函数是在 ikcp_input 里面调用,会根据 ikcp_input 算出的 maxack 来给所有 sn 小于 maxacksegment->fastack 加一,所以 segment->fastack 就是表示收到比 sn 大的包的次数。
  • segment->xmit <= kcp->fastlimit || kcp->fastlimit <= 0setgment->xmit 是发送次数,kcp->fastlimit 是可配置的最大快速重传次数,发送次数需要小于最大快速重传次数

一旦满足快速重传的以上条件,KCP 就会执行快速重传,要注意快速重传并不会重置超时重传时间,原来的超时时间依然会生效。

缩短超时重传时间

超时重传是个很好的机制,但就是太花时间了,按照 TCP 的策略,每次超时重传时间翻倍,等待时间膨胀得很快,在等待时间内,很可能由于接收端的接收窗口已耗尽,无法接收新数据,而等待重传的包序号是在最前面,接收方要接收到重传的包才能把所有数据返回给上层,这种情况,整个网路的流速几乎为 0。KCP 增加了配置可以减缓等待的时间增长,而且也不会是翻倍,通过配置 kcp->nodelay 控制每次等待时间只会增长 1 倍的 RTO 或者 0.5 倍的 RTO,有效减缓等待时间的增长,帮助网路尽快恢复流速。

更新发送窗口

发送窗口表示的是同时传输的数据包数量,窗口越大,同时传输的数据越多,流速越大,但窗口过大,会导致网络拥塞,丢包率上升,数据重传增多,流速下降。所以发送窗口需要根据网络情况不断更新,慢慢趋近最优。 KCP 中关于发送窗口的代码:

发送窗口
ikcpcb* ikcp_create(IUINT32 conv, void *user)
{
    // ...
    // snd_wnd,rcv_wnd 发送和接受的缓冲区大小
    kcp->snd_wnd = IKCP_WND_SND;    // 32
    kcp->rcv_wnd = IKCP_WND_RCV;    // 128
    // 对端接收窗口大小              // 128
    kcp->rmt_wnd = IKCP_WND_RCV
    // 发送窗口 cwnd 初始化 0
    kcp->cwnd = 0;
    // 发送窗口字节数大小,参与计算 cwnd
    kcp->incr = 0
    // 慢启动阈值,slow start threshold
    kcp->ssthresh = IKCP_THRESH_INIT;
    // nocwnd 是可配置参数,1 不考虑 cwnd
    kcp->nocwnd = 0;
    // ...
}

void ikcp_flush(ikcpcb *kcp)
{
    // ...
    // 发送数据时先计算 发送窗口大小,是发送缓冲区大小和对方接收窗口大小的小值
    cwnd = _imin_(kcp->snd_wnd, kcp->rmt_wnd);
    // 默认还需要考虑 kcp->cwnd,即是不断更新的发送窗口
    if (kcp->nocwnd == 0) cwnd = _imin_(kcp->cwnd, cwnd);

    // 根据 cwnd 大小,snd_queue 移动到 snd_buf
    while (_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0) {
    }
    // 发送数据
    resent = (kcp->fastresend > 0)? (IUINT32)kcp->fastresend : 0xffffffff;
    // 触发超时重传 lost = 1
    // 触发快速重传 change++

    // 更新慢启动阈值和发送窗口
    if (change) {
        // 如果有触发快速重传,ssthresh 设置为网络上正在传输的数据包数量的一半
        IUINT32 inflight = kcp->snd_nxt - kcp->snd_una;
        kcp->ssthresh = inflight / 2;
        if (kcp->ssthresh < IKCP_THRESH_MIN)
            kcp->ssthresh = IKCP_THRESH_MIN;

        // 发送窗口为阈值再加上快速重传相关的 resent
        kcp->cwnd = kcp->ssthresh + resent;
        kcp->incr = kcp->cwnd * kcp->mss;
    }

    if (lost) {
        // 如果有超时重传,触发慢启动, ssthresh 阈值为发送窗口的一半
        kcp->ssthresh = cwnd / 2;
        if (kcp->ssthresh < IKCP_THRESH_MIN)
            kcp->ssthresh = IKCP_THRESH_MIN;
        // 发送窗口回到 1,重新慢启动增长
        kcp->cwnd = 1;
        kcp->incr = kcp->mss;
    }

    if (kcp->cwnd < 1) {
        // 因为初始化为 0,来到这里会再设置成 1
        kcp->cwnd = 1;
        kcp->incr = kcp->mss;
    }
}

int ikcp_input(ikcpcb *kcp, const char *data, long size)
{
    IUINT32 prev_una = kcp->snd_una;
    // 处理接收的数据

    while (1) {
        // ...
        data = ikcp_decode16u(data, &wnd)
        // rmt_wnd 是对方的接收窗口大小
        kcp->rmt_wnd = wnd
        // ...
        // 处理数据
    }

    // 最后更新发送窗口
    // kcp->snd_una - prev_una > 0,表示本次 input 有接受到 ACK 并且发送缓冲 snd_buf 有变化
    if (_itimediff(kcp->snd_una, prev_una) > 0) {
        // 再判断对方的接收窗口
        if (kcp->cwnd < kcp->rmt_wnd) {
            IUINT32 mss = kcp->mss;

            if (kcp->cwnd < kcp->ssthresh) {
                // 小于慢启动阈值,双倍增长
                kcp->cwnd++;
                kcp->incr += mss;

            }    else {
                // 大于慢启动阈值之后,通过公式更新 incr ,进而计算 cwnd
                if (kcp->incr < mss) kcp->incr = mss;
                kcp->incr += (mss * mss) / kcp->incr + (mss / 16);
                if ((kcp->cwnd + 1) * mss <= kcp->incr) {
                    kcp->cwnd++;
                }
            }
            // 更新出来的值还要再比较下 rmt_wnd
            if (kcp->cwnd > kcp->rmt_wnd) {
                kcp->cwnd = kcp->rmt_wnd;
                kcp->incr = kcp->rmt_wnd * mss;
            }
        }
    }
}

计算发送窗口 kcp->cwnd 的大小涉及的代码片段会稍微多一些,因为在发送和接收数据的时候,都会需要更新。kcp->cwnd 初始化为 0, 之后会在第一次调用 ikcp_flush 的时候,判断小于 1 ,便修改成 1。之后发送方根据发送窗口大小,发送出相应数量的数据包,等待 ACK 回复包。ACK 包在 kcp->input 中进行处理,kcp->input 中如果判断有 ACK 包,并有清除发送缓冲中的发送数据包,说明有数据包已经完成送达,kcp->cwnd++。实际上很可能是一次 kcp->input 只处理到一个 ACK 包,可以理解为,每收到一个 ACK 包都会有 kcp->cwnd++,这句自增实现的是翻倍的效果,譬如当前 kcp->cwnd = 2,发送两个数据包,收到两个 ACK 包,触发了两次自增,最后就是 kcp->cwnd = 4 翻倍。

cwnd 可以一直指数增长,直到超过慢启动阈值,或者发生拥堵超时重传,快速重传的情况。发生了超时重传之后,会触发慢启动,慢启动阈值 ssthresh = kcp->cwnd / 2,发送窗口 kcp->cwnd = 1,回到最初重新指数增长。如果发生了快速重传,KCP 先提前减少 ssthresh,也即是减少了 cwnd 指数增长的空间,降低增长速度,提前减缓拥堵的情况。

KCP 还增加了一个配置 nocwnd,当 nocwnd = 1,发送数据是不再考虑发送窗口大小,直接让最大能发送的数量发送数据包,满足高速模式下的要求。

小结

本文简单地分析了 KCP 的源码,并讨论了 KCP 上 ARQ 的实现,和一些 KCP 提升流速的策略。还有很多细节没有提到,感兴趣的可以自己翻 KCP 的源码对照着看,相信也能有不少的收获。