diff --git a/common/inc/pwrdata.h b/common/inc/pwrdata.h index 8e7d8e820116ebd4c9dde7be3d4b76e689998a78..69fe8c0304eac413d368e00403451ce909181724 100644 --- a/common/inc/pwrdata.h +++ b/common/inc/pwrdata.h @@ -21,7 +21,7 @@ #define MAX_NUMA_NODE_NUM 16 #define MAX_GOV_NUM 16 #define MAX_STRING_LEN 1000 -#define MAX_TIME_LEN 24 +#define MAX_TIME_LEN 25 #define MAX_DC_INTERVAL 100000000 #define MIN_DC_INTERVAL 500 @@ -45,7 +45,7 @@ typedef struct PWR_COM_CallbackData { char ctime[MAX_TIME_LEN]; int dataType; int dataLen; - void *data; + char data[0]; } PWR_COM_CallbackData; typedef struct PWR_COM_BasicDcTaskInfo { diff --git a/common/inc/pwrmsg.h b/common/inc/pwrmsg.h index 8eb7adcd4304443cbe0b976835daf2ab4341b8a0..ff4d82be73aa81e890c05ba24b72230657541ca1 100644 --- a/common/inc/pwrmsg.h +++ b/common/inc/pwrmsg.h @@ -99,6 +99,7 @@ PwrMsg *ClonePwrMsg(PwrMsg *msg); PwrMsg *CreateReqMsg(enum OperationType optType, uint32_t taskNo, uint32_t dataLen, char *data); int InitMsgFactory(void); void DestroyMsgFactory(void); +int GenerateMetadataMsg(PwrMsg *metadata, uint32_t sysId, char *data, uint32_t len); int GenerateRspMsg(const PwrMsg *req, PwrMsg *rsp, int rspCode, char *data, int dataLen); diff --git a/common/src/pwrmsg.c b/common/src/pwrmsg.c index 64759e42bae0e8d0baf1848d6d590fc90749c4c3..41f51d0c86ac9c9473a0d0afc0a1d12eca4276be 100644 --- a/common/src/pwrmsg.c +++ b/common/src/pwrmsg.c @@ -106,6 +106,25 @@ void DestroyMsgFactory(void) pthread_mutex_destroy((pthread_mutex_t *)&g_seqLock); } +int GenerateMetadataMsg(PwrMsg *metadata, uint32_t sysId, char *data, uint32_t len) +{ + if (!metadata || !data) { + return ERR_NULL_POINTER; + } + bzero(metadata, sizeof(PwrMsg)); + metadata->head.majorVer = MAJOR_VERSION; + metadata->head.minorVer = MINOR_VERSION; + metadata->head.optType = COM_CALLBACK_DATA; + metadata->head.dataFormat = FMT_BIN; + metadata->head.msgType = MT_MDT; + metadata->head.rspCode = 0; + metadata->head.seqId = GenerateSeqId(); + metadata->head.taskNo = 0; // 暂时无作用 + metadata->head.crcMagic = GenerateCrcMagic(); + metadata->head.dataLen = len; + metadata->head.sysId = sysId; + metadata->data = data; +} int GenerateRspMsg(const PwrMsg *req, PwrMsg *rsp, int rspCode, char *data, int dataLen) { diff --git a/pwrapic/test/demo_main.c b/pwrapic/test/demo_main.c index 04c94b33572cca3e5ec19fbc70e2cfb71dca7ddc..2cda54d7193913d4a661481f176df07d23ad066c 100644 --- a/pwrapic/test/demo_main.c +++ b/pwrapic/test/demo_main.c @@ -24,6 +24,8 @@ #define TEST_CORE_NUM 128 #define AVG_LEN_PER_CORE 5 #define TEST_CPU_DMA_LATENCY 2000 +#define TASK_INTERNAL 500 +#define TASK_RUN_TIME 10 static int g_run = 1; enum { @@ -65,6 +67,31 @@ void LogCallback(int level, const char *fmt, va_list vl) printf("%s: %s\n", GetLevelName(level), message); } +void MetaDataCallback(const PWR_COM_CallbackData *callbackData) +{ + PWR_CPU_Usage *usage = NULL; + switch (callbackData->dataType) { + case PWR_COM_DATATYPE_LLC_MISS: + printf("[TASK]Get cache miss data. miss: %f, ctime:%s\n", *(double *)callbackData->data, + callbackData->ctime); + break; + case PWR_COM_DATATYPE_CPU_USAGE: + usage = (PWR_CPU_Usage *)(callbackData->data); + printf("[TASK]Get Cpu Usage. avgUsage: %f, coreNum:%d, ctime:%s\n", usage->avgUsage, usage->coreNum, + callbackData->ctime); + /* for (int i = 0; i < usage->coreNum; i++) { + printf(" core%d usage: %f\n", usage->coreNum[i].coreNo, usage->coreNum[i].usage); + } */ + break; + case PWR_COM_DATATYPE_CPU_IPC: + printf("[TASK]Get Ipc. ipc: %f, ctime:%s\n", *(double *)callbackData->data, callbackData->ctime); + break; + default: + printf("[TASK]Get INVALIDE data.\n"); + break; + } +} + static void SignalHandler(int none) { g_run = 0; @@ -206,6 +233,34 @@ static void TEST_PWR_CPU_DmaSetAndGetLatency(void) printf("PWR_CPU_DmaGetLatency ret: %d, Latency:%d\n", ret, la); } + +static void TEST_PWR_COM_DcTaskMgr(void) +{ + int ret = SUCCESS; + ret = PWR_SetMetaDataCallback(MetaDataCallback); + printf("PWR_SetMetaDataCallback ret: %d\n", ret); + + PWR_COM_BasicDcTaskInfo task = { 0 }; + task.dataType = PWR_COM_DATATYPE_LLC_MISS; + task.interval = TASK_INTERNAL; + ret = PWR_CreateDcTask(&task); + printf("PWR_CreateDcTask. dataType:%d ret: %d\n", task.dataType, ret); + task.dataType = PWR_COM_DATATYPE_CPU_USAGE; + ret = PWR_CreateDcTask(&task); + printf("PWR_CreateDcTask. dataType:%d ret: %d\n", task.dataType, ret); + task.dataType = PWR_COM_DATATYPE_CPU_IPC; + ret = PWR_CreateDcTask(&task); + printf("PWR_CreateDcTask. dataType:%d ret: %d\n", task.dataType, ret); + + sleep(TASK_RUN_TIME); + ret = PWR_DeleteDcTask(PWR_COM_DATATYPE_LLC_MISS); + printf("PWR_DeleteDcTask. dataType:%d ret: %d\n", PWR_COM_DATATYPE_LLC_MISS, ret); + ret = PWR_DeleteDcTask(PWR_COM_DATATYPE_CPU_USAGE); + printf("PWR_DeleteDcTask. dataType:%d ret: %d\n", PWR_COM_DATATYPE_CPU_USAGE, ret); + ret = PWR_DeleteDcTask(PWR_COM_DATATYPE_CPU_IPC); + printf("PWR_DeleteDcTask. dataType:%d ret: %d\n", PWR_COM_DATATYPE_CPU_IPC, ret); +} + int main(int argc, const char *args[]) { PWR_SetLogCallback(LogCallback); @@ -234,7 +289,7 @@ int main(int argc, const char *args[]) // PWR_CPU_DmaSetLatency PWR_CPU_DmaGetLatency // TEST_PWR_CPU_DmaSetAndGetLatency(); - + TEST_PWR_COM_DcTaskMgr(); // todo: 其他接口测试 while (g_run) { sleep(MAIN_LOOP_INTERVAL); diff --git a/pwrapis/inc/cpuservice.h b/pwrapis/inc/cpuservice.h index 7b557e58a425bc00ff92ca583bd4098f6d950256..57dde9dc976cb89a8d519d2f2039fc9c4907015b 100644 --- a/pwrapis/inc/cpuservice.h +++ b/pwrapis/inc/cpuservice.h @@ -15,11 +15,16 @@ #ifndef __PAPIS_CPU_SERVICE_H__ #define __PAPIS_CPU_SERVICE_H__ #include "pwrmsg.h" +#include "pwrdata.h" void GetCpuinfo(PwrMsg *req); void GetCpuUsage(PwrMsg *req); void GetLLCMiss(PwrMsg *req); void GetCpuFreq(PwrMsg *req); +int LLCMissRead(double *lm); +int CPUUsageRead(PWR_CPU_Usage *rstData, int coreNum); +int CpuIpcRead(double *ipc); void GetCpuFreqGovernor(PwrMsg *req); void SetCpuFreqGovernor(PwrMsg *req); +int GetCpuCoreNumber(void); #endif diff --git a/pwrapis/inc/server.h b/pwrapis/inc/server.h index 25359ad5098eb943b3d402c2002897d4f1f9d18a..7b7bd6d439d164c378212e08079aae3b9a176d90 100644 --- a/pwrapis/inc/server.h +++ b/pwrapis/inc/server.h @@ -26,6 +26,7 @@ int StartServer(void); void StopServer(void); int SendRspToClient(const PwrMsg *req, int rspCode, char *data, uint32_t len); +int SendMetadataToClient(uint32_t sysId, char *data, uint32_t len); int SendRspMsg(PwrMsg *rsp); #endif diff --git a/pwrapis/inc/utils.h b/pwrapis/inc/utils.h index 1c0070027b0950b3d10d5a5410e99e7ee6e9587e..2f098d4c24c686656a3aad26dc7f3e0c015a5db2 100644 --- a/pwrapis/inc/utils.h +++ b/pwrapis/inc/utils.h @@ -49,6 +49,9 @@ const char *GetCurFmtTmStr(const char *fmt, char *strTime, int bufLen); */ const char *GetCurFullTime(char *fullTime, int bufLen); +long GetTimeDistance(struct timeval v1, struct timeval v2); + + /** * Return file size * diff --git a/pwrapis/src/cpuservice.c b/pwrapis/src/cpuservice.c index ccbb60b5c2a3600dd9a2fb0ad1db137b2859d53d..4c7b3ee02f516a6dc6150799ac2a08fc9275e21b 100644 --- a/pwrapis/src/cpuservice.c +++ b/pwrapis/src/cpuservice.c @@ -17,7 +17,6 @@ #include "string.h" #include "pwrerr.h" #include "server.h" -#include "pwrdata.h" #include "log.h" #include "unistd.h" #define USAGE_COLUMN 8 @@ -285,6 +284,13 @@ int LLCMissRead(double *lm) return 0; } +int CpuIpcRead(double *ipc) +{ + *ipc = 1.0; + // todo: impl get ipc + return SUCCESS; +} + int GetPolicys(char (*policys)[MAX_ELEMENT_NAME_LEN], int *poNum) { FILE *fp = NULL; @@ -554,3 +560,9 @@ void GetCpuFreq(PwrMsg *req) ReleasePwrMsg(&rsp); } } + +// 总CPU核数 +int GetCpuCoreNumber(void) +{ + return sysconf(_SC_NPROCESSORS_CONF); +} diff --git a/pwrapis/src/server.c b/pwrapis/src/server.c index d959a226165b9adf4b410028ebcda7139a5c91a9..126fa651413d6e9d1cbc68cfc1bb1cad44354243 100644 --- a/pwrapis/src/server.c +++ b/pwrapis/src/server.c @@ -378,6 +378,12 @@ static void *RunServiceProcess(void *none) ProcessReqMsg(msg); } // while } + +static inline int SendMsg(PwrMsg *msg) +{ + return AddToBufferTail(&g_sendBuff, msg); +} + // public====================================================================================== // Init Socket. Start listening & accepting int StartServer(void) @@ -446,6 +452,24 @@ int SendRspToClient(const PwrMsg *req, int rspCode, char *data, uint32_t len) ReleasePwrMsg(&rsp); } } +// 本函数会将data指针所指向数据迁移走,调用方勿对data进行释放操作。 +int SendMetadataToClient(uint32_t sysId, char *data, uint32_t len) +{ + if (!data && len != 0) { + return ERR_INVALIDE_PARAM; + } + PwrMsg *metadata = (PwrMsg *)malloc(sizeof(PwrMsg)); + if (!metadata) { + Logger(ERROR, MD_NM_SVR, "Malloc failed."); + free(data); + return ERR_SYS_EXCEPTION; + } + bzero(metadata, sizeof(PwrMsg)); + GenerateMetadataMsg(metadata, sysId, data, len); + if (SendMsg(metadata)) { + ReleasePwrMsg(&metadata); + } +} int SendRspMsg(PwrMsg *rsp) { diff --git a/pwrapis/src/taskservice.c b/pwrapis/src/taskservice.c index f098eae3599c17a9be6c25d9a27de66c749c576c..1e6de6530fb91220962df6d6731584625bbcfc30 100644 --- a/pwrapis/src/taskservice.c +++ b/pwrapis/src/taskservice.c @@ -15,15 +15,17 @@ // todo: socket断链时,需要考虑task的释放 #include "taskservice.h" -#include #include #include #include +#include #include "common.h" #include "pwrerr.h" #include "log.h" #include "pwrdata.h" #include "server.h" +#include "cpuservice.h" +#include "utils.h" #define INVALIDE_TASK_ID (-1) #define MAX_TASK_NUM 10 @@ -31,6 +33,7 @@ typedef struct CollDataSubscriber { uint32_t sysId; int interval; + struct timeval lastSent; } CollDataSubscriber; @@ -123,6 +126,7 @@ static int AddSubscriber(int index, const PWR_COM_BasicDcTaskInfo *taskInfo, uin if (g_collTaskList[index]->subscriberList[i].sysId == 0) { // find available subscriber sslot g_collTaskList[index]->subscriberList[i].sysId = subscriber; g_collTaskList[index]->subscriberList[i].interval = taskInfo->interval; + gettimeofday(&(g_collTaskList[index]->subscriberList[i].lastSent), NULL); g_collTaskList[index]->subNum++; UpdataTaskInterval(index, i); break; @@ -142,8 +146,7 @@ static int DeleteSubscriber(int index, uint32_t subscriber) } for (int i = 0; i < MAX_CLIENT_NUM; i++) { if (g_collTaskList[index]->subscriberList[i].sysId == subscriber) { - g_collTaskList[index]->subscriberList[i].sysId = 0; - g_collTaskList[index]->subscriberList[i].interval = 0; + bzero(&(g_collTaskList[index]->subscriberList[i]), sizeof(CollDataSubscriber)); g_collTaskList[index]->subNum--; break; } @@ -157,19 +160,80 @@ static int DeleteSubscriber(int index, uint32_t subscriber) return SUCCESS; } -typedef void (*ActionFunc)(CollTask *); +static void SendMetadataToSubscribers(CollTask *task, const char *data, int len, const struct timeval *startTime) +{ + for (int i = 0; i < MAX_CLIENT_NUM; i++) { + if (task->subscriberList[i].sysId != 0 && + GetTimeDistance(*startTime, task->subscriberList[i].lastSent) >= task->subscriberList[i].interval) { + char *dataCpy = (char *)malloc(len); + if (!dataCpy) { + continue; + } + memcpy(dataCpy, data, len); + SendMetadataToClient(task->subscriberList[i].sysId, dataCpy, len); // 该函数内部会释放dataCpy + task->subscriberList[i].lastSent = *startTime; + } + } +} -static void TaskProcessLlcMiss(CollTask *task) +static PWR_COM_CallbackData *CreateMedataObject(int dataLen, PWR_COM_COL_DATATYPE dataType) { - // todo: 采集LLC MISS 并发送 + PWR_COM_CallbackData *callbackData = (PWR_COM_CallbackData *)malloc(dataLen); + if (!callbackData) { + return NULL; + } + bzero(callbackData, dataLen); + GetCurFullTime(callbackData->ctime, MAX_TIME_LEN); + callbackData->dataType = dataType; + callbackData->dataLen = dataLen - sizeof(PWR_COM_CallbackData); + return callbackData; } -static void TaskProcessCpuUsage(CollTask *task) + +typedef void (*ActionFunc)(CollTask *, const struct timeval *); +static void TaskProcessLlcMiss(CollTask *task, const struct timeval *startTime) { - // todo: 采集CPU USAGE 并发送 + int callbackDataLen = sizeof(PWR_COM_CallbackData) + sizeof(double); + PWR_COM_CallbackData *callbackData = CreateMedataObject(callbackDataLen, task->dataType); + if (!callbackData) { + return; + } + if (LLCMissRead((double *)callbackData->data) != SUCCESS) { + free(callbackData); + return; + } + SendMetadataToSubscribers(task, (char *)callbackData, callbackDataLen, startTime); + free(callbackData); } -static void TaskProcessCpuIpc(CollTask *task) + +static void TaskProcessCpuUsage(CollTask *task, const struct timeval *startTime) +{ + int coreNum = GetCpuCoreNumber(); + int callbackDataLen = sizeof(PWR_COM_CallbackData) + sizeof(PWR_CPU_Usage) + coreNum * sizeof(PWR_CPU_CoreUsage); + PWR_COM_CallbackData *callbackData = CreateMedataObject(callbackDataLen, task->dataType); + if (!callbackData) { + return; + } + if (CPUUsageRead((PWR_CPU_Usage *)callbackData->data, coreNum) != SUCCESS) { + free(callbackData); + return; + } + SendMetadataToSubscribers(task, (char *)callbackData, callbackDataLen, startTime); + free(callbackData); +} + +static void TaskProcessCpuIpc(CollTask *task, const struct timeval *startTime) { - // todo: 采集CPU IPC 并发送 + int callbackDataLen = sizeof(PWR_COM_CallbackData) + sizeof(double); + PWR_COM_CallbackData *callbackData = CreateMedataObject(callbackDataLen, task->dataType); + if (!callbackData) { + return; + } + if (CpuIpcRead((double *)callbackData->data) != SUCCESS) { + free(callbackData); + return; + } + SendMetadataToSubscribers(task, (char *)callbackData, callbackDataLen, startTime); + free(callbackData); } static ActionFunc GetActionByDataType(PWR_COM_COL_DATATYPE dataType) @@ -193,10 +257,18 @@ static void *RunDcTaskProcess(void *arg) if (!actionFunc) { return NULL; } - + struct timeval after; + gettimeofday(&after, NULL); + struct timeval before = after; + int sleepTime = 0; while (task->collThread.keepRunning) { - usleep(THOUSAND * (task->interval)); - actionFunc(task); + gettimeofday(&after, NULL); + sleepTime = task->interval - GetTimeDistance(after, before); // 任务周期 - 上次执行花费时间 + if (sleepTime > 0) { + usleep(THOUSAND * sleepTime); + } + gettimeofday(&before, NULL); + actionFunc(task, &before); } } @@ -217,6 +289,7 @@ static int CreateNewTask(const PWR_COM_BasicDcTaskInfo *taskInfo, uint32_t subsc task->subNum = 1; task->subscriberList[0].sysId = subscriber; task->subscriberList[0].interval = taskInfo->interval; + gettimeofday(&(task->subscriberList[0].lastSent), NULL); InitThreadInfo(&(task->collThread)); int rspCode = CreateThread(&(task->collThread), RunDcTaskProcess, (void *)task); if (rspCode != SUCCESS) { @@ -225,7 +298,9 @@ static int CreateNewTask(const PWR_COM_BasicDcTaskInfo *taskInfo, uint32_t subsc subscriber); } else { g_collTaskList[slot] = task; - Logger(INFO, MD_NM_SVR_TASK, "CreateNewTask succeed. type:%d, sysId:%d", taskInfo->dataType, subscriber); + g_taskNum++; + Logger(INFO, MD_NM_SVR_TASK, "CreateNewTask succeed. type:%d, sysId:%d, taskNum:%d", taskInfo->dataType, + subscriber, g_taskNum); } return rspCode; } @@ -244,6 +319,8 @@ static int CreateTask(const PWR_COM_BasicDcTaskInfo *taskInfo, uint32_t subscrib rspCode = CreateNewTask(taskInfo, subscriber); } pthread_mutex_unlock(&g_taskListMutex); + Logger(INFO, MD_NM_SVR_TASK, "CreateTask. sysId:%d dataType:%d result: %d", subscriber, taskInfo->dataType, + rspCode); return rspCode; } @@ -255,6 +332,7 @@ static int DeleteTask(PWR_COM_COL_DATATYPE dataType, uint32_t subscriber) DeleteSubscriber(index, subscriber); } pthread_mutex_unlock(&g_taskListMutex); + Logger(INFO, MD_NM_SVR_TASK, "DeleteTask. sysId:%d dataType:%d taskNum:%d", subscriber, dataType, g_taskNum); return SUCCESS; } @@ -305,7 +383,7 @@ void CreateDataCollTask(const PwrMsg *req) void DeleteDataCollTask(const PwrMsg *req) { - if (!req || req->head.dataLen != sizeof(int)) { + if (!req || req->head.dataLen != sizeof(PWR_COM_COL_DATATYPE)) { return; } Logger(DEBUG, MD_NM_SVR_TASK, "Get DeleteDataCollTask Req. seqId:%u, sysId:%d", req->head.seqId, req->head.sysId); diff --git a/pwrapis/src/utils.c b/pwrapis/src/utils.c index d653d477310c6dd7f7715afcac1605b5729569d5..00ec16ee5ec53034daf3961d0fd92db5a9f99035 100644 --- a/pwrapis/src/utils.c +++ b/pwrapis/src/utils.c @@ -141,6 +141,12 @@ const char *GetCurFullTime(char *fullTime, int bufLen) return fullTime; } +// return ms time +long GetTimeDistance(struct timeval v1, struct timeval v2) +{ + return (v1.tv_sec - v2.tv_sec) * THOUSAND + (v1.tv_usec - v2.tv_usec) / THOUSAND; +} + size_t GetFileSize(const char *fileName) { int res = 0;