DPDK——cpu亲和性线程池

0.为什么要有cpu亲和性

在多核运行的机器上,每个CPU本身自己会有缓存,在缓存中存着进程使用的数据,而没有绑定CPU的话,进程可能会被操作系统调度到其他CPU上,如此CPU cache(高速缓冲存储器)命中率就低了,也就是说调到的CPU缓存区中原来没有这类数据,要先把内存或硬盘的数据载入缓存。而当缓存区绑定CPU后,程序就会一直在指定的CPU执行,不会被操作系统调度到其他CPU,性能上会有一定的提高。

为了让程序拥有更好的性能,有时候需要将进程或线程绑定到特定的CPU上,这样可以减少调度的开销和保护关键进程或线程。

1.重要的结构体

1.1 逻辑核心

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* Structure storing internal configuration (per-lcore)
*/
struct lcore_config {
pthread_t thread_id; /**< pthread identifier */
int pipe_main2worker[2]; /**< communication pipe with main */
int pipe_worker2main[2]; /**< communication pipe with main */

lcore_function_t * volatile f; /**< function to call */
void * volatile arg; /**< argument of function */
volatile int ret; /**< return value of function */

volatile enum rte_lcore_state_t state; /**< lcore state */
unsigned int socket_id; /**< physical socket id for this lcore */
unsigned int core_id; /**< core number on socket for this lcore */
int core_index; /**< relative index, starting from 0 */
uint8_t core_role; /**< role of core eg: OFF, RTE, SERVICE */

rte_cpuset_t cpuset; /**< cpu set which the lcore affinity to */
};

extern struct lcore_config lcore_config[RTE_MAX_LCORE];

该结构体在下称之为 逻辑核心描述符 用于描述dpdk线程池中的某一个线程。这里面其实就是一个线程任务描述符,(此线程描述符非彼线程描述符)把他抽象成一个核心而已。

  • 线程id
  • 与主线程通信的两个管道
  • 该逻辑核心要完成的工作(函数指针,参数,返回值)
  • 当前逻辑核心状态
  • socket_id core_id 和numa架构有关暂且不管
  • 然后是索引
  • 然后是状态 是否被dpdk标记使用,是否正在被使用
  • 该逻辑核心与那个cpu集亲和。 比如亲和的是物理编号为(0,1)的cpu那么这个线程就只会被这两个核心去调度运行。我们的程序不做特殊说明,默认的情况下,一个线程与核心是1:1.

1.2 运行时环境设置(Run-time evironment)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* The global RTE configuration structure.
*/
struct rte_config {
uint32_t main_lcore; /**< Id of the main lcore */
uint32_t lcore_count; /**< Number of available logical cores. */
uint32_t numa_node_count; /**< Number of detected NUMA nodes. */
uint32_t numa_nodes[RTE_MAX_NUMA_NODES]; /**< List of detected NUMA nodes. */
uint32_t service_lcore_count;/**< Number of available service cores. */
enum rte_lcore_role_t lcore_role[RTE_MAX_LCORE]; /**< State of cores. */

/** Primary or secondary configuration */
enum rte_proc_type_t process_type;

/** PA or VA mapping mode */
enum rte_iova_mode iova_mode;

/**
* Pointer to memory configuration, which may be shared across multiple
* DPDK instances
*/
struct rte_mem_config *mem_config;
} __rte_packed;

挑重点的说了,

  • 主线程号

  • 逻辑线程数量

  • 每个逻辑线程目前的角色(状态)
  • 进程的类型(同一台机器只能运行一个dpdk主进程,其他的需要加上从进程参数)

2.线程池的创建和初始化

线程池的创建和初始化工作均在环境抽象层初始化的时候完成,

1
int ret = rte_eal_init(initDpdkArgc, (char**)initDpdkArgvBuffer);

其中我们比较关心的参数是,核心数。在我们的代码中我们默认0号核分配给主线程。

首先我们在dpdk环境抽象层的配置中可以找到关于最大核心数的宏定义,

1
#define RTE_MAX_LCORE 128

2.1 首先对操作系统的处理器核心做初始化

1
2
3
4
5
6
7
8
9
10
11
if (eal_create_cpu_map() < 0) {
rte_eal_init_alert("Cannot discover CPU and NUMA.");
/* rte_errno is set */
return -1;
}

