diff --git a/source/tools/monitor/unity/beaver/export.lua b/source/tools/monitor/unity/beaver/export.lua index e1436c60c8c6a2f51227d5cba9ed833fc26f434e..fe5ebbef88c2903c33efb99ef560681c5af61c6e 100644 --- a/source/tools/monitor/unity/beaver/export.lua +++ b/source/tools/monitor/unity/beaver/export.lua @@ -11,15 +11,6 @@ require("common.class") local Cexport = class("Cexport") -function Cexport:_init_(instance, fYaml) - self._instance = instance - local ms = system:parseYaml(fYaml) - self._freq = ms.config.freq - self._tDescr = ms.metrics - self._fox = CfoxTSDB.new() - self._fox:_setupRead() -end - local function qFormData(from, tData) local res = {} local len = #tData @@ -38,6 +29,19 @@ local function qFormData(from, tData) return res end +local function packLine_us(title, ls, v, time) + local tLs = {} + for k, v in pairs(ls) do + table.insert(tLs, string.format("%s=\"%s\"", k , v)) + end + local label = "" + if #tLs then + label = pystring:join(",", tLs) + label = "{" .. label .. "}" + end + return string.format("%s%s %.1f %d", title, label, v, time/1000) +end + local function packLine(title, ls, v) local tLs = {} for k, v in pairs(ls) do @@ -51,6 +55,21 @@ local function packLine(title, ls, v) return string.format("%s%s %.1f", title, label, v) end +function Cexport:_init_(instance, fYaml) + self._instance = instance + local ms = system:parseYaml(fYaml) + self._freq = ms.config.freq + self._timestamps = ms.config.real_timestamps + if self._timestamps == true then + self.pack_line = packLine_us + else + self.pack_line = packLine + end + self._tDescr = ms.metrics + self._fox = CfoxTSDB.new() + self._fox:_setupRead() +end + function Cexport:export() local qs = {} self._fox:resize() @@ -74,7 +93,7 @@ function Cexport:export() labels.instance = self._instance for k, v in pairs(tFrom.values) do labels[line.head] = k - table.insert(res, packLine(title, labels, v)) + table.insert(res, self.pack_line(title, labels, v, tFrom.time)) end end end diff --git a/source/tools/monitor/unity/beaver/identity.lua b/source/tools/monitor/unity/beaver/identity.lua index 27582ac6379291c6ded83aea88a6f86f7ed603e5..427ca0ae64f12db3c9b417b62c840abea076cdf4 100644 --- a/source/tools/monitor/unity/beaver/identity.lua +++ b/source/tools/monitor/unity/beaver/identity.lua @@ -41,11 +41,11 @@ function Cidentity:hostip() end function Cidentity:curl() - if self._opts.curl then + if self._opts.url then local ChttpCli = require("httplib.httpCli") local cli = ChttpCli.new() - local res = cli:get(self._opts.curl) + local res = cli:get(self._opts.url) return res.body else return "None" @@ -80,4 +80,4 @@ function Cidentity:id() return self._funcs[self._opts.mode]() end -return Cidentity \ No newline at end of file +return Cidentity diff --git a/source/tools/monitor/unity/beaver/localBeaver.lua b/source/tools/monitor/unity/beaver/localBeaver.lua index 1f253c64c4033575a4c38301f6cadb7ad77852f7..dfa99eda209fffb74f312397a4d014078692b774 100644 --- a/source/tools/monitor/unity/beaver/localBeaver.lua +++ b/source/tools/monitor/unity/beaver/localBeaver.lua @@ -17,12 +17,17 @@ local function setupServer(fYaml) local port = config["port"] or 8400 local ip = config["bind_addr"] or "0.0.0.0" local backlog = config["backlog"] or 32 - return port, ip, backlog + local unix_socket = config["unix_socket"] + return port, ip, backlog,unix_socket end function CLocalBeaver:_init_(frame, fYaml) - local port, ip, backlog = setupServer(fYaml) - self._bfd = self:_install_fd(port, ip, backlog) + local port, ip, backlog, unix_socket = setupServer(fYaml) + if not unix_socket then + self._bfd = self:_install_fd(port, ip, backlog) + else + self._bfd = self:_install_fd_unisock(backlog, unix_socket) + end self._efd = self:_installFFI() self._cos = {} @@ -107,6 +112,31 @@ local function localBind(fd, tPort) system:posixError(string.format("bind port %d failed.", tPort.port), err, errno) end +function CLocalBeaver:_install_fd_unisock(backlog,unix_socket) + local fd, res, err, errno + unistd.unlink(unix_socket) + fd, err, errno = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0) + if fd then -- for socket + local tPort = {family=socket.AF_UNIX, path=unix_socket} + local r, msg = pcall(localBind, fd, tPort) + if r then + res, err, errno = socket.listen(fd, backlog) + if res then -- for listen + return fd + else + unistd.close(fd) + system:posixError("socket listen failed", err, errno) + end + else + print(msg) + unistd.close(fd) + os.exit(1) + end + else -- socket failed + system:posixError("create socket failed", err, errno) + end +end + function CLocalBeaver:_install_fd(port, ip, backlog) local fd, res, err, errno fd, err, errno = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) diff --git a/source/tools/monitor/unity/collector/loop.lua b/source/tools/monitor/unity/collector/loop.lua index 1b2781815c62048f500f0497c54ee623e985d429..10cae870e69f24f6baccba4b10d9a8cf14414168 100644 --- a/source/tools/monitor/unity/collector/loop.lua +++ b/source/tools/monitor/unity/collector/loop.lua @@ -18,6 +18,7 @@ local CprocSnmpStat = require("collector.proc_snmp_stat") local CprocMounts = require("collector.proc_mounts") local CprocStatm = require("collector.proc_statm") local CprocBuddyinfo = require("collector.proc_buddyinfo") +local CPodAlloc = require("collector.pod_allocpage") local Cplugin = require("collector.plugin") local system = require("common.system") @@ -38,6 +39,7 @@ function Cloop:_init_(que, proto_q, fYaml) CprocMounts.new(self._proto, procffi, res.config.proc_path), CprocStatm.new(self._proto, procffi, res.config.proc_path), CprocBuddyinfo.new(self._proto, procffi, res.config.proc_path), + CPodAlloc.new(self._proto, procffi, res.config.proc_path), } self._plugin = Cplugin.new(self._proto, procffi, que, proto_q, fYaml) end diff --git a/source/tools/monitor/unity/collector/native/plugincffi.lua b/source/tools/monitor/unity/collector/native/plugincffi.lua index dccb08a4be62d614f5a3d7388fead58d9e82a3f5..037ba320191235b2a7cdead8373e7c9cdbbffbf7 100644 --- a/source/tools/monitor/unity/collector/native/plugincffi.lua +++ b/source/tools/monitor/unity/collector/native/plugincffi.lua @@ -38,6 +38,7 @@ int call(int t, struct unity_lines* lines); void deinit(void); void free(void *p); +int setns(int fd, int nstype); ]] return ffi diff --git a/source/tools/monitor/unity/collector/plugin.yaml b/source/tools/monitor/unity/collector/plugin.yaml index b682bbe39b82ac85307ec4126b04b70a5716228f..00de4a289bc83355d70e530f5568ddd2312aa15c 100644 --- a/source/tools/monitor/unity/collector/plugin.yaml +++ b/source/tools/monitor/unity/collector/plugin.yaml @@ -4,10 +4,13 @@ config: bind_addr: 0.0.0.0 # bind ip backlog: 32 # listen backlog identity: # support hostip, curl(need url arg), hostname, file(need path arg), specify(need name arg) - mode: specify - name: test_specify + mode: curl + url: "http://100.100.100.200/latest/meta-data/instance-id" +# name: test_specify # mode: hostip - proc_path: /mnt/host/ # in container mode, like -v /:/mnt/host , should use /mnt/host/ +# real_timestamps: true +# unix_socket: "/tmp/sysom_unity.sock" + proc_path: /mnt/home/ # in container mode, like -v /:/mnt/host , should use /mnt/host/ # proc_path: / # in container mode, like -v /:/mnt/host , should use /mnt/host/ outline: @@ -147,4 +150,8 @@ metrics: head: value help: "nosched:sys hold cpu and didn't scheduling" type: "gauge" - + # - title: sysak_pod_alloc + #from: pod_alloc + #head: value + #help: "get pod alloc page used" + #type: "gauge" diff --git a/source/tools/monitor/unity/collector/pod_allocpage.lua b/source/tools/monitor/unity/collector/pod_allocpage.lua new file mode 100644 index 0000000000000000000000000000000000000000..1d5f00ef8a4b10402dcab49454f4db9761e8f41b --- /dev/null +++ b/source/tools/monitor/unity/collector/pod_allocpage.lua @@ -0,0 +1,190 @@ +--- +--- Generated by EmmyLua(https://github.com/EmmyLua) +--- Created by liuxinwnei. +--- DateTime: 2023/02/08 17:00 PM +--- + +require("common.class") +local fcntl = require("posix.fcntl") +local unistd = require("posix.unistd") +local dirent = require("posix.dirent") +local stdlib = require("posix.stdlib") +local stat = require("posix.sys.stat") +local cjson = require("cjson") +local json = cjson.new() +local CkvProc = require("collector.kvProc") +local CvProc = require("collector.vproc") +local pystring = require("common.pystring") +local dockerinfo = require("common.dockerinfo") + +local CPodAlloc = class("podalloc", CkvProc) + +function CPodAlloc:_init_(proto, pffi, mnt, pFile) + CkvProc._init_(self, proto, pffi, mnt, pFile , "pod_alloc") + self._ffi = require("collector.native.plugincffi") + self.proc_fs, self.sys_fs, self.pods_fs, self.root_fs = dockerinfo:get_hostfs() + self.name_space = {} + self.pod_mem = {} + self.total = 0 +end + +function CPodAlloc:file_exists(file) + local f=stat.lstat(file) + if f ~= nil then + return true + else + return false + end +end + +function CPodAlloc:switch_ns(pid) + local pid_ns = self.proc_fs .. pid .. "/ns/net" + if not self:file_exists(pid_ns) then return end + + local f = fcntl.open(pid_ns,fcntl.O_RDONLY) + self._ffi.C.setns(f,0) + unistd.close(f) +end + +function CPodAlloc:get_container_info(did) + local res = "unknow" + local podname = did + local podns = did + local cname = did + + res = dockerinfo:get_inspect(did) + local restable = json.decode(res) + if #restable > 0 then + restable = restable[1] + end + if restable['Config'] then + local config = restable['Config'] + if config['Labels'] then + local label = config['Labels'] + if label['io.kubernetes.pod.name'] then + podname = label['io.kubernetes.pod.name'] + end + if label['io.kubernetes.container.name'] then + cname = label['io.kubernetes.container.name'] + end + if label['io.kubernetes.pod.namespace'] then + podns = label['io.kubernetes.pod.namespace'] + end + end + if podname == did and restable['Name'] then + cname = restable['Name'] + podname = restable['Name'] + end + elseif restable['status'] then + podname = restable['status']['labels']['io.kubernetes.pod.name'] + cname = restable['status']['labels']['io.kubernetes.container.name'] + podns = restable['status']['labels']['io.kubernetes.pod.namespace'] + end + if pystring:startswith(podname,"/") then podname=string.sub(podname,2,-1) end + if not self.pod_mem[podname] then + self.pod_mem[podname] = {} + self.pod_mem[podname]["allocpage"] = 0 + self.pod_mem[podname]["podns"] = podns + self.pod_mem[podname]["podname"] = podname + end + return podname +end + +function CPodAlloc:get_pidalloc() + local pods = {} + local dockerids = {} + for net,pidn in pairs(self.name_space) do + if pidn == "self" then pidn = "1" end + + self:switch_ns(pidn) + -- local env = posix.getenv() + -- env["PROC_ROOT"] = self.proc_fs + + stdlib.setenv("PROC_ROOT",self.proc_fs) + local pfile = io.popen("ss -anp","r") + io.input(pfile) + for line in io.lines() do + repeat + local proto,recv,task,pid = string.match(line,"(%S*)%s*%S*%s*(%d*).*users:%S*\"(%S*)\",pid=(%d*)") + if not proto or not recv or not task or not pid then break end + if proto ~="tcp" and proto ~="udp" and proto ~="raw" then break end + + recv = tonumber(recv) + + local dockerid = "" + if not dockerids[pid] then + dockerid = dockerinfo:get_dockerid(pid) + if dockerid == "unknow" then break end + dockerids[pid] = dockerid + else + dockerid = dockerids[pid] + end + + local podname = dockerid + if not pods[dockerid] then + podname = self:get_container_info(dockerid) + pods[dockerid] = podname + else + podname = pods[dockerid] + end + if recv < 1024 and podname == dockerid then break end + + if not self.pod_mem[podname] then + self.pod_mem[podname] = {} + self.pod_mem[podname]["allocpage"] = 0 + self.pod_mem[podname]["podns"] = podname + self.pod_mem[podname]["podname"] = podname + end + self.pod_mem[podname]["allocpage"] = self.pod_mem[podname]["allocpage"] + recv + self.total = self.total + recv + until true + end + pfile:close() + self:switch_ns("1") + stdlib.setenv("PROC_ROOT","") + end +end + +function CPodAlloc:scan_namespace() + local root = self.proc_fs + for pid in dirent.files(root) do + repeat + if pystring:startswith(pid,".") then break end + if not self:file_exists(self.proc_fs .. pid .. "/comm") then break end + + local proc_ns = self.proc_fs .. pid .. "/ns/net" + if not self:file_exists(proc_ns) then break end + + local slink = unistd.readlink(proc_ns) + if not slink then break end + if not string.find(slink,"net") then break end + + local inode = string.match(slink,"%[(%S+)%]") + if not inode then break end + + if not self.name_space[inode] then self.name_space[inode] = pystring:strip(pid) end + if not self:file_exists(root .. self.name_space[inode] .. "/comm") then self.name_space[inode] = pystring:strip(pid) end + until true + end +end + +function CPodAlloc:proc(elapsed, lines) + CvProc.proc(self) + self.name_space = {} + self.pod_mem = {} + self.total = 0 + self:scan_namespace() + self:get_pidalloc() + + for k,v in pairs(self.pod_mem) do + local cell = {{name="pod_allocpage", value=v['allocpage']/1024}} + local label = {{name="podname",index=v['podname'],}, {name="namespace",index = v['podns'],},} + self:appendLine(self:_packProto("pod_alloc", label, cell)) + end + local cell = {{name="pod_allocpage_total", value=self.total/1024}} + self:appendLine(self:_packProto("pod_alloc", nil, cell)) + + return self:push(lines) +end + +return CPodAlloc diff --git a/source/tools/monitor/unity/common/dockerinfo.lua b/source/tools/monitor/unity/common/dockerinfo.lua new file mode 100644 index 0000000000000000000000000000000000000000..9bbce2ced08f0681ad39532d23692996b615f81b --- /dev/null +++ b/source/tools/monitor/unity/common/dockerinfo.lua @@ -0,0 +1,104 @@ +--- +--- Generated by EmmyLua(https://github.com/EmmyLua) +--- Created by liuxinwnei. +--- DateTime: 2023/02/08 17:00 PM +--- + +dockerinfo = {} +local posix = require("posix") +local cjson = require("cjson") +local json = cjson.new() +local pystring = require("common.pystring") +local stat = require("posix.sys.stat") + +function file_exists(file) + local f=stat.lstat(file) + if f ~= nil then + return true + else + return false + end +end + +function dockerinfo:get_hostfs() + local proc_fs="/mnt/host/proc/" + local sys_fs="/mnt/host/sys/" + local pods_fs="/mnt/host/var/lib/kubelet/pods/" + local root_fs = "/mnt/host/" + if file_exists(proc_fs) then + return proc_fs, sys_fs, pods_fs, root_fs + end + proc_fs="/proc/" + sys_fs="/sys/" + pods_fs="/var/lib/kubelet/pods/" + root_fs = "/" + return proc_fs, sys_fs, pods_fs, root_fs +end + +function get_runtimesock() + local root_fs = "" + _, _, _, root_fs = dockerinfo:get_hostfs() + local runtime = "docker" + local runtime_sock = root_fs .. "var/run/docker.sock" + local sock={"var/run/docker.sock","run/containerd/containerd.sock", "var/run/dockershim.sock"} + for _,runtimex in pairs(sock) do + if file_exists(root_fs .. runtimex) then + runtime_sock = root_fs .. runtimex + if not string.find(runtime_sock,"docker.sock") then + runtime = "crictl" + end + end + end + return runtime,runtime_sock +end + +function dockerinfo:get_inspect(did) + local runtime,runtime_sock = get_runtimesock() + if runtime == "docker" then + return get_container_inspect(did) + else + return get_crictl_inspect(did) + end +end + +function dockerinfo:get_dockerid(pid) + local proc_fs = dockerinfo:get_hostfs() + local idstring = "unknow" + if not file_exists(proc_fs .. pid .. "/cgroup") then return idstring end + local cmd = "cat " .. proc_fs .. pid .. "/cgroup 2>/dev/null | grep memory:" + local pfile = io.popen(cmd,"r") + local res = pfile:read("*a") + pfile:close() + + if not string.find(res,"kubepods") and not string.find(res,"docker%-") then return idstring end + if string.find(res,"docker%-") then + idstring = pystring:split(res,"docker-")[2] + elseif string.find(res,"cri%-containerd%-") then + idstring = pystring:split(res,"cri-containerd-")[2] + else + local tmp = pystring:split(res,"/",10) + idstring = tmp[#tmp] + end + idstring = string.sub(idstring,0,8) + return idstring +end + +function get_container_inspect(did) + local runtime, runtime_sock = get_runtimesock() + local cmd = "curl --silent -XGET --unix-socket " .. runtime_sock .. " http://localhost/containers/" .. did .. "/json 2>/dev/null " + local f = io.popen(cmd,"r") + local res = f:read("*a") + f:close() + return res +end + +function get_crictl_inspect(did) + local runtime, runtime_sock = get_runtimesock() + local cmd = runtime .. " -r " .. runtime_sock .. " inspect " .. did .. " 2>/dev/null " + local f = io.popen(cmd,"r") + local res = f:read("*a") + f:close() + return res +end + +return dockerinfo