From 4dd507e56e7ea4d5491f5b20b40ca5fff66b47f2 Mon Sep 17 00:00:00 2001 From: wo_cow Date: Thu, 17 Jul 2025 15:31:19 +0800 Subject: [PATCH] feat: add net qos based on oncn-bwm --- docs/config.md | 19 +++- docs/feature.md | 15 ++- pkg/common/constant/constant.go | 2 + pkg/services/preemption/net.go | 106 +++++++++++++++++++++ pkg/services/preemption/oncn_bwm.go | 99 +++++++++++++++++++ pkg/services/preemption/preemption.go | 70 ++++++++++++-- pkg/services/preemption/preemption_test.go | 20 ++-- 7 files changed, 307 insertions(+), 24 deletions(-) create mode 100644 pkg/services/preemption/net.go create mode 100644 pkg/services/preemption/oncn_bwm.go diff --git a/docs/config.md b/docs/config.md index d472754..8ea64a8 100644 --- a/docs/config.md +++ b/docs/config.md @@ -51,8 +51,14 @@ OS/Arch: linux/amd64 "preemption": { "resource": [ "cpu", - "memory" - ] + "memory", + "net" + ], + "net": { + "waterline": 20, + "bandwidthLow": 10, + "bandwidthHigh": 100 + } }, "quotaTurbo": { "highWaterMark": 50, @@ -140,10 +146,15 @@ spec: ### preemption -`preemption`字段用于标识绝对抢占特性配置。目前,Preemption特性支持CPU和内存的绝对抢占,用户可以按需配置该字段,单独或组合使用资源的绝对抢占。 +`preemption`字段用于标识绝对抢占特性配置。目前,Preemption特性支持CPU,内存和网络的绝对抢占,用户可以按需配置该字段,单独或组合使用资源的绝对抢占。 + | 配置键[=默认值] | 类型 | 描述 | 可选值 | | --------------- | ---------- | -------------------------------- | ----------- | -| resource=[] | string数组 | 资源类型,声明何种资源需要被访问 | cpu, memory | +| resource=[] | string数组 | 资源类型,声明何种资源需要被访问 | cpu, memory, net | +| net | map | 网络带宽抢占配置 | | +| .waterline | int | 在线业务的水线(单位:MB) |[20, 9999*1024] | +| .bandwidthLow | int | 离线业务带宽下限(单位:MB) | [1, bandwidthHigh) | +| .bandwidthHigh | int | 离线业务带宽上限(单位:MB) | (bandwidthLow, 9999*1024] | ### dynCache diff --git a/docs/feature.md b/docs/feature.md index 81cc732..650b18e 100644 --- a/docs/feature.md +++ b/docs/feature.md @@ -2,7 +2,7 @@ 在rubik中,每一个特性以服务形式运行在后台。rubik根据用户配置(config.json)按需启动对应服务。下文是对各个服务的介绍。 ## preemption 绝对抢占 -rubik支持业务优先级配置,针对在离线业务混合部署的场景,确保在线业务相对离线业务的资源抢占。目前仅支持CPU资源和内存资源。使用该特性,用户需要手动为业务指定业务类型,即在业务pod的yaml文件中增加注解`volcano.sh/preemptable`。业务优先级配置示例如下: +rubik支持业务优先级配置,针对在离线业务混合部署的场景,确保在线业务相对离线业务的资源抢占。目前仅支持CPU资源,内存资源和网络资源。使用该特性,用户需要手动为业务指定业务类型,即在业务pod的yaml文件中增加注解`volcano.sh/preemptable`。业务优先级配置示例如下: ```yaml annotations: @@ -24,6 +24,19 @@ annotations: - 内核支持针对cgroup的memory优先级配置,memory子系统存在接口`memory.qos_level`。建议使用内核版本openEuler-22.03+。 - 开启内存优先级支持: `echo 1 > /proc/sys/vm/memcg_qos_enable` +### 网络绝对抢占 + +针对在离线业务混合部署的场景,确保在线业务相对离线业务的网络带宽资源抢占。 + +#### 前置条件 + +- 主机上已安装oncn-bwm并插入了bwm.ko,host上有如下文件:`/proc/qos/net_qos_enable`。 +- ```shell + 安装oncn-bwm步骤: + yum install -y oncn-bwm + insmod /lib/modules/bwm/bwm.ko + ``` + ## dynCache 内存带宽和LLC限制 rubik支持业务的Pod内存带宽(memory bandwidth)和LLC(Last Level Cache)限制,通过限制离线业务的内存带宽/LLC使用,减少其对在线业务的干扰。 diff --git a/pkg/common/constant/constant.go b/pkg/common/constant/constant.go index 06fc2e4..c77cc30 100644 --- a/pkg/common/constant/constant.go +++ b/pkg/common/constant/constant.go @@ -108,6 +108,8 @@ const ( CPUCgroupFileName = "cpu.qos_level" // MemoryCgroupFileName is name of cgroup file used for memory qos level setting MemoryCgroupFileName = "memory.qos_level" + // NetCgroupFileName is name of cgroup file used for net qos level setting + NetCgroupFileName = "net_cls.classid" // PSICPUCgroupFileName is name of cgroup file used for detecting cpu psi PSICPUCgroupFileName = "cpu.pressure" // PSIMemoryCgroupFileName is name of cgroup file used for detecting memory psi diff --git a/pkg/services/preemption/net.go b/pkg/services/preemption/net.go new file mode 100644 index 0000000..8e4e361 --- /dev/null +++ b/pkg/services/preemption/net.go @@ -0,0 +1,106 @@ +// Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved. +// rubik licensed under the Mulan PSL v2. +// You can use this software according to the terms and conditions of the Mulan PSL v2. +// You may obtain a copy of Mulan PSL v2 at: +// http://license.coscl.org.cn/MulanPSL2 +// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +// PURPOSE. +// See the Mulan PSL v2 for more details. +// Author: Niu Qianqian +// Create: 2025-07-01 +// Description: Integrating oncn-bwm features + +package preemption + +import ( + "fmt" + "os" + "strconv" + + "isula.org/rubik/pkg/common/constant" + "isula.org/rubik/pkg/core/typedef" +) + +const ( + minWaterline = 20 + maxWaterline = 9999 * 1024 + minBandwidth = 1 + maxBandwidth = 9999 * 1024 +) + +type NetConfig struct { + Waterline int `json:"waterline,omitempty"` + BandwidthLow int `json:"bandwidthLow,omitempty"` + BandwidthHigh int `json:"bandwidthHigh,omitempty"` +} + +func getNetLevelStr(qosLevel int) string { + if qosLevel == constant.Offline { + return "4294967295" // uint32(-1) + } + return "0" +} + +func validateNetResConf(conf *PreemptionConfig) error { + if !isSupportNetqos() { + return fmt.Errorf("this machine does not support net preemption, please install oncn-bwm first.") + } + + if conf.Net.Waterline < minWaterline || conf.Net.Waterline > maxWaterline { + return fmt.Errorf("net waterline %d out of range [%d,%d]", conf.Net.Waterline, minWaterline, maxWaterline) + } + + for _, per := range []int{ + conf.Net.BandwidthLow, conf.Net.BandwidthHigh} { + if per < minBandwidth || per > maxBandwidth { + return fmt.Errorf("net bandwidth %d out of range [%d,%d]", per, minBandwidth, maxBandwidth) + } + } + + if conf.Net.BandwidthLow >= conf.Net.BandwidthHigh { + return fmt.Errorf("net bandwidthLow is larger than bandwidthHigh") + } + + return nil +} + +func enableNetRes(pod *typedef.PodInfo) error { + var err error + var pid string + + if pid, err = getPodProcID(pod); err != nil { + return fmt.Errorf("failed to get Pod procID %s: %v", pid, err) + } + + if err = enablePodNetqos(pid); err != nil { + disablePodNetqos(pid) + return err + } + + return nil +} + +func initNetRes(conf *PreemptionConfig) error { + var err error + pid := strconv.Itoa(os.Getpid()) + defer func() { + if err != nil { + disablePodNetqos(pid) + } + }() + if err = enablePodNetqos(pid); err != nil { + return err + } + + // The bandwidth or waterline can be set only after netqos has been enabled at least once. + if err = setPodNetqosWaterline(conf.Net.Waterline); err != nil { + return err + } + + if err = setPodNetqosBandwidth(conf.Net.BandwidthLow, conf.Net.BandwidthHigh); err != nil { + return err + } + + return err +} diff --git a/pkg/services/preemption/oncn_bwm.go b/pkg/services/preemption/oncn_bwm.go new file mode 100644 index 0000000..d7dc1b8 --- /dev/null +++ b/pkg/services/preemption/oncn_bwm.go @@ -0,0 +1,99 @@ +// Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved. +// rubik licensed under the Mulan PSL v2. +// You can use this software according to the terms and conditions of the Mulan PSL v2. +// You may obtain a copy of Mulan PSL v2 at: +// http://license.coscl.org.cn/MulanPSL2 +// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +// PURPOSE. +// See the Mulan PSL v2 for more details. +// Author: Niu Qianqian +// Create: 2025-07-01 +// Description: Integrating oncn-bwm features + +package preemption + +import ( + "fmt" + "strconv" + "strings" + + "isula.org/rubik/pkg/common/util" + "isula.org/rubik/pkg/core/typedef" + "isula.org/rubik/pkg/core/typedef/cgroup" +) + +const ( + // Enable netqos in the specified process network namespace + // Usage: + // echo $pid > /proc/qos/net_qos_enable + netQosEnablePath = "/proc/qos/net_qos_enable" + // Disable netqos in the specified process network namespace + // Usage: + // echo $pid > /proc/qos/net_qos_disable + netQosDisablePath = "/proc/qos/net_qos_disable" + // Set/get the bandwidth of offline pod + // Usage: + // echo "$low,$high" > /proc/qos/net_qos_bandwidth + // cat /proc/qos/net_qos_bandwidth + netQosBandwidthPath = "/proc/qos/net_qos_bandwidth" + // Set/get the waterline of offline pod + // Usage: + // echo "$val" > /proc/qos/net_qos_waterline + // cat /proc/qos/net_qos_waterline + netQosWaterlinePath = "/proc/qos/net_qos_waterline" +) + +func isSupportNetqos() bool { + return util.PathExist(netQosEnablePath) +} + +func getPodProcID(pod *typedef.PodInfo) (string, error) { + var procID string + cgroupKey := &cgroup.Key{SubSys: "net_cls", FileName: "cgroup.procs"} + for _, container := range pod.IDContainersMap { + key := container.GetCgroupAttr(cgroupKey) + if key.Err != nil { + continue + } + procID = strings.Split(key.Value, "\n")[0] + procIDInt, err := strconv.Atoi(procID) + if err == nil && procIDInt != 0 { + return procID, nil + } + } + + return "", fmt.Errorf("failed to find valid proc") +} + +func enablePodNetqos(pid string) error { + if err := util.WriteFile(netQosEnablePath, pid); err != nil { + return fmt.Errorf("failed to write %s to file %s: %v", pid, netQosEnablePath, err) + } + return nil +} + +func disablePodNetqos(pid string) error { + if err := util.WriteFile(netQosDisablePath, pid); err != nil { + return fmt.Errorf("failed to write %s to file %s: %v", pid, netQosDisablePath, err) + } + return nil +} + +func setPodNetqosBandwidth(bandwidthLow, bandwidthHigh int) error { + bandwidthStr := strconv.Itoa(bandwidthLow) + "mb," + strconv.Itoa(bandwidthHigh) + "mb" + + if err := util.WriteFile(netQosBandwidthPath, bandwidthStr); err != nil { + return fmt.Errorf("failed to write %s to file %s: %v", bandwidthStr, netQosBandwidthPath, err) + } + return nil +} + +func setPodNetqosWaterline(waterline int) error { + waterlineStr := strconv.Itoa(waterline) + "mb" + + if err := util.WriteFile(netQosWaterlinePath, waterlineStr); err != nil { + return fmt.Errorf("failed to write %s to file %s: %v", waterlineStr, netQosWaterlinePath, err) + } + return nil +} diff --git a/pkg/services/preemption/preemption.go b/pkg/services/preemption/preemption.go index 6a231f9..1dc9190 100644 --- a/pkg/services/preemption/preemption.go +++ b/pkg/services/preemption/preemption.go @@ -27,9 +27,30 @@ import ( "isula.org/rubik/pkg/services/helper" ) -var supportCgroupTypes = map[string]*cgroup.Key{ - "cpu": {SubSys: "cpu", FileName: constant.CPUCgroupFileName}, - "memory": {SubSys: "memory", FileName: constant.MemoryCgroupFileName}, +type resOpt struct { + cgKey *cgroup.Key + getQosStr func(int) string + validateResConf func(*PreemptionConfig) error + initRes func(*PreemptionConfig) error + enableRes func(*typedef.PodInfo) error +} + +var supportCgroupTypes = map[string]resOpt{ + "cpu": { + cgKey: &cgroup.Key{SubSys: "cpu", FileName: constant.CPUCgroupFileName}, + getQosStr: func(qosLevel int) string { return strconv.Itoa(qosLevel) }, + }, + "memory": { + cgKey: &cgroup.Key{SubSys: "memory", FileName: constant.MemoryCgroupFileName}, + getQosStr: func(qosLevel int) string { return strconv.Itoa(qosLevel) }, + }, + "net": { + cgKey: &cgroup.Key{SubSys: "net_cls", FileName: constant.NetCgroupFileName}, + getQosStr: getNetLevelStr, + validateResConf: validateNetResConf, + initRes: initNetRes, + enableRes: enableNetRes, + }, } // Preemption define service which related to qos level setting @@ -40,7 +61,8 @@ type Preemption struct { // PreemptionConfig define which resources need to use the preemption type PreemptionConfig struct { - Resource []string `json:"resource,omitempty"` + Resource []string `json:"resource,omitempty"` + Net NetConfig `json:"net,omitempty"` } // PreemptionFactory is the factory os Preemption. @@ -80,6 +102,16 @@ func (q *Preemption) PreStart(viewer api.Viewer) error { if viewer == nil { return fmt.Errorf("invalid pods viewer") } + + for _, r := range q.config.Resource { + if supportCgroupTypes[r].initRes == nil { + continue + } + if err := supportCgroupTypes[r].initRes(&q.config); err != nil { + return err + } + } + for _, pod := range viewer.ListPodsWithOptions() { if err := q.SetQoSLevel(pod); err != nil { log.Errorf("failed to set the qos level for the previously started pod %v: %v", pod.Name, err) @@ -124,11 +156,12 @@ func (q *Preemption) DeletePod(_ *typedef.PodInfo) error { func (q *Preemption) validateConfig(pod *typedef.PodInfo) error { targetLevel := getQoSLevel(pod) for _, r := range q.config.Resource { - if err := pod.GetCgroupAttr(supportCgroupTypes[r]).Expect(targetLevel); err != nil { + resOpt := supportCgroupTypes[r] + if err := pod.GetCgroupAttr(resOpt.cgKey).Expect(resOpt.getQosStr(targetLevel)); err != nil { return fmt.Errorf("failed to validate the qos level configuration of pod %s: %v", pod.Name, err) } for _, container := range pod.IDContainersMap { - if err := container.GetCgroupAttr(supportCgroupTypes[r]).Expect(targetLevel); err != nil { + if err := container.GetCgroupAttr(resOpt.cgKey).Expect(resOpt.getQosStr(targetLevel)); err != nil { return fmt.Errorf("failed to validate the qos level configuration of container %s: %v", pod.Name, err) } } @@ -141,6 +174,16 @@ func (q *Preemption) SetQoSLevel(pod *typedef.PodInfo) error { if pod == nil { return fmt.Errorf("empty pod info") } + + for _, r := range q.config.Resource { + resOpt := supportCgroupTypes[r] + if resOpt.enableRes != nil { + if err := resOpt.enableRes(pod); err != nil { + return err + } + } + } + qosLevel := getQoSLevel(pod) if qosLevel == constant.Online { log.Infof("pod %s(%s) has already been set to online(%d)", pod.Name, pod.UID, qosLevel) @@ -149,12 +192,13 @@ func (q *Preemption) SetQoSLevel(pod *typedef.PodInfo) error { var errs error for _, r := range q.config.Resource { - if err := pod.SetCgroupAttr(supportCgroupTypes[r], strconv.Itoa(qosLevel)); err != nil { + resOpt := supportCgroupTypes[r] + if err := pod.SetCgroupAttr(resOpt.cgKey, resOpt.getQosStr(qosLevel)); err != nil { log.Warnf("failed to set %s-qos-level for pod %s: %v", r, pod.Name, err) errs = util.AppendErr(errs, err) } for _, container := range pod.IDContainersMap { - if err := container.SetCgroupAttr(supportCgroupTypes[r], strconv.Itoa(qosLevel)); err != nil { + if err := container.SetCgroupAttr(resOpt.cgKey, resOpt.getQosStr(qosLevel)); err != nil { log.Warnf("failed to set %s-qos-level for container %s: %v", r, container.Name, err) errs = util.AppendErr(errs, err) } @@ -184,9 +228,17 @@ func (conf *PreemptionConfig) Validate() error { return fmt.Errorf("empty resource preemption configuration") } for _, r := range conf.Resource { - if _, ok := supportCgroupTypes[r]; !ok { + resOpt, ok := supportCgroupTypes[r] + if !ok { return fmt.Errorf("does not support setting the %s subsystem", r) } + if resOpt.validateResConf == nil { + continue + } + if err := resOpt.validateResConf(conf); err != nil { + return err + } } + return nil } diff --git a/pkg/services/preemption/preemption_test.go b/pkg/services/preemption/preemption_test.go index d1d763c..6aa0879 100644 --- a/pkg/services/preemption/preemption_test.go +++ b/pkg/services/preemption/preemption_test.go @@ -60,8 +60,8 @@ func TestPreemptionAddFunc(t *testing.T) { fields: getCommonField([]string{"cpu", "memory"}), args: args{ new: try.GenFakeOfflinePod(map[*cgroup.Key]string{ - supportCgroupTypes["cpu"]: "0", - supportCgroupTypes["memory"]: "0", + supportCgroupTypes["cpu"].cgKey: "0", + supportCgroupTypes["memory"].cgKey: "0", }), }, }, @@ -70,21 +70,21 @@ func TestPreemptionAddFunc(t *testing.T) { fields: getCommonField([]string{"cpu", "memory"}), args: args{ new: try.GenFakeOnlinePod(map[*cgroup.Key]string{ - supportCgroupTypes["cpu"]: "0", - supportCgroupTypes["memory"]: "0", + supportCgroupTypes["cpu"].cgKey: "0", + supportCgroupTypes["memory"].cgKey: "0", }).WithContainers(containerNum), }, }, { name: "TC3-empty pod info", - fields: getCommonField([]string{"cpu", "memory"}), + fields: getCommonField([]string{"cpu", "memory", "net"}), wantErr: true, }, { name: "TC4-invalid annotation key", fields: getCommonField([]string{"cpu"}), args: args{ - new: try.GenFakeBestEffortPod(map[*cgroup.Key]string{supportCgroupTypes["cpu"]: "0"}), + new: try.GenFakeBestEffortPod(map[*cgroup.Key]string{supportCgroupTypes["cpu"].cgKey: "0"}), }, preHook: func(pod *try.FakePod) *try.FakePod { newPod := pod.DeepCopy() @@ -96,7 +96,7 @@ func TestPreemptionAddFunc(t *testing.T) { name: "TC5-invalid annotation value", fields: getCommonField([]string{"cpu"}), args: args{ - new: try.GenFakeBestEffortPod(map[*cgroup.Key]string{supportCgroupTypes["cpu"]: "0"}), + new: try.GenFakeBestEffortPod(map[*cgroup.Key]string{supportCgroupTypes["cpu"].cgKey: "0"}), }, preHook: func(pod *try.FakePod) *try.FakePod { newPod := pod.DeepCopy() @@ -134,7 +134,7 @@ func TestPreemptionUpdatePod(t *testing.T) { { name: "TC1-online to offline", fields: getCommonField([]string{"cpu"}), - args: args{old: try.GenFakeOnlinePod(map[*cgroup.Key]string{supportCgroupTypes["cpu"]: "0"}).WithContainers(3)}, + args: args{old: try.GenFakeOnlinePod(map[*cgroup.Key]string{supportCgroupTypes["cpu"].cgKey: "0"}).WithContainers(3)}, preHook: func(pod *try.FakePod) *try.FakePod { newPod := pod.DeepCopy() newAnnotation := make(map[string]string, 0) @@ -146,7 +146,7 @@ func TestPreemptionUpdatePod(t *testing.T) { { name: "TC2-offline to online", fields: getCommonField([]string{"cpu"}), - args: args{old: try.GenFakeOfflinePod(map[*cgroup.Key]string{supportCgroupTypes["cpu"]: "0"})}, + args: args{old: try.GenFakeOfflinePod(map[*cgroup.Key]string{supportCgroupTypes["cpu"].cgKey: "0"})}, preHook: func(pod *try.FakePod) *try.FakePod { newPod := pod.DeepCopy() newAnnotation := make(map[string]string, 0) @@ -159,7 +159,7 @@ func TestPreemptionUpdatePod(t *testing.T) { { name: "TC3-online to online", fields: getCommonField([]string{"cpu"}), - args: args{old: try.GenFakeOnlinePod(map[*cgroup.Key]string{supportCgroupTypes["cpu"]: "0"})}, + args: args{old: try.GenFakeOnlinePod(map[*cgroup.Key]string{supportCgroupTypes["cpu"].cgKey: "0"})}, preHook: func(pod *try.FakePod) *try.FakePod { return pod.DeepCopy() }, -- Gitee