From 91d83411d5b0dddb863c4932df0f41ab4e9cdc6f Mon Sep 17 00:00:00 2001 From: liaozhaoyan Date: Tue, 11 Jul 2023 13:48:13 +0800 Subject: [PATCH 1/2] fix message que is full bug. --- source/tools/monitor/unity/beeQ/foxRecv.lua | 17 ++++++++++++++--- .../tools/monitor/unity/collector/btfLoader.lua | 3 +-- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/source/tools/monitor/unity/beeQ/foxRecv.lua b/source/tools/monitor/unity/beeQ/foxRecv.lua index 1e3c2dd6..558fdd07 100644 --- a/source/tools/monitor/unity/beeQ/foxRecv.lua +++ b/source/tools/monitor/unity/beeQ/foxRecv.lua @@ -11,6 +11,7 @@ local system = require("common.system") local CfoxRecv = class("CfoxRecv") local unistd = require("posix.unistd") local fcntl = require("posix.fcntl") +local bit = require("bit") local struct = require("struct") local function setupCo(fYaml) @@ -24,6 +25,10 @@ local function setupCo(fYaml) fcntl.fcntl(fdIn, 1031, 1024 * 1024) fcntl.fcntl(fdOut, 1031, 1024 * 1024) + + local flag = fcntl.fcntl(fdOut, fcntl.F_GETFL, 0); + flag = bit.bor(flag, fcntl.O_NONBLOCK) + lua_push_start(fdIn) return fdIn, fdOut end @@ -50,11 +55,17 @@ function CfoxRecv:_del_() end end -function CfoxRecv:outToFd(stream) +local function pipeOut(fd, stream) local len = #stream local s = struct.pack(" 0 then + unistd.write(fd, stream) + end +end + +function CfoxRecv:outToFd(stream) + pipeOut(self.fdOut, stream) self._fox:write(stream) end diff --git a/source/tools/monitor/unity/collector/btfLoader.lua b/source/tools/monitor/unity/collector/btfLoader.lua index f7d4869e..44770eaf 100644 --- a/source/tools/monitor/unity/collector/btfLoader.lua +++ b/source/tools/monitor/unity/collector/btfLoader.lua @@ -70,8 +70,7 @@ local function downKo(path, name, region, machine, release) end function CbtfLoader:_init_(root) - return - local distro = utsname.uname() + --local distro = utsname.uname() if distro then local release, machine = distro.release, distro.machine local path = '/boot/vmlinux-' .. release -- Gitee From ae097189c844786cbf6d1197f326669c984c3f56 Mon Sep 17 00:00:00 2001 From: liaozhaoyan Date: Tue, 11 Jul 2023 14:45:46 +0800 Subject: [PATCH 2/2] fix for run failed cannot resume dead coroutine. --- source/tools/monitor/unity/beeQ/foxRecv.lua | 4 ++- source/tools/monitor/unity/httplib/coCli.lua | 34 +++++++++++++++----- 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/source/tools/monitor/unity/beeQ/foxRecv.lua b/source/tools/monitor/unity/beeQ/foxRecv.lua index 558fdd07..9537e10b 100644 --- a/source/tools/monitor/unity/beeQ/foxRecv.lua +++ b/source/tools/monitor/unity/beeQ/foxRecv.lua @@ -59,8 +59,10 @@ local function pipeOut(fd, stream) local len = #stream local s = struct.pack(" 0 then + if ret == 4 then unistd.write(fd, stream) + else + print("pipeOut failed.") end end diff --git a/source/tools/monitor/unity/httplib/coCli.lua b/source/tools/monitor/unity/httplib/coCli.lua index 327c111d..c6155e28 100644 --- a/source/tools/monitor/unity/httplib/coCli.lua +++ b/source/tools/monitor/unity/httplib/coCli.lua @@ -143,9 +143,20 @@ function CcoCli:pushMsg(coOut, bytes) local lines = self._proto:decode(bytes) ok, msg = coroutine.resume(coOut, lines) + if not ok then + print(string.format("coOut run failed %s", msg)) + end + return ok +end + +function CcoCli:_newOut(cli) + local coOut = coroutine.create(self.coQueFunc) + local ok, msg = coroutine.resume(coOut, self, cli, self.cffi, self._efd, coOut) if not ok then error(string.format("coOut run failed %s", msg)) + return nil end + return coOut end function CcoCli:_pollFd(bfd, cli, nes, coIn, coOut) @@ -157,7 +168,11 @@ function CcoCli:_pollFd(bfd, cli, nes, coIn, coOut) ok, msg = coroutine.resume(coIn, e) if ok then if msg then - self:pushMsg(coOut, msg) + ok = self:pushMsg(coOut, msg) + if not ok then + coOut = self:_newOut(cli) + assert(coOut) + end end else error(string.format("coIn run failed %s", msg)) @@ -177,6 +192,7 @@ function CcoCli:_pollFd(bfd, cli, nes, coIn, coOut) print("bad fd " .. fd .. "use fd " .. cli.fd) end end + return coOut end function CcoCli:_poll(cli) @@ -185,11 +201,14 @@ function CcoCli:_poll(cli) local efd = self._efd local ffi, cffi = self.ffi, self.cffi - local coOut = coroutine.create(self.coQueFunc) - ok, msg = coroutine.resume(coOut, self, cli, cffi, efd, coOut) - if not ok then - error(string.format("coOut run failed %s", msg)) - end + --local coOut = coroutine.create(self.coQueFunc) + --ok, msg = coroutine.resume(coOut, self, cli, cffi, efd, coOut) + --if not ok then + -- error(string.format("coOut run failed %s", msg)) + --end + + local coOut = self:_newOut(cli) + assert(coOut) local coIn = coroutine.create(read_stream) ok, msg = coroutine.resume(coIn, bfd) @@ -197,7 +216,6 @@ function CcoCli:_poll(cli) error(string.format("coIn run failed %s", msg)) end - local c = 1 while true do local nes = ffi.new("native_events_t") local res = cffi.poll_fds(efd, 1, nes) @@ -206,7 +224,7 @@ function CcoCli:_poll(cli) return "end poll." end if nes.num > 0 then - self:_pollFd(bfd, cli, nes, coIn, coOut) + coOut = self:_pollFd(bfd, cli, nes, coIn, coOut) else self:checkOvertime(cli, coOut, ffi) end -- Gitee