0%

DFA

Deterministic Finite Automation 确定有穷自动机。
我理解其实就是一个简单的Trie树,节点带output,对于结束的节点,把对应的字符串写在output里面。
对于字符串中的每个字符,查找Trie树中能否走到有单词结束标记的节点。匹配失败的话需要回退。

AC自动机

Aho-Corasick算法,在Trie树的基础上加入了fail指针。Unix工具fgrep的底层实现就是用的AC自动机。

每条边代表一个字符。同一个节点出来的边代表的字符各不相同。每个节点代表一个状态。
状态的迁移由三个函数决定:goto,failure,output。而failure函数是AC自动机最难也是最关键的一环。

假设有字符串ABCDE匹配到模式ABC,结果D没匹配上。这说明模式ABCD是不可能的了。这时候按照传统的算法,
应该从字符串中A的下一个字符也就是B开始重新从根节点进行匹配。但是我们已经知道了B后面是CD,
最好的情况就是模式中刚好有BCD开头的这个模式,我们一路匹配到BCD。那么如果我们能直接从ABC的C跳到BCD这个D节点就好了。
failure指针就是干这个的。如果BCD模式也没有,那么我们就退而求其次看看有没有CD,没有CD,那就直接看D,如果都没有,
那D也不用看了,直接从根节点匹配E吧。失败指针构造就是这么个递归的查找过程,这其中包含了动态规划的思想。
实际构造失败指针的时候,父节点的失败指针是一层层已经构造好了的,所以其实就是一个自底向上的动态规划过程。
简单的说失败指针就是用当前模式的后缀去匹配其他模式的前缀。失败指针代表的字符跟节点本身要么是一样的,
要么是空(也就是根节点)。

node: children, output, fail point

  • build a trie tree with output, when finish, every node only has one output
  • build fail point
    • root node’s children fail to the root, add children to a fifo queue
    • pop one node out of queue, build fail point for children of that node
      • for every child, find the closest children, starting from the fail node,
        check if it has the character of the child node ?
        • if true, then point to that node, add the failed node’s output to the current node
        • if false, fail to the root
      • node in queue, already build fail node
  • match
    • if fail, go to the fail node, and continue the match

构造失败指针这部分比较麻烦一点。思路是从父节点的fail node开始,递归的往上找fail node,
看fail node的子节点是否有包含当前节点相同字符的节点,如果有,将该子节点设为当前节点的
fail节点,并且将fail节点的output也加到当前节点的output列表中。如果没有,一路递归到根节点。
构造的过程是一层层往下的,所以处理低层的节点的时候高层的节点都已经处理好了。
其实就是一个宽度优先的遍历。

匹配的时候从根节点开始往下找,匹配失败就递归的往上找失败节点。匹配失败目标字符串不需要回退。
类似KMP的匹配,所以效率很高。

Erlang实现AC自动机

这里找到了一个erlang版本的ac自动机,
可惜是一个不完整的版本,因为失败指针没有正确的构建,所以有些情况下不能正确匹配。
比如:词库中有”BC”、”ABCD”这两个违禁词,但是尝试匹配”ABC”的时候发现匹配不到。
而正常的AC自动机是能够匹配上的。这是因为这里的自动机少了第二步,构造失败指针,
这样失败指针指向的节点的output也不会加入到当前节点。所以出现上面的问题。

自己撸了一个Erlang版本的AC自动机,用了map结构,代码很精简,一百来行,基本比较完整实现了
功能,包括失败指针的构造,代码放在github上了。
对比了一下之前用re正则模块一个个匹配的方法,AC的性能提升超过1000倍。

binary模块

众里寻他千百度,蓦然回首,那人却在灯火阑珊处。

AC自动机的实现有各种语言的版本,唯独Erlang的非常少。
找到一个纯Erlang实现的还只是实现了一部分。最关键的fail函数没有实现。导致匹配的时候
会出现上面的问题。

其实Erlang已经为我们实现了一个AC自动机,就在binary模块中。通过这个模块
的源码erl_bif_binary.c可以看到这样一段注释:

1
2
3
4
5
6
7
8
/*
* The native implementation functions for the module binary.
* Searching is implemented using either Boyer-Moore or Aho-Corasick
* depending on number of searchstrings (BM if one, AC if more than one).
* Native implementation is mostly for efficiency, nothing
* (except binary:referenced_byte_size) really *needs* to be implemented
* in native code.
*/

可以看出binary的搜索是基于BM和AC算法来实现的。如果模式只有一个就是BM,如果有多个就是AC。因此要实现替换功能,
只需要直接调用binary:replace(Subject, Pattern, Replacement, Options)方法即可。
另外binary还提供了一个binary:compie_pattern(Pattern)函数,这样可以把模式存储下来,
后续的匹配直接使用即可。不过这里每个节点其实不是一个有效的utf8字符,而只是一个字节。

1
2
Cp = binary:compile_pattern(Patterns),
io:format("~ts",[binary:replace(Text,Cp,<<"*">>,[global])]).

参考文档

Erlang系统中有两套时间系统。一个是操作系统时间,一个是虚拟机时间。我们知道,操作系统时间是非常不可靠的,
它依靠ntp跟网络上服服务器同步,也有可能被人为修改。如果依赖操作系统时间,程序可能出现异常的行为。
比如游戏中一个设定是每天0点进行结算,如果结算完一次这时操作系统时间调整回去了,结果又会结算一次。
因此ERTS在操作系统时间的基础之上引入了虚拟机时间。

在Erlang/OTP 18(ERTS 7.0)之前,获取时间的接口主要是两个:

  • erlang:now()返回虚拟机时间
  • os:timestamp()返回操作系统时间。

erlang:now()存在性能问题,因此很多软件被迫使用os:timestamp()来代替。但是操作系统时间又存在时间回退的问题。

18之前,操作系统时间变化时,只能通过时间纠正来慢慢系统时间对齐,这个调整的过程可能是非常漫长的。
1分钟的差异需要100分钟才能调整完。这段时间内的时间间隔,定时器都会受到影响,大约存在1%的偏差。

从OTP 18以后,把虚拟机时间分为了两个部分,time_offset和monotonic_time。前者用来跟操作系统对齐,
后者用来实现稳定的时间频率。同时引入了三种time warp mode。time correction加上time warp mode,
更好的处理系统时间的跳变。

基本概念

  • UT1:世界时
  • UTC:Coordinated Universal Time,协调世界时,对秒的定义跟UT1有差异,包含闰秒。UTC的一天可能为86399, 86400, 86401秒。
  • POSIX Time(aka Unix/Epoch time): Time since EPOCH (UTC 1970-01-01 00:00:00),POSIX Time的一天刚好为86400秒。奇怪的是EPOCH被定义为UTC时间。
  • OS System Time:操作系统视角的POSIX time。存在时间跳跃。
  • Erlang System Time: Erlang运行时视角的POSIX time。跟操作系统时间可能有偏差。
  • Erlang monotonic time: events, timers, time interval, 单调,但是不严格单调递增。
  • Time offset: 通过时间偏移来同步操作系统时间,无需修改单调时间的频率。

时间单位:每秒多少个单位:

  • second,1
  • millisecond,1000
  • microsecond,1000000
  • nanosecond,1000000000
  • native,Erlang runtime system使用的单位,不同的操作系统会不一样。我的电脑里面,Windows下为1024000,CentOS下为1000000000。

时间单位之间的转化可以通过函数实现:
erlang:convert_time_unit(Time, FromUnit, ToUnit)

Time Warp Mode

  • no_time_warp 默认方式,系统启动的时候就决定了time offset,以后也不会改变。
    跟之前的系统兼容。因为offset不会变。所以只能通过调整monotonic_time的频率来接近系统时间。
    这会造成monotonic_time的时间频率存在1%的误差。
  • multi_time_warp 直接改变offset来同步时间,monotonic_time保持相对的稳定,当系统时间发生跳变的时候,
    可以通过erlang:monitor(time_offset, clock_service)来获得通知。
  • single_time_warp 主要用于嵌入式系统。
  • 可以通过虚拟机标记+C no_time_mode | multi_time_warp | single_time_warp来配置。

Time Correction

  • 可以通过虚拟机标记来配置是否开启:+c true | false
  • 如果设置为true,Erlang通过加速和减速来跟操作系统时间同步。幅度最大是1%,也就是说,VM经历1秒实际上可能是0.99秒或者1.01秒。
    当系统时间改变了1分钟,erlang会花100分钟来慢慢校正,并最终与系统时间同步。
  • 如果设置为false,当操作系统时间落后时,虚拟机时间会停滞。直到操作系统时间追上来为止。这意味着重复调用erlang:monotonic_time()会返回相同的值。
    当操作系统时间领先时,monotonic_time前跳。

可以通过函数erlang:system_info(time_correction).来查看时间纠正是否开启。
关闭这个选项不会获得任何好处,而且当发生时间跳变时,Erlang monotonic time可能向前跳或者停止。
所以这个选项一般都是打开的。

一般常用到的配置:

  • +c true +C no_time_warp offset保持不变,mono改变%1来追赶OS时间。跟18之前表现是一样的。
  • +c true +C multi_time_warp offset随着OS时间而变化,mono保持相对稳定的频率。

OS System Time

操作系统时间不是单调递增的。系统时间随时可以修改。比如我取了一个操作系统时间t1,
然后将系统时间改为1天前,再取一个系统时间t2,t2-t1的出来是个负值。

新API主要提供这三个接口来获取操作系统时间。

  • os:system_time() 返回native时间单位的操作系统时间os system time
  • os:system_time(Unit) 将操作系统时间转化为Unit时间单位。等价于 erlang:convert_time_unit(os:system_time(), native, Unit)
  • os:timestamp() -> {MegaSecs, Secs, MicroSecs}
    • calendar:now_to_universal_time/1
    • calendar:now_to_local_time/1

比如计算年月日:

1
2
3
4
5
format_utc_timestamp() ->
TS = {_,_,Micro} = os:timestamp(),
{ {Year,Month,Day},{Hour,Minute,Second} } = calendar:now_to_universal_time(TS),
Mstr = element(Month,{"Jan","Feb","Mar","Apr","May","Jun","Jul","Aug","Sep","Oct","Nov","Dec"}),
io_lib:format("~2w ~s ~4w ~2w:~2..0w:~2..0w.~6..0w",[Day,Mstr,Year,Hour,Minute,Second,Micro]).

另外,erlang:date(),erlang:localtime()等函数都是通过操作系统时间算出来的。
通过下面的接口可以看到操作系统接口调用的底层接口,比如Windows下面是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
> erlang:system_info(os_system_time_source).
[{function,'GetSystemTime'},
{resolution,100},
{parallel,yes},
{time,1558773547665408}]

