From 8d321ce4c2d01ad187ea54b612ace3d27243c5f5 Mon Sep 17 00:00:00 2001 From: liaozhaoyan Date: Mon, 10 Apr 2023 15:04:17 +0800 Subject: [PATCH 01/20] change fdNonBlocking to commo system. --- .../tools/monitor/unity/common/inotifies.lua | 18 +--------------- source/tools/monitor/unity/common/system.lua | 19 ++++++++++++++++- .../tools/monitor/unity/httplib/coHttpCli.lua | 21 +------------------ .../unity/test/curl/poBeaver/poBeaver.lua | 18 +--------------- 4 files changed, 21 insertions(+), 55 deletions(-) diff --git a/source/tools/monitor/unity/common/inotifies.lua b/source/tools/monitor/unity/common/inotifies.lua index f9aaf7c9..d8b33c78 100644 --- a/source/tools/monitor/unity/common/inotifies.lua +++ b/source/tools/monitor/unity/common/inotifies.lua @@ -9,7 +9,6 @@ require("common.class") local Cinotifies = class("inotifies") local dirent = require("posix.dirent") local pstat = require("posix.sys.stat") -local fcntl = require("posix.fcntl") local system = require("common.system") local inotify = require('inotify') @@ -27,21 +26,6 @@ local function walk_dirs(path, dirs) end end -local function fdNonBlocking(fd) - local res - local flag, err, errno = fcntl.fcntl(fd, fcntl.F_GETFL) - if flag then - res, err, errno = fcntl.fcntl(fd, fcntl.F_SETFL, bit.bor(flag, fcntl.O_NONBLOCK)) - if res then - return 0 - else - system:posixError("fcntl set failed", err, errno) - end - else - system:posixError("fcntl get failed", err, errno) - end -end - local function mon_dirs(path) local handle = inotify.init() local ws = {} @@ -60,7 +44,7 @@ local function mon_dirs(path) end end - fdNonBlocking(handle:getfd()) + system:fdNonBlocking(handle:getfd()) return handle, ws end diff --git a/source/tools/monitor/unity/common/system.lua b/source/tools/monitor/unity/common/system.lua index 4c0be497..c43a7ffd 100644 --- a/source/tools/monitor/unity/common/system.lua +++ b/source/tools/monitor/unity/common/system.lua @@ -6,13 +6,30 @@ local socket = require("socket") local serpent = require("common.serpent") - local system = {} function system:sleep(t) socket.select(nil, nil, t) end +function system:fdNonBlocking(fd) + local res + local fcntl = require("posix.fcntl") + local bit = require("bit") + + local flag, err, errno = fcntl.fcntl(fd, fcntl.F_GETFL) + if flag then + res, err, errno = fcntl.fcntl(fd, fcntl.F_SETFL, bit.bor(flag, fcntl.O_NONBLOCK)) + if res then + return 0 + else + system:posixError("fcntl set failed", err, errno) + end + else + system:posixError("fcntl get failed", err, errno) + end +end + function system:deepcopy(object) local lookup_table = {} local function _copy(object) diff --git a/source/tools/monitor/unity/httplib/coHttpCli.lua b/source/tools/monitor/unity/httplib/coHttpCli.lua index 67af16b2..99ce059d 100644 --- a/source/tools/monitor/unity/httplib/coHttpCli.lua +++ b/source/tools/monitor/unity/httplib/coHttpCli.lua @@ -59,31 +59,12 @@ local function setTimeOut(fd) end end -local function fdNonBlocking(fd) - local res - local flag, err, errno = fcntl.fcntl(fd, fcntl.F_GETFL) - if flag then - res, err, errno = fcntl.fcntl(fd, fcntl.F_SETFL, bit.bor(flag, fcntl.O_NONBLOCK)) - if res then - return - else - checkInt(errno, fd) - print(string.format("fcntl set failed, report:%d, %s", err, errno)) - return -1 - end - else - checkInt(errno, fd) - print(string.format("fcntl get failed, report:%d, %s", err, errno)) - return -1 - end -end - local function installFd(ip, port) local fd, res, err, errno fd, err, errno = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) if fd then -- for socket - if fdNonBlocking(fd) then + if system:fdNonBlocking(fd) then return end local tConn = {family=socket.AF_INET, addr=ip, port=port} diff --git a/source/tools/monitor/unity/test/curl/poBeaver/poBeaver.lua b/source/tools/monitor/unity/test/curl/poBeaver/poBeaver.lua index 7271e4f3..137c9a0d 100644 --- a/source/tools/monitor/unity/test/curl/poBeaver/poBeaver.lua +++ b/source/tools/monitor/unity/test/curl/poBeaver/poBeaver.lua @@ -63,22 +63,6 @@ function CpoBeaver:_install_fd(port, ip, backlog) end end -local function fdNonBlocking(fd) - local res - local flag, err, errno = fcntl.fcntl(fd, fcntl.F_GETFL) - if flag then - res, err, errno = fcntl.fcntl(fd, fcntl.F_SETFL, bit.bor(flag, fcntl.O_NONBLOCK)) - if res then - return 0 - else - posixError("fcntl set failed", err, errno) - end - else - posixError("fcntl get failed", err, errno) - end -end - - function CpoBeaver:read(fd, maxLen) maxLen = maxLen or 1 * 1024 * 1024 -- signal conversation accept 1M stream max local function readFd() @@ -203,7 +187,7 @@ function CpoBeaver:accept(fd, e) elseif e.IN then local nfd, err, errno = socket.accept(fd) if nfd then - fdNonBlocking(nfd) + system:fdNonBlocking(nfd) self:co_add(nfd) else posixError("accept new socket failed", err, errno) -- Gitee From e538abd86c8be8f8413dd816f6ff57fa45f219cb Mon Sep 17 00:00:00 2001 From: liaozhaoyan Date: Mon, 10 Apr 2023 15:16:42 +0800 Subject: [PATCH 02/20] revert fdNonBlocking for cohttpcli. --- .../tools/monitor/unity/httplib/coHttpCli.lua | 23 +++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/source/tools/monitor/unity/httplib/coHttpCli.lua b/source/tools/monitor/unity/httplib/coHttpCli.lua index 99ce059d..a55fbd94 100644 --- a/source/tools/monitor/unity/httplib/coHttpCli.lua +++ b/source/tools/monitor/unity/httplib/coHttpCli.lua @@ -59,12 +59,31 @@ local function setTimeOut(fd) end end +local function fdNonBlocking(fd) + local res + local flag, err, errno = fcntl.fcntl(fd, fcntl.F_GETFL) + if flag then + res, err, errno = fcntl.fcntl(fd, fcntl.F_SETFL, bit.bor(flag, fcntl.O_NONBLOCK)) + if res then + return + else + checkInt(errno, fd) + print(string.format("fcntl set failed, report:%d, %s", err, errno)) + return -1 + end + else + checkInt(errno, fd) + print(string.format("fcntl get failed, report:%d, %s", err, errno)) + return -1 + end +end + local function installFd(ip, port) local fd, res, err, errno fd, err, errno = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) if fd then -- for socket - if system:fdNonBlocking(fd) then + if fdNonBlocking(fd) then return end local tConn = {family=socket.AF_INET, addr=ip, port=port} @@ -376,7 +395,7 @@ function CcoHttpCli:closureRead(fd, maxLen) return nil end else - print(system:dump(e)) + system:dumps(e) end return nil end -- Gitee From a8c839e8dded111e6bcd059c2241a1de3882fcdf Mon Sep 17 00:00:00 2001 From: liaozhaoyan Date: Mon, 10 Apr 2023 16:06:41 +0800 Subject: [PATCH 03/20] add ping time. --- source/tools/monitor/unity/common/ping.lua | 32 +++++++++++++++---- source/tools/monitor/unity/test/lab/tping.lua | 6 ++-- 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/source/tools/monitor/unity/common/ping.lua b/source/tools/monitor/unity/common/ping.lua index 2d926ebb..1e9315a1 100644 --- a/source/tools/monitor/unity/common/ping.lua +++ b/source/tools/monitor/unity/common/ping.lua @@ -7,7 +7,10 @@ require("common.class") local psocket = require("posix.sys.socket") +local ptime = require("posix.sys.time") +local poll = require("posix.poll") local unistd = require("posix.unistd") +local system = require("common.system") local Cping = class("ping") function Cping:_init_(ip, dev) @@ -20,6 +23,8 @@ function Cping:_init_(ip, dev) local ok, err = psocket.setsockopt(fd, psocket.SOL_SOCKET, psocket.SO_BINDTODEVICE, dev) assert(ok, err) + system:fdNonBlocking(fd) + self.fd = fd end @@ -29,16 +34,31 @@ function Cping:_del_() end end +local function diff_us(t1, t0) + local sum1 = t1.tv_sec * 1e6 + t1.tv_usec + local sum0 = t0.tv_sec * 1e6 + t0.tv_usec + return sum1 - sum0 +end + function Cping:ping() + local fd = self.fd local data = string.char(0x08, 0x00, 0x89, 0x98, 0x6e, 0x63, 0x00, 0x04, 0x00) - local ok, err = psocket.sendto(self.fd, data, {family=psocket.AF_INET, addr=self.ip, port=0}) + local t0 = ptime.gettimeofday() + local ok, err = psocket.sendto(fd, data, {family=psocket.AF_INET, addr=self.ip, port=0}) assert(ok, err) - local data, sa = psocket.recvfrom(self.fd, 1024) - assert(data, sa) - - if data then - print('Received ICMP message from ' .. sa.addr) + local r = poll.rpoll(fd, 300) + if r == 0 then + print("receive over time.") + return -1 + elseif r == 1 then + local data, sa = psocket.recvfrom(fd, 1024) + local t1 = ptime.gettimeofday() + assert(data, sa) + return diff_us(t1, t0) + else + print("closed.") + return -2 end end diff --git a/source/tools/monitor/unity/test/lab/tping.lua b/source/tools/monitor/unity/test/lab/tping.lua index 29d15355..d8a4e72c 100644 --- a/source/tools/monitor/unity/test/lab/tping.lua +++ b/source/tools/monitor/unity/test/lab/tping.lua @@ -9,6 +9,8 @@ package.path = package.path .. ";../../?.lua;" local Cping = require("common.ping") local p = Cping.new("8.8.8.8", "eth0") -p:ping() +print(p:ping()) p = Cping.new("1.2.3.4", "eth0") -p:ping() \ No newline at end of file +print(p:ping()) +p = Cping.new("172.16.0.253", "eth0") +print(p:ping()) \ No newline at end of file -- Gitee From 07f04fe273e9ba5e156d0e477e85af2fae6a8dc3 Mon Sep 17 00:00:00 2001 From: liaozhaoyan Date: Tue, 11 Apr 2023 15:35:16 +0800 Subject: [PATCH 04/20] use kprobe instead. --- .../monitor/unity/collector/plugin/Makefile | 2 +- .../collector/plugin/sum_retrans/Makefile | 8 + .../plugin/sum_retrans/sum_retrans.bpf.c | 51 +++++++ .../plugin/sum_retrans/sum_retrans.c | 117 +++++++++++++++ .../plugin/sum_retrans/sum_retrans.h | 8 + .../unity/collector/plugin/virtout/Makefile | 8 + .../collector/plugin/virtout/virtout.bpf.c | 66 +++++++++ .../unity/collector/plugin/virtout/virtout.c | 139 ++++++++++++++++++ .../unity/collector/plugin/virtout/virtout.h | 18 +++ 9 files changed, 416 insertions(+), 1 deletion(-) create mode 100644 source/tools/monitor/unity/collector/plugin/sum_retrans/Makefile create mode 100644 source/tools/monitor/unity/collector/plugin/sum_retrans/sum_retrans.bpf.c create mode 100644 source/tools/monitor/unity/collector/plugin/sum_retrans/sum_retrans.c create mode 100644 source/tools/monitor/unity/collector/plugin/sum_retrans/sum_retrans.h create mode 100644 source/tools/monitor/unity/collector/plugin/virtout/Makefile create mode 100644 source/tools/monitor/unity/collector/plugin/virtout/virtout.bpf.c create mode 100644 source/tools/monitor/unity/collector/plugin/virtout/virtout.c create mode 100644 source/tools/monitor/unity/collector/plugin/virtout/virtout.h diff --git a/source/tools/monitor/unity/collector/plugin/Makefile b/source/tools/monitor/unity/collector/plugin/Makefile index 7ec4fa48..1523bc19 100644 --- a/source/tools/monitor/unity/collector/plugin/Makefile +++ b/source/tools/monitor/unity/collector/plugin/Makefile @@ -5,7 +5,7 @@ OBJS := proto_sender.o LIB := libproto_sender.a -DEPMOD=sample threads kmsg proc_schedstat proc_loadavg unity_nosched unity_irqoff cpudist cpu_bled net_health net_retrans netlink cpufreq gpuinfo pmu_events +DEPMOD=sample threads kmsg proc_schedstat proc_loadavg unity_nosched unity_irqoff cpudist cpu_bled net_health net_retrans netlink cpufreq gpuinfo pmu_events virtout sum_retrans all: $(LIB) $(DEPMOD) diff --git a/source/tools/monitor/unity/collector/plugin/sum_retrans/Makefile b/source/tools/monitor/unity/collector/plugin/sum_retrans/Makefile new file mode 100644 index 00000000..d0c3391a --- /dev/null +++ b/source/tools/monitor/unity/collector/plugin/sum_retrans/Makefile @@ -0,0 +1,8 @@ + +newdirs := $(shell find ./ -type d) + +bpfsrcs := sum_retrans.bpf.c +csrcs := sum_retrans.c +so := libsum_retrans.so + +include ../bpfso.mk \ No newline at end of file diff --git a/source/tools/monitor/unity/collector/plugin/sum_retrans/sum_retrans.bpf.c b/source/tools/monitor/unity/collector/plugin/sum_retrans/sum_retrans.bpf.c new file mode 100644 index 00000000..12842cb3 --- /dev/null +++ b/source/tools/monitor/unity/collector/plugin/sum_retrans/sum_retrans.bpf.c @@ -0,0 +1,51 @@ +// +// Created by 廖肇燕 on 2023/4/11. +// +#define BPF_NO_GLOBAL_DATA +#include +#include + +BPF_HASH(dips, u32, u64, 1024); +BPF_HASH(inums, u32, u64, 1024); + +static inline u32 read_ns_inum(struct sock *sk) +{ + if (sk) { + return BPF_CORE_READ(sk, __sk_common.skc_net.net, ns.inum); + } + return 0; +} + +static inline u32 read_dip(struct sock *sk) +{ + if (sk) { + return BPF_CORE_READ(sk, __sk_common.skc_daddr); + } + return 0; +} + +static void inc_value(struct bpf_map* maps, u32 k) { + u64 *pv = bpf_map_lookup_elem(maps, &k); + if (pv) { + __sync_fetch_and_add(pv, 1); + } else { + u64 v = 1; + bpf_map_update_elem(maps, &k, &v, BPF_ANY); + } +} + +SEC("kprobe/tcp_retransmit_skb") +int j_tcp_retransmit_skb(struct pt_regs *ctx){ + struct sock *sk; + u32 inum, ip; + + sk = (struct sock *)PT_REGS_PARM1(ctx); + inum = read_ns_inum(sk); + ip = read_dip(sk); + + bpf_printk("hello inum: %u\n", inum); + + inc_value((struct bpf_map *)&inums, inum); + inc_value((struct bpf_map *)&dips, ip); + return 0; +} diff --git a/source/tools/monitor/unity/collector/plugin/sum_retrans/sum_retrans.c b/source/tools/monitor/unity/collector/plugin/sum_retrans/sum_retrans.c new file mode 100644 index 00000000..02e09fab --- /dev/null +++ b/source/tools/monitor/unity/collector/plugin/sum_retrans/sum_retrans.c @@ -0,0 +1,117 @@ +// +// Created by 廖肇燕 on 2023/4/11. +// + +#include "sum_retrans.h" +#include "../bpf_head.h" +#include "sum_retrans.skel.h" + +#include +#include + +#include +#include +#include + + +DEFINE_SEKL_OBJECT(sum_retrans); +static int inum_fd = 0; +static int dip_fd = 0; +int init(void *arg) +{ + int ret; + printf("sum_retrans plugin install.\n"); + ret = LOAD_SKEL_OBJECT(sum_retrans, perf); + inum_fd = coobpf_map_find(sum_retrans->obj, "inums"); + dip_fd = coobpf_map_find(sum_retrans->obj, "dips"); + printf("inum_fd: %d %d\n", inum_fd, dip_fd); + return ret; +} + +#define LOG_MAX 4096 +static char log[LOG_MAX]; + +static int transIP(unsigned long lip, char *result, int size) { + inet_ntop(AF_INET, (void *) &lip, result, size); + return 0; +} + +static void pack_dip() { + int ret = 0; + char ips[32]; + char buff[64]; + unsigned long value; + unsigned int ip, ip_next; + + log[0] = '\0'; + ip = 0; + while (coobpf_key_next(dip_fd, &ip, &ip_next) == 0) { + bpf_map_lookup_elem(dip_fd, &ip_next, &value); + ip = ip_next; + + transIP(ip, ips, 32); + snprintf(buff, 64, "%s:%ld,", ips, value); + strncat(log, buff, 4096); + ip = ip_next; + } + + ip = 0; + while (coobpf_key_next(dip_fd, &ip, &ip_next) == 0) { + ret = bpf_map_delete_elem(dip_fd, &ip_next); + printf("ret2: %d\n", ret); + ip = ip_next; + } +} + +static void pack_inum() { + int ret; + char buff[64]; + unsigned long value; + unsigned int inum, inum_next; + + log[0] = '\0'; + + inum = 0; + while (coobpf_key_next(inum_fd, &inum, &inum_next) == 0) { + bpf_map_lookup_elem(inum_fd, &inum_next, &value); + printf("inum: %d, %ld\n", inum, value); + snprintf(buff, 64, "%d:%ld,", inum, value); + strncat(log, buff, 4096); + inum = inum_next; + } + + inum = 0; + while (coobpf_key_next(inum_fd, &inum, &inum_next) == 0) { + ret = bpf_map_delete_elem(inum_fd, &inum_next); + printf("ret inum: %d\n", ret); + inum = inum_next; + } +} + +int call(int t, struct unity_lines *lines) +{ + struct unity_line* line; + + pack_dip(); + printf("strlen: %ld\n", strlen(log)); + if (strlen(log) > 0) { + unity_alloc_lines(lines, 2); // 预分配好 + + line = unity_get_line(lines, 0); + unity_set_table(line, "retrans_dip"); + unity_set_log(line, "ip_log", log); + + pack_inum(); + line = unity_get_line(lines, 1); + unity_set_table(line, "retrans_inum"); + unity_set_log(line, "inum_log", log); + } + + return 0; +} + +void deinit(void) +{ + printf("sum_retrans plugin uninstall.\n"); + DESTORY_SKEL_BOJECT(sum_retrans); +} diff --git a/source/tools/monitor/unity/collector/plugin/sum_retrans/sum_retrans.h b/source/tools/monitor/unity/collector/plugin/sum_retrans/sum_retrans.h new file mode 100644 index 00000000..4faecd9a --- /dev/null +++ b/source/tools/monitor/unity/collector/plugin/sum_retrans/sum_retrans.h @@ -0,0 +1,8 @@ +// +// Created by 廖肇燕 on 2023/4/11. +// + +#ifndef UNITY_SUM_RETRANS_H +#define UNITY_SUM_RETRANS_H + +#endif //UNITY_SUM_RETRANS_H diff --git a/source/tools/monitor/unity/collector/plugin/virtout/Makefile b/source/tools/monitor/unity/collector/plugin/virtout/Makefile new file mode 100644 index 00000000..f0e8a47f --- /dev/null +++ b/source/tools/monitor/unity/collector/plugin/virtout/Makefile @@ -0,0 +1,8 @@ + +newdirs := $(shell find ./ -type d) + +bpfsrcs := virtout.bpf.c +csrcs := virtout.c +so := libvirtout.so + +include ../bpfso.mk \ No newline at end of file diff --git a/source/tools/monitor/unity/collector/plugin/virtout/virtout.bpf.c b/source/tools/monitor/unity/collector/plugin/virtout/virtout.bpf.c new file mode 100644 index 00000000..9ab4e78a --- /dev/null +++ b/source/tools/monitor/unity/collector/plugin/virtout/virtout.bpf.c @@ -0,0 +1,66 @@ +// +// Created by 廖肇燕 on 2023/2/23. +// + +#include +#include +#include "virtout.h" + +#define MAX_ENTRY 128 +#define BPF_F_FAST_STACK_CMP (1ULL << 9) +#define KERN_STACKID_FLAGS (0 | BPF_F_FAST_STACK_CMP) + +#define PERIOD_TIME (10 * 1000 * 1000ULL) +#define THRESHOLD_TIME (200 * 1000 * 1000ULL) + +BPF_ARRAY(virtdist, u64, 20); +BPF_PERF_OUTPUT(perf, 1024); +BPF_STACK_TRACE(stack, MAX_ENTRY); +BPF_PERCPU_ARRAY(perRec, u64, 2); + +static inline u64 get_last(int index) { + u64 *pv = bpf_map_lookup_elem(&perRec, &index); + if (pv) { + return *pv; + } + return 0; +} + +static inline void save_last(int index, u64 t) { + bpf_map_update_elem(&perRec, &index, &t, BPF_ANY); +} + +static inline void check_time(struct bpf_perf_event_data *ctx, + int index, + struct bpf_map* event) { + u64 t = ns(); + u64 last = get_last(index); + s64 delta; + + save_last(index, t); + delta = t - last; + + if (last && delta > 2 * PERIOD_TIME) { + delta -= PERIOD_TIME; + hist10_push((struct bpf_map_def *)&virtdist, delta / PERIOD_TIME); + if (delta >= THRESHOLD_TIME) { + struct task_struct* task = (struct task_struct *)bpf_get_current_task(); + struct data_t data = {}; + + data.pid = pid(); + data.stack_id = bpf_get_stackid(ctx, &stack, KERN_STACKID_FLAGS); + data.delta = delta; + data.cpu = cpu(); + bpf_get_current_comm(&data.comm, TASK_COMM_LEN); + + bpf_perf_event_output(ctx, event, BPF_F_CURRENT_CPU, &data, sizeof(data)); + } + } +} + +SEC("perf_event") +int sw_clock(struct bpf_perf_event_data *ctx) +{ + check_time(ctx, 1, (struct bpf_map *)&perf); + return 0; +} diff --git a/source/tools/monitor/unity/collector/plugin/virtout/virtout.c b/source/tools/monitor/unity/collector/plugin/virtout/virtout.c new file mode 100644 index 00000000..a0599da3 --- /dev/null +++ b/source/tools/monitor/unity/collector/plugin/virtout/virtout.c @@ -0,0 +1,139 @@ +// +// Created by 廖肇燕 on 2023/2/23. +// + +#include +#define COOLBPF_PERF_THREAD +#include "../bpf_head.h" +#include "virtout.h" +#include "virtout.skel.h" + +#include +#include +#include + +#define CPU_DIST_INDEX 4 +#define DIST_ARRAY_SIZE 20 + +static volatile int budget = 0; // for log budget +static int dist_fd = 0; +static int stack_fd = 0; +int proc(int stack_fd, struct data_t *e, struct unity_line *line); +void handle_event(void *ctx, int cpu, void *data, __u32 data_sz) +{ + int ret; + if (budget > 0) { + struct data_t *e = (struct data_t *)data; + struct beeQ *q = (struct beeQ *)ctx; + struct unity_line *line; + struct unity_lines *lines = unity_new_lines(); + + unity_alloc_lines(lines, 1); + line = unity_get_line(lines, 0); + ret = proc(stack_fd, e, line); + if (ret >= 0) { + beeQ_send(q, lines); + } + budget --; + } +} + +DEFINE_SEKL_OBJECT(virtout); +int init(void *arg) { + int ret; + printf("virtout plugin install.\n"); + + ret = LOAD_SKEL_OBJECT(virtout, perf); + dist_fd = coobpf_map_find(virtout->obj, "virtdist"); + stack_fd = coobpf_map_find(virtout->obj, "stack"); + return ret; +} + +static int get_dist(unsigned long *locals) { + int i = 0; + unsigned long value = 0; + int key, key_next; + + key = 0; + while (coobpf_key_next(dist_fd, &key, &key_next) == 0) { + coobpf_key_value(dist_fd, &key_next, &value); + locals[i ++] = value; + if (i > DIST_ARRAY_SIZE) { + break; + } + key = key_next; + } + return i; +} + +static int cal_dist(unsigned long* values) { + int i, j; + int size; + static unsigned long rec[DIST_ARRAY_SIZE] = {0}; + unsigned long locals[DIST_ARRAY_SIZE]; + + size = get_dist(locals); + for (i = 0; i < CPU_DIST_INDEX - 1; i ++) { + values[i] = locals[i] - rec[i]; + rec[i] = locals[i]; + } + j = i; + values[j] = 0; + for (; i < size; i ++) { + values[j] += locals[i] - rec[i]; + rec[i] = locals[i]; + } + return 0; +} + +int call(int t, struct unity_lines *lines) { + int i; + unsigned long values[CPU_DIST_INDEX]; + const char *names[] = { "ms100", "s1", "s10", "so"}; + struct unity_line* line; + + budget = t; + + unity_alloc_lines(lines, 1); // 预分配好 + line = unity_get_line(lines, 0); + unity_set_table(line, "virtout_dist"); + + cal_dist(values); + for (i = 0; i < CPU_DIST_INDEX; i ++ ) { + unity_set_value(line, i, names[i], values[i]); + } + return 0; +} + + +void deinit(void) +{ + printf("virout plugin uninstall.\n"); + DESTORY_SKEL_BOJECT(virtout); +} + +#define LOG_MAX 256 +static char log[LOG_MAX]; + +int proc(int stack_fd, struct data_t *e, struct unity_line *line) { + int i; + unsigned long addr[128]; + int id = e->stack_id; //last stack + struct ksym_cell* cell; + + snprintf(log, LOG_MAX, "task:%d(%s), cpu:%d, delayed:%ld, callstack:", e->pid, e->comm, e->cpu, e->delta); + coobpf_key_value(stack_fd, &id, &addr); + + for (i = 0; i < 128; i ++) { + if (addr[i] > 0) { + cell = ksym_search(addr[i]); + strncpy(log, cell->func, LOG_MAX); + strncpy(log, ",", LOG_MAX); + } else { + break; + } + } + unity_set_table(line, "virtout_log"); + unity_set_log(line, "log", log); + return 0; +} diff --git a/source/tools/monitor/unity/collector/plugin/virtout/virtout.h b/source/tools/monitor/unity/collector/plugin/virtout/virtout.h new file mode 100644 index 00000000..a0acb6c8 --- /dev/null +++ b/source/tools/monitor/unity/collector/plugin/virtout/virtout.h @@ -0,0 +1,18 @@ +// +// Created by 廖肇燕 on 2023/4/10. +// + +#ifndef UNITY_VIRTOUT_H +#define UNITY_VIRTOUT_H + +#define TASK_COMM_LEN 16 + +struct data_t { + int pid; + int cpu; + unsigned int stack_id; + unsigned long delta; + char comm[TASK_COMM_LEN]; +}; + +#endif //UNITY_VIRTOUT_H -- Gitee From a1db0f215cf7750aaa555c6a436db2a6dac01d82 Mon Sep 17 00:00:00 2001 From: liaozhaoyan Date: Tue, 11 Apr 2023 22:50:45 +0800 Subject: [PATCH 05/20] switch to tracepoint mode. --- .../plugin/sum_retrans/sum_retrans.bpf.c | 32 ++++++++----------- .../plugin/sum_retrans/sum_retrans.c | 13 ++------ 2 files changed, 17 insertions(+), 28 deletions(-) diff --git a/source/tools/monitor/unity/collector/plugin/sum_retrans/sum_retrans.bpf.c b/source/tools/monitor/unity/collector/plugin/sum_retrans/sum_retrans.bpf.c index 12842cb3..16ac9910 100644 --- a/source/tools/monitor/unity/collector/plugin/sum_retrans/sum_retrans.bpf.c +++ b/source/tools/monitor/unity/collector/plugin/sum_retrans/sum_retrans.bpf.c @@ -16,14 +16,6 @@ static inline u32 read_ns_inum(struct sock *sk) return 0; } -static inline u32 read_dip(struct sock *sk) -{ - if (sk) { - return BPF_CORE_READ(sk, __sk_common.skc_daddr); - } - return 0; -} - static void inc_value(struct bpf_map* maps, u32 k) { u64 *pv = bpf_map_lookup_elem(maps, &k); if (pv) { @@ -34,16 +26,20 @@ static void inc_value(struct bpf_map* maps, u32 k) { } } -SEC("kprobe/tcp_retransmit_skb") -int j_tcp_retransmit_skb(struct pt_regs *ctx){ - struct sock *sk; - u32 inum, ip; - - sk = (struct sock *)PT_REGS_PARM1(ctx); - inum = read_ns_inum(sk); - ip = read_dip(sk); - - bpf_printk("hello inum: %u\n", inum); +struct tcp_retrans_args { + u64 pad; + u64 skb; + u64 sk; + u16 sport; + u16 dport; + u32 sip; + u32 dip; +}; +SEC("tracepoint/tcp/tcp_retransmit_skb") +int tcp_retransmit_skb_hook(struct tcp_retrans_args *args){ + struct sock *sk = (struct sock *)args->sk; + u32 inum = read_ns_inum(sk); + u32 ip = args->dip; inc_value((struct bpf_map *)&inums, inum); inc_value((struct bpf_map *)&dips, ip); diff --git a/source/tools/monitor/unity/collector/plugin/sum_retrans/sum_retrans.c b/source/tools/monitor/unity/collector/plugin/sum_retrans/sum_retrans.c index 02e09fab..ea11ba99 100644 --- a/source/tools/monitor/unity/collector/plugin/sum_retrans/sum_retrans.c +++ b/source/tools/monitor/unity/collector/plugin/sum_retrans/sum_retrans.c @@ -24,7 +24,6 @@ int init(void *arg) ret = LOAD_SKEL_OBJECT(sum_retrans, perf); inum_fd = coobpf_map_find(sum_retrans->obj, "inums"); dip_fd = coobpf_map_find(sum_retrans->obj, "dips"); - printf("inum_fd: %d %d\n", inum_fd, dip_fd); return ret; } @@ -37,7 +36,6 @@ static int transIP(unsigned long lip, char *result, int size) { } static void pack_dip() { - int ret = 0; char ips[32]; char buff[64]; unsigned long value; @@ -57,14 +55,12 @@ static void pack_dip() { ip = 0; while (coobpf_key_next(dip_fd, &ip, &ip_next) == 0) { - ret = bpf_map_delete_elem(dip_fd, &ip_next); - printf("ret2: %d\n", ret); + bpf_map_delete_elem(dip_fd, &ip_next); ip = ip_next; } } static void pack_inum() { - int ret; char buff[64]; unsigned long value; unsigned int inum, inum_next; @@ -74,16 +70,14 @@ static void pack_inum() { inum = 0; while (coobpf_key_next(inum_fd, &inum, &inum_next) == 0) { bpf_map_lookup_elem(inum_fd, &inum_next, &value); - printf("inum: %d, %ld\n", inum, value); - snprintf(buff, 64, "%d:%ld,", inum, value); + snprintf(buff, 64, "%u:%ld,", inum_next, value); strncat(log, buff, 4096); inum = inum_next; } inum = 0; while (coobpf_key_next(inum_fd, &inum, &inum_next) == 0) { - ret = bpf_map_delete_elem(inum_fd, &inum_next); - printf("ret inum: %d\n", ret); + bpf_map_delete_elem(inum_fd, &inum_next); inum = inum_next; } } @@ -93,7 +87,6 @@ int call(int t, struct unity_lines *lines) struct unity_line* line; pack_dip(); - printf("strlen: %ld\n", strlen(log)); if (strlen(log) > 0) { unity_alloc_lines(lines, 2); // 预分配好 -- Gitee From a84799693c17a7e0072eddf4537861ad2abef1d8 Mon Sep 17 00:00:00 2001 From: liaozhaoyan Date: Fri, 14 Apr 2023 10:19:07 +0800 Subject: [PATCH 06/20] add io except check. --- source/tools/monitor/unity/beaver/url_api.lua | 5 +- .../monitor/unity/beeQ/rbtree/rbEvent.lua | 3 +- .../monitor/unity/collector/io/diskFifo.lua | 58 ++++++++++ .../unity/collector/io/exceptCheck.lua | 107 ++++++++++++++++++ .../unity/collector/io/io_diagnose.lua | 38 ++++--- .../unity/collector/postEngine/execBase.lua | 53 +++++++++ .../tools/monitor/unity/test/lab/listSun.lua | 9 ++ 7 files changed, 251 insertions(+), 22 deletions(-) create mode 100644 source/tools/monitor/unity/collector/io/diskFifo.lua create mode 100644 source/tools/monitor/unity/collector/io/exceptCheck.lua create mode 100644 source/tools/monitor/unity/collector/postEngine/execBase.lua create mode 100644 source/tools/monitor/unity/test/lab/listSun.lua diff --git a/source/tools/monitor/unity/beaver/url_api.lua b/source/tools/monitor/unity/beaver/url_api.lua index d45281c1..29f1c219 100644 --- a/source/tools/monitor/unity/beaver/url_api.lua +++ b/source/tools/monitor/unity/beaver/url_api.lua @@ -17,13 +17,12 @@ function CurlApi:_init_(frame, fYaml) self._urlCb["/api/sum"] = function(tReq) return self:sum(tReq) end self._urlCb["/api/sub"] = function(tReq) return self:sub(tReq) end self._urlCb["/api/query"] = function(tReq) return self:query(tReq) end - self._urlCb["/api/que"] = function(tReq) return self:que(tReq) end - self._urlCb["/api/trig"] = function(tReq) return self:que(tReq) end + self._urlCb["/api/trig"] = function(tReq) return self:trig(tReq) end self:_install(frame) self:_setupQs(fYaml) end -function CurlApi:que(tReq) +function CurlApi:trig(tReq) local stat, tJson = pcall(self.getJson, self, tReq) if stat and tJson then local s = self:jencode(tJson) diff --git a/source/tools/monitor/unity/beeQ/rbtree/rbEvent.lua b/source/tools/monitor/unity/beeQ/rbtree/rbEvent.lua index bb904d01..c9b45317 100644 --- a/source/tools/monitor/unity/beeQ/rbtree/rbEvent.lua +++ b/source/tools/monitor/unity/beeQ/rbtree/rbEvent.lua @@ -60,8 +60,7 @@ function CrbEvent:addEvent(name, obj, period, delay, loop) end function CrbEvent:_proc(now, node) - --print(now .. ": node " .. node.name .. " work") - local ret + local ret = -1 if node.obj.work then ret = node.obj:work(node.period, self) end diff --git a/source/tools/monitor/unity/collector/io/diskFifo.lua b/source/tools/monitor/unity/collector/io/diskFifo.lua new file mode 100644 index 00000000..dc3e4a7a --- /dev/null +++ b/source/tools/monitor/unity/collector/io/diskFifo.lua @@ -0,0 +1,58 @@ +--- +--- Generated by EmmyLua(https://github.com/EmmyLua) +--- Created by liaozhaoyan. +--- DateTime: 2023/4/12 15:51 +--- + +require("common.class") +local Cfifo = require("common.fifo") + +local CdiskFifo = class("diskFifo", Cfifo) + +function CdiskFifo:_init_(maxLen) + Cfifo._init_(self, maxLen) +end + +function CdiskFifo:iowait() + local sum = 0 + local value + local cells = {} + local count = self._count + + if #self.list < count then + return + end + + for i, v in ipairs(self.list) do + value = v.iowait + cells[i], sum = value, sum + value + end + return {max = math.max(unpack(cells)), min = math.min(unpack(cells)), count = count, avg = sum / count} +end + +function CdiskFifo:values(disk, key) + local c = 0 + local sum = 0 + local value + local cells = {} + local count = self._count + + if #self.list < count then + return + end + + local d + for _, v in ipairs(self.list) do + d = v[disk] + if d then + value = d[key] + c = c + 1 + cells[c], sum = value, sum + value + else -- not full + return + end + end + return {max = math.max(unpack(cells)), min = math.min(unpack(cells)), count = count, avg = sum / count} +end + +return CdiskFifo diff --git a/source/tools/monitor/unity/collector/io/exceptCheck.lua b/source/tools/monitor/unity/collector/io/exceptCheck.lua new file mode 100644 index 00000000..e56302ef --- /dev/null +++ b/source/tools/monitor/unity/collector/io/exceptCheck.lua @@ -0,0 +1,107 @@ +--- +--- Generated by EmmyLua(https://github.com/EmmyLua) +--- Created by liaozhaoyan. +--- DateTime: 2023/4/12 16:26 +--- + +require("common.class") +local system = require("common.system") +local CdiskFifo = require("collector.io.diskFifo") +local CexecptCheck = class("CexecptCheck") + +-- [[ v format +-- iowait: 1 +-- disk_a: { +-- iops = 1, +-- bps = 2, +-- qusize = 3, +-- await = 4, +-- util = 5, +-- } +-- ]] + +local fifoSize = 60 +local keys = {"iops", "bps", "qusize", "await", "util"} + +local function addItem() + return { + baseThresh = { + nrSample = 0, + curWinMinVal = 1e9, + curWinMaxVal = 0, + moveAvg = 0, + thresh = 0 + }, + compensation = { + thresh = 0, + shouldUpdThreshComp = true, + decRangeThreshAvg = 0, + decRangeCnt = 0, + minStableThresh = 1e9, + maxStableThresh = 0, + stableThreshAvg = 0, + nrStableThreshSample = 0, + }, + dynTresh = 1e9, + usedWin = 0, + } +end + +local function addItems() + local ret = {} + for _, key in ipairs(keys) do + ret[key] = addItem() + end + return ret +end + +function CexecptCheck:_init_() + self._fifo = CdiskFifo.new(fifoSize) + self._waitItem = addItem() + self._diskItem = {} +end + +local function calc(item, vs) + local bt = item.baseThresh + bt.curWinMinVal = vs.min + bt.curWinMaxVal = vs.max + bt.moveAvg = vs.arg + bt.nrSample = vs.count +end + +function CexecptCheck:calcs() + local iowaits = self._fifo:iowait() + if iowaits then + calc(self._waitItem, iowaits) + end + + local vs + for disk, item in pairs(self._diskItem) do + for _, key in ipairs(keys) do + vs = self._fifo:values(disk, key) + if vs then + system:dumps(item) + calc(item[key], vs) + end + end + end +end + +function CexecptCheck:addValue(v) + for disk, _ in pairs(v) do -- new iowait names one disk + if disk ~= "iowait" then + if not system:keyIsIn(self._diskItem, disk) then + self._diskItem[disk] = addItems() + end + end + end + for disk, _ in pairs(self._diskItem) do -- del + if not system:keyIsIn(v, disk) then + self._diskItem[disk] = nil + end + end + self._fifo:push(v) + self:calcs() +end + +return CexecptCheck diff --git a/source/tools/monitor/unity/collector/io/io_diagnose.lua b/source/tools/monitor/unity/collector/io/io_diagnose.lua index 7c3fc552..a26cdec7 100644 --- a/source/tools/monitor/unity/collector/io/io_diagnose.lua +++ b/source/tools/monitor/unity/collector/io/io_diagnose.lua @@ -11,13 +11,14 @@ local CioDiagnose = class("ioDiagnose", CvProto) local system = require("common.system") local procffi = require("collector.native.procffi") local pystring = require("common.pystring") +local CexceptCheck = require("collector.io.exceptCheck") local unistd = require("posix.unistd") -local function ioWait(fStat) - local data = procffi["ffi"].new("var_long_t") +local function readIoWait(fStat) + local data = procffi["ffi"].new("var_kvs_t") local stat = io.open(fStat, "r") local s = stat:read() - assert(procffi["cffi"].var_input_long(procffi["ffi"].string(s), data) == 0) + assert(procffi["cffi"].var_input_kvs(procffi["ffi"].string(s), data) == 0) stat:close() local sum = 0 for i = 0, data.no do @@ -36,7 +37,7 @@ local function distStat(fDisks) local disks = {} for line in io.lines(fDisks) do local s = pystring:split(line, nil, 3)[4] - local data = procffi["ffi"].new("var_kvs_t") + local data= procffi["ffi"].new("var_kvs_t") assert(procffi["cffi"].var_input_kvs(procffi["ffi"].string(s), data) == 0) local disk = procffi["ffi"].string(data.s) disks[disk] = data @@ -46,10 +47,11 @@ end local function pickValue(value) return { - rws = tonumber(value[1] + value[5]), - rwSecs = tonumber(value[3] + value[7]), - qusize = tonumber(value[11]), - rwTiks = tonumber(value[4] + value[8]), + iops = tonumber(value[0] + value[4]), -- 1 5 + bps = tonumber(value[2] + value[6]) * 512, -- 3 7 + qusize = tonumber(value[10]) / 1000.0, -- 11 + await = tonumber(value[3] + value[7]), -- 4 8 + util = tonumber(value[9]) / 10.0 -- 10 } end @@ -60,11 +62,12 @@ function CioDiagnose:_init_(que, proto_q, fYaml, tid) self.fStat = res.config.proc_path .. "proc/stat" self.fDiskStat = res.config.proc_path .. "proc/diskstats" self.dirSys = res.config.proc_path .. "sys/block/" - self._lastCpuTotal, self._lastCpuIO = ioWait(self.fStat) + self._lastCpuTotal, self._lastCpuIO = readIoWait(self.fStat) self._disks = {} self._disksLast = {} self:readProc() self:storeProc() + self._check = CexceptCheck.new() end function CioDiagnose:readProc() @@ -85,8 +88,8 @@ function CioDiagnose:storeProc() self._disks = {} end -function CioDiagnose:diff(t) - local res = {} +function CioDiagnose:diff(t, iowait) + local res = {iowait = iowait} for disk, value in pairs(self._disks) do local vLast = self._disksLast[disk] if vLast then @@ -94,6 +97,9 @@ function CioDiagnose:diff(t) for k, v in pairs(value) do vs[k] = (v - vLast[k]) / t end + if vs.iops > 0 then + vs.await = vs.await / vs.iops + end res[disk] = vs end end @@ -101,15 +107,13 @@ function CioDiagnose:diff(t) end function CioDiagnose:work(t) - local cpuTotal, cpuIO = ioWait(self.fStat) - --print( cpuTotal - self._lastCpuTotal, cpuIO - self._lastCpuIO) + local cpuTotal, cpuIO = readIoWait(self.fStat) + local iowait = (cpuIO - self._lastCpuIO) * 100 / (cpuTotal - self._lastCpuTotal) / t self._lastCpuTotal, self._lastCpuIO = cpuTotal, cpuIO self:readProc() - local res = self:diff(t) - --system:dumps(res) - --system:dumps(self._disks) - --system:dumps(self._disksLast) + local res = self:diff(t, iowait) + self._check:addValue(res) self:storeProc() end diff --git a/source/tools/monitor/unity/collector/postEngine/execBase.lua b/source/tools/monitor/unity/collector/postEngine/execBase.lua new file mode 100644 index 00000000..0199fd0d --- /dev/null +++ b/source/tools/monitor/unity/collector/postEngine/execBase.lua @@ -0,0 +1,53 @@ +--- +--- Generated by EmmyLua(https://github.com/EmmyLua) +--- Created by liaozhaoyan. +--- DateTime: 2023/4/12 00:01 +--- + +require("common.class") +local unistd = require("posix.unistd") +local pwait = require("posix.sys.wait") +local signal = require(" posix.signal") +local pystring = require("common.pystring") +local system = require("common.system") + +local CexecBase = class("execBase") +local interval = 5 --- poll for every 5 second + +function CexecBase:_init_(cmd, args, seconds) + self._pid = 0 + self.cmd = cmd + self._args = args + self._cnt = 0 + self._loop = seconds / interval +end + +function CexecBase:run() + local pid, err = unistd.fork() + assert(pid, "fork report" .. err) + if pid > 0 then + self._pid = pid + self._cnt = 0 + else + local errno + _, err, errno = unistd.exec(self.cmd, self._args) + assert(not errno, "exec failed." .. err .. errno) + end +end + +function CexecBase:addEvents(e) + e:addEvent(self.cmd, self, interval, true, self._loop) +end + +function CexecBase:work() + local cnt = self._cnt + if cnt >= self._loop then + local pid, stat, exit = pwait.wait(self._pid, pwait.WNOHANG) + assert(pid, "wait failed " .. stat .. exit) + if not exit then -- process not exit + signal.signal(self._pid, signal.SIGKILL) -- force to kill task + pwait.wait(self._pid) -- wait task + end + end + self._cnt = cnt + 1 +end diff --git a/source/tools/monitor/unity/test/lab/listSun.lua b/source/tools/monitor/unity/test/lab/listSun.lua new file mode 100644 index 00000000..815cadee --- /dev/null +++ b/source/tools/monitor/unity/test/lab/listSun.lua @@ -0,0 +1,9 @@ +--- +--- Generated by EmmyLua(https://github.com/EmmyLua) +--- Created by liaozhaoyan. +--- DateTime: 2023/4/12 17:54 +--- + +local aList = {1, 2, 3, sum=6} +print(#aList) +print(math.max(aList)) \ No newline at end of file -- Gitee From 8a09fc88ee0ba6309abc9a38b2b1b8eb8c674759 Mon Sep 17 00:00:00 2001 From: liaozhaoyan Date: Sat, 15 Apr 2023 11:50:05 +0800 Subject: [PATCH 07/20] fix virtout bug. --- source/tools/monitor/unity/beeQ/pack.sh | 4 + source/tools/monitor/unity/beeQ/pushTo.lua | 6 +- source/tools/monitor/unity/beeQ/run.sh | 2 +- .../unity/collector/io/exceptCheck.lua | 1 - .../collector/plugin/virtout/virtout.bpf.c | 5 +- .../unity/collector/plugin/virtout/virtout.c | 89 ++++++++++++++++++- .../tools/monitor/unity/httplib/coHttpCli.lua | 44 +++++++-- .../tools/monitor/unity/httplib/coInflux.lua | 5 +- 8 files changed, 137 insertions(+), 19 deletions(-) diff --git a/source/tools/monitor/unity/beeQ/pack.sh b/source/tools/monitor/unity/beeQ/pack.sh index 57f6e8a1..42eea828 100755 --- a/source/tools/monitor/unity/beeQ/pack.sh +++ b/source/tools/monitor/unity/beeQ/pack.sh @@ -43,13 +43,17 @@ mkdir ${APP}/collector mkdir ${APP}/collector/native mkdir ${APP}/collector/guard mkdir ${APP}/collector/outline +mkdir ${APP}/collector/postPlugin mkdir ${APP}/collector/postEngine +mkdir ${APP}/collector/io cp collector/native/*.so* ${APP}/collector/native/ cp collector/native/*.lua ${APP}/collector/native/ cp collector/*.lua ${APP}/collector/ cp collector/guard/*.lua ${APP}/collector/guard cp collector/outline/*.lua ${APP}/collector/outline +cp collector/postPlugin/*.lua ${APP}/collector/postPlugin cp collector/postEngine/*.lua ${APP}/collector/postEngine +cp collector/io/*.lua ${APP}/collector/io cp collector/plugin.yaml ${APP}/collector/ mkdir ${APP}/common diff --git a/source/tools/monitor/unity/beeQ/pushTo.lua b/source/tools/monitor/unity/beeQ/pushTo.lua index 9daecebb..bf301ffe 100644 --- a/source/tools/monitor/unity/beeQ/pushTo.lua +++ b/source/tools/monitor/unity/beeQ/pushTo.lua @@ -8,14 +8,10 @@ package.path = package.path .. ";../?.lua;" local coCli = require("httplib.coCli") local coInflux = require("httplib.coInflux") -local unistd = require("posix.unistd") -local system = require("common.system") function work(fd, fYaml) - local res = system:parseYaml(fYaml) - local pushTo = res.pushTo local frame = coCli.new(fd) - local c = coInflux.new(pushTo.host, pushTo.port, pushTo.url) + local c = coInflux.new(fYaml) frame:poll(c) print("end push.") return 0 diff --git a/source/tools/monitor/unity/beeQ/run.sh b/source/tools/monitor/unity/beeQ/run.sh index 9c23dcdd..23bf4702 100755 --- a/source/tools/monitor/unity/beeQ/run.sh +++ b/source/tools/monitor/unity/beeQ/run.sh @@ -7,7 +7,7 @@ export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:../beaver/ export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:../../install/ export LUA_PATH="../../lua/?.lua;../../lua/?/init.lua;" -export LUA_CPATH="../../lib/?.so;../../lib/loadall.so;" +export LUA_CPATH="./lib/?.so;../../lib/?.so;../../lib/loadall.so;" yaml_path=$1 [ ! $yaml_path ] && yaml_path="/etc/sysak/plugin.yaml" diff --git a/source/tools/monitor/unity/collector/io/exceptCheck.lua b/source/tools/monitor/unity/collector/io/exceptCheck.lua index e56302ef..1f88fc47 100644 --- a/source/tools/monitor/unity/collector/io/exceptCheck.lua +++ b/source/tools/monitor/unity/collector/io/exceptCheck.lua @@ -80,7 +80,6 @@ function CexecptCheck:calcs() for _, key in ipairs(keys) do vs = self._fifo:values(disk, key) if vs then - system:dumps(item) calc(item[key], vs) end end diff --git a/source/tools/monitor/unity/collector/plugin/virtout/virtout.bpf.c b/source/tools/monitor/unity/collector/plugin/virtout/virtout.bpf.c index 9ab4e78a..ee86f871 100644 --- a/source/tools/monitor/unity/collector/plugin/virtout/virtout.bpf.c +++ b/source/tools/monitor/unity/collector/plugin/virtout/virtout.bpf.c @@ -1,6 +1,7 @@ // // Created by 廖肇燕 on 2023/2/23. // +#define BPF_NO_GLOBAL_DATA #include #include @@ -11,7 +12,7 @@ #define KERN_STACKID_FLAGS (0 | BPF_F_FAST_STACK_CMP) #define PERIOD_TIME (10 * 1000 * 1000ULL) -#define THRESHOLD_TIME (200 * 1000 * 1000ULL) +#define THRESHOLD_TIME (150 * 1000 * 1000ULL) BPF_ARRAY(virtdist, u64, 20); BPF_PERF_OUTPUT(perf, 1024); @@ -42,6 +43,7 @@ static inline void check_time(struct bpf_perf_event_data *ctx, if (last && delta > 2 * PERIOD_TIME) { delta -= PERIOD_TIME; + bpf_printk("delta %llu.\n", delta); hist10_push((struct bpf_map_def *)&virtdist, delta / PERIOD_TIME); if (delta >= THRESHOLD_TIME) { struct task_struct* task = (struct task_struct *)bpf_get_current_task(); @@ -52,6 +54,7 @@ static inline void check_time(struct bpf_perf_event_data *ctx, data.delta = delta; data.cpu = cpu(); bpf_get_current_comm(&data.comm, TASK_COMM_LEN); + bpf_printk("delta %llu, pid:%d.\n", delta, data.pid); bpf_perf_event_output(ctx, event, BPF_F_CURRENT_CPU, &data, sizeof(data)); } diff --git a/source/tools/monitor/unity/collector/plugin/virtout/virtout.c b/source/tools/monitor/unity/collector/plugin/virtout/virtout.c index a0599da3..7c6d7321 100644 --- a/source/tools/monitor/unity/collector/plugin/virtout/virtout.c +++ b/source/tools/monitor/unity/collector/plugin/virtout/virtout.c @@ -11,6 +11,9 @@ #include #include #include +#include +#include + #define CPU_DIST_INDEX 4 #define DIST_ARRAY_SIZE 20 @@ -18,6 +21,8 @@ static volatile int budget = 0; // for log budget static int dist_fd = 0; static int stack_fd = 0; +static int perf_fds[1024]; + int proc(int stack_fd, struct data_t *e, struct unity_line *line); void handle_event(void *ctx, int cpu, void *data, __u32 data_sz) { @@ -38,12 +43,90 @@ void handle_event(void *ctx, int cpu, void *data, __u32 data_sz) } } +static void close_perf_fds(void) { + int i; + for (i = 0; i < 1024; i ++) { + if (perf_fds[i] > 0) { + close(perf_fds[i]); + perf_fds[i] = 0; + } else { + break; + } + } +} + +static long perf_event_open(struct perf_event_attr *hw_event, pid_t pid, + int cpu, int group_fd, int flags) +{ + int ret; + + ret = syscall(__NR_perf_event_open, hw_event, pid, cpu, + group_fd, flags); + return ret; +} + DEFINE_SEKL_OBJECT(virtout); +static struct bpf_prog_skeleton *search_progs(const char* func) { + struct bpf_object_skeleton *s; + int i; + + s = virtout->skeleton; + for (i = 0; i < s->prog_cnt; i ++) { + if (strcmp(s->progs[i].name, func) == 0) { + return &(s->progs[i]); + } + } + return NULL; +} + +static int setup_perf_events(const char* func) { + int nr_cpus = libbpf_num_possible_cpus(); + int i; + struct bpf_link *link; + struct bpf_prog_skeleton *progs; + + progs = search_progs(func); + + struct perf_event_attr perf_attr = { + .type = PERF_TYPE_SOFTWARE, + .config = PERF_COUNT_SW_CPU_CLOCK, + .freq = 0, + .sample_period = 10 * 1000 * 1000, + }; + + for (i = 0; i < nr_cpus; i ++) { + perf_fds[i] = perf_event_open(&perf_attr, -1, i, -1, 0); + if (perf_fds[i] < 0) { + perror("syscall failed."); + perf_fds[i] = 0; + close_perf_fds(); + return -1; + } + + link = bpf_program__attach_perf_event(*(progs->prog), perf_fds[i]); + if (!link) { + perror("attach failed."); + close_perf_fds(); + return -1; + } + *(progs->link) = link; + } + printf("setup virout OK.\n"); + return 0; +} + int init(void *arg) { int ret; printf("virtout plugin install.\n"); ret = LOAD_SKEL_OBJECT(virtout, perf); + if (ret < 0) { + return ret; + } + ret = setup_perf_events("sw_clock"); + if (ret < 0) { + return ret; + } dist_fd = coobpf_map_find(virtout->obj, "virtdist"); stack_fd = coobpf_map_find(virtout->obj, "stack"); return ret; @@ -109,6 +192,7 @@ int call(int t, struct unity_lines *lines) { void deinit(void) { printf("virout plugin uninstall.\n"); + close_perf_fds(); DESTORY_SKEL_BOJECT(virtout); } @@ -127,12 +211,13 @@ int proc(int stack_fd, struct data_t *e, struct unity_line *line) { for (i = 0; i < 128; i ++) { if (addr[i] > 0) { cell = ksym_search(addr[i]); - strncpy(log, cell->func, LOG_MAX); - strncpy(log, ",", LOG_MAX); + strncat(log, cell->func, LOG_MAX); + strncat(log, ",", LOG_MAX); } else { break; } } + printf("log:%s\n", log); unity_set_table(line, "virtout_log"); unity_set_log(line, "log", log); return 0; diff --git a/source/tools/monitor/unity/httplib/coHttpCli.lua b/source/tools/monitor/unity/httplib/coHttpCli.lua index a55fbd94..461689be 100644 --- a/source/tools/monitor/unity/httplib/coHttpCli.lua +++ b/source/tools/monitor/unity/httplib/coHttpCli.lua @@ -113,11 +113,18 @@ local function setupSocket(host, port) end end -function CcoHttpCli:_init_(host, port, url, persistent) - self._host = host - self._port = port or 80 - self._url = url or "/" +function CcoHttpCli:_init_(fYaml, persistent) + local res = system:parseYaml(fYaml) + local pushTo = res.pushTo + self._host = pushTo.host + self._port = pushTo.port or 80 + self._url = pushTo.url or "/" self._persistent = persistent + + local Cidentity = require("beaver.identity") + local inst = Cidentity.new(fYaml) + self._instance = inst:id() + self.status = enumStat.closed end @@ -144,6 +151,31 @@ function CcoHttpCli:trans(msgs, body, filter) return "" end +function CcoHttpCli:addInstance(line) -- add instance id for line index. + local cells = line.ls + local hasInstance = false + + if cells then + for _, cell in ipairs(cells) do + if cell.name == "instance" then + hasInstance = true + end + end + end + + if not hasInstance then + local cell = { + name = "instance", + index = self._instance + } + if cells then + table.insert(cells, cell) + else + line.ls = {cell} + end + end +end + function CcoHttpCli:pack(body) local line = self:packCliHead('GET', self._url) local head = { @@ -497,9 +529,7 @@ function CcoHttpCli:work(cffi, efd) else goto failed end - if self._persistent then - print("continue.") - else + if not self._persistent then goto failed end end diff --git a/source/tools/monitor/unity/httplib/coInflux.lua b/source/tools/monitor/unity/httplib/coInflux.lua index 002ac09a..b808e3fd 100644 --- a/source/tools/monitor/unity/httplib/coInflux.lua +++ b/source/tools/monitor/unity/httplib/coInflux.lua @@ -11,8 +11,8 @@ local lineParse = require("common.lineParse") local CcoInflux = class("coInflux", CcoHttpCli) -function CcoInflux:_init_(host, port, url) - CcoHttpCli._init_(self, host, port, url) +function CcoInflux:_init_(fYaml) + CcoHttpCli._init_(self, fYaml) end function CcoInflux:echo(tReq) @@ -28,6 +28,7 @@ function CcoInflux:trans(msgs, body, filter) lines = msgs.lines for _, line in ipairs(lines) do c = c + 1 + self:addInstance(line) bodies[c] = lineParse.packs(line) end -- Gitee From f901d3ec44c642288fc427249809084d3c8fa1e0 Mon Sep 17 00:00:00 2001 From: liaozhaoyan Date: Sat, 15 Apr 2023 22:56:06 +0800 Subject: [PATCH 08/20] skip offline cpu. --- .../monitor/unity/collector/guard/calcJiffies.lua | 2 +- .../unity/collector/plugin/virtout/virtout.c | 14 ++++++++------ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/source/tools/monitor/unity/collector/guard/calcJiffies.lua b/source/tools/monitor/unity/collector/guard/calcJiffies.lua index 5e12adfa..922f345b 100644 --- a/source/tools/monitor/unity/collector/guard/calcJiffies.lua +++ b/source/tools/monitor/unity/collector/guard/calcJiffies.lua @@ -56,7 +56,7 @@ function mod.calc(mnt, procffi) local comp = delta1 / delta2 if comp >= 1.1 or comp < 0.9 then - errno("calculate jiffies failed.") + error("calculate jiffies failed.") end return (delta1 + delta2) * 2.5 / nproc() diff --git a/source/tools/monitor/unity/collector/plugin/virtout/virtout.c b/source/tools/monitor/unity/collector/plugin/virtout/virtout.c index 7c6d7321..070c483c 100644 --- a/source/tools/monitor/unity/collector/plugin/virtout/virtout.c +++ b/source/tools/monitor/unity/collector/plugin/virtout/virtout.c @@ -49,8 +49,6 @@ static void close_perf_fds(void) { if (perf_fds[i] > 0) { close(perf_fds[i]); perf_fds[i] = 0; - } else { - break; } } } @@ -97,10 +95,14 @@ static int setup_perf_events(const char* func) { for (i = 0; i < nr_cpus; i ++) { perf_fds[i] = perf_event_open(&perf_attr, -1, i, -1, 0); if (perf_fds[i] < 0) { - perror("syscall failed."); - perf_fds[i] = 0; - close_perf_fds(); - return -1; + if (errno == ENODEV) { + printf("skip offline cpu id: %d\n", i); + continue; + } else { + perror("syscall failed."); + close_perf_fds(); + return -1; + } } link = bpf_program__attach_perf_event(*(progs->prog), perf_fds[i]); -- Gitee From 6d16d823892fe480733933faa5e4b7719e20710f Mon Sep 17 00:00:00 2001 From: liaozhaoyan Date: Sun, 16 Apr 2023 22:42:16 +0800 Subject: [PATCH 09/20] add json encode for load_log. --- source/tools/monitor/unity/beeQ/proto_queue.lua | 4 +++- source/tools/monitor/unity/collector/plugin.lua | 4 +++- source/tools/monitor/unity/collector/plugin/kmsg/kmsg.c | 1 + 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/source/tools/monitor/unity/beeQ/proto_queue.lua b/source/tools/monitor/unity/beeQ/proto_queue.lua index ecece0f1..b1597f3e 100644 --- a/source/tools/monitor/unity/beeQ/proto_queue.lua +++ b/source/tools/monitor/unity/beeQ/proto_queue.lua @@ -8,6 +8,8 @@ require("common.class") local CprotoData = require("common.protoData") local CprotoQueue = class("loop") +local cjson = require("cjson.safe") +local json = cjson.new() local system = require("common.system") function CprotoQueue:_init_(que) @@ -55,7 +57,7 @@ function CprotoQueue:load_log(unity_line, line) if #name > 0 then local log = self._ffi.string(unity_line.logs[0].log) self._ffi.C.free(unity_line.logs[0].log) -- should free from strdup - table.insert(line.log, {name = name, log = log}) + table.insert(line.log, {name = name, log = json.encode(log)}) end end diff --git a/source/tools/monitor/unity/collector/plugin.lua b/source/tools/monitor/unity/collector/plugin.lua index 38cdb271..216d1160 100644 --- a/source/tools/monitor/unity/collector/plugin.lua +++ b/source/tools/monitor/unity/collector/plugin.lua @@ -7,6 +7,8 @@ require("common.class") local Cplugin = class("plugin") local dockerinfo = require("common.dockerinfo") +local cjson = require("cjson.safe") +local json = cjson.new() function Cplugin:_init_(resYaml, ffi, proto_q, so) self._ffi = ffi @@ -73,7 +75,7 @@ function Cplugin:load_log(unity_line, line) if #name > 0 then local log = self._ffi.string(unity_line.logs[0].log) self._ffi.C.free(unity_line.logs[0].log) -- should free from strdup - table.insert(line.log, {name = name, log = log}) + table.insert(line.log, {name = name, log = json.encode(log)}) end end diff --git a/source/tools/monitor/unity/collector/plugin/kmsg/kmsg.c b/source/tools/monitor/unity/collector/plugin/kmsg/kmsg.c index e8b028e2..ee6d41cb 100644 --- a/source/tools/monitor/unity/collector/plugin/kmsg/kmsg.c +++ b/source/tools/monitor/unity/collector/plugin/kmsg/kmsg.c @@ -79,6 +79,7 @@ static int kmsg_thread_func(struct beeQ* q, void * arg) { perror("kmsg read2 failed."); goto endRead; } + buff[ret -1] = '\0'; unity_alloc_lines(lines, 1); line = unity_get_line(lines, 0); -- Gitee From 52a8c5185c8a3216821fa774d12aabd82bbde583 Mon Sep 17 00:00:00 2001 From: liaozhaoyan Date: Mon, 17 Apr 2023 14:06:30 +0800 Subject: [PATCH 10/20] strip nouse print info. --- source/tools/monitor/unity/beeQ/proto_queue.lua | 1 - 1 file changed, 1 deletion(-) diff --git a/source/tools/monitor/unity/beeQ/proto_queue.lua b/source/tools/monitor/unity/beeQ/proto_queue.lua index b1597f3e..618fe3f4 100644 --- a/source/tools/monitor/unity/beeQ/proto_queue.lua +++ b/source/tools/monitor/unity/beeQ/proto_queue.lua @@ -79,7 +79,6 @@ function CprotoQueue:_proc(unity_lines, lines) end function CprotoQueue:send(num, pline) - --print(string.format("proto que send a %d message.", num)) local unity_lines = self._ffi.new("struct unity_lines") local lines = self._proto:protoTable() unity_lines.num = num -- Gitee From d1b93bfdb45f3866c1845862799ca56804e3e26a Mon Sep 17 00:00:00 2001 From: liaozhaoyan Date: Mon, 17 Apr 2023 17:25:27 +0800 Subject: [PATCH 11/20] add outLine.py. --- .../tools/monitor/unity/test/curl/outLine.py | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 source/tools/monitor/unity/test/curl/outLine.py diff --git a/source/tools/monitor/unity/test/curl/outLine.py b/source/tools/monitor/unity/test/curl/outLine.py new file mode 100644 index 00000000..1307c695 --- /dev/null +++ b/source/tools/monitor/unity/test/curl/outLine.py @@ -0,0 +1,23 @@ +import os +import socket + +PIPE_PATH = "/var/sysom/line" +MAX_BUFF = 128 * 1024 + + +class CnfPut(object): + def __init__(self, path=PIPE_PATH): + self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + self._path = path + if not os.path.exists(self._path): + raise ValueError("pipe path is not exist. please check Netinfo is running.") + + def puts(self, s): + if len(s) > MAX_BUFF: + raise ValueError("message len %d, is too long ,should less than%d" % (len(s), MAX_BUFF)) + return self._sock.sendto(s, self._path) + + +if __name__ == "__main__": + nf = CnfPut() + nf.puts('runtime,mode=java log="hello runtime."') -- Gitee From b3fe428afac41ab76311b82df44eee54cf775433 Mon Sep 17 00:00:00 2001 From: liaozhaoyan Date: Mon, 17 Apr 2023 20:19:48 +0800 Subject: [PATCH 12/20] trig execBase for engine. --- .../unity/collector/postEngine/engine.lua | 14 ++++-- .../unity/collector/postEngine/execBase.lua | 47 ++++++++++++------- .../tools/monitor/unity/test/curl/postCmd.lua | 15 ++++++ 3 files changed, 56 insertions(+), 20 deletions(-) create mode 100644 source/tools/monitor/unity/test/curl/postCmd.lua diff --git a/source/tools/monitor/unity/collector/postEngine/engine.lua b/source/tools/monitor/unity/collector/postEngine/engine.lua index 644ad2af..c3de1f82 100644 --- a/source/tools/monitor/unity/collector/postEngine/engine.lua +++ b/source/tools/monitor/unity/collector/postEngine/engine.lua @@ -11,6 +11,7 @@ local CvProto = require("collector.vproto") local pystring = require("common.pystring") local system = require("common.system") local cjson = require("cjson.safe") +local CexecBase = require("collector.postEngine.execBase") local Cengine = class("engine", CvProto) @@ -25,13 +26,20 @@ function Cengine:setTask(taskMons) self._task = taskMons end -function Cengine:pushTask(msgs) +function Cengine:pushTask(e, msgs) local events = pystring:split(msgs, '\n') for _, msg in ipairs(events) do print(msg) local res = cjson.decode(msg) - if res.cmd == "mon_pid" then + local cmd = res.cmd + if cmd == "mon_pid" then self._task:add(res.pid, res.loop) + elseif cmd == "exec" then -- exec a cmd + local execCmd = res.exec + local args = res.args or {} + local second = res.second or 1 + local exec = CexecBase.new(execCmd, args, second) + exec:addEvents(e) end end end @@ -45,7 +53,7 @@ function Cengine:proc(t, event, msgs) local bytes = self._proto:encode(lines) self._proto:que(bytes) - self:pushTask(msgs) + self:pushTask(event, msgs) end function Cengine:work(t, event) diff --git a/source/tools/monitor/unity/collector/postEngine/execBase.lua b/source/tools/monitor/unity/collector/postEngine/execBase.lua index 0199fd0d..88bc4d3a 100644 --- a/source/tools/monitor/unity/collector/postEngine/execBase.lua +++ b/source/tools/monitor/unity/collector/postEngine/execBase.lua @@ -7,47 +7,60 @@ require("common.class") local unistd = require("posix.unistd") local pwait = require("posix.sys.wait") -local signal = require(" posix.signal") +local signal = require("posix.signal") local pystring = require("common.pystring") local system = require("common.system") local CexecBase = class("execBase") local interval = 5 --- poll for every 5 second +local function run(cmd, args) + local pid, err = unistd.fork() + if pid > 0 then -- for self + print("pid: " .. pid) + return pid + elseif pid == 0 then -- for child + local errno + local execArgs = {} + for i, arg in ipairs(args) do + execArgs[i - 1] = arg -- arguments (table can include index 0) + end + _, err, errno = unistd.exec(cmd, execArgs) + assert(not errno, "exec failed." .. err .. errno) + else + error("fork report" .. err) + end +end + function CexecBase:_init_(cmd, args, seconds) - self._pid = 0 self.cmd = cmd - self._args = args self._cnt = 0 self._loop = seconds / interval -end -function CexecBase:run() - local pid, err = unistd.fork() - assert(pid, "fork report" .. err) - if pid > 0 then - self._pid = pid - self._cnt = 0 - else - local errno - _, err, errno = unistd.exec(self.cmd, self._args) - assert(not errno, "exec failed." .. err .. errno) - end + self._pid = run(cmd, args) end function CexecBase:addEvents(e) e:addEvent(self.cmd, self, interval, true, self._loop) end +local function kill(pid) + signal.signal(self._pid, signal.SIGKILL) -- force to kill task + pwait.wait(self._pid) -- wait task +end + function CexecBase:work() local cnt = self._cnt if cnt >= self._loop then local pid, stat, exit = pwait.wait(self._pid, pwait.WNOHANG) assert(pid, "wait failed " .. stat .. exit) if not exit then -- process not exit - signal.signal(self._pid, signal.SIGKILL) -- force to kill task - pwait.wait(self._pid) -- wait task + print("force to kill " .. self._pid) + kill(self._pid) end + return -1 -- delete from task list. end self._cnt = cnt + 1 end + +return CexecBase diff --git a/source/tools/monitor/unity/test/curl/postCmd.lua b/source/tools/monitor/unity/test/curl/postCmd.lua new file mode 100644 index 00000000..3bbf577a --- /dev/null +++ b/source/tools/monitor/unity/test/curl/postCmd.lua @@ -0,0 +1,15 @@ +--- +--- Generated by EmmyLua(https://github.com/EmmyLua) +--- Created by liaozhaoyan. +--- DateTime: 2023/4/17 19:55 +--- + +package.path = package.path .. ";../../?.lua;" +local system = require("common.system") +local ChttpCli = require("httplib.httpCli") + +local cli = ChttpCli.new() +local url = "http://127.0.0.1:8400/api/trig" +local req = {cmd = "exec", exec = "/usr/bin/pwd"} +local res = cli:postTable(url, req) +system:dumps(res) -- Gitee From 15321e07ec042c32d5a07b80ad09d8bd6f5b8d57 Mon Sep 17 00:00:00 2001 From: liaozhaoyan Date: Mon, 17 Apr 2023 22:50:16 +0800 Subject: [PATCH 13/20] add postPy test for execBase. --- .../unity/collector/postEngine/execBase.lua | 14 ++++++-------- source/tools/monitor/unity/test/curl/hello.py | 7 +++++++ source/tools/monitor/unity/test/curl/postPy.lua | 15 +++++++++++++++ 3 files changed, 28 insertions(+), 8 deletions(-) create mode 100644 source/tools/monitor/unity/test/curl/hello.py create mode 100644 source/tools/monitor/unity/test/curl/postPy.lua diff --git a/source/tools/monitor/unity/collector/postEngine/execBase.lua b/source/tools/monitor/unity/collector/postEngine/execBase.lua index 88bc4d3a..407b2ba6 100644 --- a/source/tools/monitor/unity/collector/postEngine/execBase.lua +++ b/source/tools/monitor/unity/collector/postEngine/execBase.lua @@ -21,11 +21,7 @@ local function run(cmd, args) return pid elseif pid == 0 then -- for child local errno - local execArgs = {} - for i, arg in ipairs(args) do - execArgs[i - 1] = arg -- arguments (table can include index 0) - end - _, err, errno = unistd.exec(cmd, execArgs) + _, err, errno = unistd.exec(cmd, args) assert(not errno, "exec failed." .. err .. errno) else error("fork report" .. err) @@ -45,15 +41,17 @@ function CexecBase:addEvents(e) end local function kill(pid) - signal.signal(self._pid, signal.SIGKILL) -- force to kill task - pwait.wait(self._pid) -- wait task + signal.kill(pid, signal.SIGKILL) -- force to kill task + pwait.wait(pid) -- wait task end function CexecBase:work() local cnt = self._cnt if cnt >= self._loop then local pid, stat, exit = pwait.wait(self._pid, pwait.WNOHANG) - assert(pid, "wait failed " .. stat .. exit) + if pid == nil then + error("wait failed " .. stat .. exit) + end if not exit then -- process not exit print("force to kill " .. self._pid) kill(self._pid) diff --git a/source/tools/monitor/unity/test/curl/hello.py b/source/tools/monitor/unity/test/curl/hello.py new file mode 100644 index 00000000..c497ca2f --- /dev/null +++ b/source/tools/monitor/unity/test/curl/hello.py @@ -0,0 +1,7 @@ +import time +import os + +print(os.getcwd()) +while True: + print("hello.") + time.sleep(3) diff --git a/source/tools/monitor/unity/test/curl/postPy.lua b/source/tools/monitor/unity/test/curl/postPy.lua new file mode 100644 index 00000000..8fb9ad14 --- /dev/null +++ b/source/tools/monitor/unity/test/curl/postPy.lua @@ -0,0 +1,15 @@ +--- +--- Generated by EmmyLua(https://github.com/EmmyLua) +--- Created by liaozhaoyan. +--- DateTime: 2023/4/17 20:21 +--- + +package.path = package.path .. ";../../?.lua;" +local system = require("common.system") +local ChttpCli = require("httplib.httpCli") + +local cli = ChttpCli.new() +local url = "http://127.0.0.1:8400/api/trig" +local req = {cmd = "exec", exec = "/usr/bin/python", args = {"../test/curl/hello.py",}} +local res = cli:postTable(url, req) +system:dumps(res) \ No newline at end of file -- Gitee From 14da19ac7e69d1c1cbad3ba28f74a8d1741f62a8 Mon Sep 17 00:00:00 2001 From: liaozhaoyan Date: Tue, 18 Apr 2023 17:29:03 +0800 Subject: [PATCH 14/20] delete fseek function for kmsg. --- .../monitor/unity/beaver/guide/testjs.html | 20 +++++ .../tools/monitor/unity/beeQ/proto_queue.lua | 2 +- .../tools/monitor/unity/collector/plugin.lua | 2 +- .../tools/monitor/unity/collector/plugin.yaml | 10 +-- .../unity/collector/plugin/kmsg/kmsg.c | 4 +- .../collector/plugin/virtout/virtout.bpf.c | 2 +- .../unity/collector/plugin/virtout/virtout.c | 6 +- .../tools/monitor/unity/common/lineParse.lua | 2 + .../tools/monitor/unity/httplib/httpComm.lua | 2 + .../tools/monitor/unity/httplib/influxCli.lua | 4 +- .../tools/monitor/unity/test/curl/influx.lua | 12 +-- .../tools/monitor/unity/test/curl/postPy.lua | 2 +- .../tools/monitor/unity/test/lab/jencode.lua | 13 +++ .../tools/monitor/unity/test/lab/kmsg/kmsg.c | 88 +++++++++++++++++++ 14 files changed, 146 insertions(+), 23 deletions(-) create mode 100644 source/tools/monitor/unity/beaver/guide/testjs.html create mode 100644 source/tools/monitor/unity/test/lab/jencode.lua create mode 100644 source/tools/monitor/unity/test/lab/kmsg/kmsg.c diff --git a/source/tools/monitor/unity/beaver/guide/testjs.html b/source/tools/monitor/unity/beaver/guide/testjs.html new file mode 100644 index 00000000..ac199064 --- /dev/null +++ b/source/tools/monitor/unity/beaver/guide/testjs.html @@ -0,0 +1,20 @@ + + + + + 菜鸟教程(runoob.com) + + + + +

