From 675f250a55436d3b2000c45e6b06c6c41ae6beef Mon Sep 17 00:00:00 2001 From: wo_cow Date: Wed, 16 Jul 2025 10:53:40 +0800 Subject: [PATCH] feat: add net qos based on oncn-bwm --- docs/config.md | 20 +++- docs/feature.md | 17 ++- pkg/common/constant/constant.go | 2 + pkg/services/preemption/oncn_bwm.go | 114 +++++++++++++++++++++ pkg/services/preemption/preemption.go | 91 +++++++++++++++- pkg/services/preemption/preemption_test.go | 8 +- 6 files changed, 237 insertions(+), 15 deletions(-) create mode 100644 pkg/services/preemption/oncn_bwm.go diff --git a/docs/config.md b/docs/config.md index d472754..2bbe3ce 100644 --- a/docs/config.md +++ b/docs/config.md @@ -51,8 +51,14 @@ OS/Arch: linux/amd64 "preemption": { "resource": [ "cpu", - "memory" - ] + "memory", + "net" + ], + "netQos": { + "waterline": 20, + "bandwidthLow": 10, + "bandwidthHigh": 100 + } }, "quotaTurbo": { "highWaterMark": 50, @@ -140,11 +146,15 @@ spec: ### preemption -`preemption`字段用于标识绝对抢占特性配置。目前,Preemption特性支持CPU和内存的绝对抢占,用户可以按需配置该字段,单独或组合使用资源的绝对抢占。 +`preemption`字段用于标识绝对抢占特性配置。目前,Preemption特性支持CPU,内存和网络的绝对抢占,用户可以按需配置该字段,单独或组合使用资源的绝对抢占。 + | 配置键[=默认值] | 类型 | 描述 | 可选值 | | --------------- | ---------- | -------------------------------- | ----------- | -| resource=[] | string数组 | 资源类型,声明何种资源需要被访问 | cpu, memory | - +| resource=[] | string数组 | 资源类型,声明何种资源需要被访问 | cpu, memory, net | +| netQos | map | 网络带宽抢占配置 | | +| .waterline | int | 在线业务的水线(单位:MB) |[20, 9999*1024] | +| .bandwidthLow | int | 离线业务带宽下限(单位:MB) | [1, 9999*1024] | +| .bandwidthHigh | int | 离线业务带宽上限(单位:MB) | [1, 9999*1024] | ### dynCache `dynCache`字段用于标识支持Pod内存带宽和LLC限制特性配置。`l3Percent`字段用于标识最后一级缓存(LLC)水位控制线,`memBandPercent`字段用于标识内存带宽(MB)水位控制线。 diff --git a/docs/feature.md b/docs/feature.md index 81cc732..8b038eb 100644 --- a/docs/feature.md +++ b/docs/feature.md @@ -2,7 +2,7 @@ 在rubik中,每一个特性以服务形式运行在后台。rubik根据用户配置(config.json)按需启动对应服务。下文是对各个服务的介绍。 ## preemption 绝对抢占 -rubik支持业务优先级配置,针对在离线业务混合部署的场景,确保在线业务相对离线业务的资源抢占。目前仅支持CPU资源和内存资源。使用该特性,用户需要手动为业务指定业务类型,即在业务pod的yaml文件中增加注解`volcano.sh/preemptable`。业务优先级配置示例如下: +rubik支持业务优先级配置,针对在离线业务混合部署的场景,确保在线业务相对离线业务的资源抢占。目前仅支持CPU资源,内存资源和网络资源。使用该特性,用户需要手动为业务指定业务类型,即在业务pod的yaml文件中增加注解`volcano.sh/preemptable`。业务优先级配置示例如下: ```yaml annotations: @@ -24,7 +24,20 @@ annotations: - 内核支持针对cgroup的memory优先级配置,memory子系统存在接口`memory.qos_level`。建议使用内核版本openEuler-22.03+。 - 开启内存优先级支持: `echo 1 > /proc/sys/vm/memcg_qos_enable` -## dynCache 内存带宽和LLC限制 +### 网络绝对抢占 + +针对在离线业务混合部署的场景,确保在线业务相对离线业务的网络带宽资源抢占。 + +#### 前置条件 + +- 主机上已安装oncn-bwm并插入了bwm.ko,host上有如下文件:`/proc/qos/net_qos_enable`。 +- ```shell + 安装oncn-bwm步骤: + yum install oncn-bwm + insmod /lib/modules/bwm/bwm.ko + ``` + +## 安装dth)和LLC(Last Level Cache)限制,通过限制离线业务的内存带宽/LLC使用,减少其对在线业务的干扰。 rubik支持业务的Pod内存带宽(memory bandwidth)和LLC(Last Level Cache)限制,通过限制离线业务的内存带宽/LLC使用,减少其对在线业务的干扰。 该特性依赖于物理机支持intel RDT(x86)和mapm(arm)功能。其将集群中的业务划分为5个控制组(分别为`rubik_max`、`rubik_high`、`rubik_middle`、`rubik_low`、`rubik_dynamic`),每一个控制组会根据配置限制业务对访存待宽和最后一级缓存的使用。rubik启动后,将水位线写入对应控制组的schemata。其中,`rubik_high`、`rubik_middle`、`rubik_low`控制组对应的水位线是全局的,可在`dynCache`中配置;max控制组为默认最大值, dynamic控制组初始水位线和low控制组一致。 diff --git a/pkg/common/constant/constant.go b/pkg/common/constant/constant.go index 06fc2e4..c77cc30 100644 --- a/pkg/common/constant/constant.go +++ b/pkg/common/constant/constant.go @@ -108,6 +108,8 @@ const ( CPUCgroupFileName = "cpu.qos_level" // MemoryCgroupFileName is name of cgroup file used for memory qos level setting MemoryCgroupFileName = "memory.qos_level" + // NetCgroupFileName is name of cgroup file used for net qos level setting + NetCgroupFileName = "net_cls.classid" // PSICPUCgroupFileName is name of cgroup file used for detecting cpu psi PSICPUCgroupFileName = "cpu.pressure" // PSIMemoryCgroupFileName is name of cgroup file used for detecting memory psi diff --git a/pkg/services/preemption/oncn_bwm.go b/pkg/services/preemption/oncn_bwm.go new file mode 100644 index 0000000..342c7cd --- /dev/null +++ b/pkg/services/preemption/oncn_bwm.go @@ -0,0 +1,114 @@ +// Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved. +// rubik licensed under the Mulan PSL v2. +// You can use this software according to the terms and conditions of the Mulan PSL v2. +// You may obtain a copy of Mulan PSL v2 at: +// http://license.coscl.org.cn/MulanPSL2 +// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +// PURPOSE. +// See the Mulan PSL v2 for more details. +// Author: Niu Qianqian +// Create: 2025-07-01 +// Description: Integrating oncn-bwm features + +package preemption + +import ( + "fmt" + "strconv" + "strings" + + "isula.org/rubik/pkg/common/util" + "isula.org/rubik/pkg/core/typedef" + "isula.org/rubik/pkg/core/typedef/cgroup" +) + +var ( + netclsKey = &cgroup.Key{SubSys: "net_cls", FileName: "net_cls.classid"} +) + +const ( + // Enable netqos in the specified process network namespace + // Usage: + // echo $pid > /proc/qos/net_qos_enable + netQosEnablePath = "/proc/qos/net_qos_enable" + // Disable netqos in the specified process network namespace + // Usage: + // echo $pid > /proc/qos/net_qos_disable + netQosDisablePath = "/proc/qos/net_qos_disable" + // Set/get the bandwidth of offline pod + // Usage: + // echo "$low,$high" > /proc/qos/net_qos_bandwidth + // cat /proc/qos/net_qos_bandwidth + netQosBandwidthPath = "/proc/qos/net_qos_bandwidth" + // Set/get the waterline of offline pod + // Usage: + // echo "$val" > /proc/qos/net_qos_waterline + // cat /proc/qos/net_qos_waterline + netQosWaterlinePath = "/proc/qos/net_qos_waterline" +) + +func isSupportNetqos() bool { + return util.PathExist(netQosEnablePath) +} + +func getPodProcID(pod *typedef.PodInfo) (string, error) { + var procID string + cgroupKey := &cgroup.Key{SubSys: "net_cls", FileName: "cgroup.procs"} + for _, container := range pod.IDContainersMap { + key := container.GetCgroupAttr(cgroupKey) + if key.Err != nil { + return "", key.Err + } + procID = strings.Split(key.Value, "\n")[0] + break + } + + procIDInt, err := strconv.Atoi(procID) + if err != nil && procIDInt == 0 { + return "", fmt.Errorf("get Pod ProcID %v err: %v", procID, err) + } + + return procID, nil +} + +func enablePodNetqos(pod *typedef.PodInfo) error { + pid, err := getPodProcID(pod) + if err != nil { + return fmt.Errorf("failed to get Pod procID %s: %v", pid, err) + } + + if err := util.WriteFile(netQosEnablePath, pid); err != nil { + return fmt.Errorf("failed to write %s to file %s: %v", pid, netQosEnablePath, err) + } + return nil +} + +func disablePodNetqos(pod *typedef.PodInfo) error { + pid, err := getPodProcID(pod) + if err != nil { + return fmt.Errorf("failed to get Pod procID %s: %v", pid, err) + } + if err := util.WriteFile(netQosDisablePath, pid); err != nil { + return fmt.Errorf("failed to write %s to file %s: %v", pid, netQosDisablePath, err) + } + return nil +} + +func setPodNetqosBandwidth(bandwidthLow, bandwidthHigh int) error { + bandwidthStr := strconv.Itoa(bandwidthLow) + "mb," + strconv.Itoa(bandwidthHigh) + "mb" + + if err := util.WriteFile(netQosBandwidthPath, "10mb,100mb"); err != nil { + return fmt.Errorf("failed to write %s to file %s: %v", bandwidthStr, netQosBandwidthPath, err) + } + return nil +} + +func setPodNetqosWaterline(waterline int) error { + waterlineStr := strconv.Itoa(waterline) + "mb" + + if err := util.WriteFile(netQosWaterlinePath, "20mb"); err != nil { + return fmt.Errorf("failed to write %s to file %s: %v", waterlineStr, netQosWaterlinePath, err) + } + return nil +} diff --git a/pkg/services/preemption/preemption.go b/pkg/services/preemption/preemption.go index 6a231f9..ef24e10 100644 --- a/pkg/services/preemption/preemption.go +++ b/pkg/services/preemption/preemption.go @@ -27,9 +27,22 @@ import ( "isula.org/rubik/pkg/services/helper" ) +var ( + netqosEnable bool = false + netqosInited bool = false +) + +const ( + minWaterline = 20 + maxWaterline = 9999 * 1024 + minBandwidth = 1 + maxBandwidth = 9999 * 1024 +) + var supportCgroupTypes = map[string]*cgroup.Key{ "cpu": {SubSys: "cpu", FileName: constant.CPUCgroupFileName}, "memory": {SubSys: "memory", FileName: constant.MemoryCgroupFileName}, + "net": {SubSys: "net_cls", FileName: constant.NetCgroupFileName}, } // Preemption define service which related to qos level setting @@ -38,9 +51,16 @@ type Preemption struct { config PreemptionConfig } +type NetQosConfig struct { + Waterline int `json:"waterline,omitempty"` + BandwidthLow int `json:"bandwidthLow,omitempty"` + BandwidthHigh int `json:"bandwidthHigh,omitempty"` +} + // PreemptionConfig define which resources need to use the preemption type PreemptionConfig struct { - Resource []string `json:"resource,omitempty"` + Resource []string `json:"resource,omitempty"` + NetQos NetQosConfig `json:"netQos,omitempty"` } // PreemptionFactory is the factory os Preemption. @@ -80,6 +100,7 @@ func (q *Preemption) PreStart(viewer api.Viewer) error { if viewer == nil { return fmt.Errorf("invalid pods viewer") } + for _, pod := range viewer.ListPodsWithOptions() { if err := q.SetQoSLevel(pod); err != nil { log.Errorf("failed to set the qos level for the previously started pod %v: %v", pod.Name, err) @@ -119,16 +140,28 @@ func (q *Preemption) DeletePod(_ *typedef.PodInfo) error { return nil } +func getQosLevelStr(r string, qosLevel int) string { + var qosLevelStr string + + if r == "net" && qosLevel == constant.Offline { + qosLevelStr = "4294967295" // uint32(-1) + } else { + qosLevelStr = strconv.Itoa(qosLevel) + } + return qosLevelStr +} + // validateConfig will validate pod's qos level between value from // cgroup file and the one from pod info func (q *Preemption) validateConfig(pod *typedef.PodInfo) error { targetLevel := getQoSLevel(pod) for _, r := range q.config.Resource { - if err := pod.GetCgroupAttr(supportCgroupTypes[r]).Expect(targetLevel); err != nil { + qosLevelStr := getQosLevelStr(r, targetLevel) + if err := pod.GetCgroupAttr(supportCgroupTypes[r]).Expect(qosLevelStr); err != nil { return fmt.Errorf("failed to validate the qos level configuration of pod %s: %v", pod.Name, err) } for _, container := range pod.IDContainersMap { - if err := container.GetCgroupAttr(supportCgroupTypes[r]).Expect(targetLevel); err != nil { + if err := container.GetCgroupAttr(supportCgroupTypes[r]).Expect(qosLevelStr); err != nil { return fmt.Errorf("failed to validate the qos level configuration of container %s: %v", pod.Name, err) } } @@ -141,6 +174,29 @@ func (q *Preemption) SetQoSLevel(pod *typedef.PodInfo) error { if pod == nil { return fmt.Errorf("empty pod info") } + + if netqosEnable { + err := enablePodNetqos(pod) + if err != nil { + disablePodNetqos(pod) + return err + } + + // The bandwidth or waterline can be set only after netqos has been enabled at least once. + if !netqosInited { + if err := setPodNetqosWaterline(q.config.NetQos.Waterline); err != nil { + disablePodNetqos(pod) + return err + } + if err := setPodNetqosBandwidth(q.config.NetQos.BandwidthLow, q.config.NetQos.BandwidthHigh); err != nil { + disablePodNetqos(pod) + return err + } + + netqosInited = true + } + } + qosLevel := getQoSLevel(pod) if qosLevel == constant.Online { log.Infof("pod %s(%s) has already been set to online(%d)", pod.Name, pod.UID, qosLevel) @@ -149,12 +205,13 @@ func (q *Preemption) SetQoSLevel(pod *typedef.PodInfo) error { var errs error for _, r := range q.config.Resource { - if err := pod.SetCgroupAttr(supportCgroupTypes[r], strconv.Itoa(qosLevel)); err != nil { + qosLevelStr := getQosLevelStr(r, qosLevel) + if err := pod.SetCgroupAttr(supportCgroupTypes[r], qosLevelStr); err != nil { log.Warnf("failed to set %s-qos-level for pod %s: %v", r, pod.Name, err) errs = util.AppendErr(errs, err) } for _, container := range pod.IDContainersMap { - if err := container.SetCgroupAttr(supportCgroupTypes[r], strconv.Itoa(qosLevel)); err != nil { + if err := container.SetCgroupAttr(supportCgroupTypes[r], qosLevelStr); err != nil { log.Warnf("failed to set %s-qos-level for container %s: %v", r, container.Name, err) errs = util.AppendErr(errs, err) } @@ -186,7 +243,31 @@ func (conf *PreemptionConfig) Validate() error { for _, r := range conf.Resource { if _, ok := supportCgroupTypes[r]; !ok { return fmt.Errorf("does not support setting the %s subsystem", r) + } else if r == "net" { + if !isSupportNetqos() { + return fmt.Errorf("this machine does not support net preemption, please install oncn-bwm first.") + } else { + netqosEnable = true + } } } + + if netqosEnable { + if conf.NetQos.Waterline < minWaterline || conf.NetQos.Waterline > maxWaterline { + return fmt.Errorf("netQos waterline %d out of range [%d,%d]", conf.NetQos.Waterline, minWaterline, maxWaterline) + } + + for _, per := range []int{ + conf.NetQos.BandwidthLow, conf.NetQos.BandwidthHigh} { + if per < minBandwidth || per > maxBandwidth { + return fmt.Errorf("netQos bandwidth %d out of range [%d,%d]", per, minBandwidth, maxBandwidth) + } + } + + if conf.NetQos.BandwidthLow >= conf.NetQos.BandwidthHigh { + return fmt.Errorf("netQos bandwidthLow is larger than bandwidthHigh") + } + } + return nil } diff --git a/pkg/services/preemption/preemption_test.go b/pkg/services/preemption/preemption_test.go index d1d763c..235e346 100644 --- a/pkg/services/preemption/preemption_test.go +++ b/pkg/services/preemption/preemption_test.go @@ -57,27 +57,29 @@ func TestPreemptionAddFunc(t *testing.T) { var addFuncTC = []test{ { name: "TC1-set offline pod qos ok", - fields: getCommonField([]string{"cpu", "memory"}), + fields: getCommonField([]string{"cpu", "memory", "net"}), args: args{ new: try.GenFakeOfflinePod(map[*cgroup.Key]string{ supportCgroupTypes["cpu"]: "0", supportCgroupTypes["memory"]: "0", + supportCgroupTypes["net"]: "0", }), }, }, { name: "TC2-set online pod qos ok", - fields: getCommonField([]string{"cpu", "memory"}), + fields: getCommonField([]string{"cpu", "memory", "net"}), args: args{ new: try.GenFakeOnlinePod(map[*cgroup.Key]string{ supportCgroupTypes["cpu"]: "0", supportCgroupTypes["memory"]: "0", + supportCgroupTypes["net"]: "0", }).WithContainers(containerNum), }, }, { name: "TC3-empty pod info", - fields: getCommonField([]string{"cpu", "memory"}), + fields: getCommonField([]string{"cpu", "memory", "net"}), wantErr: true, }, { -- Gitee