diff --git a/src/backend/storage/buffer/buf_init.c b/src/backend/storage/buffer/buf_init.c index 2862e9e412c59f83197a2dcce8852185726e2ce8..ef1774383e52193e97fa538da7b00d83f1058ba3 100644 --- a/src/backend/storage/buffer/buf_init.c +++ b/src/backend/storage/buffer/buf_init.c @@ -180,3 +180,5 @@ BufferShmemSize(void) return size; } + +#include "buf_init_ext.c" diff --git a/src/backend/storage/buffer/buf_init_ext.c b/src/backend/storage/buffer/buf_init_ext.c new file mode 100644 index 0000000000000000000000000000000000000000..3696a3c3a630ff1851473e59dcca561a3d263635 --- /dev/null +++ b/src/backend/storage/buffer/buf_init_ext.c @@ -0,0 +1,89 @@ +#include "storage/buf_ext.h" +#include "common/hashfn.h" + + +KPHCPciPageBuffCtx *gPciBufCtx = NULL; + + +//Default 8K * 1000 +Size KPHCPciBufferSize() +{ + return 1000 * 1024 * 8; +} + +void KPHCPciLruPushNoLock(KPHCPciLruList *lru, KPHCPciPageCtrl *item, KPHCCtrlState state) +{ + KPHCPciPageCtrl *temp = NULL; + if (lru->count == 0) { + item->lruNext = PCI_INVALID_ID; + item->lruPrev = PCI_INVALID_ID; + lru->first = item->ctrlId; + lru->last = item->ctrlId; + lru->count++; + item->state = state; + return ; + } + item->lruNext = lru->first; + item->lruPrev = PCI_INVALID_ID; + temp = PCI_GET_CTRL_BY_ID(gPciBufCtx, lru->first); + temp->lruPrev = item->ctrlId; + lru->first = item->ctrlId; + lru->count++; + item->state = state; +} + +/* + * KPHCPciBufInitCtx + * + * Init Pci buffer blocks + */ +void KPHCPciBufInitCtx() +{ + char *buf; + bool found = false; + Size buffer_size = KPHCPciBufferSize(); + char* pciBufferBlocks = + (char *)CACHELINEALIGN(ShmemInitStruct("PCI Buffer Blocks", buffer_size, &found)); + + if (found) { + return; + } + + buf = pciBufferBlocks; + memset((void*)buf, 0, sizeof(KPHCPciPageBuffCtx)); + + gPciBufCtx = (KPHCPciPageBuffCtx *)(void *)buf; + gPciBufCtx->maxCount = (uint32)((buffer_size - sizeof(KPHCPciPageBuffCtx)) / + (sizeof(KPHCPciPageCtrl) + BLCKSZ + 3 * sizeof(KPHCPciHashBucket))); + gPciBufCtx->ctrlBuf = (KPHCPciPageCtrl *)(void *)(buf + sizeof(KPHCPciPageBuffCtx)); + gPciBufCtx->pageBuf = + ((char *)(gPciBufCtx->ctrlBuf)) + (Size)((Size)gPciBufCtx->maxCount * sizeof(KPHCPciPageCtrl)); + gPciBufCtx->hashtbl.buckets = (KPHCPciHashBucket *)(void *)( + ((char *)(gPciBufCtx->pageBuf)) + (Size)((Size)gPciBufCtx->maxCount * BLCKSZ)); + + for (uint32 i = 0; i < PCI_PART_LIST_NUM; i++) { + + LWLockInitialize(&gPciBufCtx->mainLru[i].lock, (int)LWTRANCHE_PCI_BUFFER_CONTENT); + LWLockInitialize(&gPciBufCtx->freeLru[i].lock, (int)LWTRANCHE_PCI_BUFFER_CONTENT); + } + + for (uint32 i = 1; i <= gPciBufCtx->maxCount; i++) { + KPHCPciPageCtrl *ctrl = PCI_GET_CTRL_BY_ID(gPciBufCtx, i); + memset((void*)ctrl, 0, sizeof(KPHCPciPageCtrl)); + ctrl->ctrlId = i; + ctrl->pciPage = (CfsExtentHeader *)(void *)(gPciBufCtx->pageBuf + (Size)((Size)(i - 1) * BLCKSZ)); + LWLockInitialize(&ctrl->contentLock, (int)LWTRANCHE_PCI_BUFFER_CONTENT); + + KPHCPciLruPushNoLock(&gPciBufCtx->freeLru[ctrl->ctrlId % PCI_PART_LIST_NUM], ctrl, CTRL_STATE_FREE); + } + gPciBufCtx->hashtbl.hash = tag_hash; + gPciBufCtx->hashtbl.match = memcmp; + gPciBufCtx->hashtbl.bucketNum = 3 * gPciBufCtx->maxCount; + for (uint32 i = 1; i <= gPciBufCtx->hashtbl.bucketNum; i++) { + KPHCPciHashBucket *bucket = PCI_GET_BUCKET_BY_ID(gPciBufCtx, i); + memset((void*)bucket, 0, sizeof(KPHCPciHashBucket)); + bucket->bckId = i; + LWLockInitialize(&bucket->lock, (int)LWTRANCHE_PCI_BUFFER_CONTENT); + } +} + diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 9fcb3d6e1944151936048be791843824a9960378..ea65020b0e9883ffcee92eb8c71126690bc450d0 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -5015,3 +5015,5 @@ TestForOldSnapshot_impl(Snapshot snapshot, Relation relation) (errcode(ERRCODE_SNAPSHOT_TOO_OLD), errmsg("snapshot too old"))); } + +#include "bufmgr_ext.c" \ No newline at end of file diff --git a/src/backend/storage/buffer/bufmgr_ext.c b/src/backend/storage/buffer/bufmgr_ext.c new file mode 100644 index 0000000000000000000000000000000000000000..2f9c9e032d5e209030bce17c3abc771483089bbc --- /dev/null +++ b/src/backend/storage/buffer/bufmgr_ext.c @@ -0,0 +1,408 @@ +#include "storage/buf_ext.h" +#include "storage/fd.h" + +#include "common/hashfn.h" +#include "utils/wait_event.h" + + +/* + * KPHCPciKeyCmp + * + * compute pci key + */ +static +inline uint32 KPHCPciKeyCmp(KPHCCfsBufferKey *keya, KPHCCfsBufferKey *keyb) +{ + return (uint32)gPciBufCtx->hashtbl.match((void *)keya, (void *)keyb, sizeof(KPHCCfsBufferKey)); +} + +/* + * KPHCPciLockCount + * + * Getting Lock Count + */ +static +uint32 KPHCPciLockCount() +{ + Size buffer_size = 0; + uint32 ctrlCount = 0; + buffer_size = KPHCPciBufferSize(); + ctrlCount = (uint32)((buffer_size - sizeof(KPHCPciPageBuffCtx)) / + (sizeof(KPHCPciPageCtrl) + BLCKSZ + 3 * sizeof(KPHCPciHashBucket))); + return (PCI_LRU_LIST_NUM + ctrlCount + ctrlCount * 3); +} + +/* + * KPHCPciHashCode + * + * Getting Hash Code + */ +static +inline uint32 KPHCPciHashCode(KPHCCfsBufferKey *key) +{ + return gPciBufCtx->hashtbl.hash((void *)key, sizeof(KPHCCfsBufferKey)); +} + +/* + * KPHCPciBufFindFromBucket + * + * Getting Ctrl from bucket + */ +static +KPHCPciPageCtrl *KPHCPciBufFindFromBucket(KPHCPciHashBucket *bucket, KPHCCfsBufferKey *key) +{ + KPHCPciPageCtrl *ctrl = NULL; + + uint32 ctrlId = bucket->firstCtrlId; + while (ctrlId != PCI_INVALID_ID) { + ctrl = PCI_GET_CTRL_BY_ID(gPciBufCtx, ctrlId); + if (KPHCPciKeyCmp(key, &ctrl->pciKey) == 0) { + return ctrl; + } + ctrlId = ctrl->bckNext; + } + + return NULL; +} + +/* + * KPHCPciLruRemove + * + * remove Ctrl page from bucket + */ +static +void KPHCPciLruRemove(KPHCPciLruList *lru, KPHCPciPageCtrl *item) +{ + KPHCPciPageCtrl *prev = NULL; + KPHCPciPageCtrl *next = NULL; + + if (item->lruPrev != PCI_INVALID_ID) { + prev = PCI_GET_CTRL_BY_ID(gPciBufCtx, item->lruPrev); + prev->lruNext = item->lruNext; + } else { + lru->first = item->lruNext; + } + + if (item->lruNext != PCI_INVALID_ID) { + next = PCI_GET_CTRL_BY_ID(gPciBufCtx, item->lruNext); + next->lruPrev = item->lruPrev; + } else { + lru->last = item->lruPrev; + } + + item->lruPrev = PCI_INVALID_ID; + item->lruNext = PCI_INVALID_ID; + item->state = CTRL_STATE_INIT; + lru->count--; + + return ; +} + +/* + * KPHCPciBufRemoveFromBucket + * + * remove ctrl page from bucket + */ +static +void KPHCPciBufRemoveFromBucket(KPHCPciHashBucket *bucket, KPHCPciPageCtrl *item) +{ + KPHCPciPageCtrl *prev = NULL; + KPHCPciPageCtrl *next = NULL; + + if (item->bckPrev != PCI_INVALID_ID) { + prev = PCI_GET_CTRL_BY_ID(gPciBufCtx, item->bckPrev); + prev->bckNext = item->bckNext; + } else { + bucket->firstCtrlId = item->bckNext; + } + + if (item->bckNext != PCI_INVALID_ID) { + next = PCI_GET_CTRL_BY_ID(gPciBufCtx, item->bckNext); + next->bckPrev = item->bckPrev; + } + + item->bckPrev = PCI_INVALID_ID; + item->bckNext = PCI_INVALID_ID; + item->bckId = PCI_INVALID_ID; + + memset(&item->pciKey, 0x00, sizeof(item->pciKey)); + + bucket->ctrlCount--; +} + +/* + * KPHCPciBufFindFromBucket + * + * remove buf from bucket + */ +static +bool KPHCPciBufRecycleCoreTry(KPHCPciPageCtrl *item) +{ + KPHCPciHashBucket *bucket = NULL; + if ((pg_atomic_read_u32(&item->refNum) > 0) || (pg_atomic_read_u32(&item->touchNum) >= 3)) { + return false; + } + + bucket = PCI_GET_BUCKET_BY_ID(gPciBufCtx, item->bckId); + if (!LWLockConditionalAcquire(&bucket->lock, LW_EXCLUSIVE)) { + return false; + } + + if ((pg_atomic_read_u32(&item->refNum) > 0) || (pg_atomic_read_u32(&item->touchNum) >= 3)) { + LWLockRelease(&bucket->lock); + return false; + } + + KPHCPciBufRemoveFromBucket(bucket, item); + pg_atomic_write_u32(&item->touchNum, 0); + LWLockRelease(&bucket->lock); + return true; +} + + +/* + * KPHCPciLruPop + * + * get ctrl page from LRU + */ +static +KPHCPciPageCtrl *KPHCPciLruPop(KPHCPciLruList *lru) +{ + KPHCPciPageCtrl *item = NULL; + + if (lru->count == 0) { + return NULL; + } + (void)LWLockAcquire(&lru->lock, LW_EXCLUSIVE); + + if (lru->count == 0) { + LWLockRelease(&lru->lock); + return NULL; + } + + item = PCI_GET_CTRL_BY_ID(gPciBufCtx, lru->last); + KPHCPciLruRemove(lru, item); + LWLockRelease(&lru->lock); + return item; +} + +/* + * KPHCPciLruPop + * + * remove ctrl page from LRU + */ +static +KPHCPciPageCtrl *KPHCPciBufRecycleCore(KPHCPciLruList *mainLru) +{ + uint32 step = 0; + KPHCPciPageCtrl *item = NULL; + KPHCPciPageCtrl *res = NULL; + uint32 expected_val = 0; + uint32 currCtrlId = 0; + + if (!LWLockConditionalAcquire(&mainLru->lock, LW_EXCLUSIVE)) { + return res; + } + + currCtrlId = mainLru->last; + while (currCtrlId != PCI_INVALID_ID) { + if (step >= 1024) { + break; + } + + item = PCI_GET_CTRL_BY_ID(gPciBufCtx, currCtrlId); + if (KPHCPciBufRecycleCoreTry(item)) { + KPHCPciLruRemove(mainLru, item); + res = item; + break; + } + + expected_val = pg_atomic_read_u32(&item->touchNum); + pg_atomic_compare_exchange_u32_impl(&item->touchNum, &expected_val, expected_val/2); + + step++; + currCtrlId = item->lruPrev; + } + + LWLockRelease(&mainLru->lock); + return res; +} + +/* + * KPHCPciBufRecycle + * + * get free page + */ +static +KPHCPciPageCtrl *KPHCPciBufRecycle(uint32 randVal) +{ + KPHCPciPageCtrl *item = NULL; + uint8 randStart = randVal % PCI_PART_LIST_NUM; + + while (1) { + for (uint8 i = 0; i < PCI_PART_LIST_NUM; i++) { + item = KPHCPciLruPop(&gPciBufCtx->freeLru[(randStart + i) % PCI_PART_LIST_NUM]); + if (item != NULL) { + return item; + } + } + for (uint8 i = 0; i < PCI_PART_LIST_NUM; i++) { + item = KPHCPciBufRecycleCore(&gPciBufCtx->mainLru[(randStart + i) % PCI_PART_LIST_NUM]); + if (item != NULL) { + return item; + } + if (gPciBufCtx->freeLru[i].count != 0) { + break; + } + } + } +} + + + +/* + * KPHCPciBufLoadPage + * + * Loading pci page from file + */ +static +void KPHCPciBufLoadPage(KPHCPciPageCtrl *item, const KPHCExtentLocation location, KPHCCfsBufferKey *pci_key) +{ + int nbytes = 0; + memcpy(&item->pciKey, (char *)pci_key, sizeof(KPHCCfsBufferKey)); + nbytes = FileRead(location.fd, (char *)item->pciPage, BLCKSZ, + location.headerNum * BLCKSZ, (uint32)WAIT_EVENT_DATA_FILE_READ); + if (nbytes != BLCKSZ) { + item->loadStatus = CTRL_PAGE_LOADED_ERROR; + ereport(DEBUG5, (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("Failed to pci_buf_load_page %s, headerNum: %u.", FilePathName(location.fd), location.headerNum))); + return; + } + item->loadStatus = CTRL_PAGE_IS_LOADED; + return; +} + +/* + * KPHCPciBufAddIntoBucket + * + * adding pci ctrl page into bucket + */ +static +void KPHCPciBufAddIntoBucket(KPHCPciHashBucket *bucket, KPHCPciPageCtrl *item) +{ + KPHCPciPageCtrl *head = NULL; + if (bucket->ctrlCount == 0) { + item->bckId = bucket->bckId; + item->bckPrev = PCI_INVALID_ID; + item->bckNext = PCI_INVALID_ID; + bucket->firstCtrlId = item->ctrlId; + bucket->ctrlCount++; + return; + } + head = PCI_GET_CTRL_BY_ID(gPciBufCtx, bucket->firstCtrlId); + item->bckId = bucket->bckId; + item->bckPrev = PCI_INVALID_ID; + item->bckNext = head->ctrlId; + head->bckPrev = item->ctrlId; + bucket->firstCtrlId = item->ctrlId; + bucket->ctrlCount++; +} + +/* + * KPHCPciLruPush + * + * adding pci ctrl page into LRU + */ +static +void KPHCPciLruPush(KPHCPciLruList *lru, KPHCPciPageCtrl *item, KPHCCtrlState state) +{ + (void)LWLockAcquire(&lru->lock, LW_EXCLUSIVE); + KPHCPciLruPushNoLock(lru, item, state); + LWLockRelease(&lru->lock); +} + +/* + * KPHCPciBufReadPage + * + * Reading page from Pci buffer + */ +KPHCPciPageCtrl *KPHCPciBufReadPage(const KPHCExtentLocation location, LWLockMode lockMode, KPHCPciBufferReadMode readMode) +{ + KPHCCfsBufferKey key = {{location.relFileNode.spcNode, location.relFileNode.dbNode, location.relFileNode.relNode, + location.relFileNode.bucketNode}, + location.extentNumber}; + uint32 hashcode = 0; + KPHCPciHashBucket *bucket = NULL; + KPHCPciPageCtrl *ctrl = NULL; + KPHCPciPageCtrl *item = NULL; + + hashcode = KPHCPciHashCode(&key); + bucket = PCI_GET_BUCKET_BY_HASH(gPciBufCtx, hashcode); + (void)LWLockAcquire(&bucket->lock, LW_EXCLUSIVE); + ctrl = KPHCPciBufFindFromBucket(bucket, &key); + if (ctrl != NULL) { + (void)LWLockAcquire(&ctrl->contentLock, lockMode); + if (ctrl->loadStatus != CTRL_PAGE_IS_LOADED && readMode == PCI_BUF_NORMAL_READ) { + KPHCPciBufLoadPage(ctrl, location, &key); + } + (void)pg_atomic_fetch_add_u32(&ctrl->refNum, 1); + (void)pg_atomic_fetch_add_u32(&ctrl->touchNum, 3); + LWLockRelease(&bucket->lock); + return ctrl; + } + LWLockRelease(&bucket->lock); + item = KPHCPciBufRecycle(location.extentNumber); + (void)LWLockAcquire(&bucket->lock, LW_EXCLUSIVE); + ctrl = KPHCPciBufFindFromBucket(bucket, &key); + if (ctrl != NULL) { + (void)LWLockAcquire(&ctrl->contentLock, lockMode); + if (ctrl->loadStatus != CTRL_PAGE_IS_LOADED && readMode == PCI_BUF_NORMAL_READ) { + KPHCPciBufLoadPage(ctrl, location, &key); + } + (void)pg_atomic_fetch_add_u32(&ctrl->refNum, 1); + (void)pg_atomic_fetch_add_u32(&ctrl->touchNum, 1); + LWLockRelease(&bucket->lock); + KPHCPciLruPush(&gPciBufCtx->freeLru[item->ctrlId % PCI_PART_LIST_NUM], item, CTRL_STATE_FREE); + return ctrl; + } + pg_atomic_write_u32(&item->refNum, 1); + pg_atomic_write_u32(&item->touchNum, 3); + memset((void*)item->pciPage, 0, BLCKSZ); + item->loadStatus = CTRL_PAGE_NO_LOAD; + if (readMode == PCI_BUF_NORMAL_READ) { + KPHCPciBufLoadPage(item, location, &key); + } + KPHCPciBufAddIntoBucket(bucket, item); + (void)LWLockAcquire(&item->contentLock, lockMode); + LWLockRelease(&bucket->lock); + KPHCPciLruPush(&gPciBufCtx->mainLru[item->ctrlId % PCI_PART_LIST_NUM], item, CTRL_STATE_USED); + return item; +} + + +/* + * KPHCPciBufFreePage + * + * Free page + */ +void KPHCPciBufFreePage(KPHCPciPageCtrl *ctrl, const KPHCExtentLocation location, bool need_write) +{ + int nbytes = 0; + if (need_write) { + nbytes = FileWrite(location.fd, (char *)ctrl->pciPage, BLCKSZ, location.headerNum * BLCKSZ, + (uint32)WAIT_EVENT_DATA_FILE_WRITE); + if (nbytes != BLCKSZ) { + (void)pg_atomic_fetch_sub_u32(&ctrl->refNum, 1); + ctrl->loadStatus = CTRL_PAGE_LOADED_ERROR; + LWLockRelease(&ctrl->contentLock); + + ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_RESOURCES), + errmsg("Failed to KPHCPciBufFreePage %s", FilePathName(location.fd)))); + return; + } + ctrl->loadStatus = CTRL_PAGE_IS_LOADED; + } + + (void)pg_atomic_fetch_sub_u32(&ctrl->refNum, 1); + LWLockRelease(&ctrl->contentLock); +} \ No newline at end of file diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index 3bb448627cd0ea0e4b36e22f890b078990f0196e..36d6c59fd08f2b1d2e8f2b6fe2fb2d6263b8726e 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -183,6 +183,8 @@ static const char *const BuiltinTrancheNames[] = { "PgStatsHash", /* LWTRANCHE_PGSTATS_DATA: */ "PgStatsData", + /* LWTRANCHE_PCI_BUFFER: */ + "PgPciBuffer", }; StaticAssertDecl(lengthof(BuiltinTrancheNames) == diff --git a/src/backend/storage/page/page_compression.c b/src/backend/storage/page/page_compression.c index e21eb0ae604f521c5852d72c5a9a2258c6d27daf..7cd45892e29f3a085ff964dcb5fa983a407201df 100644 --- a/src/backend/storage/page/page_compression.c +++ b/src/backend/storage/page/page_compression.c @@ -1,6 +1,7 @@ #include "postgres.h" #include "storage/fd.h" #include "storage/bufpage.h" +#include "storage/page_common.h" #include "storage/page_compression.h" #include "utils/rel.h" #include "utils/wait_event.h" @@ -8,7 +9,8 @@ #ifdef USE_ZSTD #include #endif -extern KPHCCfsLocationConvert cfsLocationConverts[]; + +const KPHC_CFS_STORAGE_TYPE COMMON_STORAGE = 0; uint32 AddrChecksum32(const KPHCCfsExtentAddress *cfsExtentAddress, const int needChunks); @@ -306,6 +308,46 @@ char *KPHCCfsCompressPage(const char *buffer, KPHCRelFileCompressOption *option, return workBuffer; } +int KPHCDecompressPage(const char* src, char* dst) +{ + int decompressed_size; + char* data; +#ifdef USE_ZSTD + uint32 size; +#endif + int algorithm; + size_t headerSize = SizeOfPageHeaderData; + memcpy(dst, src, headerSize); + data = ((KPHCPageCompressData*)src)->data; +#ifdef USE_ZSTD + size = ((KPHCPageCompressData*)src)->size; +#endif + algorithm = ((KPHCPageCompressData*)src)->algorithm; + + switch (algorithm) { + case KPHC_COMPRESS_ALGORITHM_PGLZ: + decompressed_size = KPHCLzDecompress((const KPHC_PGLZ_Header *)data, dst + headerSize); + if (decompressed_size == -1) { + return -1; + } + break; + case KPHC_COMPRESS_ALGORITHM_ZSTD: +#ifdef USE_ZSTD + decompressed_size = ZSTD_decompress(dst + headerSize, BLCKSZ - headerSize, data, size); + if (ZSTD_isError(decompressed_size)) { + return -1; + } +#else + return -1; +#endif + break; + default: + return KPHC_COMPRESS_UNSUPPORTED_ERROR; + break; + } + return headerSize + decompressed_size; +} + void KPHCAnalyzeCompressOptions(const RelFileNode node, KPHCRelFileCompressOption* opt) { unsigned short compressOption = node.opt; @@ -385,3 +427,48 @@ size_t KPHCCfsWritePage(SMgrRelation reln, ForkNumber forknum, BlockNumber logic KPHCPciBufFreePage(ctrl, location, changed); return BLCKSZ; } + +int KPHCCfsReadPage(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, char *buffer, KPHC_CFS_STORAGE_TYPE type) +{ + uint32 chunkSize; + uint32 startOffset; + char *compressedBuffer; + char *bufferPos; + off_t seekPos; + uint8 start; + int readAmount; + KPHCExtentLocation location = cfsLocationConverts[type](reln, forknum, logicBlockNumber, false, KPHC_EXTENT_OPEN_FILE); + KPHCPciPageCtrl *ctrl = KPHCPciBufReadPage(location, LW_SHARED, PCI_BUF_NORMAL_READ); + KPHCCfsExtentHeader *cfsExtentHeader = ctrl->pciPage; + KPHCCfsExtentAddress *cfsExtentAddress = KPHCGetExtentAddress(cfsExtentHeader, (uint16)location.extentOffset); + KPHCRelFileCompressOption option; + KPHCAnalyzeCompressOptions(reln->smgr_rnode.node, &option); + chunkSize = CHUNK_SIZE_LIST[option.compressChunkSize]; + startOffset = location.extentStart * BLCKSZ; + compressedBuffer = (char *) palloc(chunkSize * cfsExtentAddress->nchunks); + bufferPos = compressedBuffer; + + for (uint32 i = 0; i < cfsExtentAddress->nchunks; i++) { + bufferPos = compressedBuffer + (long)chunkSize * i; + seekPos = OffsetOfPageCompressChunk((uint16)chunkSize, cfsExtentAddress->chunknos[i]) + startOffset; + start = (uint8)i; + while (i < cfsExtentAddress->nchunks - 1 && + cfsExtentAddress->chunknos[i + 1] == cfsExtentAddress->chunknos[i] + 1) { + i++; + } + readAmount = (int)(chunkSize * ((int)(i - (int)start) + 1)); + FileRead(location.fd, bufferPos, readAmount, seekPos, (uint32)WAIT_EVENT_DATA_FILE_READ); + } + + if (cfsExtentAddress->nchunks == (BLCKSZ / chunkSize)) { + memcpy(buffer, compressedBuffer, BLCKSZ); + } else if (KPHCDecompressPage(compressedBuffer, buffer) != BLCKSZ) { + memset(buffer, 0, BLCKSZ); + KPHCPciBufFreePage(ctrl, location, false); + pfree(compressedBuffer); + return -1; + } + KPHCPciBufFreePage(ctrl, location, false); + pfree(compressedBuffer); + return BLCKSZ; +} diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c index c8d737162008c96c1d9b8eed90040f70205c9af2..a9051d1f518c2fca46998256c07c1dc55721c1c4 100644 --- a/src/backend/storage/smgr/md.c +++ b/src/backend/storage/smgr/md.c @@ -669,18 +669,6 @@ mdwriteback(SMgrRelation reln, ForkNumber forknum, } } -const KPHC_CFS_STORAGE_TYPE COMMON_STORAGE = 0; - -enum KPHC_SMGR_READ_STATUS { - SMGR_RD_OK = 0, - SMGR_RD_NO_BLOCK = 1, - SMGR_RD_CRC_ERROR = 2 -}; - -KPHCExtentLocation KPHCStorageConvert(SMgrRelation sRel, ForkNumber forknum, BlockNumber logicBlockNumber, bool skipSync, int type); -int KPHCDecompressPage(const char* src, char* dst); -int KPHCCfsReadPage(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, char *buffer, KPHC_CFS_STORAGE_TYPE type); - KPHCExtentLocation KPHCStorageConvert(SMgrRelation sRel, ForkNumber forknum, BlockNumber logicBlockNumber, bool skipSync, int type) { @@ -726,98 +714,6 @@ KPHCCfsLocationConvert cfsLocationConverts[2] = { NULL }; -inline size_t SizeOfKPHCExtentAddress(uint16 chunkSize) { - if (chunkSize == 0) { - return -1; - } - return offsetof(KPHCCfsExtentAddress, chunknos) + sizeof(uint16) * BLCKSZ / chunkSize; -} - -int KPHCDecompressPage(const char* src, char* dst) -{ - int decompressed_size; - char* data; -#ifdef USE_ZSTD - uint32 size; -#endif - int algorithm; - size_t headerSize = SizeOfPageHeaderData; - memcpy(dst, src, headerSize); - data = ((KPHCPageCompressData*)src)->data; -#ifdef USE_ZSTD - size = ((KPHCPageCompressData*)src)->size; -#endif - algorithm = ((KPHCPageCompressData*)src)->algorithm; - - switch (algorithm) { - case KPHC_COMPRESS_ALGORITHM_PGLZ: - decompressed_size = KPHCLzDecompress((const KPHC_PGLZ_Header *)data, dst + headerSize); - if (decompressed_size == -1) { - return -1; - } - break; - case KPHC_COMPRESS_ALGORITHM_ZSTD: -#ifdef USE_ZSTD - decompressed_size = ZSTD_decompress(dst + headerSize, BLCKSZ - headerSize, data, size); - if (ZSTD_isError(decompressed_size)) { - return -1; - } -#else - return -1; -#endif - break; - default: - return KPHC_COMPRESS_UNSUPPORTED_ERROR; - break; - } - return headerSize + decompressed_size; -} - -int KPHCCfsReadPage(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, char *buffer, - KPHC_CFS_STORAGE_TYPE type) -{ - uint32 chunkSize; - uint32 startOffset; - char *compressedBuffer; - char *bufferPos; - off_t seekPos; - uint8 start; - int readAmount; - KPHCExtentLocation location = cfsLocationConverts[type](reln, forknum, logicBlockNumber, false, KPHC_EXTENT_OPEN_FILE); - KPHCPciPageCtrl *ctrl = KPHCPciBufReadPage(location, LW_SHARED, PCI_BUF_NORMAL_READ); - KPHCCfsExtentHeader *cfsExtentHeader = ctrl->pciPage; - KPHCCfsExtentAddress *cfsExtentAddress = KPHCGetExtentAddress(cfsExtentHeader, (uint16)location.extentOffset); - KPHCRelFileCompressOption option; - KPHCAnalyzeCompressOptions(reln->smgr_rnode.node, &option); - chunkSize = CHUNK_SIZE_LIST[option.compressChunkSize]; - startOffset = location.extentStart * BLCKSZ; - compressedBuffer = (char *) palloc(chunkSize * cfsExtentAddress->nchunks); - bufferPos = compressedBuffer; - - for (uint32 i = 0; i < cfsExtentAddress->nchunks; i++) { - bufferPos = compressedBuffer + (long)chunkSize * i; - seekPos = OffsetOfPageCompressChunk((uint16)chunkSize, cfsExtentAddress->chunknos[i]) + startOffset; - start = (uint8)i; - while (i < cfsExtentAddress->nchunks - 1 && - cfsExtentAddress->chunknos[i + 1] == cfsExtentAddress->chunknos[i] + 1) { - i++; - } - readAmount = (int)(chunkSize * ((int)(i - (int)start) + 1)); - FileRead(location.fd, bufferPos, readAmount, seekPos, (uint32)WAIT_EVENT_DATA_FILE_READ); - } - - if (cfsExtentAddress->nchunks == (BLCKSZ / chunkSize)) { - memcpy(buffer, compressedBuffer, BLCKSZ); - } else if (KPHCDecompressPage(compressedBuffer, buffer) != BLCKSZ) { - memset(buffer, 0, BLCKSZ); - KPHCPciBufFreePage(ctrl, location, false); - pfree(compressedBuffer); - return -1; - } - KPHCPciBufFreePage(ctrl, location, false); - pfree(compressedBuffer); - return BLCKSZ; -} /* * mdread() -- Read the specified block from a relation. */ diff --git a/src/include/storage/buf_ext.h b/src/include/storage/buf_ext.h new file mode 100644 index 0000000000000000000000000000000000000000..e358caa49526843d0e2ab617194df0f219cf3225 --- /dev/null +++ b/src/include/storage/buf_ext.h @@ -0,0 +1,143 @@ +/*------------------------------------------------------------------------- + * + * buf_ext.h + * Internal definitions for buffer manager ext and the buffer init ext. + * + * + * + * src/include/storage/buf_ext.h + * + *------------------------------------------------------------------------- + */ +#ifndef BUF_EXT_H +#define BUF_EXT_H + +#include "buf_internals.h" +#include "utils/hsearch.h" +#include "postgres.h" + +// 待确定 +#define LWTRANCHE_PCI_BUFFER_CONTENT (8) + + +#define PCI_PART_LIST_NUM (8) +#define PCI_LRU_LIST_NUM (PCI_PART_LIST_NUM * 2) + +#define PCI_INVALID_ID (0) + +#define PCI_GET_CTRL_BY_ID(ctx, ctrlId) ((KPHCPciPageCtrl *)(&(ctx->ctrlBuf[(ctrlId) - 1]))) +#define PCI_GET_BUCKET_BY_ID(ctx, bckId) ((KPHCPciHashBucket *)(&(ctx->hashtbl.buckets[(bckId) - 1]))) +#define PCI_GET_BUCKET_BY_HASH(ctx, hashcode) (PCI_GET_BUCKET_BY_ID(ctx, (((hashcode) % ctx->hashtbl.bucketNum) + 1))) + +#define PCI_SET_NO_READ(ctrl) (ctrl->loadStatus = CTRL_PAGE_NO_LOAD) + + +typedef struct CfsExtentHeader { + void *exHeader; +} CfsExtentHeader; + + +typedef struct RelFileNodeKp { + Oid spcNode; + Oid dbNode; + Oid relNode; + uint8 bucketNode; +} RelFileNodeKp; + +typedef struct KPHCExtentLocation { + int fd; + RelFileNodeKp relFileNode; + BlockNumber extentNumber; + BlockNumber extentStart; + BlockNumber extentOffset; + BlockNumber headerNum; + uint16 chrunkSize; + uint8 algorithm; +} KPHCExtentLocation; + + +typedef struct stPciHashBucket { + LWLock lock; + uint32 bckId; + uint32 ctrlCount; + uint32 firstCtrlId; +} KPHCPciHashBucket; + +typedef struct stPciPageHashtbl { + HashValueFunc hash; + HashCompareFunc match; + uint32 bucketNum; + KPHCPciHashBucket *buckets; +} KPHCPciPageHashtbl; + +typedef struct stPciLruList { + LWLock lock; + volatile uint32 count; + volatile uint32 first; + volatile uint32 last; +} KPHCPciLruList; + + +typedef struct KPHCCfsBufferKey { + RelFileNodeKp relFileNode; + uint32 extentCount; +} KPHCCfsBufferKey; + +typedef enum enCtrlState { + CTRL_STATE_INIT = 0, + CTRL_STATE_USED = 1, + CTRL_STATE_FREE = 2, +} KPHCCtrlState; + +typedef enum enCtrlLoadStatus { + CTRL_PAGE_NO_LOAD, + CTRL_PAGE_IS_LOADED, + CTRL_PAGE_LOADED_ERROR +} KPHCCtrlLoadStatus; + + +typedef struct __attribute__((aligned(128))) stPciPageCtrl +{ + uint32 ctrlId; + KPHCCtrlState state; + volatile uint32 lruPrev; + volatile uint32 lruNext; + volatile uint32 bckPrev; + volatile uint32 bckNext; + volatile uint32 bckId; + + pg_atomic_uint32 touchNum; + pg_atomic_uint32 refNum; + LWLock contentLock; + KPHCCtrlLoadStatus loadStatus; + KPHCCfsBufferKey pciKey; + CfsExtentHeader *pciPage; +} KPHCPciPageCtrl; + +typedef struct __attribute__((aligned(128))) stPciPageBuffCtx +{ + KPHCPciPageCtrl *ctrlBuf; + char *pageBuf; + KPHCPciPageHashtbl hashtbl; + uint32 maxCount; + KPHCPciLruList mainLru[PCI_PART_LIST_NUM]; + KPHCPciLruList freeLru[PCI_PART_LIST_NUM]; +} KPHCPciPageBuffCtx; + +typedef enum pciBufferReadMode { + PCI_BUF_NORMAL_READ, + PCI_BUF_NO_READ +} KPHCPciBufferReadMode; + + +extern KPHCPciPageBuffCtx *gPciBufCtx; + +void KPHCPciBufInitCtx(); +Size KPHCPciBufferSize(); + +void KPHCPciBufFreePage(KPHCPciPageCtrl *ctrl, const KPHCExtentLocation location, bool need_write); +void KPHCPciLruPushNoLock(KPHCPciLruList *lru, KPHCPciPageCtrl *item, KPHCCtrlState state); + +KPHCPciPageCtrl *KPHCPciBufReadPage(const KPHCExtentLocation location, LWLockMode lockMode, KPHCPciBufferReadMode readMode); + +#endif \ No newline at end of file diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index e03d317eeac744a5ca2fe18992b3572619e59bad..d4bd519ba84f374597afd8a60053f270134c5527 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -193,6 +193,7 @@ typedef enum BuiltinTrancheIds LWTRANCHE_PGSTATS_DSA, LWTRANCHE_PGSTATS_HASH, LWTRANCHE_PGSTATS_DATA, + LWTRANCHE_PCI_BUFFER_CONTENT, LWTRANCHE_FIRST_USER_DEFINED } BuiltinTrancheIds; diff --git a/src/include/storage/page_common.h b/src/include/storage/page_common.h new file mode 100644 index 0000000000000000000000000000000000000000..6dda937ff254c22ee42817bb17f1861bbbd65374 --- /dev/null +++ b/src/include/storage/page_common.h @@ -0,0 +1,6 @@ +#ifndef PAGE_COMMON_H +#define PAGE_COMMON_H + +// 公共头结构放这里,例如KPHCExtentLocation、KPHCCfsExtentAddress、KPHCCfsExtentHeader + +#endif \ No newline at end of file diff --git a/src/include/storage/page_compression.h b/src/include/storage/page_compression.h index 25598e9406c014901fcfdf0f0111eeb3b4fa43ac..22b3422c536174bbfb217a1a759c28a6a4177f30 100644 --- a/src/include/storage/page_compression.h +++ b/src/include/storage/page_compression.h @@ -6,6 +6,7 @@ #include "storage/bufpage.h" #include "utils/rel.h" #include "storage/cfs_buffers.h" + #define KPHC_CMP_BYTE_CONVERT_LEN 1 #define KPHC_CMP_DIFF_CONVERT_LEN 1 #define KPHC_CMP_PRE_CHUNK_LEN 3 @@ -55,6 +56,12 @@ typedef struct relFileCompressOption { unsigned compressChunkSize : KPHC_CMP_CHUNK_SIZE_LEN; }KPHCRelFileCompressOption; +enum KPHC_SMGR_READ_STATUS { + SMGR_RD_OK = 0, + SMGR_RD_NO_BLOCK = 1, + SMGR_RD_CRC_ERROR = 2 +}; + typedef struct PageCompressData { char page_header[SizeOfPageHeaderData]; uint32 crc32; @@ -101,6 +108,8 @@ typedef struct HeapPageCompressData { #define REL_SUPPORT_COMPRESSED(relation) SUPPORT_COMPRESSED((relation)->rd_rel->relkind, (relation)->rd_rel->relam) +extern const KPHC_CFS_STORAGE_TYPE COMMON_STORAGE; + extern const uint32 INDEX_OF_HALF_BLCKSZ; extern const uint32 INDEX_OF_QUARTER_BLCKSZ; extern const uint32 INDEX_OF_EIGHTH_BLCKSZ; @@ -117,19 +126,23 @@ extern const int KPHC_EXTENT_CREATE_FILE; extern const KPHCCmpBitStuct gCmpBitStruct[]; extern const uint32 CHUNK_SIZE_LIST[4]; +extern KPHCCfsLocationConvert cfsLocationConverts[]; uint8 ConvertChunkSize(uint32 compressedChunkSize, bool *success); void SetupPageCompressForRelation(RelFileNode *node, PageCompressOpts *compress_options, const char *relationName); +KPHCExtentLocation KPHCStorageConvert(SMgrRelation sRel, ForkNumber forknum, BlockNumber logicBlockNumber, bool skipSync, int type); + int KPHCCompressPage(const char* src, char* dst, int dst_size, const KPHCRelFileCompressOption option); char *KPHCCfsCompressPage(const char *buffer, KPHCRelFileCompressOption *option, uint8 *nchunks); +int KPHCDecompressPage(const char* src, char* dst); void KPHCAnalyzeCompressOptions(const RelFileNode node, KPHCRelFileCompressOption* opt); +off_t OffsetOfPageCompressChunk(uint16 chunkSize, int chunkNo); +KPHCCfsExtentAddress *KPHCGetExtentAddress(KPHCCfsExtentHeader *header, uint16 blockOffset); size_t KPHCCfsWritePage(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, char *buffer, bool sipSync, KPHC_CFS_STORAGE_TYPE type); +int KPHCCfsReadPage(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, char *buffer, KPHC_CFS_STORAGE_TYPE type); -off_t OffsetOfPageCompressChunk(uint16 chunkSize, int chunkNo); - -KPHCCfsExtentAddress *KPHCGetExtentAddress(KPHCCfsExtentHeader *header, uint16 blockOffset); #endif // PAGE_COMPRESSION_H \ No newline at end of file