Just a journey

UDP hole punching

• P2P

UDP Hole Punching

Hole Punching技术是一种借助于公网上的服务器完成NAT穿越的
解决方案。在基于UDP的Hole Punching场景中,终端A,B分别与
公网上的服务器S建立UDP连接。当一个终端向服务器S注册时,
服务器S记录下该终端的两对IP地址和端口信息,为描述方便,
我们将一对IP地址和端口信息的组合称之为一个endpoint。一个
endpoint是终端发起与服务器S通信的IP地址和端口;另一个
endpoint是服务器S观察到的该终端实际与自己通信所用的IP
地址和端口。我们可以把前一个endpoint看作是终端的私网IP
地址和私网端口;把后一个endpoint看作是终端的私网IP地址
和端口经过NAT转换后的公网IP地址和公网端口。服务器S可以
从终端的注册报文中得到该终端的私网endpoint相关信息,
可以通过对注册报文的源IP地址和UDP源端口字段获得该终端
的公网endpoint。如果终端不是位于NAT设备后面,那么采用
上述方法得到的两个endpoint应该是完全相同的。
也有一些的NAT设备会扫描UDP报文的数据字段,寻找4字节的
位域,将看上去很像IP地址的位域,改为与IP头一样的地址。
为了避免这种行为的NAT设备对UDP报文数据的修改,应用程序
可以采用直接对IP地址的值进行加密的方式骗过NAT设备的检查。

UDP Hole Punching的过程

当终端A希望与终端B建立连接时,Hole punching(也就是P2P
的连接阶段)过程如下所示:

  1. 终端A最初并不知道如何向B发起连接,于是终端A向服务器
    S发送报文,请求服务器S帮助建立与终端B的UDP连接。

  2. 服务器S将含有终端B的公网及私网的endpoint发给终端A,
    同时,服务器S将含有终端A的公网及私网的endpoint的用于
    请求连接的报文也发给B。一旦这些报文都顺利达到,终端A
    与终端B都知道了对方的公网和私网的endpoint。

  3. 当终端A收到服务器S返回的包含终端B的公网和私网的
    endpoint的报文后,终端A开始分别向这些终端B的endpoint
    发送UDP报文,并且终端A会自动“锁定”第一个给出响应的
    终端B的endpoint。同理,当终端B收到服务器S发送的包含
    终端A的公网和私网的endpoint的报文后,也会开始向终端
    A的公网和私网的endpoint发送UDP报文,并且自动锁定第
    一个得到终端A的回应的endpoint。由于终端A与B的互相向
    对方发送UDP报文的操作是异步的,所以终端A与B发送报文
    的时间先后没有严格的时序要求。

终端位于同一个NAT设备后面

两个终端都位于同一个NAT设备后面,位于同一个内网中,是
最“简单”的一种场景。
在UDP punching之前的情况如下:
如上图,终端A的内网endpoint是10.0.0.1:1000,经过NAT设备
映射之后的endpoint是100.0.0.1:2000。终端B的内网endpoint
是11.0.0.1:1000,经过NAT设备映射之后的endpoint是
100.0.0.1:2010。服务器S的endpoint是111.0.0.1:9000。

如上图A1,终端A向服务器S请求终端B的公网和私网endpoint。
如上图A3,终端A向终端B的公网endpoint发送报文,当NAT支持
回环转换(hairpin translation)的情况下,同一个NAT环境下
终端A和终端B才可以连通。但是终端A与B往对端私网endpoint
发送的UDP报文是一定可以到达的,无论如何,私网报文采用
最短转发路径,要比经过NAT转换来的快。终端A与B有很大的
可能性采用私网的endpoint进行常规的通信。

Spark实现TopN

• Spark

概述

对超过单机内存容量的(K, V)对进行TopN排序,在键值K是否重复的情况
需要分别处理,因为K重复,我们需要先去重。

Spark Implementation: Unique Keys

1. 建立和Spark的连接

/*create a connection to the Spark master*/
JavaSparkContext ctx = new JavaSparkContext();

2. 从HDFS读取数据并建立RDD

JavaRDD<String> lines = ctx.textFile(inputPath, 1);

3. 从初始RDD转化为pair RDD

 JavaPairRDD<String,Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
         @Override
         public Tuple2<String,Integer> call(String s) {
            String[] tokens = s.split(","); // cat7,234
            return new Tuple2<String,Integer>(tokens[0], Integer.parseInt(tokens[1]));
         }
      });

使用mapToPair函数用于创建pair RDD.

4. 在每个partition中建立本地TopN

      /*
      * <U> JavaRDD<U> mapPartitions(FlatMapFunction<java.util.Iterator<T>,U> f)
      * Return a new RDD by applying a function to each partition of this RDD.
      *
      * mapPartitions(func)	Similar to map, but runs separately on each partition   
      * (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when   
      * running on an RDD of type T.
      *
      * public interface FlatMapFunction<T,R> extends java.io.Serializable
      * A function that returns zero or more output records from each input record.
      *
      * java.lang.Iterable<R> call(T t) throws java.lang.Exception
      * */
      // STEP-5: create a local top-10
      JavaRDD<SortedMap<Integer, String>> partitions = pairs.mapPartitions(
         new FlatMapFunction<Iterator<Tuple2<String,Integer>>, SortedMap<Integer, String>>() {
             public Iterable<SortedMap<Integer, String>> call(Iterator<Tuple2<String,Integer>> iter) {
             SortedMap<Integer, String> top10 = new TreeMap<Integer, String>();
             while (iter.hasNext()) {
                Tuple2<String,Integer> tuple = iter.next();
                top10.put(tuple._2, tuple._1);
                // keep only top N 
                if (top10.size() > 10) {
                   top10.remove(top10.firstKey());
                }  
             }
             //return Collections.singletonList(top10).iterator();
             return Collections.singletonList(top10);
         }
      });