if (rte_eal_cpu_init() < 0) {
rte_eal_init_alert("Cannot detect lcores.");
rte_errno = ENOTSUP;
return -1;
}

这里主要涉及到两个主要的函数,eal_create_cpu_map()rte_eal_cpu_init()。前者没细看,大概就是为后者提供一个物理cpu核心的逻辑标识。后者的代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
int
rte_eal_cpu_init(void)
{
/* pointer to global configuration */
struct rte_config *config = rte_eal_get_configuration();
unsigned lcore_id;
unsigned count = 0;
unsigned int socket_id, prev_socket_id;
int lcore_to_socket_id[RTE_MAX_LCORE];

/*
* Parse the maximum set of logical cores, detect the subset of running
* ones and enable them by default.
*/
for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
lcore_config[lcore_id].core_index = count;

/* init cpuset for per lcore config */
CPU_ZERO(&lcore_config[lcore_id].cpuset);

/* find socket first */
socket_id = eal_cpu_socket_id(lcore_id);
lcore_to_socket_id[lcore_id] = socket_id;

if (eal_cpu_detected(lcore_id) == 0) {
config->lcore_role[lcore_id] = ROLE_OFF;
lcore_config[lcore_id].core_index = -1;
continue;
}

/* By default, lcore 1:1 map to cpu id */
CPU_SET(lcore_id, &lcore_config[lcore_id].cpuset);

/* By default, each detected core is enabled */
config->lcore_role[lcore_id] = ROLE_RTE;
lcore_config[lcore_id].core_role = ROLE_RTE;
lcore_config[lcore_id].core_id = eal_cpu_core_id(lcore_id);
lcore_config[lcore_id].socket_id = socket_id;
RTE_LOG(DEBUG, EAL, "Detected lcore %u as "
"core %u on socket %u\n",
lcore_id, lcore_config[lcore_id].core_id,
lcore_config[lcore_id].socket_id);
count++;
}
for (; lcore_id < CPU_SETSIZE; lcore_id++) {
if (eal_cpu_detected(lcore_id) == 0)
continue;
RTE_LOG(DEBUG, EAL, "Skipped lcore %u as core %u on socket %u\n",
lcore_id, eal_cpu_core_id(lcore_id),
eal_cpu_socket_id(lcore_id));
}

/* Set the count of enabled logical cores of the EAL configuration */
config->lcore_count = count;
RTE_LOG(DEBUG, EAL,
"Support maximum %u logical core(s) by configuration.\n",
RTE_MAX_LCORE);
RTE_LOG(INFO, EAL, "Detected %u lcore(s)\n", config->lcore_count);

/* sort all socket id's in ascending order */
qsort(lcore_to_socket_id, RTE_DIM(lcore_to_socket_id),
sizeof(lcore_to_socket_id[0]), socket_id_cmp);

prev_socket_id = -1;
config->numa_node_count = 0;
for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
socket_id = lcore_to_socket_id[lcore_id];
if (socket_id != prev_socket_id)
config->numa_nodes[config->numa_node_count++] =
socket_id;
prev_socket_id = socket_id;
}
RTE_LOG(INFO, EAL, "Detected %u NUMA nodes\n", config->numa_node_count);

return 0;
}

可以看到第一个for循环内,

1
lcore_config[lcore_id].core_index = count;

count也是自增的,所以count实际上和索引号 lcore_id是一样的。然后会有检测,不过这个检测只是检测这个lcore_id是否超出了cpu核的数量。

1
2
3
4
5
if (eal_cpu_detected(lcore_id) == 0) {
config->lcore_role[lcore_id] = ROLE_OFF;
lcore_config[lcore_id].core_index = -1;
continue;
}

然后继续看这个for循环。

1
2
3
4
5
6
7
8
9
10
11
/* init cpuset for per lcore config */
CPU_ZERO(&lcore_config[lcore_id].cpuset);

//定义
#define CPU_ZERO(s) \
do { \
unsigned int _i; \
\
for (_i = 0; _i < _NUM_SETS(CPU_SETSIZE); _i++) \
(s)->_bits[_i] = 0LL; \
} while (0)

这个函数从定义来看的作用是,这个宏定义用于清空一个cpu集合,将其中所有的位都置为0。它的参数是一个指向cpu_set_t类型的指针,这个类型是一个结构体,包含一个数组bits,每个元素是一个64位的整数。NUM_SETS(CPU_SETSIZE)是一个宏函数,用于计算数组bits的长度,它的参数CPU_SETSIZEcpu集合中能表示的最大cpu数目。这个宏定义使用了do-while循环语句,遍历数组bits中的每个元素,并将其赋值为0LL(表示长整型常量0)。这样就实现了清空cpu集合的功能。

