Notes
- Linux kernel version: 5.7.1
- Open vSwitch version: 2.15.0
- Source Insight version: 4.0
Overview
This article walks through how the kernel part of OVS implements its flow table. The approach is a code walkthrough: we start from kernel packet reception, using the e1000 NIC as the example, and follow a packet all the way into the OVS lookup path.
Device packet reception
First, a quick sketch of the driver receive path. When the NIC receives a packet it raises a hardware interrupt to notify the CPU. The interrupt handler is registered when the device is opened: for e1000, e1000_open calls e1000_request_irq to register the hard-IRQ handler:
static int e1000_request_irq(struct e1000_adapter *adapter)
{
	struct net_device *netdev = adapter->netdev;
	irq_handler_t handler = e1000_intr;	/* the hard-IRQ handler */
	int irq_flags = IRQF_SHARED;
	int err;

	err = request_irq(adapter->pdev->irq, handler, irq_flags, netdev->name,
			  netdev);
	if (err) {
		e_err(probe, "Unable to allocate interrupt Error: %d\n", err);
	}

	return err;
}
As you can see, e1000's hard-IRQ handler is e1000_intr: the OS calls it whenever the NIC raises an interrupt. In NAPI mode it triggers a softirq, which eventually invokes the driver's poll function; for e1000 that is e1000_clean. e1000_clean calls e1000_clean_rx_irq to process received packets, copying data from the DMA ring into kernel skbs, then hands each skb to e1000_receive_skb. From there the skb first goes through GRO (the receive-side counterpart of TSO: GRO aggregates on receive, TSO segments on transmit) via napi_gro_receive, whose core is dev_gro_receive. napi_skb_finish then decides, based on dev_gro_receive's return value, whether to push the skb up the stack: if it returns GRO_NORMAL, gro_normal_one delivers it. After a chain of calls (netif_receive_skb_list_internal -> __netif_receive_skb_list -> __netif_receive_skb_list_core -> __netif_receive_skb_core) the packet reaches __netif_receive_skb_core, where protocol-stack processing begins.
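For reference, here is napi_skb_finish as it appears (abridged and lightly commented) in net/core/dev.c of this kernel version, showing how dev_gro_receive's verdict steers the skb:
/* Abridged from net/core/dev.c (v5.7): only GRO_NORMAL hands the skb
 * onward toward the protocol stack. */
static gro_result_t napi_skb_finish(struct napi_struct *napi,
				    struct sk_buff *skb,
				    gro_result_t ret)
{
	switch (ret) {
	case GRO_NORMAL:
		gro_normal_one(napi, skb);	/* batch, then up the stack */
		break;
	case GRO_DROP:
		kfree_skb(skb);
		break;
	case GRO_MERGED_FREE:
		/* Payload was merged into an earlier skb; free the shell. */
		if (NAPI_GRO_CB(skb)->free == NAPI_GRO_FREE_STOLEN_HEAD)
			napi_skb_free_stolen_head(skb);
		else
			__kfree_skb(skb);
		break;
	case GRO_HELD:
	case GRO_MERGED:
	case GRO_CONSUMED:
		break;	/* held/merged inside GRO, nothing to deliver yet */
	}

	return ret;
}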
Protocol stack processing
As noted above, the packet arrives at __netif_receive_skb_core, which mainly does the following:
- Timestamping
- Resetting the network header
- XDP processing
- VLAN processing
- TC processing
- ptype_all delivery (tcpdump, tap/tun, etc.)
- L2 bridge processing (rx_handler; see the snippet after this list)
- ptype_base delivery (IP/ARP, etc.)
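The rx_handler step in that list is the hook OVS uses. Abridged from __netif_receive_skb_core in this kernel, the relevant dispatch looks like this:
/* Abridged from net/core/dev.c, __netif_receive_skb_core (v5.7). */
rx_handler = rcu_dereference(skb->dev->rx_handler);
if (rx_handler) {
	/* ... pending ptype delivery elided ... */
	switch (rx_handler(&skb)) {
	case RX_HANDLER_CONSUMED:	/* handler took the skb; stop here */
		ret = NET_RX_SUCCESS;
		goto out;
	case RX_HANDLER_ANOTHER:	/* skb->dev changed; restart processing */
		goto another_round;
	case RX_HANDLER_EXACT:		/* deliver only to exact-match ptypes */
		deliver_exact = true;
		/* fall through */
	case RX_HANDLER_PASS:
		break;
	default:
		BUG();
	}
}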
The L2 bridge step, i.e. the rx_handler call, is where the OVS path begins. Two rx_handler values are of interest here: the Linux bridge's br_handle_frame (installed when a device is enslaved to a kernel bridge) and the OVS bridge's netdev_frame_hook (installed when a device is added to an OVS bridge). The rest of the analysis therefore starts from netdev_frame_hook.
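Registration happens when the port is attached: ovs_netdev_link registers the hook and stashes the vport pointer in dev->rx_handler_data (abridged from vport-netdev.c):
/* Abridged from net/openvswitch/vport-netdev.c (ovs_netdev_link). */
struct vport *ovs_netdev_link(struct vport *vport, const char *name)
{
	/* ... device lookup and checks elided ... */
	err = netdev_rx_handler_register(vport->dev, netdev_frame_hook,
					 vport);
	/* ... */
}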
OVS device entry
Here is netdev_frame_hook together with netdev_port_receive; the key points are commented in the code:
/* Must be called with rcu_read_lock. */
static void netdev_port_receive(struct sk_buff *skb)
{
	struct vport *vport;

	/* The vport was stashed in dev->rx_handler_data at registration time. */
	vport = ovs_netdev_get_vport(skb->dev);
	if (unlikely(!vport))
		goto error;

	if (unlikely(skb_warn_if_lro(skb)))
		goto error;

	/* Make our own copy of the packet. Otherwise we will mangle the
	 * packet for anyone who came before us (e.g. tcpdump via AF_PACKET).
	 */
	skb = skb_share_check(skb, GFP_ATOMIC);
	if (unlikely(!skb))
		return;

	if (skb->dev->type == ARPHRD_ETHER) {
		skb_push(skb, ETH_HLEN);
		skb_postpush_rcsum(skb, skb->data, ETH_HLEN);
	}
	ovs_vport_receive(vport, skb, skb_tunnel_info(skb));
	return;
error:
	kfree_skb(skb);
}
/* Called with rcu_read_lock and bottom-halves disabled. */
static rx_handler_result_t netdev_frame_hook(struct sk_buff **pskb)
{
	struct sk_buff *skb = *pskb;

	if (unlikely(skb->pkt_type == PACKET_LOOPBACK))
		return RX_HANDLER_PASS;

	netdev_port_receive(skb);
	/* RX_HANDLER_CONSUMED makes __netif_receive_skb_core bail out;
	 * the skb does not continue up the normal protocol stack. */
	return RX_HANDLER_CONSUMED;
}
After this basic extraction and sanity checking, the packet enters ovs_vport_receive:
/**
 * ovs_vport_receive - pass up received packet to the datapath for processing
 *
 * @vport: vport that received the packet
 * @skb: skb that was received
 * @tun_key: tunnel (if any) that carried packet
 *
 * Must be called with rcu_read_lock. The packet cannot be shared and
 * skb->data should point to the Ethernet header.
 */
int ovs_vport_receive(struct vport *vport, struct sk_buff *skb,
		      const struct ip_tunnel_info *tun_info)
{
	struct sw_flow_key key;
	int error;

	/* OVS_CB() interprets skb->cb as struct ovs_skb_cb. */
	OVS_CB(skb)->input_vport = vport;
	OVS_CB(skb)->mru = 0;
	OVS_CB(skb)->cutlen = 0;
	/* If the device is in a different namespace than the datapath,
	 * scrub the skb, preserving only the mark. */
	if (unlikely(dev_net(skb->dev) != ovs_dp_get_net(vport->dp))) {
		u32 mark;

		mark = skb->mark;
		skb_scrub_packet(skb, true);
		skb->mark = mark;
		tun_info = NULL;
	}

	/* Extract flow from 'skb' into 'key'; the key is what the flow-table
	 * lookup below matches on. */
	error = ovs_flow_key_extract(tun_info, skb, &key);
	if (unlikely(error)) {
		kfree_skb(skb);
		return error;
	}
	/* Flow-table cache lookup. */
	ovs_dp_process_packet(skb, &key);
	return 0;
}
The key step in ovs_vport_receive is ovs_flow_key_extract: it extracts the fields OVS cares about from the skb into a sw_flow_key. The key covers the packet's L2, L3, L4, conntrack, and related fields, and is what the subsequent flow-cache matching operates on.
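For orientation, here is an abridged sketch of sw_flow_key from net/openvswitch/flow.h in this kernel; many fields are elided, see flow.h for the full definition:
/* Abridged from net/openvswitch/flow.h (v5.7); many fields elided. */
struct sw_flow_key {
	u8 tun_opts[IP_TUNNEL_OPTS_MAX];
	u8 tun_opts_len;
	struct ip_tunnel_key tun_key;	/* Encapsulating tunnel key. */
	struct {
		u32	priority;	/* Packet QoS priority. */
		u32	skb_mark;	/* SKB mark. */
		u16	in_port;	/* Input switch port (or DP_MAX_PORTS). */
	} __packed phy;
	u32 ovs_flow_hash;		/* Datapath computed hash value. */
	u32 recirc_id;			/* Recirculation ID. */
	struct {
		u8     src[ETH_ALEN];	/* Ethernet source address. */
		u8     dst[ETH_ALEN];	/* Ethernet destination address. */
		__be16 type;		/* Ethernet frame type. */
		/* ... VLAN fields elided ... */
	} eth;
	struct {
		u8     proto;		/* IP protocol or lower 8 bits of ARP opcode. */
		u8     tos;		/* IP ToS. */
		u8     ttl;		/* IP TTL/hop limit. */
		u8     frag;		/* One of OVS_FRAG_TYPE_*. */
	} ip;
	struct {
		__be16 src;		/* TCP/UDP/SCTP source port. */
		__be16 dst;		/* TCP/UDP/SCTP destination port. */
		__be16 flags;		/* TCP flags. */
	} tp;
	/* ... IPv4/IPv6/ARP/ND unions and conntrack fields elided ... */
};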
OVS flow-table cache matching
At this point we are in the core OVS processing path; the entry function is ovs_dp_process_packet. It involves several OVS concepts and structures that we will come back to after reading the code. Roughly, it does the following:
- Look up the flow-table cache using the key
- On a hit, execute the flow's actions
- On a miss, go through the upcall path, where the userspace vswitchd performs the OpenFlow table lookup
- Update statistics
Before walking through the lookup in detail, here are the questions it should answer:
1. The flow cache is essentially a hash table: what are its keys and what are its values?
2. How is the flow cache organized?
3. How big is the flow cache?
The inputs at this stage are two: the skb and the key. Here is ovs_dp_process_packet:
/* Must be called with rcu_read_lock. */
void ovs_dp_process_packet(struct sk_buff *skb, struct sw_flow_key *key)
{
	/* Fetch the input vport back out of skb->cb. */
	const struct vport *p = OVS_CB(skb)->input_vport;
	/* The datapath is the OVS bridge object: it holds the flow table,
	 * the ports, and so on. */
	struct datapath *dp = p->dp;
	struct sw_flow *flow;
	struct sw_flow_actions *sf_acts;
	struct dp_stats_percpu *stats;
	u64 *stats_counter;
	u32 n_mask_hit;
	int error;

	/* Per-CPU stats: no cross-CPU locking, fewer cache misses. */
	stats = this_cpu_ptr(dp->stats_percpu);

	/* Look up flow. Note the arguments: the skb's hash is taken here,
	 * and the rest of the lookup works on the key, not the skb. */
	flow = ovs_flow_tbl_lookup_stats(&dp->table, key, skb_get_hash(skb),
					 &n_mask_hit);
	/* Cache miss: go through the upcall path to userspace. */
	if (unlikely(!flow)) {
		struct dp_upcall_info upcall;

		memset(&upcall, 0, sizeof(upcall));
		upcall.cmd = OVS_PACKET_CMD_MISS;
		upcall.portid = ovs_vport_find_upcall_portid(p, skb);
		upcall.mru = OVS_CB(skb)->mru;
		error = ovs_dp_upcall(dp, skb, key, &upcall, 0);
		if (unlikely(error))
			kfree_skb(skb);
		else
			consume_skb(skb);
		stats_counter = &stats->n_missed;
		goto out;
	}

	/* Cache hit: update flow stats and execute the actions. */
	ovs_flow_stats_update(flow, key->tp.flags, skb);
	sf_acts = rcu_dereference(flow->sf_acts);
	error = ovs_execute_actions(dp, skb, sf_acts, key);
	if (unlikely(error))
		net_dbg_ratelimited("ovs: action execution error on datapath %s: %d\n",
				    ovs_dp_name(dp), error);

	stats_counter = &stats->n_hit;

out:
	/* Update datapath statistics. */
	u64_stats_update_begin(&stats->syncp);
	(*stats_counter)++;
	stats->n_mask_hit += n_mask_hit;
	u64_stats_update_end(&stats->syncp);
}
The cache lookup entry point is ovs_flow_tbl_lookup_stats. Because the flow cache supports masked (wildcard) matching rather than exact matching, the goal of this function is to use skb->hash to quickly find, in a per-CPU cache, the mask that likely matches this flow; if no skb->hash can be computed, it has to walk all masks starting from index 0. The implementation:
/*
 * mask_cache maps flow to probable mask. This cache is not tightly
 * coupled cache, It means updates to mask list can result in inconsistent
 * cache entry in mask cache.
 * This is per cpu cache and is divided in MC_HASH_SEGS segments.
 * In case of a hash collision the entry is hashed in next segment.
 */
struct sw_flow *ovs_flow_tbl_lookup_stats(struct flow_table *tbl,
					  const struct sw_flow_key *key,
					  u32 skb_hash,
					  u32 *n_mask_hit)
{
	/* Array holding all masks; it is resized dynamically. */
	struct mask_array *ma = rcu_dereference(tbl->mask_array);
	/* Hash table holding all sw_flow entries; each sw_flow carries its
	 * mask and its actions, i.e. this is the final lookup result. */
	struct table_instance *ti = rcu_dereference(tbl->ti);
	struct mask_cache_entry *entries, *ce;
	struct sw_flow *flow;
	u32 hash;
	int seg;

	*n_mask_hit = 0;
	/* No usable skb_hash: try the masks starting from index 0. */
	if (unlikely(!skb_hash)) {
		u32 mask_index = 0;

		return flow_lookup(tbl, ti, ma, key, n_mask_hit, &mask_index);
	}

	/* Pre and post recirulation flows usually have the same skb_hash
	 * value. To avoid hash collisions, rehash the 'skb_hash' with
	 * 'recirc_id'. */
	if (key->recirc_id)
		skb_hash = jhash_1word(skb_hash, key->recirc_id);

	ce = NULL;
	hash = skb_hash;
	entries = this_cpu_ptr(tbl->mask_cache);

	/* Find the cache entry 'ce' to operate on: skb_hash is valid, so use
	 * it to quickly locate a probable mask_array index. */
	for (seg = 0; seg < MC_HASH_SEGS; seg++) {
		int index = hash & (MC_HASH_ENTRIES - 1);
		struct mask_cache_entry *e;

		e = &entries[index];
		if (e->skb_hash == skb_hash) {
			flow = flow_lookup(tbl, ti, ma, key, n_mask_hit,
					   &e->mask_index);
			if (!flow)
				e->skb_hash = 0;
			return flow;
		}

		if (!ce || e->skb_hash < ce->skb_hash)
			ce = e;	/* A better replacement cache candidate. */

		hash >>= MC_HASH_SHIFT;
	}

	/* Cache miss, do full lookup: start from the replacement candidate's
	 * current mask_index, then scan all masks if that fails. */
	flow = flow_lookup(tbl, ti, ma, key, n_mask_hit, &ce->mask_index);
	if (flow)	/* Cache the hit for next time. */
		ce->skb_hash = skb_hash;

	return flow;
}
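To answer question 1 for this first-level table: each slot of the per-CPU mask_cache is a small fixed struct, from flow_table.h in this kernel:
/* From net/openvswitch/flow_table.h (v5.7): one per-CPU cache slot,
 * mapping an skb_hash to the index of its probable mask in mask_array. */
struct mask_cache_entry {
	u32 skb_hash;
	u32 mask_index;
};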
The table consulted above is the per-CPU mask_cache: its key is skb->hash and its value is a mask_cache_entry, which records the index of the matching mask within mask_array. So the result of this stage is just an index, which is then handed to flow_lookup:
/* Flow lookup does full lookup on flow table. It starts with
 * mask from index passed in *index.
 */
static struct sw_flow *flow_lookup(struct flow_table *tbl,
				   struct table_instance *ti,
				   struct mask_array *ma,
				   const struct sw_flow_key *key,
				   u32 *n_mask_hit,
				   u32 *index)
{
	struct sw_flow *flow;
	struct sw_flow_mask *mask;
	int i;

	/* Try the hinted mask first. */
	if (likely(*index < ma->max)) {
		mask = rcu_dereference_ovsl(ma->masks[*index]);
		if (mask) {
			flow = masked_flow_lookup(ti, key, mask, n_mask_hit);
			if (flow)
				return flow;
		}
	}

	/* Otherwise scan every mask in the array. */
	for (i = 0; i < ma->max; i++) {
		if (i == *index)
			continue;

		mask = rcu_dereference_ovsl(ma->masks[i]);
		if (unlikely(!mask))
			break;

		flow = masked_flow_lookup(ti, key, mask, n_mask_hit);
		if (flow) { /* Found */
			*index = i;	/* remember the mask that hit */
			return flow;
		}
	}

	return NULL;
}
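A note on *index: it points into mask_array, which is a dynamically sized, RCU-protected array of mask pointers (from flow_table.h in this kernel):
/* From net/openvswitch/flow_table.h (v5.7). */
struct mask_array {
	struct rcu_head rcu;
	int count, max;
	struct sw_flow_mask __rcu *masks[];
};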
The index selects a sw_flow_mask from mask_array->masks[index], and that mask is then passed into masked_flow_lookup:
static struct sw_flow *masked_flow_lookup(struct table_instance *ti,
					  const struct sw_flow_key *unmasked,
					  const struct sw_flow_mask *mask,
					  u32 *n_mask_hit)
{
	struct sw_flow *flow;
	struct hlist_head *head;
	u32 hash;
	struct sw_flow_key masked_key;

	/* AND the extracted key with the mask, then hash the masked range. */
	ovs_flow_mask_key(&masked_key, unmasked, false, mask);
	hash = flow_hash(&masked_key, &mask->range);
	head = find_bucket(ti, hash);
	(*n_mask_hit)++;

	/* Walk the bucket looking for an exact match on mask, hash and key. */
	hlist_for_each_entry_rcu(flow, head, flow_table.node[ti->node_ver],
				 lockdep_ovsl_is_held()) {
		if (flow->mask == mask && flow->flow_table.hash == hash &&
		    flow_cmp_masked_key(flow, &masked_key, &mask->range))
			return flow;
	}
	return NULL;
}
This is the final stage of the sw_flow lookup. It works as follows (the masking step is sketched after this list):
- AND the sw_flow_key extracted from the skb with the sw_flow_mask over the mask's range, producing masked_key, then hash masked_key over that same range
- Use the hash to find the bucket head in the hlist hash table
- Walk every flow hanging off that bucket
- A flow matches when flow->mask == mask, flow->flow_table.hash == hash, and the masked key contents are equal over the mask's range
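The masking in the first step is ovs_flow_mask_key; abridged from flow_table.c in this kernel, it ANDs the key with the mask one long at a time, but only over the byte range the mask covers:
/* Abridged from net/openvswitch/flow_table.c (v5.7). */
void ovs_flow_mask_key(struct sw_flow_key *dst, const struct sw_flow_key *src,
		       bool full, const struct sw_flow_mask *mask)
{
	int start = full ? 0 : mask->range.start;
	int len = full ? sizeof *dst : range_n_bytes(&mask->range);
	const long *m = (const long *)((const u8 *)&mask->key + start);
	const long *s = (const long *)((const u8 *)src + start);
	long *d = (long *)((u8 *)dst + start);
	int i;

	/* Bytes outside the range are left uninitialized: flow_hash() and
	 * flow_cmp_masked_key() only ever read within the masked range. */
	for (i = 0; i < len; i += sizeof(long))
		*d++ = *s++ & *m++;
}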
With that, the lookup process is complete.
Summary
Since this article covers only the OVS kernel flow-table cache lookup, a diagram summarizing the process rounds it out; how the flow table is built and updated will be the subject of a separate article.