根据《Learning Spark》的4.5节Data Patitioning的论述:

Spark’s partitioning is available on all RDDs of key/value pairs, and causes the system to group elements based on a function of each key. Although Spark does not give explicit control of which worker node each key goes to (partly because the system is designed to work even if specific nodes fail), it lets the program ensure that a set of keys will appear together on some node.

Spark会自动对数据分区,而在分区的基础上mapPartitions进行工作。

libevent echo client

libevent echo client

## 目标
使用libevent使用echo的client. 编写client的时候,我们需要关心connect操作
是否成功,以及如何处理stdin。

数据结构

对于libevent来说,每个线程有且只有一个event_base,对应一个struct event_base结构体(以及附于其上的事件管理器),用来调度托管给它的一系列event, 当一个事件发生后,event_base会在合适的时间(不一定是立即)去调用绑定在这个事件上的函数(传入一些预定义的参数,以及在绑定时指定的一个参数),直到这个函数执行完,再返回调度其他事件。因此我们需要struct event_bash用于事件管理,使用struct event注册事件。

## 关键代码

套接字中select函数的理解

套接字中select函数的理解

问题

  1. 为什么select函数的时间复杂度是O(n)?
  2. select函数如何判断文件描述符是ready状态?

select函数

select函数允许进程指示内核等待多个事件中的任何一个发生,并只在有一个或 多个事件发生或经历一段指定的时间后才唤醒它。其定义如下:   

#include <sys/time.h> /* For portability */
#include <sys/select.h>

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

	Returns number of ready file descriptors, 0 on timeout, or –1 on error

nfds参数指定待测试的描述符个数,它的值是待测试的最大描述符加1,描 述符0,1,2,…..一直到nfds-1均被测试。参数readfds、writefds和exceptfds
用于监控读、写和异常的文件描述符集合。最后的参数timeout用于超时时间。   

select函数的内核实现   

标准库中的select函数是由系统调用sys_select实现。其调用关系如下:

sys_select==>   
	core_sys_select==>   
		do_select

数据结构

#undef __NFDBITS
#define __NFDBITS   (8 * sizeof(unsigned long))

/* __FD_SETSIZE 代表select可以监控的fd最大数目。*/
#undef __FD_SETSIZE
#define __FD_SETSIZE    1024	

#undef __FDSET_LONGS
#define __FDSET_LONGS   (__FD_SETSIZE/__NFDBITS)

typedef struct {
    unsigned long fds_bits [__FDSET_LONGS];
} __kernel_fd_set;

typedef __kernel_fd_set     fd_set;

typedef struct {
    unsigned long *in, *out, *ex;
    unsigned long *res_in, *res_out, *res_ex;
} fd_set_bits;

/*
 * How many longwords for "nr" bits? nr需要的字节
 */
#define FDS_BITPERLONG  (8*sizeof(long))
#define FDS_LONGS(nr)   (((nr)+FDS_BITPERLONG-1)/FDS_BITPERLONG)
#define FDS_BYTES(nr)   (FDS_LONGS(nr)*sizeof(long))

主要代码流程   

在函数do_select中,将执行一个for循环。

    		for (j = 0; j < __NFDBITS; ++j, ++i, bit <<= 1) {
            	int fput_needed;
                if (i >= n)
                    break;
                if (!(bit & all_bits))
                    continue;
                /* TODO: 这个file 如何和tcp产生联想的? */
                /*
                 struct socket_alloc {
                    struct socket socket;
                    struct inode vfs_inode;
                 };
            
                 TODO: file 的f_op 和 inode 的 i_fop 有什么关系。
                 */
                file = fget_light(i, &fput_needed);
                if (file) {
                    f_op = file->f_op;
                    mask = DEFAULT_POLLMASK;
                    /*  
                     这里应该是tcp_poll/udp_poll 
                     udp的接收应该会调用skb_recv_datagram 
                    */
                    if (f_op && f_op->poll)
                        mask = (*f_op->poll)(file, retval ? NULL : wait);
                    fput_light(file, fput_needed);
                    if ((mask & POLLIN_SET) && (in & bit)) {
                        res_in |= bit;
                        retval++;
                    }
                    if ((mask & POLLOUT_SET) && (out & bit)) {
                        res_out |= bit;
                        retval++;
                    }
                    if ((mask & POLLEX_SET) && (ex & bit)) {
                        res_ex |= bit;
                        retval++;
                    }
                }
                /* perform a process switch */
         		cond_resched();
            }
            if (res_in)
                *rinp = res_in;
            if (res_out)
                *routp = res_out;
            if (res_ex)
                *rexp = res_ex;
        }
        wait = NULL;
        /*
         signal_pending
         Returns the value 1 (true) if the process identified by the *p process descriptor has
         nonblocked pending signals, and returns the value 0 (false) if it doesn't. 
         */
        if (retval || !*timeout || signal_pending(current))
            break;

理解cgroup实现

理解cgroup实现

cgroup 概述

cgroups是Linux内核提供的一种机制,这种机制可以根据特定的行为,把一系列系统任务
及其子任务整合(或分隔)到按资源划分等级的不同组内,从而为系统资源管理提供一
个统一的框架。实现cgroups的主要目的是为不同用户层面的资源管理,提供一个统一化 的接口。从单个进程的资源控制到操作系统层面的虚拟化。Cgroups提供了以下四大功能。

  1. 资源限制(Resource Limitation):cgroups可以对进程组使用的资源总额进行限制。
    如设定应用运行时使用内存的上限,一旦超过这个配额就发出OOM(Out of Memory)。
  2. 优先级分配(Prioritization):通过分配的CPU时间片数量及硬盘IO带宽大小,实际上
    就相当于控制了进程运行的优先级。
  3. 资源统计(Accounting): cgroups可以统计系统的资源使用量,如CPU使用时长、内
    存用量等等,这个功能非常适用于计费。
  4. 进程控制(Control):cgroups可以对进程组执行挂起、恢复等操作。