> erlang:system_info(os_monotonic_time_source).
[{function,'GetTickCount64'},
{resolution,100},
{extended,no},
{parallel,yes},
{time,1191373658112}]

Erlang System Time

正是因为操作系统时间如此不可靠,我们需要一个经过纠正的虚拟机时间。
这个纠正的虚拟机时间具有以下特性:

  • Never jumps backwards or forwards
  • Never differs more than 1% in speed from OS Monotonic time
  • Attemps to be as close as possible to OS system time

jumping from 1970 to 2015 will take 4500 years to recover, which means all relative time will happen 1% faster for many years.

新API提供以下接口用于获取虚拟机时间:

  • erlang:system_time() 返回native时间单位的虚拟机时间erlang system time,虚拟机时间由两部分构成:time_offset和monotonic_time。
    erlang:system_time() 等价于 erlang:monotonic_time() + erlang:time_offset()
  • erlang:system_time(Unit) 将erlang系统时间转化为Unit时间单位。等价于erlang:convert_time_unit(erlang:system_time(), native, Unit)
  • erlang:monotonic_time() 虚拟机内部的时间引擎。定时器、receive after定时器、BIF定时器、timer模块定时器都是由这个时间触发。
  • erlang:time_offset()
  • erlang:timestamp() -> {MegaSecs, Secs, MicroSecs} Erlang system time,这个函数的存在只是为了兼容现有的代码的时间格式。
    Erlang system time可以通过上面的函数erlang:system_time/1更加高效的获取。这个函数等价于:
1
2
3
4
5
6
timestamp() ->
ErlangSystemTime = erlang:system_time(microsecond),
MegaSecs = ErlangSystemTime div 1000000000000,
Secs = ErlangSystemTime div 1000000 - MegaSecs*1000000,
MicroSecs = ErlangSystemTime rem 1000000,
{MegaSecs, Secs, MicroSecs}.

使用指南

一个总的原则就是:不要使用erlang:now/0。根据不同的目的,选择不同的接口。

获取系统时间

使用erlang:system_time/1获取系统时间。如果需要erlang:now/0返回的数据格式,可以用erlang:timestamp/0。

测量时间差

使用erlang:monotonic_time/0之差来测量时间,结果是native时间单位,可以用erlang:convert_time_unit/3来转化为其他时间单位。
也可以直接使用erlang:monotonic_time/1之差来测量时间,不过这种方式会损失一定的精度。

事件的顺序

erlang:unique_integer([monotonic]). 严格单调递增。

唯一名字

  • erlang:unique_integer/0
  • erlang:unique_integer([positive])

随机数种子

  • erlang:monotonic_time()
  • erlang:time_offset()
  • erlang:unique_integer()

参考文档

windows 7以前的系统需要通过虚拟机软件运行boot2docker来支持docker,
一般使用VirtualBox来运行boot2docker。boot2docker是一个支持docker的linux环境。

安装VirtualBox

windows上安装虚拟机需要先开启cpu的虚拟化支持,否则会有报错:

Error in driver during machine creation: This computer doesn’t have VT-X/AMD-v enabled.
Enabling it in the BIOS is mandatory

这是因为电脑虚拟化技术处于被禁用状态,这时就要启动BIOS的虚拟化设置,开启CPU虚拟化支持。
在BIOS中找到CPU的Intel Virtualization Technology选项,选择Enable,保存退出重启即可。

docker-machine

docker-machine是虚拟机管理程序。我们可以通过它来登陆boot2docker,查看各种虚拟机相关的信息如ip,环境变量等。
在docker客户端输入docker-machine help就能看到帮助。

docker-machine 支持的命令主要有:

  • active 打印运行中的虚拟机名称,默认虚拟机为default
  • config 打印连接参数
  • create 新建虚拟机
  • env
  • inspect
  • ssh 连接虚拟机,docker-machine ssh default 就能连接到默认的虚拟机中。
  • ls
  • ip 查看boot2docker虚拟机的ip地址,通常是192.168.99.100

虚拟机的默认账号是docker, 密码是tcuser。docker用户是sudoers, 直接执行sudo -i就能切换为root账号。

文件夹挂载

跟linux系统不一样,windows下的宿主系统并不是windows本身,而是运行在虚拟机中的boot2docker,
所以在windows下挂载volumn的时候,不能直接把windows下的目录挂载到容器中。

要实现挂载,首先要通过虚拟机系统把windows下的文件夹共享到虚拟机中,默认的虚拟机已经共享了一个文件夹:c:\Users,对应的虚拟机中的目录是:/c/Users
所以一般情况下我们是可以把/c/Users目录下的文件夹挂载到容器中去。
所以docker run --rm -it -v /c/Users:/c/ erlang:19.3.3 ls /c/这条命令能够列出C:\Users下面的文件。

但是如果需要支持其他目录,首先要通过VirtualBox把文件夹共享到虚拟机中,
然后才能挂载到容器中。而且使用的时候使用的宿主目录不是windows下的目录,而是虚拟机中的挂载目录。

直接用打开VirtualBox,能看到一个名为default的虚拟机,这个虚拟机就是docker的宿主机,
在这个虚拟机中创建一个共享文件夹,比如把D:\挂载到/d目录下。
通过docker-machine restart重启虚拟机。

docker run --rm -it -v /d/projects:/apps -w /apps erlang:19.3.3 ./rebar3 as prod tar

这条命令会把windows下的D:\projects目录通过宿主机的/d/projects挂载到容器的/apps目录下。并且把/apps作为容器的工作目录。
这样就能实现通过docker来实现跨平台的编译。顺便说一句,如果只需要支持特定平台的编译,用Jenkins实现目标环境的编译更加方便。

端口映射

容器的端口也是映射到宿主机中而不是windows中。这一点也要注意,windows中访问端口的时候要使用宿主机的ip,而不是自身的ip。
宿主机的ip可以通过docker-machine ip来查看。一般是192.168.99.100.

配置加速器

由于DockerHub默认使用的国外的镜像地址,在国内使用的速度感人,需要配置国内的加速。
我本人使用的是DaoCloud的加速服务,注册账号以后会分配一个私有的加速地址。
也可以直接用https://www.docker-cn.com/registry-mirror。
修改方法如下:
首先通过docker-machine ssh <machine-name>登录虚拟机,默认名字为default.
然后修改/var/lib/boot2docker/profile文件,
sudo vi /var/lib/boot2docker/profile
将–registry-mirror=添加到EXTRA_ARGS中
最后sudo /etc/init.d/docker restart重启Docker服务就可以了。

kcp是一个实现了类似tcp的重传、流控、拥塞控制等机制的传输协议,它没有指定传输协议,但是一般情况下都会用udp来传输,
相当于用udp实现的tcp协议。它在不稳定的网络环境中采用更激进的方法来获得比tcp更快的传输速度,代价就是会消耗更多的流量。
跟tcp一样,kcp采用滑动窗口机制来实现流控,通过重传机制保证了可靠性,通过慢启动,拥塞避免,快速重传和快速恢复来实现拥塞控制。

kcp支持两种模式:流模式和消息模式。

流模式具有更高网络利用率,更大的传输速度,但是解析数据相对更复杂。

消息模式具有更小的网络利用率,更小的传输速度,但解析数据更简单。

相关变量

字段 含义 注释
rcv_wnd receive window 接收窗口大小
rmt_wnd remote window 远端可接收窗口。发送端发送数据的时候带上,接收端接收到数据时更新(input的时候更新)
snd_wnd send window 发送窗口大小,默认 32,静态值。用于计算 cwnd
cwnd congestion window 拥塞窗口大小,会根据丢包或者ack调整,cwnd=min(snd_wnd,rmt_wnd),只有 nc==0 的时候才会更新并使用这个值
rcv_buf receive buffer 接收缓冲区,缓冲底层接收的数据,组装连续以后拷贝到接收队列
rcv_queue receive queue 接收队列,接收的连续数据包,上层应用可以直接使用,不能超过接收窗口大小
snd_queue send queue 发送队列,上层应用数据分片后加入发送队列,发送队列的数据不会立即发送,头部还没有完成初始化
snd_buf send buffer 发送缓冲区,已发送尚未确认的包
buffer 调用output发送的数据
probe ICKP_CMD_WASK IKCP_CMD_WINS
acklist sn,ts 待发送的ack列表
fastresend 快速重传门限,无须等待超时,如果n没收到ack但是收到了n+1,n+2,…n+fastresend那么立即重传n
ssthresh slow start thresh 慢启动拥塞窗口阈值,不小于2
incr 可发送的最大数据量,incr=cwnd*mss,因为拥塞避免阶段cwnd的增加不足一个包,所以需要维护这么个变量,当满了一个mss以后给cwnd自增
stream stream mode 是否使用流模式
mss max segment size 最大报文长度,mtu - (24 头部长度)
mtu max transfer unit 默认1400,不小于50

RTO计算(Retransmission timeout)

kcp的重传超时计算方法参考了tcp的实现Computing TCP’s Retransmission Timer,不过在参数设置上更加激进。
计算过程主要维护两个变量:

  • SRTT: smoothed round-trip time
  • RTTVAR: round-trip time variation

接收端收到确认包以后根据rtt更新这两个变量,并重新计算rto值。报文超时以后tcp的rto会翻倍,
kcp中声称采用的是1.5倍,然而代码中实际上是线性增加,不知道是作者有意为之还是实现bug。

Segment头部

含义 会话id 包类型 是否分片 发送端接收窗口 时间戳 包序列号 未确认包序号 数据长度 数据
字节数 4 1 1 2 4 4 4 4 len
字段名 conv cmd frg wnd ts sn una len data
全名 conversation id command fragment window timestamp segment number unacknowledge length data

报文字段含义:

字段名 含义
conv 通信双方需要保证相同的会话id
cmd ICKP_CMD_PUSH/IKCP_CMD_ACK/IKCP_CMD_WASK/ICKP_CMD_WINS
frg 分片ID,从大到小,0表示最后一个分片
wnd 剩余接收窗口大小(接收窗口大小 - 接收队列大小)
ts message发送时刻的时间戳
sn 分片segment序号
una 待接收消息序号(接收滑动窗口左端)
len 数据长度
data 数据
fastack 收到ack时计算的该分片被跳过的累积次数
xmit 分片发送的次数,每发送一次加1,超出dead_link,说明目标不可达
resendts 下一次超时重传的时间戳
rto 该分片的超时重传等待时间

API

kcp与底层交互

1
2
int ikcp_output(ikcpb *kcp, const void *data, int size)
int ickp_input(ikcpcb *kcp, const char *data, long size)

这两个结构是kcp与底层网络交互的接口。一般底层网络指的是UDP。
output将kcp打包的数据发往目的地址,而input是底层网卡收到udp数据以后喂给kcp层。

