ovs原理-内核流表-01

说明

  • Linux kernel version:5.7.1
  • Open_vSwitch version:2.15.0
  • Source Insight version:4.0

概要

本篇文章来介绍ovs内核部分的流表实现原理,这里采用带入式疗法来分析其代码实现,那么首先从内核收包开始说起,以e1000网卡为例开始阅读之旅。

设备收包

这里简单介绍一下驱动收包流程,当网卡收到数据包之后,会发送硬件中断通知cpu进行处理,网卡在打开的时候会注册对应的中断处理函数,这里e1000网卡在启动时候会调用e1000_open函数,在e1000_open函数中会调度e1000_request_irq函数进行网卡硬中断的注册:

static int e1000_request_irq(struct e1000_adapter *adapter)
{
	struct net_device *netdev = adapter->netdev;
	irq_handler_t handler = e1000_intr;
	int irq_flags = IRQF_SHARED;
	int err;

	err = request_irq(adapter->pdev->irq, handler, irq_flags, netdev->name,
			  netdev);
	if (err) {
		e_err(probe, "Unable to allocate interrupt Error: %d\n", err);
	}

	return err;
}

这里可以看到e1000的硬中断响应函数为e1000_intr,即当网卡触发中断时os将会调用到这个函数,在napi模式下触发软中断,然后进入e1000的poll函数,e1000的poll函数为e1000_clean,e1000_clean调用e1000_clean_rx_irq进行数据包的处理,即从dma驱动拷贝数据到内核skb,然后使用e1000_receive_skb进入skb的处理,首先进行gro(和tso对应,gro用于收包,tso用于发包)流程napi_gro_receive,gro的处理函数是dev_gro_receive,napi_skb_finish根据dev_gro_receive的返回结果决定是否将skb上送协议栈,如果dev_gro_receive返回GRO_NORMAL则调用gro_normal_one上送协议栈处理,然后在经过一系列的调用之后(
netif_receive_skb_list_internal->__netif_receive_skb_list->
__netif_receive_skb_list_core->__netif_receive_skb_core
)进入协议栈处理函数__netif_receive_skb_core,从这开始就到了协议栈的部分。

协议栈处理

上面说到数据包来到了__netif_receive_skb_core,这个函数主要完成如下工作:

  • 打时间戳
  • 重置网络头部
  • xdp处理
  • vlan处理
  • tc处理
  • ptype_all处理(tcpdump、tap\tun等)
  • 二层网桥处理(rx_handler)
  • ptype_base处理(ip/arp等)

在二层往网桥处理部分,即rx_handler的处理中,就会到ovs的流程了,目前内核中这个rx_handler的值可能有两个:内核网桥的br_handle_frame(在将设备设置为混杂模式时就会将此设备的rx_handler设置为br_handle_frame)、ovs网桥的netdev_frame_hook(在将设备添加到ovs网桥设备时就会将rx_handler赋值为netdev_frame_hook),所以后面的分析就从netdev_frame_hook函数开始。

ovs设备入口

netdev_frame_hook分析,代码中一些关键点有注释:

/* Called with rcu_read_lock and bottom-halves disabled. */
/* Must be called with rcu_read_lock. */
static void netdev_port_receive(struct sk_buff *skb)
{
	struct vport *vport;

//vport信息保存在dev->rx_handler_data中的
	vport = ovs_netdev_get_vport(skb->dev);
	if (unlikely(!vport))
		goto error;

	if (unlikely(skb_warn_if_lro(skb)))
		goto error;

	/* Make our own copy of the packet.  Otherwise we will mangle the
	 * packet for anyone who came before us (e.g. tcpdump via AF_PACKET).
	 */
	skb = skb_share_check(skb, GFP_ATOMIC);
	if (unlikely(!skb))
		return;

	if (skb->dev->type == ARPHRD_ETHER) {
		skb_push(skb, ETH_HLEN);
		skb_postpush_rcsum(skb, skb->data, ETH_HLEN);
	}
	ovs_vport_receive(vport, skb, skb_tunnel_info(skb));
	return;
error:
	kfree_skb(skb);
}

static rx_handler_result_t netdev_frame_hook(struct sk_buff **pskb)
{
	struct sk_buff *skb = *pskb;

	if (unlikely(skb->pkt_type == PACKET_LOOPBACK))
		return RX_HANDLER_PASS;

	netdev_port_receive(skb);
//rx_handler收到这个返回值之后会直接out 不会进入协议栈处理了
	return RX_HANDLER_CONSUMED;
}

进行基本信息的提取和判断之后进入ovs_vport_receive流程:

/**
 *	ovs_vport_receive - pass up received packet to the datapath for processing
 *
 * @vport: vport that received the packet
 * @skb: skb that was received
 * @tun_key: tunnel (if any) that carried packet
 *
 * Must be called with rcu_read_lock.  The packet cannot be shared and
 * skb->data should point to the Ethernet header.
 */