最终也就是将每一个lcore_config[lcore_id].cpuset都初始化为0。初始化之后,

1
2
3
4
5
/* By default, lcore 1:1 map to cpu id */
CPU_SET(lcore_id, &lcore_config[lcore_id].cpuset);

//定义如下
#define CPU_SET(b, s) ((s)->_bits[_WHICH_SET(b)] |= (1LL << _WHICH_BIT(b)))

从定义来看, CPU_SET这个宏定义用于将一个cpu集合中的某个位设置为1,表示将对应的cpu加入到集合中。它的参数是一个整数b,表示要设置的位的序号,和一个指向cpu_set_t类型的指针s,表示要操作的cpu集合。WHICH_SET(b)是一个宏函数,用于计算b所在的数组bits中的元素下标。WHICH_BIT(b)是一个宏函数,用于计算b在该元素中的位偏移量。这个宏定义使用了按位或运算符(|=),将数组bits中相应元素的相应位设置为1,实现了将cpu加入到集合中的功能。

也就是说,将&lcore_config[lcore_id].cpuset中的lcore_id位置为1了,代表用了这个cpu。但这也不是具体的线程与cpu核的绑定过程,可以说这只是一种口头上的绑定。

然后就是对 lcore_config[lcore_id]其他成员的一系列赋值

1
2
3
4
5
6
7
8
9
/* By default, each detected core is enabled */
config->lcore_role[lcore_id] = ROLE_RTE;
lcore_config[lcore_id].core_role = ROLE_RTE;
lcore_config[lcore_id].core_id = eal_cpu_core_id(lcore_id);
lcore_config[lcore_id].socket_id = socket_id;
RTE_LOG(DEBUG, EAL, "Detected lcore %u as "
"core %u on socket %u\n",
lcore_id, lcore_config[lcore_id].core_id,
lcore_config[lcore_id].socket_id);

后面就没什么重要的了(本项目可以不那么关注)。

2.2 线程创建与cpu核的绑定过程

2.1分析的主要是lcore_config 这个逻辑核描述符的初始化过程。 下面分析线程与cpu核的绑定过程。在环境抽象层的初始化过程中,首先绑定的就是dpdk主线程。

1
2
3
4
5
6
if (pthread_setaffinity_np(pthread_self(), sizeof(rte_cpuset_t),
&lcore_config[config->main_lcore].cpuset) != 0) {
rte_eal_init_alert("Cannot set affinity");
rte_errno = EINVAL;
return -1;
}

