diff --git a/common/inc/pwrerr.h b/common/inc/pwrerr.h index 692b3f4d12a7efcd70e108fdf54ef8de03fa1d45..5d02ea0be850c3ead3635d492606b85e66f43c63 100644 --- a/common/inc/pwrerr.h +++ b/common/inc/pwrerr.h @@ -25,11 +25,15 @@ enum RtnCode { ERR_SYS_EXCEPTION, ERR_NULL_POINTER, ERR_INVALIDE_PARAM, + ERR_INVALIDE_DATATYPE, ERR_CALLBACK_FUNCTION_SHOULD_BE_SET_FIRST, ERR_NOT_REGISTED = 100, ERR_OVER_MAX_CONNECTION, ERR_DISCONNECTED = 300, ERR_WRONG_RESPONSE_FROM_SERVER, ERR_ANSWER_LONGER_THAN_SIZE, + ERR_CREATE_TASK_FAILED = 400, + ERR_TASK_NOT_EXISTS, + ERR_OVER_MAX_TASK_NUM }; #endif diff --git a/common/inc/pwrmsg.h b/common/inc/pwrmsg.h index b7e34861527544de209c18ce3b2f472aa706deb7..8eb7adcd4304443cbe0b976835daf2ab4341b8a0 100644 --- a/common/inc/pwrmsg.h +++ b/common/inc/pwrmsg.h @@ -99,7 +99,7 @@ PwrMsg *ClonePwrMsg(PwrMsg *msg); PwrMsg *CreateReqMsg(enum OperationType optType, uint32_t taskNo, uint32_t dataLen, char *data); int InitMsgFactory(void); void DestroyMsgFactory(void); -int GenerateRspMsg(PwrMsg *req, PwrMsg *rsp, int rspCode, char *data, int dataLen); +int GenerateRspMsg(const PwrMsg *req, PwrMsg *rsp, int rspCode, char *data, int dataLen); typedef struct ThreadInfo { @@ -109,7 +109,7 @@ typedef struct ThreadInfo { } ThreadInfo; void InitThreadInfo(ThreadInfo *threadInfo); -int CreateThread(ThreadInfo *threadInfo, void *(*thread_proc)(void *)); +int CreateThread(ThreadInfo *threadInfo, void *(*thread_proc)(void *), void *arg); void FiniThreadInfo(ThreadInfo *threadInfo); #endif diff --git a/common/src/pwrmsg.c b/common/src/pwrmsg.c index fb70126b43d109563d9bd606d63b889dcb0af085..64759e42bae0e8d0baf1848d6d590fc90749c4c3 100644 --- a/common/src/pwrmsg.c +++ b/common/src/pwrmsg.c @@ -107,7 +107,7 @@ void DestroyMsgFactory(void) } -int GenerateRspMsg(PwrMsg *req, PwrMsg *rsp, int rspCode, char *data, int dataLen) +int GenerateRspMsg(const PwrMsg *req, PwrMsg *rsp, int rspCode, char *data, int dataLen) { if (!req || !rsp) { return ERR_NULL_POINTER; @@ -139,13 +139,13 @@ void InitThreadInfo(ThreadInfo *threadInfo) threadInfo->tid = 0; } -int CreateThread(ThreadInfo *threadInfo, void *(*thread_proc)(void *)) +int CreateThread(ThreadInfo *threadInfo, void *(*thread_proc)(void *), void *arg) { if (!threadInfo || !thread_proc) { return ERR_NULL_POINTER; } threadInfo->keepRunning = TRUE; - if (pthread_create(&threadInfo->tid, NULL, thread_proc, NULL) != SUCCESS) { + if (pthread_create(&threadInfo->tid, NULL, thread_proc, arg) != SUCCESS) { return ERR_SYS_EXCEPTION; } threadInfo->created = TRUE; diff --git a/pwrapic/inc/powerapi.h b/pwrapic/inc/powerapi.h index de748d41f64d69cf063ce13750c3f87fed0091ef..5e9787f4ce17071dffe6951adc2da40b249b7134 100644 --- a/pwrapic/inc/powerapi.h +++ b/pwrapic/inc/powerapi.h @@ -31,9 +31,9 @@ extern "C" { PWR_API int PWR_SetLogCallback(void(LogCallback)(int level, const char *fmt, va_list vl)); PWR_API int PWR_Register(void); PWR_API int PWR_UnRegister(void); -PWR_API int PWR_SetMetaDataCallback(void(MetaDataCallback)(int taskId, const PWR_COM_CallbackData *callbackData)); -PWR_API int PWR_CreateDcTask(PWR_COM_BasicDcTaskInfo *basicDcTaskInfo, int *taskId); -PWR_API int PWR_DeleteDcTask(int taskId); +PWR_API int PWR_SetMetaDataCallback(void(MetaDataCallback)(const PWR_COM_CallbackData *callbackData)); +PWR_API int PWR_CreateDcTask(const PWR_COM_BasicDcTaskInfo *basicDcTaskInfo); +PWR_API int PWR_DeleteDcTask(PWR_COM_COL_DATATYPE dataType); // CPU PWR_API int PWR_CPU_GetInfo(PWR_CPU_Info *cpuInfo); diff --git a/pwrapic/inc/pwrtask.h b/pwrapic/inc/pwrtask.h index 54aec7de2481026d4c491ae52a035b9639b629bf..7f23d67f2c0333c5edf64feda22d66bd2c45643f 100644 --- a/pwrapic/inc/pwrtask.h +++ b/pwrapic/inc/pwrtask.h @@ -17,7 +17,7 @@ #include "pwrdata.h" -int CreateDcTask(PWR_COM_BasicDcTaskInfo *basicDcTaskInfo, int *taskId); -int DeleteDcTask(int taskId); +int CreateDcTask(const PWR_COM_BasicDcTaskInfo *basicDcTaskInfo); +int DeleteDcTask(PWR_COM_COL_DATATYPE dataType); #endif diff --git a/pwrapic/inc/sockclient.h b/pwrapic/inc/sockclient.h index 621a36cb8c9b48e75a817c120b47714b6bab98da..6bf89ee373458978b63ef51743686745934d231c 100644 --- a/pwrapic/inc/sockclient.h +++ b/pwrapic/inc/sockclient.h @@ -31,7 +31,7 @@ typedef struct RspOutputParam { int InitSockClient(void); int FiniSockClient(void); -int SetMetaDataCallback(void(MetaDataCallback)(int, const PWR_COM_CallbackData *)); +int SetMetaDataCallback(void(MetaDataCallback)(const PWR_COM_CallbackData *)); int HasSetDataCallback(void); int SendReqAndWaitForRsp(ReqInputParam input, RspOutputParam output); #endif diff --git a/pwrapic/src/powerapi.c b/pwrapic/src/powerapi.c index c8aea67cb5f1b8d713eada343bbf1f9cfbee154c..65c8eeeeade5ef04cd41983d15cc97a649c90cf8 100644 --- a/pwrapic/src/powerapi.c +++ b/pwrapic/src/powerapi.c @@ -76,7 +76,7 @@ int PWR_UnRegister(void) } -int PWR_SetMetaDataCallback(void(MetaDataCallback)(int, const PWR_COM_CallbackData *)) +int PWR_SetMetaDataCallback(void(MetaDataCallback)(const PWR_COM_CallbackData *)) { if (MetaDataCallback) { return SetMetaDataCallback(MetaDataCallback); @@ -84,11 +84,10 @@ int PWR_SetMetaDataCallback(void(MetaDataCallback)(int, const PWR_COM_CallbackDa return ERR_NULL_POINTER; } -int PWR_CreateDcTask(PWR_COM_BasicDcTaskInfo *basicDcTaskInfo, int *taskId) +int PWR_CreateDcTask(const PWR_COM_BasicDcTaskInfo *basicDcTaskInfo) { CHECK_STATUS(); CHECK_NULL_POINTER(basicDcTaskInfo); - CHECK_NULL_POINTER(taskId); if (basicDcTaskInfo->interval < MIN_DC_INTERVAL || basicDcTaskInfo->interval > MAX_DC_INTERVAL) { return ERR_INVALIDE_PARAM; @@ -98,14 +97,14 @@ int PWR_CreateDcTask(PWR_COM_BasicDcTaskInfo *basicDcTaskInfo, int *taskId) return ERR_CALLBACK_FUNCTION_SHOULD_BE_SET_FIRST; } - return CreateDcTask(basicDcTaskInfo, taskId); + return CreateDcTask(basicDcTaskInfo); } -int PWR_DeleteDcTask(int taskId) +int PWR_DeleteDcTask(PWR_COM_COL_DATATYPE dataType) { CHECK_STATUS(); - return DeleteDcTask(taskId); + return DeleteDcTask(dataType); } int PWR_CPU_GetInfo(PWR_CPU_Info *cpuInfo) diff --git a/pwrapic/src/pwrtask.c b/pwrapic/src/pwrtask.c index 8cfd9d53c1266c12b41fe425590e918c322061f2..f9db2ebc2bc7da6989a375e38056892d49ff645f 100644 --- a/pwrapic/src/pwrtask.c +++ b/pwrapic/src/pwrtask.c @@ -19,7 +19,7 @@ #include "pwrerr.h" #include "sockclient.h" -int CreateDcTask(PWR_COM_BasicDcTaskInfo *basicDcTaskInfo, int *taskId) +int CreateDcTask(const PWR_COM_BasicDcTaskInfo *basicDcTaskInfo) { ReqInputParam input; input.optType = COM_CREATE_DC_TASK; @@ -28,8 +28,8 @@ int CreateDcTask(PWR_COM_BasicDcTaskInfo *basicDcTaskInfo, int *taskId) RspOutputParam output; uint32_t size = sizeof(int); - output.rspBuffSize = &size; - output.rspData = (void *)taskId; + output.rspBuffSize = NULL; + output.rspData = NULL; int ret = SendReqAndWaitForRsp(input, output); if (ret != SUCCESS) { @@ -40,12 +40,12 @@ int CreateDcTask(PWR_COM_BasicDcTaskInfo *basicDcTaskInfo, int *taskId) return ret; } -int DeleteDcTask(int taskId) +int DeleteDcTask(PWR_COM_COL_DATATYPE dataType) { ReqInputParam input; input.optType = COM_DELETE_DC_TASK; - input.dataLen = sizeof(taskId); - input.data = (char *)&taskId; + input.dataLen = sizeof(dataType); + input.data = (char *)&dataType; RspOutputParam output; output.rspBuffSize = NULL; output.rspData = NULL; diff --git a/pwrapic/src/sockclient.c b/pwrapic/src/sockclient.c index b50b5c2ea8e9e906a05e74e46f6a8d6dcb751283..7482fb03e20eadf067566dcbc56df4e76e2788e5 100644 --- a/pwrapic/src/sockclient.c +++ b/pwrapic/src/sockclient.c @@ -96,10 +96,21 @@ static int WriteMsg(const void *pData, int len) return SUCCESS; } -static void (*g_metadata_callback)(int, const PWR_COM_CallbackData *) = NULL; +static void (*g_metadata_callback)(const PWR_COM_CallbackData *) = NULL; static void DoDataCallback(PwrMsg *msg) { - g_metadata_callback(msg->head.taskNo, (PWR_COM_CallbackData *)(msg->data)); + if (msg->head.dataLen < sizeof(PWR_COM_CallbackData)) { + PwrLog(DEBUG, "DoDataCallback. msg data len error. len:%d", msg->head.dataLen); + ReleasePwrMsg(&msg); + return; + } + PWR_COM_CallbackData *callBackData = (PWR_COM_CallbackData *)msg->data; + if (callBackData->dataLen <= 0) { + PwrLog(DEBUG, "DoDataCallback. data empty. len:%d", callBackData->dataLen); + ReleasePwrMsg(&msg); + return; + } + g_metadata_callback(callBackData); ReleasePwrMsg(&msg); } @@ -337,7 +348,7 @@ int InitSockClient(void) ret = ERR_COMMON; break; } - int r = CreateThread(&g_sockThread, RunSocketProcess); + int r = CreateThread(&g_sockThread, RunSocketProcess, NULL); if (r != SUCCESS) { PwrLog(ERROR, "Create recv thread failed. ret[%d]", r); ret = ERR_COMMON; @@ -362,7 +373,7 @@ int FiniSockClient(void) return SUCCESS; } -int SetMetaDataCallback(void(MetaDataCallback)(int, const PWR_COM_CallbackData *)) +int SetMetaDataCallback(void(MetaDataCallback)(const PWR_COM_CallbackData *)) { if (MetaDataCallback) { g_metadata_callback = MetaDataCallback; @@ -384,7 +395,7 @@ int SendReqAndWaitForRsp(ReqInputParam input, RspOutputParam output) char *inputData = NULL; if (input.data && input.dataLen != 0) { - inputData = (char *)malloc(input.dataLen); + inputData = (char *)malloc(input.dataLen); // Be released when PwrMsg released bzero(inputData, input.dataLen); memcpy(inputData, input.data, input.dataLen); } diff --git a/pwrapis/inc/common.h b/pwrapis/inc/common.h index ff142bb2d5d75188a2a91068266f4d9deccef90f..ea178d1b4fd54c54d15feb9d7444173036e11567 100644 --- a/pwrapis/inc/common.h +++ b/pwrapis/inc/common.h @@ -57,6 +57,7 @@ #define MD_NM_MAN "MAIN" #define MD_NM_SVR_CPU "CPU_SERVICE" #define MD_NM_SVR_DISK "DISK_SERVICE" +#define MD_NM_SVR_TASK "TASK_SERVICE" // Define configuration section name #define CFG_NM_PST "persist" @@ -264,11 +265,12 @@ enum RunStatus { #define INVALID_FD (-1) #define INVALID_INDEX (-1) -#define MAX_LICENT_NUM 3 +#define MAX_CLIENT_NUM 3 #define THREAD_LOOP_INTERVAL 2000 // us #define SERVER_ADDR "pwrserver.sock" #define CLIENT_ADDR "pwrclient.sock." #define MAX_SYSID_LEN 20 #define MAX_PROC_NUM_ONE_LOOP 5 +#define THOUSAND 1000 #endif diff --git a/pwrapis/inc/cpuservice.h b/pwrapis/inc/cpuservice.h index 012cf193f34cff9d434bd369b08c1ac39520cc2e..7b557e58a425bc00ff92ca583bd4098f6d950256 100644 --- a/pwrapis/inc/cpuservice.h +++ b/pwrapis/inc/cpuservice.h @@ -19,6 +19,7 @@ void GetCpuinfo(PwrMsg *req); void GetCpuUsage(PwrMsg *req); void GetLLCMiss(PwrMsg *req); +void GetCpuFreq(PwrMsg *req); void GetCpuFreqGovernor(PwrMsg *req); void SetCpuFreqGovernor(PwrMsg *req); #endif diff --git a/pwrapis/inc/server.h b/pwrapis/inc/server.h index e9ed186cd5b57bd827f53778fff43df290d6411c..25359ad5098eb943b3d402c2002897d4f1f9d18a 100644 --- a/pwrapis/inc/server.h +++ b/pwrapis/inc/server.h @@ -16,7 +16,6 @@ #define __PAPIS_SERVER_H__ #include #include -#include "common.h" #include "pwrmsg.h" /** @@ -26,6 +25,7 @@ */ int StartServer(void); void StopServer(void); +int SendRspToClient(const PwrMsg *req, int rspCode, char *data, uint32_t len); int SendRspMsg(PwrMsg *rsp); #endif diff --git a/pwrapis/inc/taskservice.h b/pwrapis/inc/taskservice.h new file mode 100644 index 0000000000000000000000000000000000000000..d4528eb6c7b0dd1cd767b3cb3eeef8efe42298b0 --- /dev/null +++ b/pwrapis/inc/taskservice.h @@ -0,0 +1,24 @@ +/* ***************************************************************************** + * Copyright (c) Huawei Technologies Co., Ltd. 2022-2022. All rights reserved. + * PowerAPI licensed under the Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR + * PURPOSE. + * See the Mulan PSL v2 for more details. + * Author: queyanwen + * Create: 2022-11-05 + * Description: provide task service + * **************************************************************************** */ +#ifndef PAPIS_TASK_SERVICE_H__ +#define PAPIS_TASK_SERVICE_H__ +#include "pwrmsg.h" + +int InitTaskService(void); +void FiniTaskService(void); +void CreateDataCollTask(const PwrMsg *req); +void DeleteDataCollTask(const PwrMsg *req); + +#endif diff --git a/pwrapis/src/config.c b/pwrapis/src/config.c index 3f84fbe4e31da46af0c348b665fc1c5ca1df79f0..e6cea0f0681f749962f12ea086f5bfc5cd84b0b4 100644 --- a/pwrapis/src/config.c +++ b/pwrapis/src/config.c @@ -12,7 +12,7 @@ * Create: 2022-06-23 * Description: loading config file and manager all config items for the PowerAPI service * **************************************************************************** */ - + #include "config.h" #include #include "string.h" diff --git a/pwrapis/src/cpuservice.c b/pwrapis/src/cpuservice.c index 10af3ca7a9b3b615fcfc40d3bd0db6076110afa1..ccbb60b5c2a3600dd9a2fb0ad1db137b2859d53d 100644 --- a/pwrapis/src/cpuservice.c +++ b/pwrapis/src/cpuservice.c @@ -249,7 +249,7 @@ int LLCMissRead(double *lm) missStr = "perf stat -e r0033 -e instructions -a sleep 0.1 &>perf.txt"; } else if (m == X86_64) { missStr = "perf stat -e LLC-load-misses -e LLC-store-misses -e instructions -a sleep 0.1 &>perf.txt"; - } else { // Add other arch + } else { // Add other arch return 1; } FILE *fp = NULL; diff --git a/pwrapis/src/pwrclient.c b/pwrapis/src/pwrclient.c index 35c9743285237af710e17ceb729c7169480eeeca..8f59f08466e3a2ee9ffd55e90b4e9cbf6c7e81c1 100644 --- a/pwrapis/src/pwrclient.c +++ b/pwrapis/src/pwrclient.c @@ -22,7 +22,7 @@ static int FindAvailableSlot(PwrClient clients[]) { - for (int i = 0; i < MAX_LICENT_NUM; i++) { + for (int i = 0; i < MAX_CLIENT_NUM; i++) { if (clients[i].fd == INVALID_FD) { return i; } @@ -32,7 +32,7 @@ static int FindAvailableSlot(PwrClient clients[]) static int GetClientIdx(PwrClient clients[], PwrClient newClient) { - for (int i = 0; i < MAX_LICENT_NUM; i++) { + for (int i = 0; i < MAX_CLIENT_NUM; i++) { if (clients[i].sysId == newClient.sysId) { return i; } @@ -42,7 +42,7 @@ static int GetClientIdx(PwrClient clients[], PwrClient newClient) void InitPwrClient(PwrClient clients[]) { - for (int i = 0; i < MAX_LICENT_NUM; i++) { + for (int i = 0; i < MAX_CLIENT_NUM; i++) { clients[i].fd = INVALID_FD; clients[i].sysId = INVALID_INDEX; } @@ -63,7 +63,7 @@ int AddToClientList(PwrClient clients[], PwrClient newClient) // new client int index = FindAvailableSlot(clients); if (index == INVALID_INDEX) { - Logger(ERROR, MD_NM_SVR, "Maximum client num : %d errno :%d\n", MAX_LICENT_NUM, errno); + Logger(ERROR, MD_NM_SVR, "Maximum client num : %d errno :%d\n", MAX_CLIENT_NUM, errno); return ERR_OVER_MAX_CONNECTION; } else { clients[index] = newClient; @@ -74,7 +74,7 @@ int AddToClientList(PwrClient clients[], PwrClient newClient) int DeleteFromClientList(PwrClient clients[], int idx) { - if (idx < 0 || idx >= MAX_LICENT_NUM) { + if (idx < 0 || idx >= MAX_CLIENT_NUM) { return ERR_INVALIDE_PARAM; } close(clients[idx].fd); @@ -85,7 +85,7 @@ int DeleteFromClientList(PwrClient clients[], int idx) void CloseAllConnections(PwrClient clients[]) { - for (int i = 0; i < MAX_LICENT_NUM; i++) { + for (int i = 0; i < MAX_CLIENT_NUM; i++) { if (clients[i].fd == INVALID_FD) { continue; } @@ -97,7 +97,7 @@ void CloseAllConnections(PwrClient clients[]) int GetFdBySysId(const PwrClient clients[], uint32_t sysId) { - for (int i = 0; i < MAX_LICENT_NUM; i++) { + for (int i = 0; i < MAX_CLIENT_NUM; i++) { if (clients[i].sysId == sysId) { return clients[i].fd; } @@ -107,7 +107,7 @@ int GetFdBySysId(const PwrClient clients[], uint32_t sysId) int GetIdxByFd(const PwrClient clients[], int fd) { - for (int i = 0; i < MAX_LICENT_NUM; i++) { + for (int i = 0; i < MAX_CLIENT_NUM; i++) { if (clients[i].fd == fd) { return i; } diff --git a/pwrapis/src/server.c b/pwrapis/src/server.c index 910e4dac2173d55d09da4416319b8702dfac1fb6..d959a226165b9adf4b410028ebcda7139a5c91a9 100644 --- a/pwrapis/src/server.c +++ b/pwrapis/src/server.c @@ -29,18 +29,18 @@ #include "config.h" #include "pwrbuffer.h" #include "cpuservice.h" +#include "taskservice.h" #include "pwrerr.h" #define COUNT_MAX 5 -#define THOUSAND 1000 static int g_listenFd = -1; static pthread_mutex_t g_listenFdLock = PTHREAD_MUTEX_INITIALIZER; static ThreadInfo g_sockProcThread; static ThreadInfo g_serviceThread; -static PwrClient g_pwrClients[MAX_LICENT_NUM]; // 对该结构的读和写都在一个线程完成,因而不需要加锁 -static PwrMsgBuffer g_sendBuff; // 发送队列 -static PwrMsgBuffer g_recvBuff; // 接收队列 +static PwrClient g_pwrClients[MAX_CLIENT_NUM]; // 对该结构的读和写都在一个线程完成,因而不需要加锁 +static PwrMsgBuffer g_sendBuff; // 发送队列 +static PwrMsgBuffer g_recvBuff; // 接收队列 static pthread_mutex_t g_waitMsgMutex; static pthread_cond_t g_waitMsgCond; @@ -112,11 +112,7 @@ static void AcceptConnection(void) newClientFd = accept(g_listenFd, (struct sockaddr *)&clientAddr, &socklen); pthread_mutex_unlock(&g_listenFdLock); if (newClientFd < 0) { - Logger(ERROR, - MD_NM_SVR, - "accpet socket error: %s errno :%d, addr:%s", - strerror(errno), - errno, + Logger(ERROR, MD_NM_SVR, "accpet socket error: %s errno :%d, addr:%s", strerror(errno), errno, clientAddr.sun_path); return; } @@ -128,7 +124,7 @@ static void AcceptConnection(void) strncpy(strSysId, clientAddr.sun_path + strlen(CLIENT_ADDR), MAX_SYSID_LEN - 1); client.sysId = atoi(strSysId); if (AddToClientList(g_pwrClients, client) != SUCCESS) { - Logger(ERROR, MD_NM_SVR, "Reach maximum connections or client existed : %d ", MAX_LICENT_NUM); + Logger(ERROR, MD_NM_SVR, "Reach maximum connections or client existed : %d ", MAX_CLIENT_NUM); close(newClientFd); } Logger(INFO, MD_NM_SVR, "Create new connection succeed. fd:%d, sysId:%d", client.fd, client.sysId); @@ -171,7 +167,7 @@ static void ProcessRecvMsgFromClient(int clientIdx) Logger(DEBUG, MD_NM_SVR, "receivd msg. opt:%d,sysId:%d", msg->head.optType, msg->head.sysId); if (msg->head.msgType != MT_REQ) { - ReleasePwrMsg(&msg); // the server accept request msg only. + ReleasePwrMsg(&msg); // the server accept request msg only. } if (msg->head.dataLen > 0) { @@ -263,7 +259,7 @@ static void ProcessSendMsgToClient(void) * 1. Accepting connection request * 2. Receiving msg from or send msg to client FDs */ -static void *RunServerSocketProcess(void) +static void *RunServerSocketProcess(void *none) { fd_set recvFdSet; int maxFd = INVALID_FD; @@ -276,7 +272,7 @@ static void *RunServerSocketProcess(void) maxFd = g_listenFd; FD_SET(g_listenFd, &recvFdSet); - for (int i = 0; i < MAX_LICENT_NUM; i++) { + for (int i = 0; i < MAX_CLIENT_NUM; i++) { if (g_pwrClients[i].fd != INVALID_FD) { FD_SET(g_pwrClients[i].fd, &recvFdSet); maxFd = maxFd < g_pwrClients[i].fd ? g_pwrClients[i].fd : maxFd; @@ -292,16 +288,16 @@ static void *RunServerSocketProcess(void) continue; } - if (FD_ISSET(g_listenFd, &recvFdSet)) { // new connection + if (FD_ISSET(g_listenFd, &recvFdSet)) { // new connection AcceptConnection(); } - for (int i = 0; i < MAX_LICENT_NUM; i++) { - if (FD_ISSET(g_pwrClients[i].fd, &recvFdSet)) { // new msg in + for (int i = 0; i < MAX_CLIENT_NUM; i++) { + if (FD_ISSET(g_pwrClients[i].fd, &recvFdSet)) { // new msg in ProcessRecvMsgFromClient(i); } } - } // while + } // while CloseAllConnections(g_pwrClients); } @@ -321,6 +317,12 @@ static void WaitForMsg(void) static void ProcessReqMsg(PwrMsg *req) { switch (req->head.optType) { + case COM_CREATE_DC_TASK: + CreateDataCollTask(req); + break; + case COM_DELETE_DC_TASK: + DeleteDataCollTask(req); + break; case CPU_GET_USAGE: GetCpuUsage(req); break; @@ -363,7 +365,7 @@ static void ProcessReqMsg(PwrMsg *req) * RunServiceProcess - Run RunServiceProcess * Process the request msg in receiving buffer g_recvBuff */ -static void *RunServiceProcess(void) +static void *RunServiceProcess(void *none) { while (g_serviceThread.keepRunning) { if (IsEmptyBuffer(&g_recvBuff)) { @@ -374,7 +376,7 @@ static void *RunServiceProcess(void) continue; } ProcessReqMsg(msg); - } // while + } // while } // public====================================================================================== // Init Socket. Start listening & accepting @@ -394,22 +396,24 @@ int StartServer(void) return ERR_SYS_EXCEPTION; } - ret = CreateThread(&g_serviceThread, RunServiceProcess); + ret = CreateThread(&g_serviceThread, RunServiceProcess, NULL); if (ret != SUCCESS) { Logger(ERROR, MD_NM_SVR, "Create service thread failed! ret[%d]", ret); return ERR_SYS_EXCEPTION; } - ret = CreateThread(&g_sockProcThread, RunServerSocketProcess); + ret = CreateThread(&g_sockProcThread, RunServerSocketProcess, NULL); if (ret != SUCCESS) { Logger(ERROR, MD_NM_SVR, "Create ServerSocketProcess thread failed! ret[%d]", ret); return ERR_SYS_EXCEPTION; } + InitTaskService(); return SUCCESS; } void StopServer(void) { + FiniTaskService(); FiniThreadInfo(&g_sockProcThread); FiniThreadInfo(&g_serviceThread); StopListen(); @@ -420,6 +424,29 @@ void StopServer(void) pthread_mutex_destroy((pthread_mutex_t *)&g_waitMsgMutex); } +// 本函数会将data指针所指向数据迁移走,调用方勿对data进行释放操作。 +int SendRspToClient(const PwrMsg *req, int rspCode, char *data, uint32_t len) +{ + if (!req) { + return ERR_NULL_POINTER; + } + if (!data && len != 0) { + return ERR_INVALIDE_PARAM; + } + + PwrMsg *rsp = (PwrMsg *)malloc(sizeof(PwrMsg)); + if (!rsp) { + Logger(ERROR, MD_NM_SVR, "Malloc failed."); + free(data); + return ERR_SYS_EXCEPTION; + } + bzero(rsp, sizeof(PwrMsg)); + GenerateRspMsg(req, rsp, rspCode, data, len); + if (SendRspMsg(rsp) != SUCCESS) { + ReleasePwrMsg(&rsp); + } +} + int SendRspMsg(PwrMsg *rsp) { return AddToBufferTail(&g_sendBuff, rsp); diff --git a/pwrapis/src/taskservice.c b/pwrapis/src/taskservice.c new file mode 100644 index 0000000000000000000000000000000000000000..f098eae3599c17a9be6c25d9a27de66c749c576c --- /dev/null +++ b/pwrapis/src/taskservice.c @@ -0,0 +1,316 @@ +/* ***************************************************************************** + * Copyright (c) Huawei Technologies Co., Ltd. 2022-2022. All rights reserved. + * PowerAPI licensed under the Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR + * PURPOSE. + * See the Mulan PSL v2 for more details. + * Author: queyanwen + * Create: 2022-11-05 + * Description: provide task service + * **************************************************************************** */ +// todo: socket断链时,需要考虑task的释放 + +#include "taskservice.h" +#include +#include +#include +#include +#include "common.h" +#include "pwrerr.h" +#include "log.h" +#include "pwrdata.h" +#include "server.h" + +#define INVALIDE_TASK_ID (-1) +#define MAX_TASK_NUM 10 + +typedef struct CollDataSubscriber { + uint32_t sysId; + int interval; +} CollDataSubscriber; + + +typedef struct CollTask { + PWR_COM_COL_DATATYPE dataType; + int interval; + int subNum; // 订阅数 + CollDataSubscriber subscriberList[MAX_CLIENT_NUM]; // 该数据订阅方 + ThreadInfo collThread; +} CollTask; + +static CollTask *g_collTaskList[MAX_TASK_NUM]; +static int g_taskNum = 0; +static pthread_mutex_t g_taskListMutex; +static int g_hasInited = FALSE; + +static int FindCollTaskByType(PWR_COM_COL_DATATYPE dataType) +{ + if (g_taskNum == 0) { + return INVALID_INDEX; + } + for (int i = 0; i < MAX_TASK_NUM; i++) { + if (g_collTaskList[i] && g_collTaskList[i]->dataType == dataType) { + return i; + } + } + return INVALID_INDEX; +} + +static int FindAvailebleTaskSlot(void) +{ + for (int i = 0; i < MAX_TASK_NUM; i++) { + if (!g_collTaskList[i]) { + return i; + } + } + return INVALID_INDEX; +} + +static inline void FiniTask(int index) +{ + FiniThreadInfo(&(g_collTaskList[index]->collThread)); // 必须先停止线程 + free(g_collTaskList[index]); + g_collTaskList[index] = NULL; + g_taskNum--; +} + +// 需要使用指定的订阅者周期更新task周期时,subIdx填具体订阅者的索引,否则填INVALIDE_INDEX +static void UpdataTaskInterval(int index, int subIdx) +{ + if (subIdx != INVALID_INDEX) { + if (g_collTaskList[index]->interval > g_collTaskList[index]->subscriberList[subIdx].interval) { + g_collTaskList[index]->interval = g_collTaskList[index]->subscriberList[subIdx].interval; + } + return; + } + + int interval = MAX_DC_INTERVAL; + for (int i = 0; i < MAX_CLIENT_NUM; i++) { + if (interval > g_collTaskList[index]->subscriberList[i].interval) { + interval = g_collTaskList[index]->subscriberList[i].interval; + } + } + g_collTaskList[index]->interval = interval; +} + +static int FindSubscriberById(int index, uint32_t subscriber) +{ + for (int i = 0; i < MAX_CLIENT_NUM; i++) { + if (g_collTaskList[index]->subscriberList[i].sysId == subscriber) { + return i; + } + } + return INVALID_INDEX; +} + +static int AddSubscriber(int index, const PWR_COM_BasicDcTaskInfo *taskInfo, uint32_t subscriber) +{ + if (!g_collTaskList[index]) { + return ERR_TASK_NOT_EXISTS; + } + int ret = SUCCESS; + int subIdx = FindSubscriberById(index, subscriber); + if (subIdx != INVALID_INDEX) { + g_collTaskList[index]->subscriberList[subIdx].interval = taskInfo->interval; + UpdataTaskInterval(index, subIdx); + } else { + int i; + for (i = 0; i < MAX_CLIENT_NUM; i++) { + if (g_collTaskList[index]->subscriberList[i].sysId == 0) { // find available subscriber sslot + g_collTaskList[index]->subscriberList[i].sysId = subscriber; + g_collTaskList[index]->subscriberList[i].interval = taskInfo->interval; + g_collTaskList[index]->subNum++; + UpdataTaskInterval(index, i); + break; + } + } + if (i == MAX_CLIENT_NUM) { // subscriber overflow + ret = ERR_SYS_EXCEPTION; + } + } + return ret; +} + +static int DeleteSubscriber(int index, uint32_t subscriber) +{ + if (!g_collTaskList[index]) { + return ERR_TASK_NOT_EXISTS; + } + for (int i = 0; i < MAX_CLIENT_NUM; i++) { + if (g_collTaskList[index]->subscriberList[i].sysId == subscriber) { + g_collTaskList[index]->subscriberList[i].sysId = 0; + g_collTaskList[index]->subscriberList[i].interval = 0; + g_collTaskList[index]->subNum--; + break; + } + } + + if (g_collTaskList[index]->subNum <= 0) { // 无用户订阅数据时,停止采集 + FiniTask(index); + } else { + UpdataTaskInterval(index, INVALID_INDEX); + } + return SUCCESS; +} + +typedef void (*ActionFunc)(CollTask *); + +static void TaskProcessLlcMiss(CollTask *task) +{ + // todo: 采集LLC MISS 并发送 +} +static void TaskProcessCpuUsage(CollTask *task) +{ + // todo: 采集CPU USAGE 并发送 +} +static void TaskProcessCpuIpc(CollTask *task) +{ + // todo: 采集CPU IPC 并发送 +} + +static ActionFunc GetActionByDataType(PWR_COM_COL_DATATYPE dataType) +{ + switch (dataType) { + case PWR_COM_DATATYPE_LLC_MISS: + return TaskProcessLlcMiss; + case PWR_COM_DATATYPE_CPU_USAGE: + return TaskProcessCpuUsage; + case PWR_COM_DATATYPE_CPU_IPC: + return TaskProcessCpuIpc; + default: + return NULL; + } +} + +static void *RunDcTaskProcess(void *arg) +{ + CollTask *task = (CollTask *)arg; + ActionFunc actionFunc = GetActionByDataType(task->dataType); + if (!actionFunc) { + return NULL; + } + + while (task->collThread.keepRunning) { + usleep(THOUSAND * (task->interval)); + actionFunc(task); + } +} + +static int CreateNewTask(const PWR_COM_BasicDcTaskInfo *taskInfo, uint32_t subscriber) +{ + int slot = FindAvailebleTaskSlot(); + if (slot == INVALID_INDEX) { + return ERR_OVER_MAX_TASK_NUM; + } + + CollTask *task = (CollTask *)malloc(sizeof(CollTask)); + if (!task) { + return ERR_SYS_EXCEPTION; + } + bzero(task, sizeof(CollTask)); + task->dataType = taskInfo->dataType; + task->interval = taskInfo->interval; + task->subNum = 1; + task->subscriberList[0].sysId = subscriber; + task->subscriberList[0].interval = taskInfo->interval; + InitThreadInfo(&(task->collThread)); + int rspCode = CreateThread(&(task->collThread), RunDcTaskProcess, (void *)task); + if (rspCode != SUCCESS) { + free(task); + Logger(ERROR, MD_NM_SVR_TASK, "CreateNewTask failed. ret:%d type:%d, sysId:%d", rspCode, taskInfo->dataType, + subscriber); + } else { + g_collTaskList[slot] = task; + Logger(INFO, MD_NM_SVR_TASK, "CreateNewTask succeed. type:%d, sysId:%d", taskInfo->dataType, subscriber); + } + return rspCode; +} + +// with lock================================================================ + +static int CreateTask(const PWR_COM_BasicDcTaskInfo *taskInfo, uint32_t subscriber) +{ + int rspCode = SUCCESS; + pthread_mutex_lock(&g_taskListMutex); + + int index = FindCollTaskByType(taskInfo->dataType); + if (index != INVALID_INDEX) { // task existed + rspCode = AddSubscriber(index, taskInfo, subscriber); + } else { + rspCode = CreateNewTask(taskInfo, subscriber); + } + pthread_mutex_unlock(&g_taskListMutex); + return rspCode; +} + +static int DeleteTask(PWR_COM_COL_DATATYPE dataType, uint32_t subscriber) +{ + pthread_mutex_lock(&g_taskListMutex); + int index = FindCollTaskByType(dataType); + if (index != INVALID_INDEX) { + DeleteSubscriber(index, subscriber); + } + pthread_mutex_unlock(&g_taskListMutex); + return SUCCESS; +} + +static void FiniAllTask(void) +{ + pthread_mutex_lock(&g_taskListMutex); + for (int i = 0; i < MAX_TASK_NUM; i++) { + if (g_collTaskList[i]) { + FiniTask(i); + } + } + pthread_mutex_unlock(&g_taskListMutex); +} + + +// public====================================================================== +int InitTaskService(void) +{ + if (g_hasInited) { + return SUCCESS; + } + bzero(g_collTaskList, MAX_TASK_NUM * sizeof(CollTask *)); + g_taskNum = 0; + pthread_mutex_init((pthread_mutex_t *)&g_taskListMutex, NULL); + g_hasInited = TRUE; + return SUCCESS; +} + + +void FiniTaskService(void) +{ + FiniAllTask(); + g_taskNum = 0; + pthread_mutex_destroy((pthread_mutex_t *)&g_taskListMutex); + g_hasInited = FALSE; +} + +void CreateDataCollTask(const PwrMsg *req) +{ + if (!req || req->head.dataLen != sizeof(PWR_COM_BasicDcTaskInfo)) { + return; + } + Logger(DEBUG, MD_NM_SVR_TASK, "Get CreateDataCollTask Req. seqId:%u, sysId:%d", req->head.seqId, req->head.sysId); + + int rspCode = CreateTask((PWR_COM_BasicDcTaskInfo *)req->data, req->head.sysId); + SendRspToClient(req, rspCode, NULL, 0); +} + +void DeleteDataCollTask(const PwrMsg *req) +{ + if (!req || req->head.dataLen != sizeof(int)) { + return; + } + Logger(DEBUG, MD_NM_SVR_TASK, "Get DeleteDataCollTask Req. seqId:%u, sysId:%d", req->head.seqId, req->head.sysId); + + PWR_COM_COL_DATATYPE *dataType = (PWR_COM_COL_DATATYPE *)req->data; + int rspCode = DeleteTask(*dataType, req->head.sysId); + SendRspToClient(req, rspCode, NULL, 0); +}