diff --git a/source/tools/monitor/unity/beaver/beaver.c b/source/tools/monitor/unity/beaver/beaver.c index 073927299d3602300bc43d3e3cfb97c2d1d1d6cf..8cb6b856e682926f61f4156266d85a9f8300d59a 100644 --- a/source/tools/monitor/unity/beaver/beaver.c +++ b/source/tools/monitor/unity/beaver/beaver.c @@ -18,13 +18,14 @@ extern int lua_reg_errFunc(lua_State *L); extern int lua_check_ret(int ret); int lua_load_do_file(lua_State *L, const char* path); -static int call_init(lua_State *L, int err_func, char *fYaml) { +static int call_init(lua_State *L, int err_func, struct beeQ* q, char *fYaml) { int ret; lua_Number lret; lua_getglobal(L, "init"); + lua_pushlightuserdata(L, q); lua_pushstring(L, fYaml); - ret = lua_pcall(L, 1, 1, err_func); + ret = lua_pcall(L, 2, 1, err_func); if (ret) { lua_check_ret(ret); goto endCall; @@ -64,7 +65,8 @@ void LuaAddPath(lua_State *L, char *name, char *value) { lua_pop(L, 2); } -static lua_State * echos_init(char *fYaml) { +extern int collector_qout(lua_State *L); +static lua_State * echos_init(struct beeQ* q, char *fYaml) { int ret; int err_func; @@ -80,12 +82,13 @@ static lua_State * echos_init(char *fYaml) { LuaAddPath(L, "path", "../beaver/?.lua"); err_func = lua_reg_errFunc(L); + lua_register(L, "collector_qout", collector_qout); ret = lua_load_do_file(L, "../beaver/beaver.lua"); if (ret) { goto endLoad; } - ret = call_init(L, err_func, fYaml); + ret = call_init(L, err_func, q, fYaml); if (ret < 0) { goto endCall; } @@ -131,11 +134,11 @@ static int echos(lua_State *L) { return ret; } -int beaver_init(char *fYaml) { +int beaver_init(struct beeQ* q, char *fYaml) { int ret = 0; while (ret == 0) { - lua_State *L = echos_init(fYaml); + lua_State *L = echos_init(q, fYaml); if (L == NULL) { break; } diff --git a/source/tools/monitor/unity/beaver/beaver.h b/source/tools/monitor/unity/beaver/beaver.h index f1a6fd3769312b1f0e90c252dc3d80da96547bf5..058fbb946edea7d79afcaa79e10e415de943a04d 100644 --- a/source/tools/monitor/unity/beaver/beaver.h +++ b/source/tools/monitor/unity/beaver/beaver.h @@ -5,6 +5,7 @@ #ifndef UNITY_BEAVER_H #define UNITY_BEAVER_H -int beaver_init(char *fYaml); +#include "../beeQ/beeQ.h" +int beaver_init(struct beeQ* q, char *fYaml); #endif //UNITY_BEAVER_H diff --git a/source/tools/monitor/unity/beaver/beaver.lua b/source/tools/monitor/unity/beaver/beaver.lua index 2431d99ce7bcc5069490f8d0d3510e55df6a4da3..f8287e1a045f77e25a8c6f3b3aab283d93861a02 100644 --- a/source/tools/monitor/unity/beaver/beaver.lua +++ b/source/tools/monitor/unity/beaver/beaver.lua @@ -19,13 +19,12 @@ local CbaseQuery = require("beaver.query.baseQuery") local lb = nil -function init(fYaml) +function init(que, fYaml) fYaml = fYaml or "../collector/plugin.yaml" - print(fYaml) local web = Cframe.new() CurlIndex.new(web) - CurlApi.new(web, fYaml) + CurlApi.new(web, que, fYaml) CurlRpc.new(web) CurlGuide.new(web) CbaseQuery.new(web, fYaml) diff --git a/source/tools/monitor/unity/beaver/guide/testjs.html b/source/tools/monitor/unity/beaver/guide/testjs.html new file mode 100644 index 0000000000000000000000000000000000000000..ac199064ac3885984c64d8869a14793340fc7620 --- /dev/null +++ b/source/tools/monitor/unity/beaver/guide/testjs.html @@ -0,0 +1,20 @@ + + + + + 菜鸟教程(runoob.com) + + + + +

我的第一个 JavaScript 程序

+

这是一个段落

+ + + + + \ No newline at end of file diff --git a/source/tools/monitor/unity/beaver/localBeaver.lua b/source/tools/monitor/unity/beaver/localBeaver.lua index e281593890f391b7bc4ba0b572ffdefae650055a..548bd94afa22c5cdd48e165545b0cd1359823102 100644 --- a/source/tools/monitor/unity/beaver/localBeaver.lua +++ b/source/tools/monitor/unity/beaver/localBeaver.lua @@ -162,7 +162,7 @@ function CLocalBeaver:_install_fd(port, ip, backlog) end function CLocalBeaver:read(fd, maxLen) - maxLen = maxLen or 1 * 1024 * 1024 -- signal conversation accept 1M stream max + maxLen = maxLen or 2 * 1024 * 1024 -- signal conversation accept 2M stream max local function readFd() local e = coroutine.yield() if e.ev_close > 0 then diff --git a/source/tools/monitor/unity/beaver/pushLine.lua b/source/tools/monitor/unity/beaver/pushLine.lua new file mode 100644 index 0000000000000000000000000000000000000000..846568ec6f0fadb5c64e01b1a56b939e382e91f4 --- /dev/null +++ b/source/tools/monitor/unity/beaver/pushLine.lua @@ -0,0 +1,58 @@ +--- +--- Generated by EmmyLua(https://github.com/EmmyLua) +--- Created by liaozhaoyan. +--- DateTime: 2023/4/24 01:34 +--- + +require("common.class") +local system = require("common.system") +local CprotoData = require("common.protoData") +local lineParse = require("common.lineParse") +local pystring = require("common.pystring") + +local CpushLine = class("CpushLine") + +function CpushLine:_init_(que) + self._proto = CprotoData.new(que) +end + +local function trans(title, ls, vs, log) + local labels = {} + local values = {} + local logs = {} + + local c = 0 + for k, v in pairs(ls) do + c = c + 1 + labels[c] = {name=k, index=v} + end + c = 0 + for k, v in pairs(vs) do + c = c + 1 + values[c] = {name=k, value=v} + end + c = 0 + for k, v in pairs(log) do + c = c + 1 + logs[c] = {name=k, log=v} + end + return {line = title, ls = labels, vs = values, log = logs} +end + +function CpushLine:procLine(line) + return trans(lineParse.parse(line)) +end + +function CpushLine:procLines(stream) + local ss = pystring:split(stream, "\n") + local lines = self._proto:protoTable() + + for _, line in ipairs(ss) do + table.insert(lines.lines, self:procLine(line)) + end + local bytes = self._proto:encode(lines) + self._proto:que(bytes) + return 0 +end + +return CpushLine \ No newline at end of file diff --git a/source/tools/monitor/unity/beaver/url_api.lua b/source/tools/monitor/unity/beaver/url_api.lua index d45281c1b1ddc863956082d2a3b223eac4874ffb..c03d824ebec43a6dcb51b4487de6b23e9b448fa2 100644 --- a/source/tools/monitor/unity/beaver/url_api.lua +++ b/source/tools/monitor/unity/beaver/url_api.lua @@ -9,21 +9,32 @@ local system = require("common.system") local ChttpApp = require("httplib.httpApp") local CfoxTSDB = require("tsdb.foxTSDB") local postQue = require("beeQ.postQue.postQue") +local CpushLine = require("beaver.pushLine") local CurlApi = class("urlApi", ChttpApp) -function CurlApi:_init_(frame, fYaml) +function CurlApi:_init_(frame, que, fYaml) ChttpApp._init_(self) + self._pushLine = CpushLine.new(que) self._urlCb["/api/sum"] = function(tReq) return self:sum(tReq) end self._urlCb["/api/sub"] = function(tReq) return self:sub(tReq) end self._urlCb["/api/query"] = function(tReq) return self:query(tReq) end - self._urlCb["/api/que"] = function(tReq) return self:que(tReq) end - self._urlCb["/api/trig"] = function(tReq) return self:que(tReq) end + self._urlCb["/api/trig"] = function(tReq) return self:trig(tReq) end + self._urlCb["/api/line"] = function(tReq) return self:line(tReq) end self:_install(frame) self:_setupQs(fYaml) end -function CurlApi:que(tReq) +function CurlApi:line(tReq) + local stat, _ = pcall(self._pushLine.procLines, self._pushLine, tReq.data) + if stat then + return "ok" + else + return "bad line " .. tReq.data, 400 + end +end + +function CurlApi:trig(tReq) local stat, tJson = pcall(self.getJson, self, tReq) if stat and tJson then local s = self:jencode(tJson) diff --git a/source/tools/monitor/unity/beeQ/apps.c b/source/tools/monitor/unity/beeQ/apps.c index ca81e6bef7f00f5a2c548955bddce78d9c4baef1..40e37b57efc2196544691a1694c855a2e889bb76 100644 --- a/source/tools/monitor/unity/beeQ/apps.c +++ b/source/tools/monitor/unity/beeQ/apps.c @@ -7,6 +7,7 @@ #include #include #include +#include #include "beeQ.h" #include "apps.h" #include "pushTo.h" @@ -241,6 +242,11 @@ static int lua_setup_daemon(lua_State *L) { return 1; } +static int prctl_death_kill(lua_State *L) { + prctl(PR_SET_PDEATHSIG, SIGKILL); + return 0; +} + int collector_qout(lua_State *L) { int ret; struct beeQ* q = (struct beeQ*) lua_topointer(L, 1); @@ -281,6 +287,7 @@ static int app_collector_work(void* q, void* proto_q) { lua_register(L, "collector_qout", collector_qout); lua_register(L, "lua_local_clock", lua_local_clock); lua_register(L, "lua_setup_daemon", lua_setup_daemon); + lua_register(L, "prctl_death_kill", prctl_death_kill); ret = lua_load_do_file(L, "../beeQ/collectors.lua"); if (ret) { diff --git a/source/tools/monitor/unity/beeQ/bees.c b/source/tools/monitor/unity/beeQ/bees.c index 88239bfb7bdf911f5ac06d40d97f80fbd3779c92..950c4bc9b36d01b2b8afeaf0c4ae7d21c8b53281 100644 --- a/source/tools/monitor/unity/beeQ/bees.c +++ b/source/tools/monitor/unity/beeQ/bees.c @@ -91,7 +91,7 @@ int main(int argc, char *argv[]) { if (pid_outline == 0) { exit(1); } - beaver_init(g_yaml_file); + beaver_init(q, g_yaml_file); fprintf(stderr, "loop exit."); beeQ_stop(q); diff --git a/source/tools/monitor/unity/beeQ/collectors.lua b/source/tools/monitor/unity/beeQ/collectors.lua index 5e97d982141e571f7ba6e521bec06f436a135681..10206def303eba01448380f1b534a4895f0c5f85 100644 --- a/source/tools/monitor/unity/beeQ/collectors.lua +++ b/source/tools/monitor/unity/beeQ/collectors.lua @@ -121,18 +121,12 @@ local function setupPostEngine(que, proto_q, fYaml, tid) return w, 1 end -local function setupIODiagnose(que, proto_q, fYaml, tid) - local CioDiagnose = require("collector.io.io_diagnose") - local w = CioDiagnose.new(que, proto_q, fYaml, tid) - return w, 1 -end - function work(que, proto_q, yaml, tid) local fYaml = yaml or "../collector/plugin.yaml" checkSos() local e = CrbEvent.new() - local main, engine, io, unit + local main, engine, unit main, unit = setupMainCollector(que, proto_q, fYaml, tid) e:addEvent("mainCollector", main, unit) @@ -140,7 +134,5 @@ function work(que, proto_q, yaml, tid) engine:setTask(main.postPlugin.tasks) e:addEvent("postEngine", engine, unit) - io, unit = setupIODiagnose(que, proto_q, fYaml, tid) - e:addEvent("mainCollector", io, unit, true) return e:proc() end diff --git a/source/tools/monitor/unity/beeQ/pack.sh b/source/tools/monitor/unity/beeQ/pack.sh index 57f6e8a1520cad32a0c34bc88569b532f7821714..42eea82828dc5695dbb2d61501afce042e1fb55f 100755 --- a/source/tools/monitor/unity/beeQ/pack.sh +++ b/source/tools/monitor/unity/beeQ/pack.sh @@ -43,13 +43,17 @@ mkdir ${APP}/collector mkdir ${APP}/collector/native mkdir ${APP}/collector/guard mkdir ${APP}/collector/outline +mkdir ${APP}/collector/postPlugin mkdir ${APP}/collector/postEngine +mkdir ${APP}/collector/io cp collector/native/*.so* ${APP}/collector/native/ cp collector/native/*.lua ${APP}/collector/native/ cp collector/*.lua ${APP}/collector/ cp collector/guard/*.lua ${APP}/collector/guard cp collector/outline/*.lua ${APP}/collector/outline +cp collector/postPlugin/*.lua ${APP}/collector/postPlugin cp collector/postEngine/*.lua ${APP}/collector/postEngine +cp collector/io/*.lua ${APP}/collector/io cp collector/plugin.yaml ${APP}/collector/ mkdir ${APP}/common diff --git a/source/tools/monitor/unity/beeQ/proto_queue.lua b/source/tools/monitor/unity/beeQ/proto_queue.lua index ecece0f1ab04fbcc11a73fb2cec5fb63f4cfdceb..91f296493e0e8cb4f78da573cc2d95b3a15c8efb 100644 --- a/source/tools/monitor/unity/beeQ/proto_queue.lua +++ b/source/tools/monitor/unity/beeQ/proto_queue.lua @@ -8,6 +8,8 @@ require("common.class") local CprotoData = require("common.protoData") local CprotoQueue = class("loop") +local cjson = require("cjson.safe") +local json = cjson.new() local system = require("common.system") function CprotoQueue:_init_(que) @@ -77,7 +79,6 @@ function CprotoQueue:_proc(unity_lines, lines) end function CprotoQueue:send(num, pline) - --print(string.format("proto que send a %d message.", num)) local unity_lines = self._ffi.new("struct unity_lines") local lines = self._proto:protoTable() unity_lines.num = num diff --git a/source/tools/monitor/unity/beeQ/pushTo.lua b/source/tools/monitor/unity/beeQ/pushTo.lua index 9daecebb58c510fc3bb939cdd3af90497554601f..bf301ffedc8bbb7ed1e97d450ef577e55268960c 100644 --- a/source/tools/monitor/unity/beeQ/pushTo.lua +++ b/source/tools/monitor/unity/beeQ/pushTo.lua @@ -8,14 +8,10 @@ package.path = package.path .. ";../?.lua;" local coCli = require("httplib.coCli") local coInflux = require("httplib.coInflux") -local unistd = require("posix.unistd") -local system = require("common.system") function work(fd, fYaml) - local res = system:parseYaml(fYaml) - local pushTo = res.pushTo local frame = coCli.new(fd) - local c = coInflux.new(pushTo.host, pushTo.port, pushTo.url) + local c = coInflux.new(fYaml) frame:poll(c) print("end push.") return 0 diff --git a/source/tools/monitor/unity/beeQ/rbtree/rbEvent.lua b/source/tools/monitor/unity/beeQ/rbtree/rbEvent.lua index bb904d01a6a556c8d42e460d9ebb178e847d613a..c9b45317037d83002ad251fdc545cf29d1021f8d 100644 --- a/source/tools/monitor/unity/beeQ/rbtree/rbEvent.lua +++ b/source/tools/monitor/unity/beeQ/rbtree/rbEvent.lua @@ -60,8 +60,7 @@ function CrbEvent:addEvent(name, obj, period, delay, loop) end function CrbEvent:_proc(now, node) - --print(now .. ": node " .. node.name .. " work") - local ret + local ret = -1 if node.obj.work then ret = node.obj:work(node.period, self) end diff --git a/source/tools/monitor/unity/beeQ/run.sh b/source/tools/monitor/unity/beeQ/run.sh index 9c23dcdd510933a747a26ef07e111ea320f80d48..23bf470225248e97a88bd1c2036a6b5b92cd68e2 100755 --- a/source/tools/monitor/unity/beeQ/run.sh +++ b/source/tools/monitor/unity/beeQ/run.sh @@ -7,7 +7,7 @@ export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:../beaver/ export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:../../install/ export LUA_PATH="../../lua/?.lua;../../lua/?/init.lua;" -export LUA_CPATH="../../lib/?.so;../../lib/loadall.so;" +export LUA_CPATH="./lib/?.so;../../lib/?.so;../../lib/loadall.so;" yaml_path=$1 [ ! $yaml_path ] && yaml_path="/etc/sysak/plugin.yaml" diff --git a/source/tools/monitor/unity/collector/execEngine/forkRun.lua b/source/tools/monitor/unity/collector/execEngine/forkRun.lua new file mode 100644 index 0000000000000000000000000000000000000000..5ca34e9d87f0bdff7279fe2a596eda475e1b9658 --- /dev/null +++ b/source/tools/monitor/unity/collector/execEngine/forkRun.lua @@ -0,0 +1,73 @@ +--- +--- Generated by EmmyLua(https://github.com/EmmyLua) +--- Created by liaozhaoyan. +--- DateTime: 2023/4/22 01:34 +--- + +--- +--- Generated by EmmyLua(https://github.com/EmmyLua) +--- Created by liaozhaoyan. +--- DateTime: 2023/4/12 00:01 +--- + +require("common.class") +local unistd = require("posix.unistd") +local pwait = require("posix.sys.wait") +local signal = require("posix.signal") +local pystring = require("common.pystring") +local system = require("common.system") +local CvProc = require("collector.vproc") + +local CforkRun = class("execBase", CvProc) + +local function run(cmd, args) + local pid, err = unistd.fork() + if pid > 0 then -- for self + return pid + elseif pid == 0 then -- for child + local errno + prctl_death_kill() + _, err, errno = unistd.exec(cmd, args) + assert(not errno, "exec failed." .. err .. errno) + else + error("fork report" .. err) + end +end + +local function kill(pid) + signal.kill(pid, signal.SIGKILL) -- force to kill task + pwait.wait(pid) -- wait task +end + +function CforkRun:_init_(opts, proto, pffi, mnt) + CvProc._init_(self, proto, pffi, mnt) + self._pid = run(opts.cmd, opts.args) + self._opts = opts + + print("fork run: ", self._pid) +end + +function CforkRun:_del_() + if self._pid then + kill(self._pid) + end + print("kill " .. self._pid) +end + +function CforkRun:proc(elapsed, lines) + local pid, stat, exit = pwait.wait(self._pid, pwait.WNOHANG) + if pid == nil then + error("wait failed " .. stat .. exit) + elseif exit then + print("mon thread exit " .. self._pid) + self._pid = nil + return -1 + end +end + +local function kill(pid) + signal.kill(pid, signal.SIGKILL) -- force to kill task + pwait.wait(pid) -- wait task +end + +return CforkRun diff --git a/source/tools/monitor/unity/collector/guard/calcJiffies.lua b/source/tools/monitor/unity/collector/guard/calcJiffies.lua index 5e12adfa2adec0af349f0cb8234db025fc409da7..922f345bcc7ab5c9d6ee6f6bef882741e0a2d40d 100644 --- a/source/tools/monitor/unity/collector/guard/calcJiffies.lua +++ b/source/tools/monitor/unity/collector/guard/calcJiffies.lua @@ -56,7 +56,7 @@ function mod.calc(mnt, procffi) local comp = delta1 / delta2 if comp >= 1.1 or comp < 0.9 then - errno("calculate jiffies failed.") + error("calculate jiffies failed.") end return (delta1 + delta2) * 2.5 / nproc() diff --git a/source/tools/monitor/unity/collector/guard/guardSched.lua b/source/tools/monitor/unity/collector/guard/guardSched.lua index b8a73fd6cf6ec79816072a33268ec750f85d99e9..cccbc15eeb781db1a1f2aa93c67faaa3526fb733 100644 --- a/source/tools/monitor/unity/collector/guard/guardSched.lua +++ b/source/tools/monitor/unity/collector/guard/guardSched.lua @@ -24,9 +24,16 @@ function CguardSched:proc(t, lines) local start = lua_local_clock() -- unit us local stop = 0 local j1 = self._stat:jiffies() + local ret for i, obj in ipairs(self._procs) do - obj:proc(t, lines) + ret = obj:proc(t, lines) + + if ret == -1 then + table.insert(toRemove, i) + goto continue + end + stop = lua_local_clock() if stop - start >= self._limit then -- local j2 = self._stat:jiffies() @@ -35,6 +42,8 @@ function CguardSched:proc(t, lines) end end start = stop + + ::continue:: end if #toRemove > 0 then diff --git a/source/tools/monitor/unity/collector/guard/guardSelfStat.lua b/source/tools/monitor/unity/collector/guard/guardSelfStat.lua index 0564b2749c54bcaf7da7b52df93e8724b050bf9a..bbdc5e3ddab8e7f3a96b2693358dd810590c4e5c 100644 --- a/source/tools/monitor/unity/collector/guard/guardSelfStat.lua +++ b/source/tools/monitor/unity/collector/guard/guardSelfStat.lua @@ -34,6 +34,21 @@ function CguardSelfStat:_init_(proto, pffi, mnt, resYaml, jperiod) self._memLimit = resYaml.config.limit.mem * 1024 * 1024 end +local function rssRssAnon() + local anon = 0 + + local f = io.open("/proc/self/status") + for line in f:lines() do + if pystring:startswith(line, "RssAnon:") then + local res = pystring:split(line) + anon = tonumber(res[2]) * 1024 + end + end + f:close() + + return anon +end + function CguardSelfStat:proc(elapsed, lines) CvProc.proc(self) @@ -46,7 +61,9 @@ function CguardSelfStat:proc(elapsed, lines) print("last cpu usage overflow." .. cpus) os.exit(1) end - if rss * 4096 > self._memLimit then + + local anon = rssRssAnon() + if anon > self._memLimit then print("last mem usage overflow." .. rss) os.exit(1) end @@ -62,12 +79,15 @@ function CguardSelfStat:proc(elapsed, lines) { name = "vsize", value = vsize - } - , + }, { name = "rss", value = rss * 4096 - } + }, + { + name = "anon", + value = anon, + }, } self:appendLine(self:_packProto("self_stat", nil, vs)) self:push(lines) diff --git a/source/tools/monitor/unity/collector/io/diskFifo.lua b/source/tools/monitor/unity/collector/io/diskFifo.lua new file mode 100644 index 0000000000000000000000000000000000000000..ac57e6edcb9e70f3d9d8fcdd6b89f558b725f591 --- /dev/null +++ b/source/tools/monitor/unity/collector/io/diskFifo.lua @@ -0,0 +1,75 @@ +--- +--- Generated by EmmyLua(https://github.com/EmmyLua) +--- Created by liaozhaoyan. +--- DateTime: 2023/4/12 15:51 +--- + +require("common.class") +local system = require("common.system") +local Cfifo = require("common.fifo") + +local CdiskFifo = class("diskFifo", Cfifo) + +function CdiskFifo:_init_(maxLen) + Cfifo._init_(self, maxLen) + self._nr = 0 +end + +function CdiskFifo:push(v) + Cfifo.push(self, v) + self._nr = self._nr + 1 +end + +function CdiskFifo:iowait() + local sum = 0 + local value + local cells = {} + local len = self:len() + local c = 0 + + if len < self:capacity() then + return + end + + for _, v in pairs(self.list) do + c = c + 1 + value = v.iowait + cells[c], sum = value, sum + value + end + return {max = math.max(unpack(cells)), + min = math.min(unpack(cells)), + nr = self._nr, + avg = sum / len, + last = value} +end + +function CdiskFifo:values(disk, key) + local c = 0 + local sum = 0 + local value + local cells = {} + local len = self:len() + + if len < self:capacity() then + return + end + + local d + for _, v in pairs(self.list) do + d = v[disk] + if d then + value = d[key] + c = c + 1 + cells[c], sum = value, sum + value + else -- not full + return + end + end + return {max = math.max(unpack(cells)), + min = math.min(unpack(cells)), + nr = self._nr, + avg = sum / len, + last = value} +end + +return CdiskFifo diff --git a/source/tools/monitor/unity/collector/io/exceptCheck.lua b/source/tools/monitor/unity/collector/io/exceptCheck.lua new file mode 100644 index 0000000000000000000000000000000000000000..8947e0bd4bf5eb8ee928aed55f1fcab9bc7eff3b --- /dev/null +++ b/source/tools/monitor/unity/collector/io/exceptCheck.lua @@ -0,0 +1,252 @@ +--- +--- Generated by EmmyLua(https://github.com/EmmyLua) +--- Created by liaozhaoyan. +--- DateTime: 2023/4/12 16:26 +--- + +require("common.class") +local system = require("common.system") +local CdiskFifo = require("collector.io.diskFifo") +local CexecptCheck = class("CexecptCheck") + +-- [[ v format +-- iowait: 1 +-- disk_a: { +-- iops = 1, +-- bps = 2, +-- qusize = 3, +-- await = 4, +-- util = 5, +-- } +-- ]] + +local fifoSize = 60 +local keys = {"iops", "bps", "qusize", "await", "util"} + +local function addItem() + return { + baseThresh = { + nrSample = 0, + curWinMinVal = 1e9, + curWinMaxVal = 0, + moveAvg = 0, + last = 0, + thresh = 0 + }, + compensation = { + thresh = 0, + shouldUpdThreshComp = true, + decRangeThreshAvg = 0, + decRangeCnt = 0, + minStableThresh = 1e9, + maxStableThresh = 0, + stableThreshAvg = 0, + nrStableThreshSample = 0, + }, + dynTresh = 1e9, + usedWin = 0, + } +end + +local function addItems() + local ret = {} + for _, key in ipairs(keys) do + ret[key] = addItem() + end + return ret +end + +function CexecptCheck:_init_(diag) + self._diag = diag + + self._fifo = CdiskFifo.new(fifoSize) + self._waitItem = addItem() + self._diskItem = {} + + self._cpuStatIowait = {sum = 0, iowait = 0} + self._uploadInter = 0 + self._exceptionStat = {system = {['IOwait-High'] = {cur = 0, max = 0}}} + self._dataStat = {system = {iowait= 0}} + + self._diagSwitch = { + diagIowait = { sw = self._diag.cfg.diagIowait, + esi = 'IOwait-High'}, + diagIoburst = { sw = self._diag.cfg.diagIoburst, + esi ='IO-Delay'}, + diagIolat = {sw = self._diag.cfg.diagIolat, + esi = 'IO-Burst'}, + diagIohang = {sw = self._diag.cfg.diagIohang, + esi = 'IO-Hang'} + } +end + +local function calcBase(item, vs) + local bt = item.baseThresh + + local min, max, avg, nr = vs.min, vs.max, vs.avg, vs.nr + + bt.nrSample = nr + bt.curWinMinVal = math.min(min, bt.curWinMinVal) + bt.curWinMaxVal = math.max(max, bt.curWinMaxVal) + bt.last = vs.last + + local nrThreshSample = nr + 1 - fifoSize + local thresh = math.max(max - avg, avg - min) + local threshAvg = (bt.thresh * (nrThreshSample - 1) + thresh) / nrThreshSample + bt.thresh = threshAvg + bt.moveAvg = avg + + local usedWin = item.usedWin + usedWin = usedWin + 1 + if usedWin >= fifoSize then + bt.curWinMinVal = 1e9 + bt.curWinMaxVal = 0 + item.usedWin = 0 + else + item.usedWin = usedWin + end + return thresh +end + +local function calcStableThresh(ct, curBaseThresh, curThresh) + local avg = ct.decRangeThreshAvg + + if (curThresh - avg) < ((curBaseThresh - avg) / 10.0) then + local nrStableThreshSample = ct.nrStableThreshSample + + local tSum = ct.stableThreshAvg * ct.nrStableThreshSample + curThresh + nrStableThreshSample = nrStableThreshSample + 1 + + ct.nrStableThreshSample = nrStableThreshSample + ct.stableThreshAvg = tSum / nrStableThreshSample + ct.minStableThresh = math.min(ct.minStableThresh, curThresh) + ct.maxStableThresh = math.max(ct.maxStableThresh, curThresh) + + if nrStableThreshSample > fifoSize * 1.5 then + ct.thresh = math.max(ct.stableThreshAvg - ct.minStableThresh, + ct.maxStableThresh - ct.stableThreshAvg) + ct.shouldUpdThreshComp = false + ct.minStableThresh = 1e9 + ct.maxStableThresh = 0 + ct.stableThreshAvg, ct.decRangeThreshAvg = 0, 0 + ct.nrStableThreshSample, ct.decRangeCnt = 0, 0 + end + end +end + +local function calcCompThresh(item, lastBaseThresh, curThresh) + local curBaseThresh = item.baseThresh.thresh + local ct = item.compensation + + if ct.shouldUpdThreshComp and (ct.thresh < curThresh or item.usedWin == 0) then + ct.thresh = curThresh + end + + if curBaseThresh < lastBaseThresh then + local decRangeCnt = ct.decRangeCnt + local decRangeThreshAvg = ct.decRangeThreshAvg + local tSum = decRangeThreshAvg * decRangeCnt + curThresh + + decRangeCnt = decRangeCnt + 1 + ct.decRangeThreshAvg = tSum / decRangeCnt + ct.decRangeCnt = decRangeCnt + + if decRangeCnt >= fifoSize * 1.5 then + calcStableThresh(ct, curBaseThresh, curThresh) + else + ct.minStableThresh = 1e9 + ct.maxStableThresh = 0 + ct.stableThreshAvg, ct.decRangeThreshAvg = 0, 0 + ct.nrStableThreshSample, ct.decRangeCnt = 0, 0 + end + end +end + +local function updateDynThresh(item, vs) + local bt = item.baseThresh + local ct = item.compensation + local lastBaseThresh = bt.thresh + local curThresh = calcBase(item, vs) + + calcCompThresh(item, lastBaseThresh, curThresh) + item.dynTresh = bt.thresh + bt.moveAvg + ct.thresh + --print("thresh: ", item.dynTresh) +end + +function CexecptCheck:calcs() + local iowaits = self._fifo:iowait() + if iowaits then + updateDynThresh(self._waitItem, iowaits) + self:checkIOwaitException(self._waitItem) + end + + local vs + for disk, item in pairs(self._diskItem) do + for _, key in ipairs(keys) do + vs = self._fifo:values(disk, key) + if vs then + updateDynThresh(item[key], vs) + end + end + end +end + +function CexecptCheck:addValue(v) + for disk, _ in pairs(v) do -- new iowait names one disk + if disk ~= "iowait" then + if not system:keyIsIn(self._diskItem, disk) then + self._diskItem[disk] = addItems() + end + end + end + for disk, _ in pairs(self._diskItem) do -- del + if not system:keyIsIn(v, disk) then + self._diskItem[disk] = nil + end + end + self._fifo:push(v) + self:calcs() +end + +local function disableThreshComp(item) + local ct = item.compensation + local bt = item.baseThresh + if ct.shouldUpdThreshComp then + ct.shouldUpdThreshComp = false + item.dynTresh = bt.thresh + bt.moveAvg + ct.thresh = 0.000001 + end +end + +function CexecptCheck:checkIOwaitException(item) + local iowait = item.baseThresh.last + local dataStat = self._dataStat.system + local es = self._exceptionStat.system['IOwait-High'] + local uploadInter = self._uploadInter + + if iowait >= self._diag.cfg.iowait then + disableThreshComp(item) + end + + dataStat.iowait = (dataStat.iowait * (uploadInter - 1) + iowait) / uploadInter + + local minThresh = self._diag.cfg.iowait + local iowaitThresh = math.max(item.dynTresh, minThresh) + if minThresh >= iowaitThresh then + es.cur = es.cur + 1 + local diagSW = self._diagSwitch.diagIowait + local rDiagValid = nil + end +end + +function CexecptCheck:checks() + local uploadInter = self._uploadInter + + if uploadInter % fifoSize == 0 then + self._uploadInter = 1 + else + self._uploadInter = uploadInter + 1 + end +end + +return CexecptCheck diff --git a/source/tools/monitor/unity/collector/io/io_diagnose.lua b/source/tools/monitor/unity/collector/io/io_diagnose.lua index 7c3fc5526921dd68c37dea588ec7f50a1bc35b74..5725acd96944f7e49ed17d1ee73eddbfc70435bc 100644 --- a/source/tools/monitor/unity/collector/io/io_diagnose.lua +++ b/source/tools/monitor/unity/collector/io/io_diagnose.lua @@ -11,13 +11,14 @@ local CioDiagnose = class("ioDiagnose", CvProto) local system = require("common.system") local procffi = require("collector.native.procffi") local pystring = require("common.pystring") +local CexceptCheck = require("collector.io.exceptCheck") local unistd = require("posix.unistd") -local function ioWait(fStat) - local data = procffi["ffi"].new("var_long_t") +local function readIoWait(fStat) + local data = procffi["ffi"].new("var_kvs_t") local stat = io.open(fStat, "r") local s = stat:read() - assert(procffi["cffi"].var_input_long(procffi["ffi"].string(s), data) == 0) + assert(procffi["cffi"].var_input_kvs(procffi["ffi"].string(s), data) == 0) stat:close() local sum = 0 for i = 0, data.no do @@ -36,7 +37,7 @@ local function distStat(fDisks) local disks = {} for line in io.lines(fDisks) do local s = pystring:split(line, nil, 3)[4] - local data = procffi["ffi"].new("var_kvs_t") + local data= procffi["ffi"].new("var_kvs_t") assert(procffi["cffi"].var_input_kvs(procffi["ffi"].string(s), data) == 0) local disk = procffi["ffi"].string(data.s) disks[disk] = data @@ -46,25 +47,32 @@ end local function pickValue(value) return { - rws = tonumber(value[1] + value[5]), - rwSecs = tonumber(value[3] + value[7]), - qusize = tonumber(value[11]), - rwTiks = tonumber(value[4] + value[8]), + iops = tonumber(value[0] + value[4]), -- 1 5 + bps = tonumber(value[2] + value[6]) * 512, -- 3 7 + qusize = tonumber(value[10]) / 1000.0, -- 11 + await = tonumber(value[3] + value[7]), -- 4 8 + util = tonumber(value[9]) / 10.0 -- 10 } end function CioDiagnose:_init_(que, proto_q, fYaml, tid) CvProto._init_(self, CprotoData.new(que)) self._tid = tid + local res = system:parseYaml(fYaml) - self.fStat = res.config.proc_path .. "proc/stat" - self.fDiskStat = res.config.proc_path .. "proc/diskstats" - self.dirSys = res.config.proc_path .. "sys/block/" - self._lastCpuTotal, self._lastCpuIO = ioWait(self.fStat) + local proc_path = res.config.proc_path + self.fStat = proc_path .. "proc/stat" + self.fDiskStat = proc_path .. "proc/diskstats" + self.dirSys = proc_path .. "sys/block/" + + self._lastCpuTotal, self._lastCpuIO = readIoWait(self.fStat) self._disks = {} self._disksLast = {} self:readProc() self:storeProc() + + local diag = res.ioDiagnose + self._check = CexceptCheck.new(diag) end function CioDiagnose:readProc() @@ -85,8 +93,8 @@ function CioDiagnose:storeProc() self._disks = {} end -function CioDiagnose:diff(t) - local res = {} +function CioDiagnose:diff(t, iowait) + local res = {iowait = iowait} for disk, value in pairs(self._disks) do local vLast = self._disksLast[disk] if vLast then @@ -94,6 +102,9 @@ function CioDiagnose:diff(t) for k, v in pairs(value) do vs[k] = (v - vLast[k]) / t end + if vs.iops > 0 then + vs.await = vs.await / vs.iops + end res[disk] = vs end end @@ -101,15 +112,14 @@ function CioDiagnose:diff(t) end function CioDiagnose:work(t) - local cpuTotal, cpuIO = ioWait(self.fStat) - --print( cpuTotal - self._lastCpuTotal, cpuIO - self._lastCpuIO) + local cpuTotal, cpuIO = readIoWait(self.fStat) + local iowait = (cpuIO - self._lastCpuIO) * 100 / (cpuTotal - self._lastCpuTotal) / t self._lastCpuTotal, self._lastCpuIO = cpuTotal, cpuIO self:readProc() - local res = self:diff(t) - --system:dumps(res) - --system:dumps(self._disks) - --system:dumps(self._disksLast) + self._check:checks() + local res = self:diff(t, iowait) + self._check:addValue(res) self:storeProc() end diff --git a/source/tools/monitor/unity/collector/loop.lua b/source/tools/monitor/unity/collector/loop.lua index 7ea663116fe66c98fe6c7de5070d4b7fd5da389d..933c87b8e2e3f1d0416d5af3242c3fcc957141ba 100644 --- a/source/tools/monitor/unity/collector/loop.lua +++ b/source/tools/monitor/unity/collector/loop.lua @@ -14,6 +14,7 @@ local CguardSched = require("collector.guard.guardSched") local CguardDaemon = require("collector.guard.guardDaemon") local CguardSelfStat = require("collector.guard.guardSelfStat") local CpostPlugin = require("collector.postPlugin.postPlugin") +local CforkRun = require("collector.execEngine.forkRun") local Cloop = class("loop") @@ -24,6 +25,7 @@ function Cloop:_init_(que, proto_q, fYaml, tid) self._proto = CprotoData.new(que) self._tid = tid self:loadLuaPlugin(res, res.config.proc_path) + self:forkRun(res) local jperiod = calcJiffies.calc(res.config.proc_path, procffi) -- self._guardSched = CguardSched.new(tid, self._procs, self._names, jperiod) @@ -49,6 +51,16 @@ function Cloop:loadLuaPlugin(res, proc_path) print("add " .. system:keyCount(self._procs) .. " lua plugin.") end +function Cloop:forkRun(res) + local runs = res.forkRun + local c = system:keyCount(self._procs) + for _, run in ipairs(runs) do + c = c + 1 + self._procs[c] = CforkRun.new(run, self._proto, procffi) + self._names[c] = run.cmd + end +end + function Cloop:work(t) local lines = self._proto:protoTable() diff --git a/source/tools/monitor/unity/collector/plugin.lua b/source/tools/monitor/unity/collector/plugin.lua index 38cdb271f5f5bc94c660704cb0b842fd4b8d553b..a7590294b6dabcc8fbf9116616fe4d26f67559f4 100644 --- a/source/tools/monitor/unity/collector/plugin.lua +++ b/source/tools/monitor/unity/collector/plugin.lua @@ -7,6 +7,8 @@ require("common.class") local Cplugin = class("plugin") local dockerinfo = require("common.dockerinfo") +local cjson = require("cjson.safe") +local json = cjson.new() function Cplugin:_init_(resYaml, ffi, proto_q, so) self._ffi = ffi diff --git a/source/tools/monitor/unity/collector/plugin.yaml b/source/tools/monitor/unity/collector/plugin.yaml index 26be92f9a70ef6a7eca0a47f3de752ace2b16882..45fda75b9aa69c6fb8925b614f18532bf8367d08 100644 --- a/source/tools/monitor/unity/collector/plugin.yaml +++ b/source/tools/monitor/unity/collector/plugin.yaml @@ -21,14 +21,19 @@ config: mem: 40 # unit mb tasks: 10 # monitor 10 pid max. +forkRun: + - + cmd: "/usr/bin/python" + args: ["../test/curl/forkRun.py"] + outline: - - /tmp/sysom + - /var/sysom/outline -#pushTo: -# to: "Influx" -# host: "ld-wz9d17b514mg6kjkx-proxy-tsdb.lindorm.rds.aliyuncs.com" -# port: 8242 -# url: "/api/v2/write?db=lua" +pushTo: + to: "Influx" + host: "ld-wz9d17b514mg6kjkx-proxy-tsdb.lindorm.rds.aliyuncs.com" + port: 8242 + url: "/api/v2/write?db=lua" container: mode: "pods" diff --git a/source/tools/monitor/unity/collector/plugin/Makefile b/source/tools/monitor/unity/collector/plugin/Makefile index 7ec4fa48628198b70c585bbe835576490111d0a6..1523bc193c4af5ca438c5e5fef673ae90bbd3883 100644 --- a/source/tools/monitor/unity/collector/plugin/Makefile +++ b/source/tools/monitor/unity/collector/plugin/Makefile @@ -5,7 +5,7 @@ OBJS := proto_sender.o LIB := libproto_sender.a -DEPMOD=sample threads kmsg proc_schedstat proc_loadavg unity_nosched unity_irqoff cpudist cpu_bled net_health net_retrans netlink cpufreq gpuinfo pmu_events +DEPMOD=sample threads kmsg proc_schedstat proc_loadavg unity_nosched unity_irqoff cpudist cpu_bled net_health net_retrans netlink cpufreq gpuinfo pmu_events virtout sum_retrans all: $(LIB) $(DEPMOD) diff --git a/source/tools/monitor/unity/collector/plugin/kmsg/kmsg.c b/source/tools/monitor/unity/collector/plugin/kmsg/kmsg.c index e8b028e20bab5fc4e2e62ea8ded698ba115586ac..591159f9db65bad32b41761ca5ecac121e8adfb1 100644 --- a/source/tools/monitor/unity/collector/plugin/kmsg/kmsg.c +++ b/source/tools/monitor/unity/collector/plugin/kmsg/kmsg.c @@ -41,12 +41,12 @@ static int kmsg_thread_func(struct beeQ* q, void * arg) { fd = open("/dev/kmsg", O_RDONLY | O_NONBLOCK); if (fd < 0) { goto endOpen; - } + }/* ret = lseek(fd, 0, SEEK_DATA); if (ret < 0) { perror("kmsg seek error"); goto endLseek; - } + }*/ ret = 1; // strip old message. while (ret > 0) { @@ -79,6 +79,7 @@ static int kmsg_thread_func(struct beeQ* q, void * arg) { perror("kmsg read2 failed."); goto endRead; } + buff[ret -1] = '\0'; unity_alloc_lines(lines, 1); line = unity_get_line(lines, 0); diff --git a/source/tools/monitor/unity/collector/plugin/sum_retrans/Makefile b/source/tools/monitor/unity/collector/plugin/sum_retrans/Makefile new file mode 100644 index 0000000000000000000000000000000000000000..d0c3391a855a9e62347a513e73d253a4c8cced4f --- /dev/null +++ b/source/tools/monitor/unity/collector/plugin/sum_retrans/Makefile @@ -0,0 +1,8 @@ + +newdirs := $(shell find ./ -type d) + +bpfsrcs := sum_retrans.bpf.c +csrcs := sum_retrans.c +so := libsum_retrans.so + +include ../bpfso.mk \ No newline at end of file diff --git a/source/tools/monitor/unity/collector/plugin/sum_retrans/sum_retrans.bpf.c b/source/tools/monitor/unity/collector/plugin/sum_retrans/sum_retrans.bpf.c new file mode 100644 index 0000000000000000000000000000000000000000..16ac99107678b8a6ad0b831e110093c6f25c2cbf --- /dev/null +++ b/source/tools/monitor/unity/collector/plugin/sum_retrans/sum_retrans.bpf.c @@ -0,0 +1,47 @@ +// +// Created by 廖肇燕 on 2023/4/11. +// +#define BPF_NO_GLOBAL_DATA +#include +#include + +BPF_HASH(dips, u32, u64, 1024); +BPF_HASH(inums, u32, u64, 1024); + +static inline u32 read_ns_inum(struct sock *sk) +{ + if (sk) { + return BPF_CORE_READ(sk, __sk_common.skc_net.net, ns.inum); + } + return 0; +} + +static void inc_value(struct bpf_map* maps, u32 k) { + u64 *pv = bpf_map_lookup_elem(maps, &k); + if (pv) { + __sync_fetch_and_add(pv, 1); + } else { + u64 v = 1; + bpf_map_update_elem(maps, &k, &v, BPF_ANY); + } +} + +struct tcp_retrans_args { + u64 pad; + u64 skb; + u64 sk; + u16 sport; + u16 dport; + u32 sip; + u32 dip; +}; +SEC("tracepoint/tcp/tcp_retransmit_skb") +int tcp_retransmit_skb_hook(struct tcp_retrans_args *args){ + struct sock *sk = (struct sock *)args->sk; + u32 inum = read_ns_inum(sk); + u32 ip = args->dip; + + inc_value((struct bpf_map *)&inums, inum); + inc_value((struct bpf_map *)&dips, ip); + return 0; +} diff --git a/source/tools/monitor/unity/collector/plugin/sum_retrans/sum_retrans.c b/source/tools/monitor/unity/collector/plugin/sum_retrans/sum_retrans.c new file mode 100644 index 0000000000000000000000000000000000000000..ea11ba9900cfb9b7cd97d548b0eed8334fe558fc --- /dev/null +++ b/source/tools/monitor/unity/collector/plugin/sum_retrans/sum_retrans.c @@ -0,0 +1,110 @@ +// +// Created by 廖肇燕 on 2023/4/11. +// + +#include "sum_retrans.h" +#include "../bpf_head.h" +#include "sum_retrans.skel.h" + +#include +#include + +#include +#include +#include + + +DEFINE_SEKL_OBJECT(sum_retrans); +static int inum_fd = 0; +static int dip_fd = 0; +int init(void *arg) +{ + int ret; + printf("sum_retrans plugin install.\n"); + ret = LOAD_SKEL_OBJECT(sum_retrans, perf); + inum_fd = coobpf_map_find(sum_retrans->obj, "inums"); + dip_fd = coobpf_map_find(sum_retrans->obj, "dips"); + return ret; +} + +#define LOG_MAX 4096 +static char log[LOG_MAX]; + +static int transIP(unsigned long lip, char *result, int size) { + inet_ntop(AF_INET, (void *) &lip, result, size); + return 0; +} + +static void pack_dip() { + char ips[32]; + char buff[64]; + unsigned long value; + unsigned int ip, ip_next; + + log[0] = '\0'; + ip = 0; + while (coobpf_key_next(dip_fd, &ip, &ip_next) == 0) { + bpf_map_lookup_elem(dip_fd, &ip_next, &value); + ip = ip_next; + + transIP(ip, ips, 32); + snprintf(buff, 64, "%s:%ld,", ips, value); + strncat(log, buff, 4096); + ip = ip_next; + } + + ip = 0; + while (coobpf_key_next(dip_fd, &ip, &ip_next) == 0) { + bpf_map_delete_elem(dip_fd, &ip_next); + ip = ip_next; + } +} + +static void pack_inum() { + char buff[64]; + unsigned long value; + unsigned int inum, inum_next; + + log[0] = '\0'; + + inum = 0; + while (coobpf_key_next(inum_fd, &inum, &inum_next) == 0) { + bpf_map_lookup_elem(inum_fd, &inum_next, &value); + snprintf(buff, 64, "%u:%ld,", inum_next, value); + strncat(log, buff, 4096); + inum = inum_next; + } + + inum = 0; + while (coobpf_key_next(inum_fd, &inum, &inum_next) == 0) { + bpf_map_delete_elem(inum_fd, &inum_next); + inum = inum_next; + } +} + +int call(int t, struct unity_lines *lines) +{ + struct unity_line* line; + + pack_dip(); + if (strlen(log) > 0) { + unity_alloc_lines(lines, 2); // 预分配好 + + line = unity_get_line(lines, 0); + unity_set_table(line, "retrans_dip"); + unity_set_log(line, "ip_log", log); + + pack_inum(); + line = unity_get_line(lines, 1); + unity_set_table(line, "retrans_inum"); + unity_set_log(line, "inum_log", log); + } + + return 0; +} + +void deinit(void) +{ + printf("sum_retrans plugin uninstall.\n"); + DESTORY_SKEL_BOJECT(sum_retrans); +} diff --git a/source/tools/monitor/unity/collector/plugin/sum_retrans/sum_retrans.h b/source/tools/monitor/unity/collector/plugin/sum_retrans/sum_retrans.h new file mode 100644 index 0000000000000000000000000000000000000000..4faecd9a3ec06c061b25523644c3dba9963a836c --- /dev/null +++ b/source/tools/monitor/unity/collector/plugin/sum_retrans/sum_retrans.h @@ -0,0 +1,8 @@ +// +// Created by 廖肇燕 on 2023/4/11. +// + +#ifndef UNITY_SUM_RETRANS_H +#define UNITY_SUM_RETRANS_H + +#endif //UNITY_SUM_RETRANS_H diff --git a/source/tools/monitor/unity/collector/plugin/virtout/Makefile b/source/tools/monitor/unity/collector/plugin/virtout/Makefile new file mode 100644 index 0000000000000000000000000000000000000000..f0e8a47f3f89b8f918ce8aaf7422838616af5cc6 --- /dev/null +++ b/source/tools/monitor/unity/collector/plugin/virtout/Makefile @@ -0,0 +1,8 @@ + +newdirs := $(shell find ./ -type d) + +bpfsrcs := virtout.bpf.c +csrcs := virtout.c +so := libvirtout.so + +include ../bpfso.mk \ No newline at end of file diff --git a/source/tools/monitor/unity/collector/plugin/virtout/virtout.bpf.c b/source/tools/monitor/unity/collector/plugin/virtout/virtout.bpf.c new file mode 100644 index 0000000000000000000000000000000000000000..25d3e612edc1773a9d26307bad4f1e5fcee3ee84 --- /dev/null +++ b/source/tools/monitor/unity/collector/plugin/virtout/virtout.bpf.c @@ -0,0 +1,69 @@ +// +// Created by 廖肇燕 on 2023/2/23. +// +#define BPF_NO_GLOBAL_DATA + +#include +#include +#include "virtout.h" + +#define MAX_ENTRY 128 +#define BPF_F_FAST_STACK_CMP (1ULL << 9) +#define KERN_STACKID_FLAGS (0 | BPF_F_FAST_STACK_CMP) + +#define PERIOD_TIME (10 * 1000 * 1000ULL) +#define THRESHOLD_TIME (200 * 1000 * 1000ULL) + +BPF_ARRAY(virtdist, u64, 20); +BPF_PERF_OUTPUT(perf, 1024); +BPF_STACK_TRACE(stack, MAX_ENTRY); +BPF_PERCPU_ARRAY(perRec, u64, 2); + +static inline u64 get_last(int index) { + u64 *pv = bpf_map_lookup_elem(&perRec, &index); + if (pv) { + return *pv; + } + return 0; +} + +static inline void save_last(int index, u64 t) { + bpf_map_update_elem(&perRec, &index, &t, BPF_ANY); +} + +static inline void check_time(struct bpf_perf_event_data *ctx, + int index, + struct bpf_map* event) { + u64 t = ns(); + u64 last = get_last(index); + s64 delta; + + save_last(index, t); + delta = t - last; + + if (last && delta > 2 * PERIOD_TIME) { + delta -= PERIOD_TIME; + bpf_printk("delta %llu.\n", delta); + hist10_push((struct bpf_map_def *)&virtdist, delta / PERIOD_TIME); + if (delta >= THRESHOLD_TIME) { + struct task_struct* task = (struct task_struct *)bpf_get_current_task(); + struct data_t data = {}; + + data.pid = pid(); + data.stack_id = bpf_get_stackid(ctx, &stack, KERN_STACKID_FLAGS); + data.delta = delta; + data.cpu = cpu(); + bpf_get_current_comm(&data.comm, TASK_COMM_LEN); + bpf_printk("delta %llu, pid:%d.\n", delta, data.pid); + + bpf_perf_event_output(ctx, event, BPF_F_CURRENT_CPU, &data, sizeof(data)); + } + } +} + +SEC("perf_event") +int sw_clock(struct bpf_perf_event_data *ctx) +{ + check_time(ctx, 1, (struct bpf_map *)&perf); + return 0; +} diff --git a/source/tools/monitor/unity/collector/plugin/virtout/virtout.c b/source/tools/monitor/unity/collector/plugin/virtout/virtout.c new file mode 100644 index 0000000000000000000000000000000000000000..e3b3270223100de5758dcdd4ab1a800a0e9fd2d5 --- /dev/null +++ b/source/tools/monitor/unity/collector/plugin/virtout/virtout.c @@ -0,0 +1,229 @@ +// +// Created by 廖肇燕 on 2023/2/23. +// + +#include +#define COOLBPF_PERF_THREAD +#include "../bpf_head.h" +#include "virtout.h" +#include "virtout.skel.h" + +#include +#include +#include +#include +#include + + +#define CPU_DIST_INDEX 4 +#define DIST_ARRAY_SIZE 20 + +static volatile int budget = 0; // for log budget +static int dist_fd = 0; +static int stack_fd = 0; +static int perf_fds[1024]; + +int proc(int stack_fd, struct data_t *e, struct unity_line *line); +void handle_event(void *ctx, int cpu, void *data, __u32 data_sz) +{ + int ret; + if (budget > 0) { + struct data_t *e = (struct data_t *)data; + struct beeQ *q = (struct beeQ *)ctx; + struct unity_line *line; + struct unity_lines *lines = unity_new_lines(); + + unity_alloc_lines(lines, 1); + line = unity_get_line(lines, 0); + ret = proc(stack_fd, e, line); + if (ret >= 0) { + beeQ_send(q, lines); + } + budget --; + } +} + +static void close_perf_fds(void) { + int i; + for (i = 0; i < 1024; i ++) { + if (perf_fds[i] > 0) { + close(perf_fds[i]); + perf_fds[i] = 0; + } + } +} + +static long perf_event_open(struct perf_event_attr *hw_event, pid_t pid, + int cpu, int group_fd, int flags) +{ + int ret; + + ret = syscall(__NR_perf_event_open, hw_event, pid, cpu, + group_fd, flags); + return ret; +} + +DEFINE_SEKL_OBJECT(virtout); +static struct bpf_prog_skeleton *search_progs(const char* func) { + struct bpf_object_skeleton *s; + int i; + + s = virtout->skeleton; + for (i = 0; i < s->prog_cnt; i ++) { + if (strcmp(s->progs[i].name, func) == 0) { + return &(s->progs[i]); + } + } + return NULL; +} + +static int setup_perf_events(const char* func) { + int nr_cpus = libbpf_num_possible_cpus(); + int i; + struct bpf_link *link; + struct bpf_prog_skeleton *progs; + + progs = search_progs(func); + + struct perf_event_attr perf_attr = { + .type = PERF_TYPE_SOFTWARE, + .config = PERF_COUNT_SW_CPU_CLOCK, + .freq = 0, + .sample_period = 10 * 1000 * 1000, + }; + + for (i = 0; i < nr_cpus; i ++) { + perf_fds[i] = perf_event_open(&perf_attr, -1, i, -1, 0); + if (perf_fds[i] < 0) { + if (errno == ENODEV) { + printf("skip offline cpu id: %d\n", i); + continue; + } else { + perror("syscall failed."); + close_perf_fds(); + return -1; + } + } + + link = bpf_program__attach_perf_event(*(progs->prog), perf_fds[i]); + if (!link) { + perror("attach failed."); + close_perf_fds(); + return -1; + } + *(progs->link) = link; + } + printf("setup virout OK.\n"); + return 0; +} + +int init(void *arg) { + int ret; + printf("virtout plugin install.\n"); + + ret = LOAD_SKEL_OBJECT(virtout, perf); + if (ret < 0) { + return ret; + } + ret = setup_perf_events("sw_clock"); + if (ret < 0) { + return ret; + } + dist_fd = coobpf_map_find(virtout->obj, "virtdist"); + stack_fd = coobpf_map_find(virtout->obj, "stack"); + return ret; +} + +static int get_dist(unsigned long *locals) { + int i = 0; + unsigned long value = 0; + int key, key_next; + + key = 0; + while (coobpf_key_next(dist_fd, &key, &key_next) == 0) { + coobpf_key_value(dist_fd, &key_next, &value); + locals[i ++] = value; + if (i > DIST_ARRAY_SIZE) { + break; + } + key = key_next; + } + return i; +} + +static int cal_dist(unsigned long* values) { + int i, j; + int size; + static unsigned long rec[DIST_ARRAY_SIZE] = {0}; + unsigned long locals[DIST_ARRAY_SIZE]; + + size = get_dist(locals); + for (i = 0; i < CPU_DIST_INDEX - 1; i ++) { + values[i] = locals[i] - rec[i]; + rec[i] = locals[i]; + } + j = i; + values[j] = 0; + for (; i < size; i ++) { + values[j] += locals[i] - rec[i]; + rec[i] = locals[i]; + } + return 0; +} + +int call(int t, struct unity_lines *lines) { + int i; + unsigned long values[CPU_DIST_INDEX]; + const char *names[] = { "ms100", "s1", "s10", "so"}; + struct unity_line* line; + + budget = t; + + unity_alloc_lines(lines, 1); // 预分配好 + line = unity_get_line(lines, 0); + unity_set_table(line, "virtout_dist"); + + cal_dist(values); + for (i = 0; i < CPU_DIST_INDEX; i ++ ) { + unity_set_value(line, i, names[i], values[i]); + } + return 0; +} + + +void deinit(void) +{ + printf("virout plugin uninstall.\n"); + close_perf_fds(); + DESTORY_SKEL_BOJECT(virtout); +} + +#define LOG_MAX 4096 +static char log[LOG_MAX]; + +int proc(int stack_fd, struct data_t *e, struct unity_line *line) { + int i; + unsigned long addr[128]; + int id = e->stack_id; //last stack + struct ksym_cell* cell; + + snprintf(log, LOG_MAX, "task:%d(%s);cpu:%d;delayed:%ld;callstack:", e->pid, e->comm, e->cpu, e->delta); + coobpf_key_value(stack_fd, &id, &addr); + + for (i = 0; i < 128; i ++) { + if (addr[i] > 0) { + cell = ksym_search(addr[i]); + if (cell != NULL) { + strncat(log, cell->func, LOG_MAX); + } else { + strncat(log, "!nil", LOG_MAX); + } + strncat(log, ",", LOG_MAX); + } else { + break; + } + } + unity_set_table(line, "virtout_log"); + unity_set_log(line, "log", log); + return 0; +} diff --git a/source/tools/monitor/unity/collector/plugin/virtout/virtout.h b/source/tools/monitor/unity/collector/plugin/virtout/virtout.h new file mode 100644 index 0000000000000000000000000000000000000000..a0acb6c82bb20347c6340ca0ead678968ec6b0ba --- /dev/null +++ b/source/tools/monitor/unity/collector/plugin/virtout/virtout.h @@ -0,0 +1,18 @@ +// +// Created by 廖肇燕 on 2023/4/10. +// + +#ifndef UNITY_VIRTOUT_H +#define UNITY_VIRTOUT_H + +#define TASK_COMM_LEN 16 + +struct data_t { + int pid; + int cpu; + unsigned int stack_id; + unsigned long delta; + char comm[TASK_COMM_LEN]; +}; + +#endif //UNITY_VIRTOUT_H diff --git a/source/tools/monitor/unity/collector/postEngine/engine.lua b/source/tools/monitor/unity/collector/postEngine/engine.lua index 644ad2af428b963a8d7fd129a9cc1c74874212ac..c3de1f829d4f61722998b98984129804924bd2c0 100644 --- a/source/tools/monitor/unity/collector/postEngine/engine.lua +++ b/source/tools/monitor/unity/collector/postEngine/engine.lua @@ -11,6 +11,7 @@ local CvProto = require("collector.vproto") local pystring = require("common.pystring") local system = require("common.system") local cjson = require("cjson.safe") +local CexecBase = require("collector.postEngine.execBase") local Cengine = class("engine", CvProto) @@ -25,13 +26,20 @@ function Cengine:setTask(taskMons) self._task = taskMons end -function Cengine:pushTask(msgs) +function Cengine:pushTask(e, msgs) local events = pystring:split(msgs, '\n') for _, msg in ipairs(events) do print(msg) local res = cjson.decode(msg) - if res.cmd == "mon_pid" then + local cmd = res.cmd + if cmd == "mon_pid" then self._task:add(res.pid, res.loop) + elseif cmd == "exec" then -- exec a cmd + local execCmd = res.exec + local args = res.args or {} + local second = res.second or 1 + local exec = CexecBase.new(execCmd, args, second) + exec:addEvents(e) end end end @@ -45,7 +53,7 @@ function Cengine:proc(t, event, msgs) local bytes = self._proto:encode(lines) self._proto:que(bytes) - self:pushTask(msgs) + self:pushTask(event, msgs) end function Cengine:work(t, event) diff --git a/source/tools/monitor/unity/collector/postEngine/execBase.lua b/source/tools/monitor/unity/collector/postEngine/execBase.lua new file mode 100644 index 0000000000000000000000000000000000000000..59c3ac40a98ecf989f5ef91a64ac5982d34ed3b2 --- /dev/null +++ b/source/tools/monitor/unity/collector/postEngine/execBase.lua @@ -0,0 +1,65 @@ +--- +--- Generated by EmmyLua(https://github.com/EmmyLua) +--- Created by liaozhaoyan. +--- DateTime: 2023/4/12 00:01 +--- + +require("common.class") +local unistd = require("posix.unistd") +local pwait = require("posix.sys.wait") +local signal = require("posix.signal") +local pystring = require("common.pystring") +local system = require("common.system") + +local CexecBase = class("execBase") +local interval = 5 --- poll for every 5 second + +local function run(cmd, args) + local pid, err = unistd.fork() + if pid > 0 then -- for self + print("pid: " .. pid) + return pid + elseif pid == 0 then -- for child + local errno + prctl_death_kill() + _, err, errno = unistd.exec(cmd, args) + assert(not errno, "exec failed." .. err .. errno) + else + error("fork report" .. err) + end +end + +function CexecBase:_init_(cmd, args, seconds) + self.cmd = cmd + self._cnt = 0 + self._loop = seconds / interval + + self._pid = run(cmd, args) +end + +function CexecBase:addEvents(e) + e:addEvent(self.cmd, self, interval, true, self._loop) +end + +local function kill(pid) + signal.kill(pid, signal.SIGKILL) -- force to kill task + pwait.wait(pid) -- wait task +end + +function CexecBase:work() + local cnt = self._cnt + if cnt >= self._loop then + local pid, stat, exit = pwait.wait(self._pid, pwait.WNOHANG) + if pid == nil then + error("wait failed " .. stat .. exit) + end + if not exit then -- process not exit + print("force to kill " .. self._pid) + kill(self._pid) + end + return -1 -- delete from task list. + end + self._cnt = cnt + 1 +end + +return CexecBase diff --git a/source/tools/monitor/unity/common/fifo.lua b/source/tools/monitor/unity/common/fifo.lua index 352d4d2fab5e6a9e18eadf24ba8693751ab84af1..76dfd3075fd964f592251cbc80eb6d242c584208 100644 --- a/source/tools/monitor/unity/common/fifo.lua +++ b/source/tools/monitor/unity/common/fifo.lua @@ -46,6 +46,10 @@ function Cfifo:len() return self._count end +function Cfifo:capacity() + return self._max +end + function Cfifo:value(index) index = index + self._head - 1 return self.list[index] diff --git a/source/tools/monitor/unity/common/inotifies.lua b/source/tools/monitor/unity/common/inotifies.lua index f9aaf7c94b4c84a84da321c1833e716fed47b344..d8b33c784f23322e43c0b1ef0c579179d9e9fe4c 100644 --- a/source/tools/monitor/unity/common/inotifies.lua +++ b/source/tools/monitor/unity/common/inotifies.lua @@ -9,7 +9,6 @@ require("common.class") local Cinotifies = class("inotifies") local dirent = require("posix.dirent") local pstat = require("posix.sys.stat") -local fcntl = require("posix.fcntl") local system = require("common.system") local inotify = require('inotify') @@ -27,21 +26,6 @@ local function walk_dirs(path, dirs) end end -local function fdNonBlocking(fd) - local res - local flag, err, errno = fcntl.fcntl(fd, fcntl.F_GETFL) - if flag then - res, err, errno = fcntl.fcntl(fd, fcntl.F_SETFL, bit.bor(flag, fcntl.O_NONBLOCK)) - if res then - return 0 - else - system:posixError("fcntl set failed", err, errno) - end - else - system:posixError("fcntl get failed", err, errno) - end -end - local function mon_dirs(path) local handle = inotify.init() local ws = {} @@ -60,7 +44,7 @@ local function mon_dirs(path) end end - fdNonBlocking(handle:getfd()) + system:fdNonBlocking(handle:getfd()) return handle, ws end diff --git a/source/tools/monitor/unity/common/lineParse.lua b/source/tools/monitor/unity/common/lineParse.lua index 925fecddeb66432fafab6ec7d267ca50208b9b46..a70e0d35758dbd0fc55508750e37c5d3695ce511 100644 --- a/source/tools/monitor/unity/common/lineParse.lua +++ b/source/tools/monitor/unity/common/lineParse.lua @@ -10,6 +10,8 @@ local system = require("common.system") local module = {} local json = cjson.new() +json.encode_escape_forward_slash(false) + local function parseLabel(sls, ls) local lss = pystring:split(sls, ",") for _, cell in ipairs(lss) do diff --git a/source/tools/monitor/unity/common/ping.lua b/source/tools/monitor/unity/common/ping.lua index 2d926ebb827391dad7182ba27d6a7a15eb0dd32d..1e9315a10d008ca46550da3eba50e75c86ca7741 100644 --- a/source/tools/monitor/unity/common/ping.lua +++ b/source/tools/monitor/unity/common/ping.lua @@ -7,7 +7,10 @@ require("common.class") local psocket = require("posix.sys.socket") +local ptime = require("posix.sys.time") +local poll = require("posix.poll") local unistd = require("posix.unistd") +local system = require("common.system") local Cping = class("ping") function Cping:_init_(ip, dev) @@ -20,6 +23,8 @@ function Cping:_init_(ip, dev) local ok, err = psocket.setsockopt(fd, psocket.SOL_SOCKET, psocket.SO_BINDTODEVICE, dev) assert(ok, err) + system:fdNonBlocking(fd) + self.fd = fd end @@ -29,16 +34,31 @@ function Cping:_del_() end end +local function diff_us(t1, t0) + local sum1 = t1.tv_sec * 1e6 + t1.tv_usec + local sum0 = t0.tv_sec * 1e6 + t0.tv_usec + return sum1 - sum0 +end + function Cping:ping() + local fd = self.fd local data = string.char(0x08, 0x00, 0x89, 0x98, 0x6e, 0x63, 0x00, 0x04, 0x00) - local ok, err = psocket.sendto(self.fd, data, {family=psocket.AF_INET, addr=self.ip, port=0}) + local t0 = ptime.gettimeofday() + local ok, err = psocket.sendto(fd, data, {family=psocket.AF_INET, addr=self.ip, port=0}) assert(ok, err) - local data, sa = psocket.recvfrom(self.fd, 1024) - assert(data, sa) - - if data then - print('Received ICMP message from ' .. sa.addr) + local r = poll.rpoll(fd, 300) + if r == 0 then + print("receive over time.") + return -1 + elseif r == 1 then + local data, sa = psocket.recvfrom(fd, 1024) + local t1 = ptime.gettimeofday() + assert(data, sa) + return diff_us(t1, t0) + else + print("closed.") + return -2 end end diff --git a/source/tools/monitor/unity/common/system.lua b/source/tools/monitor/unity/common/system.lua index 4c0be4974c7e57f018fa9c419bdcc7a44e77b702..c43a7ffd075cb11c2ad4ae0ee33407460e7aca47 100644 --- a/source/tools/monitor/unity/common/system.lua +++ b/source/tools/monitor/unity/common/system.lua @@ -6,13 +6,30 @@ local socket = require("socket") local serpent = require("common.serpent") - local system = {} function system:sleep(t) socket.select(nil, nil, t) end +function system:fdNonBlocking(fd) + local res + local fcntl = require("posix.fcntl") + local bit = require("bit") + + local flag, err, errno = fcntl.fcntl(fd, fcntl.F_GETFL) + if flag then + res, err, errno = fcntl.fcntl(fd, fcntl.F_SETFL, bit.bor(flag, fcntl.O_NONBLOCK)) + if res then + return 0 + else + system:posixError("fcntl set failed", err, errno) + end + else + system:posixError("fcntl get failed", err, errno) + end +end + function system:deepcopy(object) local lookup_table = {} local function _copy(object) diff --git a/source/tools/monitor/unity/httplib/coHttpCli.lua b/source/tools/monitor/unity/httplib/coHttpCli.lua index 67af16b2df45043ac5447b5293bba6d5899aa2b6..b8d40a734afcdfb41f8b9e6dcbed29083ce5ddad 100644 --- a/source/tools/monitor/unity/httplib/coHttpCli.lua +++ b/source/tools/monitor/unity/httplib/coHttpCli.lua @@ -113,11 +113,18 @@ local function setupSocket(host, port) end end -function CcoHttpCli:_init_(host, port, url, persistent) - self._host = host - self._port = port or 80 - self._url = url or "/" +function CcoHttpCli:_init_(fYaml, persistent) + local res = system:parseYaml(fYaml) + local pushTo = res.pushTo + self._host = pushTo.host + self._port = pushTo.port or 80 + self._url = pushTo.url or "/" self._persistent = persistent + + local Cidentity = require("beaver.identity") + local inst = Cidentity.new(fYaml) + self._instance = inst:id() + self.status = enumStat.closed end @@ -144,6 +151,31 @@ function CcoHttpCli:trans(msgs, body, filter) return "" end +function CcoHttpCli:addInstance(line) -- add instance id for line index. + local cells = line.ls + local hasInstance = false + + if cells then + for _, cell in ipairs(cells) do + if cell.name == "instance" then + hasInstance = true + end + end + end + + if not hasInstance then + local cell = { + name = "instance", + index = self._instance + } + if cells then + table.insert(cells, cell) + else + line.ls = {cell} + end + end +end + function CcoHttpCli:pack(body) local line = self:packCliHead('GET', self._url) local head = { @@ -395,7 +427,7 @@ function CcoHttpCli:closureRead(fd, maxLen) return nil end else - print(system:dump(e)) + system:dumps(e) end return nil end @@ -482,7 +514,6 @@ function CcoHttpCli:work(cffi, efd) self.online = os.time() while true do local body = coroutine.yield() - --print("-- " .. type(msg)) self.status = enumStat.sending local s = self:pack(body) if not self:coWrite(cffi, efd, fd, s) then @@ -497,9 +528,7 @@ function CcoHttpCli:work(cffi, efd) else goto failed end - if self._persistent then - print("continue.") - else + if not self._persistent then goto failed end end diff --git a/source/tools/monitor/unity/httplib/coInflux.lua b/source/tools/monitor/unity/httplib/coInflux.lua index 002ac09ab0485b2f900d7ab5267536f6b356b328..6b00a4c930c8b53f66d0f86efb6d90213545ac9f 100644 --- a/source/tools/monitor/unity/httplib/coInflux.lua +++ b/source/tools/monitor/unity/httplib/coInflux.lua @@ -11,12 +11,14 @@ local lineParse = require("common.lineParse") local CcoInflux = class("coInflux", CcoHttpCli) -function CcoInflux:_init_(host, port, url) - CcoHttpCli._init_(self, host, port, url) +function CcoInflux:_init_(fYaml) + CcoHttpCli._init_(self, fYaml) end function CcoInflux:echo(tReq) - print(tReq.code, tReq.data) + if tReq.code ~= "204" then + print(tReq.code, tReq.data) + end end function CcoInflux:trans(msgs, body, filter) @@ -28,6 +30,7 @@ function CcoInflux:trans(msgs, body, filter) lines = msgs.lines for _, line in ipairs(lines) do c = c + 1 + self:addInstance(line) bodies[c] = lineParse.packs(line) end @@ -40,7 +43,6 @@ function CcoInflux:trans(msgs, body, filter) end function CcoInflux:pack(body) - print(#body) local line = self:packCliHead('POST', self._url) local head = { Host = self._host, diff --git a/source/tools/monitor/unity/httplib/httpApp.lua b/source/tools/monitor/unity/httplib/httpApp.lua index 92b6cd278923a37db627a9406b99cf201ab08838..996792efbde2ccbd057012e05c185ebf672f5c5e 100644 --- a/source/tools/monitor/unity/httplib/httpApp.lua +++ b/source/tools/monitor/unity/httplib/httpApp.lua @@ -14,8 +14,9 @@ function ChttpApp:_init_(frame) ChttpBase._init_(self) end -function ChttpApp:echo(tRet, keep) - local stat = self:packStat(200) +function ChttpApp:echo(tRet, keep, code) + code = code or 200 + local stat = self:packStat(code) local tHead = { ["Content-Type"] = "application/json", ["Connection"] = (keep and "keep-alive") or "close" diff --git a/source/tools/monitor/unity/httplib/httpBase.lua b/source/tools/monitor/unity/httplib/httpBase.lua index 3ec1ff4f4c09243a7b1965358e4816db35e307cf..0de7ba31a391a9b5508b605da98d70a1da72d070 100644 --- a/source/tools/monitor/unity/httplib/httpBase.lua +++ b/source/tools/monitor/unity/httplib/httpBase.lua @@ -25,7 +25,7 @@ function ChttpBase:_installRe(path, frame) frame:registerRe(path, self) end -function ChttpBase:echo(tRet, keep) +function ChttpBase:echo(tRet, keep, code) error("ChttpBase:echo is a virtual function.") end @@ -48,8 +48,8 @@ end function ChttpBase:call(tReq) local keep = checkKeep(tReq) - local tRet = self._urlCb[tReq.path](tReq) - local res = self:echo(tRet, keep) + local tRet, code = self._urlCb[tReq.path](tReq) + local res = self:echo(tRet, keep, code) return res, keep end diff --git a/source/tools/monitor/unity/httplib/httpCli.lua b/source/tools/monitor/unity/httplib/httpCli.lua index c83888f7f7055f37379ea9ad3241f7e0db02a9af..3e0674125de0cb97a3813adeed6fca932170e900 100644 --- a/source/tools/monitor/unity/httplib/httpCli.lua +++ b/source/tools/monitor/unity/httplib/httpCli.lua @@ -61,4 +61,12 @@ function ChttpCli:postTable(Url, t) return self:post(Url, req, headers) end +function ChttpCli:postLine(Url, line) + local headers = { + ["Content-Type"] = "text/plain", + ["Content-Length"] = #line, + } + return self:post(Url, line, headers) +end + return ChttpCli diff --git a/source/tools/monitor/unity/httplib/httpComm.lua b/source/tools/monitor/unity/httplib/httpComm.lua index 46a51516106e92d5d7a39cff0c5c895d102927b1..5986a65b08080d54cbc57aa9eb4bb41b5ce9d90b 100644 --- a/source/tools/monitor/unity/httplib/httpComm.lua +++ b/source/tools/monitor/unity/httplib/httpComm.lua @@ -15,6 +15,8 @@ local ChttpComm = class("httplib.httpComm") local cjson = require("cjson.safe") local json = cjson.new() +json.encode_escape_forward_slash(false) + local function codeTable() return { [100] = "Continue", diff --git a/source/tools/monitor/unity/httplib/httpHtml.lua b/source/tools/monitor/unity/httplib/httpHtml.lua index 9eca219df672a51b9ade78a9e3fddbb058a3c2cc..3d63de08415c7773bb29906aeb9673ff83fbbb67 100644 --- a/source/tools/monitor/unity/httplib/httpHtml.lua +++ b/source/tools/monitor/unity/httplib/httpHtml.lua @@ -100,8 +100,9 @@ local function htmlPack(title, content) return pystring:join("", bodies) end -function ChttpHtml:pack(cType, keep, body) - local stat = self:packStat(200) +function ChttpHtml:pack(cType, keep, body, code) + code = code or 200 + local stat = self:packStat(code) local tHead = { ["Content-Type"] = cType, ["Connection"] = (keep and "keep-alive") or "close" @@ -111,11 +112,11 @@ function ChttpHtml:pack(cType, keep, body) return pystring:join("\r\n", tHttp) end -function ChttpHtml:echo(tRet, keep) +function ChttpHtml:echo(tRet, keep, code) local cType = tRet.type or "text/html" local body = htmlPack(tRet.title, tRet.content) - return self:pack(cType, keep, body) + return self:pack(cType, keep, body, code) end return ChttpHtml diff --git a/source/tools/monitor/unity/httplib/httpPlain.lua b/source/tools/monitor/unity/httplib/httpPlain.lua index f0999d89fec522e292232187489fd8781bab66e6..fc0f075e66d1e03a2b6f805cbd38babd3aaffe19 100644 --- a/source/tools/monitor/unity/httplib/httpPlain.lua +++ b/source/tools/monitor/unity/httplib/httpPlain.lua @@ -14,8 +14,9 @@ function ChttpPlain:_init_(frame) ChttpBase._init_(self) end -function ChttpPlain:echo(tRet, keep) - local stat = self:packStat(200) +function ChttpPlain:echo(tRet, keep, code) + code = code or 200 + local stat = self:packStat(code) local tHead = { ["Content-Type"] = "text/plain", ["Connection"] = (keep and "keep-alive") or "close" diff --git a/source/tools/monitor/unity/httplib/influxCli.lua b/source/tools/monitor/unity/httplib/influxCli.lua index 7b8c60473be28e0dc1c8feaafe6c0b127d179d67..a1eef70cf77b2fa17ee77dd36bf398eb6d36201b 100644 --- a/source/tools/monitor/unity/httplib/influxCli.lua +++ b/source/tools/monitor/unity/httplib/influxCli.lua @@ -9,8 +9,8 @@ local ChttpCli = require("httplib.httpCli") local CinfluxCli = class("influxCli", ChttpCli) -function CinfluxCli:_init_(url, db, user, pass, proxy) - self._url = string.format("%swrite?u=%s&p=%s&db=%s", url, user, pass, db) +function CinfluxCli:_init_(url, proxy) + self._url = url ChttpCli._init_(self, proxy) end diff --git a/source/tools/monitor/unity/test/curl/forkRun.py b/source/tools/monitor/unity/test/curl/forkRun.py new file mode 100644 index 0000000000000000000000000000000000000000..7ab0ceb8032c5f36c030c64107dc3acba27e238c --- /dev/null +++ b/source/tools/monitor/unity/test/curl/forkRun.py @@ -0,0 +1,17 @@ + +import time +from outLine import CnfPut + + +def loop(nf): + i = 1 + while True: + nf.puts('forkRun,mode=java log="hello runtime.",count=%d' % i) + i += 1 + time.sleep(15) + + +if __name__ == "__main__": + time.sleep(2) + nf = CnfPut() + loop(nf) \ No newline at end of file diff --git a/source/tools/monitor/unity/test/curl/hello.py b/source/tools/monitor/unity/test/curl/hello.py new file mode 100644 index 0000000000000000000000000000000000000000..c497ca2f47acc5395f3ad944dd7eb6a028e1f034 --- /dev/null +++ b/source/tools/monitor/unity/test/curl/hello.py @@ -0,0 +1,7 @@ +import time +import os + +print(os.getcwd()) +while True: + print("hello.") + time.sleep(3) diff --git a/source/tools/monitor/unity/test/curl/influx.lua b/source/tools/monitor/unity/test/curl/influx.lua index eb8ce1540ab336e83cc6670cfb7770eea9ec6d29..188992c461a80b6fec5b3641848abbcc9a37ba20 100644 --- a/source/tools/monitor/unity/test/curl/influx.lua +++ b/source/tools/monitor/unity/test/curl/influx.lua @@ -8,12 +8,6 @@ package.path = package.path .. ";../../?.lua;" local system = require("common.system") local CinfluxCli = require("httplib.influxCli") -local cli = CinfluxCli.new("http://172.24.90.148:8086/", "lua", "xxx", "xxxxxx") - -local v = 1.1 -while true do - local s = string.format("test,label=a v=%f", v) - cli:puts(s) - v = v + 1 - system:sleep(2) -end +local cli = CinfluxCli.new("http://ld-wz9d17b514mg6kjkx-proxy-tsdb.lindorm.rds.aliyuncs.com:8242/api/v2/write?db=lua") +local res = cli:puts('virtout,instance=self log="task:0(swapper/13), cpu:13, delayed:162994631, callstack:mwait_idle_with_hints.constprop.0,intel_idle,cpuidle_enter_state,cpuidle_enter,cpuidle_idle_call,do_idle,cpu_startup_entry,secondary_startup_64_no_verify,"') +system:dumps(res) diff --git a/source/tools/monitor/unity/test/curl/outLine.py b/source/tools/monitor/unity/test/curl/outLine.py new file mode 100644 index 0000000000000000000000000000000000000000..cd5ad58cfb4341acf3b186158c1bd021d84e6dad --- /dev/null +++ b/source/tools/monitor/unity/test/curl/outLine.py @@ -0,0 +1,23 @@ +import os +import socket + +PIPE_PATH = "/var/sysom/outline" +MAX_BUFF = 128 * 1024 + + +class CnfPut(object): + def __init__(self, path=PIPE_PATH): + self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + self._path = path + if not os.path.exists(self._path): + raise ValueError("pipe path is not exist. please check Netinfo is running.") + + def puts(self, s): + if len(s) > MAX_BUFF: + raise ValueError("message len %d, is too long ,should less than%d" % (len(s), MAX_BUFF)) + return self._sock.sendto(s, self._path) + + +if __name__ == "__main__": + nf = CnfPut() + nf.puts('runtime,mode=java log="hello runtime."') diff --git a/source/tools/monitor/unity/test/curl/poBeaver/poBeaver.lua b/source/tools/monitor/unity/test/curl/poBeaver/poBeaver.lua index 7271e4f3c48fb2b19b496bbf44e50cc5099bd635..137c9a0dcac878bdedc426f97c4e07e52f717318 100644 --- a/source/tools/monitor/unity/test/curl/poBeaver/poBeaver.lua +++ b/source/tools/monitor/unity/test/curl/poBeaver/poBeaver.lua @@ -63,22 +63,6 @@ function CpoBeaver:_install_fd(port, ip, backlog) end end -local function fdNonBlocking(fd) - local res - local flag, err, errno = fcntl.fcntl(fd, fcntl.F_GETFL) - if flag then - res, err, errno = fcntl.fcntl(fd, fcntl.F_SETFL, bit.bor(flag, fcntl.O_NONBLOCK)) - if res then - return 0 - else - posixError("fcntl set failed", err, errno) - end - else - posixError("fcntl get failed", err, errno) - end -end - - function CpoBeaver:read(fd, maxLen) maxLen = maxLen or 1 * 1024 * 1024 -- signal conversation accept 1M stream max local function readFd() @@ -203,7 +187,7 @@ function CpoBeaver:accept(fd, e) elseif e.IN then local nfd, err, errno = socket.accept(fd) if nfd then - fdNonBlocking(nfd) + system:fdNonBlocking(nfd) self:co_add(nfd) else posixError("accept new socket failed", err, errno) diff --git a/source/tools/monitor/unity/test/curl/postCmd.lua b/source/tools/monitor/unity/test/curl/postCmd.lua new file mode 100644 index 0000000000000000000000000000000000000000..3bbf577a92878e926043ee0b9c53d5bef5b4a0bd --- /dev/null +++ b/source/tools/monitor/unity/test/curl/postCmd.lua @@ -0,0 +1,15 @@ +--- +--- Generated by EmmyLua(https://github.com/EmmyLua) +--- Created by liaozhaoyan. +--- DateTime: 2023/4/17 19:55 +--- + +package.path = package.path .. ";../../?.lua;" +local system = require("common.system") +local ChttpCli = require("httplib.httpCli") + +local cli = ChttpCli.new() +local url = "http://127.0.0.1:8400/api/trig" +local req = {cmd = "exec", exec = "/usr/bin/pwd"} +local res = cli:postTable(url, req) +system:dumps(res) diff --git a/source/tools/monitor/unity/test/curl/postLine.lua b/source/tools/monitor/unity/test/curl/postLine.lua new file mode 100644 index 0000000000000000000000000000000000000000..e04791a91168261ab57b404481b8610bdf96c1e8 --- /dev/null +++ b/source/tools/monitor/unity/test/curl/postLine.lua @@ -0,0 +1,14 @@ +--- +--- Generated by EmmyLua(https://github.com/EmmyLua) +--- Created by liaozhaoyan. +--- DateTime: 2023/4/24 02:21 +--- + +package.path = package.path .. ";../../?.lua;" +local system = require("common.system") +local ChttpCli = require("httplib.httpCli") + +local cli = ChttpCli.new() +local url = "http://127.0.0.1:8400/api/line" +local res = cli:postLine(url, "lineTable,index=abc value=1") +system:dumps(res) diff --git a/source/tools/monitor/unity/test/curl/postLine.py b/source/tools/monitor/unity/test/curl/postLine.py new file mode 100644 index 0000000000000000000000000000000000000000..9885d89c09d751534b74a3c390f3e4c159b22b92 --- /dev/null +++ b/source/tools/monitor/unity/test/curl/postLine.py @@ -0,0 +1,6 @@ +import requests + +url = "http://127.0.0.1:8400/api/line" +line = "lineTable,index=abc value=2" +res = requests.post(url, data=line) +print(res) diff --git a/source/tools/monitor/unity/test/curl/postPy.lua b/source/tools/monitor/unity/test/curl/postPy.lua new file mode 100644 index 0000000000000000000000000000000000000000..d4085f0a4a8bfd1571d56be462e157d70204046a --- /dev/null +++ b/source/tools/monitor/unity/test/curl/postPy.lua @@ -0,0 +1,15 @@ +--- +--- Generated by EmmyLua(https://github.com/EmmyLua) +--- Created by liaozhaoyan. +--- DateTime: 2023/4/17 20:21 +--- + +package.path = package.path .. ";../../?.lua;" +local system = require("common.system") +local ChttpCli = require("httplib.httpCli") + +local cli = ChttpCli.new() +local url = "http://127.0.0.1:8400/api/trig" +local req = {cmd = "exec", exec = "/usr/bin/python", args = {"../test/curl/hello.py",}, } +local res = cli:postTable(url, req) +system:dumps(res) \ No newline at end of file diff --git a/source/tools/monitor/unity/test/lab/fileSync/fileSync.py b/source/tools/monitor/unity/test/lab/fileSync/fileSync.py new file mode 100644 index 0000000000000000000000000000000000000000..37b7369770ac754cd69c5a58c2e2998a776e07a4 --- /dev/null +++ b/source/tools/monitor/unity/test/lab/fileSync/fileSync.py @@ -0,0 +1,87 @@ +# -*- coding: utf-8 -*- +""" +------------------------------------------------- + File Name: fileSync.py + Description : + Author : liaozhaoyan + date: 2023/4/18 +------------------------------------------------- + Change Activity: + 2023/4/18: +------------------------------------------------- +""" +import os +import sys + + +FILE_SYNC = 0x010000 +FILE_DIRECT = 0x040000 + + +def getFlags(fdPath): + try: + with open(fdPath) as f: + for i, line in enumerate(f): + if line.startswith("flags:"): + _, ret = line.split(":", 1) + return ret.strip() + except IOError: + return 0 + + +def getFdName(pid, fd): + fdPath = "/proc/%s/fd/%s" % (pid, fd) + return os.readlink(fdPath) + + +def getCmd(pid): + path = "/proc/%s/cmdline" % pid + try: + with open(path) as f: + return f.read() + except IOError: + return "nil" + + +def checkFile(fileLink): + if fileLink.startswith("/") and not fileLink.startswith("/dev"): + return True + return False + + +def checkFlag(pid, fd, flag): + vFlag = int(flag) + link = getFdName(pid, fd) + if checkFile(link): + if vFlag & FILE_SYNC: + print("pid:%s, cmd:%s, file:%s, set sync flag" % (pid, getCmd(pid), link)) + if vFlag & FILE_DIRECT: + print("pid:%s, cmd:%s, file:%s, set direct flag" % (pid, getCmd(pid), link)) + + +def checkPid(pid): + path = "/proc/" + pid + "/fdinfo" + for fd in os.listdir(path): + fdPath = "/".join([path, fd]) + try: + flag = getFlags(fdPath) + except IOError: + continue + checkFlag(pid, fd, flag) + + +def walkGroup(path): + path += "/tasks" + try: + with open(path) as f: + for i, line in enumerate(f): + checkPid(line.strip()) + except IOError: + print("bad path.") + + +if __name__ == "__main__": + path = "/sys/fs/cgroup/pids/kubepods/besteffort/slot_976/231a6b41a7be49fdcecac835bc1487272ba8319b8106692d2134c34b025931fa" + if len(sys.argv) > 1: + path = sys.argv[1] + walkGroup(path) diff --git a/source/tools/monitor/unity/test/lab/hashList.lua b/source/tools/monitor/unity/test/lab/hashList.lua new file mode 100644 index 0000000000000000000000000000000000000000..b9dab8fb760a2c31c308f1445b463c47ed6eabe5 --- /dev/null +++ b/source/tools/monitor/unity/test/lab/hashList.lua @@ -0,0 +1,48 @@ +--- +--- Generated by EmmyLua(https://github.com/EmmyLua) +--- Created by liaozhaoyan. +--- DateTime: 2023/4/21 18:25 +--- + +local aList = {'a', 'ab', 'abc'} +print(#aList, table.getn(aList)) +for _, v in ipairs(aList) do + print(v) +end + +aList[2] = nil +print(#aList, table.getn(aList)) +for _, v in ipairs(aList) do + print(v) +end + +for k, v in pairs(aList) do + print(k, v) +end + +local bList = { + a = 1, + b = 2, + c = 3 +} + +bList.b = nil + +print("list.") +aList = {'a', 'ab', 'abc'} +for i, v in ipairs(aList) do + print(v) + if i == 2 then + table.remove(aList, i) + end +end + +print("list reverse.") +aList = {'a', 'ab', 'abc'} +for i = #aList, 1, -1 do + print(aList[i]) + if i == 2 then + table.remove(aList, i) + end +end + diff --git a/source/tools/monitor/unity/test/lab/jencode.lua b/source/tools/monitor/unity/test/lab/jencode.lua new file mode 100644 index 0000000000000000000000000000000000000000..89fbb74db9a17bbd785e26168255d0e5d154946b --- /dev/null +++ b/source/tools/monitor/unity/test/lab/jencode.lua @@ -0,0 +1,13 @@ +--- +--- Generated by EmmyLua(https://github.com/EmmyLua) +--- Created by liaozhaoyan. +--- DateTime: 2023/4/18 15:10 +--- + +local cjson = require("cjson.safe") +local json = cjson.new() +json.encode_escape_forward_slash(false) + +local log = "task:0(swapper/13), cpu:13, delayed:162994631, callstack:mwait_idle_with_hints.constprop.0,intel_idle,cpuidle_enter_state,cpuidle_enter,cpuidle_idle_call,do_idle,cpu_startup_entry,secondary_startup_64_no_verify," + +print(json.encode(log)) diff --git a/source/tools/monitor/unity/test/lab/kmsg/kmsg.c b/source/tools/monitor/unity/test/lab/kmsg/kmsg.c new file mode 100644 index 0000000000000000000000000000000000000000..613fa03cd2a61300dd0b0177d60568cafdb1c4f0 --- /dev/null +++ b/source/tools/monitor/unity/test/lab/kmsg/kmsg.c @@ -0,0 +1,88 @@ +// +// Created by 廖肇燕 on 2023/4/18. +// + +#define _GNU_SOURCE + +#include +#include +#include +#include +#include +#include +#include +#define KMSG_LINE 8192 + +static int kmsg_set_block(int fd) { + int ret; + int flags = fcntl(fd, F_GETFL); + flags &= ~O_NONBLOCK; + ret = fcntl(fd, F_SETFL, flags); + return ret; +} + +int kmsg_thread_func(void) { + int fd; + int ret; + char buff[KMSG_LINE]; + + + fd = open("/dev/kmsg", O_RDONLY | O_NONBLOCK); + if (fd < 0) { + goto endOpen; + }/* + ret = lseek(fd, 0, SEEK_DATA); + if (ret < 0) { + perror("kmsg seek error"); + goto endLseek; + }*/ + + ret = 1; // strip old message. + while (ret > 0) { + ret = read(fd, buff, KMSG_LINE - 1); + buff[ret] = '\0'; + printf("%s\n", buff); + if (ret < 0) { + if (errno == EAGAIN) { + break; + } + perror("kmsg read failed."); + goto endRead; + } + } + + ret = kmsg_set_block(fd); + if (ret < 0) { + perror("kmsg set block failed."); + goto endBlock; + } + + while (1) { + + ret = read(fd, buff, KMSG_LINE - 1); + if (ret < 0) { + if (errno == EINTR) { + break; + } + perror("kmsg read2 failed."); + goto endRead; + } + buff[ret -1] = '\0'; + + printf("read: %s\n", buff); + } + + close(fd); + return 0; + endBlock: + endRead: + endLseek: + close(fd); + endOpen: + return 1; +} + +int main(void) { + kmsg_thread_func(); + return 0; +} \ No newline at end of file diff --git a/source/tools/monitor/unity/test/lab/listSun.lua b/source/tools/monitor/unity/test/lab/listSun.lua new file mode 100644 index 0000000000000000000000000000000000000000..815cadee79f7c26cefc46d48a0bebf78db0c5228 --- /dev/null +++ b/source/tools/monitor/unity/test/lab/listSun.lua @@ -0,0 +1,9 @@ +--- +--- Generated by EmmyLua(https://github.com/EmmyLua) +--- Created by liaozhaoyan. +--- DateTime: 2023/4/12 17:54 +--- + +local aList = {1, 2, 3, sum=6} +print(#aList) +print(math.max(aList)) \ No newline at end of file diff --git a/source/tools/monitor/unity/test/lab/tping.lua b/source/tools/monitor/unity/test/lab/tping.lua index 29d15355f595cc1baaeef9405708045d670ec664..d8a4e72cfc17ade4ec8cba4055c619cc52e7a9e7 100644 --- a/source/tools/monitor/unity/test/lab/tping.lua +++ b/source/tools/monitor/unity/test/lab/tping.lua @@ -9,6 +9,8 @@ package.path = package.path .. ";../../?.lua;" local Cping = require("common.ping") local p = Cping.new("8.8.8.8", "eth0") -p:ping() +print(p:ping()) p = Cping.new("1.2.3.4", "eth0") -p:ping() \ No newline at end of file +print(p:ping()) +p = Cping.new("172.16.0.253", "eth0") +print(p:ping()) \ No newline at end of file