底层收到数据报文后调用input,将数据丢给kcp。

上层通过recv获得处理后的数据,循环从data中取出kcp包。直到剩下的长度小于包头长度退出循环。

input传入的数据包含kcp包头,kcp拿到以后会尝试去掉kcp包头,组合成一个完整的数据包:

  • 长度小于包头长度24,退出循环。
  • 构造包头,分别检查会话id,数据长度,包体类型。
  • 设置远程剩余接收窗口大小。
  • 将对方已经确认收到的包从发送缓存snd_buf中删除,相当于发送窗口右移。
  • 计算本地真实snd_una,也就是下一个等待确认的发送包。
    • 如果还有未确认的包,为发送窗口左端包序列号
    • 如果没有未确认包,即发送窗口为空,则为下一个发送包snd_nxt
  • 根据包的不同类型分别处理
    • ACK包:
      • 计算rtt,更新rto,更新算法参考rfc6298
      • 从发送窗口中删除对应的包,并更新此包之前未确认包的fastack信息
    • PUSH数据包:
      • 判断收到的包的sn是不是位于区间:[rcv_nxt, rcv_nxt+rcv_wnd), 是则继续,否则丢弃
      • 在接收窗口内,将其加入acklist,下一次flush的时候确认
      • 构造一个kcp包,判断是否重复,如果不是重复包,插入接收缓存rcv_buf中对应位置
      • 将rcv_buf中的已经收到的连续包移到接收队列rcv_queue中,供上层应用读取,并从rcv_buf中移除
    • WASK包:
      • 设置probe的TELL标记,flush的时候推送
    • WINS包:
      • 无须处理,因为之前已经设置了对方的剩余接收窗口大小

所有包处理完毕以后,根据对方确认包的信息,更新拥塞窗口大小。

  • 如果拥塞窗口小于对方剩余接收窗口,则需要更新
  • 如果拥塞窗口尚未达到ssthresh(慢启动门限),cwnd++
  • 如果大于ssthresh,

kcp与应用层交互

1
int ikcp_recv(ikcpcb *kcp, char *buffer, int len)
  • 将接收队列中的消息传递给上层应用,因为消息会被拆分成kcp包,所以消息长度需小于接收队列长度*报文长度。
  • 将接收缓冲中的连续报文拷贝到接收队列。
  • 如果接收队列从满到不满,推送窗口通知消息给发送端,通知有接收窗口,可以继续发送新的报文。
  • ikcp_recv返回大于0的数的时候,buffer里面必然是一个完整的包。所谓完整的包,也就是对应发送方调用ikcp_send的时候发送的buffer。应该是一一对应的。原本我以为如果发送的包需要分段,那么需要把长度加到buffer前面,其实是没有必要的。因为peeksize就保证了,必须是一个完整的包才会返回大于0的数。
  • stream=0的话可以保证收到的是一个 send时传过去的完整的包。流模式和包模式的问题
1
int ikcp_peeksize(const ikcpcb *kcp)

检查接收队列中是否有一个完整的消息,并返回消息的长度。

1
int ickp_send(ikcpcb *kcp, const char *buffer, int len)

发送应用层的数据,会根据mtu大小分片,每个分片加上segment头部24字节,加入发送队列snd_queue。

根据stream字段区分流模式和消息模式。不同模式组包的机制不一样。如果是消息模式,分片数据的frg字段从count-1到0,0表示分片结束。发送的数据最大不能超过 128 * mss 大小。kcp-go 为了突破这个限制,在应用层对包的大小进行了切分,每次只发一个mss,这样就调用 write 的时候就可以发送很大的数据。为什么有这个限制呢,这是因为 kcp 支持包模式,包模式依赖一个 frg 来组装大包,这个id 在 kcp 头部占用的字节就是一个。

1
void ikcp_update(ikcpcb *kcp, IUINT32 current)

根据传入的时钟,决定是否调用flush。kcp维护一个ts_flush表示下一次flush的时间,update的时候如果发现当前时间已经大于等于ts_flush,则执行flush。

1
int ikcp_nodelay(ikcpcb *kcp, int nodelay, int interval, int resend, int nc)

配置参数。interval的范围是10ms到5000ms。默认是100ms。interval决定了实际调用flush的频率。
nc 0: normal congestion control(default), 1: disable congestion control 。这个值控制 kcp 是否进行拥塞控制。
nc 表示 nocwnd,表示是否尊重拥塞控制。默认是 0,会尊重拥塞控制,在发生丢包或者重复确认的时候减小拥塞窗口。
如果配置为 1 ,则表示不遵守拥塞控制。所以默认情况下 kcp 也可以像 tcp 那样遵守公共规则,但是如果打开这个开关,
kcp 就不遵守公共规则,表现的比较自私。

cwnd 表示可以 inflight 的包的数量,这个值越大,kcp 发包越多。类似于压测工具的 pipeline 参数。能够 pending 的包的个数。这个参数收到两个参数的严格控制,一个是自己的发送窗口,一个是对端的接收窗口。如果考虑到拥塞控制,还应该受到拥塞窗口的控制。kcp 通过 nc 选项来决定是不是遵守拥塞控制规则,即ss,cc,fastack,fast recovery等拥塞控制计算出来的拥塞窗口。

kcp 会将 cwnd 个数的包挂在 snd_buf 队列中。

1
void ikcp_flush(ikcp *kcp)

实际发送数据的接口。

  • 发送acklist中的ack消息
  • 如果对方接收窗口为0,需要发送IKCP_CMD_WASK消息,检查probe对方接收窗口是否ready
  • 发送自己的接收窗口大小
  • 发送数据

snd_buf 是一个带 sentinel 的循环链表。

ikcp_parse_una 接收端接收到数据包,解析出对端尚未确认的包,那么小于una的说明都已经确认,如果发送缓冲中还在,直接删除。
同时,由于接收端接收到了新的数据,所以需要更新自己的 send_una 也就是尚未确认收到的包。如果发送缓冲中有包,那么就是发送缓冲中的第一个包的sn。
否则就是下一个需要发送的sn:kcp->snd_nxt。

kcp-go

go 版的 kcp ,会话有一个 writeDelay 的标识。表示有数据是是否立即发送,还是等到下一次 update 的时候发送。默认是不 delay,立刻发送。
当 waitsnd < snd_wnd 时,要发送的数据直接挂到 kcp 缓存中。然后再次检查 wait_snd 是否超过 snd_wnd ,如果是,则立刻 flush 。
所以有两种情况下,Write 是立刻发送的:一个是 waitSnd 大于 snd_wnd ,另一个是会话的默认配置,不 delay。

rmt_wnd 每个包都会带上一个wnd,收到对面的wnd,就是rmt_wnd。
kcp.cwnd 不会超过 rmt_wnd。

如果 rmt_wnd 为0,表示对面已经没有能力接收。隔 7 秒发送一个 probe window size。每次1.5倍增加试探时间。最多增加到 120 秒。

参考文献

MMO游戏中经常需要实现各种技能的效果,不同技能拥有不同的伤害区域,
一般情况下,我们需要对周围的目标进行遍历,检查目标是否落在伤害区域内。
一般的伤害预期主要是三种:圆形,扇形,矩形。

圆形

圆形伤害区域的计算非常简单,给点圆心C, 半径R, 计算P是否在圆内。
只需要计算P到C的欧式距离|P - C|是否小于R即可。为了节省开方计算,
我们可以直接用平方来比较。

