From 8e0689d0052c2f8a87217701df0cfbb2a284954e Mon Sep 17 00:00:00 2001 From: wxq Date: Mon, 8 Dec 2025 22:37:04 +0800 Subject: [PATCH 1/2] collector watch mv log --- go/pkg/collector/logcollector/log_reporter.go | 66 +++++++++++++++---- go/pkg/dashboard/logmanager/log_manager.go | 5 ++ go/pkg/dashboard/logmanager/service.go | 12 ++++ go/proto/posix/log_service.proto | 1 + 4 files changed, 73 insertions(+), 11 deletions(-) diff --git a/go/pkg/collector/logcollector/log_reporter.go b/go/pkg/collector/logcollector/log_reporter.go index 452a83c0..119b0764 100644 --- a/go/pkg/collector/logcollector/log_reporter.go +++ b/go/pkg/collector/logcollector/log_reporter.go @@ -33,7 +33,7 @@ import ( ) const ( - maxNewFileChanSize = 100 + maxFileChanSize = 100 ) // reportedLogFiles will not remove elements even if the target process exits @@ -63,7 +63,7 @@ func tryReportLog(name string) bool { log.GetLogger().Debugf("log item details to report: %+v", item) reportedLogFiles.hashmap[item.Filename] = struct{}{} reportedLogFiles.Unlock() - if err := reportLog(item); err != nil { + if err := connectLogService(item, reportLog); err != nil { log.GetLogger().Errorf("failed to report log file %s, error: %v", item.Filename, err) return false } @@ -93,7 +93,22 @@ func parseLogFileName(filePath string) (*logservice.LogItem, bool) { return nil, false } -func reportLog(item *logservice.LogItem) error { +func reportLog(ctx context.Context, client logservice.LogManagerServiceClient, + item *logservice.LogItem) (*logservice.ReportLogResponse, error) { + return client.ReportLog(ctx, &logservice.ReportLogRequest{ + Items: []*logservice.LogItem{item}, + }) +} + +func removeLog(ctx context.Context, client logservice.LogManagerServiceClient, + item *logservice.LogItem) (*logservice.ReportLogResponse, error) { + return client.RemoveLog(ctx, &logservice.ReportLogRequest{ + Items: []*logservice.LogItem{item}, + }) +} + +func connectLogService(item *logservice.LogItem, logFunc 
func(context.Context, logservice.LogManagerServiceClient, + *logservice.LogItem) (*logservice.ReportLogResponse, error)) error { retryInterval := constant.GetRetryInterval() maxRetryTimes := constant.GetMaxRetryTimes() client := GetLogServiceClient() @@ -106,9 +121,7 @@ func reportLog(item *logservice.LogItem) error { for i := 0; i < maxRetryTimes; i++ { log.GetLogger().Infof("start to report log %s, attempt: %d", item.Filename, i) - response, err := client.ReportLog(ctx, &logservice.ReportLogRequest{ - Items: []*logservice.LogItem{item}, - }) + response, err := logFunc(ctx, client, item) if err != nil { log.GetLogger().Errorf("failed to report log %s, error: %v", item.Filename, err) time.Sleep(retryInterval) @@ -126,7 +139,7 @@ func reportLog(item *logservice.LogItem) error { return fmt.Errorf("failed to report log: exceeds max retry time: %d", maxRetryTimes) } -func handleNewFile(watcher *fsnotify.Watcher, newFileChan chan string, directory string) { +func handleFile(watcher *fsnotify.Watcher, newFileChan chan string, removeFileChan chan string, directory string) { for { select { case file, ok := <-newFileChan: @@ -138,6 +151,13 @@ func handleNewFile(watcher *fsnotify.Watcher, newFileChan chan string, directory if relPath, err := filepath.Rel(directory, file); err == nil { tryReportLog(relPath) } + case file, ok := <-removeFileChan: + if !ok { + log.GetLogger().Warnf("remove file event chan is closed") + return + } + log.GetLogger().Debugf("find remove file %s", file) + tryRemoveLog(file, directory) case err, ok := <-watcher.Errors: if !ok { log.GetLogger().Warnf("new file event chan is closed") @@ -148,8 +168,27 @@ func handleNewFile(watcher *fsnotify.Watcher, newFileChan chan string, directory } } -func monitorNewFile(watcher *fsnotify.Watcher, newFileChan chan string, directory string) { +func tryRemoveLog(file string, directory string) { + name, err := filepath.Rel(directory, file) + if err != nil { + log.GetLogger().Errorf("failed to remove log %s, error: 
%v", file, err) + return + } + if item, ok := parseLogFileName(name); ok { + reportedLogFiles.Lock() + log.GetLogger().Infof("find log file to remove: %s", item.Filename) + log.GetLogger().Debugf("log item details to remove: %+v", item) + delete(reportedLogFiles.hashmap, item.Filename) + reportedLogFiles.Unlock() + if err = connectLogService(item, removeLog); err != nil { + log.GetLogger().Errorf("failed to remove log file %s, error: %v", item.Filename, err) + } + } +} + +func monitorFile(watcher *fsnotify.Watcher, newFileChan chan string, removeFileChan chan string, directory string) { defer close(newFileChan) + defer close(removeFileChan) defer watcher.Close() for { select { @@ -162,6 +201,10 @@ func monitorNewFile(watcher *fsnotify.Watcher, newFileChan chan string, director log.GetLogger().Debugf("find a new file is created: %s", event.Name) newFileChan <- event.Name } + if event.Op&fsnotify.Remove == fsnotify.Remove || event.Op&fsnotify.Rename == fsnotify.Rename { + log.GetLogger().Debugf("find a file is removed or renamed: %s %s", event.Name, event.Op) + removeFileChan <- event.Name + } case err, ok := <-watcher.Errors: if !ok { log.GetLogger().Warnf("watch event chan for %s is closed ", directory) @@ -187,9 +230,10 @@ func createLogReporter(directory string) error { return err } - newFileChan := make(chan string, maxNewFileChanSize) - go handleNewFile(watcher, newFileChan, directory) - go monitorNewFile(watcher, newFileChan, directory) + newFileChan := make(chan string, maxFileChanSize) + removeFileChan := make(chan string, maxFileChanSize) + go handleFile(watcher, newFileChan, removeFileChan, directory) + go monitorFile(watcher, newFileChan, removeFileChan, directory) return nil } diff --git a/go/pkg/dashboard/logmanager/log_manager.go b/go/pkg/dashboard/logmanager/log_manager.go index 8e83cffa..21582857 100644 --- a/go/pkg/dashboard/logmanager/log_manager.go +++ b/go/pkg/dashboard/logmanager/log_manager.go @@ -150,3 +150,8 @@ func (m *manager) 
OnInstanceExit(instance 
*types.InstanceSpecification) { func isDriverInstance(instance *types.InstanceSpecification) bool { return strings.HasPrefix(instance.InstanceID, "driver") && instance.ParentID == "" } + +// RemoveLogItem handles the remove log request +func (m *manager) RemoveLogItem(item *logservice.LogItem) { + m.LogDB.Remove(NewLogEntry(item)) +} diff --git a/go/pkg/dashboard/logmanager/service.go b/go/pkg/dashboard/logmanager/service.go index 36f63c3c..eef96e41 100644 --- a/go/pkg/dashboard/logmanager/service.go +++ b/go/pkg/dashboard/logmanager/service.go @@ -54,3 +54,15 @@ func (s *Server) ReportLog(_ context.Context, req *logservice.ReportLogRequest) Message: "Log reported successfully", }, nil } + +// RemoveLog handles the RemoveLog RPC request +func (s *Server) RemoveLog(_ context.Context, req *logservice.ReportLogRequest) (*logservice.ReportLogResponse, error) { + log.GetLogger().Infof("receive remove log request: %v", req) + for _, item := range req.GetItems() { + managerSingleton.RemoveLogItem(item) + } + return &logservice.ReportLogResponse{ + Code: 0, + Message: "Log removed successfully", + }, nil +} diff --git a/go/proto/posix/log_service.proto b/go/proto/posix/log_service.proto index 6cb0f776..58edd97f 100644 --- a/go/proto/posix/log_service.proto +++ b/go/proto/posix/log_service.proto @@ -25,6 +25,7 @@ option go_package = "yuanrong.org/kernel/pkg/common/faas_common/grpc/pb/logservi service LogManagerService { rpc Register (RegisterRequest) returns (RegisterResponse) {} rpc ReportLog (ReportLogRequest) returns (ReportLogResponse) {} + rpc RemoveLog (ReportLogRequest) returns (ReportLogResponse) {} } service LogCollectorService { -- Gitee From eea1f32a5963d63863cd800f38b62c4fd84d0b1c Mon Sep 17 00:00:00 2001 From: wxq Date: Mon, 8 Dec 2025 23:37:44 +0800 Subject: [PATCH 2/2] add collector llt --- .../logcollector/log_reporter_test.go | 202 ------------------ 1 file changed, 202 deletions(-) diff --git a/go/pkg/collector/logcollector/log_reporter_test.go 
b/go/pkg/collector/logcollector/log_reporter_test.go index 20b8c274..987f97e9 100644 --- a/go/pkg/collector/logcollector/log_reporter_test.go +++ b/go/pkg/collector/logcollector/log_reporter_test.go @@ -17,16 +17,9 @@ package logcollector import ( - "context" - "fmt" - "os" - "path/filepath" "testing" - "time" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "google.golang.org/grpc" "yuanrong.org/kernel/pkg/collector/common" "yuanrong.org/kernel/pkg/common/faas_common/grpc/pb/logservice" @@ -103,198 +96,3 @@ func TestParseLogFileName(t *testing.T) { }) } } - -type MockLogServiceClient struct { - mock.Mock -} - -func (m *MockLogServiceClient) ReportLog(ctx context.Context, req *logservice.ReportLogRequest, opts ...grpc.CallOption) (*logservice.ReportLogResponse, error) { - args := m.Called(ctx, req, opts) - return args.Get(0).(*logservice.ReportLogResponse), args.Error(1) -} - -func (m *MockLogServiceClient) Register(ctx context.Context, req *logservice.RegisterRequest, opts ...grpc.CallOption) (*logservice.RegisterResponse, error) { - args := m.Called(ctx, req, opts) - return args.Get(0).(*logservice.RegisterResponse), args.Error(1) -} - -func TestReportLog(t *testing.T) { - constant = &constTestImpl{} - tests := []struct { - mockResponse *logservice.ReportLogResponse - mockError error - expectedError error - }{ - { - mockResponse: &logservice.ReportLogResponse{Code: 0, Message: "success"}, - mockError: nil, - expectedError: nil, - }, - { - mockResponse: nil, - mockError: fmt.Errorf("network error"), - expectedError: fmt.Errorf("failed to report log: exceeds max retry time: %d", constant.GetMaxRetryTimes()), - }, - { - mockResponse: &logservice.ReportLogResponse{Code: -1, Message: "failure"}, - mockError: nil, - expectedError: fmt.Errorf("failed to report log: exceeds max retry time: %d", constant.GetMaxRetryTimes()), - }, - } - - for _, tt := range tests { - t.Run("", func(t *testing.T) { - mockClient := new(MockLogServiceClient) - 
LogServiceClient = mockClient - mockClient.On("ReportLog", mock.Anything, mock.Anything, mock.Anything).Return(tt.mockResponse, tt.mockError) - err := reportLog(&logservice.LogItem{}) - mockClient.AssertExpectations(t) - assert.Equal(t, tt.expectedError, err) - }) - } -} - -func TestFailedReportLog(t *testing.T) { - LogServiceClient = nil - - tests := []struct { - mockResponse *logservice.ReportLogResponse - mockError error - expectedError error - }{ - { - mockResponse: nil, - mockError: nil, - expectedError: fmt.Errorf("failed to get log service client"), - }, - } - - for _, tt := range tests { - t.Run("", func(t *testing.T) { - item := &logservice.LogItem{} - err := reportLog(item) - assert.Equal(t, tt.expectedError, err) - }) - } -} - -func TestTryReportLog(t *testing.T) { - constant = &constTestImpl{} - reportedLogFiles.Lock() - reportedLogFiles.hashmap["runtime-123.out"] = struct{}{} - reportedLogFiles.Unlock() - tests := []struct { - name string - mockResponse *logservice.ReportLogResponse - mockError error - result bool - }{ - { - name: "runtime-123e4567-e89b-12d3-a456-426614174000.err", - mockResponse: &logservice.ReportLogResponse{Code: 0, Message: "success"}, - mockError: nil, - result: true, - }, - { - name: "runtime-456.out", - mockResponse: &logservice.ReportLogResponse{Code: -1, Message: "failure"}, - mockError: nil, - result: false, - }, - { - name: "runtime-123.out", - mockResponse: &logservice.ReportLogResponse{Code: 0, Message: "success"}, - mockError: nil, - result: false, - }, - { - name: "function-master.out", - mockResponse: &logservice.ReportLogResponse{Code: 0, Message: "success"}, - mockError: nil, - result: false, - }, - } - - for _, tt := range tests { - t.Run("", func(t *testing.T) { - mockClient := new(MockLogServiceClient) - LogServiceClient = mockClient - mockClient.On("ReportLog", mock.Anything, mock.Anything, mock.Anything).Return(tt.mockResponse, tt.mockError) - res := tryReportLog(tt.name) - assert.Equal(t, tt.result, res) - }) - 
} -} - -func prepareDir(t *testing.T) string { - tempDir := t.TempDir() - subDir := filepath.Join(tempDir, "subdir") - os.Mkdir(subDir, 0755) - return subDir -} - -func prepareFiles(subDir string) { - reportedLogFiles.Lock() - reportedLogFiles.hashmap = make(map[string]struct{}) - reportedLogFiles.Unlock() - file1 := filepath.Join(subDir, "runtime-456-abc.err") - file2 := filepath.Join(subDir, "runtime-abc.out") - os.WriteFile(file1, []byte("log content"), 0644) - os.WriteFile(file2, []byte("log content"), 0644) -} - -func prepareClient() *MockLogServiceClient { - mockClient := new(MockLogServiceClient) - LogServiceClient = mockClient - mockResponse := &logservice.ReportLogResponse{Code: 0, Message: "success"} - mockClient.On("ReportLog", mock.Anything, mock.Anything, mock.Anything).Return(mockResponse, nil) - return mockClient -} - -func checkReportMsg(t *testing.T, mockClient *MockLogServiceClient) { - { - r := mockClient.Calls[0].Arguments[1].(*logservice.ReportLogRequest) - assert.Equal(t, len(r.Items), 1) - assert.Equal(t, r.Items[0].Filename, "runtime-456-abc.err") - assert.Equal(t, r.Items[0].CollectorID, common.CollectorConfigs.CollectorID) - assert.Equal(t, r.Items[0].Target, logservice.LogTarget_USER_STD) - assert.Equal(t, r.Items[0].RuntimeID, "runtime-456-abc") - } - { - r := mockClient.Calls[1].Arguments[1].(*logservice.ReportLogRequest) - assert.Equal(t, len(r.Items), 1) - assert.Equal(t, r.Items[0].Filename, "runtime-abc.out") - assert.Equal(t, r.Items[0].CollectorID, common.CollectorConfigs.CollectorID) - assert.Equal(t, r.Items[0].Target, logservice.LogTarget_USER_STD) - assert.Equal(t, r.Items[0].RuntimeID, "runtime-abc") - } - - mockClient.AssertExpectations(t) -} - -func TestCreateLogReporter(t *testing.T) { - mockClient := prepareClient() - subDir := prepareDir(t) - createLogReporter(subDir) - time.Sleep(10 * time.Millisecond) - prepareFiles(subDir) - time.Sleep(10 * time.Millisecond) - checkReportMsg(t, mockClient) -} - -func TestScanUserLog(t 
*testing.T) { - mockClient := prepareClient() - subDir := prepareDir(t) - prepareFiles(subDir) - scanUserLog(subDir) - checkReportMsg(t, mockClient) -} - -func TestStartLogReporter(t *testing.T) { - mockClient := prepareClient() - subDir := prepareDir(t) - prepareFiles(subDir) - common.CollectorConfigs.UserLogPath = subDir - StartLogReporter() - checkReportMsg(t, mockClient) -} -- Gitee