int ovs_vport_receive(struct vport *vport, struct sk_buff *skb,
		      const struct ip_tunnel_info *tun_info)
{
	struct sw_flow_key key;
	int error;
//将ovs_skb_cb记录到skb的cb成员
	OVS_CB(skb)->input_vport = vport;
	OVS_CB(skb)->mru = 0;
	OVS_CB(skb)->cutlen = 0;
//这里判断是否在同一个namespace
	if (unlikely(dev_net(skb->dev) != ovs_dp_get_net(vport->dp))) {
		u32 mark;

		mark = skb->mark;
		skb_scrub_packet(skb, true);
		skb->mark = mark;
		tun_info = NULL;
	}

	/* Extract flow from 'skb' into 'key'. */
//这里从skb中提取出key中相关的字段 后面在查找流表的时候使用key
	error = ovs_flow_key_extract(tun_info, skb, &key);
	if (unlikely(error)) {
		kfree_skb(skb);
		return error;
	}
//ovs流表缓存查找过程
	ovs_dp_process_packet(skb, &key);
	return 0;
}

ovs_vport_receive有个比较关键的流程就是ovs_flow_key_extract,作用是将ovs关心的一些字段从skb中提到到key,key中包含了数据包的二层、三层、四层、ct等字段,用于后续流表缓存的匹配过程。

ovs流表缓存匹配

到这里就差不多进入ovs主要处理流程了,入口函数为ovs_dp_process_packet,这个函数中涉及到ovs中的一些概念和架构,这些等把代码看完之后再回头来整理,这里先大致介绍下ovs_dp_process_packet的工作过程:

  • 根据key查找流表缓存
  • 找到流表(hit),执行actions
  • 没找到流表(miss),进入upcall流程,在用户态vswitchd中就行openflow流表查找
  • 更新状态信息

下面我们详细来看这个流变缓存查找过程,先带出几个疑问吧,后面主要解释这几个疑问:
1.流表缓存,一般就是hash表了,那么key是什么,value是什么?
2.流表缓存怎么组织的?
3.流表缓存大小?
输入到这一步的参数有两个
skbkey,下面我们看下ovs_dp_process_packet的实现:

/* Must be called with rcu_read_lock. */
void ovs_dp_process_packet(struct sk_buff *skb, struct sw_flow_key *key)
{
//从skb->cb成员获取vport结构
	const struct vport *p = OVS_CB(skb)->input_vport;
//datapath即ovs bridge结构了 包含了流表 port等信息的一个结构
	struct datapath *dp = p->dp;
	struct sw_flow *flow;
	struct sw_flow_actions *sf_acts;
	struct dp_stats_percpu *stats;
	u64 *stats_counter;
	u32 n_mask_hit;
	int error;
//统计成员 每个cpu一个 避免多线程加锁 减少cpu cache miss
	stats = this_cpu_ptr(dp->stats_percpu);

	/* Look up flow. */
//流表缓存查找过程 注意输入参数 这里在skb中提起了hash之后 后面的过程就不用skb了
	flow = ovs_flow_tbl_lookup_stats(&dp->table, key, skb_get_hash(skb),
					 &n_mask_hit);
//没有命中缓存 进入upcall流程
	if (unlikely(!flow)) {
		struct dp_upcall_info upcall;

		memset(&upcall, 0, sizeof(upcall));
		upcall.cmd = OVS_PACKET_CMD_MISS;
		upcall.portid = ovs_vport_find_upcall_portid(p, skb);
		upcall.mru = OVS_CB(skb)->mru;
		error = ovs_dp_upcall(dp, skb, key, &upcall, 0);
		if (unlikely(error))
			kfree_skb(skb);
		else
			consume_skb(skb);
		stats_counter = &stats->n_missed;
		goto out;
	}
//命中缓存 更新状态 执行actions
	ovs_flow_stats_update(flow, key->tp.flags, skb);
	sf_acts = rcu_dereference(flow->sf_acts);
	error = ovs_execute_actions(dp, skb, sf_acts, key);
	if (unlikely(error))
		net_dbg_ratelimited("ovs: action execution error on datapath %s: %d\n",
							ovs_dp_name(dp), error);

	stats_counter = &stats->n_hit;

out:
	/* Update datapath statistics. */
	u64_stats_update_begin(&stats->syncp);
	(*stats_counter)++;
	stats->n_mask_hit += n_mask_hit;
	u64_stats_update_end(&stats->syncp);
}

流表缓存入口ovs_flow_tbl_lookup_stats,由于流表缓存是支持掩码匹配的,此版本并不是绝对匹配,所以此函数的目标就是根据skb->hash快速在缓存中符合此流的mask,如果无法计算出skb->hash,则只能从0开始遍历所有的mask,实现如下:

/*
 * mask_cache maps flow to probable mask. This cache is not tightly
 * coupled cache, It means updates to  mask list can result in inconsistent
 * cache entry in mask cache.
 * This is per cpu cache and is divided in MC_HASH_SEGS segments.
 * In case of a hash collision the entry is hashed in next segment.
 * */