cgroup 术语表

  1. task(任务):cgroups的术语中,task就表示系统的一个进程。
  2. cgroup(控制组):cgroups 中的资源控制都以cgroup为单位实现。cgroup
    表示按某种资源控制标准划分而成的任务组,包含一个或多个子系统。一个
    任务可以加入某个cgroup,也可以从某个cgroup迁移到另外一个cgroup。
  3. subsystem(子系统):cgroups中的subsystem就是一个资源调度控制器
    (Resource Controller)。比如CPU子系统可以控制CPU时间分配,内存
    子系统可以限制cgroup内存使用量。
  4. hierarchy(层级树):hierarchy由一系列cgroup以一个树状结构排列而成,
    每个hierarchy通过绑定对应的subsystem进行资源调度。hierarchy中的
    cgroup节点可以包含零或多个子节点,子节点继承父节点的属性。整个系统
    可以有多个hierarchy。

cgroup的基本规则

传统的Unix进程管理,实际上是先启动init进程作为根节点,再由init节点创建
子进程作为子节点,而每个子节点由可以创建新的子节点,如此往复,形成一个
树状结构。而cgroups也是类似的树状结构,子节点都从父节点继承属性。它们
最大的不同在于,系统中cgroup构成的hierarchy可以允许存在多个。如果进程
模型是由init作为根节点构成的一棵树的话,那么cgroups的模型则是由多个
hierarchy构成的森林。这样做的目的也很好理解,如果只有一个hierarchy,
那么所有的task都要受到绑定其上的subsystem的限制,会给那些不需要这些限制
的task造成麻烦。

规则1 同一层级树可以附加多个子系统

同一个hierarchy可以附加一个或多个subsystem。如下图,cpu和memory的
subsystem附加到了一个hierarchy。

规则2 任何子系统最多可附加到一个层级树中

一个subsystem可以附加到多个hierarchy,当且仅当这些hierarchy只有这唯一
一个subsystem。如下图,小圈中的数字表示subsystem附加的时间顺序,CPU subsystem
附加到hierarchy A的同时不能再附加到hierarchy B,因为hierarchy B已经附加了
memory subsystem。如果hierarchy B与hierarchy A状态相同,没有附加过
memory subsystem,那么CPU subsystem同时附加到两个hierarchy是可以的。

规则3 任务只能存在层级树中的唯一cgroup中

系统每次新建一个hierarchy时,该系统上的所有task默认构成了这个新建的
hierarchy的初始化cgroup,这个cgroup也称为root cgroup。对于你创建的
每个hierarchy,task只能存在于其中一个cgroup中,即一个task不能存在于
同一个hierarchy的不同cgroup中,但是一个task可以存在在不同hierarchy
中的多个cgroup中。如果操作时把一个task添加到同一个hierarchy中的另
一个cgroup中,则会从第一个cgroup中移除。在下图中可以看到,httpd
进程已经加入到hierarchy A中的/cg1而不能加入同一个hierarchy中的/cg2,
但是可以加入hierarchy B中的/cg3。实际上不允许让任务加入同一个
hierarchy中的多个cgroup是为了防止出现矛盾,如CPU subsystem为/cg1
分配了30%,而为/cg2分配了50%,此时如果httpd在这两个cgroup中,就会
出现矛盾。

规则4 子进程继承父进程的cgroup属性

进程(task)在fork自身时创建的子任务(child task)默认与原task在
同一个cgroup中,但是child task允许被移动到不同的cgroup中。即fork
完成后,父子进程间是完全独立的。如下图中,小圈中的数字表示task
出现的时间顺序,当httpd刚fork出另一个httpd时,在同一个hierarchy中
的同一个cgroup中。但是随后如果PID为4840的httpd需要移动到其他cgroup
也是可以的,因为父子任务间已经独立。总结起来就是:初始化时子任务与
父任务在同一个cgroup,但是这种关系随后可以改变。

cgroup实现

cgroups的实现本质上是给系统进程挂上钩子(hooks),当task运行的过程
中涉及到某个资源时就会触发钩子上所附带的subsystem进行检测,最终根据
资源类别的不同使用对应的技术进行资源限制和优先级分配。

Linux中管理task进程的数据结构为 task_struct(包含所有进程管理的信息),
其中与cgroup相关的字段主要有两个,一个是css_set *cgroups,表示指向
css_set(包含进程相关的cgroups信息)的指针,一个task只对应一个css_set结构,
但是一个css_set可以被多个task使用。另一个字段是list_head cg_list,是一个
链表的头指针,这个链表包含了所有的链到同一个css_set的task进程(在图中使用
的回环箭头,均表示可以通过该字段找到所有同类结构,获得信息)。

每个css_set结构中都包含了一个指向 cgroup_subsys_state(包含进程与一个
特定子系统相关的信息)的指针数组。cgroup_subsys_state则指向了cgroup结构
(包含一个cgroup的所有信息),通过这种方式间接的把一个进程和cgroup
联系了起来,如下图。

另一方面,cgroup结构体中有一个list_head css_sets字段,它是一个头指针,
指向由cg_cgroup_link(包含cgroup与task之间多对多关系的信息)形成的链表。
由此获得的每一个cg_cgroup_link都包含了一个指向css_set *cg字段,指向了
每一个task的css_set。css_set结构中则包含tasks头指针,指向所有链到
此css_set的task进程构成的链表。至此,我们就明白如何查看在同一个
cgroup中的task有哪些了,如下图。

