diff --git a/source/tools/monitor/unity/beaver/beaver.c b/source/tools/monitor/unity/beaver/beaver.c
index 073927299d3602300bc43d3e3cfb97c2d1d1d6cf..8cb6b856e682926f61f4156266d85a9f8300d59a 100644
--- a/source/tools/monitor/unity/beaver/beaver.c
+++ b/source/tools/monitor/unity/beaver/beaver.c
@@ -18,13 +18,14 @@ extern int lua_reg_errFunc(lua_State *L);
extern int lua_check_ret(int ret);
int lua_load_do_file(lua_State *L, const char* path);
-static int call_init(lua_State *L, int err_func, char *fYaml) {
+static int call_init(lua_State *L, int err_func, struct beeQ* q, char *fYaml) {
int ret;
lua_Number lret;
lua_getglobal(L, "init");
+ lua_pushlightuserdata(L, q);
lua_pushstring(L, fYaml);
- ret = lua_pcall(L, 1, 1, err_func);
+ ret = lua_pcall(L, 2, 1, err_func);
if (ret) {
lua_check_ret(ret);
goto endCall;
@@ -64,7 +65,8 @@ void LuaAddPath(lua_State *L, char *name, char *value) {
lua_pop(L, 2);
}
-static lua_State * echos_init(char *fYaml) {
+extern int collector_qout(lua_State *L);
+static lua_State * echos_init(struct beeQ* q, char *fYaml) {
int ret;
int err_func;
@@ -80,12 +82,13 @@ static lua_State * echos_init(char *fYaml) {
LuaAddPath(L, "path", "../beaver/?.lua");
err_func = lua_reg_errFunc(L);
+ lua_register(L, "collector_qout", collector_qout);
ret = lua_load_do_file(L, "../beaver/beaver.lua");
if (ret) {
goto endLoad;
}
- ret = call_init(L, err_func, fYaml);
+ ret = call_init(L, err_func, q, fYaml);
if (ret < 0) {
goto endCall;
}
@@ -131,11 +134,11 @@ static int echos(lua_State *L) {
return ret;
}
-int beaver_init(char *fYaml) {
+int beaver_init(struct beeQ* q, char *fYaml) {
int ret = 0;
while (ret == 0) {
- lua_State *L = echos_init(fYaml);
+ lua_State *L = echos_init(q, fYaml);
if (L == NULL) {
break;
}
diff --git a/source/tools/monitor/unity/beaver/beaver.h b/source/tools/monitor/unity/beaver/beaver.h
index f1a6fd3769312b1f0e90c252dc3d80da96547bf5..058fbb946edea7d79afcaa79e10e415de943a04d 100644
--- a/source/tools/monitor/unity/beaver/beaver.h
+++ b/source/tools/monitor/unity/beaver/beaver.h
@@ -5,6 +5,7 @@
#ifndef UNITY_BEAVER_H
#define UNITY_BEAVER_H
-int beaver_init(char *fYaml);
+#include "../beeQ/beeQ.h"
+int beaver_init(struct beeQ* q, char *fYaml);
#endif //UNITY_BEAVER_H
diff --git a/source/tools/monitor/unity/beaver/beaver.lua b/source/tools/monitor/unity/beaver/beaver.lua
index 2431d99ce7bcc5069490f8d0d3510e55df6a4da3..f8287e1a045f77e25a8c6f3b3aab283d93861a02 100644
--- a/source/tools/monitor/unity/beaver/beaver.lua
+++ b/source/tools/monitor/unity/beaver/beaver.lua
@@ -19,13 +19,12 @@ local CbaseQuery = require("beaver.query.baseQuery")
local lb = nil
-function init(fYaml)
+function init(que, fYaml)
fYaml = fYaml or "../collector/plugin.yaml"
- print(fYaml)
local web = Cframe.new()
CurlIndex.new(web)
- CurlApi.new(web, fYaml)
+ CurlApi.new(web, que, fYaml)
CurlRpc.new(web)
CurlGuide.new(web)
CbaseQuery.new(web, fYaml)
diff --git a/source/tools/monitor/unity/beaver/guide/testjs.html b/source/tools/monitor/unity/beaver/guide/testjs.html
new file mode 100644
index 0000000000000000000000000000000000000000..ac199064ac3885984c64d8869a14793340fc7620
--- /dev/null
+++ b/source/tools/monitor/unity/beaver/guide/testjs.html
@@ -0,0 +1,20 @@
+
+
+
+
+ 菜鸟教程(runoob.com)
+
+
+
+
+我的第一个 JavaScript 程序
+这是一个段落
+
+
+
+
+
\ No newline at end of file
diff --git a/source/tools/monitor/unity/beaver/localBeaver.lua b/source/tools/monitor/unity/beaver/localBeaver.lua
index e281593890f391b7bc4ba0b572ffdefae650055a..548bd94afa22c5cdd48e165545b0cd1359823102 100644
--- a/source/tools/monitor/unity/beaver/localBeaver.lua
+++ b/source/tools/monitor/unity/beaver/localBeaver.lua
@@ -162,7 +162,7 @@ function CLocalBeaver:_install_fd(port, ip, backlog)
end
function CLocalBeaver:read(fd, maxLen)
- maxLen = maxLen or 1 * 1024 * 1024 -- signal conversation accept 1M stream max
+ maxLen = maxLen or 2 * 1024 * 1024 -- signal conversation accept 2M stream max
local function readFd()
local e = coroutine.yield()
if e.ev_close > 0 then
diff --git a/source/tools/monitor/unity/beaver/pushLine.lua b/source/tools/monitor/unity/beaver/pushLine.lua
new file mode 100644
index 0000000000000000000000000000000000000000..846568ec6f0fadb5c64e01b1a56b939e382e91f4
--- /dev/null
+++ b/source/tools/monitor/unity/beaver/pushLine.lua
@@ -0,0 +1,58 @@
+---
+--- Generated by EmmyLua(https://github.com/EmmyLua)
+--- Created by liaozhaoyan.
+--- DateTime: 2023/4/24 01:34
+---
+
+require("common.class")
+local system = require("common.system")
+local CprotoData = require("common.protoData")
+local lineParse = require("common.lineParse")
+local pystring = require("common.pystring")
+
+local CpushLine = class("CpushLine")
+
+function CpushLine:_init_(que)
+ self._proto = CprotoData.new(que)
+end
+
+local function trans(title, ls, vs, log)
+ local labels = {}
+ local values = {}
+ local logs = {}
+
+ local c = 0
+ for k, v in pairs(ls) do
+ c = c + 1
+ labels[c] = {name=k, index=v}
+ end
+ c = 0
+ for k, v in pairs(vs) do
+ c = c + 1
+ values[c] = {name=k, value=v}
+ end
+ c = 0
+ for k, v in pairs(log) do
+ c = c + 1
+ logs[c] = {name=k, log=v}
+ end
+ return {line = title, ls = labels, vs = values, log = logs}
+end
+
+function CpushLine:procLine(line)
+ return trans(lineParse.parse(line))
+end
+
+function CpushLine:procLines(stream)
+ local ss = pystring:split(stream, "\n")
+ local lines = self._proto:protoTable()
+
+ for _, line in ipairs(ss) do
+ table.insert(lines.lines, self:procLine(line))
+ end
+ local bytes = self._proto:encode(lines)
+ self._proto:que(bytes)
+ return 0
+end
+
+return CpushLine
\ No newline at end of file
diff --git a/source/tools/monitor/unity/beaver/url_api.lua b/source/tools/monitor/unity/beaver/url_api.lua
index d45281c1b1ddc863956082d2a3b223eac4874ffb..c03d824ebec43a6dcb51b4487de6b23e9b448fa2 100644
--- a/source/tools/monitor/unity/beaver/url_api.lua
+++ b/source/tools/monitor/unity/beaver/url_api.lua
@@ -9,21 +9,32 @@ local system = require("common.system")
local ChttpApp = require("httplib.httpApp")
local CfoxTSDB = require("tsdb.foxTSDB")
local postQue = require("beeQ.postQue.postQue")
+local CpushLine = require("beaver.pushLine")
local CurlApi = class("urlApi", ChttpApp)
-function CurlApi:_init_(frame, fYaml)
+function CurlApi:_init_(frame, que, fYaml)
ChttpApp._init_(self)
+ self._pushLine = CpushLine.new(que)
self._urlCb["/api/sum"] = function(tReq) return self:sum(tReq) end
self._urlCb["/api/sub"] = function(tReq) return self:sub(tReq) end
self._urlCb["/api/query"] = function(tReq) return self:query(tReq) end
- self._urlCb["/api/que"] = function(tReq) return self:que(tReq) end
- self._urlCb["/api/trig"] = function(tReq) return self:que(tReq) end
+ self._urlCb["/api/trig"] = function(tReq) return self:trig(tReq) end
+ self._urlCb["/api/line"] = function(tReq) return self:line(tReq) end
self:_install(frame)
self:_setupQs(fYaml)
end
-function CurlApi:que(tReq)
+function CurlApi:line(tReq)
+ local stat, _ = pcall(self._pushLine.procLines, self._pushLine, tReq.data)
+ if stat then
+ return "ok"
+ else
+ return "bad line " .. tReq.data, 400
+ end
+end
+
+function CurlApi:trig(tReq)
local stat, tJson = pcall(self.getJson, self, tReq)
if stat and tJson then
local s = self:jencode(tJson)
diff --git a/source/tools/monitor/unity/beeQ/apps.c b/source/tools/monitor/unity/beeQ/apps.c
index ca81e6bef7f00f5a2c548955bddce78d9c4baef1..40e37b57efc2196544691a1694c855a2e889bb76 100644
--- a/source/tools/monitor/unity/beeQ/apps.c
+++ b/source/tools/monitor/unity/beeQ/apps.c
@@ -7,6 +7,7 @@
#include
#include
#include
+#include
#include "beeQ.h"
#include "apps.h"
#include "pushTo.h"
@@ -241,6 +242,11 @@ static int lua_setup_daemon(lua_State *L) {
return 1;
}
+static int prctl_death_kill(lua_State *L) {
+ prctl(PR_SET_PDEATHSIG, SIGKILL);
+ return 0;
+}
+
int collector_qout(lua_State *L) {
int ret;
struct beeQ* q = (struct beeQ*) lua_topointer(L, 1);
@@ -281,6 +287,7 @@ static int app_collector_work(void* q, void* proto_q) {
lua_register(L, "collector_qout", collector_qout);
lua_register(L, "lua_local_clock", lua_local_clock);
lua_register(L, "lua_setup_daemon", lua_setup_daemon);
+ lua_register(L, "prctl_death_kill", prctl_death_kill);
ret = lua_load_do_file(L, "../beeQ/collectors.lua");
if (ret) {
diff --git a/source/tools/monitor/unity/beeQ/bees.c b/source/tools/monitor/unity/beeQ/bees.c
index 88239bfb7bdf911f5ac06d40d97f80fbd3779c92..950c4bc9b36d01b2b8afeaf0c4ae7d21c8b53281 100644
--- a/source/tools/monitor/unity/beeQ/bees.c
+++ b/source/tools/monitor/unity/beeQ/bees.c
@@ -91,7 +91,7 @@ int main(int argc, char *argv[]) {
if (pid_outline == 0) {
exit(1);
}
- beaver_init(g_yaml_file);
+ beaver_init(q, g_yaml_file);
fprintf(stderr, "loop exit.");
beeQ_stop(q);
diff --git a/source/tools/monitor/unity/beeQ/collectors.lua b/source/tools/monitor/unity/beeQ/collectors.lua
index 5e97d982141e571f7ba6e521bec06f436a135681..10206def303eba01448380f1b534a4895f0c5f85 100644
--- a/source/tools/monitor/unity/beeQ/collectors.lua
+++ b/source/tools/monitor/unity/beeQ/collectors.lua
@@ -121,18 +121,12 @@ local function setupPostEngine(que, proto_q, fYaml, tid)
return w, 1
end
-local function setupIODiagnose(que, proto_q, fYaml, tid)
- local CioDiagnose = require("collector.io.io_diagnose")
- local w = CioDiagnose.new(que, proto_q, fYaml, tid)
- return w, 1
-end
-
function work(que, proto_q, yaml, tid)
local fYaml = yaml or "../collector/plugin.yaml"
checkSos()
local e = CrbEvent.new()
- local main, engine, io, unit
+ local main, engine, unit
main, unit = setupMainCollector(que, proto_q, fYaml, tid)
e:addEvent("mainCollector", main, unit)
@@ -140,7 +134,5 @@ function work(que, proto_q, yaml, tid)
engine:setTask(main.postPlugin.tasks)
e:addEvent("postEngine", engine, unit)
- io, unit = setupIODiagnose(que, proto_q, fYaml, tid)
- e:addEvent("mainCollector", io, unit, true)
return e:proc()
end
diff --git a/source/tools/monitor/unity/beeQ/pack.sh b/source/tools/monitor/unity/beeQ/pack.sh
index 57f6e8a1520cad32a0c34bc88569b532f7821714..42eea82828dc5695dbb2d61501afce042e1fb55f 100755
--- a/source/tools/monitor/unity/beeQ/pack.sh
+++ b/source/tools/monitor/unity/beeQ/pack.sh
@@ -43,13 +43,17 @@ mkdir ${APP}/collector
mkdir ${APP}/collector/native
mkdir ${APP}/collector/guard
mkdir ${APP}/collector/outline
+mkdir ${APP}/collector/postPlugin
mkdir ${APP}/collector/postEngine
+mkdir ${APP}/collector/io
cp collector/native/*.so* ${APP}/collector/native/
cp collector/native/*.lua ${APP}/collector/native/
cp collector/*.lua ${APP}/collector/
cp collector/guard/*.lua ${APP}/collector/guard
cp collector/outline/*.lua ${APP}/collector/outline
+cp collector/postPlugin/*.lua ${APP}/collector/postPlugin
cp collector/postEngine/*.lua ${APP}/collector/postEngine
+cp collector/io/*.lua ${APP}/collector/io
cp collector/plugin.yaml ${APP}/collector/
mkdir ${APP}/common
diff --git a/source/tools/monitor/unity/beeQ/proto_queue.lua b/source/tools/monitor/unity/beeQ/proto_queue.lua
index ecece0f1ab04fbcc11a73fb2cec5fb63f4cfdceb..91f296493e0e8cb4f78da573cc2d95b3a15c8efb 100644
--- a/source/tools/monitor/unity/beeQ/proto_queue.lua
+++ b/source/tools/monitor/unity/beeQ/proto_queue.lua
@@ -8,6 +8,8 @@ require("common.class")
local CprotoData = require("common.protoData")
local CprotoQueue = class("loop")
+local cjson = require("cjson.safe")
+local json = cjson.new()
local system = require("common.system")
function CprotoQueue:_init_(que)
@@ -77,7 +79,6 @@ function CprotoQueue:_proc(unity_lines, lines)
end
function CprotoQueue:send(num, pline)
- --print(string.format("proto que send a %d message.", num))
local unity_lines = self._ffi.new("struct unity_lines")
local lines = self._proto:protoTable()
unity_lines.num = num
diff --git a/source/tools/monitor/unity/beeQ/pushTo.lua b/source/tools/monitor/unity/beeQ/pushTo.lua
index 9daecebb58c510fc3bb939cdd3af90497554601f..bf301ffedc8bbb7ed1e97d450ef577e55268960c 100644
--- a/source/tools/monitor/unity/beeQ/pushTo.lua
+++ b/source/tools/monitor/unity/beeQ/pushTo.lua
@@ -8,14 +8,10 @@ package.path = package.path .. ";../?.lua;"
local coCli = require("httplib.coCli")
local coInflux = require("httplib.coInflux")
-local unistd = require("posix.unistd")
-local system = require("common.system")
function work(fd, fYaml)
- local res = system:parseYaml(fYaml)
- local pushTo = res.pushTo
local frame = coCli.new(fd)
- local c = coInflux.new(pushTo.host, pushTo.port, pushTo.url)
+ local c = coInflux.new(fYaml)
frame:poll(c)
print("end push.")
return 0
diff --git a/source/tools/monitor/unity/beeQ/rbtree/rbEvent.lua b/source/tools/monitor/unity/beeQ/rbtree/rbEvent.lua
index bb904d01a6a556c8d42e460d9ebb178e847d613a..c9b45317037d83002ad251fdc545cf29d1021f8d 100644
--- a/source/tools/monitor/unity/beeQ/rbtree/rbEvent.lua
+++ b/source/tools/monitor/unity/beeQ/rbtree/rbEvent.lua
@@ -60,8 +60,7 @@ function CrbEvent:addEvent(name, obj, period, delay, loop)
end
function CrbEvent:_proc(now, node)
- --print(now .. ": node " .. node.name .. " work")
- local ret
+ local ret = -1
if node.obj.work then
ret = node.obj:work(node.period, self)
end
diff --git a/source/tools/monitor/unity/beeQ/run.sh b/source/tools/monitor/unity/beeQ/run.sh
index 9c23dcdd510933a747a26ef07e111ea320f80d48..23bf470225248e97a88bd1c2036a6b5b92cd68e2 100755
--- a/source/tools/monitor/unity/beeQ/run.sh
+++ b/source/tools/monitor/unity/beeQ/run.sh
@@ -7,7 +7,7 @@ export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:../beaver/
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:../../install/
export LUA_PATH="../../lua/?.lua;../../lua/?/init.lua;"
-export LUA_CPATH="../../lib/?.so;../../lib/loadall.so;"
+export LUA_CPATH="./lib/?.so;../../lib/?.so;../../lib/loadall.so;"
yaml_path=$1
[ ! $yaml_path ] && yaml_path="/etc/sysak/plugin.yaml"
diff --git a/source/tools/monitor/unity/collector/execEngine/forkRun.lua b/source/tools/monitor/unity/collector/execEngine/forkRun.lua
new file mode 100644
index 0000000000000000000000000000000000000000..5ca34e9d87f0bdff7279fe2a596eda475e1b9658
--- /dev/null
+++ b/source/tools/monitor/unity/collector/execEngine/forkRun.lua
@@ -0,0 +1,73 @@
+---
+--- Generated by EmmyLua(https://github.com/EmmyLua)
+--- Created by liaozhaoyan.
+--- DateTime: 2023/4/22 01:34
+---
+
+---
+--- Generated by EmmyLua(https://github.com/EmmyLua)
+--- Created by liaozhaoyan.
+--- DateTime: 2023/4/12 00:01
+---
+
+require("common.class")
+local unistd = require("posix.unistd")
+local pwait = require("posix.sys.wait")
+local signal = require("posix.signal")
+local pystring = require("common.pystring")
+local system = require("common.system")
+local CvProc = require("collector.vproc")
+
+local CforkRun = class("execBase", CvProc)
+
+local function run(cmd, args)
+ local pid, err = unistd.fork()
+ if pid > 0 then -- for self
+ return pid
+ elseif pid == 0 then -- for child
+ local errno
+ prctl_death_kill()
+ _, err, errno = unistd.exec(cmd, args)
+ assert(not errno, "exec failed." .. err .. errno)
+ else
+ error("fork report" .. err)
+ end
+end
+
+local function kill(pid)
+ signal.kill(pid, signal.SIGKILL) -- force to kill task
+ pwait.wait(pid) -- wait task
+end
+
+function CforkRun:_init_(opts, proto, pffi, mnt)
+ CvProc._init_(self, proto, pffi, mnt)
+ self._pid = run(opts.cmd, opts.args)
+ self._opts = opts
+
+ print("fork run: ", self._pid)
+end
+
+function CforkRun:_del_()
+ if self._pid then
+ kill(self._pid)
+ end
+ print("kill " .. self._pid)
+end
+
+function CforkRun:proc(elapsed, lines)
+ local pid, stat, exit = pwait.wait(self._pid, pwait.WNOHANG)
+ if pid == nil then
+ error("wait failed " .. stat .. exit)
+ elseif exit then
+ print("mon thread exit " .. self._pid)
+ self._pid = nil
+ return -1
+ end
+end
+
+local function kill(pid)
+ signal.kill(pid, signal.SIGKILL) -- force to kill task
+ pwait.wait(pid) -- wait task
+end
+
+return CforkRun
diff --git a/source/tools/monitor/unity/collector/guard/calcJiffies.lua b/source/tools/monitor/unity/collector/guard/calcJiffies.lua
index 5e12adfa2adec0af349f0cb8234db025fc409da7..922f345bcc7ab5c9d6ee6f6bef882741e0a2d40d 100644
--- a/source/tools/monitor/unity/collector/guard/calcJiffies.lua
+++ b/source/tools/monitor/unity/collector/guard/calcJiffies.lua
@@ -56,7 +56,7 @@ function mod.calc(mnt, procffi)
local comp = delta1 / delta2
if comp >= 1.1 or comp < 0.9 then
- errno("calculate jiffies failed.")
+ error("calculate jiffies failed.")
end
return (delta1 + delta2) * 2.5 / nproc()
diff --git a/source/tools/monitor/unity/collector/guard/guardSched.lua b/source/tools/monitor/unity/collector/guard/guardSched.lua
index b8a73fd6cf6ec79816072a33268ec750f85d99e9..cccbc15eeb781db1a1f2aa93c67faaa3526fb733 100644
--- a/source/tools/monitor/unity/collector/guard/guardSched.lua
+++ b/source/tools/monitor/unity/collector/guard/guardSched.lua
@@ -24,9 +24,16 @@ function CguardSched:proc(t, lines)
local start = lua_local_clock() -- unit us
local stop = 0
local j1 = self._stat:jiffies()
+ local ret
for i, obj in ipairs(self._procs) do
- obj:proc(t, lines)
+ ret = obj:proc(t, lines)
+
+ if ret == -1 then
+ table.insert(toRemove, i)
+ goto continue
+ end
+
stop = lua_local_clock()
if stop - start >= self._limit then --
local j2 = self._stat:jiffies()
@@ -35,6 +42,8 @@ function CguardSched:proc(t, lines)
end
end
start = stop
+
+ ::continue::
end
if #toRemove > 0 then
diff --git a/source/tools/monitor/unity/collector/guard/guardSelfStat.lua b/source/tools/monitor/unity/collector/guard/guardSelfStat.lua
index 0564b2749c54bcaf7da7b52df93e8724b050bf9a..bbdc5e3ddab8e7f3a96b2693358dd810590c4e5c 100644
--- a/source/tools/monitor/unity/collector/guard/guardSelfStat.lua
+++ b/source/tools/monitor/unity/collector/guard/guardSelfStat.lua
@@ -34,6 +34,21 @@ function CguardSelfStat:_init_(proto, pffi, mnt, resYaml, jperiod)
self._memLimit = resYaml.config.limit.mem * 1024 * 1024
end
+local function rssRssAnon()
+ local anon = 0
+
+ local f = io.open("/proc/self/status")
+ for line in f:lines() do
+ if pystring:startswith(line, "RssAnon:") then
+ local res = pystring:split(line)
+ anon = tonumber(res[2]) * 1024
+ end
+ end
+ f:close()
+
+ return anon
+end
+
function CguardSelfStat:proc(elapsed, lines)
CvProc.proc(self)
@@ -46,7 +61,9 @@ function CguardSelfStat:proc(elapsed, lines)
print("last cpu usage overflow." .. cpus)
os.exit(1)
end
- if rss * 4096 > self._memLimit then
+
+ local anon = rssRssAnon()
+ if anon > self._memLimit then
print("last mem usage overflow." .. rss)
os.exit(1)
end
@@ -62,12 +79,15 @@ function CguardSelfStat:proc(elapsed, lines)
{
name = "vsize",
value = vsize
- }
- ,
+ },
{
name = "rss",
value = rss * 4096
- }
+ },
+ {
+ name = "anon",
+ value = anon,
+ },
}
self:appendLine(self:_packProto("self_stat", nil, vs))
self:push(lines)
diff --git a/source/tools/monitor/unity/collector/io/diskFifo.lua b/source/tools/monitor/unity/collector/io/diskFifo.lua
new file mode 100644
index 0000000000000000000000000000000000000000..ac57e6edcb9e70f3d9d8fcdd6b89f558b725f591
--- /dev/null
+++ b/source/tools/monitor/unity/collector/io/diskFifo.lua
@@ -0,0 +1,75 @@
+---
+--- Generated by EmmyLua(https://github.com/EmmyLua)
+--- Created by liaozhaoyan.
+--- DateTime: 2023/4/12 15:51
+---
+
+require("common.class")
+local system = require("common.system")
+local Cfifo = require("common.fifo")
+
+local CdiskFifo = class("diskFifo", Cfifo)
+
+function CdiskFifo:_init_(maxLen)
+ Cfifo._init_(self, maxLen)
+ self._nr = 0
+end
+
+function CdiskFifo:push(v)
+ Cfifo.push(self, v)
+ self._nr = self._nr + 1
+end
+
+function CdiskFifo:iowait()
+ local sum = 0
+ local value
+ local cells = {}
+ local len = self:len()
+ local c = 0
+
+ if len < self:capacity() then
+ return
+ end
+
+ for _, v in pairs(self.list) do
+ c = c + 1
+ value = v.iowait
+ cells[c], sum = value, sum + value
+ end
+ return {max = math.max(unpack(cells)),
+ min = math.min(unpack(cells)),
+ nr = self._nr,
+ avg = sum / len,
+ last = value}
+end
+
+function CdiskFifo:values(disk, key)
+ local c = 0
+ local sum = 0
+ local value
+ local cells = {}
+ local len = self:len()
+
+ if len < self:capacity() then
+ return
+ end
+
+ local d
+ for _, v in pairs(self.list) do
+ d = v[disk]
+ if d then
+ value = d[key]
+ c = c + 1
+ cells[c], sum = value, sum + value
+ else -- not full
+ return
+ end
+ end
+ return {max = math.max(unpack(cells)),
+ min = math.min(unpack(cells)),
+ nr = self._nr,
+ avg = sum / len,
+ last = value}
+end
+
+return CdiskFifo
diff --git a/source/tools/monitor/unity/collector/io/exceptCheck.lua b/source/tools/monitor/unity/collector/io/exceptCheck.lua
new file mode 100644
index 0000000000000000000000000000000000000000..8947e0bd4bf5eb8ee928aed55f1fcab9bc7eff3b
--- /dev/null
+++ b/source/tools/monitor/unity/collector/io/exceptCheck.lua
@@ -0,0 +1,252 @@
+---
+--- Generated by EmmyLua(https://github.com/EmmyLua)
+--- Created by liaozhaoyan.
+--- DateTime: 2023/4/12 16:26
+---
+
+require("common.class")
+local system = require("common.system")
+local CdiskFifo = require("collector.io.diskFifo")
+local CexecptCheck = class("CexecptCheck")
+
+-- [[ v format
+-- iowait: 1
+-- disk_a: {
+-- iops = 1,
+-- bps = 2,
+-- qusize = 3,
+-- await = 4,
+-- util = 5,
+-- }
+-- ]]
+
+local fifoSize = 60
+local keys = {"iops", "bps", "qusize", "await", "util"}
+
+local function addItem()
+ return {
+ baseThresh = {
+ nrSample = 0,
+ curWinMinVal = 1e9,
+ curWinMaxVal = 0,
+ moveAvg = 0,
+ last = 0,
+ thresh = 0
+ },
+ compensation = {
+ thresh = 0,
+ shouldUpdThreshComp = true,
+ decRangeThreshAvg = 0,
+ decRangeCnt = 0,
+ minStableThresh = 1e9,
+ maxStableThresh = 0,
+ stableThreshAvg = 0,
+ nrStableThreshSample = 0,
+ },
+ dynTresh = 1e9,
+ usedWin = 0,
+ }
+end
+
+local function addItems()
+ local ret = {}
+ for _, key in ipairs(keys) do
+ ret[key] = addItem()
+ end
+ return ret
+end
+
+function CexecptCheck:_init_(diag)
+ self._diag = diag
+
+ self._fifo = CdiskFifo.new(fifoSize)
+ self._waitItem = addItem()
+ self._diskItem = {}
+
+ self._cpuStatIowait = {sum = 0, iowait = 0}
+ self._uploadInter = 0
+ self._exceptionStat = {system = {['IOwait-High'] = {cur = 0, max = 0}}}
+ self._dataStat = {system = {iowait= 0}}
+
+ self._diagSwitch = {
+ diagIowait = { sw = self._diag.cfg.diagIowait,
+ esi = 'IOwait-High'},
+ diagIoburst = { sw = self._diag.cfg.diagIoburst,
+ esi ='IO-Delay'},
+ diagIolat = {sw = self._diag.cfg.diagIolat,
+ esi = 'IO-Burst'},
+ diagIohang = {sw = self._diag.cfg.diagIohang,
+ esi = 'IO-Hang'}
+ }
+end
+
+local function calcBase(item, vs)
+ local bt = item.baseThresh
+
+ local min, max, avg, nr = vs.min, vs.max, vs.avg, vs.nr
+
+ bt.nrSample = nr
+ bt.curWinMinVal = math.min(min, bt.curWinMinVal)
+ bt.curWinMaxVal = math.max(max, bt.curWinMaxVal)
+ bt.last = vs.last
+
+ local nrThreshSample = nr + 1 - fifoSize
+ local thresh = math.max(max - avg, avg - min)
+ local threshAvg = (bt.thresh * (nrThreshSample - 1) + thresh) / nrThreshSample
+ bt.thresh = threshAvg
+ bt.moveAvg = avg
+
+ local usedWin = item.usedWin
+ usedWin = usedWin + 1
+ if usedWin >= fifoSize then
+ bt.curWinMinVal = 1e9
+ bt.curWinMaxVal = 0
+ item.usedWin = 0
+ else
+ item.usedWin = usedWin
+ end
+ return thresh
+end
+
+local function calcStableThresh(ct, curBaseThresh, curThresh)
+ local avg = ct.decRangeThreshAvg
+
+ if (curThresh - avg) < ((curBaseThresh - avg) / 10.0) then
+ local nrStableThreshSample = ct.nrStableThreshSample
+
+ local tSum = ct.stableThreshAvg * ct.nrStableThreshSample + curThresh
+ nrStableThreshSample = nrStableThreshSample + 1
+
+ ct.nrStableThreshSample = nrStableThreshSample
+ ct.stableThreshAvg = tSum / nrStableThreshSample
+ ct.minStableThresh = math.min(ct.minStableThresh, curThresh)
+ ct.maxStableThresh = math.max(ct.maxStableThresh, curThresh)
+
+ if nrStableThreshSample > fifoSize * 1.5 then
+ ct.thresh = math.max(ct.stableThreshAvg - ct.minStableThresh,
+ ct.maxStableThresh - ct.stableThreshAvg)
+ ct.shouldUpdThreshComp = false
+ ct.minStableThresh = 1e9
+ ct.maxStableThresh = 0
+ ct.stableThreshAvg, ct.decRangeThreshAvg = 0, 0
+ ct.nrStableThreshSample, ct.decRangeCnt = 0, 0
+ end
+ end
+end
+
+local function calcCompThresh(item, lastBaseThresh, curThresh)
+ local curBaseThresh = item.baseThresh.thresh
+ local ct = item.compensation
+
+ if ct.shouldUpdThreshComp and (ct.thresh < curThresh or item.usedWin == 0) then
+ ct.thresh = curThresh
+ end
+
+ if curBaseThresh < lastBaseThresh then
+ local decRangeCnt = ct.decRangeCnt
+ local decRangeThreshAvg = ct.decRangeThreshAvg
+ local tSum = decRangeThreshAvg * decRangeCnt + curThresh
+
+ decRangeCnt = decRangeCnt + 1
+ ct.decRangeThreshAvg = tSum / decRangeCnt
+ ct.decRangeCnt = decRangeCnt
+
+ if decRangeCnt >= fifoSize * 1.5 then
+ calcStableThresh(ct, curBaseThresh, curThresh)
+ else
+ ct.minStableThresh = 1e9
+ ct.maxStableThresh = 0
+ ct.stableThreshAvg, ct.decRangeThreshAvg = 0, 0
+ ct.nrStableThreshSample, ct.decRangeCnt = 0, 0
+ end
+ end
+end
+
+local function updateDynThresh(item, vs)
+ local bt = item.baseThresh
+ local ct = item.compensation
+ local lastBaseThresh = bt.thresh
+ local curThresh = calcBase(item, vs)
+
+ calcCompThresh(item, lastBaseThresh, curThresh)
+ item.dynTresh = bt.thresh + bt.moveAvg + ct.thresh
+ --print("thresh: ", item.dynTresh)
+end
+
+function CexecptCheck:calcs()
+ local iowaits = self._fifo:iowait()
+ if iowaits then
+ updateDynThresh(self._waitItem, iowaits)
+ self:checkIOwaitException(self._waitItem)
+ end
+
+ local vs
+ for disk, item in pairs(self._diskItem) do
+ for _, key in ipairs(keys) do
+ vs = self._fifo:values(disk, key)
+ if vs then
+ updateDynThresh(item[key], vs)
+ end
+ end
+ end
+end
+
+function CexecptCheck:addValue(v)
+ for disk, _ in pairs(v) do -- new iowait names one disk
+ if disk ~= "iowait" then
+ if not system:keyIsIn(self._diskItem, disk) then
+ self._diskItem[disk] = addItems()
+ end
+ end
+ end
+ for disk, _ in pairs(self._diskItem) do -- del
+ if not system:keyIsIn(v, disk) then
+ self._diskItem[disk] = nil
+ end
+ end
+ self._fifo:push(v)
+ self:calcs()
+end
+
+local function disableThreshComp(item)
+ local ct = item.compensation
+ local bt = item.baseThresh
+ if ct.shouldUpdThreshComp then
+ ct.shouldUpdThreshComp = false
+ item.dynTresh = bt.thresh + bt.moveAvg
+ ct.thresh = 0.000001
+ end
+end
+
+function CexecptCheck:checkIOwaitException(item)
+ local iowait = item.baseThresh.last
+ local dataStat = self._dataStat.system
+ local es = self._exceptionStat.system['IOwait-High']
+ local uploadInter = self._uploadInter
+
+ if iowait >= self._diag.cfg.iowait then
+ disableThreshComp(item)
+ end
+
+ dataStat.iowait = (dataStat.iowait * (uploadInter - 1) + iowait) / uploadInter
+
+ local minThresh = self._diag.cfg.iowait
+ local iowaitThresh = math.max(item.dynTresh, minThresh)
+ if minThresh >= iowaitThresh then
+ es.cur = es.cur + 1
+ local diagSW = self._diagSwitch.diagIowait
+ local rDiagValid = nil
+ end
+end
+
+function CexecptCheck:checks()
+ local uploadInter = self._uploadInter
+
+ if uploadInter % fifoSize == 0 then
+ self._uploadInter = 1
+ else
+ self._uploadInter = uploadInter + 1
+ end
+end
+
+return CexecptCheck
diff --git a/source/tools/monitor/unity/collector/io/io_diagnose.lua b/source/tools/monitor/unity/collector/io/io_diagnose.lua
index 7c3fc5526921dd68c37dea588ec7f50a1bc35b74..5725acd96944f7e49ed17d1ee73eddbfc70435bc 100644
--- a/source/tools/monitor/unity/collector/io/io_diagnose.lua
+++ b/source/tools/monitor/unity/collector/io/io_diagnose.lua
@@ -11,13 +11,14 @@ local CioDiagnose = class("ioDiagnose", CvProto)
local system = require("common.system")
local procffi = require("collector.native.procffi")
local pystring = require("common.pystring")
+local CexceptCheck = require("collector.io.exceptCheck")
local unistd = require("posix.unistd")
-local function ioWait(fStat)
- local data = procffi["ffi"].new("var_long_t")
+local function readIoWait(fStat)
+ local data = procffi["ffi"].new("var_kvs_t")
local stat = io.open(fStat, "r")
local s = stat:read()
- assert(procffi["cffi"].var_input_long(procffi["ffi"].string(s), data) == 0)
+ assert(procffi["cffi"].var_input_kvs(procffi["ffi"].string(s), data) == 0)
stat:close()
local sum = 0
for i = 0, data.no do
@@ -36,7 +37,7 @@ local function distStat(fDisks)
local disks = {}
for line in io.lines(fDisks) do
local s = pystring:split(line, nil, 3)[4]
- local data = procffi["ffi"].new("var_kvs_t")
+ local data= procffi["ffi"].new("var_kvs_t")
assert(procffi["cffi"].var_input_kvs(procffi["ffi"].string(s), data) == 0)
local disk = procffi["ffi"].string(data.s)
disks[disk] = data
@@ -46,25 +47,32 @@ end
local function pickValue(value)
return {
- rws = tonumber(value[1] + value[5]),
- rwSecs = tonumber(value[3] + value[7]),
- qusize = tonumber(value[11]),
- rwTiks = tonumber(value[4] + value[8]),
+ iops = tonumber(value[0] + value[4]), -- 1 5
+ bps = tonumber(value[2] + value[6]) * 512, -- 3 7
+ qusize = tonumber(value[10]) / 1000.0, -- 11
+ await = tonumber(value[3] + value[7]), -- 4 8
+ util = tonumber(value[9]) / 10.0 -- 10
}
end
function CioDiagnose:_init_(que, proto_q, fYaml, tid)
CvProto._init_(self, CprotoData.new(que))
self._tid = tid
+
local res = system:parseYaml(fYaml)
- self.fStat = res.config.proc_path .. "proc/stat"
- self.fDiskStat = res.config.proc_path .. "proc/diskstats"
- self.dirSys = res.config.proc_path .. "sys/block/"
- self._lastCpuTotal, self._lastCpuIO = ioWait(self.fStat)
+ local proc_path = res.config.proc_path
+ self.fStat = proc_path .. "proc/stat"
+ self.fDiskStat = proc_path .. "proc/diskstats"
+ self.dirSys = proc_path .. "sys/block/"
+
+ self._lastCpuTotal, self._lastCpuIO = readIoWait(self.fStat)
self._disks = {}
self._disksLast = {}
self:readProc()
self:storeProc()
+
+ local diag = res.ioDiagnose
+ self._check = CexceptCheck.new(diag)
end
function CioDiagnose:readProc()
@@ -85,8 +93,8 @@ function CioDiagnose:storeProc()
self._disks = {}
end
-function CioDiagnose:diff(t)
- local res = {}
+function CioDiagnose:diff(t, iowait)
+ local res = {iowait = iowait}
for disk, value in pairs(self._disks) do
local vLast = self._disksLast[disk]
if vLast then
@@ -94,6 +102,9 @@ function CioDiagnose:diff(t)
for k, v in pairs(value) do
vs[k] = (v - vLast[k]) / t
end
+ if vs.iops > 0 then
+ vs.await = vs.await / vs.iops
+ end
res[disk] = vs
end
end
@@ -101,15 +112,14 @@ function CioDiagnose:diff(t)
end
function CioDiagnose:work(t)
- local cpuTotal, cpuIO = ioWait(self.fStat)
- --print( cpuTotal - self._lastCpuTotal, cpuIO - self._lastCpuIO)
+ local cpuTotal, cpuIO = readIoWait(self.fStat)
+ local iowait = (cpuIO - self._lastCpuIO) * 100 / (cpuTotal - self._lastCpuTotal) / t
self._lastCpuTotal, self._lastCpuIO = cpuTotal, cpuIO
self:readProc()
- local res = self:diff(t)
- --system:dumps(res)
- --system:dumps(self._disks)
- --system:dumps(self._disksLast)
+ self._check:checks()
+ local res = self:diff(t, iowait)
+ self._check:addValue(res)
self:storeProc()
end
diff --git a/source/tools/monitor/unity/collector/loop.lua b/source/tools/monitor/unity/collector/loop.lua
index 7ea663116fe66c98fe6c7de5070d4b7fd5da389d..933c87b8e2e3f1d0416d5af3242c3fcc957141ba 100644
--- a/source/tools/monitor/unity/collector/loop.lua
+++ b/source/tools/monitor/unity/collector/loop.lua
@@ -14,6 +14,7 @@ local CguardSched = require("collector.guard.guardSched")
local CguardDaemon = require("collector.guard.guardDaemon")
local CguardSelfStat = require("collector.guard.guardSelfStat")
local CpostPlugin = require("collector.postPlugin.postPlugin")
+local CforkRun = require("collector.execEngine.forkRun")
local Cloop = class("loop")
@@ -24,6 +25,7 @@ function Cloop:_init_(que, proto_q, fYaml, tid)
self._proto = CprotoData.new(que)
self._tid = tid
self:loadLuaPlugin(res, res.config.proc_path)
+ self:forkRun(res)
local jperiod = calcJiffies.calc(res.config.proc_path, procffi) --
self._guardSched = CguardSched.new(tid, self._procs, self._names, jperiod)
@@ -49,6 +51,16 @@ function Cloop:loadLuaPlugin(res, proc_path)
print("add " .. system:keyCount(self._procs) .. " lua plugin.")
end
+function Cloop:forkRun(res)
+ local runs = res.forkRun
+ local c = system:keyCount(self._procs)
+ for _, run in ipairs(runs) do
+ c = c + 1
+ self._procs[c] = CforkRun.new(run, self._proto, procffi)
+ self._names[c] = run.cmd
+ end
+end
+
function Cloop:work(t)
local lines = self._proto:protoTable()
diff --git a/source/tools/monitor/unity/collector/plugin.lua b/source/tools/monitor/unity/collector/plugin.lua
index 38cdb271f5f5bc94c660704cb0b842fd4b8d553b..a7590294b6dabcc8fbf9116616fe4d26f67559f4 100644
--- a/source/tools/monitor/unity/collector/plugin.lua
+++ b/source/tools/monitor/unity/collector/plugin.lua
@@ -7,6 +7,8 @@
require("common.class")
local Cplugin = class("plugin")
local dockerinfo = require("common.dockerinfo")
+local cjson = require("cjson.safe")
+local json = cjson.new()
function Cplugin:_init_(resYaml, ffi, proto_q, so)
self._ffi = ffi
diff --git a/source/tools/monitor/unity/collector/plugin.yaml b/source/tools/monitor/unity/collector/plugin.yaml
index 26be92f9a70ef6a7eca0a47f3de752ace2b16882..45fda75b9aa69c6fb8925b614f18532bf8367d08 100644
--- a/source/tools/monitor/unity/collector/plugin.yaml
+++ b/source/tools/monitor/unity/collector/plugin.yaml
@@ -21,14 +21,19 @@ config:
mem: 40 # unit mb
tasks: 10 # monitor 10 pid max.
+forkRun:
+ -
+ cmd: "/usr/bin/python"
+ args: ["../test/curl/forkRun.py"]
+
outline:
- - /tmp/sysom
+ - /var/sysom/outline
-#pushTo:
-# to: "Influx"
-# host: "ld-wz9d17b514mg6kjkx-proxy-tsdb.lindorm.rds.aliyuncs.com"
-# port: 8242
-# url: "/api/v2/write?db=lua"
+pushTo:
+ to: "Influx"
+ host: "ld-wz9d17b514mg6kjkx-proxy-tsdb.lindorm.rds.aliyuncs.com"
+ port: 8242
+ url: "/api/v2/write?db=lua"
container:
mode: "pods"
diff --git a/source/tools/monitor/unity/collector/plugin/Makefile b/source/tools/monitor/unity/collector/plugin/Makefile
index 7ec4fa48628198b70c585bbe835576490111d0a6..1523bc193c4af5ca438c5e5fef673ae90bbd3883 100644
--- a/source/tools/monitor/unity/collector/plugin/Makefile
+++ b/source/tools/monitor/unity/collector/plugin/Makefile
@@ -5,7 +5,7 @@ OBJS := proto_sender.o
LIB := libproto_sender.a
-DEPMOD=sample threads kmsg proc_schedstat proc_loadavg unity_nosched unity_irqoff cpudist cpu_bled net_health net_retrans netlink cpufreq gpuinfo pmu_events
+DEPMOD=sample threads kmsg proc_schedstat proc_loadavg unity_nosched unity_irqoff cpudist cpu_bled net_health net_retrans netlink cpufreq gpuinfo pmu_events virtout sum_retrans
all: $(LIB) $(DEPMOD)
diff --git a/source/tools/monitor/unity/collector/plugin/kmsg/kmsg.c b/source/tools/monitor/unity/collector/plugin/kmsg/kmsg.c
index e8b028e20bab5fc4e2e62ea8ded698ba115586ac..591159f9db65bad32b41761ca5ecac121e8adfb1 100644
--- a/source/tools/monitor/unity/collector/plugin/kmsg/kmsg.c
+++ b/source/tools/monitor/unity/collector/plugin/kmsg/kmsg.c
@@ -41,12 +41,12 @@ static int kmsg_thread_func(struct beeQ* q, void * arg) {
fd = open("/dev/kmsg", O_RDONLY | O_NONBLOCK);
if (fd < 0) {
goto endOpen;
- }
+ }/*
ret = lseek(fd, 0, SEEK_DATA);
if (ret < 0) {
perror("kmsg seek error");
goto endLseek;
- }
+ }*/
ret = 1; // strip old message.
while (ret > 0) {
@@ -79,6 +79,7 @@ static int kmsg_thread_func(struct beeQ* q, void * arg) {
perror("kmsg read2 failed.");
goto endRead;
}
+ buff[ret -1] = '\0';
unity_alloc_lines(lines, 1);
line = unity_get_line(lines, 0);
diff --git a/source/tools/monitor/unity/collector/plugin/sum_retrans/Makefile b/source/tools/monitor/unity/collector/plugin/sum_retrans/Makefile
new file mode 100644
index 0000000000000000000000000000000000000000..d0c3391a855a9e62347a513e73d253a4c8cced4f
--- /dev/null
+++ b/source/tools/monitor/unity/collector/plugin/sum_retrans/Makefile
@@ -0,0 +1,8 @@
+
+newdirs := $(shell find ./ -type d)
+
+bpfsrcs := sum_retrans.bpf.c
+csrcs := sum_retrans.c
+so := libsum_retrans.so
+
+include ../bpfso.mk
\ No newline at end of file
diff --git a/source/tools/monitor/unity/collector/plugin/sum_retrans/sum_retrans.bpf.c b/source/tools/monitor/unity/collector/plugin/sum_retrans/sum_retrans.bpf.c
new file mode 100644
index 0000000000000000000000000000000000000000..16ac99107678b8a6ad0b831e110093c6f25c2cbf
--- /dev/null
+++ b/source/tools/monitor/unity/collector/plugin/sum_retrans/sum_retrans.bpf.c
@@ -0,0 +1,47 @@
+//
+// Created by 廖肇燕 on 2023/4/11.
+//
+#define BPF_NO_GLOBAL_DATA
+#include
+#include
+
+BPF_HASH(dips, u32, u64, 1024);
+BPF_HASH(inums, u32, u64, 1024);
+
+static inline u32 read_ns_inum(struct sock *sk)
+{
+ if (sk) {
+ return BPF_CORE_READ(sk, __sk_common.skc_net.net, ns.inum);
+ }
+ return 0;
+}
+
+static void inc_value(struct bpf_map* maps, u32 k) {
+ u64 *pv = bpf_map_lookup_elem(maps, &k);
+ if (pv) {
+ __sync_fetch_and_add(pv, 1);
+ } else {
+ u64 v = 1;
+ bpf_map_update_elem(maps, &k, &v, BPF_ANY);
+ }
+}
+
+struct tcp_retrans_args {
+ u64 pad;
+ u64 skb;
+ u64 sk;
+ u16 sport;
+ u16 dport;
+ u32 sip;
+ u32 dip;
+};
+SEC("tracepoint/tcp/tcp_retransmit_skb")
+int tcp_retransmit_skb_hook(struct tcp_retrans_args *args){
+ struct sock *sk = (struct sock *)args->sk;
+ u32 inum = read_ns_inum(sk);
+ u32 ip = args->dip;
+
+ inc_value((struct bpf_map *)&inums, inum);
+ inc_value((struct bpf_map *)&dips, ip);
+ return 0;
+}
diff --git a/source/tools/monitor/unity/collector/plugin/sum_retrans/sum_retrans.c b/source/tools/monitor/unity/collector/plugin/sum_retrans/sum_retrans.c
new file mode 100644
index 0000000000000000000000000000000000000000..ea11ba9900cfb9b7cd97d548b0eed8334fe558fc
--- /dev/null
+++ b/source/tools/monitor/unity/collector/plugin/sum_retrans/sum_retrans.c
@@ -0,0 +1,110 @@
+//
+// Created by 廖肇燕 on 2023/4/11.
+//
+
+#include "sum_retrans.h"
+#include "../bpf_head.h"
+#include "sum_retrans.skel.h"
+
+#include
+#include
+
+#include
+#include
+#include
+
+
+DEFINE_SEKL_OBJECT(sum_retrans);
+static int inum_fd = 0;
+static int dip_fd = 0;
+int init(void *arg)
+{
+ int ret;
+ printf("sum_retrans plugin install.\n");
+ ret = LOAD_SKEL_OBJECT(sum_retrans, perf);
+ inum_fd = coobpf_map_find(sum_retrans->obj, "inums");
+ dip_fd = coobpf_map_find(sum_retrans->obj, "dips");
+ return ret;
+}
+
+#define LOG_MAX 4096
+static char log[LOG_MAX];
+
+static int transIP(unsigned long lip, char *result, int size) {
+ inet_ntop(AF_INET, (void *) &lip, result, size);
+ return 0;
+}
+
+static void pack_dip() {
+ char ips[32];
+ char buff[64];
+ unsigned long value;
+ unsigned int ip, ip_next;
+
+ log[0] = '\0';
+ ip = 0;
+ while (coobpf_key_next(dip_fd, &ip, &ip_next) == 0) {
+ bpf_map_lookup_elem(dip_fd, &ip_next, &value);
+ ip = ip_next;
+
+ transIP(ip, ips, 32);
+ snprintf(buff, 64, "%s:%ld,", ips, value);
+ strncat(log, buff, 4096);
+ ip = ip_next;
+ }
+
+ ip = 0;
+ while (coobpf_key_next(dip_fd, &ip, &ip_next) == 0) {
+ bpf_map_delete_elem(dip_fd, &ip_next);
+ ip = ip_next;
+ }
+}
+
+static void pack_inum() {
+ char buff[64];
+ unsigned long value;
+ unsigned int inum, inum_next;
+
+ log[0] = '\0';
+
+ inum = 0;
+ while (coobpf_key_next(inum_fd, &inum, &inum_next) == 0) {
+ bpf_map_lookup_elem(inum_fd, &inum_next, &value);
+ snprintf(buff, 64, "%u:%ld,", inum_next, value);
+ strncat(log, buff, 4096);
+ inum = inum_next;
+ }
+
+ inum = 0;
+ while (coobpf_key_next(inum_fd, &inum, &inum_next) == 0) {
+ bpf_map_delete_elem(inum_fd, &inum_next);
+ inum = inum_next;
+ }
+}
+
+int call(int t, struct unity_lines *lines)
+{
+ struct unity_line* line;
+
+ pack_dip();
+ if (strlen(log) > 0) {
+ unity_alloc_lines(lines, 2); // 预分配好
+
+ line = unity_get_line(lines, 0);
+ unity_set_table(line, "retrans_dip");
+ unity_set_log(line, "ip_log", log);
+
+ pack_inum();
+ line = unity_get_line(lines, 1);
+ unity_set_table(line, "retrans_inum");
+ unity_set_log(line, "inum_log", log);
+ }
+
+ return 0;
+}
+
+void deinit(void)
+{
+ printf("sum_retrans plugin uninstall.\n");
+ DESTORY_SKEL_BOJECT(sum_retrans);
+}
diff --git a/source/tools/monitor/unity/collector/plugin/sum_retrans/sum_retrans.h b/source/tools/monitor/unity/collector/plugin/sum_retrans/sum_retrans.h
new file mode 100644
index 0000000000000000000000000000000000000000..4faecd9a3ec06c061b25523644c3dba9963a836c
--- /dev/null
+++ b/source/tools/monitor/unity/collector/plugin/sum_retrans/sum_retrans.h
@@ -0,0 +1,8 @@
+//
+// Created by 廖肇燕 on 2023/4/11.
+//
+
+#ifndef UNITY_SUM_RETRANS_H
+#define UNITY_SUM_RETRANS_H
+
+#endif //UNITY_SUM_RETRANS_H
diff --git a/source/tools/monitor/unity/collector/plugin/virtout/Makefile b/source/tools/monitor/unity/collector/plugin/virtout/Makefile
new file mode 100644
index 0000000000000000000000000000000000000000..f0e8a47f3f89b8f918ce8aaf7422838616af5cc6
--- /dev/null
+++ b/source/tools/monitor/unity/collector/plugin/virtout/Makefile
@@ -0,0 +1,8 @@
+
+newdirs := $(shell find ./ -type d)
+
+bpfsrcs := virtout.bpf.c
+csrcs := virtout.c
+so := libvirtout.so
+
+include ../bpfso.mk
\ No newline at end of file
diff --git a/source/tools/monitor/unity/collector/plugin/virtout/virtout.bpf.c b/source/tools/monitor/unity/collector/plugin/virtout/virtout.bpf.c
new file mode 100644
index 0000000000000000000000000000000000000000..25d3e612edc1773a9d26307bad4f1e5fcee3ee84
--- /dev/null
+++ b/source/tools/monitor/unity/collector/plugin/virtout/virtout.bpf.c
@@ -0,0 +1,69 @@
+//
+// Created by 廖肇燕 on 2023/2/23.
+//
+#define BPF_NO_GLOBAL_DATA
+
+#include
+#include
+#include "virtout.h"
+
+#define MAX_ENTRY 128
+#define BPF_F_FAST_STACK_CMP (1ULL << 9)
+#define KERN_STACKID_FLAGS (0 | BPF_F_FAST_STACK_CMP)
+
+#define PERIOD_TIME (10 * 1000 * 1000ULL)
+#define THRESHOLD_TIME (200 * 1000 * 1000ULL)
+
+BPF_ARRAY(virtdist, u64, 20);
+BPF_PERF_OUTPUT(perf, 1024);
+BPF_STACK_TRACE(stack, MAX_ENTRY);
+BPF_PERCPU_ARRAY(perRec, u64, 2);
+
+static inline u64 get_last(int index) {
+ u64 *pv = bpf_map_lookup_elem(&perRec, &index);
+ if (pv) {
+ return *pv;
+ }
+ return 0;
+}
+
+static inline void save_last(int index, u64 t) {
+ bpf_map_update_elem(&perRec, &index, &t, BPF_ANY);
+}
+
+static inline void check_time(struct bpf_perf_event_data *ctx,
+ int index,
+ struct bpf_map* event) {
+ u64 t = ns();
+ u64 last = get_last(index);
+ s64 delta;
+
+ save_last(index, t);
+ delta = t - last;
+
+ if (last && delta > 2 * PERIOD_TIME) {
+ delta -= PERIOD_TIME;
+ bpf_printk("delta %llu.\n", delta);
+ hist10_push((struct bpf_map_def *)&virtdist, delta / PERIOD_TIME);
+ if (delta >= THRESHOLD_TIME) {
+ struct task_struct* task = (struct task_struct *)bpf_get_current_task();
+ struct data_t data = {};
+
+ data.pid = pid();
+ data.stack_id = bpf_get_stackid(ctx, &stack, KERN_STACKID_FLAGS);
+ data.delta = delta;
+ data.cpu = cpu();
+ bpf_get_current_comm(&data.comm, TASK_COMM_LEN);
+ bpf_printk("delta %llu, pid:%d.\n", delta, data.pid);
+
+ bpf_perf_event_output(ctx, event, BPF_F_CURRENT_CPU, &data, sizeof(data));
+ }
+ }
+}
+
+SEC("perf_event")
+int sw_clock(struct bpf_perf_event_data *ctx)
+{
+ check_time(ctx, 1, (struct bpf_map *)&perf);
+ return 0;
+}
diff --git a/source/tools/monitor/unity/collector/plugin/virtout/virtout.c b/source/tools/monitor/unity/collector/plugin/virtout/virtout.c
new file mode 100644
index 0000000000000000000000000000000000000000..e3b3270223100de5758dcdd4ab1a800a0e9fd2d5
--- /dev/null
+++ b/source/tools/monitor/unity/collector/plugin/virtout/virtout.c
@@ -0,0 +1,229 @@
+//
+// Created by 廖肇燕 on 2023/2/23.
+//
+
+#include
+#define COOLBPF_PERF_THREAD
+#include "../bpf_head.h"
+#include "virtout.h"
+#include "virtout.skel.h"
+
+#include
+#include
+#include
+#include
+#include
+
+
+#define CPU_DIST_INDEX 4
+#define DIST_ARRAY_SIZE 20
+
+static volatile int budget = 0; // for log budget
+static int dist_fd = 0;
+static int stack_fd = 0;
+static int perf_fds[1024];
+
+int proc(int stack_fd, struct data_t *e, struct unity_line *line);
+void handle_event(void *ctx, int cpu, void *data, __u32 data_sz)
+{
+ int ret;
+ if (budget > 0) {
+ struct data_t *e = (struct data_t *)data;
+ struct beeQ *q = (struct beeQ *)ctx;
+ struct unity_line *line;
+ struct unity_lines *lines = unity_new_lines();
+
+ unity_alloc_lines(lines, 1);
+ line = unity_get_line(lines, 0);
+ ret = proc(stack_fd, e, line);
+ if (ret >= 0) {
+ beeQ_send(q, lines);
+ }
+ budget --;
+ }
+}
+
+static void close_perf_fds(void) {
+ int i;
+ for (i = 0; i < 1024; i ++) {
+ if (perf_fds[i] > 0) {
+ close(perf_fds[i]);
+ perf_fds[i] = 0;
+ }
+ }
+}
+
+static long perf_event_open(struct perf_event_attr *hw_event, pid_t pid,
+ int cpu, int group_fd, int flags)
+{
+ int ret;
+
+ ret = syscall(__NR_perf_event_open, hw_event, pid, cpu,
+ group_fd, flags);
+ return ret;
+}
+
+DEFINE_SEKL_OBJECT(virtout);
+static struct bpf_prog_skeleton *search_progs(const char* func) {
+ struct bpf_object_skeleton *s;
+ int i;
+
+ s = virtout->skeleton;
+ for (i = 0; i < s->prog_cnt; i ++) {
+ if (strcmp(s->progs[i].name, func) == 0) {
+ return &(s->progs[i]);
+ }
+ }
+ return NULL;
+}
+
+static int setup_perf_events(const char* func) {
+ int nr_cpus = libbpf_num_possible_cpus();
+ int i;
+ struct bpf_link *link;
+ struct bpf_prog_skeleton *progs;
+
+ progs = search_progs(func);
+
+ struct perf_event_attr perf_attr = {
+ .type = PERF_TYPE_SOFTWARE,
+ .config = PERF_COUNT_SW_CPU_CLOCK,
+ .freq = 0,
+ .sample_period = 10 * 1000 * 1000,
+ };
+
+ for (i = 0; i < nr_cpus; i ++) {
+ perf_fds[i] = perf_event_open(&perf_attr, -1, i, -1, 0);
+ if (perf_fds[i] < 0) {
+ if (errno == ENODEV) {
+ printf("skip offline cpu id: %d\n", i);
+ continue;
+ } else {
+ perror("syscall failed.");
+ close_perf_fds();
+ return -1;
+ }
+ }
+
+ link = bpf_program__attach_perf_event(*(progs->prog), perf_fds[i]);
+ if (!link) {
+ perror("attach failed.");
+ close_perf_fds();
+ return -1;
+ }
+ *(progs->link) = link;
+ }
+ printf("setup virout OK.\n");
+ return 0;
+}
+
+int init(void *arg) {
+ int ret;
+ printf("virtout plugin install.\n");
+
+ ret = LOAD_SKEL_OBJECT(virtout, perf);
+ if (ret < 0) {
+ return ret;
+ }
+ ret = setup_perf_events("sw_clock");
+ if (ret < 0) {
+ return ret;
+ }
+ dist_fd = coobpf_map_find(virtout->obj, "virtdist");
+ stack_fd = coobpf_map_find(virtout->obj, "stack");
+ return ret;
+}
+
+static int get_dist(unsigned long *locals) {
+ int i = 0;
+ unsigned long value = 0;
+ int key, key_next;
+
+ key = 0;
+ while (coobpf_key_next(dist_fd, &key, &key_next) == 0) {
+ coobpf_key_value(dist_fd, &key_next, &value);
+ locals[i ++] = value;
+ if (i > DIST_ARRAY_SIZE) {
+ break;
+ }
+ key = key_next;
+ }
+ return i;
+}
+
+static int cal_dist(unsigned long* values) {
+ int i, j;
+ int size;
+ static unsigned long rec[DIST_ARRAY_SIZE] = {0};
+ unsigned long locals[DIST_ARRAY_SIZE];
+
+ size = get_dist(locals);
+ for (i = 0; i < CPU_DIST_INDEX - 1; i ++) {
+ values[i] = locals[i] - rec[i];
+ rec[i] = locals[i];
+ }
+ j = i;
+ values[j] = 0;
+ for (; i < size; i ++) {
+ values[j] += locals[i] - rec[i];
+ rec[i] = locals[i];
+ }
+ return 0;
+}
+
+int call(int t, struct unity_lines *lines) {
+ int i;
+ unsigned long values[CPU_DIST_INDEX];
+ const char *names[] = { "ms100", "s1", "s10", "so"};
+ struct unity_line* line;
+
+ budget = t;
+
+ unity_alloc_lines(lines, 1); // 预分配好
+ line = unity_get_line(lines, 0);
+ unity_set_table(line, "virtout_dist");
+
+ cal_dist(values);
+ for (i = 0; i < CPU_DIST_INDEX; i ++ ) {
+ unity_set_value(line, i, names[i], values[i]);
+ }
+ return 0;
+}
+
+
+void deinit(void)
+{
+ printf("virout plugin uninstall.\n");
+ close_perf_fds();
+ DESTORY_SKEL_BOJECT(virtout);
+}
+
+#define LOG_MAX 4096
+static char log[LOG_MAX];
+
+int proc(int stack_fd, struct data_t *e, struct unity_line *line) {
+ int i;
+ unsigned long addr[128];
+ int id = e->stack_id; //last stack
+ struct ksym_cell* cell;
+
+ snprintf(log, LOG_MAX, "task:%d(%s);cpu:%d;delayed:%ld;callstack:", e->pid, e->comm, e->cpu, e->delta);
+ coobpf_key_value(stack_fd, &id, &addr);
+
+ for (i = 0; i < 128; i ++) {
+ if (addr[i] > 0) {
+ cell = ksym_search(addr[i]);
+ if (cell != NULL) {
+ strncat(log, cell->func, LOG_MAX);
+ } else {
+ strncat(log, "!nil", LOG_MAX);
+ }
+ strncat(log, ",", LOG_MAX);
+ } else {
+ break;
+ }
+ }
+ unity_set_table(line, "virtout_log");
+ unity_set_log(line, "log", log);
+ return 0;
+}
diff --git a/source/tools/monitor/unity/collector/plugin/virtout/virtout.h b/source/tools/monitor/unity/collector/plugin/virtout/virtout.h
new file mode 100644
index 0000000000000000000000000000000000000000..a0acb6c82bb20347c6340ca0ead678968ec6b0ba
--- /dev/null
+++ b/source/tools/monitor/unity/collector/plugin/virtout/virtout.h
@@ -0,0 +1,18 @@
+//
+// Created by 廖肇燕 on 2023/4/10.
+//
+
+#ifndef UNITY_VIRTOUT_H
+#define UNITY_VIRTOUT_H
+
+#define TASK_COMM_LEN 16
+
+struct data_t {
+ int pid;
+ int cpu;
+ unsigned int stack_id;
+ unsigned long delta;
+ char comm[TASK_COMM_LEN];
+};
+
+#endif //UNITY_VIRTOUT_H
diff --git a/source/tools/monitor/unity/collector/postEngine/engine.lua b/source/tools/monitor/unity/collector/postEngine/engine.lua
index 644ad2af428b963a8d7fd129a9cc1c74874212ac..c3de1f829d4f61722998b98984129804924bd2c0 100644
--- a/source/tools/monitor/unity/collector/postEngine/engine.lua
+++ b/source/tools/monitor/unity/collector/postEngine/engine.lua
@@ -11,6 +11,7 @@ local CvProto = require("collector.vproto")
local pystring = require("common.pystring")
local system = require("common.system")
local cjson = require("cjson.safe")
+local CexecBase = require("collector.postEngine.execBase")
local Cengine = class("engine", CvProto)
@@ -25,13 +26,20 @@ function Cengine:setTask(taskMons)
self._task = taskMons
end
-function Cengine:pushTask(msgs)
+function Cengine:pushTask(e, msgs)
local events = pystring:split(msgs, '\n')
for _, msg in ipairs(events) do
print(msg)
local res = cjson.decode(msg)
- if res.cmd == "mon_pid" then
+ local cmd = res.cmd
+ if cmd == "mon_pid" then
self._task:add(res.pid, res.loop)
+ elseif cmd == "exec" then -- exec a cmd
+ local execCmd = res.exec
+ local args = res.args or {}
+ local second = res.second or 1
+ local exec = CexecBase.new(execCmd, args, second)
+ exec:addEvents(e)
end
end
end
@@ -45,7 +53,7 @@ function Cengine:proc(t, event, msgs)
local bytes = self._proto:encode(lines)
self._proto:que(bytes)
- self:pushTask(msgs)
+ self:pushTask(event, msgs)
end
function Cengine:work(t, event)
diff --git a/source/tools/monitor/unity/collector/postEngine/execBase.lua b/source/tools/monitor/unity/collector/postEngine/execBase.lua
new file mode 100644
index 0000000000000000000000000000000000000000..59c3ac40a98ecf989f5ef91a64ac5982d34ed3b2
--- /dev/null
+++ b/source/tools/monitor/unity/collector/postEngine/execBase.lua
@@ -0,0 +1,65 @@
+---
+--- Generated by EmmyLua(https://github.com/EmmyLua)
+--- Created by liaozhaoyan.
+--- DateTime: 2023/4/12 00:01
+---
+
+require("common.class")
+local unistd = require("posix.unistd")
+local pwait = require("posix.sys.wait")
+local signal = require("posix.signal")
+local pystring = require("common.pystring")
+local system = require("common.system")
+
+local CexecBase = class("execBase")
+local interval = 5 --- poll for every 5 second
+
+local function run(cmd, args)
+ local pid, err = unistd.fork()
+ if pid > 0 then -- for self
+ print("pid: " .. pid)
+ return pid
+ elseif pid == 0 then -- for child
+ local errno
+ prctl_death_kill()
+ _, err, errno = unistd.exec(cmd, args)
+ assert(not errno, "exec failed." .. err .. errno)
+ else
+ error("fork report" .. err)
+ end
+end
+
+function CexecBase:_init_(cmd, args, seconds)
+ self.cmd = cmd
+ self._cnt = 0
+ self._loop = seconds / interval
+
+ self._pid = run(cmd, args)
+end
+
+function CexecBase:addEvents(e)
+ e:addEvent(self.cmd, self, interval, true, self._loop)
+end
+
+local function kill(pid)
+ signal.kill(pid, signal.SIGKILL) -- force to kill task
+ pwait.wait(pid) -- wait task
+end
+
+function CexecBase:work()
+ local cnt = self._cnt
+ if cnt >= self._loop then
+ local pid, stat, exit = pwait.wait(self._pid, pwait.WNOHANG)
+ if pid == nil then
+ error("wait failed " .. stat .. exit)
+ end
+ if not exit then -- process not exit
+ print("force to kill " .. self._pid)
+ kill(self._pid)
+ end
+ return -1 -- delete from task list.
+ end
+ self._cnt = cnt + 1
+end
+
+return CexecBase
diff --git a/source/tools/monitor/unity/common/fifo.lua b/source/tools/monitor/unity/common/fifo.lua
index 352d4d2fab5e6a9e18eadf24ba8693751ab84af1..76dfd3075fd964f592251cbc80eb6d242c584208 100644
--- a/source/tools/monitor/unity/common/fifo.lua
+++ b/source/tools/monitor/unity/common/fifo.lua
@@ -46,6 +46,10 @@ function Cfifo:len()
return self._count
end
+function Cfifo:capacity()
+ return self._max
+end
+
function Cfifo:value(index)
index = index + self._head - 1
return self.list[index]
diff --git a/source/tools/monitor/unity/common/inotifies.lua b/source/tools/monitor/unity/common/inotifies.lua
index f9aaf7c94b4c84a84da321c1833e716fed47b344..d8b33c784f23322e43c0b1ef0c579179d9e9fe4c 100644
--- a/source/tools/monitor/unity/common/inotifies.lua
+++ b/source/tools/monitor/unity/common/inotifies.lua
@@ -9,7 +9,6 @@ require("common.class")
local Cinotifies = class("inotifies")
local dirent = require("posix.dirent")
local pstat = require("posix.sys.stat")
-local fcntl = require("posix.fcntl")
local system = require("common.system")
local inotify = require('inotify')
@@ -27,21 +26,6 @@ local function walk_dirs(path, dirs)
end
end
-local function fdNonBlocking(fd)
- local res
- local flag, err, errno = fcntl.fcntl(fd, fcntl.F_GETFL)
- if flag then
- res, err, errno = fcntl.fcntl(fd, fcntl.F_SETFL, bit.bor(flag, fcntl.O_NONBLOCK))
- if res then
- return 0
- else
- system:posixError("fcntl set failed", err, errno)
- end
- else
- system:posixError("fcntl get failed", err, errno)
- end
-end
-
local function mon_dirs(path)
local handle = inotify.init()
local ws = {}
@@ -60,7 +44,7 @@ local function mon_dirs(path)
end
end
- fdNonBlocking(handle:getfd())
+ system:fdNonBlocking(handle:getfd())
return handle, ws
end
diff --git a/source/tools/monitor/unity/common/lineParse.lua b/source/tools/monitor/unity/common/lineParse.lua
index 925fecddeb66432fafab6ec7d267ca50208b9b46..a70e0d35758dbd0fc55508750e37c5d3695ce511 100644
--- a/source/tools/monitor/unity/common/lineParse.lua
+++ b/source/tools/monitor/unity/common/lineParse.lua
@@ -10,6 +10,8 @@ local system = require("common.system")
local module = {}
local json = cjson.new()
+json.encode_escape_forward_slash(false)
+
local function parseLabel(sls, ls)
local lss = pystring:split(sls, ",")
for _, cell in ipairs(lss) do
diff --git a/source/tools/monitor/unity/common/ping.lua b/source/tools/monitor/unity/common/ping.lua
index 2d926ebb827391dad7182ba27d6a7a15eb0dd32d..1e9315a10d008ca46550da3eba50e75c86ca7741 100644
--- a/source/tools/monitor/unity/common/ping.lua
+++ b/source/tools/monitor/unity/common/ping.lua
@@ -7,7 +7,10 @@
require("common.class")
local psocket = require("posix.sys.socket")
+local ptime = require("posix.sys.time")
+local poll = require("posix.poll")
local unistd = require("posix.unistd")
+local system = require("common.system")
local Cping = class("ping")
function Cping:_init_(ip, dev)
@@ -20,6 +23,8 @@ function Cping:_init_(ip, dev)
local ok, err = psocket.setsockopt(fd, psocket.SOL_SOCKET, psocket.SO_BINDTODEVICE, dev)
assert(ok, err)
+ system:fdNonBlocking(fd)
+
self.fd = fd
end
@@ -29,16 +34,31 @@ function Cping:_del_()
end
end
+local function diff_us(t1, t0)
+ local sum1 = t1.tv_sec * 1e6 + t1.tv_usec
+ local sum0 = t0.tv_sec * 1e6 + t0.tv_usec
+ return sum1 - sum0
+end
+
function Cping:ping()
+ local fd = self.fd
local data = string.char(0x08, 0x00, 0x89, 0x98, 0x6e, 0x63, 0x00, 0x04, 0x00)
- local ok, err = psocket.sendto(self.fd, data, {family=psocket.AF_INET, addr=self.ip, port=0})
+ local t0 = ptime.gettimeofday()
+ local ok, err = psocket.sendto(fd, data, {family=psocket.AF_INET, addr=self.ip, port=0})
assert(ok, err)
- local data, sa = psocket.recvfrom(self.fd, 1024)
- assert(data, sa)
-
- if data then
- print('Received ICMP message from ' .. sa.addr)
+ local r = poll.rpoll(fd, 300)
+ if r == 0 then
+ print("receive over time.")
+ return -1
+ elseif r == 1 then
+ local data, sa = psocket.recvfrom(fd, 1024)
+ local t1 = ptime.gettimeofday()
+ assert(data, sa)
+ return diff_us(t1, t0)
+ else
+ print("closed.")
+ return -2
end
end
diff --git a/source/tools/monitor/unity/common/system.lua b/source/tools/monitor/unity/common/system.lua
index 4c0be4974c7e57f018fa9c419bdcc7a44e77b702..c43a7ffd075cb11c2ad4ae0ee33407460e7aca47 100644
--- a/source/tools/monitor/unity/common/system.lua
+++ b/source/tools/monitor/unity/common/system.lua
@@ -6,13 +6,30 @@
local socket = require("socket")
local serpent = require("common.serpent")
-
local system = {}
function system:sleep(t)
socket.select(nil, nil, t)
end
+function system:fdNonBlocking(fd)
+ local res
+ local fcntl = require("posix.fcntl")
+ local bit = require("bit")
+
+ local flag, err, errno = fcntl.fcntl(fd, fcntl.F_GETFL)
+ if flag then
+ res, err, errno = fcntl.fcntl(fd, fcntl.F_SETFL, bit.bor(flag, fcntl.O_NONBLOCK))
+ if res then
+ return 0
+ else
+ system:posixError("fcntl set failed", err, errno)
+ end
+ else
+ system:posixError("fcntl get failed", err, errno)
+ end
+end
+
function system:deepcopy(object)
local lookup_table = {}
local function _copy(object)
diff --git a/source/tools/monitor/unity/httplib/coHttpCli.lua b/source/tools/monitor/unity/httplib/coHttpCli.lua
index 67af16b2df45043ac5447b5293bba6d5899aa2b6..b8d40a734afcdfb41f8b9e6dcbed29083ce5ddad 100644
--- a/source/tools/monitor/unity/httplib/coHttpCli.lua
+++ b/source/tools/monitor/unity/httplib/coHttpCli.lua
@@ -113,11 +113,18 @@ local function setupSocket(host, port)
end
end
-function CcoHttpCli:_init_(host, port, url, persistent)
- self._host = host
- self._port = port or 80
- self._url = url or "/"
+function CcoHttpCli:_init_(fYaml, persistent)
+ local res = system:parseYaml(fYaml)
+ local pushTo = res.pushTo
+ self._host = pushTo.host
+ self._port = pushTo.port or 80
+ self._url = pushTo.url or "/"
self._persistent = persistent
+
+ local Cidentity = require("beaver.identity")
+ local inst = Cidentity.new(fYaml)
+ self._instance = inst:id()
+
self.status = enumStat.closed
end
@@ -144,6 +151,31 @@ function CcoHttpCli:trans(msgs, body, filter)
return ""
end
+function CcoHttpCli:addInstance(line) -- add instance id for line index.
+ local cells = line.ls
+ local hasInstance = false
+
+ if cells then
+ for _, cell in ipairs(cells) do
+ if cell.name == "instance" then
+ hasInstance = true
+ end
+ end
+ end
+
+ if not hasInstance then
+ local cell = {
+ name = "instance",
+ index = self._instance
+ }
+ if cells then
+ table.insert(cells, cell)
+ else
+ line.ls = {cell}
+ end
+ end
+end
+
function CcoHttpCli:pack(body)
local line = self:packCliHead('GET', self._url)
local head = {
@@ -395,7 +427,7 @@ function CcoHttpCli:closureRead(fd, maxLen)
return nil
end
else
- print(system:dump(e))
+ system:dumps(e)
end
return nil
end
@@ -482,7 +514,6 @@ function CcoHttpCli:work(cffi, efd)
self.online = os.time()
while true do
local body = coroutine.yield()
- --print("-- " .. type(msg))
self.status = enumStat.sending
local s = self:pack(body)
if not self:coWrite(cffi, efd, fd, s) then
@@ -497,9 +528,7 @@ function CcoHttpCli:work(cffi, efd)
else
goto failed
end
- if self._persistent then
- print("continue.")
- else
+ if not self._persistent then
goto failed
end
end
diff --git a/source/tools/monitor/unity/httplib/coInflux.lua b/source/tools/monitor/unity/httplib/coInflux.lua
index 002ac09ab0485b2f900d7ab5267536f6b356b328..6b00a4c930c8b53f66d0f86efb6d90213545ac9f 100644
--- a/source/tools/monitor/unity/httplib/coInflux.lua
+++ b/source/tools/monitor/unity/httplib/coInflux.lua
@@ -11,12 +11,14 @@ local lineParse = require("common.lineParse")
local CcoInflux = class("coInflux", CcoHttpCli)
-function CcoInflux:_init_(host, port, url)
- CcoHttpCli._init_(self, host, port, url)
+function CcoInflux:_init_(fYaml)
+ CcoHttpCli._init_(self, fYaml)
end
function CcoInflux:echo(tReq)
- print(tReq.code, tReq.data)
+ if tReq.code ~= "204" then
+ print(tReq.code, tReq.data)
+ end
end
function CcoInflux:trans(msgs, body, filter)
@@ -28,6 +30,7 @@ function CcoInflux:trans(msgs, body, filter)
lines = msgs.lines
for _, line in ipairs(lines) do
c = c + 1
+ self:addInstance(line)
bodies[c] = lineParse.packs(line)
end
@@ -40,7 +43,6 @@ function CcoInflux:trans(msgs, body, filter)
end
function CcoInflux:pack(body)
- print(#body)
local line = self:packCliHead('POST', self._url)
local head = {
Host = self._host,
diff --git a/source/tools/monitor/unity/httplib/httpApp.lua b/source/tools/monitor/unity/httplib/httpApp.lua
index 92b6cd278923a37db627a9406b99cf201ab08838..996792efbde2ccbd057012e05c185ebf672f5c5e 100644
--- a/source/tools/monitor/unity/httplib/httpApp.lua
+++ b/source/tools/monitor/unity/httplib/httpApp.lua
@@ -14,8 +14,9 @@ function ChttpApp:_init_(frame)
ChttpBase._init_(self)
end
-function ChttpApp:echo(tRet, keep)
- local stat = self:packStat(200)
+function ChttpApp:echo(tRet, keep, code)
+ code = code or 200
+ local stat = self:packStat(code)
local tHead = {
["Content-Type"] = "application/json",
["Connection"] = (keep and "keep-alive") or "close"
diff --git a/source/tools/monitor/unity/httplib/httpBase.lua b/source/tools/monitor/unity/httplib/httpBase.lua
index 3ec1ff4f4c09243a7b1965358e4816db35e307cf..0de7ba31a391a9b5508b605da98d70a1da72d070 100644
--- a/source/tools/monitor/unity/httplib/httpBase.lua
+++ b/source/tools/monitor/unity/httplib/httpBase.lua
@@ -25,7 +25,7 @@ function ChttpBase:_installRe(path, frame)
frame:registerRe(path, self)
end
-function ChttpBase:echo(tRet, keep)
+function ChttpBase:echo(tRet, keep, code)
error("ChttpBase:echo is a virtual function.")
end
@@ -48,8 +48,8 @@ end
function ChttpBase:call(tReq)
local keep = checkKeep(tReq)
- local tRet = self._urlCb[tReq.path](tReq)
- local res = self:echo(tRet, keep)
+ local tRet, code = self._urlCb[tReq.path](tReq)
+ local res = self:echo(tRet, keep, code)
return res, keep
end
diff --git a/source/tools/monitor/unity/httplib/httpCli.lua b/source/tools/monitor/unity/httplib/httpCli.lua
index c83888f7f7055f37379ea9ad3241f7e0db02a9af..3e0674125de0cb97a3813adeed6fca932170e900 100644
--- a/source/tools/monitor/unity/httplib/httpCli.lua
+++ b/source/tools/monitor/unity/httplib/httpCli.lua
@@ -61,4 +61,12 @@ function ChttpCli:postTable(Url, t)
return self:post(Url, req, headers)
end
+function ChttpCli:postLine(Url, line)
+ local headers = {
+ ["Content-Type"] = "text/plain",
+ ["Content-Length"] = #line,
+ }
+ return self:post(Url, line, headers)
+end
+
return ChttpCli
diff --git a/source/tools/monitor/unity/httplib/httpComm.lua b/source/tools/monitor/unity/httplib/httpComm.lua
index 46a51516106e92d5d7a39cff0c5c895d102927b1..5986a65b08080d54cbc57aa9eb4bb41b5ce9d90b 100644
--- a/source/tools/monitor/unity/httplib/httpComm.lua
+++ b/source/tools/monitor/unity/httplib/httpComm.lua
@@ -15,6 +15,8 @@ local ChttpComm = class("httplib.httpComm")
local cjson = require("cjson.safe")
local json = cjson.new()
+json.encode_escape_forward_slash(false)
+
local function codeTable()
return {
[100] = "Continue",
diff --git a/source/tools/monitor/unity/httplib/httpHtml.lua b/source/tools/monitor/unity/httplib/httpHtml.lua
index 9eca219df672a51b9ade78a9e3fddbb058a3c2cc..3d63de08415c7773bb29906aeb9673ff83fbbb67 100644
--- a/source/tools/monitor/unity/httplib/httpHtml.lua
+++ b/source/tools/monitor/unity/httplib/httpHtml.lua
@@ -100,8 +100,9 @@ local function htmlPack(title, content)
return pystring:join("", bodies)
end
-function ChttpHtml:pack(cType, keep, body)
- local stat = self:packStat(200)
+function ChttpHtml:pack(cType, keep, body, code)
+ code = code or 200
+ local stat = self:packStat(code)
local tHead = {
["Content-Type"] = cType,
["Connection"] = (keep and "keep-alive") or "close"
@@ -111,11 +112,11 @@ function ChttpHtml:pack(cType, keep, body)
return pystring:join("\r\n", tHttp)
end
-function ChttpHtml:echo(tRet, keep)
+function ChttpHtml:echo(tRet, keep, code)
local cType = tRet.type or "text/html"
local body = htmlPack(tRet.title, tRet.content)
- return self:pack(cType, keep, body)
+ return self:pack(cType, keep, body, code)
end
return ChttpHtml
diff --git a/source/tools/monitor/unity/httplib/httpPlain.lua b/source/tools/monitor/unity/httplib/httpPlain.lua
index f0999d89fec522e292232187489fd8781bab66e6..fc0f075e66d1e03a2b6f805cbd38babd3aaffe19 100644
--- a/source/tools/monitor/unity/httplib/httpPlain.lua
+++ b/source/tools/monitor/unity/httplib/httpPlain.lua
@@ -14,8 +14,9 @@ function ChttpPlain:_init_(frame)
ChttpBase._init_(self)
end
-function ChttpPlain:echo(tRet, keep)
- local stat = self:packStat(200)
+function ChttpPlain:echo(tRet, keep, code)
+ code = code or 200
+ local stat = self:packStat(code)
local tHead = {
["Content-Type"] = "text/plain",
["Connection"] = (keep and "keep-alive") or "close"
diff --git a/source/tools/monitor/unity/httplib/influxCli.lua b/source/tools/monitor/unity/httplib/influxCli.lua
index 7b8c60473be28e0dc1c8feaafe6c0b127d179d67..a1eef70cf77b2fa17ee77dd36bf398eb6d36201b 100644
--- a/source/tools/monitor/unity/httplib/influxCli.lua
+++ b/source/tools/monitor/unity/httplib/influxCli.lua
@@ -9,8 +9,8 @@ local ChttpCli = require("httplib.httpCli")
local CinfluxCli = class("influxCli", ChttpCli)
-function CinfluxCli:_init_(url, db, user, pass, proxy)
- self._url = string.format("%swrite?u=%s&p=%s&db=%s", url, user, pass, db)
+function CinfluxCli:_init_(url, proxy)
+ self._url = url
ChttpCli._init_(self, proxy)
end
diff --git a/source/tools/monitor/unity/test/curl/forkRun.py b/source/tools/monitor/unity/test/curl/forkRun.py
new file mode 100644
index 0000000000000000000000000000000000000000..7ab0ceb8032c5f36c030c64107dc3acba27e238c
--- /dev/null
+++ b/source/tools/monitor/unity/test/curl/forkRun.py
@@ -0,0 +1,17 @@
+
+import time
+from outLine import CnfPut
+
+
+def loop(nf):
+ i = 1
+ while True:
+ nf.puts('forkRun,mode=java log="hello runtime.",count=%d' % i)
+ i += 1
+ time.sleep(15)
+
+
+if __name__ == "__main__":
+ time.sleep(2)
+ nf = CnfPut()
+ loop(nf)
\ No newline at end of file
diff --git a/source/tools/monitor/unity/test/curl/hello.py b/source/tools/monitor/unity/test/curl/hello.py
new file mode 100644
index 0000000000000000000000000000000000000000..c497ca2f47acc5395f3ad944dd7eb6a028e1f034
--- /dev/null
+++ b/source/tools/monitor/unity/test/curl/hello.py
@@ -0,0 +1,7 @@
+import time
+import os
+
+print(os.getcwd())
+while True:
+ print("hello.")
+ time.sleep(3)
diff --git a/source/tools/monitor/unity/test/curl/influx.lua b/source/tools/monitor/unity/test/curl/influx.lua
index eb8ce1540ab336e83cc6670cfb7770eea9ec6d29..188992c461a80b6fec5b3641848abbcc9a37ba20 100644
--- a/source/tools/monitor/unity/test/curl/influx.lua
+++ b/source/tools/monitor/unity/test/curl/influx.lua
@@ -8,12 +8,6 @@ package.path = package.path .. ";../../?.lua;"
local system = require("common.system")
local CinfluxCli = require("httplib.influxCli")
-local cli = CinfluxCli.new("http://172.24.90.148:8086/", "lua", "xxx", "xxxxxx")
-
-local v = 1.1
-while true do
- local s = string.format("test,label=a v=%f", v)
- cli:puts(s)
- v = v + 1
- system:sleep(2)
-end
+local cli = CinfluxCli.new("http://ld-wz9d17b514mg6kjkx-proxy-tsdb.lindorm.rds.aliyuncs.com:8242/api/v2/write?db=lua")
+local res = cli:puts('virtout,instance=self log="task:0(swapper/13), cpu:13, delayed:162994631, callstack:mwait_idle_with_hints.constprop.0,intel_idle,cpuidle_enter_state,cpuidle_enter,cpuidle_idle_call,do_idle,cpu_startup_entry,secondary_startup_64_no_verify,"')
+system:dumps(res)
diff --git a/source/tools/monitor/unity/test/curl/outLine.py b/source/tools/monitor/unity/test/curl/outLine.py
new file mode 100644
index 0000000000000000000000000000000000000000..cd5ad58cfb4341acf3b186158c1bd021d84e6dad
--- /dev/null
+++ b/source/tools/monitor/unity/test/curl/outLine.py
@@ -0,0 +1,23 @@
+import os
+import socket
+
+PIPE_PATH = "/var/sysom/outline"
+MAX_BUFF = 128 * 1024
+
+
+class CnfPut(object):
+ def __init__(self, path=PIPE_PATH):
+ self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+ self._path = path
+ if not os.path.exists(self._path):
+ raise ValueError("pipe path is not exist. please check Netinfo is running.")
+
+ def puts(self, s):
+ if len(s) > MAX_BUFF:
+ raise ValueError("message len %d, is too long ,should less than%d" % (len(s), MAX_BUFF))
+ return self._sock.sendto(s, self._path)
+
+
+if __name__ == "__main__":
+ nf = CnfPut()
+ nf.puts('runtime,mode=java log="hello runtime."')
diff --git a/source/tools/monitor/unity/test/curl/poBeaver/poBeaver.lua b/source/tools/monitor/unity/test/curl/poBeaver/poBeaver.lua
index 7271e4f3c48fb2b19b496bbf44e50cc5099bd635..137c9a0dcac878bdedc426f97c4e07e52f717318 100644
--- a/source/tools/monitor/unity/test/curl/poBeaver/poBeaver.lua
+++ b/source/tools/monitor/unity/test/curl/poBeaver/poBeaver.lua
@@ -63,22 +63,6 @@ function CpoBeaver:_install_fd(port, ip, backlog)
end
end
-local function fdNonBlocking(fd)
- local res
- local flag, err, errno = fcntl.fcntl(fd, fcntl.F_GETFL)
- if flag then
- res, err, errno = fcntl.fcntl(fd, fcntl.F_SETFL, bit.bor(flag, fcntl.O_NONBLOCK))
- if res then
- return 0
- else
- posixError("fcntl set failed", err, errno)
- end
- else
- posixError("fcntl get failed", err, errno)
- end
-end
-
-
function CpoBeaver:read(fd, maxLen)
maxLen = maxLen or 1 * 1024 * 1024 -- signal conversation accept 1M stream max
local function readFd()
@@ -203,7 +187,7 @@ function CpoBeaver:accept(fd, e)
elseif e.IN then
local nfd, err, errno = socket.accept(fd)
if nfd then
- fdNonBlocking(nfd)
+ system:fdNonBlocking(nfd)
self:co_add(nfd)
else
posixError("accept new socket failed", err, errno)
diff --git a/source/tools/monitor/unity/test/curl/postCmd.lua b/source/tools/monitor/unity/test/curl/postCmd.lua
new file mode 100644
index 0000000000000000000000000000000000000000..3bbf577a92878e926043ee0b9c53d5bef5b4a0bd
--- /dev/null
+++ b/source/tools/monitor/unity/test/curl/postCmd.lua
@@ -0,0 +1,15 @@
+---
+--- Generated by EmmyLua(https://github.com/EmmyLua)
+--- Created by liaozhaoyan.
+--- DateTime: 2023/4/17 19:55
+---
+
+package.path = package.path .. ";../../?.lua;"
+local system = require("common.system")
+local ChttpCli = require("httplib.httpCli")
+
+local cli = ChttpCli.new()
+local url = "http://127.0.0.1:8400/api/trig"
+local req = {cmd = "exec", exec = "/usr/bin/pwd"}
+local res = cli:postTable(url, req)
+system:dumps(res)
diff --git a/source/tools/monitor/unity/test/curl/postLine.lua b/source/tools/monitor/unity/test/curl/postLine.lua
new file mode 100644
index 0000000000000000000000000000000000000000..e04791a91168261ab57b404481b8610bdf96c1e8
--- /dev/null
+++ b/source/tools/monitor/unity/test/curl/postLine.lua
@@ -0,0 +1,14 @@
+---
+--- Generated by EmmyLua(https://github.com/EmmyLua)
+--- Created by liaozhaoyan.
+--- DateTime: 2023/4/24 02:21
+---
+
+package.path = package.path .. ";../../?.lua;"
+local system = require("common.system")
+local ChttpCli = require("httplib.httpCli")
+
+local cli = ChttpCli.new()
+local url = "http://127.0.0.1:8400/api/line"
+local res = cli:postLine(url, "lineTable,index=abc value=1")
+system:dumps(res)
diff --git a/source/tools/monitor/unity/test/curl/postLine.py b/source/tools/monitor/unity/test/curl/postLine.py
new file mode 100644
index 0000000000000000000000000000000000000000..9885d89c09d751534b74a3c390f3e4c159b22b92
--- /dev/null
+++ b/source/tools/monitor/unity/test/curl/postLine.py
@@ -0,0 +1,6 @@
+import requests
+
+url = "http://127.0.0.1:8400/api/line"
+line = "lineTable,index=abc value=2"
+res = requests.post(url, data=line)
+print(res)
diff --git a/source/tools/monitor/unity/test/curl/postPy.lua b/source/tools/monitor/unity/test/curl/postPy.lua
new file mode 100644
index 0000000000000000000000000000000000000000..d4085f0a4a8bfd1571d56be462e157d70204046a
--- /dev/null
+++ b/source/tools/monitor/unity/test/curl/postPy.lua
@@ -0,0 +1,15 @@
+---
+--- Generated by EmmyLua(https://github.com/EmmyLua)
+--- Created by liaozhaoyan.
+--- DateTime: 2023/4/17 20:21
+---
+
+package.path = package.path .. ";../../?.lua;"
+local system = require("common.system")
+local ChttpCli = require("httplib.httpCli")
+
+local cli = ChttpCli.new()
+local url = "http://127.0.0.1:8400/api/trig"
+local req = {cmd = "exec", exec = "/usr/bin/python", args = {"../test/curl/hello.py",}, }
+local res = cli:postTable(url, req)
+system:dumps(res)
\ No newline at end of file
diff --git a/source/tools/monitor/unity/test/lab/fileSync/fileSync.py b/source/tools/monitor/unity/test/lab/fileSync/fileSync.py
new file mode 100644
index 0000000000000000000000000000000000000000..37b7369770ac754cd69c5a58c2e2998a776e07a4
--- /dev/null
+++ b/source/tools/monitor/unity/test/lab/fileSync/fileSync.py
@@ -0,0 +1,87 @@
+# -*- coding: utf-8 -*-
+"""
+-------------------------------------------------
+ File Name: fileSync.py
+ Description :
+ Author : liaozhaoyan
+ date: 2023/4/18
+-------------------------------------------------
+ Change Activity:
+ 2023/4/18:
+-------------------------------------------------
+"""
+import os
+import sys
+
+
+FILE_SYNC = 0x010000
+FILE_DIRECT = 0x040000
+
+
+def getFlags(fdPath):
+ try:
+ with open(fdPath) as f:
+ for i, line in enumerate(f):
+ if line.startswith("flags:"):
+ _, ret = line.split(":", 1)
+ return ret.strip()
+ except IOError:
+ return 0
+
+
+def getFdName(pid, fd):
+ fdPath = "/proc/%s/fd/%s" % (pid, fd)
+ return os.readlink(fdPath)
+
+
+def getCmd(pid):
+ path = "/proc/%s/cmdline" % pid
+ try:
+ with open(path) as f:
+ return f.read()
+ except IOError:
+ return "nil"
+
+
+def checkFile(fileLink):
+ if fileLink.startswith("/") and not fileLink.startswith("/dev"):
+ return True
+ return False
+
+
+def checkFlag(pid, fd, flag):
+ vFlag = int(flag)
+ link = getFdName(pid, fd)
+ if checkFile(link):
+ if vFlag & FILE_SYNC:
+ print("pid:%s, cmd:%s, file:%s, set sync flag" % (pid, getCmd(pid), link))
+ if vFlag & FILE_DIRECT:
+ print("pid:%s, cmd:%s, file:%s, set direct flag" % (pid, getCmd(pid), link))
+
+
+def checkPid(pid):
+ path = "/proc/" + pid + "/fdinfo"
+ for fd in os.listdir(path):
+ fdPath = "/".join([path, fd])
+ try:
+ flag = getFlags(fdPath)
+ except IOError:
+ continue
+ checkFlag(pid, fd, flag)
+
+
+def walkGroup(path):
+ path += "/tasks"
+ try:
+ with open(path) as f:
+ for i, line in enumerate(f):
+ checkPid(line.strip())
+ except IOError:
+ print("bad path.")
+
+
+if __name__ == "__main__":
+ path = "/sys/fs/cgroup/pids/kubepods/besteffort/slot_976/231a6b41a7be49fdcecac835bc1487272ba8319b8106692d2134c34b025931fa"
+ if len(sys.argv) > 1:
+ path = sys.argv[1]
+ walkGroup(path)
diff --git a/source/tools/monitor/unity/test/lab/hashList.lua b/source/tools/monitor/unity/test/lab/hashList.lua
new file mode 100644
index 0000000000000000000000000000000000000000..b9dab8fb760a2c31c308f1445b463c47ed6eabe5
--- /dev/null
+++ b/source/tools/monitor/unity/test/lab/hashList.lua
@@ -0,0 +1,48 @@
+---
+--- Generated by EmmyLua(https://github.com/EmmyLua)
+--- Created by liaozhaoyan.
+--- DateTime: 2023/4/21 18:25
+---
+
+local aList = {'a', 'ab', 'abc'}
+print(#aList, table.getn(aList))
+for _, v in ipairs(aList) do
+ print(v)
+end
+
+aList[2] = nil
+print(#aList, table.getn(aList))
+for _, v in ipairs(aList) do
+ print(v)
+end
+
+for k, v in pairs(aList) do
+ print(k, v)
+end
+
+local bList = {
+ a = 1,
+ b = 2,
+ c = 3
+}
+
+bList.b = nil
+
+print("list.")
+aList = {'a', 'ab', 'abc'}
+for i, v in ipairs(aList) do
+ print(v)
+ if i == 2 then
+ table.remove(aList, i)
+ end
+end
+
+print("list reverse.")
+aList = {'a', 'ab', 'abc'}
+for i = #aList, 1, -1 do
+ print(aList[i])
+ if i == 2 then
+ table.remove(aList, i)
+ end
+end
+
diff --git a/source/tools/monitor/unity/test/lab/jencode.lua b/source/tools/monitor/unity/test/lab/jencode.lua
new file mode 100644
index 0000000000000000000000000000000000000000..89fbb74db9a17bbd785e26168255d0e5d154946b
--- /dev/null
+++ b/source/tools/monitor/unity/test/lab/jencode.lua
@@ -0,0 +1,13 @@
+---
+--- Generated by EmmyLua(https://github.com/EmmyLua)
+--- Created by liaozhaoyan.
+--- DateTime: 2023/4/18 15:10
+---
+
+local cjson = require("cjson.safe")
+local json = cjson.new()
+json.encode_escape_forward_slash(false)
+
+local log = "task:0(swapper/13), cpu:13, delayed:162994631, callstack:mwait_idle_with_hints.constprop.0,intel_idle,cpuidle_enter_state,cpuidle_enter,cpuidle_idle_call,do_idle,cpu_startup_entry,secondary_startup_64_no_verify,"
+
+print(json.encode(log))
diff --git a/source/tools/monitor/unity/test/lab/kmsg/kmsg.c b/source/tools/monitor/unity/test/lab/kmsg/kmsg.c
new file mode 100644
index 0000000000000000000000000000000000000000..613fa03cd2a61300dd0b0177d60568cafdb1c4f0
--- /dev/null
+++ b/source/tools/monitor/unity/test/lab/kmsg/kmsg.c
@@ -0,0 +1,88 @@
+//
+// Created by 廖肇燕 on 2023/4/18.
+//
+
+#define _GNU_SOURCE
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#define KMSG_LINE 8192
+
+static int kmsg_set_block(int fd) {
+ int ret;
+ int flags = fcntl(fd, F_GETFL);
+ flags &= ~O_NONBLOCK;
+ ret = fcntl(fd, F_SETFL, flags);
+ return ret;
+}
+
+int kmsg_thread_func(void) {
+ int fd;
+ int ret;
+ char buff[KMSG_LINE];
+
+
+ fd = open("/dev/kmsg", O_RDONLY | O_NONBLOCK);
+ if (fd < 0) {
+ goto endOpen;
+ }/*
+ ret = lseek(fd, 0, SEEK_DATA);
+ if (ret < 0) {
+ perror("kmsg seek error");
+ goto endLseek;
+ }*/
+
+ ret = 1; // strip old message.
+ while (ret > 0) {
+ ret = read(fd, buff, KMSG_LINE - 1);
+ buff[ret] = '\0';
+ printf("%s\n", buff);
+ if (ret < 0) {
+ if (errno == EAGAIN) {
+ break;
+ }
+ perror("kmsg read failed.");
+ goto endRead;
+ }
+ }
+
+ ret = kmsg_set_block(fd);
+ if (ret < 0) {
+ perror("kmsg set block failed.");
+ goto endBlock;
+ }
+
+ while (1) {
+
+ ret = read(fd, buff, KMSG_LINE - 1);
+ if (ret < 0) {
+ if (errno == EINTR) {
+ break;
+ }
+ perror("kmsg read2 failed.");
+ goto endRead;
+ }
+ buff[ret -1] = '\0';
+
+ printf("read: %s\n", buff);
+ }
+
+ close(fd);
+ return 0;
+ endBlock:
+ endRead:
+ endLseek:
+ close(fd);
+ endOpen:
+ return 1;
+}
+
+int main(void) {
+ kmsg_thread_func();
+ return 0;
+}
\ No newline at end of file
diff --git a/source/tools/monitor/unity/test/lab/listSun.lua b/source/tools/monitor/unity/test/lab/listSun.lua
new file mode 100644
index 0000000000000000000000000000000000000000..815cadee79f7c26cefc46d48a0bebf78db0c5228
--- /dev/null
+++ b/source/tools/monitor/unity/test/lab/listSun.lua
@@ -0,0 +1,9 @@
+---
+--- Generated by EmmyLua(https://github.com/EmmyLua)
+--- Created by liaozhaoyan.
+--- DateTime: 2023/4/12 17:54
+---
+
+local aList = {1, 2, 3, sum=6}
+print(#aList)
+print(math.max(aList))
\ No newline at end of file
diff --git a/source/tools/monitor/unity/test/lab/tping.lua b/source/tools/monitor/unity/test/lab/tping.lua
index 29d15355f595cc1baaeef9405708045d670ec664..d8a4e72cfc17ade4ec8cba4055c619cc52e7a9e7 100644
--- a/source/tools/monitor/unity/test/lab/tping.lua
+++ b/source/tools/monitor/unity/test/lab/tping.lua
@@ -9,6 +9,8 @@ package.path = package.path .. ";../../?.lua;"
local Cping = require("common.ping")
local p = Cping.new("8.8.8.8", "eth0")
-p:ping()
+print(p:ping())
p = Cping.new("1.2.3.4", "eth0")
-p:ping()
\ No newline at end of file
+print(p:ping())
+p = Cping.new("172.16.0.253", "eth0")
+print(p:ping())
\ No newline at end of file