struct sw_flow *ovs_flow_tbl_lookup_stats(struct flow_table *tbl,
					  const struct sw_flow_key *key,
					  u32 skb_hash,
					  u32 *n_mask_hit)
{
//保存了所有的mask的数组,大小是动态变化的
	struct mask_array *ma = rcu_dereference(tbl->mask_array);
//保存了所有的sw_flow,就哈希表的值,里面保存了mask和对应的actions,即这个就是流表缓存查找的结果
	struct table_instance *ti = rcu_dereference(tbl->ti);
	struct mask_cache_entry *entries, *ce;
	struct sw_flow *flow;
	u32 hash;
	int seg;

	*n_mask_hit = 0;
//如果无法计算出skb_hash,则从mask_array的第0位开始匹配缓存
	if (unlikely(!skb_hash)) {
		u32 mask_index = 0;
		return flow_lookup(tbl, ti, ma, key, n_mask_hit, &mask_index);
	}

	/* Pre and post recirulation flows usually have the same skb_hash
	 * value. To avoid hash collisions, rehash the 'skb_hash' with
	 * 'recirc_id'.  */
	if (key->recirc_id)
		skb_hash = jhash_1word(skb_hash, key->recirc_id);

	ce = NULL;
	hash = skb_hash;
	entries = this_cpu_ptr(tbl->mask_cache);

	/* Find the cache entry 'ce' to operate on. */
//这里是可以计算skb_hash值,根据值可以快速定位mask_array的索引
	for (seg = 0; seg < mc_hash_segs seg 4wayhash4 int index='hash' mc_hash_entries - 1 struct mask_cache_entry e e='&entries[index];' if e->skb_hash == skb_hash) {
			flow = flow_lookup(tbl, ti, ma, key, n_mask_hit,
					   &e->mask_index);
			if (!flow)
				e->skb_hash = 0;
			return flow;
		}

		if (!ce || e->skb_hash < ce->skb_hash)
			ce = e;  /* A better replacement cache candidate. */

		hash >>= MC_HASH_SHIFT;
	}
//如果上面查找失败了 从索引0开始匹配
	/* Cache miss, do full lookup. */
	flow = flow_lookup(tbl, ti, ma, key, n_mask_hit, &ce->mask_index);
	if (flow)
		ce->skb_hash = skb_hash;

	return flow;
}

上面查找过程中的表是mask_cache(entry里面保存的是mask_key在mask_array中的index),key为skb->hash,value为 mask_cache_entry(entry里面保存的是mask_key在mask_array中的index),所以这阶段查找的结果就是一个index,经过上面查找到的index,进入flow_lookup流程:

/* Flow lookup does full lookup on flow table. It starts with
 * mask from index passed in *index.
 */
static struct sw_flow *flow_lookup(struct flow_table *tbl,
				   struct table_instance *ti,
				   struct mask_array *ma,
				   const struct sw_flow_key *key,
				   u32 *n_mask_hit,
				   u32 *index)
{
	struct sw_flow *flow;
	struct sw_flow_mask *mask;
	int i;

	if (likely(*index < ma->max)) {
		mask = rcu_dereference_ovsl(ma->masks[*index]);
		if (mask) {
			flow = masked_flow_lookup(ti, key, mask, n_mask_hit);
			if (flow)
				return flow;
		}
	}

	for (i = 0; i < ma->max; i++)  {

		if (i == *index)
			continue;

		mask = rcu_dereference_ovsl(ma->masks[i]);
		if (unlikely(!mask))
			break;

		flow = masked_flow_lookup(ti, key, mask, n_mask_hit);
		if (flow) { /* Found */
			*index = i;
			return flow;
		}
	}

	return NULL;
}

index的目的是从mask_array->masks[index]从取sw_flow_mask,此sw_flow_mask作为入参进入masked_flow_lookup流程:

static struct sw_flow *masked_flow_lookup(struct table_instance *ti,
					  const struct sw_flow_key *unmasked,
					  const struct sw_flow_mask *mask,
					  u32 *n_mask_hit)
{
	struct sw_flow *flow;
	struct hlist_head *head;
	u32 hash;
	struct sw_flow_key masked_key;

	ovs_flow_mask_key(&masked_key, unmasked, false, mask);
	hash = flow_hash(&masked_key, &mask->range);
	head = find_bucket(ti, hash);
	(*n_mask_hit)++;

	hlist_for_each_entry_rcu(flow, head, flow_table.node[ti->node_ver],
				lockdep_ovsl_is_held()) {
		if (flow->mask == mask && flow->flow_table.hash == hash &&
		    flow_cmp_masked_key(flow, &masked_key, &mask->range))
			return flow;
	}
	return NULL;
}

此流程便是查找sw_flow的最后流程,此流程所做的工作如下:

  • 根据sw_flow_mask和从skb中提取的sw_flow_key按mask中的range做与运算,结果为hash
  • 根据hash在hlist哈希表中找桶头head
  • 根据head桶头依次遍历所有的flow
  • 匹配成功的条件为flow->mask==maskflow->flow_table.hash==hashsw_flow_key内容相同
    至此,查找过程就结束了。

总结

因为此文仅为ovs kernel流表缓存查找过程,所以画个图总结一下此过程,流表怎么建立更新另开一文。

原文链接:,转发请注明来源!