那么为什么要使用cg_cgroup_link结构体呢?因为task与cgroup之间是多对多的关系。
熟悉数据库的读者很容易理解,在数据库中,如果两张表是多对多的关系,那么如果
不加入第三张关系表,就必须为一个字段的不同添加许多行记录,导致大量冗余。
通过从主表和副表各拿一个主键新建一张关系表,可以提高数据查询的灵活性和效率。
而一个task可能处于不同的cgroup,只要这些cgroup在不同的hierarchy中,并且
每个hierarchy挂载的子系统不同;另一方面,一个cgroup中可以有多个task,这是
显而易见的,但是这些task因为可能还存在在别的cgroup中,所以它们对应的css_set
也不尽相同,所以一个cgroup也可以对应多个css_set。在系统运行之初,内核的
主函数就会对root cgroups和css_set进行初始化,每次task进行fork/exit时,都会
附加(attach)/分离(detach)对应的css_set。综上所述,添加cg_cgroup_link
主要是出于性能方面的考虑,一是节省了task_struct结构体占用的内存,二是提升
了进程fork()/exit()的速度。

定义子系统的结构体是cgroup_subsys,在下图中可以看到,cgroup_subsys中定义了
一组函数的接口,让各个子系统自己去实现,类似的思想还被用在了cgroup_subsys_state
中,cgroup_subsys_state并没有定义控制信息,只是定义了各个子系统都需要用到的
公共信息,由各个子系统各自按需去定义自己的控制信息结构体,最终在自定义的
结构体中把cgroup_subsys_state包含进去,然后内核通过container_of(这个宏可以
通过一个结构体的成员找到结构体自身)等宏定义来获取对应的结构体。

cgroup文件系统的实现

Linux VFS是所谓的虚拟文件系统转换,是一个内核软件层,用来处理与Unix标准文件
系统的所有系统调用。VFS对用户提供统一的读写等文件操作调用接口,当用户调用
读写等函数时,内核则调用特定的文件系统实现。具体而言,文件在内核内存中是
一个file数据结构来表示的。这个数据结构包含一个f_op的字段,该字段中包含了
一组指向特定文件系统实现的函数指针。当用户执行read()操作时,内核调用
sys_read(),然后sys_read()查找到指向该文件属于的文件系统的读函数指针,并
调用它,即file->f_op->read()。

VFS 文件系统定义了以下对象模型:

  1. 超级块对象(superblock object):存放已安装文件系统的有关信息。
  2. 索引节点对象(inode object):存放关于具体文件的一般信息。
  3. 文件对象( file object):存放打开文件与进程之间的交互信息
  4. 目录项对象(dentry object):存放目录项与对应文件进行链接的有关信息。

cgroup 内存子系统

cgroup 内存子系统

内存按用途分类

  1. 用户空间的匿名映射页(Anonymous pages in User Mode address spaces),比如调用
    malloc分配的内存,以及使用MAP_ANONYMOUS的mmap;当系统内存不够时,内核可以将这部分内存交换出去。
  2. 用户空间的文件映射页(Mapped pages in User Mode address spaces),包含map file
    和map tmpfs;前者比如指定文件的mmap,后者比如IPC共享内存;当系统内存不够
    时,内核可以回收这些页,但回收之前可能需要与文件同步数据。
  3. 文件缓存(page in page cache of disk file);发生在程序通过普通的read/write
    读写文件时,当系统内存不够时,内核可以回收这些页,但回收之前可能需要与文件同步数据。
  4. buffer pages,属于page cache;比如读取块设备文件。
    其中1和2是算作进程的RSS,3和4属于page cache。

内存子系统的作用

  1. Accounting memory usage under cgroup, Memory here is physical memory.
  2. Limit memory usage under user specified value.
  3. If necessary, cull(pageout) memory under it.

内存子系统的内核实现

memory子系统是通过linux的resource counter机制实现的。resource counter是内核为
子系统提供的一种资源管理机制。这个机制的实现包括了用于记录资源的数据结构和
相关函数。Resource counter定义了一个res_counter的结构体来管理特定资源,定义如下:

struct res_counter {   
	//usage 用于记录当前已使用的资源 
	unsigned long long usage;
	//max_usage 用于记录使用过的最大资源量 
	unsigned long long max_usage;
	//limit 用于设置资源的使用上限,进程组不能使用超过这个限制的资源 
	unsigned long long limit;
	//soft_limit 用于设定一个软上限,进程组使用的资源可以超过这个限制 
	unsigned long long soft_limit;
	//failcnt 用于记录资源分配失败的次数,管理可以根据这个记录,调整上限值。 
	unsigned long long failcnt; 
	spinlock_t lock;
	//Parent 指向父节点,这个变量用于处理层次性的资源管理。
	struct res_counter *parent;
};

相关函数:

void res_counter_init(struct res_counter *counter, struct res_counter *parent)
int res_counter_charge(struct res_counter *counter, unsigned long val,struct res_counter **limit_fail_at)
void res_counter_uncharge(struct res_counter *counter, unsigned long val)

函数res_counter_charge作用就是记录进程组使用的资源。
在这个函数中:

for (c = counter; c != NULL; c = c->parent) {
	spin_lock(&c->lock);
	ret = res_counter_charge_locked(c, val);
	spin_unlock(&c->lock);
	if (ret < 0) {
		*limit_fail_at = c;
		goto undo;
	}
}

在这个循环里,从当前res_counter开始,从下往上逐层增加资源的使用量。
res_counter_charge_locked这个函数,这个函数顾名思义就是在加锁的情况
下增加使用量。实现如下:

    if (counter->usage + val > counter->limit) {
        counter->failcnt++;
        ret = -ENOMEM;
        if (!force)
            return ret;
    }   

    counter->usage += val;
    if (counter->usage > counter->max_usage)
        counter->max_usage = counter->usage;

首先判断是否已经超过使用上限,如果是的话就增加失败次数,返回相关代码;
否则就增加使用量的值,如果这个值已经超过历史最大值,则更新最大值。