我的第一个 JavaScript 程序

+

这是一个段落

+ + + + + \ No newline at end of file diff --git a/source/tools/monitor/unity/beeQ/proto_queue.lua b/source/tools/monitor/unity/beeQ/proto_queue.lua index 618fe3f4..91f29649 100644 --- a/source/tools/monitor/unity/beeQ/proto_queue.lua +++ b/source/tools/monitor/unity/beeQ/proto_queue.lua @@ -57,7 +57,7 @@ function CprotoQueue:load_log(unity_line, line) if #name > 0 then local log = self._ffi.string(unity_line.logs[0].log) self._ffi.C.free(unity_line.logs[0].log) -- should free from strdup - table.insert(line.log, {name = name, log = json.encode(log)}) + table.insert(line.log, {name = name, log = log}) end end diff --git a/source/tools/monitor/unity/collector/plugin.lua b/source/tools/monitor/unity/collector/plugin.lua index 216d1160..a7590294 100644 --- a/source/tools/monitor/unity/collector/plugin.lua +++ b/source/tools/monitor/unity/collector/plugin.lua @@ -75,7 +75,7 @@ function Cplugin:load_log(unity_line, line) if #name > 0 then local log = self._ffi.string(unity_line.logs[0].log) self._ffi.C.free(unity_line.logs[0].log) -- should free from strdup - table.insert(line.log, {name = name, log = json.encode(log)}) + table.insert(line.log, {name = name, log = log}) end end diff --git a/source/tools/monitor/unity/collector/plugin.yaml b/source/tools/monitor/unity/collector/plugin.yaml index 26be92f9..c9f982a6 100644 --- a/source/tools/monitor/unity/collector/plugin.yaml +++ b/source/tools/monitor/unity/collector/plugin.yaml @@ -24,11 +24,11 @@ config: outline: - /tmp/sysom -#pushTo: -# to: "Influx" -# host: "ld-wz9d17b514mg6kjkx-proxy-tsdb.lindorm.rds.aliyuncs.com" -# port: 8242 -# url: "/api/v2/write?db=lua" +pushTo: + to: "Influx" + host: "ld-wz9d17b514mg6kjkx-proxy-tsdb.lindorm.rds.aliyuncs.com" + port: 8242 + url: "/api/v2/write?db=lua" container: mode: "pods" diff --git a/source/tools/monitor/unity/collector/plugin/kmsg/kmsg.c b/source/tools/monitor/unity/collector/plugin/kmsg/kmsg.c index ee6d41cb..591159f9 100644 --- a/source/tools/monitor/unity/collector/plugin/kmsg/kmsg.c +++ b/source/tools/monitor/unity/collector/plugin/kmsg/kmsg.c @@ -41,12 +41,12 @@ static int kmsg_thread_func(struct beeQ* q, void * arg) { fd = open("/dev/kmsg", O_RDONLY | O_NONBLOCK); if (fd < 0) { goto endOpen; - } + }/* ret = lseek(fd, 0, SEEK_DATA); if (ret < 0) { perror("kmsg seek error"); goto endLseek; - } + }*/ ret = 1; // strip old message. while (ret > 0) { diff --git a/source/tools/monitor/unity/collector/plugin/virtout/virtout.bpf.c b/source/tools/monitor/unity/collector/plugin/virtout/virtout.bpf.c index ee86f871..25d3e612 100644 --- a/source/tools/monitor/unity/collector/plugin/virtout/virtout.bpf.c +++ b/source/tools/monitor/unity/collector/plugin/virtout/virtout.bpf.c @@ -12,7 +12,7 @@ #define KERN_STACKID_FLAGS (0 | BPF_F_FAST_STACK_CMP) #define PERIOD_TIME (10 * 1000 * 1000ULL) -#define THRESHOLD_TIME (150 * 1000 * 1000ULL) +#define THRESHOLD_TIME (200 * 1000 * 1000ULL) BPF_ARRAY(virtdist, u64, 20); BPF_PERF_OUTPUT(perf, 1024); diff --git a/source/tools/monitor/unity/collector/plugin/virtout/virtout.c b/source/tools/monitor/unity/collector/plugin/virtout/virtout.c index 070c483c..1c636ad4 100644 --- a/source/tools/monitor/unity/collector/plugin/virtout/virtout.c +++ b/source/tools/monitor/unity/collector/plugin/virtout/virtout.c @@ -213,7 +213,11 @@ int proc(int stack_fd, struct data_t *e, struct unity_line *line) { for (i = 0; i < 128; i ++) { if (addr[i] > 0) { cell = ksym_search(addr[i]); - strncat(log, cell->func, LOG_MAX); + if (cell != NULL) { + strncat(log, cell->func, LOG_MAX); + } else { + strncat(log, "!nil", LOG_MAX); + } strncat(log, ",", LOG_MAX); } else { break; diff --git a/source/tools/monitor/unity/common/lineParse.lua b/source/tools/monitor/unity/common/lineParse.lua index 925fecdd..a70e0d35 100644 --- a/source/tools/monitor/unity/common/lineParse.lua +++ b/source/tools/monitor/unity/common/lineParse.lua @@ -10,6 +10,8 @@ local system = require("common.system") local module = {} local json = cjson.new() +json.encode_escape_forward_slash(false) + local function parseLabel(sls, ls) local lss = pystring:split(sls, ",") for _, cell in ipairs(lss) do diff --git a/source/tools/monitor/unity/httplib/httpComm.lua b/source/tools/monitor/unity/httplib/httpComm.lua index 46a51516..5986a65b 100644 --- a/source/tools/monitor/unity/httplib/httpComm.lua +++ b/source/tools/monitor/unity/httplib/httpComm.lua @@ -15,6 +15,8 @@ local ChttpComm = class("httplib.httpComm") local cjson = require("cjson.safe") local json = cjson.new() +json.encode_escape_forward_slash(false) + local function codeTable() return { [100] = "Continue", diff --git a/source/tools/monitor/unity/httplib/influxCli.lua b/source/tools/monitor/unity/httplib/influxCli.lua index 7b8c6047..a1eef70c 100644 --- a/source/tools/monitor/unity/httplib/influxCli.lua +++ b/source/tools/monitor/unity/httplib/influxCli.lua @@ -9,8 +9,8 @@ local ChttpCli = require("httplib.httpCli") local CinfluxCli = class("influxCli", ChttpCli) -function CinfluxCli:_init_(url, db, user, pass, proxy) - self._url = string.format("%swrite?u=%s&p=%s&db=%s", url, user, pass, db) +function CinfluxCli:_init_(url, proxy) + self._url = url ChttpCli._init_(self, proxy) end diff --git a/source/tools/monitor/unity/test/curl/influx.lua b/source/tools/monitor/unity/test/curl/influx.lua index eb8ce154..188992c4 100644 --- a/source/tools/monitor/unity/test/curl/influx.lua +++ b/source/tools/monitor/unity/test/curl/influx.lua @@ -8,12 +8,6 @@ package.path = package.path .. ";../../?.lua;" local system = require("common.system") local CinfluxCli = require("httplib.influxCli") -local cli = CinfluxCli.new("http://172.24.90.148:8086/", "lua", "xxx", "xxxxxx") - -local v = 1.1 -while true do - local s = string.format("test,label=a v=%f", v) - cli:puts(s) - v = v + 1 - system:sleep(2) -end +local cli = CinfluxCli.new("http://ld-wz9d17b514mg6kjkx-proxy-tsdb.lindorm.rds.aliyuncs.com:8242/api/v2/write?db=lua") +local res = cli:puts('virtout,instance=self log="task:0(swapper/13), cpu:13, delayed:162994631, callstack:mwait_idle_with_hints.constprop.0,intel_idle,cpuidle_enter_state,cpuidle_enter,cpuidle_idle_call,do_idle,cpu_startup_entry,secondary_startup_64_no_verify,"') +system:dumps(res) diff --git a/source/tools/monitor/unity/test/curl/postPy.lua b/source/tools/monitor/unity/test/curl/postPy.lua index 8fb9ad14..d4085f0a 100644 --- a/source/tools/monitor/unity/test/curl/postPy.lua +++ b/source/tools/monitor/unity/test/curl/postPy.lua @@ -10,6 +10,6 @@ local ChttpCli = require("httplib.httpCli") local cli = ChttpCli.new() local url = "http://127.0.0.1:8400/api/trig" -local req = {cmd = "exec", exec = "/usr/bin/python", args = {"../test/curl/hello.py",}} +local req = {cmd = "exec", exec = "/usr/bin/python", args = {"../test/curl/hello.py",}, } local res = cli:postTable(url, req) system:dumps(res) \ No newline at end of file diff --git a/source/tools/monitor/unity/test/lab/jencode.lua b/source/tools/monitor/unity/test/lab/jencode.lua new file mode 100644 index 00000000..89fbb74d --- /dev/null +++ b/source/tools/monitor/unity/test/lab/jencode.lua @@ -0,0 +1,13 @@ +--- +--- Generated by EmmyLua(https://github.com/EmmyLua) +--- Created by liaozhaoyan. +--- DateTime: 2023/4/18 15:10 +--- + +local cjson = require("cjson.safe") +local json = cjson.new() +json.encode_escape_forward_slash(false) + +local log = "task:0(swapper/13), cpu:13, delayed:162994631, callstack:mwait_idle_with_hints.constprop.0,intel_idle,cpuidle_enter_state,cpuidle_enter,cpuidle_idle_call,do_idle,cpu_startup_entry,secondary_startup_64_no_verify," + +print(json.encode(log)) diff --git a/source/tools/monitor/unity/test/lab/kmsg/kmsg.c b/source/tools/monitor/unity/test/lab/kmsg/kmsg.c new file mode 100644 index 00000000..613fa03c --- /dev/null +++ b/source/tools/monitor/unity/test/lab/kmsg/kmsg.c @@ -0,0 +1,88 @@ +// +// Created by 廖肇燕 on 2023/4/18. +// + +#define _GNU_SOURCE + +#include +#include +#include +#include +#include +#include +#include +#define KMSG_LINE 8192 + +static int kmsg_set_block(int fd) { + int ret; + int flags = fcntl(fd, F_GETFL); + flags &= ~O_NONBLOCK; + ret = fcntl(fd, F_SETFL, flags); + return ret; +} + +int kmsg_thread_func(void) { + int fd; + int ret; + char buff[KMSG_LINE]; + + + fd = open("/dev/kmsg", O_RDONLY | O_NONBLOCK); + if (fd < 0) { + goto endOpen; + }/* + ret = lseek(fd, 0, SEEK_DATA); + if (ret < 0) { + perror("kmsg seek error"); + goto endLseek; + }*/ + + ret = 1; // strip old message. + while (ret > 0) { + ret = read(fd, buff, KMSG_LINE - 1); + buff[ret] = '\0'; + printf("%s\n", buff); + if (ret < 0) { + if (errno == EAGAIN) { + break; + } + perror("kmsg read failed."); + goto endRead; + } + } + + ret = kmsg_set_block(fd); + if (ret < 0) { + perror("kmsg set block failed."); + goto endBlock; + } + + while (1) { + + ret = read(fd, buff, KMSG_LINE - 1); + if (ret < 0) { + if (errno == EINTR) { + break; + } + perror("kmsg read2 failed."); + goto endRead; + } + buff[ret -1] = '\0'; + + printf("read: %s\n", buff); + } + + close(fd); + return 0; + endBlock: + endRead: + endLseek: + close(fd); + endOpen: + return 1; +} + +int main(void) { + kmsg_thread_func(); + return 0; +} \ No newline at end of file -- Gitee From 2329be275cbda9f822d6f791a313419f37886798 Mon Sep 17 00:00:00 2001 From: liaozhaoyan Date: Tue, 18 Apr 2023 23:36:27 +0800 Subject: [PATCH 15/20] strip print. --- .../unity/collector/io/exceptCheck.lua | 1 + .../unity/collector/plugin/virtout/virtout.c | 3 +- .../unity/test/lab/fileSync/fileSync.py | 87 +++++++++++++++++++ 3 files changed, 89 insertions(+), 2 deletions(-) create mode 100644 source/tools/monitor/unity/test/lab/fileSync/fileSync.py diff --git a/source/tools/monitor/unity/collector/io/exceptCheck.lua b/source/tools/monitor/unity/collector/io/exceptCheck.lua index 1f88fc47..3db20287 100644 --- a/source/tools/monitor/unity/collector/io/exceptCheck.lua +++ b/source/tools/monitor/unity/collector/io/exceptCheck.lua @@ -84,6 +84,7 @@ function CexecptCheck:calcs() end end end + system:dumps(self._diskItem) end function CexecptCheck:addValue(v) diff --git a/source/tools/monitor/unity/collector/plugin/virtout/virtout.c b/source/tools/monitor/unity/collector/plugin/virtout/virtout.c index 1c636ad4..1c8d601c 100644 --- a/source/tools/monitor/unity/collector/plugin/virtout/virtout.c +++ b/source/tools/monitor/unity/collector/plugin/virtout/virtout.c @@ -207,7 +207,7 @@ int proc(int stack_fd, struct data_t *e, struct unity_line *line) { int id = e->stack_id; //last stack struct ksym_cell* cell; - snprintf(log, LOG_MAX, "task:%d(%s), cpu:%d, delayed:%ld, callstack:", e->pid, e->comm, e->cpu, e->delta); + snprintf(log, LOG_MAX, "task:%d(%s);cpu:%d;delayed:%ld;callstack:", e->pid, e->comm, e->cpu, e->delta); coobpf_key_value(stack_fd, &id, &addr); for (i = 0; i < 128; i ++) { @@ -223,7 +223,6 @@ int proc(int stack_fd, struct data_t *e, struct unity_line *line) { break; } } - printf("log:%s\n", log); unity_set_table(line, "virtout_log"); unity_set_log(line, "log", log); return 0; diff --git a/source/tools/monitor/unity/test/lab/fileSync/fileSync.py b/source/tools/monitor/unity/test/lab/fileSync/fileSync.py new file mode 100644 index 00000000..37b73697 --- /dev/null +++ b/source/tools/monitor/unity/test/lab/fileSync/fileSync.py @@ -0,0 +1,87 @@ +# -*- coding: utf-8 -*- +""" +------------------------------------------------- + File Name: fileSync.py + Description : + Author : liaozhaoyan + date: 2023/4/18 +------------------------------------------------- + Change Activity: + 2023/4/18: +------------------------------------------------- +""" +import os +import sys + + +FILE_SYNC = 0x010000 +FILE_DIRECT = 0x040000 + + +def getFlags(fdPath): + try: + with open(fdPath) as f: + for i, line in enumerate(f): + if line.startswith("flags:"): + _, ret = line.split(":", 1) + return ret.strip() + except IOError: + return 0 + + +def getFdName(pid, fd): + fdPath = "/proc/%s/fd/%s" % (pid, fd) + return os.readlink(fdPath) + + +def getCmd(pid): + path = "/proc/%s/cmdline" % pid + try: + with open(path) as f: + return f.read() + except IOError: + return "nil" + + +def checkFile(fileLink): + if fileLink.startswith("/") and not fileLink.startswith("/dev"): + return True + return False + + +def checkFlag(pid, fd, flag): + vFlag = int(flag) + link = getFdName(pid, fd) + if checkFile(link): + if vFlag & FILE_SYNC: + print("pid:%s, cmd:%s, file:%s, set sync flag" % (pid, getCmd(pid), link)) + if vFlag & FILE_DIRECT: + print("pid:%s, cmd:%s, file:%s, set direct flag" % (pid, getCmd(pid), link)) + + +def checkPid(pid): + path = "/proc/" + pid + "/fdinfo" + for fd in os.listdir(path): + fdPath = "/".join([path, fd]) + try: + flag = getFlags(fdPath) + except IOError: + continue + checkFlag(pid, fd, flag) + + +def walkGroup(path): + path += "/tasks" + try: + with open(path) as f: + for i, line in enumerate(f): + checkPid(line.strip()) + except IOError: + print("bad path.") + + +if __name__ == "__main__": + path = "/sys/fs/cgroup/pids/kubepods/besteffort/slot_976/231a6b41a7be49fdcecac835bc1487272ba8319b8106692d2134c34b025931fa" + if len(sys.argv) > 1: + path = sys.argv[1] + walkGroup(path) -- Gitee From 77ecf44f526b4f9e788278d7840ce26d93d03500 Mon Sep 17 00:00:00 2001 From: liaozhaoyan Date: Tue, 18 Apr 2023 23:56:17 +0800 Subject: [PATCH 16/20] strip print data. --- source/tools/monitor/unity/collector/io/exceptCheck.lua | 1 - source/tools/monitor/unity/httplib/coHttpCli.lua | 1 - source/tools/monitor/unity/httplib/coInflux.lua | 5 +++-- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/source/tools/monitor/unity/collector/io/exceptCheck.lua b/source/tools/monitor/unity/collector/io/exceptCheck.lua index 3db20287..1f88fc47 100644 --- a/source/tools/monitor/unity/collector/io/exceptCheck.lua +++ b/source/tools/monitor/unity/collector/io/exceptCheck.lua @@ -84,7 +84,6 @@ function CexecptCheck:calcs() end end end - system:dumps(self._diskItem) end function CexecptCheck:addValue(v) diff --git a/source/tools/monitor/unity/httplib/coHttpCli.lua b/source/tools/monitor/unity/httplib/coHttpCli.lua index 461689be..b8d40a73 100644 --- a/source/tools/monitor/unity/httplib/coHttpCli.lua +++ b/source/tools/monitor/unity/httplib/coHttpCli.lua @@ -514,7 +514,6 @@ function CcoHttpCli:work(cffi, efd) self.online = os.time() while true do local body = coroutine.yield() - --print("-- " .. type(msg)) self.status = enumStat.sending local s = self:pack(body) if not self:coWrite(cffi, efd, fd, s) then diff --git a/source/tools/monitor/unity/httplib/coInflux.lua b/source/tools/monitor/unity/httplib/coInflux.lua index b808e3fd..6b00a4c9 100644 --- a/source/tools/monitor/unity/httplib/coInflux.lua +++ b/source/tools/monitor/unity/httplib/coInflux.lua @@ -16,7 +16,9 @@ function CcoInflux:_init_(fYaml) end function CcoInflux:echo(tReq) - print(tReq.code, tReq.data) + if tReq.code ~= "204" then + print(tReq.code, tReq.data) + end end function CcoInflux:trans(msgs, body, filter) @@ -41,7 +43,6 @@ function CcoInflux:trans(msgs, body, filter) end function CcoInflux:pack(body) - print(#body) local line = self:packCliHead('POST', self._url) local head = { Host = self._host, -- Gitee From 39ddd583c35a141744458a6946ebc8b8ea88eb3f Mon Sep 17 00:00:00 2001 From: liaozhaoyan Date: Fri, 21 Apr 2023 13:31:20 +0800 Subject: [PATCH 17/20] add more space for virtout. --- .../unity/collector/guard/guardSelfStat.lua | 28 ++- .../monitor/unity/collector/io/diskFifo.lua | 35 +++- .../unity/collector/io/exceptCheck.lua | 162 +++++++++++++++++- .../unity/collector/io/io_diagnose.lua | 14 +- .../tools/monitor/unity/collector/plugin.yaml | 6 +- .../unity/collector/plugin/virtout/virtout.c | 2 +- source/tools/monitor/unity/common/fifo.lua | 4 + 7 files changed, 224 insertions(+), 27 deletions(-) diff --git a/source/tools/monitor/unity/collector/guard/guardSelfStat.lua b/source/tools/monitor/unity/collector/guard/guardSelfStat.lua index 0564b274..bbdc5e3d 100644 --- a/source/tools/monitor/unity/collector/guard/guardSelfStat.lua +++ b/source/tools/monitor/unity/collector/guard/guardSelfStat.lua @@ -34,6 +34,21 @@ function CguardSelfStat:_init_(proto, pffi, mnt, resYaml, jperiod) self._memLimit = resYaml.config.limit.mem * 1024 * 1024 end +local function rssRssAnon() + local anon = 0 + + local f = io.open("/proc/self/status") + for line in f:lines() do + if pystring:startswith(line, "RssAnon:") then + local res = pystring:split(line) + anon = tonumber(res[2]) * 1024 + end + end + f:close() + + return anon +end + function CguardSelfStat:proc(elapsed, lines) CvProc.proc(self) @@ -46,7 +61,9 @@ function CguardSelfStat:proc(elapsed, lines) print("last cpu usage overflow." .. cpus) os.exit(1) end - if rss * 4096 > self._memLimit then + + local anon = rssRssAnon() + if anon > self._memLimit then print("last mem usage overflow." .. rss) os.exit(1) end @@ -62,12 +79,15 @@ function CguardSelfStat:proc(elapsed, lines) { name = "vsize", value = vsize - } - , + }, { name = "rss", value = rss * 4096 - } + }, + { + name = "anon", + value = anon, + }, } self:appendLine(self:_packProto("self_stat", nil, vs)) self:push(lines) diff --git a/source/tools/monitor/unity/collector/io/diskFifo.lua b/source/tools/monitor/unity/collector/io/diskFifo.lua index dc3e4a7a..ac57e6ed 100644 --- a/source/tools/monitor/unity/collector/io/diskFifo.lua +++ b/source/tools/monitor/unity/collector/io/diskFifo.lua @@ -5,29 +5,42 @@ --- require("common.class") +local system = require("common.system") local Cfifo = require("common.fifo") local CdiskFifo = class("diskFifo", Cfifo) function CdiskFifo:_init_(maxLen) Cfifo._init_(self, maxLen) + self._nr = 0 +end + +function CdiskFifo:push(v) + Cfifo.push(self, v) + self._nr = self._nr + 1 end function CdiskFifo:iowait() local sum = 0 local value local cells = {} - local count = self._count + local len = self:len() + local c = 0 - if #self.list < count then + if len < self:capacity() then return end - for i, v in ipairs(self.list) do + for _, v in pairs(self.list) do + c = c + 1 value = v.iowait - cells[i], sum = value, sum + value + cells[c], sum = value, sum + value end - return {max = math.max(unpack(cells)), min = math.min(unpack(cells)), count = count, avg = sum / count} + return {max = math.max(unpack(cells)), + min = math.min(unpack(cells)), + nr = self._nr, + avg = sum / len, + last = value} end function CdiskFifo:values(disk, key) @@ -35,14 +48,14 @@ function CdiskFifo:values(disk, key) local sum = 0 local value local cells = {} - local count = self._count + local len = self:len() - if #self.list < count then + if len < self:capacity() then return end local d - for _, v in ipairs(self.list) do + for _, v in pairs(self.list) do d = v[disk] if d then value = d[key] @@ -52,7 +65,11 @@ function CdiskFifo:values(disk, key) return end end - return {max = math.max(unpack(cells)), min = math.min(unpack(cells)), count = count, avg = sum / count} + return {max = math.max(unpack(cells)), + min = math.min(unpack(cells)), + nr = self._nr, + avg = sum / len, + last = value} end return CdiskFifo diff --git a/source/tools/monitor/unity/collector/io/exceptCheck.lua b/source/tools/monitor/unity/collector/io/exceptCheck.lua index 1f88fc47..8947e0bd 100644 --- a/source/tools/monitor/unity/collector/io/exceptCheck.lua +++ b/source/tools/monitor/unity/collector/io/exceptCheck.lua @@ -30,6 +30,7 @@ local function addItem() curWinMinVal = 1e9, curWinMaxVal = 0, moveAvg = 0, + last = 0, thresh = 0 }, compensation = { @@ -55,24 +56,128 @@ local function addItems() return ret end -function CexecptCheck:_init_() +function CexecptCheck:_init_(diag) + self._diag = diag + self._fifo = CdiskFifo.new(fifoSize) self._waitItem = addItem() self._diskItem = {} + + self._cpuStatIowait = {sum = 0, iowait = 0} + self._uploadInter = 0 + self._exceptionStat = {system = {['IOwait-High'] = {cur = 0, max = 0}}} + self._dataStat = {system = {iowait= 0}} + + self._diagSwitch = { + diagIowait = { sw = self._diag.cfg.diagIowait, + esi = 'IOwait-High'}, + diagIoburst = { sw = self._diag.cfg.diagIoburst, + esi ='IO-Delay'}, + diagIolat = {sw = self._diag.cfg.diagIolat, + esi = 'IO-Burst'}, + diagIohang = {sw = self._diag.cfg.diagIohang, + esi = 'IO-Hang'} + } end -local function calc(item, vs) +local function calcBase(item, vs) local bt = item.baseThresh - bt.curWinMinVal = vs.min - bt.curWinMaxVal = vs.max - bt.moveAvg = vs.arg - bt.nrSample = vs.count + + local min, max, avg, nr = vs.min, vs.max, vs.avg, vs.nr + + bt.nrSample = nr + bt.curWinMinVal = math.min(min, bt.curWinMinVal) + bt.curWinMaxVal = math.max(max, bt.curWinMaxVal) + bt.last = vs.last + + local nrThreshSample = nr + 1 - fifoSize + local thresh = math.max(max - avg, avg - min) + local threshAvg = (bt.thresh * (nrThreshSample - 1) + thresh) / nrThreshSample + bt.thresh = threshAvg + bt.moveAvg = avg + + local usedWin = item.usedWin + usedWin = usedWin + 1 + if usedWin >= fifoSize then + bt.curWinMinVal = 1e9 + bt.curWinMaxVal = 0 + item.usedWin = 0 + else + item.usedWin = usedWin + end + return thresh +end + +local function calcStableThresh(ct, curBaseThresh, curThresh) + local avg = ct.decRangeThreshAvg + + if (curThresh - avg) < ((curBaseThresh - avg) / 10.0) then + local nrStableThreshSample = ct.nrStableThreshSample + + local tSum = ct.stableThreshAvg * ct.nrStableThreshSample + curThresh + nrStableThreshSample = nrStableThreshSample + 1 + + ct.nrStableThreshSample = nrStableThreshSample + ct.stableThreshAvg = tSum / nrStableThreshSample + ct.minStableThresh = math.min(ct.minStableThresh, curThresh) + ct.maxStableThresh = math.max(ct.maxStableThresh, curThresh) + + if nrStableThreshSample > fifoSize * 1.5 then + ct.thresh = math.max(ct.stableThreshAvg - ct.minStableThresh, + ct.maxStableThresh - ct.stableThreshAvg) + ct.shouldUpdThreshComp = false + ct.minStableThresh = 1e9 + ct.maxStableThresh = 0 + ct.stableThreshAvg, ct.decRangeThreshAvg = 0, 0 + ct.nrStableThreshSample, ct.decRangeCnt = 0, 0 + end + end +end + +local function calcCompThresh(item, lastBaseThresh, curThresh) + local curBaseThresh = item.baseThresh.thresh + local ct = item.compensation + + if ct.shouldUpdThreshComp and (ct.thresh < curThresh or item.usedWin == 0) then + ct.thresh = curThresh + end + + if curBaseThresh < lastBaseThresh then + local decRangeCnt = ct.decRangeCnt + local decRangeThreshAvg = ct.decRangeThreshAvg + local tSum = decRangeThreshAvg * decRangeCnt + curThresh + + decRangeCnt = decRangeCnt + 1 + ct.decRangeThreshAvg = tSum / decRangeCnt + ct.decRangeCnt = decRangeCnt + + if decRangeCnt >= fifoSize * 1.5 then + calcStableThresh(ct, curBaseThresh, curThresh) + else + ct.minStableThresh = 1e9 + ct.maxStableThresh = 0 + ct.stableThreshAvg, ct.decRangeThreshAvg = 0, 0 + ct.nrStableThreshSample, ct.decRangeCnt = 0, 0 + end + end +end + +local function updateDynThresh(item, vs) + local bt = item.baseThresh + local ct = item.compensation + local lastBaseThresh = bt.thresh + local curThresh = calcBase(item, vs) + + calcCompThresh(item, lastBaseThresh, curThresh) + item.dynTresh = bt.thresh + bt.moveAvg + ct.thresh + --print("thresh: ", item.dynTresh) end function CexecptCheck:calcs() local iowaits = self._fifo:iowait() if iowaits then - calc(self._waitItem, iowaits) + updateDynThresh(self._waitItem, iowaits) + self:checkIOwaitException(self._waitItem) end local vs @@ -80,7 +185,7 @@ function CexecptCheck:calcs() for _, key in ipairs(keys) do vs = self._fifo:values(disk, key) if vs then - calc(item[key], vs) + updateDynThresh(item[key], vs) end end end @@ -103,4 +208,45 @@ function CexecptCheck:addValue(v) self:calcs() end +local function disableThreshComp(item) + local ct = item.compensation + local bt = item.baseThresh + if ct.shouldUpdThreshComp then + ct.shouldUpdThreshComp = false + item.dynTresh = bt.thresh + bt.moveAvg + ct.thresh = 0.000001 + end +end + +function CexecptCheck:checkIOwaitException(item) + local iowait = item.baseThresh.last + local dataStat = self._dataStat.system + local es = self._exceptionStat.system['IOwait-High'] + local uploadInter = self._uploadInter + + if iowait >= self._diag.cfg.iowait then + disableThreshComp(item) + end + + dataStat.iowait = (dataStat.iowait * (uploadInter - 1) + iowait) / uploadInter + + local minThresh = self._diag.cfg.iowait + local iowaitThresh = math.max(item.dynTresh, minThresh) + if minThresh >= iowaitThresh then + es.cur = es.cur + 1 + local diagSW = self._diagSwitch.diagIowait + local rDiagValid = nil + end +end + +function CexecptCheck:checks() + local uploadInter = self._uploadInter + + if uploadInter % fifoSize == 0 then + self._uploadInter = 1 + else + self._uploadInter = uploadInter + 1 + end +end + return CexecptCheck diff --git a/source/tools/monitor/unity/collector/io/io_diagnose.lua b/source/tools/monitor/unity/collector/io/io_diagnose.lua index a26cdec7..5725acd9 100644 --- a/source/tools/monitor/unity/collector/io/io_diagnose.lua +++ b/source/tools/monitor/unity/collector/io/io_diagnose.lua @@ -58,16 +58,21 @@ end function CioDiagnose:_init_(que, proto_q, fYaml, tid) CvProto._init_(self, CprotoData.new(que)) self._tid = tid + local res = system:parseYaml(fYaml) - self.fStat = res.config.proc_path .. "proc/stat" - self.fDiskStat = res.config.proc_path .. "proc/diskstats" - self.dirSys = res.config.proc_path .. "sys/block/" + local proc_path = res.config.proc_path + self.fStat = proc_path .. "proc/stat" + self.fDiskStat = proc_path .. "proc/diskstats" + self.dirSys = proc_path .. "sys/block/" + self._lastCpuTotal, self._lastCpuIO = readIoWait(self.fStat) self._disks = {} self._disksLast = {} self:readProc() self:storeProc() - self._check = CexceptCheck.new() + + local diag = res.ioDiagnose + self._check = CexceptCheck.new(diag) end function CioDiagnose:readProc() @@ -112,6 +117,7 @@ function CioDiagnose:work(t) self._lastCpuTotal, self._lastCpuIO = cpuTotal, cpuIO self:readProc() + self._check:checks() local res = self:diff(t, iowait) self._check:addValue(res) self:storeProc() diff --git a/source/tools/monitor/unity/collector/plugin.yaml b/source/tools/monitor/unity/collector/plugin.yaml index c9f982a6..daf65fdf 100644 --- a/source/tools/monitor/unity/collector/plugin.yaml +++ b/source/tools/monitor/unity/collector/plugin.yaml @@ -21,8 +21,12 @@ config: mem: 40 # unit mb tasks: 10 # monitor 10 pid max. +ioDiagnose: + cfg: + iowait: 10.0 + outline: - - /tmp/sysom + - /var/sysom/outline pushTo: to: "Influx" diff --git a/source/tools/monitor/unity/collector/plugin/virtout/virtout.c b/source/tools/monitor/unity/collector/plugin/virtout/virtout.c index 1c8d601c..e3b32702 100644 --- a/source/tools/monitor/unity/collector/plugin/virtout/virtout.c +++ b/source/tools/monitor/unity/collector/plugin/virtout/virtout.c @@ -198,7 +198,7 @@ void deinit(void) DESTORY_SKEL_BOJECT(virtout); } -#define LOG_MAX 256 +#define LOG_MAX 4096 static char log[LOG_MAX]; int proc(int stack_fd, struct data_t *e, struct unity_line *line) { diff --git a/source/tools/monitor/unity/common/fifo.lua b/source/tools/monitor/unity/common/fifo.lua index 352d4d2f..76dfd307 100644 --- a/source/tools/monitor/unity/common/fifo.lua +++ b/source/tools/monitor/unity/common/fifo.lua @@ -46,6 +46,10 @@ function Cfifo:len() return self._count end +function Cfifo:capacity() + return self._max +end + function Cfifo:value(index) index = index + self._head - 1 return self.list[index] -- Gitee From 68cb7ae99a15d8b84242967dbc7be4e15592b90a Mon Sep 17 00:00:00 2001 From: liaozhaoyan Date: Fri, 21 Apr 2023 19:47:03 +0800 Subject: [PATCH 18/20] strip io-diagnose. --- .../tools/monitor/unity/beeQ/collectors.lua | 9 +--- .../tools/monitor/unity/test/lab/hashList.lua | 48 +++++++++++++++++++ 2 files changed, 50 insertions(+), 7 deletions(-) create mode 100644 source/tools/monitor/unity/test/lab/hashList.lua diff --git a/source/tools/monitor/unity/beeQ/collectors.lua b/source/tools/monitor/unity/beeQ/collectors.lua index 5e97d982..6c3d13e2 100644 --- a/source/tools/monitor/unity/beeQ/collectors.lua +++ b/source/tools/monitor/unity/beeQ/collectors.lua @@ -121,11 +121,7 @@ local function setupPostEngine(que, proto_q, fYaml, tid) return w, 1 end -local function setupIODiagnose(que, proto_q, fYaml, tid) - local CioDiagnose = require("collector.io.io_diagnose") - local w = CioDiagnose.new(que, proto_q, fYaml, tid) - return w, 1 -end + function work(que, proto_q, yaml, tid) local fYaml = yaml or "../collector/plugin.yaml" @@ -140,7 +136,6 @@ function work(que, proto_q, yaml, tid) engine:setTask(main.postPlugin.tasks) e:addEvent("postEngine", engine, unit) - io, unit = setupIODiagnose(que, proto_q, fYaml, tid) - e:addEvent("mainCollector", io, unit, true) + return e:proc() end diff --git a/source/tools/monitor/unity/test/lab/hashList.lua b/source/tools/monitor/unity/test/lab/hashList.lua new file mode 100644 index 00000000..b9dab8fb --- /dev/null +++ b/source/tools/monitor/unity/test/lab/hashList.lua @@ -0,0 +1,48 @@ +--- +--- Generated by EmmyLua(https://github.com/EmmyLua) +--- Created by liaozhaoyan. +--- DateTime: 2023/4/21 18:25 +--- + +local aList = {'a', 'ab', 'abc'} +print(#aList, table.getn(aList)) +for _, v in ipairs(aList) do + print(v) +end + +aList[2] = nil +print(#aList, table.getn(aList)) +for _, v in ipairs(aList) do + print(v) +end + +for k, v in pairs(aList) do + print(k, v) +end + +local bList = { + a = 1, + b = 2, + c = 3 +} + +bList.b = nil + +print("list.") +aList = {'a', 'ab', 'abc'} +for i, v in ipairs(aList) do + print(v) + if i == 2 then + table.remove(aList, i) + end +end + +print("list reverse.") +aList = {'a', 'ab', 'abc'} +for i = #aList, 1, -1 do + print(aList[i]) + if i == 2 then + table.remove(aList, i) + end +end + -- Gitee From dc46550dd272d0eeeff2af2f99cc3a2f2939bbf0 Mon Sep 17 00:00:00 2001 From: liaozhaoyan Date: Mon, 24 Apr 2023 01:22:02 +0800 Subject: [PATCH 19/20] add forkRun function. --- .../monitor/unity/beaver/localBeaver.lua | 2 +- source/tools/monitor/unity/beeQ/apps.c | 7 ++ .../tools/monitor/unity/beeQ/collectors.lua | 5 +- .../unity/collector/execEngine/forkRun.lua | 73 +++++++++++++++++++ .../unity/collector/guard/guardSched.lua | 11 ++- source/tools/monitor/unity/collector/loop.lua | 12 +++ .../tools/monitor/unity/collector/plugin.yaml | 7 +- .../unity/collector/postEngine/execBase.lua | 1 + .../tools/monitor/unity/test/curl/forkRun.py | 17 +++++ .../tools/monitor/unity/test/curl/outLine.py | 2 +- 10 files changed, 127 insertions(+), 10 deletions(-) create mode 100644 source/tools/monitor/unity/collector/execEngine/forkRun.lua create mode 100644 source/tools/monitor/unity/test/curl/forkRun.py diff --git a/source/tools/monitor/unity/beaver/localBeaver.lua b/source/tools/monitor/unity/beaver/localBeaver.lua index e2815938..548bd94a 100644 --- a/source/tools/monitor/unity/beaver/localBeaver.lua +++ b/source/tools/monitor/unity/beaver/localBeaver.lua @@ -162,7 +162,7 @@ function CLocalBeaver:_install_fd(port, ip, backlog) end function CLocalBeaver:read(fd, maxLen) - maxLen = maxLen or 1 * 1024 * 1024 -- signal conversation accept 1M stream max + maxLen = maxLen or 2 * 1024 * 1024 -- signal conversation accept 2M stream max local function readFd() local e = coroutine.yield() if e.ev_close > 0 then diff --git a/source/tools/monitor/unity/beeQ/apps.c b/source/tools/monitor/unity/beeQ/apps.c index ca81e6be..40e37b57 100644 --- a/source/tools/monitor/unity/beeQ/apps.c +++ b/source/tools/monitor/unity/beeQ/apps.c @@ -7,6 +7,7 @@ #include #include #include +#include #include "beeQ.h" #include "apps.h" #include "pushTo.h" @@ -241,6 +242,11 @@ static int lua_setup_daemon(lua_State *L) { return 1; } +static int prctl_death_kill(lua_State *L) { + prctl(PR_SET_PDEATHSIG, SIGKILL); + return 0; +} + int collector_qout(lua_State *L) { int ret; struct beeQ* q = (struct beeQ*) lua_topointer(L, 1); @@ -281,6 +287,7 @@ static int app_collector_work(void* q, void* proto_q) { lua_register(L, "collector_qout", collector_qout); lua_register(L, "lua_local_clock", lua_local_clock); lua_register(L, "lua_setup_daemon", lua_setup_daemon); + lua_register(L, "prctl_death_kill", prctl_death_kill); ret = lua_load_do_file(L, "../beeQ/collectors.lua"); if (ret) { diff --git a/source/tools/monitor/unity/beeQ/collectors.lua b/source/tools/monitor/unity/beeQ/collectors.lua index 6c3d13e2..10206def 100644 --- a/source/tools/monitor/unity/beeQ/collectors.lua +++ b/source/tools/monitor/unity/beeQ/collectors.lua @@ -121,14 +121,12 @@ local function setupPostEngine(que, proto_q, fYaml, tid) return w, 1 end - - function work(que, proto_q, yaml, tid) local fYaml = yaml or "../collector/plugin.yaml" checkSos() local e = CrbEvent.new() - local main, engine, io, unit + local main, engine, unit main, unit = setupMainCollector(que, proto_q, fYaml, tid) e:addEvent("mainCollector", main, unit) @@ -136,6 +134,5 @@ function work(que, proto_q, yaml, tid) engine:setTask(main.postPlugin.tasks) e:addEvent("postEngine", engine, unit) - return e:proc() end diff --git a/source/tools/monitor/unity/collector/execEngine/forkRun.lua b/source/tools/monitor/unity/collector/execEngine/forkRun.lua new file mode 100644 index 00000000..5ca34e9d --- /dev/null +++ b/source/tools/monitor/unity/collector/execEngine/forkRun.lua @@ -0,0 +1,73 @@ +--- +--- Generated by EmmyLua(https://github.com/EmmyLua) +--- Created by liaozhaoyan. +--- DateTime: 2023/4/22 01:34 +--- + +--- +--- Generated by EmmyLua(https://github.com/EmmyLua) +--- Created by liaozhaoyan. +--- DateTime: 2023/4/12 00:01 +--- + +require("common.class") +local unistd = require("posix.unistd") +local pwait = require("posix.sys.wait") +local signal = require("posix.signal") +local pystring = require("common.pystring") +local system = require("common.system") +local CvProc = require("collector.vproc") + +local CforkRun = class("execBase", CvProc) + +local function run(cmd, args) + local pid, err = unistd.fork() + if pid > 0 then -- for self + return pid + elseif pid == 0 then -- for child + local errno + prctl_death_kill() + _, err, errno = unistd.exec(cmd, args) + assert(not errno, "exec failed." .. err .. errno) + else + error("fork report" .. err) + end +end + +local function kill(pid) + signal.kill(pid, signal.SIGKILL) -- force to kill task + pwait.wait(pid) -- wait task +end + +function CforkRun:_init_(opts, proto, pffi, mnt) + CvProc._init_(self, proto, pffi, mnt) + self._pid = run(opts.cmd, opts.args) + self._opts = opts + + print("fork run: ", self._pid) +end + +function CforkRun:_del_() + if self._pid then + kill(self._pid) + end + print("kill " .. self._pid) +end + +function CforkRun:proc(elapsed, lines) + local pid, stat, exit = pwait.wait(self._pid, pwait.WNOHANG) + if pid == nil then + error("wait failed " .. stat .. exit) + elseif exit then + print("mon thread exit " .. self._pid) + self._pid = nil + return -1 + end +end + +local function kill(pid) + signal.kill(pid, signal.SIGKILL) -- force to kill task + pwait.wait(pid) -- wait task +end + +return CforkRun diff --git a/source/tools/monitor/unity/collector/guard/guardSched.lua b/source/tools/monitor/unity/collector/guard/guardSched.lua index b8a73fd6..cccbc15e 100644 --- a/source/tools/monitor/unity/collector/guard/guardSched.lua +++ b/source/tools/monitor/unity/collector/guard/guardSched.lua @@ -24,9 +24,16 @@ function CguardSched:proc(t, lines) local start = lua_local_clock() -- unit us local stop = 0 local j1 = self._stat:jiffies() + local ret for i, obj in ipairs(self._procs) do - obj:proc(t, lines) + ret = obj:proc(t, lines) + + if ret == -1 then + table.insert(toRemove, i) + goto continue + end + stop = lua_local_clock() if stop - start >= self._limit then -- local j2 = self._stat:jiffies() @@ -35,6 +42,8 @@ function CguardSched:proc(t, lines) end end start = stop + + ::continue:: end if #toRemove > 0 then diff --git a/source/tools/monitor/unity/collector/loop.lua b/source/tools/monitor/unity/collector/loop.lua index 7ea66311..933c87b8 100644 --- a/source/tools/monitor/unity/collector/loop.lua +++ b/source/tools/monitor/unity/collector/loop.lua @@ -14,6 +14,7 @@ local CguardSched = require("collector.guard.guardSched") local CguardDaemon = require("collector.guard.guardDaemon") local CguardSelfStat = require("collector.guard.guardSelfStat") local CpostPlugin = require("collector.postPlugin.postPlugin") +local CforkRun = require("collector.execEngine.forkRun") local Cloop = class("loop") @@ -24,6 +25,7 @@ function Cloop:_init_(que, proto_q, fYaml, tid) self._proto = CprotoData.new(que) self._tid = tid self:loadLuaPlugin(res, res.config.proc_path) + self:forkRun(res) local jperiod = calcJiffies.calc(res.config.proc_path, procffi) -- self._guardSched = CguardSched.new(tid, self._procs, self._names, jperiod) @@ -49,6 +51,16 @@ function Cloop:loadLuaPlugin(res, proc_path) print("add " .. system:keyCount(self._procs) .. " lua plugin.") end +function Cloop:forkRun(res) + local runs = res.forkRun + local c = system:keyCount(self._procs) + for _, run in ipairs(runs) do + c = c + 1 + self._procs[c] = CforkRun.new(run, self._proto, procffi) + self._names[c] = run.cmd + end +end + function Cloop:work(t) local lines = self._proto:protoTable() diff --git a/source/tools/monitor/unity/collector/plugin.yaml b/source/tools/monitor/unity/collector/plugin.yaml index daf65fdf..45fda75b 100644 --- a/source/tools/monitor/unity/collector/plugin.yaml +++ b/source/tools/monitor/unity/collector/plugin.yaml @@ -21,9 +21,10 @@ config: mem: 40 # unit mb tasks: 10 # monitor 10 pid max. -ioDiagnose: - cfg: - iowait: 10.0 +forkRun: + - + cmd: "/usr/bin/python" + args: ["../test/curl/forkRun.py"] outline: - /var/sysom/outline diff --git a/source/tools/monitor/unity/collector/postEngine/execBase.lua b/source/tools/monitor/unity/collector/postEngine/execBase.lua index 407b2ba6..59c3ac40 100644 --- a/source/tools/monitor/unity/collector/postEngine/execBase.lua +++ b/source/tools/monitor/unity/collector/postEngine/execBase.lua @@ -21,6 +21,7 @@ local function run(cmd, args) return pid elseif pid == 0 then -- for child local errno + prctl_death_kill() _, err, errno = unistd.exec(cmd, args) assert(not errno, "exec failed." .. err .. errno) else diff --git a/source/tools/monitor/unity/test/curl/forkRun.py b/source/tools/monitor/unity/test/curl/forkRun.py new file mode 100644 index 00000000..7ab0ceb8 --- /dev/null +++ b/source/tools/monitor/unity/test/curl/forkRun.py @@ -0,0 +1,17 @@ + +import time +from outLine import CnfPut + + +def loop(nf): + i = 1 + while True: + nf.puts('forkRun,mode=java log="hello runtime.",count=%d' % i) + i += 1 + time.sleep(15) + + +if __name__ == "__main__": + time.sleep(2) + nf = CnfPut() + loop(nf) \ No newline at end of file diff --git a/source/tools/monitor/unity/test/curl/outLine.py b/source/tools/monitor/unity/test/curl/outLine.py index 1307c695..cd5ad58c 100644 --- a/source/tools/monitor/unity/test/curl/outLine.py +++ b/source/tools/monitor/unity/test/curl/outLine.py @@ -1,7 +1,7 @@ import os import socket -PIPE_PATH = "/var/sysom/line" +PIPE_PATH = "/var/sysom/outline" MAX_BUFF = 128 * 1024 -- Gitee From ec497feb9068b33be45359160768af41d3aeda49 Mon Sep 17 00:00:00 2001 From: liaozhaoyan Date: Mon, 24 Apr 2023 17:11:48 +0800 Subject: [PATCH 20/20] add http line post. --- source/tools/monitor/unity/beaver/beaver.c | 15 +++-- source/tools/monitor/unity/beaver/beaver.h | 3 +- source/tools/monitor/unity/beaver/beaver.lua | 5 +- .../tools/monitor/unity/beaver/pushLine.lua | 58 +++++++++++++++++++ source/tools/monitor/unity/beaver/url_api.lua | 14 ++++- source/tools/monitor/unity/beeQ/bees.c | 2 +- .../tools/monitor/unity/httplib/httpApp.lua | 5 +- .../tools/monitor/unity/httplib/httpBase.lua | 6 +- .../tools/monitor/unity/httplib/httpCli.lua | 8 +++ .../tools/monitor/unity/httplib/httpHtml.lua | 9 +-- .../tools/monitor/unity/httplib/httpPlain.lua | 5 +- .../monitor/unity/test/curl/postLine.lua | 14 +++++ .../tools/monitor/unity/test/curl/postLine.py | 6 ++ 13 files changed, 127 insertions(+), 23 deletions(-) create mode 100644 source/tools/monitor/unity/beaver/pushLine.lua create mode 100644 source/tools/monitor/unity/test/curl/postLine.lua create mode 100644 source/tools/monitor/unity/test/curl/postLine.py diff --git a/source/tools/monitor/unity/beaver/beaver.c b/source/tools/monitor/unity/beaver/beaver.c index 07392729..8cb6b856 100644 --- a/source/tools/monitor/unity/beaver/beaver.c +++ b/source/tools/monitor/unity/beaver/beaver.c @@ -18,13 +18,14 @@ extern int lua_reg_errFunc(lua_State *L); extern int lua_check_ret(int ret); int lua_load_do_file(lua_State *L, const char* path); -static int call_init(lua_State *L, int err_func, char *fYaml) { +static int call_init(lua_State *L, int err_func, struct beeQ* q, char *fYaml) { int ret; lua_Number lret; lua_getglobal(L, "init"); + lua_pushlightuserdata(L, q); lua_pushstring(L, fYaml); - ret = lua_pcall(L, 1, 1, err_func); + ret = lua_pcall(L, 2, 1, err_func); if (ret) { lua_check_ret(ret); goto endCall; @@ -64,7 +65,8 @@ void LuaAddPath(lua_State *L, char *name, char *value) { lua_pop(L, 2); } -static lua_State * echos_init(char *fYaml) { +extern int collector_qout(lua_State *L); +static lua_State * echos_init(struct beeQ* q, char *fYaml) { int ret; int err_func; @@ -80,12 +82,13 @@ static lua_State * echos_init(char *fYaml) { LuaAddPath(L, "path", "../beaver/?.lua"); err_func = lua_reg_errFunc(L); + lua_register(L, "collector_qout", collector_qout); ret = lua_load_do_file(L, "../beaver/beaver.lua"); if (ret) { goto endLoad; } - ret = call_init(L, err_func, fYaml); + ret = call_init(L, err_func, q, fYaml); if (ret < 0) { goto endCall; } @@ -131,11 +134,11 @@ static int echos(lua_State *L) { return ret; } -int beaver_init(char *fYaml) { +int beaver_init(struct beeQ* q, char *fYaml) { int ret = 0; while (ret == 0) { - lua_State *L = echos_init(fYaml); + lua_State *L = echos_init(q, fYaml); if (L == NULL) { break; } diff --git a/source/tools/monitor/unity/beaver/beaver.h b/source/tools/monitor/unity/beaver/beaver.h index f1a6fd37..058fbb94 100644 --- a/source/tools/monitor/unity/beaver/beaver.h +++ b/source/tools/monitor/unity/beaver/beaver.h @@ -5,6 +5,7 @@ #ifndef UNITY_BEAVER_H #define UNITY_BEAVER_H -int beaver_init(char *fYaml); +#include "../beeQ/beeQ.h" +int beaver_init(struct beeQ* q, char *fYaml); #endif //UNITY_BEAVER_H diff --git a/source/tools/monitor/unity/beaver/beaver.lua b/source/tools/monitor/unity/beaver/beaver.lua index 2431d99c..f8287e1a 100644 --- a/source/tools/monitor/unity/beaver/beaver.lua +++ b/source/tools/monitor/unity/beaver/beaver.lua @@ -19,13 +19,12 @@ local CbaseQuery = require("beaver.query.baseQuery") local lb = nil -function init(fYaml) +function init(que, fYaml) fYaml = fYaml or "../collector/plugin.yaml" - print(fYaml) local web = Cframe.new() CurlIndex.new(web) - CurlApi.new(web, fYaml) + CurlApi.new(web, que, fYaml) CurlRpc.new(web) CurlGuide.new(web) CbaseQuery.new(web, fYaml) diff --git a/source/tools/monitor/unity/beaver/pushLine.lua b/source/tools/monitor/unity/beaver/pushLine.lua new file mode 100644 index 00000000..846568ec --- /dev/null +++ b/source/tools/monitor/unity/beaver/pushLine.lua @@ -0,0 +1,58 @@ +--- +--- Generated by EmmyLua(https://github.com/EmmyLua) +--- Created by liaozhaoyan. +--- DateTime: 2023/4/24 01:34 +--- + +require("common.class") +local system = require("common.system") +local CprotoData = require("common.protoData") +local lineParse = require("common.lineParse") +local pystring = require("common.pystring") + +local CpushLine = class("CpushLine") + +function CpushLine:_init_(que) + self._proto = CprotoData.new(que) +end + +local function trans(title, ls, vs, log) + local labels = {} + local values = {} + local logs = {} + + local c = 0 + for k, v in pairs(ls) do + c = c + 1 + labels[c] = {name=k, index=v} + end + c = 0 + for k, v in pairs(vs) do + c = c + 1 + values[c] = {name=k, value=v} + end + c = 0 + for k, v in pairs(log) do + c = c + 1 + logs[c] = {name=k, log=v} + end + return {line = title, ls = labels, vs = values, log = logs} +end + +function CpushLine:procLine(line) + return trans(lineParse.parse(line)) +end + +function CpushLine:procLines(stream) + local ss = pystring:split(stream, "\n") + local lines = self._proto:protoTable() + + for _, line in ipairs(ss) do + table.insert(lines.lines, self:procLine(line)) + end + local bytes = self._proto:encode(lines) + self._proto:que(bytes) + return 0 +end + +return CpushLine \ No newline at end of file diff --git a/source/tools/monitor/unity/beaver/url_api.lua b/source/tools/monitor/unity/beaver/url_api.lua index 29f1c219..c03d824e 100644 --- a/source/tools/monitor/unity/beaver/url_api.lua +++ b/source/tools/monitor/unity/beaver/url_api.lua @@ -9,19 +9,31 @@ local system = require("common.system") local ChttpApp = require("httplib.httpApp") local CfoxTSDB = require("tsdb.foxTSDB") local postQue = require("beeQ.postQue.postQue") +local CpushLine = require("beaver.pushLine") local CurlApi = class("urlApi", ChttpApp) -function CurlApi:_init_(frame, fYaml) +function CurlApi:_init_(frame, que, fYaml) ChttpApp._init_(self) + self._pushLine = CpushLine.new(que) self._urlCb["/api/sum"] = function(tReq) return self:sum(tReq) end self._urlCb["/api/sub"] = function(tReq) return self:sub(tReq) end self._urlCb["/api/query"] = function(tReq) return self:query(tReq) end self._urlCb["/api/trig"] = function(tReq) return self:trig(tReq) end + self._urlCb["/api/line"] = function(tReq) return self:line(tReq) end self:_install(frame) self:_setupQs(fYaml) end +function CurlApi:line(tReq) + local stat, _ = pcall(self._pushLine.procLines, self._pushLine, tReq.data) + if stat then + return "ok" + else + return "bad line " .. tReq.data, 400 + end +end + function CurlApi:trig(tReq) local stat, tJson = pcall(self.getJson, self, tReq) if stat and tJson then diff --git a/source/tools/monitor/unity/beeQ/bees.c b/source/tools/monitor/unity/beeQ/bees.c index 88239bfb..950c4bc9 100644 --- a/source/tools/monitor/unity/beeQ/bees.c +++ b/source/tools/monitor/unity/beeQ/bees.c @@ -91,7 +91,7 @@ int main(int argc, char *argv[]) { if (pid_outline == 0) { exit(1); } - beaver_init(g_yaml_file); + beaver_init(q, g_yaml_file); fprintf(stderr, "loop exit."); beeQ_stop(q); diff --git a/source/tools/monitor/unity/httplib/httpApp.lua b/source/tools/monitor/unity/httplib/httpApp.lua index 92b6cd27..996792ef 100644 --- a/source/tools/monitor/unity/httplib/httpApp.lua +++ b/source/tools/monitor/unity/httplib/httpApp.lua @@ -14,8 +14,9 @@ function ChttpApp:_init_(frame) ChttpBase._init_(self) end -function ChttpApp:echo(tRet, keep) - local stat = self:packStat(200) +function ChttpApp:echo(tRet, keep, code) + code = code or 200 + local stat = self:packStat(code) local tHead = { ["Content-Type"] = "application/json", ["Connection"] = (keep and "keep-alive") or "close" diff --git a/source/tools/monitor/unity/httplib/httpBase.lua b/source/tools/monitor/unity/httplib/httpBase.lua index 3ec1ff4f..0de7ba31 100644 --- a/source/tools/monitor/unity/httplib/httpBase.lua +++ b/source/tools/monitor/unity/httplib/httpBase.lua @@ -25,7 +25,7 @@ function ChttpBase:_installRe(path, frame) frame:registerRe(path, self) end -function ChttpBase:echo(tRet, keep) +function ChttpBase:echo(tRet, keep, code) error("ChttpBase:echo is a virtual function.") end @@ -48,8 +48,8 @@ end function ChttpBase:call(tReq) local keep = checkKeep(tReq) - local tRet = self._urlCb[tReq.path](tReq) - local res = self:echo(tRet, keep) + local tRet, code = self._urlCb[tReq.path](tReq) + local res = self:echo(tRet, keep, code) return res, keep end diff --git a/source/tools/monitor/unity/httplib/httpCli.lua b/source/tools/monitor/unity/httplib/httpCli.lua index c83888f7..3e067412 100644 --- a/source/tools/monitor/unity/httplib/httpCli.lua +++ b/source/tools/monitor/unity/httplib/httpCli.lua @@ -61,4 +61,12 @@ function ChttpCli:postTable(Url, t) return self:post(Url, req, headers) end +function ChttpCli:postLine(Url, line) + local headers = { + ["Content-Type"] = "text/plain", + ["Content-Length"] = #line, + } + return self:post(Url, line, headers) +end + return ChttpCli diff --git a/source/tools/monitor/unity/httplib/httpHtml.lua b/source/tools/monitor/unity/httplib/httpHtml.lua index 9eca219d..3d63de08 100644 --- a/source/tools/monitor/unity/httplib/httpHtml.lua +++ b/source/tools/monitor/unity/httplib/httpHtml.lua @@ -100,8 +100,9 @@ local function htmlPack(title, content) return pystring:join("", bodies) end -function ChttpHtml:pack(cType, keep, body) - local stat = self:packStat(200) +function ChttpHtml:pack(cType, keep, body, code) + code = code or 200 + local stat = self:packStat(code) local tHead = { ["Content-Type"] = cType, ["Connection"] = (keep and "keep-alive") or "close" @@ -111,11 +112,11 @@ function ChttpHtml:pack(cType, keep, body) return pystring:join("\r\n", tHttp) end -function ChttpHtml:echo(tRet, keep) +function ChttpHtml:echo(tRet, keep, code) local cType = tRet.type or "text/html" local body = htmlPack(tRet.title, tRet.content) - return self:pack(cType, keep, body) + return self:pack(cType, keep, body, code) end return ChttpHtml diff --git a/source/tools/monitor/unity/httplib/httpPlain.lua b/source/tools/monitor/unity/httplib/httpPlain.lua index f0999d89..fc0f075e 100644 --- a/source/tools/monitor/unity/httplib/httpPlain.lua +++ b/source/tools/monitor/unity/httplib/httpPlain.lua @@ -14,8 +14,9 @@ function ChttpPlain:_init_(frame) ChttpBase._init_(self) end -function ChttpPlain:echo(tRet, keep) - local stat = self:packStat(200) +function ChttpPlain:echo(tRet, keep, code) + code = code or 200 + local stat = self:packStat(code) local tHead = { ["Content-Type"] = "text/plain", ["Connection"] = (keep and "keep-alive") or "close" diff --git a/source/tools/monitor/unity/test/curl/postLine.lua b/source/tools/monitor/unity/test/curl/postLine.lua new file mode 100644 index 00000000..e04791a9 --- /dev/null +++ b/source/tools/monitor/unity/test/curl/postLine.lua @@ -0,0 +1,14 @@ +--- +--- Generated by EmmyLua(https://github.com/EmmyLua) +--- Created by liaozhaoyan. +--- DateTime: 2023/4/24 02:21 +--- + +package.path = package.path .. ";../../?.lua;" +local system = require("common.system") +local ChttpCli = require("httplib.httpCli") + +local cli = ChttpCli.new() +local url = "http://127.0.0.1:8400/api/line" +local res = cli:postLine(url, "lineTable,index=abc value=1") +system:dumps(res) diff --git a/source/tools/monitor/unity/test/curl/postLine.py b/source/tools/monitor/unity/test/curl/postLine.py new file mode 100644 index 00000000..9885d89c --- /dev/null +++ b/source/tools/monitor/unity/test/curl/postLine.py @@ -0,0 +1,6 @@ +import requests + +url = "http://127.0.0.1:8400/api/line" +line = "lineTable,index=abc value=2" +res = requests.post(url, data=line) +print(res) -- Gitee