数据的接收
当网卡有数据收到时,网卡会产生硬件中断,驱动在中断处理函数中读取通道上的数据并将数据发往上层,由于会频繁的收到网络数据包,中断处理函数将耗时的数据处理放在低半部处理,常用的低半部机制为tasklet和softirq。网卡驱动就将耗时的操作放到了softirq中来完成,内核定义了NET_TX_SOFTIRQ和NET_RX_SOFTIRQ,因此网卡中断处理程序一般处理例程为首先读取状态寄存器信息,判断是发送成功中断还是接受数据中断,如果是接收数据中断则先用dev_alloc_skb分配skb,然后把数据拷贝到有效数据区,调用netif_rx触发NETIF_RX_SOFTIRQ软中断。在中断处理程序外地进程上下文也可用netif_rx_ni处理收到的数据(中断函数往往简单的改变一个全局变量,进程不断轮询该全局变量的值检查是否有数据达到,在中断中可以释放信号量或锁,在处理rx数据的进程中获得信号量或锁)。
分析netif_rx函数
net/core/dev.c
int netif_rx(structsk_buff *skb)
{
trace_netif_rx_entry(skb);
return netif_rx_internal(skb);
}
static intnetif_rx_internal(struct sk_buff *skb)
{
int ret;
net_timestamp_check(netdev_tstamp_prequeue, skb);
trace_netif_rx(skb);
{
unsigned int qtail;
ret = enqueue_to_backlog(skb,get_cpu(), &qtail);
put_cpu();
}
return ret;
}
netif_rx_internal调用enqueue_to_backlog()来处理
* enqueue_to_backlog iscalled to queue an skb to a per CPU backlog
* queue (may be a remoteCPU queue)
static intenqueue_to_backlog(struct sk_buff *skb, int cpu,
unsigned int *qtail)
{
struct softnet_data *sd;
unsigned long flags;
unsigned int qlen;
sd = &per_cpu(softnet_data, cpu); //取指定CPU的per-cpu变量值
local_irq_save(flags);
rps_lock(sd);
if (!netif_running(skb->dev)) //检查dev->state标志,设备是否开启
goto drop;
qlen =skb_queue_len(&sd->input_pkt_queue); //sk_buff queue的长度
if (qlen <= netdev_max_backlog&& !skb_flow_limit(skb, qlen)) {
if (qlen) { //input_pkt_queue不为空,直接将将skb加到队列中,
enqueue: //暂不分析flow_limit
__skb_queue_tail(&sd->input_pkt_queue, skb);
input_queue_tail_incr_save(sd, qtail);
rps_unlock(sd);
local_irq_restore(flags);
return NET_RX_SUCCESS;
}
/* Schedule NAPI for backlogdevice
* We can use non atomicoperation since we own the queue lock
*/
if(!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state)) {
if(!rps_ipi_queued(sd))
____napi_schedule(sd, &sd->backlog);
}//将sd->backlog添加到sd->poll_list,并且触发软中断NET_RX_SOFTIRQ,关闭硬件中断
goto enqueue; //添sk_buf加到队列
}
}
首先获取当前cpu的softnet_data实例sd,然后:
1. 如果接收队列sd->input_pkt_queue不为空,说明已经有软中断在处理数据包了,则不需要再次触发软中断,直接将数据包添加到接收队列尾部即可。
2. 如果接收队列sd->input_pkt_queue为空,说明当前没有软中断在处理数据包,则把虚拟设备backlog添加到sd->poll_list中以便进行轮询,最后设置NET_RX_SOFTIRQ标志触发软中断。
3. 如果接收队列sd->input_pkt_queue满了,则直接丢弃数据包。
触发软中断(NET_RX_SOFTIRQ)后,接收数据包的下半部处理流程为:
net_rx_action // 软中断处理函数
|--> process_backlog() // 默认poll
|-->__netif_receive_skb() // L2处理函数
|--> ip_rcv() // L3入口
net_rx_action主要工作:
遍历sd->poll_list,对于每个处于轮询状态的设备,调用它的poll()函数来处理数据包。
如果设备NAPI被禁止了,则把设备从sd->poll_list上删除,否则把设备移动到sd->poll_list的队尾,每次软中断最多允许处理netdev_budget(300)个数据包,最长运行时间为2jiffies(2ms),每个设备一次最多允许处理weight_p(64)个数据包(非NAPI)。
如果在这次软中断中没处理完,则再次设置NET_RX_SOFTIRQ标志触发软中断。
static void net_rx_action(struct softirq_action *h)
{
struct softnet_data *sd = this_cpu_ptr(&softnet_data);
unsigned long time_limit = jiffies + 2; 一次软中断最长2ms
int budget = netdev_budget;一次最多处理300个skb
LIST_HEAD(list);
LIST_HEAD(repoll);
local_irq_disable();
list_splice_init(&sd->poll_list,&list); 将sd->poll_list合并到list上
local_irq_enable();
for (;;) {
struct napi_struct *n;
n = list_first_entry(&list,struct napi_struct, poll_list);
budget -= napi_poll(n,&repoll);//调用poll方法process_backlog
将包发往IP层, process_backlog函数将进一步调用netif_receive_skb()将数据包传上协议栈;如果用NAPI方式,则需要driver实现poll方法,也将调用netif_receive_skb()函数,而不会用process_backlog.
}
local_irq_disable();
}
static intnapi_poll(struct napi_struct *n, struct list_head *repoll)
{
void *have;
int work, weight;
list_del_init(&n->poll_list);//从poll_list中移除本napi_struct
have = netpoll_poll_lock(n);
weight = n->weight; //net_dev_init中定义为64
work = 0;
if (test_bit(NAPI_STATE_SCHED,&n->state)) {
work = n->poll(n, weight);//work为实际发送的skb数量,一次发送的上限个数为weight,如果为NAPI中这个poll函数在驱动中实现,用netif_napi_add添加
trace_napi_poll(n);
}
WARN_ON_ONCE(work > weight);
if (likely(work < weight))
goto out_unlock;
out_unlock:
netpoll_poll_unlock(have);
return work;
}
static intprocess_backlog(struct napi_struct *napi, int quota)
{
int work = 0;
struct softnet_data *sd =container_of(napi, struct softnet_data, backlog);
napi->weight = weight_p;
local_irq_disable();
while (1) {
struct sk_buff *skb;
/*从process_queue队列中取出一个skb发往层*/
while ((skb =__skb_dequeue(&sd->process_queue))) {
rcu_read_lock();
local_irq_enable();
__netif_receive_skb(skb);
rcu_read_unlock();
local_irq_disable();
input_queue_head_incr(sd);
if (++work >= quota){
local_irq_enable();
return work;
}
}
/*将input_pkt_queue队列接到process_queue,即将input_pkt_queue中的skb放入process_queue队列中*/
skb_queue_splice_tail_init(&sd->input_pkt_queue,
&sd->process_queue);
}
local_irq_enable();
return work;
}
NAPI和非NAPI的区别
以下是非NAPI设备和NAPI设备的数据包接收流程对比图:
(1) 支持NAPI的网卡驱动必须提供轮询方法poll()。
(2) 非NAPI的内核接口为netif_rx(),NAPI的内核接口为napi_schedule()。
(3) 非NAPI使用共享的CPU队列softnet_data->input_pkt_queue,NAPI使用设备内存(或者设备驱动程序的接收环)。
NAPI方式最大的好处是:poll函数中,不一定只处理一个报文。具体怎么处理可以由驱动程序灵活控制。比如说,假设现在网络负载非常大,如果网络设备每接收一个报文都通过一次中断来告知内核,这样做效率并不理想。而此时poll可以做一些轮询的工作,如果网络设备已经接收了多个报文,可以一次性都处理了。并且,就算设备此刻所接收到的报文都已经处理完了,驱动程序也可以根据某种方式预判设备在很短的一段时间内还将收到报文,于是依然将自己对应的napi结构留在poll_list中,等待下一次继续被调度。
因此对于NAPI,驱动要做的是分配napi结构体(可放在netdev的priv数据中一起分配),实现其poll函数,调用napi_schedule函数。