内存子系统定义了一个叫mem_cgroup的结构体来管理cgroup相关的内存使用信息。

struct mem_cgroup {
	/* cgroup_subsys_state成员,便于task或cgroup获取mem_cgroup。*/   
	struct cgroup_subsys_state css;	
	/* 两个res_counter成员,分别用于管理memory资源和memory+swap资源 */
	struct res_counter res;
	struct res_counter memsw;
	struct mem_cgroup_lru_info info;
	spinlock_t reclaim_param_lock;
	int prev_priority;
	int last_scanned_child;
	/* use_hierarchy则用来标记资源控制和记录时是否是层次性的。*/
	bool use_hierarchy;
	atomic_t oom_lock;
	atomic_t refcnt;
	unsigned int swappiness;
	/* oom_kill_disable则表示是否使用oom-killer。*/
	int oom_kill_disable;   
	/*
	 如果memsw_is_minimum为true,则res.limit=memsw.limit,即当进程组使用的   
 	 内存超过memory的限制时,不能通过swap来缓解。   
	*/
	bool memsw_is_minimum;
	struct mutex thresholds_lock;
	struct mem_cgroup_thresholds thresholds;
	struct mem_cgroup_thresholds memsw_thresholds;
	/* oom_notify指向一个oom notifier event fd链表。*/
	struct list_head oom_notify;
	unsigned long move_charge_at_immigrate;
	struct mem_cgroup_stat_cpu *stat;
};

内核的实现是通过mm_struct知道属于它的进程,通过函数mem_cgroup_from_task()
得到mem_cgroup,然后进行内存统计。

cgroup的任务和内存子系统之间的联系

mm_struct==>task_struct==>css_set==>cgroup_subsys_state=>mem_cgroup==>res

mem_cgroup_from_task ==>
	mem_cgroup_from_css ==>
		task_css ==>
			task_css_check 
struct mem_cgroup *mem_cgroup_from_css(struct cgroup_subsys_state *s)
{
    return s ? container_of(s, struct mem_cgroup, css) : NULL;
}

Charge/Uncharge

Memory cgroup accounts usage of memory. There are roughly 2 operations, charge/uncharge.

  1. Charge
    • (Memory) Usage += PAGE_SIZE
    • Free/cull memory if usage hit limits
    • Check a page as “This page is charged”

应用场景一

在page fault发生时的情况:

而mem_cgroup_newpage_charge()的调用如下:

mem_cgroup_newpage_charge==>
	mem_cgroup_charge_common==>
		__mem_cgroup_try_charge==>
			mem_cgroup_do_charge==>   
				mem_cgroup_reclaim
				mem_cgroup_oom
		__mem_cgroup_commit_charge

通过上面的调用关系可以知道,在charge中有回收操作。

应用场景二

在文件读写操作过程中会产生缓存,cgroup同样会处理。

/* This function is used to add a page to the pagecache. */
add_to_page_cache_locked==>    
	mem_cgroup_cache_charge==>
		mem_cgroup_charge_common==>
			__mem_cgroup_try_charge
			__mem_cgroup_commit_charge
		__mem_cgroup_try_charge_swapin
		__mem_cgroup_try_charge_swapin		
  1. Uncharge
    • (Memory) Usage -= PAGE_SIZE
    • Remove the check

页面回收

  1. 如果内核检测到某个操作期间内存严重不足,将调用try_to_free_pages。该函数检查当前内存域中所有页,并释放最不常用的那些。   
  2. 一个后台守护进程,名为kswaped,会定期检查内存使用情况,并检测即将发生的内存不足。可使用该守护进程换出页,作为预防措施,
    以防内核在执行其他操作期间发生内存不足。
    页面回收流程如下:

Global VM’s memory reclaim logic is triggered at memory shortage in a zone.

  1. It’s important where we reclaim memory from and the kernel may have to
    reclaim continuous pages.
  2. Kswapd works.
  3. Slabs are dropped.

Memcg’s memory reclaim is triggered at memory usage hits its limit.

  1. Just reducing memory is important. No care about placement of pages.
  2. No kswapd help (now)
  3. No slab drop.