可以看到,使用到了`pthread_setaffinity_nppthread库为我们提供的设置线程的cpu亲和性的接口:

pthread_setaffinity_np函数用法详解 (noerror.net)

Linux中CPU亲和性(affinity) - 知乎 (zhihu.com)

源码没怎么看懂,不过根据网上关于pthread_setaffinity_np 的解释。简单来说,当调用 pthread_setaffinity_np 时,如果线程不在 cpuset 指定的 CPU 集合中运行,那么调度器会将该线程从当前 CPU 上移除,并将其加入到 cpuset 中某个 CPU 的就绪队列中。然后,调度器会选择一个合适的时机,将该线程从就绪队列中取出,并加载到目标 CPU 上执行。

因此接下来,由主线程创建具有cpu亲和性的线程就是使用的pthread_setaffinity_np。具体流程如下

rte_eal_init函数中我们找到了这样一段比较重要的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
RTE_LCORE_FOREACH_WORKER(i) {

/*
* create communication pipes between main thread
* and children
*/
if (pipe(lcore_config[i].pipe_main2worker) < 0)
rte_panic("Cannot create pipe\n");
if (pipe(lcore_config[i].pipe_worker2main) < 0)
rte_panic("Cannot create pipe\n");

lcore_config[i].state = WAIT;

/* create a thread for each lcore */
ret = pthread_create(&lcore_config[i].thread_id, NULL,
eal_thread_loop, NULL);
if (ret != 0)
rte_panic("Cannot create thread\n");

/* Set thread_name for aid in debugging. */
snprintf(thread_name, sizeof(thread_name),
"lcore-worker-%d", i);
ret = rte_thread_setname(lcore_config[i].thread_id,
thread_name);
if (ret != 0)
RTE_LOG(DEBUG, EAL,
"Cannot set name for lcore thread\n");

ret = pthread_setaffinity_np(lcore_config[i].thread_id,
sizeof(rte_cpuset_t), &lcore_config[i].cpuset);
if (ret != 0)
rte_panic("Cannot set affinity\n");
}

其中 RTE_LCORE_FOREACH_WORKER(i)的定义是

也就是找到所有供dpdk使用的逻辑线程。且不为主线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#define RTE_LCORE_FOREACH_WORKER(i)					\
for (i = rte_get_next_lcore(-1, 1, 0); \
i < RTE_MAX_LCORE; \
i = rte_get_next_lcore(i, 1, 0))

// ==》
unsigned int rte_get_next_lcore(unsigned int i, int skip_main, int wrap)
{
i++;
if (wrap)
i %= RTE_MAX_LCORE;

while (i < RTE_MAX_LCORE) {
if (!rte_lcore_is_enabled(i) ||
(skip_main && (i == rte_get_main_lcore()))) {
i++;
if (wrap)
i %= RTE_MAX_LCORE;
continue;
}
break;
}
return i;
}

//==》
int rte_lcore_is_enabled(unsigned int lcore_id)
{
struct rte_config *cfg = rte_eal_get_configuration();

if (lcore_id >= RTE_MAX_LCORE)
return 0;
return cfg->lcore_role[lcore_id] == ROLE_RTE;
}

看完for循环的是什么东西,再看看for循环里面的内容。首先创建两个通道,用来实现主线程和工作线程的通信。也就是说每个工作线程和主线程之间都有两个通信管道。管道创建之后,再用lcore_config[i].thread_id创建线程。最后线程的id号会赋给lcore_config[i].thread_id。也就是重点关注这两个函数。

1
2
3
4
5
6
/* create a thread for each lcore */
ret = pthread_create(&lcore_config[i].thread_id, NULL,
eal_thread_loop, NULL);

ret = pthread_setaffinity_np(lcore_config[i].thread_id,
sizeof(rte_cpuset_t), &lcore_config[i].cpuset);

也就是说用前者创建线程,用后者设置cpu亲和性。就通了。至此,总结一下环境抽象层初始化阶段对于cpu亲和性线程池的初始化以及绑定cpu工作。 首先初始化128个,逻辑核描述符,给这个描述符的cpuset进行初始化,后面依次分配。我们默认是一比一分配。也就是一个逻辑核对应一个cpu核。然后再用逻辑描述符的thread_id创建线程(调用pthread_create创建),并调用pthread_setaffinity_np进行线程与cpu核的绑定。绑定之后task_struct也就是PCB(linux并不区分进程线程,所以调度都是以task_struct进行调度)中的cpus_allowed只有一位会是1。

注:task_struct 中关于 CPU 集合的成员是 cpus_allowed,它是一个 cpumask_t 类型的变量,用于表示该任务可以运行在哪些 CPU 上。cpumask_t 是一个位图结构,每一位对应一个 CPU 编号,如果该位为 1,则表示该任务可以运行在该 CPU 上;如果为 0,则表示不能运行在该 CPU 上。

3.线程池的工作模式

之前我们创建线程池中的线程的时候,使用到的函数指针是eal_thread_loop,也就是我们的每个工作线程需要去处理的工作,我们看看到底是啥

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
/* main loop of threads */
__rte_noreturn void *
eal_thread_loop(__rte_unused void *arg)
{
char c;
int n, ret;
unsigned lcore_id;
pthread_t thread_id;
int m2w, w2m;
char cpuset[RTE_CPU_AFFINITY_STR_LEN];

thread_id = pthread_self();

/* retrieve our lcore_id from the configuration structure */
RTE_LCORE_FOREACH_WORKER(lcore_id) {
if (thread_id == lcore_config[lcore_id].thread_id)
break;
}

if (lcore_id == RTE_MAX_LCORE)
rte_panic("cannot retrieve lcore id\n");

m2w = lcore_config[lcore_id].pipe_main2worker[0];
w2m = lcore_config[lcore_id].pipe_worker2main[1];

__rte_thread_init(lcore_id, &lcore_config[lcore_id].cpuset);

ret = eal_thread_dump_current_affinity(cpuset, sizeof(cpuset));
RTE_LOG(DEBUG, EAL, "lcore %u is ready (tid=%zx;cpuset=[%s%s])\n",
lcore_id, (uintptr_t)thread_id, cpuset, ret == 0 ? "" : "...");

rte_eal_trace_thread_lcore_ready(lcore_id, cpuset);

/* read on our pipe to get commands */
while (1) {
void *fct_arg;

/* wait command */
do {
n = read(m2w, &c, 1);
} while (n < 0 && errno == EINTR);

if (n <= 0)
rte_panic("cannot read on configuration pipe\n");

lcore_config[lcore_id].state = RUNNING;

/* send ack */
n = 0;
while (n == 0 || (n < 0 && errno == EINTR))
n = write(w2m, &c, 1);
if (n < 0)
rte_panic("cannot write on configuration pipe\n");

if (lcore_config[lcore_id].f == NULL)
rte_panic("NULL function pointer\n");

/* call the function and store the return value */
fct_arg = lcore_config[lcore_id].arg;
ret = lcore_config[lcore_id].f(fct_arg);
lcore_config[lcore_id].ret = ret;
rte_wmb();

/* when a service core returns, it should go directly to WAIT
* state, because the application will not lcore_wait() for it.
*/
if (lcore_config[lcore_id].core_role == ROLE_SERVICE)
lcore_config[lcore_id].state = WAIT;
else
lcore_config[lcore_id].state = FINISHED;
}

/* never reached */
/* pthread_exit(NULL); */
/* return NULL; */
}

3.1 首先拿到自己对应的lcore_config

通过比对自身的thread_idlcore_config[lcore_id].thread_id拿到对应的lcore_config

1
2
3
4
5
6
thread_id = pthread_self();
/* retrieve our lcore_id from the configuration structure */
RTE_LCORE_FOREACH_WORKER(lcore_id) {
if (thread_id == lcore_config[lcore_id].thread_id)
break;
}

3.2 根据lcore_config做一些设置

获取到属于自己的与主进程进行通信的管道

1
2
3
m2w = lcore_config[lcore_id].pipe_main2worker[0];
w2m = lcore_config[lcore_id].pipe_worker2main[1];

3.3 工作线程一直在做的事情

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/* read on our pipe to get commands */
while (1) {
void *fct_arg;

/* wait command */
do {
n = read(m2w, &c, 1);
} while (n < 0 && errno == EINTR);

if (n <= 0)
rte_panic("cannot read on configuration pipe\n");

lcore_config[lcore_id].state = RUNNING;

/* send ack */
n = 0;
while (n == 0 || (n < 0 && errno == EINTR))
n = write(w2m, &c, 1);
if (n < 0)
rte_panic("cannot write on configuration pipe\n");

if (lcore_config[lcore_id].f == NULL)
rte_panic("NULL function pointer\n");

/* call the function and store the return value */
fct_arg = lcore_config[lcore_id].arg;
ret = lcore_config[lcore_id].f(fct_arg);
lcore_config[lcore_id].ret = ret;
rte_wmb();

/* when a service core returns, it should go directly to WAIT
* state, because the application will not lcore_wait() for it.
*/
if (lcore_config[lcore_id].core_role == ROLE_SERVICE)
lcore_config[lcore_id].state = WAIT;
else
lcore_config[lcore_id].state = FINISHED;
}

首先,一直阻塞,直到从管道内读到了主线程对自己发出的命令。状态由 WAIT切换为 RUNNING,然后发出相应表明自己收到了这个命令。一般收到命令的时候,lcore_configfarg就已经被赋值完毕了。相当于是先得知要做的任务以及任务参数,然后在收到开始做的命令。

然后

1
2
3
4
/* call the function and store the return value */
fct_arg = lcore_config[lcore_id].arg;
ret = lcore_config[lcore_id].f(fct_arg);
lcore_config[lcore_id].ret = ret;

调用这个函数,由于我们的项目function本身就自带一个死循环,只要程序执行就不会跳出,所以一旦一个线程分配了任务,就会把这个任务一直执行下去直到我们的项目程序停止。所以后面就没再看了。

4. 线程池初始化首尾

环境抽象层的初始化工作对于线程池的初始化工作进行到了最后的部分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
 rte_eal_mp_remote_launch(sync_func, NULL, SKIP_MAIN);
rte_eal_mp_wait_lcore();

//以及这两个函数的定义
int
rte_eal_mp_remote_launch(int (*f)(void *), void *arg,
enum rte_rmt_call_main_t call_main)
{
int lcore_id;
int main_lcore = rte_get_main_lcore();

/* check state of lcores */
RTE_LCORE_FOREACH_WORKER(lcore_id) {
if (lcore_config[lcore_id].state != WAIT)
return -EBUSY;
}

/* send messages to cores */
RTE_LCORE_FOREACH_WORKER(lcore_id) {
rte_eal_remote_launch(f, arg, lcore_id);
}

if (call_main == CALL_MAIN) {
lcore_config[main_lcore].ret = f(arg);
lcore_config[main_lcore].state = FINISHED;
}

return 0;
}

/*
* Do a rte_eal_wait_lcore() for every lcore. The return values are
* ignored.
*/
void
rte_eal_mp_wait_lcore(void)
{
unsigned lcore_id;

RTE_LCORE_FOREACH_WORKER(lcore_id) {
rte_eal_wait_lcore(lcore_id);
}
}

/*
* Wait until a lcore finished its job.
*/
int
rte_eal_wait_lcore(unsigned worker_id)
{
if (lcore_config[worker_id].state == WAIT)
return 0;

while (lcore_config[worker_id].state != WAIT &&
lcore_config[worker_id].state != FINISHED)
rte_pause();

rte_rmb();

/* we are in finished state, go to wait state */
lcore_config[worker_id].state = WAIT;
return lcore_config[worker_id].ret;
}


5.如何使用线程池中的线程

也就是如何通过主线程给工作线程分配任务呢?主要是通过下面的接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/*
* Send a message to a worker lcore identified by worker_id to call a
* function f with argument arg. Once the execution is done, the
* remote lcore switch in FINISHED state.
*/
int
rte_eal_remote_launch(int (*f)(void *), void *arg, unsigned int worker_id)
{
int n;
char c = 0;
int m2w = lcore_config[worker_id].pipe_main2worker[1];
int w2m = lcore_config[worker_id].pipe_worker2main[0];
int rc = -EBUSY;

if (lcore_config[worker_id].state != WAIT)
goto finish;

lcore_config[worker_id].f = f;
lcore_config[worker_id].arg = arg;

/* send message */
n = 0;
while (n == 0 || (n < 0 && errno == EINTR))
n = write(m2w, &c, 1);
if (n < 0)
rte_panic("cannot write on configuration pipe\n");

/* wait ack */
do {
n = read(w2m, &c, 1);
} while (n < 0 && errno == EINTR);

if (n <= 0)
rte_panic("cannot read on configuration pipe\n");

rc = 0;
finish:
rte_eal_trace_thread_remote_launch(f, arg, worker_id, rc);
return rc;
}

逻辑很简单,给一个lcore_config 分配 farg,再用过队列给他下达开始执行的命令就好了。

6.总结

cpu亲和性线程池实现的核心就是 lcore_config。它关联了 线程 ,cpu,任务。首先我们先用主线程id初始化lcore_config并绑定cpu核。我们默认cpu核心与线程是一比一绑定。所以lcore_config的索引就是我们要使用cpu的索引。

通过主线程初始化指定个数的工作线程,

  • 首先就是要初始化对应个数的lcore_config

  • 然后在初始化亲和的cpu。一般用掩码表示,比如64位哪位为1,就代表64个cpu核心我用哪一个。就像我们刚才所讲的我们默认cpu核心与线程是一比一绑定。所以lcore_config的索引就是我们要使用cpu的索引。设置过后

  • 通过调用 pthread_create 创建线程,线程跑的是 eal_thread_loop

  • 通过调用pthread_setaffinity_np进行亲和性cpu的绑定。

  • 通过调用

    1
    2
    int 
    rte_eal_remote_launch(int (*f)(void *), void *arg, unsigned int worker_id)

    给线程发布任务。主要就是给lcore_configfarg通过消息队列告诉他,可以执行啦,他就去执行了,

  • eal_thread_loop中就是首先根据自己的进程id找到对应的lcore_config。拿到这个就可以通过管道得知有任务交给我了。那么这个时候lcore_configfarg往往已经赋值好了,执行就完事儿了。

实际上害有更新亲和性cpu的操作,但是基本用不到所以那块儿我就没看。

哇!!!终于走通了!!!好耶!!!!!!!! 单纯使用的话,其实不需要搞懂这么多。