1
2
3
4
is_in_circle(#pb_vector3{x = X, z = Z} = P, #pb_vector3{x = X0, z = Y0}, R) ->
Dx = X - X0,
Dz = Z - Z0,
Dx * Dx + Dz * Dz < R * R.

扇形

扇形伤害区域的计算相对于圆形要复杂一点,Milo的文章对扇形的处理有很好的阐述,可惜部分公式显示不出来了。
圆形相当于扇形的特殊形式。这里我们只考虑锐角扇形,即不超过180度的扇形。
给点圆心C, 半径R, 施法者朝向V, 扇形角度Thelta,P是在扇形内需要满足两个条件:

  • P到C的欧式距离不超过R,跟圆形的判断一致。
  • P到V的夹角小于Thelta/2

判断P到V的夹角有两种方式,一种是分别计算CP和V的与x轴的夹角,然后看这两个夹角检测这两个夹角的差值范围是否在Thelta/2范围内。
这个夹角可以通过atan2函数求得,这里要特别小心角度的范围。考虑到V是常数,可以预先算出夹角,可是还是免不了要计算一个atan2.
另一种办法是用点积。具体参考Milo的文章。

这里我们采用另外一种方法,这个方法来自StackOverflow, 这个方法需要算出扇形的起始向量,假设为StartVector,EndVector,
然后我们只需要判断CP是否在StartVector的逆时针方向,并且在EndVector的顺时针方向。
这里StartVector和EndVector都可以预先计算出来。而判断方向的方法异常简单:

检测v2是否在v1的顺时针方向:

  • 找到v1的逆时针法向量,法向量相当于将v1逆时针旋转90度:(x1, y1) -> (-y1, x1)
  • 找到v2在法向量上的投影,利用点积计算:p = v2.x * n1.x + v2.y * n1.y
  • 如果投影为正,那么v2在v1的逆时针方向,否则为顺时针方向。

我们的输入不包括扇形的两个向量,所以先要计算出这两个向量,根据欧拉公式:

一定要注意sectorStart和sectorEnd都是相对于扇形圆心的坐标,而不是绝对坐标!!!!

1
2
3
4
5
%% #pb_vector3{x = X0, z = Z0} 为施法者坐标指向施法目标的向量
Sin = math:sin(Thelta/2),
Cos = math:cos(Thelta/2),
SectorStart = #pb_vector3{x = X0 * Cos + Z0 * Sin, y = 0, z = -X0 * Sin + Z0 * Cos},
SectorEnd = #pb_vector3{x = X0 * Cos - Z0 * Sin, y = 0, z = X0 * Sin + Z0 * Cos},

给点起始和结束向量,求解坐标是否在扇形内:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
%% 判断V2是否在V1的顺时针方向
are_clock_wise(#pb_vector3{x = X1, z = Z1}=V1, #pb_vector3{x = X2, z = Z2}=V2) ->
-X1 * Z2 + Z1 * X2 > 0.

%% 是否在圆内
is_within_radius(#pb_vector3{x = X1, z = Z1}, RadiusSquared) ->
X1 * X1 + Z1 * Z1 =< RadiusSquared.

%% 是否在扇形内
is_inside_sector(CheckPos, CastingPos, SectorStart, SectorEnd, RadiusSquared) ->
RelPoint = lib_map_util:get_raw_vector(CastingPos, CheckPos),
(not are_clock_wise(SectorStart, RelPoint)) andalso
are_clock_wise(SectorEnd, RelPoint) andalso
is_within_radius(RelPoint, RadiusSquared).

矩形

给点施法者的位置C,施法者前方的终点坐标F,技能决定施法的范围Range,可以得到一个矩形区域。

矩形的计算也有几种方法,比如先进行坐标系的转换,在相对坐标系中判断就很简单了。不过坐标的转换略复杂。

这里采用的是另一种方法,通过求点到线段的距离来判断矩形区域。

  • 先判断点是否超出线段的起点和终点
  • 若没有,则求出点到直线的距离

根据点积可以判断坐标是否落在线段的中部。点积大于0且小于线段长度的平方说明坐标落在线段的中间区间,没有超出两端的范围。
如果没有超出两端,我们就可以算出点到直线的投影点,然后计算点到投影点的距离,从而得到点到直线的距离。

坐标的投影所占的线段的比例为:R = Dot / LengthSquared, 则投影点为:C + R * (F - C)。

1
2
3
4
5
6
7
8
9
10
11
is_in_rectangle(#pb_vector3{x = X, z = Z} = P, #pb_vector3{x = X1, z = Z1} = C, #pb_vector3{x = X2, z = Z2} = F, RangeSquared) ->
Dx = X2 - X1,
Dz = Z2 - Z1,
LengthSquared = Dx * Dx + Dz * Dz,
Dot = Dx * (X - X1) + Dz * (Z - Z1),
Dot >= 0 andalso Dot < LengthSquared andalso begin
R = Dot / LengthSquared,
Xp = X1 + Dx * R,
Zp = Z1 + Dz * R,
(X-Xp)*(X-Xp) + (Z-Zp)*(Z-Zp) =< RangeSquared
end.

参考文献

附扇形的检验程序

直接存到html文件在浏览器中执行即可,可以通过修改参数改变输出的图形。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
<!DOCTYPE html>
<html>
<body>

<canvas id="myCanvas" width="400" height="400"
style="border:1px solid #d3d3d3;">
Your browser does not support the canvas element.
</canvas>

<script>
function isInsideSector(point, center, sectorStart, sectorEnd, radiusSquared) {
var relPoint = {
x: point.x - center.x,
y: point.y - center.y
};

return !areClockwise(sectorStart, relPoint) &&
areClockwise(sectorEnd, relPoint) &&
isWithinRadius(relPoint, radiusSquared);
}

function areClockwise(v1, v2) {
return -v1.x*v2.y + v1.y*v2.x > 0;
}

function isWithinRadius(v, radiusSquared) {
return v.x*v.x + v.y*v.y <= radiusSquared;
}

function isInsideSector2(point,center, sectorStart, sectorEnd, radiusSquared) {
var relPoint = {
x: point.x - center.x,
y: point.y - center.y
};
return !areClockwise(sectorStart, relPoint) &&
areClockwise(sectorEnd, relPoint) &&
isWithinRadius(relPoint, radiusSquared);
}

var canvasSize = 400;
var canvas = document.getElementById("myCanvas");
var ctx = canvas.getContext("2d");
//ctx.beginPath();
//ctx.arc(200,200,200,-Math.PI,0);
//ctx.stroke();

function drawLine(point) {
ctx.moveTo(200,200);
ctx.lineTo(point.x, point.y);
ctx.stroke();
}

var center = { x: 200, y: 200 };
var sectorStart = { x: 400, y: 200 };
var sectorEnd = { x: 200, y: 400 };
drawLine(sectorStart);
drawLine(sectorEnd);


for (var i = 0; i < 800; ++i) {

// generate a random point
var point = {
x: Math.random() * canvasSize,
y: Math.random() * canvasSize
};

var sectorCenter = {x:0,y:200};
var cx = sectorCenter.x - center.x;
var cy = sectorCenter.y - center.y;
var thelta = Math.PI/4;
var sin = Math.sin(thelta);
var cos = Math.cos(thelta);
var sectorStart = {
x: cx*cos + cy*sin,
y: -cx*sin + cy*cos
};
var sectorEnd = {
x: cx*cos - cy*sin,
y: cx*sin + cy*cos
};

// test if the point is inside the sector
var isInside = isInsideSector2(point, center, sectorStart,sectorEnd, 40000);
//var isInside = isInsideSector(point, center, {x:200,y:0}, {x:0,y:200}, 40000);

// draw the point
if (isInside) {
drawLine(point);
}

}

</script>

</body>
</html>

  • 面向连接,一对一,所以基于广播和多播的应用程序不能使用TCP服务。而无连接的UDP则非常适合广播和多播。
  • 字节流,TCP 字节流,send()写recv()读次数没有固定关系,UDP 数据报,sendto()写recvfrom()读次数相同。
  • 可靠传输。

传输层-Segment, 网络层-Packet, 链路层-Frame

TCP HEADER

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
 0                   1                   2                   3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Source Port | Destination Port |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Sequence Number |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Acknowledgment Number |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Offset| Res. | Flags | Window |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Checksum | Urgent Pointer |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Options | Padding |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

tcp-header

  • 32位序号sequence number, ISN + byte offset of the first byte in the packet
    • ISN initial sequence number, random generated
  • 4 bit header length: unit 32bit,4字节。
    • min = 5,最小为5,表示没有选项,头部长度为:5 * 4 = 20
    • max = 15,4 比特最大能表示15,15 * 4 = 60,表示选项不能超过 40 字节。
  • Flags
    • CWR Explicit Congestion Notification, rfc3168
    • ECE
    • URG
    • ACK
    • PSH
    • RST
    • SYN
    • FIN
  • RWND receiver window,接收窗口,每个包都会携带。这个数据非常重要。决定了发送端的发包策略,即滑动窗口机制实现的接收端流量控制。通过控制发送端的发送窗口实现对接收方的流量控制。2 字节,最大64k?超过64k怎么办?tcp_window_scaling。最大可到1G。rfc 1323,通过选项的窗口的扩大因子。
  • checksum crc(header + data)
  • options <= 40 byte

tcp 选项

分为单字节的选项和可变长度的选项:

  • 单字节选项由一个字节的选项类型 Option Kind 构成。
  • 可变长度选项由 1 字节的选项类型,1 字节的选项长度,加可变长度的选项数据构成。

tcp-options

  • kind 1, length 1, info n

  • kind:

    • 0 terminate,用于选项列表的结束。

    • 1 nop,用在两个选项之间,用于选项结构的对齐。

    • 2 MSS(Max Segment Size), 最大报文长度,单位是字节数,最大能表示64k,因为选项数据的长度是2字节。选项长度为4,1+1+2,以太网MTU为1500字节,减去TCP、IP头部40字节,MSS还剩1460字节。只能用于初始化连接的报文中 (SYN = 1)。目标:尽量多的传输数据,避免切片。不大于接收缓冲区。MSS 的长度为 MTU - 40 字节的固定长度。发送端保证 tcp 数据长度不超过 MSS - tcp和ip 选项的长度。rfc 6691

      The MSS value to be sent in an MSS option should be equal to the
      effective MTU minus the fixed IP and TCP headers. By ignoring both
      IP and TCP options when calculating the value for the MSS option, if
      there are any IP or TCP options to be sent in a packet, then the
      sender must decrease the size of the TCP data accordingly.

    • 3 window scale, 窗口扩大因子。0~14, /proc/sys/net/ipv4/tcp_window_scaling only for syn, rfc1323

    • 8 timestamp for rtt calculation 时间戳选项 /proc/sys/net/ipv4/tcp_timestamps
      4,5,6,7 都用于ack选择和回显。

    • 4 sack-permitted only in syn, /proc/sys/net/ipv4/tcp_sack

    • 5 sack content
      11,12,13 用于 tcp 事务

sack (tcp selective acknowledgments)

选择性确认,可通过 /proc/sys/net/ipv4/tcp_sack 配置开关。Linux 2.4以后默认打开,不过需要两边都打开才能生效。
发生丢包以后,发送端要么重传所有从丢包的包以后的所有包,要么发送丢失的包以后再等待一个RTT时间来发现下一个要传的包。
选择确认机制就是为了解决这个问题。接收端可以告诉发送端哪些包收到了,发送端只需要发送未收到的包即可。提高了效率。

TCP 状态转移

tcp-status

主动断开连接的一方,在连接关闭以后进入TIME_WAIT状态, 需要等待2*MSL(Maximum Segment Life) 报文段最大生存时间,rfc1122建议2min。
TIME_WAIT存在的原因:

  • 可靠的终止TCP连接。
  • 保证让迟来的TCP报文段有足够的时间被识别并丢弃。

服务器主动关闭连接后,监听的端口处于TIME_WAIT状态导致它不能立即重启,需要设置socket选项SO_REUSEADDR来强制进程立即使用处于TIME_WAIT状态占用的端口。
正常终止连接需要四步握手。TCP还提供了异常终止连接的办法,即给对方发送一个复位报文段。一旦发送了复位报文段,发送端素有排队等待发送的数据都将被丢弃。
应用程序可以使用socket选项SO_LINGER来发送复位报文段,以异常终止一个连接。

TCP 流控

发送方如果发送太快导致接收端来不及接收就会导致丢包。流量控制就是让发送方发送速率不要太快,要让接收方来得及接收。
TCP使用滑动窗口机制来实现对发送方的流量控制。

  • 滑动窗口 Sliding Window
    接收端向发送端通告自己的接收窗口rwnd大小,表示接收方能够接收的字节数。
    发送端维护一个发送窗口swnd,保存已发送且尚未收到ack的包。发送窗口不大于接收方的rwnd。
    当接收窗口为0的时候,发送端需要设置持续计时器,persistence timer, 触发发送ZWP(Zero Window Probe)探测接收端的接收窗口大小(Zero Window Probe Ack)。
    wireshark中使用tcp.analysis.zero_window过滤包,然后后右键follow TCP stream
    有等待的地方就会可能出现DDos攻击,Zero Window也不例外,一种可能的攻击方式:
    攻击者跟服务器建立连接后发送GET请求,然后将窗口设置为0,服务端只能等待进行ZWP,攻击者并发大量这样的请求,把服务器资源耗尽(sockstress)。
    一般会探测3次,每次30-60秒,如果3次过后还是0,有的TCP就会发RST把链接断开。
    发送方的发送数据可以分为:
    - 已经收到ack确认的数据
    - 已经发送但是未收到ack确认的数据
    - 可以发送的数据(接收方还有空间)
    - 不能发送的数据(接收方没有空间)
    中间两者加起来就是发送窗口。
  • Nagle算法(TCP_NODELAY) RFC896 (Congestion Control in IP/TCP internetworks)
    • if there are unacknowledged in-flight data, new data is buffered
    • if the data to be send is < MSS, it is buffered until MSS
      When to send data (rfc 1122)
      • Immediately if a full MSS size package can be sent (at least MSS data is accumulated)
      • All previously sent data has been acknowledged AND ((PSH flag is set) OR buffered data > 1/2 * send window)
      • PSH flag is set AND the override timeout(0.1 … 1s) expired
        针对小包应用的优化。发送方发包的时机,发送方收到对方上一个包的确认后才发送下一个包。确认快那么发送也快,确认慢则发送慢,
        可以根据网速动态调整速率。网速慢时可以显著减少网络上报文的数量。对于实时性要求高的应用如telnet,ssh,mmoarpg,应该要关闭Nagle算法。
        糊涂窗口综合征 Silly Window Syndrom
        接收窗口已满,而交互式应用一次只从接收缓存中读取一个字节,然后向发送方发送确认,并把窗口设置成1个字节。如此往复,网络效率低。
        解决的办法是避免对小的window size做出响应,知道有足够大的window size再响应:
        接收方:David D Clark’s方案,等到缓存有足够的空间容纳一个MSS,或者接收缓存已有一半空闲空间的时候才向发送端回确认报文。
        发送端:Nagle’s algorithm 不要发送太小的报文,而是把数据报累积成足够大的报文段,或者达到接收方缓存空间一半大小。
        Nagle + Delay ack 导致延迟 内格尔算法,纳格算法。john nagle
        ACK is delayed until return data is available(piggy-backing of ack) or until delayed ack timer expires
        MSS: 最大报文长度 maximum segment size,通过 tcp 协议的选项字段协商。建立连接时协商,根据双方提供的 MSS 最小值决定。实现时往往用 MTU 代替。需要减去 ip 头 20 字节和 tcp 头 20字节,所以是 1460。是 tcp 报文中的数据最大长度。数据字段加上头部才是整个报文的长度。MSS = tcp 报文长度 - tcp 头部长度。
        https://tools.ietf.org/html/rfc6691
        https://www.fanhaobai.com/2017/11/40ms-delay-and-tcp-nodelay.html

超时重传RTO(Retransmission timeout)

RTO计算方法见Computing TCP’s Retransmission Timer,主要维护两个变量:

  • SRTT: smoothed round-trip time
  • RTTVAR: round-trip time variation

计算过程如下:

  • 初始化先设置:

    1
    RTO = 1 or 3 seconds;
  • 报文超时,重传以后设置改报文新的超时时间为:

    1
    RTO = RTO * 2 (back off the timer)
  • 更新srtt和rttvar之后,计算新的rro:

    1
    2
    3
    RTO = SRTT + max(G, K*TRRVAR);
    RTO < 1 then RTO = 1;
    RTO > 60 then RTO = 60;

srtt和rttvar的计算过程如下:

  • 收到第一个ACK以后,设置:
1
2
SRTT = R;
RTTVAR = R / 2;
  • 收到后续的ACK,依次设置:
    1
    2
    RTTVAR = (1 - bata) * RTTVAR + bata * |SRTT - R'|;
    SRTT = (1 - alpha) * SRTT + alpha * R';

其中

1
alpha = 1/8, beta = 1/4

不能用重传的包来计算rtt, 因为无法确定包是哪个时间点发出的,除非包本身有timestamp。

拥塞控制算法 congestion control algorithm

最大化网络上瓶颈链路的带宽。提高网络利用率,降低丢包率,保证公平性。RFC5681

  • 慢启动和拥塞避免 slow start and congestion avoidance
  • 快速重传和快速恢复 fast retransmit / fast recovery

Linux下课通过sysctl查看使用的拥塞算法,

$ sysctl -a | grep congestion_control
net.ipv4.tcp_congestion_control = cubic
net.ipv4.tcp_available_congestion_control = cubic reno
net.ipv4.tcp_allowed_congestion_control = cubic reno

通过 /proc/sys/net/ipv4/tcp_congestion_control 可以控制使用的拥塞算法。

主要的实现算法有:

  • loss based congestion control (bufferbloat problem)
    • reno
    • vegas
    • cubic 看了下centos下都是cubic
  • bbr (Bottleneck Bandwidth and RTT)google最新推出的

if CWND < ssthresh
slow start cwnd每个rtt扩大一倍,指数扩大

  • IW
  • CWND += min(N, SMSS)

if CWND > ssthresh

  • CWND += SMSS*SMSS/CWND cwnd每个rtt加1

早期拥塞避免阶段的cwnd计算公式为:cwnd += (MSS * MSS / cwnd) + MSS/8
rfc2525指出了Extra additive constant in congestion avoidance,带来的问题,
即公式后面多余的部分会导致一个RTT内多个包丢失而无法通过快速重传恢复,从而导致超时重传,从而降低性能。
正确的做法应该是去掉后面的部分:cwnd += (MSS * MSS / cwnd)

如果CWND=ssthresh,慢启动或者拥塞避免随意。

发送端判断拥塞:

  • 传输超时,tcp重传定时器溢出:慢启动和拥塞避免
    • ssthresh = max(FlightSize/2, 2*MSS)
    • CWND <= SMSS
    • 再次进入慢启动

slow-start-and-congestion-avoidance

  • 接收到重复的确认报文:快速重传,快速恢复
    • 接收端收到失序报文立即发送重复确认,而不必等待自己发送数据时捎带确认
    • 发送端收到前两个重复确认时,在允许的情况下应该发送一个之前尚未发送的数据包,(FlightSize <= cwnd+2*MSS), CWND保持不变。
    • 发送端收到3个重复确认报文立即重发对方未收到的报文,而不必等待超时。
      ssthresh=max(FlightSize/2, 2MSS), CWND = ssthresh + 3MSS,加3是因为收到了3个重复ack报文,意味着有三个报文离开了网络。
    • 每收到一个重复确认:CWND = CWND + SMSS,每收到一个重复确认,那么有一个报文离开了网络。
    • 收到新数据确认:CWND = ssthresh
    • 快速重传和快速恢复完成之后,恢复到拥塞避免阶段

fast-retransmit
fast-recovery

reno

BBR

传统的拥塞控制算法存在两个问题:

  • 无法区分丢包类型,错误丢包和拥塞丢包

  • 缓冲膨胀区问题

    • 增加网络延时
    • 缓冲区被填满而丢包
      google新出的拥塞控制算法。
  • 既然不容易区分拥塞丢包和错误丢包,TCP BBR 就干脆不考虑丢包。

  • 既然灌满水管的方式容易造成缓冲区膨胀,TCP BBR 就分别估计带宽和延迟,而不是直接估计水管的容积。

  • 在有一定丢包率的网络链路上充分利用带宽。

  • 降低网络链路上的buffer占用率,从而降低延迟。

随机早期检测RED

random early detection

网络层的策略对拥塞控制算法影响最大的就是路由器的丢弃策略。在简单的情况下,路由器通常按照先进先出的策略处理到来的分组。
当路由器的缓存装不下分组的时候就丢弃到来的分组,这就叫尾部丢弃策略。这样会导致分组丢失,发送方认为网络产生阻塞。
当网络中存在很多TCP连接,若发生路由器的尾部丢弃,可能影响很多条TCP连接,结果就是很多TCP同一时间进入slow start状态。
这种情况称为全局同步。全局同步回事网络的通信量突然下降很多,而在网络恢复正常以后,通信量又突然增大很多。

为避免产生网络中的全局同步现象,路由器采用随机早期检测算法:
路由器的队列维持两个参数,队列最小门限min和最大门限max,每当一个分组到来的时候,RED就计算平均队列长度。然后分情况对待到来的分组:

  • 平均队列长度小于最小门限,将新分组加入队列排队。
  • 平均队列长度在min和max之间,按照概率丢弃分组。
  • 平均队列长度大于max,丢弃分组。
    以概率p随机丢弃分组,让拥塞控制只在个别的TCP连接上执行,因而避免全局性的拥塞控制。

red

RED的关键就是选择三个参数:最小门限,最大门限,丢弃概率以及计算平均队列长度。
平均队列长度采用加权的方法计算,跟计算RTT的策略一样。

red-la

AIMD -> PRR 比例降速 Linux 3.2+

针对TCP的攻击

  • syn flood 攻击方发送大量syn,耗尽服务器资源。
  • zero window probe,攻击方建立连接以后通知自己窗口为0,被攻击放只能隔三差五发个zwp包去问一下。

MTU

以太网一般为1500字节,拨号网为576.
802.11 MTU: 2304
MTU + MAC Header + Encryption Header
WEP: 2304 + 34 + 8 = 2346 bytes
WPA(TKIP): 2304 + 34 + 20 = 2358 bytes
WPA2(CCMP): 2304 + 34 + 16 = 2354 bytes

连接迁移

4元组。ip地址改变(比如wifi切换到移动网络)或者端口改变时(当客户端的nat绑定超时导致服务器看到的端口号改变)连接会断掉。
尽管MPTCP解决了TCP的连接迁移问题,但依然缺少中间设备和OS部署支持。

锐速

一个TCP加速软件,被用来加速VPS。

1
wget -N --no-check-certificate https://raw.githubusercontent.com/91yun/serverspeeder/master/serverspeeder.sh && bash serverspeeder.sh
1
chattr -i /serverspeeder/etc/apx* && /serverspeeder/bin/serverSpeeder.sh uninstall -f
  • /serverspeeder/bin/serverSpeeder.sh restart
  • /serverspeeder/bin/serverSpeeder.sh start
  • /serverspeeder/bin/serverSpeeder.sh stop
  • /serverspeeder/bin/serverSpeeder.sh status

tcp参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
net.core.netdev_max_backlog = 400000
#该参数决定了,网络设备接收数据包的速率比内核处理这些包的速率快时,允许送到队列的数据包的最大数目。
net.core.optmem_max = 10000000
#该参数指定了每个套接字所允许的最大缓冲区的大小
net.core.rmem_default = 10000000
#指定了接收套接字缓冲区大小的缺省值(以字节为单位)。
net.core.rmem_max = 10000000
#指定了接收套接字缓冲区大小的最大值(以字节为单位)。
net.core.somaxconn = 100000
#Linux kernel参数,表示socket监听的backlog(监听队列)上限
net.core.wmem_default = 11059200
#定义默认的发送窗口大小;对于更大的 BDP 来说,这个大小也应该更大。
net.core.wmem_max = 11059200
#定义发送窗口的最大大小;对于更大的 BDP 来说,这个大小也应该更大。
net.ipv4.conf.all.rp_filter = 1
net.ipv4.conf.default.rp_filter = 1
#严谨模式 1 (推荐)
#松散模式 0
net.ipv4.tcp_congestion_control = bic
#默认推荐设置是 htcp
net.ipv4.tcp_window_scaling = 0
#关闭tcp_window_scaling
#启用 RFC 1323 定义的 window scaling;要支持超过 64KB 的窗口,必须启用该值。
net.ipv4.tcp_ecn = 0
#把TCP的直接拥塞通告(tcp_ecn)关掉
net.ipv4.tcp_sack = 1
#关闭tcp_sack
#启用有选择的应答(Selective Acknowledgment),
#这可以通过有选择地应答乱序接收到的报文来提高性能(这样可以让发送者只发送丢失的报文段);
#(对于广域网通信来说)这个选项应该启用,但是这会增加对 CPU 的占用。
net.ipv4.tcp_max_tw_buckets = 10000
#表示系统同时保持TIME_WAIT套接字的最大数量
net.ipv4.tcp_max_syn_backlog = 8192
#表示SYN队列长度,默认1024,改成8192,可以容纳更多等待连接的网络连接数。
net.ipv4.tcp_syncookies = 1
#表示开启SYN Cookies。当出现SYN等待队列溢出时,启用cookies来处理,可防范少量SYN攻击,默认为0,表示关闭;
net.ipv4.tcp_timestamps = 1
#开启TCP时间戳
#以一种比重发超时更精确的方法(请参阅 RFC 1323)来启用对 RTT 的计算;为了实现更好的性能应该启用这个选项。
net.ipv4.tcp_tw_reuse = 1
#表示开启重用。允许将TIME-WAIT sockets重新用于新的TCP连接,默认为0,表示关闭;
net.ipv4.tcp_tw_recycle = 1
#表示开启TCP连接中TIME-WAIT sockets的快速回收,默认为0,表示关闭。
net.ipv4.tcp_fin_timeout = 10
#表示如果套接字由本端要求关闭,这个参数决定了它保持在FIN-WAIT-2状态的时间。
net.ipv4.tcp_keepalive_time = 1800
#表示当keepalive起用的时候,TCP发送keepalive消息的频度。缺省是2小时,改为30分钟。
net.ipv4.tcp_keepalive_probes = 3
#如果对方不予应答,探测包的发送次数
net.ipv4.tcp_keepalive_intvl = 15
#keepalive探测包的发送间隔
net.ipv4.tcp_mem
# tcp所有socket的内存。确定 TCP 栈应该如何反映内存使用;每个值的单位都是内存页(通常是 4KB)。getconf PAGESIZE
#第一个值是内存使用的下限,低于这个值,不会限制内存使用。
#第二个值是内存压力模式开始对缓冲区使用应用压力的上限,直到低于low值。
#第三个值是内存上限。在这个层次上可以将报文丢弃,从而减少对内存的使用。对于较大的 BDP 可以增大这些值(但是要记住,其单位是内存页,而不是字节)。
net.ipv4.tcp_rmem
# 对象是每个socket。与 tcp_wmem 类似,不过它表示的是为自动调优所使用的接收缓冲区的值。单位是字节。min,default,max。default覆盖 core.rmem_default。max 被 core.rmem_max 复写。
net.ipv4.tcp_wmem = 30000000 30000000 30000000
#为自动调优定义每个 socket 使用的内存。
#第一个值是为 socket 的发送缓冲区分配的最少字节数。
#第二个值是默认值(该值会覆盖 wmem_default ),缓冲区在系统负载不重的情况下可以增长到这个值。
#第三个值是发送缓冲区空间的最大字节数(/proc/sys/net/core/wmem_max value overrides this value, and hence this value should always be smaller than that value.

)。
net.ipv4.ip_local_port_range = 1024 65000
#表示用于向外连接的端口范围。缺省情况下很小:32768到61000,改为1024到65000。
net.ipv4.netfilter.ip_conntrack_max=204800
#设置系统对最大跟踪的TCP连接数的限制
net.ipv4.tcp_slow_start_after_idle = 0
#关闭tcp的连接传输的慢启动,即先休止一段时间,再初始化拥塞窗口。
net.ipv4.route.gc_timeout = 100
#路由缓存刷新频率,当一个路由失败后多长时间跳到另一个路由,默认是300。
net.ipv4.tcp_syn_retries = 1
#在内核放弃建立连接之前发送SYN包的数量。
net.ipv4.icmp_echo_ignore_broadcasts = 1
# 避免放大攻击
net.ipv4.icmp_ignore_bogus_error_responses = 1
# 开启恶意icmp错误消息保护
net.inet.udp.checksum=1
#防止不正确的udp包的攻击
net.ipv4.conf.default.accept_source_route = 0
#是否接受含有源路由信息的ip包。参数值为布尔值,1表示接受,0表示不接受。
#在充当网关的linux主机上缺省值为1,在一般的linux主机上缺省值为0。
#从安全性角度出发,建议你关闭该功能。

https://www.frozentux.net/ipsysctl-tutorial/chunkyhtml/tcpvariables.html

三步握手

https://102.alibaba.com/detail?id=140

cat /proc/sys/net/ipv4/tcp_max_syn_backlog # 默认 128
ss -lnt # Send-Q 表示backlog值。Recv-Q表示当前使用的

半连接队列 max(64, /proc/sys/net/ipv4/tcp_max_syn_backlog) syns queue, syn floods 攻击
全连接队列 min(backlog, /proc/sys/net/core/somaxconn) accept queue,如果全队列满了,根据tcp_abort_on_overflow指示执行。

netstat -nt # w/o servers,Recv-Q就是指收到的数据还在缓存中,还没被进程读取,这个值就是还没被进程读取的 bytes;而 Send 则是发送队列中没有被远程主机确认的 bytes 数
netstat -nlt # only servers

比如如下netstat -t 看到的Recv-Q有大量数据堆积,那么一般是CPU处理不过来导致的。
netstat -s
overflowed: 全连接队列溢出
ignored

断开 tcp 连接

netstat -ntp | grep 12002
sudo tcpkill -i enp0s3 host 120.78.163.171 and port 12002

参考文献

the work flow:

  • use ssh-keygen to generate a pair of private and public keys.
  • use ssh-copy-id or scp to copy files across local and remote server
  • add public key to authorized_keys
  • use private key to login

ssh-key-auth

server side

1
2
3
4
5
6
7
8
cd ~
mkdir .ssh
chmod 700 .ssh

cat id_rsa.pub >> authorized_keys

chmod 600 authorized_keys
chmod 600 id_rsa

optional:

1
chmod 622 id_rsa.pub

client side

add hostname to hosts file, so we don’t have to remember the ip address every time we try to login the server.
although it’s not required to do so, we will introduce another way to specify hostname and login user for servers later.

1
2
3
cd ~
mkdir .ssh
scp user@remotehost:/home/user/.ssh/id_rsa ./ssh

note: copy paste may not working, here we use scp to get the private key, and ssh-copy-id is recommended.

try:

1
ssh user@remotehost

if the key pair is generated on the client side, then:

1
2
3
4
centos6: ssh-copy-id -i ~/.ssh/id_rsa.pub "zsy@10.1.0.3 -p 22222"
centos7: ssh-copy-id -i ~/.ssh/id_rsa.pub zsy@10.1.0.3 -p 22222
or
cat ~/.ssh/id_rsa.pub | ssh -p 22 zsy@10.1.0.3 "umask 077;mkdir -p ~/.ssh;cat - >> ~/.ssh/authorized_keys"

config

add these lines to file ~/.ssh/config, as before, chmod 600 ~/.ssh/config.

1
2
3
4
5
6
7
8
9
10
host shortname1
hostname server_ip_1
user user1
port 22
##IdentityFile somefile

host shortname2
hostname server_ip_2
user user2
port 22

then you can type ssh shortname1 or ssh shortname2 to login.

more about config

some enties:

  • Host: Defines for which host or hosts the configuration section applies. The section ends with a new Host section or the end of the file. A single * as a pattern can be used to provide global defaults for all hosts.
  • HostName : Specifies the real host name to log into. Numeric IP addresses are also permitted.
  • User : Defines the username for the SSH connection.
  • IdentityFile : Specifies a file from which the user’s DSA, ECDSA or DSA authentication identity is read. The default is ~/.ssh/identity for protocol version 1, and ~/.ssh/id_dsa, ~/.ssh/id_ecdsa and ~/.ssh/id_rsa for protocol version 2.
  • ProxyCommand : Specifies the command to use to connect to the server. The command string extends to the end of the line, and is executed with the user’s shell. In the command string, any occurrence of %h will be substituted by the host name to connect, %p by the port, and %r by the remote user name. The command can be basically anything, and should read from its standard input and write to its standard output. This directive is useful in conjunction with nc(1) and its proxy support. For example, the following directive would connect via an HTTP proxy at 192.1.0.253: ProxyCommand /usr/bin/nc -X connect -x 192.1.0.253:3128 %h %p
  • LocalForward : Specifies that a TCP port on the local machine be forwarded over the secure channel to the specified host and port from the remote machine. The first argument must be [bind_address:]port and the second argument must be host:hostport.
  • Port : Specifies the port number to connect on the remote host.
  • Protocol : Specifies the protocol versions ssh(1) should support in order of preference. The possible values are 1 and 2.
  • ServerAliveInterval : Sets a timeout interval in seconds after which if no data has been received from the server, ssh(1) will send a message through the encrypted channel to request a response from the server.
  • ServerAliveCountMax : Sets the number of server alive messages which may be sent without ssh(1) receiving any messages back from the server. If this threshold is reached while server alive messages are being sent, ssh will disconnect from the server, terminating the session.

a more detailed example:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
### default for all ##
Host *
ForwardAgent no
ForwardX11 no
ForwardX11Trusted yes
User nixcraft
Port 22
Protocol 2
ServerAliveInterval 60
ServerAliveCountMax 30

## override as per host ##
Host server1
HostName server1.cyberciti.biz
User nixcraft
Port 4242
IdentityFile /nfs/shared/users/nixcraft/keys/server1/id_rsa

## Home nas server ##
Host nas01
HostName 192.168.1.100
User root
IdentityFile ~/.ssh/nas01.key

## Login AWS Cloud ##
Host aws.apache
HostName 1.2.3.4
User wwwdata
IdentityFile ~/.ssh/aws.apache.key

## Login to internal lan server at 192.168.0.251 via our public uk office ssh based gateway using ##
## $ ssh uk.gw.lan ##
Host uk.gw.lan uk.lan
HostName 192.168.0.251
User nixcraft
ProxyCommand ssh nixcraft@gateway.uk.cyberciti.biz nc %h %p 2> /dev/null

## Our Us Proxy Server ##
## Forward all local port 3128 traffic to port 3128 on the remote vps1.cyberciti.biz server ##
## $ ssh -f -N proxyus ##
Host proxyus
HostName vps1.cyberciti.biz
User breakfree
IdentityFile ~/.ssh/vps1.cyberciti.biz.key
LocalForward 3128 127.0.0.1:3128

debug

when unexpected happened, use option -vvv to output debug message.

ssh-keygen -y -f id_rsa 读取私钥,生成公钥

安全

防暴力破解密码:denyhosts.py

reference

布雷森汉姆光栅直线算法,用于计算光栅图中两点间的直线经过的点,因为其简单高效,被广泛使用。
在网格图中,怪物从A点移动到B点,如果允许怪物走对角线,那么最简单的移动方式就是从x,y方向上同时向目标点靠近,
直到其中方向距离为0,接下来从另一个方向上向目标点移动,那么表现上就是先走对角然后走直线。BTW,A*算法中的启发函数h,
在允许对角线移动的网格图中就可以用这种方法计算,称为octile算法,相比曼哈顿距离更精确,比欧氏距离更简单。

1
h = max(dx, dy) + (sqrt2 - 1) * min(dx, dy)

本文简单介绍了该算法的推导过程,并用Erlang实现了该算法。

推导过程

给定起始点(X0,Y0), (X1, Y1),先考虑特殊情况,斜率范围从0到1,X0<X1, Y0<Y1。
那么x每前进1步,y前进距离为m,其中m=dy/dx, 我们用e记录y前进的累积值,x每前进1步,e=e+m。
当e>0.5时,应当使y也前进1步,并将1从e中扣去:e=e-1。

概括起来就是一个判断条件,两种更新策略:

1
2
3
4
5
6
7
// e = 0;
e = e + m;
if (e > 0.5) {
e = e - 1;
} else {
e = e;
}

优化

为了去掉除法和浮点计算,我们令D = 2*dx*e - dx, 带入上面的三个公式,上面的判断条件和更新策略变成这样:

1
2
3
4
5
6
7
// D = -Dx;
D = D + 2*dy;
if(D > 0) {
D = D - 2*dx
} else {
D = D
}

初始e=0, 所以D=-dx,然后每次D增加2dy,当D>0时,减去2dx。

实现

实现的时候需要把特殊情况一般化。下面的代码给出了一个一般化的计算过程。首先根据轴的长短选择主轴,
主轴就是每次前进一个单位的轴。然后根据起点和终点的大小决定前进的方向,终点大于起点,则+1,否则-1.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
plot({X0, Y0}, {X1, Y1}) ->
Dx = abs(X1 - X0),
Dy = abs(Y1 - Y0),
StepX = if X1 > X0 -> 1; true -> -1 end,
StepY = if Y1 > Y0 -> 1; true -> -1 end,
if
Dx > Dy -> plot_x({X0, Y0}, {X1, Y1}, 2*Dx, 2*Dy, StepX, StepY, -Dx, [{X0,Y0}]);
true -> plot_y({X0, Y0}, {X1, Y1}, 2*Dx, 2*Dy, StepX, StepY, -Dy, [{X0,Y0}])
end.

plot_x({X1, _}, {X1, _}, _DeltaX, _DeltaY, _, _, _D, Path) ->
lists:reverse(Path);
plot_x({X0, Y0}, {X1, Y1}, DeltaX, DeltaY, StepX, StepY, D0, Path) ->
Nx = X0 + StepX,
D1 = D0 + DeltaY,
{Ny, Nd} = if
D1 > 0 -> {Y0 + StepY, D1 - DeltaX};
true -> {Y0, D1}
end,
plot_x({Nx, Ny}, {X1, Y1}, DeltaX, DeltaY, StepX, StepY, Nd, [{Nx, Ny}|Path]).


plot_y({_, Y1}, {_, Y1}, _DeltaX, _DeltaY, _, _, _D, Path) ->
lists:reverse(Path);
plot_y({X0, Y0}, {X1, Y1}, DeltaX, DeltaY, StepX, StepY, D0, Path) ->
Ny = Y0 + StepY,
D1 = D0 + DeltaX,
{Nx, Nd} = if
D1 > 0 -> {X0 + StepX, D1 - DeltaY};
true -> {X0, D1}
end,
plot_y({Nx, Ny}, {X1, Y1}, DeltaX, DeltaY, StepX, StepY, Nd, [{Nx, Ny}|Path]).

参考文档

特殊进程是通过proc_lib来启动的进程,并实现了system消息处理进程。
包括但不限于常用的gen_server, gen_statem, gen_event等标准Behavior。

为啥自己实现

虽然Behavior很好很强大,可以满足绝大部分的需求,但是它们也存在缺点,那就是过于通用。为了达到通用的目的,
标准Behavior包含了大量处理边界条件的逻辑,一般情况下不会成为问题,但是当你的进程成为瓶颈的时候,可能需要考虑自己实现一个。
比方说:

  • 有一个supervisor进程监控大量work进程,还有另一个gen_server进程来控制work的数量,那么这两个进程有一些工作是重复的。
  • 有一个gen_server只会被local进程使用到,但是他包含了大量call,那么通用的call机制可能成为瓶颈。

为啥不用普通进程

除了一些需要异步进行的临时任务,尽量不要使用普通进程。特殊进程可以为你提供:

  • 告诉你哪个进程是它的父进程
  • 父进程退出时优雅的退出
  • 异常退出时生成log
  • 能够查看或者替换进程状态

这些好处值得多花几分钟来实现。

如何实现

特殊进程必须通过proc_lib和sys来实现。

proc_lib

通过proc_lib启动的进程总是会把两个信息写入进程字典:

  • $ancestors
  • $initial_call

这两个信息被各种调试工具用到。如果开启了SASL,那么proc_lib启动的进程崩溃的时候会生成崩溃日志。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
$ erl -boot start_sasl
...
1> proc_lib:spawn_link(fun() -> 1 = 2 end).
=CRASH REPORT==== 8-Sep-2017::17:05:42 ===
crasher:
initial call: erl_eval:-expr/5-fun-1-/0
pid: <0.43.0>
registered_name: []
exception error: no match of right hand side value 2
ancestors: [<0.41.0>]
messages: []
links: [<0.41.0>]
dictionary: []
trap_exit: false
status: running
heap_size: 233
stack_size: 27
reductions: 97
neighbours:
neighbour: [{pid,<0.41.0>},
{registered_name,[]},
{initial_call,{erlang,apply,2}},
{current_function,{io,wait_io_mon_reply,2}},
{ancestors,[]},
{messages,[]},
{links,[<0.26.0>,<0.43.0>]},
{dictionary,[]},
{trap_exit,false},
{status,waiting},
{heap_size,610},
{stack_size,29},
{reductions,1161}]
** exception exit: {badmatch,2}

从上面log可以看到父进程以及初始函数都出现在崩溃日志中。
最后,用pro_lib还提供一个可选的特征,使用确认函数来同步启动进程。

sys

通过proc_lib启动的进程必须实现sys协议。
sys能够为你的进程带来更多调试以及跟踪机制。

  • sys:get_status/1
  • sys:get_state/1
  • sys:replace_state/2

除此之外,实现sys协议的进程还能够暂停以及恢复。

异步启动

  • 通过proc_lib:spawn_link/1..4或者proc_lib:spawn_opt/2..5启动进程。
  • 写一个receive的循环。
  • 父进程退出时退出,这意味着如果trap exit消息,需要处理父进程退出消息。
  • 处理系统消息。
  • 实现system_continue/3, system_terminate/4, system_code_change/4回调函数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
start_link() ->
proc_lib:spawn_link(?MODULE, init, [self()]).
init(Parent) ->
loop(Parent).
loop(Parent) ->
receive
%% Only required when trap_exit is enabled.
{’EXIT’, Parent, Reason} ->
terminate(State, Reason, NbChildren);
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {state, Parent});
Msg ->
error_logger:error_msg("Unexpected message ~p~n", [Msg]),
loop(Parent)
end.
system_continue(_, _, {state, Parent}) ->
loop(Parent).
system_terminate(Reason, _, _, _) ->
exit(Reason).
system_code_change(Misc, _, _, _) ->
{ok, Misc}.

同步启动

  • 使用proc_lib:start_link/1..4启动进程
  • 在进入循环之前调用proc_lib:init_ack/1
  • 其他跟异步启动类似
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
start_link() ->
proc_lib:start_link(?MODULE, init, [self()]).
init(Parent) ->
ok = proc_lib:init_ack(Parent, {ok, self()}),
loop(Parent).
loop(Parent) ->
receive
%% Only required when trap_exit is enabled.
{’EXIT’, Parent, Reason} ->
terminate(State, Reason, NbChildren);
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
{state, Parent});
Msg ->
error_logger:error_msg("Unexpected message ~p~n", [Msg]),
loop(Parent)
end.
system_continue(_, _, {state, Parent}) ->
loop(Parent).
system_terminate(Reason, _, _, _) ->
exit(Reason).
system_code_change(Misc, _, _, _) ->
{ok, Misc}.

Call

OTP的call实现考虑了很多特殊情况,整个实现非常复杂。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
do_call(Process, Label, Request, Timeout) ->
try erlang:monitor(process, Process) of
Mref ->
%% If the monitor/2 call failed to set up a connection to a
%% remote node, we don't want the '!' operator to attempt
%% to set up the connection again. (If the monitor/2 call
%% failed due to an expired timeout, '!' too would probably
%% have to wait for the timeout to expire.) Therefore,
%% use erlang:send/3 with the 'noconnect' option so that it
%% will fail immediately if there is no connection to the
%% remote node.

catch erlang:send(Process, {Label, {self(), Mref}, Request},
[noconnect]),
receive
{Mref, Reply} ->
erlang:demonitor(Mref, [flush]),
{ok, Reply};
{'DOWN', Mref, _, _, noconnection} ->
Node = get_node(Process),
exit({nodedown, Node});
{'DOWN', Mref, _, _, Reason} ->
exit(Reason)
after Timeout ->
erlang:demonitor(Mref, [flush]),
exit(timeout)
end
catch
error:_ ->
%% Node (C/Java?) is not supporting the monitor.
%% The other possible case -- this node is not distributed
%% -- should have been handled earlier.
%% Do the best possible with monitor_node/2.
%% This code may hang indefinitely if the Process
%% does not exist. It is only used for featureweak remote nodes.
Node = get_node(Process),
monitor_node(Node, true),
receive
{nodedown, Node} ->
monitor_node(Node, false),
exit({nodedown, Node})
after 0 ->
Tag = make_ref(),
Process ! {Label, {self(), Tag}, Request},
wait_resp(Node, Tag, Timeout)
end
end.

我们自己实现的时候可以去掉那些特殊情况,针对我们自己的情况优化冗余的检查和逻辑。
比如,我们没有注册名字,就不需要解析名字;如果没有用到C或者Java节点,可以去掉处理这部分的代码。

实例

参考ranch的ranch_conns_sup模块。很经典的例子。

参考文档

Ranch是一个TCP连接管理开源库,从著名的cowboy开源库中剥离出来的。
本文对ranch的重要模块一一进行解读,来品味一下这个优雅小巧而又功能强大的TCP管理库。
本文基于ranch的1.0版本。

ranch

提供对外接口。最重要的接口是start_listener和stop_listener。需要指定一个唯一的名字Ref,
socket处理模块以及参数,协议处理回调模块及参数。这个函数会将整个监控树启动起来。
Ref是负责处理这个端口连接的监控树的名字,有了这个名字就可以对这颗监控树执行一些操作,
比方说停止监听端口、设置连接上限等。

1
2
start_listener(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) -> {ok, Pid} | {error, badarg}
stop_listener(Ref) -> ok | {error, not_found}

ranch_sup

根监控树。启动ranch_server进程以及ranch_listener_sup监控树。
其中ranch_server是启动application的时候就会启动,而ranch_listener_sup则是开始监听的时候动态启动的。
启动ranch_listener_sup需要提供各种参数,而且支持启动多个实例。

ranch_server

是个打杂的进程,主要提供一些获取、修改参数的接口。主要数据都保存在ranch_server这个ets表中。
进程重启的时候也会ets表中恢复出进程状态。所以这个ets表的宿主进程并不是这个进程本身,而是ranch_sup。
即使这个进程崩溃,也不会丢失ets表的数据。

  • {max_conns, Ref}: MaxConns
  • {opts, Ref}: Opts
  • {conns_sup, Ref}: Pid
  • {addr, Ref}: Addr

ranch_listener_sup

监控进程,负责启动ranch_conns_sup以及ranch_acceptors_sup两个监控树。
因为没有注册名字,ranch_listener_sup监控树可以启动多个实例,一般来说,每一个监听的端口对应一个监控树。
这些监控树都挂在ranch_sup下面。

ranch_acceptors_sup

也是监控树,开始监听端口,并启动NbAcceptors个数的accept进程来接收socket接请求。
这个进程拥有监听socket的所有权。

ranch_transport

定义操作socket的接口。如listen,accept,recv,send,shutdown,close等。
ranch实现了两个数据传输协议,tcp和ssl。分别由ranch_tcp和ranch_ssl实现。
可以这么理解,在面向对象世界里,ranch_transport对应接口,ranch_tcp和ranch_ssl对应接口的具体实现。

ranch_tcp

实现ranch_transport定义的接口,主要是封装了gen_tcp的一些基本操作。
监听socket的属性分类两类,一类是可以设置的,一类是默认的。

  • 可设置的包括:backlog,ip,linger,nodelay,port,raw,send_timeout,send_timeout_close
  • 默认的包括:binary,{active,false},{packet,raw},{reuseaddr,true},{nodelay,true}

可以看出,binary,{active,false},{packet,raw},{reuseaddr,true}这几个是写死了无法通过传入参数来改变的。

ranch_ssl

实现ranch_transport定义的接口,主要是封装了ssl的一些基本操作。

ranch_acceptor

ranch_accept接收到连接请求以后,将socket控制权转交ranch_conns_sup,
同时通知ranch_conns_sup启动连接处理进程,处理此socket的数据收发。
ranch_conns_sup启动连接进程以后,accept进程继续accept新的连接。如果由于某些原因(比如连接数达到上限)
连接进程没有启动起来,那么accept进程就会阻塞在这里,无法继续接收连接,直到连接进程启动起来为止。
如果系统文件句柄消耗完毕,accept进程会等待100ms才继续接收连接请求。
如果监听socket关闭了,那么accpet进程会因为匹配不到消息崩溃退出。
因为监听socket使用了active,false选项,在socket的宿主进程主动recv之前,宿主进程不会收到来自socket的数据。
所以accept进程以及随后的ranch_conns_sup进程都不会收到socket的数据,从而保证连接进程接收到的数据是完整的。
不过为了防止意外情况(比如socket用了active其他选项)导致accpet进程收到了其他消息,
accpet进程会在进入下一次accpet前清空一下消息队列。

ranch_conns_sup

连接进程的管理进程,负责连接进程的启动。作为ranch_listener_sup的子进程之一,这个进程启动时会想ranch_server注册自己。
这样后续启动的ranch_acceptors_sup就能拿到连接管理进程,可以作为参数传递给ranch_acceptor进程,以便后者收到连接请求是启动连接进程。
这是一个特殊进程,也就是用proc_lib来启动的进程,并且会处理system消息,这个进程同时具有gen_server和supervisor的功能。
在作者的书<>中第一章就讲到了特殊进程,以及为什么我们要用特殊进程。这个进程就是一个非常好的例子。

这个进程收到socket发送的启动连接进程消息以后,启动连接进程,将socket控制权转交连接进程,并通知连接进程开始进行数据收发。

此进程会将连接进程Pid存入进程字典,并维护连接进程的数量。
如果数量达到上限,则暂时不回复accept进程,而是将accept进程存入Sleeper列表中。这将导致accept进程阻塞在receive语句中,
无法继续accept连接。如果还没有到上限,则立即回复accept进程,accept进程收到回复就可以继续accpet连接了。
所以达到配置的连接数量上限以后,实际上我们的还能创建NbAcceptors个连接进程,然后所有accept进程都阻塞在receive过程,
不会继续接收socket连接。

如果启动连接进程失败了,那么这个进程需要回复accpet进程,防止accept阻塞在receive语句,同时关闭socket连接,并打印错误日志。

这个进程trap_exit标记值为true,连接进程通过start_link创建,所以当连接进程正常或者异常退出时,
这个进程能收到{'EXIT',Pid,Reason}消息,方便我们维护连接子进程。
这时候如果还有阻塞的accept进程即Sleeper,则会取出一个来回复,然后这个accpet进程就能继续接收连接请求了。

ranch_conns_sup进程是作为ranch_listener_sup监控下的进程启动的,而且它启动的type是supervisor。
所以他需要向它的父进程提供supervisor的功能,主要是以下几点:

  • which_children
  • count_children
  • shutdown子进程。包括强制退出,以及等待超时退出。父进程退出时,或者系统退出时,等待通知子进程退出,超时后强制kill子进程。

不过它并不需要向子进程提供supervisor功能,就算子进程异常退出,它也不会尝试重启子进程,因为没有这个必要。
关闭子进程时,先monitor,再unlink,再调用exit发退出消息。先unlink是防止子进程退出时又一次收到EXIT消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
terminate(#state{shutdown=brutal_kill}, Reason, _) ->
Pids = get_keys(true),
_ = [begin
unlink(P),
exit(P, kill)
end || P <- Pids],
exit(Reason);
terminate(#state{shutdown=Shutdown}, Reason, NbChildren) ->
shutdown_children(),
_ = if
Shutdown =:= infinity ->
ok;
true ->
erlang:send_after(Shutdown, self(), kill)
end,
wait_children(NbChildren),
exit(Reason).

shutdown_children() ->
Pids = get_keys(true),
_ = [begin
monitor(process, P),
unlink(P),
exit(P, shutdown)
end || P <- Pids],
ok.

wait_children(0) ->
ok;
wait_children(NbChildren) ->
receive
{'DOWN', _, process, Pid, _} ->
_ = erase(Pid),
wait_children(NbChildren - 1);
kill ->
Pids = get_keys(true),
_ = [exit(P, kill) || P <- Pids],
ok
end.

值得一提的还有子进程的存储方式,这里使用了进程字典的get_keys接口来获取所有子进程。如果shutdown指定了超时时间,
那么就会通过一个定时器来触发kill消息,将所有尚未退出的子进程调用exit(Pid, kill)强行退出。

连接进程(ranch_protocol的具体实现)

ranch_protocol定义了启动连接进程的接口。只有一个接口:

1
start_link(Ref, Socket, Transport, ProtocolOptions) -> {ok, Pid}

实现这个接口的模块用来处理建立连接以后的数据收发,start_link函数创建一个进程用来处理协议数据。
注意这里只能是start_link,意味着父进程(即ranch_conns_sup进程)和创建的进程之间有link的关系。
这个要求跟supervisor要求子进程创建函数必须是start_link是一致的。
因为supervisor进程就是依靠link关系来管理子进程的。

父进程ranch_conns_sup借助link关系来监控连接进程的退出状态,借此维护连接的数量,然而并不会重启子进程。

Ranch并没有提供这个模块的默认实现。我们利用ranch实现自己功能的时候,首先就需要实现这个接口,来处理数据收发的逻辑。
然后将其作为参数传入启动监听过程的函数中,即start_listener中的Protocol参数。

这里就是我们要实现的逻辑所在了。首先要实现ranch_protocol的start_link/4接口,以便ranch_conns_sup调用。
通常这里都是spwan_link一个进程出来,返回。在进程进入循环之前调用ranch:accept_ack(Ref)等待ranch_conns_sup通知我们,
socket已经转移控制权,然后就可以进入循环进行消息接收和处理了。

如果要用gen_server来实现连接进程,我们就要注意了。因为我们需要先返回{ok, Pid}到ranch_conns_sup进程,
然后等待ranch_conns_sup进程给我们发消息,通知我们shoot,才能进入消息循环。
(这是因为连接进程的消息循环一般会将socket的active由false改成once,如果ranch_conns_sup还没有将控制权转交过来就调用了set_opts,
那么ranch_conns_sup就可能会收到来自socket的消息。所以我们需要确保已经转移了控制权才能够进入消息循环。参考
如果我们直接用gen_server:start_link来启动这个连接进程,
就会出现了一个困境:首先,ranch_conns_sup需要start_link返回Pid以后才能通知连接进程就绪,而连接进程需要接收到通知才能从init中返回。
这样就出现死锁了。解决的方法主要有两种:

  • 一种是在主循环来接收shoot消息,比如init返回一个{ok, Pid, 0},那么进入循环后第一条处理的就是handle_info(timeout),
    我们可以在这里接收shoot消息,设置socket选项等。

  • 要么使用其他proc_lib方法来启动进程,比如proc_lib:start_link/3,然后在初始化的时候直接通过
    proc_lib:init_ack({ok, self()})来向父进程返回Pid,然后等待父进程的shoot消息。

个人更倾向于第二种方式。因为shoot消息是一次性的,更适合在初始化的时候搞定,而不是放在主循环里面。

1
2
3
4
5
6
7
8
9
10
start_link(Ref, Socket, Transport, Opts) ->
proc_lib:start_link(?MODULE, init, [Ref, Socket, Transport, Opts]).

init(Ref, Socket, Transport, _Opts = []) ->
ok = proc_lib:init_ack({ok, self()}),
ok = ranch:accept_ack(Ref),
ok = Transport:setopts(Socket, [{active, once}]),
gen_server:enter_loop(?MODULE, [],
#state{socket=Socket, transport=Transport},
?TIMEOUT).

源码