1 Star 0 Fork 0

王增亮/temp

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
0001-bdev_balance.patch 6.43 KB
一键复制 编辑 原始数据 按行查看 历史
wangzengliang1 提交于 2022-10-12 17:18 +08:00 . bdev_balance
From 6796d7df554be9227620baedad0f45310a19ef53 Mon Sep 17 00:00:00 2001
From: wangzengliang <wangzengliang1@huawei.com>
Date: Wed, 12 Oct 2022 17:17:11 +0800
Subject: [PATCH] bdev_balance
---
module/bdev/rbd/bdev_rbd.c | 99 ++++++++++++++++++++++++++++++++++----
1 file changed, 89 insertions(+), 10 deletions(-)
diff --git a/module/bdev/rbd/bdev_rbd.c b/module/bdev/rbd/bdev_rbd.c
index 0045f34cf..61150c9f6 100644
--- a/module/bdev/rbd/bdev_rbd.c
+++ b/module/bdev/rbd/bdev_rbd.c
@@ -51,6 +51,11 @@
static int bdev_rbd_count = 0;
+struct bdev_rbd_thread {
+ struct spdk_thread *thread;
+ STAILQ_ENTRY(bdev_rbd_thread) link;
+};
+
struct bdev_rbd {
struct spdk_bdev disk;
char *rbd_name;
@@ -67,7 +72,9 @@ struct bdev_rbd {
rbd_image_info_t info;
pthread_mutex_t mutex;
+ bool main_td_rescheduled;
struct spdk_thread *main_td;
+ struct spdk_thread *construct_td;
struct spdk_thread *destruct_td;
uint32_t ch_count;
struct spdk_io_channel *group_ch;
@@ -102,7 +109,9 @@ struct bdev_rbd_cluster {
static STAILQ_HEAD(, bdev_rbd_cluster) g_map_bdev_rbd_cluster = STAILQ_HEAD_INITIALIZER(
g_map_bdev_rbd_cluster);
static pthread_mutex_t g_map_bdev_rbd_cluster_mutex = PTHREAD_MUTEX_INITIALIZER;
-
+static STAILQ_HEAD(, bdev_rbd_thread) g_bdev_rbd_threads =
+ STAILQ_HEAD_INITIALIZER(g_bdev_rbd_threads);
+static struct bdev_rbd_thread *g_bdev_rbd_next_thread;
static void
bdev_rbd_cluster_free(struct bdev_rbd_cluster *entry)
{
@@ -549,18 +558,18 @@ bdev_rbd_destruct(void *ctx)
struct bdev_rbd *rbd = ctx;
struct spdk_thread *td;
- if (rbd->main_td == NULL) {
+ if (rbd->construct_td == NULL) {
td = spdk_get_thread();
} else {
- td = rbd->main_td;
+ td = rbd->construct_td;
}
/* Start the destruct operation on the rbd bdev's
- * main thread. This guarantees it will only start
+ * construct thread. This guarantees it will only start
* executing after any messages related to channel
* deletions have finished completing. *Always*
* send a message, even if this function gets called
- * from the main thread, in case there are pending
+ * from the construct thread, in case there are pending
* channel delete messages in flight to this thread.
*/
assert(rbd->destruct_td == NULL);
@@ -658,7 +667,7 @@ static void
bdev_rbd_free_channel_resources(struct bdev_rbd *disk)
{
assert(disk != NULL);
- assert(disk->main_td == spdk_get_thread());
+ assert(disk->construct_td == spdk_get_thread());
assert(disk->ch_count == 0);
spdk_put_io_channel(disk->group_ch);
@@ -666,6 +675,7 @@ bdev_rbd_free_channel_resources(struct bdev_rbd *disk)
bdev_rbd_exit(disk->image);
}
+ disk->construct_td = NULL;
disk->main_td = NULL;
disk->group_ch = NULL;
}
@@ -698,6 +708,64 @@ _bdev_rbd_create_cb(struct bdev_rbd *disk)
return 0;
}
+
+static void
+bdev_rbd_reschedule_main_thread(struct bdev_rbd *disk)
+{
+ bool main_td_in_list = false;
+ bool cur_td_in_list = false;
+ struct bdev_rbd_thread *entry;
+ struct bdev_rbd_thread *rbd_thread;
+ struct spdk_thread *current_thread = spdk_get_thread();
+
+ if (STAILQ_EMPTY(&g_bdev_rbd_threads)) {
+ g_bdev_rbd_next_thread = NULL;
+ }
+
+ STAILQ_FOREACH(entry, &g_bdev_rbd_threads, link) {
+ if (disk->main_td == entry->thread) {
+ main_td_in_list = true;
+ }
+ if (current_thread == entry->thread) {
+ cur_td_in_list = true;
+ }
+ }
+ if (!main_td_in_list) {
+ rbd_thread = calloc(1, sizeof(struct bdev_rbd_thread));
+ if (rbd_thread) {
+ rbd_thread->thread = disk->main_td;
+ STAILQ_INSERT_TAIL(&g_bdev_rbd_threads, rbd_thread, link);
+ }
+ }
+ if (!cur_td_in_list) {
+ rbd_thread = calloc(1, sizeof(struct bdev_rbd_thread));
+ if (rbd_thread) {
+ rbd_thread->thread = current_thread;
+ STAILQ_INSERT_TAIL(&g_bdev_rbd_threads, rbd_thread, link);
+ }
+ }
+ if (!STAILQ_EMPTY(&g_bdev_rbd_threads) && !disk->main_td_rescheduled) {
+ if (g_bdev_rbd_next_thread == NULL) {
+ g_bdev_rbd_next_thread = STAILQ_FIRST(&g_bdev_rbd_threads);
+ }
+ disk->main_td = g_bdev_rbd_next_thread->thread;
+ disk->main_td_rescheduled = true;
+ g_bdev_rbd_next_thread = STAILQ_NEXT(g_bdev_rbd_next_thread, link);
+ if (g_bdev_rbd_next_thread == NULL) {
+ g_bdev_rbd_next_thread = STAILQ_FIRST(&g_bdev_rbd_threads);
+ }
+ }
+}
+
+/* This API will be called by users who create rbd dev io channels,
+ * adding the caller's threads in a list, and updating each dev's "main"
+ *thread from the list in round-robin fashion. Because the branch
+ * "if(disk->ch_count == 0)" is reentered by auto-examination and caller,
+ ** we only save caller's threads and reschedule to "main" thread for each disk, so
+ ** add the "main" thread to the global list if it is not in. This implementation
+ ** is built on the assumption that caller's threads have been created on different
+ ** CPU cores to share the load. */
+
static int
bdev_rbd_create_cb(void *io_device, void *ctx_buf)
{
@@ -709,6 +777,7 @@ bdev_rbd_create_cb(void *io_device, void *ctx_buf)
pthread_mutex_lock(&disk->mutex);
if (disk->ch_count == 0) {
assert(disk->main_td == NULL);
+ assert(disk->construct_td == NULL);
rc = _bdev_rbd_create_cb(disk);
if (rc) {
SPDK_ERRLOG("Cannot create channel for disk=%p\n", disk);
@@ -717,6 +786,9 @@ bdev_rbd_create_cb(void *io_device, void *ctx_buf)
}
disk->main_td = spdk_get_thread();
+ disk->construct_td = spdk_get_thread();
+ } else {
+ bdev_rbd_reschedule_main_thread(disk);
}
disk->ch_count++;
@@ -754,13 +826,13 @@ bdev_rbd_destroy_cb(void *io_device, void *ctx_buf)
assert(disk->ch_count > 0);
disk->ch_count--;
if (disk->ch_count == 0) {
- assert(disk->main_td != NULL);
- if (disk->main_td != spdk_get_thread()) {
+ assert(disk->construct_td != NULL);
+ if (disk->construct_td != spdk_get_thread()) {
/* The final channel was destroyed on a different thread
* than where the first channel was created. Pass a message
- * to the main thread to unregister the poller. */
+ * to the construct_td thread to unregister the poller. */
disk->ch_count++;
- thread = disk->main_td;
+ thread = disk->construct_td;
pthread_mutex_unlock(&disk->mutex);
spdk_thread_send_msg(thread, _bdev_rbd_destroy_cb, disk);
return;
@@ -1340,6 +1412,13 @@ bdev_rbd_library_init(void)
static void
bdev_rbd_library_fini(void)
{
+ struct bdev_rbd_thread *rbd_thread;
+
+ while ((rbd_thread = STAILQ_FIRST(&g_bdev_rbd_threads)) != NULL) {
+ STAILQ_REMOVE_HEAD(&g_bdev_rbd_threads, link);
+ free(rbd_thread);
+ }
+
spdk_io_device_unregister(&rbd_if, NULL);
}
--
2.27.0
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/wangzengliang1/temp.git
git@gitee.com:wangzengliang1/temp.git
wangzengliang1
temp
temp
master

搜索帮助