unsigned long try_to_free_mem_cgroup_pages(struct mem_cgroup *memcg,
                       gfp_t gfp_mask,
                       bool noswap)
{
    struct zonelist *zonelist;
    unsigned long nr_reclaimed;
    int nid; 
    struct scan_control sc = {
        .may_writepage = !laptop_mode,
        .may_unmap = 1, 
        .may_swap = !noswap,
        .nr_to_reclaim = SWAP_CLUSTER_MAX,
        .order = 0, 
        .priority = DEF_PRIORITY,
        .target_mem_cgroup = memcg,
        .nodemask = NULL, /* we don't care the placement */
        .gfp_mask = (gfp_mask & GFP_RECLAIM_MASK) |
                (GFP_HIGHUSER_MOVABLE & ~GFP_RECLAIM_MASK),
    };   

sc的成员变量nodemask为NULL,说明可以对所有内存node进行页面回收。

应用中需要注意的要素

  1. cgroup统计内存的时刻为进程加入到cgroup,而在这之前进程使用内存不统计。   
  2. cgroup内存子系统只是设置了使用的上线,当需要使用内存的时候需要临时分配,
    而不是提前分配准备。
  3. cgroup并没有创造出一个隔离的环境,操作系统可以回收cgroup的内存。

需要理解的问题:

  1. 命名空间 和 如何实现Docker ?
  2. 为什么要强调是以线程统计?
  3. 内核/proc统计内存的方法?

自定义TCP包

在无套接字情况下,发送TCP包

目标

  1. 不使用套接字进行数据包传输。
  2. 在源端机器的内核态,组建一个TCP包并发送到目标机器。
  3. 在目标机器的内核态对自定义TCP包进行处理。

TCP/IP 参考模型

因特网协议栈中的层:

  1. 应用层
  2. 传输层
  3. 网络层
  4. 接口层

如下图中的Data Flow所示:

当我们发送一个TCP包的时候,我们应该从最上层的Application层准备需要传输的信息,然后依次填充TransportInternetLink 的协议头部。

发送端实现

发送端整体流程

243     /* 第一步 分配和设置sk_buff. */   
244     skb = alloc_set_skb();  
245     if (!skb)             
246         return -1;        
247                           
248     /* 第二步 拷贝TCP负载 */
249     err = copy_md5sum(skb);
250     if (err != 0) {       
251         kfree_skb(skb);   
252         return err;       
253     }
254                           
255     /* 第三步 组建TCP头部 */
256     err = build_tcphdr(skb);
257     if (err != 0) {       
258         kfree_skb(skb);   
259         return err;       
260     }
261    
262     /* 第四步 组建IP头部 */
263     err = build_iphdr(skb);
264     if (err != 0) {       
265         kfree_skb(skb);   
266         return err;       
267     }
268 
269     /* 第五步 计算TCP头和IP头中的checksum */  
270     iph = ip_hdr(skb);    
271     skb->ip_summed = CHECKSUM_NONE;   
272     skbcsum(skb, iph);
273 
274     /* 第六步 组建网络接口层的头部 */
275     err = build_ethhdr(skb); 
276     if (err != 0) {
277         return err;
278     }
279 
280     /* 第七步 发送SKB */
281     err = dev_queue_xmit(skb);
282 
283     return err;

分配SKB

内核中使用套接字缓冲区(struct sk_buff)表示协议栈中的数据包。

216 struct sk_buff * alloc_set_skb(void)
217 {
218     struct sk_buff *skb;
219
220     /* At least 32-bit aligned.  */
221     int size = ALIGN(sizeof(struct ethhdr), 4) + ALIGN(sizeof(struct iphdr), 4) + ALIGN(sizeof(struct tcphdr), 4) + ALIGN    (MD5LEN, 4);
222 
223     skb = alloc_skb(size, GFP_ATOMIC);
224     if (skb == NULL) {
225         printk(KERN_ERR"alloc skb failed.");
226         return NULL;
227     }
228 
229     /* Reserve space for headers and prepare control bits. */
230     skb_reserve(skb, size);
231 
232     return skb;
233 }

第221行调整size为了四字节边界对齐。第223行分配一个新的sk_buff实例。第230行skb_reserve通过移动skb的data和tail指针, 来调整skb的headroom。

拷贝传输信息

这里直接在内核态中将要传输信息拷贝到skb, 而非像套接字编程中将数据从用户态拷贝到内核态中。

205 int copy_md5sum(struct sk_buff *skb)
206 {
207     uint8_t md5_result[MD5LEN] = {1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16};
208 
209     if (skb_tailroom(skb) < MD5LEN)
210         return -1;
211     else {
212         memcpy(skb_push(skb, MD5LEN), md5_result, MD5LEN);
213     }
214 
215     return 0;
216 }

这里我们需要将一个伪MD5值传递到对端。第209行用于判断内存空间是否可以容纳MD5值,第212行的skb_push用于向前移动skb的data指针,
此时data和tail之间的内存空间就可以存放传输信息。

组装TCP头部

TCP的头部定义如下:

184 int build_tcphdr(struct sk_buff *skb)
185 {
186     struct tcphdr *th;
187 
188     skb_push(skb, ALIGN(sizeof(struct tcphdr), 4));
189     skb_reset_transport_header(skb);
190 
191     /* Build TCP header and checksum it. */
192     th = tcp_hdr(skb);
193     th->source      = htons(SOURCE);
194     th->dest        = htons(DEST);
195     th->seq         = htonl(123);
196     th->ack_seq     = 0;
197     *(((__be16 *)th) + 6)   = htons(((sizeof(struct tcphdr) >> 2) << 12) | TCPCB_FLAG_FIN);
198     th->window = htons(560);
199     th->check = 0;
200     th->urg_ptr = 0;
201 
202     return 0;
203 }

第188行设置TCP头部所需要的内存空间,第189行用于设置skb的transport_header指针,这样做是为了方便后面的tcp_hdr函数使用。
第193和194行用于设置端口,第195和196行用于设置序号和确认号,第197行用于设置TCP头长度和FIN标志位,
第198行用于设置窗口值,第199行校验和先设置为0。

组装IP头部

IP的头部定义如下:

161 int build_iphdr(struct sk_buff *skb)
162 {
163     struct iphdr *iph;
164     
165     skb_push(skb, sizeof(struct iphdr));
166     skb_reset_network_header(skb);
167     
168     iph = ip_hdr(skb);
169     *((__be16 *)iph) = htons((4 << 12) | (5 << 8) | (RT_TOS(20) & 0xff));
170     iph->tot_len = htons(skb->len);
171     iph->frag_off = htons(IP_DF);
172     iph->ttl      = 64;
173     iph->protocol = IPPROTO_TCP;
174     iph->saddr    = in_aton(SOU_IP);
175     iph->daddr    = in_aton(DST_IP);
176     
177     return 0;
178 }

第165行设置IP头部所需要的内存空间,第169用于设置IP的版本、IHL和区分服务。第170行的tot_len为总长度。
第171行设置IP_DF表示没有IP分段。第171用于设置生存期,第172行用于设置协议,IP的头检验和暂时不设置。
第174和175行用于设置源地址和目标地址。

组建网络接口层的头部

在网络接口层主要是获得本机发包网卡的MAC地址和网关的MAC地址。

124 int build_ethhdr(struct sk_buff *skb)
125 {
126     struct ethhdr *eth;
127     struct net_device *dev;
128     int err;
129    
130     dev = dev_get_by_name(&init_net, SOU_DEVICE);
131     if (!dev) {
132         printk(KERN_ERR"get device failed.");
133         return -3;
134     }
135 
136     skb->dev = dev;
137     skb->pkt_type  = PACKET_OUTGOING;
138     skb->local_df  = 1;
139     skb->protocol = htons(ETH_P_IP);
140 
141     eth = (struct ethhdr *)skb_push(skb, ETH_HLEN);
142     skb_reset_mac_header(skb);
143 
144     memcpy(eth->h_source, dev->dev_addr, ETH_ALEN);
145    
146     gate_addr = in_aton(GATE_IP);
147     err = get_mac(eth->h_dest, gate_addr, RT_TOS(20), skb);
148     if (err != 1) {
149         kfree_skb(skb);
150         if (net_ratelimit())
151             printk(KERN_ERR"get device mac failed when send packets.err:%d", err);
152             return err;
153         }
154 
155     return 0;
156 }

第130行获得网卡SOU_DEVICE在内核中的指针,用于获得MAC地址。第147行用于获得网关的MAC地址。
第138行的local_df是指”don’t fragment”,设置为1代表不会被分段。

目的端实现

因为没有在目地端建立socket,我们在netfiter中对包进行处理,netfilter中hook点的位置如下:

60 struct nf_hook_ops nf_in_ops = {
61     .list       = { NULL, NULL},
62     .pf     = PF_INET,
63 #if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 32)
64     .hook           = nf_in,
65     //.hooknum        = NF_INET_PRE_ROUTING,
66     .hooknum        = NF_INET_LOCAL_IN,
67 #else
68     .hook           = nf_in_p,
69     //.hooknum        = NF_IP_PRE_ROUTING,
70     .hooknum        = NF_IP_LOCAL_IN,
71 #endif
72     .priority       = NF_IP_PRI_FIRST,
73 };

hook函数如下:

10 static unsigned int nf_in(
11 #if LINUX_VERSION_CODE >= KERNEL_VERSION(3, 13, 0)
12         const struct nf_hook_ops *ops,
13 #else
14         unsigned int hooknum,
15 #endif
16         struct sk_buff *skb,
17         const struct net_device *in,
18         const struct net_device *out,
19         int (*okfn)(struct sk_buff *))
20 {
21     unsigned short sport, dport;
22     struct iphdr *iph;
23     struct tcphdr *tcph;
24     char *tcp_payload = NULL;
25     size_t tcp_payload_len = 0;
26     int i;
27 
28     iph = ip_hdr(skb);
29     if (iph->protocol == IPPROTO_TCP) {
30         tcph = (struct tcphdr *)(skb->data + (iph->ihl << 2));
31 
32         sport = tcph->source;
33         dport = tcph->dest;
34         if (ntohs(sport) == 6880 && ntohs(dport) == 6880) {
35             printk(KERN_INFO "find a new packet");
36             tcp_payload = (char *)((unsigned char *)tcph + (tcph->doff << 2));
37             tcp_payload_len = ntohs(iph->tot_len) - (iph->ihl << 2) - (tcph->doff << 2);
38 
39             if (tcp_payload_len == 16)
40                 for (i = 0; i < tcp_payload_len; ++i)
41                     printk("%d:", tcp_payload[i]);
42 
43             return NF_DROP;
44         }
45     }
46     return NF_ACCEPT;
47 }

第29行到43行,我们对我们自己制造的TCP包进行处理。

结论

  1. 使用此种方法对于传输少量数据可行,但是由于丢包的问题,需要增加ACK和重传机制。
  2. 改为per-cpu模式话,可以用于局域网中测试网卡性能。

github地址

github地址

结构体 skb_shared_info 成员变量 nr_frags 和 frags

结构体 skb_shared_info 成员变量 nr_frags 和 frags

结构体声明

struct skb_shared_info {
	atomic_t    dataref;
	unsigned short  nr_frags;   
	unsigned short  gso_size;
	/* Warning: this field is not always filled in (UFO)! */
	unsigned short  gso_segs; 
	unsigned short  gso_type;
	__be32          ip6_frag_id;
	struct sk_buff  *frag_list;
	skb_frag_t  frags[MAX_SKB_FRAGS];
};   

nr_fragsfrags 用于支持 Scatter/Gather I/O buffer,这样SKB的数据部分不需要都 放在线性内存空间而是放在 page.

在发送路径上的应用

在函数 tcp_sendmsg 中, 用于存放从应用层拷贝过来的数据。

/*
 skb_tailroom 用于返回此SKB的skb->tail到skb->end之间的空闲距离,如果有的话,   
 可以利用。
 */
if (skb_tailroom(skb) > 0) {
	/* We have some space in skb head. Superb! */
    if (copy > skb_tailroom(skb))
    	copy = skb_tailroom(skb);

    /*
     skb_add_data 拷贝数据的同时计算checksum.
    */
    if ((err = skb_add_data(skb, from, copy)) != 0)
    	goto do_fault;
    } else {
    	int merge = 0;  
		/*
		 i 是 frags 已经使用的数目。
		*/
        int i = skb_shinfo(skb)->nr_frags;	  
        struct page *page = TCP_PAGE(sk);   
        int off = TCP_OFF(sk);  

		/*
		 skb_can_coalesce 检查SKB的frags中最后一个page是否有空闲空间。
		 */
        if (skb_can_coalesce(skb, i, page, off) &&
                    off != PAGE_SIZE) {
             merge = 1;
        } else if (i == MAX_SKB_FRAGS ||
        	(!i &&
            !(sk->sk_route_caps & NETIF_F_SG))) {
                /*
                 执行到这里说明 frags的使用到达上限,需要重新分配一个sk_buff.
                */
                tcp_mark_push(tp, skb);
                goto new_segment;
        } else if (page) {
        	/*  a new page is needed */
            if (off == PAGE_SIZE) {
            	put_page(page);
                TCP_PAGE(sk) = page = NULL;
                off = 0;
            }
        } else
        	off = 0;

在接收路径上的应用

函数 skb_copy_datagram_iovec 用于将数据从socket buffer拷贝到应用层。

    /*
     skb_headlen(skb) 返回的值为 (skb->len - skb->data_len),skb->len   
     为所有TCP负载的长度,skb->data_len 为使用SG功能存放于page中的数据   
     长度。offset 为缓存区中的偏移。 
     */
	int start = skb_headlen(skb);
    int i, copy = start - offset;

    /*
     copy 大于0,说明skb->data 到 skb->tail 之间的线性数据区域依然有数据需要拷贝。
    */
    if (copy > 0) {
        if (copy > len)
            copy = len;
        if (memcpy_toiovec(to, skb->data + offset, copy))
            goto fault;
        if ((len -= copy) == 0)
            return 0;
        offset += copy;
    }   

    /* Copy paged appendix. Hmm... why does this look so complicated? */
    for (i = 0; i < skb_shinfo(skb)->nr_frags; i++) {
        int end;

        BUG_TRAP(start <= offset + len);

        /*
         end 表示此 page 所包含负载的终点,如果offset已经大于等于此end,说明这个page   
         已经拷贝完,那么执行循环的下一次。
         */
        end = start + skb_shinfo(skb)->frags[i].size;   
        if ((copy = end - offset) > 0) {
            int err;
            u8  *vaddr;
            skb_frag_t *frag = &skb_shinfo(skb)->frags[i];
            struct page *page = frag->page;
      
            if (copy > len)
                copy = len;
            vaddr = kmap(page);
            err = memcpy_toiovec(to, vaddr + frag->page_offset +
                         offset - start, copy);
            kunmap(page);
            if (err)
                goto fault;
            if (!(len -= copy))
                return 0;
            offset += copy;
        }

        /*
         累积拷贝负载的起始位置用于下次循环。
        */
        start = end;  
    }

cgroup 应用

cgroup 应用

cgroup 介绍

  CGroup 是 Control Groups 的缩写,是 Linux 内核提供的一种可以限制、记录、
隔离进程组 (process groups) 所使用的物力资源 (如 cpu memory i/o 等等) 的机制。CGroup
提供了一个 CGroup 虚拟文件系统,作为进行分组管理和各子系统设置的用户接口。要使
用CGroup,必须挂载 CGroup 文件系统。这时通过挂载选项指定使用哪个子系统。

cgroup 功能

cgroup 子系统介绍

  1. blkio 设置限制每个块设备的输入输出控制。例如:磁盘,光盘以及usb等等。
  2. cpu 使用调度程序为cgroup任务提供cpu的访问。
  3. cpuacct 产生cgroup任务的cpu资源报告。
  4. cpuset 如果是多核心的cpu,这个子系统会为cgroup任务分配单独的cpu和内存。
  5. devices 允许或拒绝cgroup任务对设备的访问。
  6. freezer 暂停和恢复cgroup任务。
  7. memory 设置每个cgroup的内存限制以及产生内存资源报告。
  8. net_cls 标记每个网络包以供cgroup方便使用。
  9. ns 名称空间子系统。
  10. perf_event 增加了对每group的监测跟踪的能力,即可以监测属于某个特定的group的所有线程以及运行在特定CPU上的线程,此功能对于监测整个group非常有用。

cgroup 使用

安装

sudo yum install libcgroup

启动关闭服务

The cgconfig(control group config) service is used to create cgroups and manage subsystems.

sudo service cgconfig start/stop

start up at the boot time.

chkconfig cgconfig on    

配置

  The cgroup configuration file is /etc/cgconfig.conf. Depending on the contents of the configuration file, cgconfig can create hierarchies, mount necessary file systems, create cgroups, and set subsystem parameters (resource limits) for each cgroup.

/etc/cgconfig 的格式

mount {
    <controller> = <path>;
    ...
}
group <name> {
    [<permissions>]
    <controller> {
        <param name> = <param value>;
        …
    }
    …
}

cgroup 实验

cgroup 实验

安装cgroup

在 centOS 6.5 上使用如下命令安装:

yum install cgroup

启动cgroup

service cgconfig start   #开启cgroups服务
chkconfig cgconfig on    #开机启动

创建控制组

mkdir -p /cgroup/cpu/limit_user
mkdir -p /cgroup/memory/limit_user

设置控制条件

echo 500000 > /cgroup/cpu/limit_user/cpu.cfs_quota_us

以上的命令是指此控制组只可以使用50%的CPU。cfs_period_us表示控制组可以访问
CPU的时间段,并以微秒为单位,cfs_quota_us表示控制组在执行时间中的配额。如果
让一个cgroup中的task可以执行0.2秒,那么就需要设置cfs_quota_us为200000 和
cpu.cfs_period_us 为 1000000.这里10000000的单位为微秒。

echo 1048576 >  /cgroup/memory/limit_user/memory.limit_in_bytes

分配1MB的内存给cgroup

测试脚本:

脚本一消耗CPU

#! /bin/bash

x=0
while [ True ];do
    x=$x+1
done;

执行效果:

如上图所示CPU占用99%,执行以下命令:
echo 30036 > /cgroup/cpu/limit_user/tasks 可以看到CPU的占用会被限制到50%。

脚本二消耗内存

#! /bin/bash

echo $$ > /cgroup/memory/foo/tasks     #脚本主动将自身进程加入到cgroup

x="a"
while [ True ];do
    x=$x$x
done;

执行此脚本,当此进程试图占用的内存超过了cgroups的限制,会触发out of memory,导致进程被kill掉。