diff --git a/Makefile.in b/Makefile.in
index 27eb90d3f40e54e9a8bd2c88c86bc5b270dbe294..606b31358fe69278a322571304f6879b50afc0d1 100644
--- a/Makefile.in
+++ b/Makefile.in
@@ -95,6 +95,7 @@ LIBJEMALLOC := $(LIBPREFIX)jemalloc$(install_suffix)
 BINS := $(objroot)bin/jemalloc-config $(objroot)bin/jemalloc.sh $(objroot)bin/jeprof
 C_HDRS := $(objroot)include/jemalloc/jemalloc$(install_suffix).h
 C_SRCS := $(srcroot)src/jemalloc.c \
+	$(srcroot)src/tracing.c \
 	$(srcroot)src/arena.c \
 	$(srcroot)src/background_thread.c \
 	$(srcroot)src/base.c \
diff --git a/include/jemalloc/internal/tracing.h b/include/jemalloc/internal/tracing.h
new file mode 100644
index 0000000000000000000000000000000000000000..95c7139afccc6fcba61de27e83c17217c44c9312
--- /dev/null
+++ b/include/jemalloc/internal/tracing.h
@@ -0,0 +1,35 @@
+#ifndef TRACING_H
+#define TRACING_H
+
+#include <stddef.h>
+#include <stdlib.h>
+
+void Tracing(char ops_type, void *ptr, size_t size);
+
+void MallocTrace(void *ptr, size_t size, char *func_name);
+void FreeTrace(void *ptr, char *func_name);
+
+// Maximum number of stack frames captured per event; deeper traces cost more performance.
+#define MAX_STACK_FRAMES 192
+typedef struct {
+  void *buffer[MAX_STACK_FRAMES];
+  int frames;
+  void *ptr;
+  char ops_type;
+  size_t size;
+} ExecStack;
+
+typedef struct Node {
+  struct Node *next;
+  struct Node *prev;
+  ExecStack exec_stack;
+} Node;
+
+#define EXEC_STACK_NODE_SIZE (sizeof(Node))
+
+void AppendExecStack(Node *exec_node);
+
+void InitLib();
+void CleanLib();
+
+#endif
diff --git a/src/jemalloc.c b/src/jemalloc.c
index 67be7681643fcac1ae1b088bcd2a0e2349938b60..7b09fccd1006c0b9073c864cd773eed9a2256406 100644
--- a/src/jemalloc.c
+++ b/src/jemalloc.c
@@ -23,6 +23,7 @@
 #include "jemalloc/internal/sz.h"
 #include "jemalloc/internal/ticker.h"
 #include "jemalloc/internal/thread_event.h"
+#include "jemalloc/internal/tracing.h"
 #include "jemalloc/internal/util.h"
 
 /******************************************************************************/
@@ -2764,9 +2765,11 @@ void JEMALLOC_NOTHROW *
 JEMALLOC_ATTR(malloc) JEMALLOC_ALLOC_SIZE(1)
 je_malloc(size_t size) {
 	LOG("core.malloc.entry", "size: %zu", size);
-
+	malloc_printf("je malloc size %zu\n", size);
 	void *ret = imalloc_fastpath(size, &malloc_default);
+	MallocTrace(ret, size, "je_malloc");
+
 	LOG("core.malloc.exit", "result: %p", ret);
 	return ret;
 }
@@ -3033,7 +3036,9 @@ je_free(void *ptr) {
 	je_free_impl(ptr);
 
-	LOG("core.free.exit", "");
+	FreeTrace(ptr, "je_free");
+
+	LOG("core.free.exit", "");
 }
 
 JEMALLOC_EXPORT void JEMALLOC_NOTHROW
@@ -4255,6 +4260,13 @@ JEMALLOC_ATTR(constructor)
 static void
 jemalloc_constructor(void) {
 	malloc_init();
+	InitLib();
+}
+
+JEMALLOC_ATTR(destructor)
+static void
+jemalloc_destructor(void) {
+	CleanLib();
 }
 
 #endif
diff --git a/src/tracing.c b/src/tracing.c
new file mode 100644
index 0000000000000000000000000000000000000000..8c79844b7ea4c58acc24a5db4d4f6695ec7c69e1
--- /dev/null
+++ b/src/tracing.c
@@ -0,0 +1,528 @@
+#include "jemalloc/internal/tracing.h"
+
+#include <pthread.h>
+#include <stdatomic.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+#include <threads.h>
+#include <unistd.h>
+#include <utime.h>
+
+#include "jemalloc/internal/malloc_io.h"
+#include "jemalloc/internal/jemalloc_internal_includes.h"
+
+#define USE_LIB_UNWIND
+
+#ifdef USE_LIB_UNWIND
+# include <libunwind.h>
+#endif
+
+pthread_mutex_t mutex_gl = PTHREAD_MUTEX_INITIALIZER;
+
+volatile char kInitialized = 0;
+
+int Lock() {
+  return pthread_mutex_lock(&mutex_gl);
+}
+
+int Unlock() {
+  return pthread_mutex_unlock(&mutex_gl);
+}
+
+char thread_name[16];
+
+#define NO_PRINT
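+// MallocTrace/FreeTrace are the hook entry points called from je_malloc and
+// je_free. They forward the event to Tracing() and, unless NO_PRINT is
+// defined, also log the pointer, size and calling thread via malloc_printf.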
+void MallocTrace(void *ptr, size_t size, char *func_name) {
+  // malloc_printf("TEST malloc print start, size : %ld, ptr : %p %s\n", size, ptr, func_name);
+  Tracing((char)1, ptr, size);
+#ifdef NO_PRINT
+  return;
+#endif
+  Lock();
+  pthread_t thread_id = pthread_self();
+  if (pthread_getname_np(thread_id, thread_name, sizeof(thread_name)) != 0) {
+    malloc_printf("Error - get thread name failed, caller : %s\n", func_name);
+  }
+  malloc_printf("TEST malloc print end, size : %zu, ptr : %p %s%lu %s\n", size, ptr,
+      thread_name, (unsigned long)thread_id, func_name);
+  Unlock();
+}
+
+void FreeTrace(void *ptr, char *func_name) {
+  Tracing((char)0, ptr, 0);
+#ifdef NO_PRINT
+  return;
+#endif
+  Lock();
+  pthread_t thread_id = pthread_self();
+  if (pthread_getname_np(thread_id, thread_name, sizeof(thread_name)) != 0) {
+    malloc_printf("Error - get thread name failed, caller : %s\n", func_name);
+  }
+
+  malloc_printf("TEST free %p %s%lu %s\n", ptr, thread_name,
+      (unsigned long)thread_id, func_name);
+  Unlock();
+}
+
+///////////////////////////////////////////////////////////////////////////
+// Raw mmap-backed allocator that backs the object pool.
+unsigned long MMAP_MEMORY_INFO_DEFAULT_SIZE = 50 * 1024 * 1024 * 1024L;
+typedef struct MmapMemoryInfo {
+  struct MmapMemoryInfo *next_;
+  unsigned char *base_addr_;
+  unsigned char *offset_;
+  size_t size_;
+} MmapMemoryInfo;
+
+MmapMemoryInfo kMmapMemoryInfo;
+char kMemoryInfoInitialized = 0;
+
+void *MmapMalloc(size_t size) {
+  void *new_block = mmap(
+      NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
+  if (new_block == MAP_FAILED) {
+    malloc_printf("mmap failed\n");
+    return NULL;
+  }
+  malloc_printf("mmap malloc size : %zu, %p\n", size, new_block);
+  return new_block;
+}
+
+void *ExtendMmapMemory(size_t size) {
+  malloc_printf("Extend mmap memory, size : %zu\n", size);
+  return MmapMalloc(size + MMAP_MEMORY_INFO_DEFAULT_SIZE);
+}
+
+void ExtendMmapMemoryInfo(MmapMemoryInfo *mmap_memory_info) {
+  malloc_printf("Extend mmap memory info\n");
+  mmap_memory_info->next_ = NULL;
+  mmap_memory_info->base_addr_ = MmapMalloc(MMAP_MEMORY_INFO_DEFAULT_SIZE);
+  mmap_memory_info->offset_ = mmap_memory_info->base_addr_;
+  mmap_memory_info->size_ = MMAP_MEMORY_INFO_DEFAULT_SIZE;
+  malloc_printf("Extend mmap memory info, base addr : %p, offset : %p, size : %zu\n",
+      mmap_memory_info->base_addr_, mmap_memory_info->offset_, mmap_memory_info->size_);
+}
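+// Malloc() is a simple bump allocator over the mmap'ed regions above. It is
+// used for the tracer's own bookkeeping so that recording an event never
+// re-enters jemalloc. When the current region is exhausted, a fresh region is
+// mapped and chained into kMmapMemoryInfo; Free() is intentionally a no-op.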
+void *Malloc(size_t size) {
+  malloc_printf("Malloc start\n");
+  if (kMemoryInfoInitialized == 0) {
+    ExtendMmapMemoryInfo(&kMmapMemoryInfo);
+    kMemoryInfoInitialized = 1;
+  }
+  malloc_printf("check remaining capacity\n");
+  if (kMmapMemoryInfo.offset_ + size > kMmapMemoryInfo.base_addr_ + kMmapMemoryInfo.size_) {
+    malloc_printf("Current region exhausted, extending.\n");
+    uint8_t *base_addr = (uint8_t *)ExtendMmapMemory(sizeof(MmapMemoryInfo));
+    if (base_addr == NULL) {
+      malloc_printf("ExtendMmapMemory failed.\n");
+      return NULL;
+    }
+    MmapMemoryInfo *new_mmap_memory_info = (MmapMemoryInfo *)base_addr;
+    new_mmap_memory_info->next_ = NULL;
+    new_mmap_memory_info->base_addr_ = base_addr + sizeof(MmapMemoryInfo);
+    new_mmap_memory_info->offset_ = new_mmap_memory_info->base_addr_;
+    new_mmap_memory_info->size_ = MMAP_MEMORY_INFO_DEFAULT_SIZE;
+
+    // swap mmap memory info and new mmap memory info
+    MmapMemoryInfo *tmp = (MmapMemoryInfo *)(base_addr + sizeof(MmapMemoryInfo));
+    tmp->base_addr_ = new_mmap_memory_info->base_addr_;
+    tmp->offset_ = new_mmap_memory_info->offset_;
+    tmp->size_ = new_mmap_memory_info->size_;
+    tmp->next_ = new_mmap_memory_info->next_;
+
+    new_mmap_memory_info->base_addr_ = kMmapMemoryInfo.base_addr_;
+    new_mmap_memory_info->offset_ = kMmapMemoryInfo.offset_;
+    new_mmap_memory_info->size_ = kMmapMemoryInfo.size_;
+    new_mmap_memory_info->next_ = kMmapMemoryInfo.next_;
+
+    kMmapMemoryInfo.base_addr_ = tmp->base_addr_;
+    kMmapMemoryInfo.offset_ = tmp->offset_;
+    kMmapMemoryInfo.size_ = tmp->size_;
+    kMmapMemoryInfo.next_ = new_mmap_memory_info;
+  }
+
+  void *addr = kMmapMemoryInfo.offset_;
+  malloc_printf("kMmapMemoryInfo.offset_ %p\n", kMmapMemoryInfo.offset_);
+  kMmapMemoryInfo.offset_ += size;
+  malloc_printf("kMmapMemoryInfo.offset_ %p, size : %zu\n", kMmapMemoryInfo.offset_, size);
+
+  malloc_printf("%d - malloc : %p, size : %zu, limit : %p\n", (int)getpid(), addr, size, kMmapMemoryInfo.offset_);
+  return addr;
+}
+
+void Free(void *addr) {
+  // not supported currently
+}
+
+////////////////// definition for object pool /////////////////////
+typedef struct Block {
+  struct Block *next_;
+} Block;
+
+#define DEFAULT_BUFFER_SIZE (1024 * 1024)
+
+typedef struct Buffer {
+  struct Buffer *next_;
+  char data_[EXEC_STACK_NODE_SIZE * DEFAULT_BUFFER_SIZE];
+} Buffer;
+
+static inline void *GetBlock(Buffer *buffer, int index) {
+  return &buffer->data_[index * EXEC_STACK_NODE_SIZE];
+}
+
+Buffer *NewBuffer(Buffer *buffer) {
+  Buffer *new_buffer = (Buffer *)Malloc(sizeof(Buffer));
+  if (new_buffer == NULL) {
+    malloc_printf("Malloc failed\n");
+    return NULL;
+  }
+  new_buffer->next_ = buffer;
+  malloc_printf("%d - new buffer : %p\n", (int)getpid(), new_buffer);
+  return new_buffer;
+}
+
+void spinlock_init(atomic_int *lock) {
+  malloc_printf("spin lock init : %p\n", lock);
+  atomic_store_explicit(lock, 0, memory_order_relaxed);
+}
+
+void spinlock_lock(atomic_int *lock) {
+  // malloc_printf("lock : %p\n", lock);
+  while (atomic_exchange_explicit(lock, 1, memory_order_acquire)) {
+    while (atomic_load_explicit(lock, memory_order_relaxed)) {
+      thrd_yield();
+    }
+  }
+}
+
+void spinlock_unlock(atomic_int *lock) {
+  // malloc_printf("unlock : %p\n", lock);
+  atomic_store_explicit(lock, 0, memory_order_release);
+}
+
+typedef struct ObjectPool {
+  Block *free_list_;
+  Buffer *buffer_;
+  int buffer_index_;
+  int buffer_size_;
+
+  // pthread_mutex_t mutex;
+  atomic_int spin_lock_;
+} ObjectPool;
+
+ObjectPool kObjectPool;
+void InitObjectPool(ObjectPool *object_pool) {
+  object_pool->free_list_ = NULL;
+  object_pool->buffer_ = NULL;
+  object_pool->buffer_index_ = DEFAULT_BUFFER_SIZE;
+  object_pool->buffer_size_ = DEFAULT_BUFFER_SIZE;
+  spinlock_init(&object_pool->spin_lock_);
+}
+
+void Destroy(ObjectPool *pool) {
+  Buffer *first_buffer = pool->buffer_;
+  while (first_buffer != NULL) {
+    Buffer *buffer = first_buffer;
+    first_buffer = buffer->next_;
+    Free(buffer);
+  }
+}
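+// Borrow()/Return() hand out Node objects under the spin lock: a returned
+// node goes onto the free list and is reused first; otherwise nodes are
+// carved sequentially out of the current Buffer, and a new Buffer is
+// allocated via Malloc() when the current one is full.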
+static inline Node *Borrow(ObjectPool *pool) {
+  malloc_printf("BORROW : %p from cache\n", pool);
+  spinlock_lock(&pool->spin_lock_);
+  if (pool->free_list_ != NULL) {
+    Block *block = pool->free_list_;
+    pool->free_list_ = block->next_;
+    // malloc_printf("BORROW : %p with cache\n", block);
+    spinlock_unlock(&pool->spin_lock_);
+    return (Node *)block;
+  }
+
+  if (pool->buffer_index_ >= pool->buffer_size_) {
+    pool->buffer_ = NewBuffer(pool->buffer_);
+    // malloc_printf("new buffer : %p\n", pool->buffer_);
+    pool->buffer_index_ = 0;
+  }
+
+  Node *node = (Node *)GetBlock(pool->buffer_, pool->buffer_index_);
+  pool->buffer_index_++;
+  // malloc_printf("BORROW : %p, index : %d, node size : %d\n", node, pool->buffer_index_, EXEC_STACK_NODE_SIZE);
+  node->prev = NULL;
+  node->next = NULL;
+  spinlock_unlock(&pool->spin_lock_);
+  return node;
+}
+
+static inline void Return(ObjectPool *pool, Node *node) {
+  spinlock_lock(&pool->spin_lock_);
+  // malloc_printf("RETURN : %p\n", node);
+  Block *block = (Block *)node;
+  block->next_ = pool->free_list_;
+  pool->free_list_ = block;
+  spinlock_unlock(&pool->spin_lock_);
+}
+
+// These wrappers need to be thread safe.
+Node *BorrowFromPool() {
+  Node *node = Borrow(&kObjectPool);
+  // malloc_printf("%d - BORROW : %p\n", (int)getpid(), node);
+  return node;
+}
+
+void ReturnToPool(Node *node) {
+  // malloc_printf("%d - RETURN : %p\n", (int)getpid(), node);
+  Return(&kObjectPool, node);
+}
+
+// Concurrency support: a blocking queue between the hooks (producers) and the
+// consumer thread.
+typedef void *(*malloc_libc)(size_t);
+// extern "C" malloc_libc real_malloc;
+typedef void (*free_libc)(void *ptr);
+
+typedef struct {
+  Node head;
+  pthread_mutex_t mutex;
+  pthread_cond_t not_empty;
+} BlockingQueue;
+
+BlockingQueue kBlockingQueue;
+void InitQueue(BlockingQueue *queue) {
+  queue->head.next = queue->head.prev = &queue->head;
+  pthread_mutex_init(&queue->mutex, NULL);
+  pthread_cond_init(&queue->not_empty, NULL);
+}
+
+void Enqueue(BlockingQueue *queue, Node *node) {
+  if (!kInitialized) {
+    return;
+  }
+
+  pthread_mutex_lock(&queue->mutex);
+  node->prev = &queue->head;
+  node->next = queue->head.next;
+  queue->head.next->prev = node;
+  queue->head.next = node;
+
+  pthread_cond_signal(&queue->not_empty);
+  pthread_mutex_unlock(&queue->mutex);
+}
+
+Node *Dequeue(BlockingQueue *queue) {
+  pthread_mutex_lock(&queue->mutex);
+  while (queue->head.prev == &queue->head) {
+    pthread_cond_wait(&queue->not_empty, &queue->mutex);
+  }
+
+  Node *node = queue->head.prev;
+  node->prev->next = &queue->head;
+  queue->head.prev = node->prev;
+  pthread_mutex_unlock(&queue->mutex);
+  return node;
+}
+
+void DestroyQueue(BlockingQueue *queue) {
+  pthread_mutex_lock(&queue->mutex);
+  Node *current = queue->head.next;
+  while (current != &queue->head) {
+    Node *next = current->next;
+    ReturnToPool(current);
+    current = next;
+  }
+  pthread_mutex_unlock(&queue->mutex);
+  pthread_mutex_destroy(&queue->mutex);
+  pthread_cond_destroy(&queue->not_empty);
+}
+
+// ConcurrentQueue kConcurrentQueue;
+volatile bool init_flag = true;
+
+///////////////////////////////////////////////////////////////////////////
+
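+// DumpPidMapFile() snapshots /proc/<pid>/maps into a tracker_<pid>_maps file
+// alongside the trace data, so the raw return addresses recorded by the
+// consumer can later be symbolized against the process's load map.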
+int DumpPidMapFile(int pid) {
+  char source_path[100] = {0};
+  FILE *source_file = NULL;
+  FILE *dest_file = NULL;
+  int status = -1;
+  const size_t buffer_size = 4096;
+  char buffer[buffer_size];
+
+  // Format source file and open it.
+  sprintf(source_path, "/proc/%d/maps", pid);
+  // malloc_printf("source path : %s\n", source_path);
+  source_file = fopen(source_path, "rb");
+  if (!source_file) {
+    perror("Failed to open source file");
+    goto cleanup;
+  }
+
+  char dest_path[100] = {0};
+  sprintf(dest_path, "tracker_%d_maps", pid);
+  // malloc_printf("dest path : %s\n", dest_path);
+
+  struct stat src_stat, dst_stat;
+  // Check source file status.
+  if (stat(source_path, &src_stat) != 0) {
+    perror("Error getting source file status");
+    goto cleanup;
+  }
+
+  // Skip the copy when the existing target already matches the source size.
+  int dst_exists = (stat(dest_path, &dst_stat) == 0);
+  if (dst_exists && src_stat.st_size == dst_stat.st_size) {
+    goto cleanup;
+  }
+
+  // Open destination file.
+  dest_file = fopen(dest_path, "wb");
+  if (!dest_file) {
+    perror("Failed to open destination file");
+    goto cleanup;
+  }
+
+  // malloc_printf("update mapping file - source path : %s, dest path : %s\n", source_path, dest_path);
+  // Start copying data.
+  size_t bytes_read;
+  while ((bytes_read = fread(buffer, 1, buffer_size, source_file)) > 0) {
+    size_t bytes_written = fwrite(buffer, 1, bytes_read, dest_file);
+    if (bytes_written != bytes_read) {
+      perror("Write operation failed");
+      goto cleanup;
+    }
+  }
+
+  // Preserve the source timestamps on the copy.
+  struct utimbuf new_time;
+  new_time.actime = src_stat.st_atime;
+  new_time.modtime = src_stat.st_mtime;
+  utime(dest_path, &new_time);
+
+  // Check for read errors.
+  if (ferror(source_file)) {
+    perror("Read operation failed");
+    goto cleanup;
+  }
+
+  status = 0;
+
+cleanup:
+  // Clean up resources.
+  if (source_file)
+    fclose(source_file);
+  if (dest_file)
+    fclose(dest_file);
+
+  // Remove destination file if an error occurred.
+  if (status != 0 && dest_file) {
+    remove(dest_path);
+  }
+
+  return status;
+}
+
+void *consumer(void *arg) {
+  // malloc_printf("free_internal : %p, arg : %p\n", free_internal, arg);
+  static size_t cnt = 0;
+  // pthread_t thread_id = pthread_self();
+  pid_t pid = getpid();
+  char file_name[100];
+  sprintf(file_name, "tracker_%d.dat", (int)pid);
+  FILE *file = fopen(file_name, "a+");
+  if (file == NULL) {
+    malloc_printf("Open trace file failed\n");
+    return NULL;
+  }
+  while (init_flag) {
+    Node *exec_node = Dequeue(&kBlockingQueue);
+    // malloc_printf("Get process exec node : %p\n", exec_node);
+    // The record format is kept simple so that it is easy to parse:
+    // ops_type|ptr|size|frame-0|frame-1...
+    fprintf(file, "%d|%p|%zu", exec_node->exec_stack.ops_type,
+        exec_node->exec_stack.ptr, exec_node->exec_stack.size);
+    for (int i = 0; i < exec_node->exec_stack.frames; i++) {
+      fprintf(file, "|%p", exec_node->exec_stack.buffer[i]);
+    }
+    fprintf(file, "\n");
+
+    if (cnt % 100000 == 0) {
+      malloc_printf("Flush: %zu\n", cnt);
+      fflush(file);
+
+      DumpPidMapFile((int)pid);
+    }
+    cnt++;
+    // malloc_printf("free : %p\n", exec_node);
+    ReturnToPool(exec_node);
+  }
+  fflush(file);
+  fclose(file);
+  return NULL;
+}
+
+pthread_mutex_t kInitializedMutex = PTHREAD_MUTEX_INITIALIZER;
+
+void InitLib() {
+  malloc_printf("Try init library\n");
+  if (kInitialized) {
+    return;
+  }
+  malloc_printf("init library\n");
+  pthread_mutex_lock(&kInitializedMutex);
+  if (kInitialized) {
+    pthread_mutex_unlock(&kInitializedMutex);
+    return;
+  }
+
+  malloc_printf("start init library\n");
+  // concurrent_queue_init(&kConcurrentQueue, malloc_internal, free_internal);
+  InitQueue(&kBlockingQueue);
+  InitObjectPool(&kObjectPool);
+  malloc_printf("start create thread\n");
+  pthread_t thread_id;
+  // Create the consumer thread.
+  int result =
+      pthread_create(&thread_id, NULL, consumer, (void *)NULL);
+  if (result == 0) {
+    malloc_printf("init backend thread success.\n");
+  } else {
+    malloc_printf("init backend thread failed.\n");
+    init_flag = false;
+  }
+  kInitialized = 1;
+  pthread_mutex_unlock(&kInitializedMutex);
+}
+
+void CleanLib() {
+  malloc_printf("Shared library is being cleaned up.\n");
+  Node *node = BorrowFromPool();
+  node->exec_stack.ops_type = 'E';
+  node->exec_stack.ptr = NULL;
+  node->exec_stack.size = 0;
+  node->exec_stack.frames = 0;
+  init_flag = false;
+  AppendExecStack(node);
+  DestroyQueue(&kBlockingQueue);
+}
+
+inline void AppendExecStack(Node *exec_node) {
+  Enqueue(&kBlockingQueue, exec_node);
+}
+
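+// Tracing() is the common path for both hooks: it borrows a Node from the
+// pool, captures the current backtrace with unw_backtrace(), and enqueues the
+// record for the consumer thread, which writes it to tracker_<pid>.dat.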
+void Tracing(char ops_type, void *ptr, size_t size) {
+  malloc_printf("trace stack start: %p, size : %zu\n", ptr, size);
+#ifdef USE_LIB_UNWIND
+  if (!kInitialized) {
+    malloc_printf("tracing is not enabled.\n");
+    return;
+  }
+  Node *exec_node = BorrowFromPool();
+  malloc_printf("%d - trace stack exec_node: %p, exec stack : %p\n", (int)getpid(), exec_node, &exec_node->exec_stack);
+  exec_node->exec_stack.ops_type = ops_type;
+  exec_node->exec_stack.ptr = ptr;
+  int frames = unw_backtrace(exec_node->exec_stack.buffer, MAX_STACK_FRAMES);
+  exec_node->exec_stack.frames = frames;
+  exec_node->exec_stack.size = size;
+  // malloc_printf("trace stack : %p, size : %ld, frames : %d\n", ptr, size, frames);
+  AppendExecStack(exec_node);
+#endif
+}
\ No newline at end of file