From a61a6fcdfc9f3e4fd2c7a7025056b6fff2676b8e Mon Sep 17 00:00:00 2001 From: chenlinfeng <723609220@qq.com> Date: Mon, 17 Feb 2025 15:51:18 +0800 Subject: [PATCH 1/5] =?UTF-8?q?=E6=B7=BB=E5=8A=A0writeback?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/storage/smgr/md.c | 80 ++++++++++++++++++++++++++++++++++- 1 file changed, 79 insertions(+), 1 deletion(-) diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c index 1cfad80..a496cb2 100644 --- a/src/backend/storage/smgr/md.c +++ b/src/backend/storage/smgr/md.c @@ -697,7 +697,81 @@ mdprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum) return true; } - +#define KPHCGET_COMPRESS_CHUNK_SIZE(opt) \ + (((opt) >> gCmpBitStruct[KPHC_CMP_CHUNK_SIZE_INDEX].moveBit) & gCmpBitStruct[KPHC_CMP_CHUNK_SIZE_INDEX].mask) +inline static int KPHCPageCompressChunkSize(SMgrRelation reln) +{ + return CHUNK_SIZE_LIST[KPHCGET_COMPRESS_CHUNK_SIZE((reln)->smgr_rnode.node.opt)]; +} +void KPHCCfsWriteBack(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, BlockNumber nblocks, + KPHC_CFS_STORAGE_TYPE type) +{ + unsigned int segnumStart; + unsigned int segnumEnd; + BlockNumber nflush; + off_t seekPos; + KPHCPciPageCtrl *ctrl; + KPHCCfsExtentHeader *cfsExtentHeader; + KPHCCfsExtentAddress *cfsExtentAddress; + int nchunks; + int seekPosChunk; + int lastChunk; + while (nblocks > 0) { + KPHCExtentLocation location = cfsLocationConverts[type](reln, forknum, blocknum, true, KPHC_WRITE_BACK_OPEN_FILE); + if (location.fd == -1) { + return; + } + segnumStart = blocknum / KPHC_CFS_LOGIC_BLOCKS_PER_FILE; + segnumEnd = (blocknum + nblocks - 1) / KPHC_CFS_LOGIC_BLOCKS_PER_FILE; + nflush = nblocks; + if (segnumStart != segnumEnd) { + nflush = KPHC_CFS_LOGIC_BLOCKS_PER_FILE - (blocknum % KPHC_CFS_LOGIC_BLOCKS_PER_FILE); + } + for (BlockNumber iblock = 0; iblock < nflush; ++iblock) { + uint32 chunkSize = KPHCPageCompressChunkSize(reln); + location = + cfsLocationConverts[type](reln, forknum, blocknum + iblock, true, KPHC_WRITE_BACK_OPEN_FILE); + + ctrl = KPHCPciBufReadPage(location, LW_SHARED, PCI_BUF_NORMAL_READ); + if (ctrl->loadStatus == CTRL_PAGE_LOADED_ERROR) { + KPHCPciBufFreePage(ctrl, location, false); + ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), errmsg("Failed to CfsWriteBack %s, headerNum: %u.", + FilePathName(location.fd), + location.headerNum))); + } + cfsExtentHeader = ctrl->pciPage; + + cfsExtentAddress = KPHCGetExtentAddress(cfsExtentHeader, (uint16)location.extentOffset); + bool firstEnter = true; + + for (uint8 i = 0; i < cfsExtentAddress->nchunks; ++i) { + if (firstEnter) { + seekPosChunk = cfsExtentAddress->chunknos[i]; + lastChunk = seekPosChunk; + firstEnter = false; + } else if (cfsExtentAddress->chunknos[i] == lastChunk + 1) { + lastChunk++; + } else { + seekPos = OffsetOfPageCompressChunk((uint16)chunkSize, (int)seekPosChunk); + nchunks = (lastChunk - seekPosChunk) + 1; + FileWriteback(location.fd, seekPos, (off_t) nchunks * chunkSize, WAIT_EVENT_DATA_FILE_FLUSH); + seekPosChunk = cfsExtentAddress->chunknos[i]; + lastChunk = seekPosChunk; + } + } + /* flush the rest chunks */ + if (!firstEnter) { + seekPos = (off_t) chunkSize * seekPosChunk; + nchunks = (lastChunk - seekPosChunk) + 1; + FileWriteback(location.fd, seekPos, (off_t) nchunks * chunkSize, WAIT_EVENT_DATA_FILE_FLUSH); + } + + KPHCPciBufFreePage(ctrl, location, false); + } + nblocks -= nflush; + blocknum += nflush; + } +} /* * mdwriteback() -- Tell the kernel to write pages back to storage. * @@ -708,6 +782,10 @@ void mdwriteback(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, BlockNumber nblocks) { + if (KPHC_IS_COMPRESSED_MAINFORK(reln, forknum)) { + KPHCCfsWriteBack(reln, forknum, blocknum, nblocks, COMMON_STORAGE); + return; + } /* * Issue flush requests in as few requests as possible; have to split at * segment boundaries though, since those are actually separate files. -- Gitee From aa8e2e5743d4a869f1f78af7ff16778d3e2a4d40 Mon Sep 17 00:00:00 2001 From: chenlinfeng <723609220@qq.com> Date: Tue, 18 Feb 2025 10:01:08 +0800 Subject: [PATCH 2/5] =?UTF-8?q?=E6=B7=BB=E5=8A=A0writeback?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/storage/smgr/md.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c index a496cb2..d08632f 100644 --- a/src/backend/storage/smgr/md.c +++ b/src/backend/storage/smgr/md.c @@ -703,7 +703,7 @@ inline static int KPHCPageCompressChunkSize(SMgrRelation reln) { return CHUNK_SIZE_LIST[KPHCGET_COMPRESS_CHUNK_SIZE((reln)->smgr_rnode.node.opt)]; } -void KPHCCfsWriteBack(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, BlockNumber nblocks, +static void KPHCCfsWriteBack(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, BlockNumber nblocks, KPHC_CFS_STORAGE_TYPE type) { unsigned int segnumStart; @@ -716,6 +716,7 @@ void KPHCCfsWriteBack(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknu int nchunks; int seekPosChunk; int lastChunk; + bool firstEnter; while (nblocks > 0) { KPHCExtentLocation location = cfsLocationConverts[type](reln, forknum, blocknum, true, KPHC_WRITE_BACK_OPEN_FILE); if (location.fd == -1) { @@ -742,7 +743,7 @@ void KPHCCfsWriteBack(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknu cfsExtentHeader = ctrl->pciPage; cfsExtentAddress = KPHCGetExtentAddress(cfsExtentHeader, (uint16)location.extentOffset); - bool firstEnter = true; + firstEnter = true; for (uint8 i = 0; i < cfsExtentAddress->nchunks; ++i) { if (firstEnter) { -- Gitee From 75810192f731378770ebb20b8208d9812d339e7a Mon Sep 17 00:00:00 2001 From: chenlinfeng <723609220@qq.com> Date: Tue, 18 Feb 2025 11:06:14 +0800 Subject: [PATCH 3/5] =?UTF-8?q?=E6=B7=BB=E5=8A=A0prefetch?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/storage/smgr/md.c | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c index d08632f..935f4ae 100644 --- a/src/backend/storage/smgr/md.c +++ b/src/backend/storage/smgr/md.c @@ -672,7 +672,38 @@ mdclose(SMgrRelation reln, ForkNumber forknum) nopensegs--; } } +void KPHCCfsMdPrefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, bool skipSync, + KPHC_CFS_STORAGE_TYPE type) +{ + KPHCExtentLocation location = cfsLocationConverts[type](reln, forknum, logicBlockNumber, skipSync, KPHC_EXTENT_OPEN_FILE); + if (location.fd < 0) { + return; + } + KPHCPciPageCtrl *ctrl = KPHCPciBufReadPage(location, LW_SHARED, PCI_BUF_NORMAL_READ); + if (ctrl->loadStatus == CTRL_PAGE_LOADED_ERROR) { + KPHCPciBufFreePage(ctrl, location, false); + ereport(WARNING, (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("Failed to KPHCCfsMdPrefetch %s, headerNum: %u.", FilePathName(location.fd), location.headerNum))); + return; + } + KPHCCfsExtentHeader *cfsExtentHeader = ctrl->pciPage; + KPHCCfsExtentAddress cfsExtentAddress = KPHCGetExtentAddress(cfsExtentHeader, (uint16)location.extentOffset); + int startOffset = location.extentStart * BLCKSZ; + int chunkSize = cfsExtentHeader->chunk_size; + + for (uint8 i = 0; i < cfsExtentAddress->nchunks; i++) { + off_t seekPos = startOffset + OffsetOfPageCompressChunk(chunkSize, cfsExtentAddress->chunknos[i]); + int range = 1; + while (i < cfsExtentAddress->nchunks - 1 && + cfsExtentAddress->chunknos[i + 1] == cfsExtentAddress->chunknos[i] + 1) { + i++; + range++; + } + (void)FilePrefetch(location.fd, seekPos, chunkSize * range, (uint32)WAIT_EVENT_DATA_FILE_PREFETCH); + } + KPHCPciBufFreePage(ctrl, location, false); +} /* * mdprefetch() -- Initiate asynchronous read of the specified block of a relation */ @@ -680,6 +711,10 @@ bool mdprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum) { #ifdef USE_PREFETCH + if (KPHC_IS_COMPRESSED_MAINFORK(reln, forknum)) { + KPHCCfsMdPrefetch(reln, forknum, blocknum, false, COMMON_STORAGE); + return; + } off_t seekpos; MdfdVec *v; -- Gitee From 59154f6d28e10f04035029a90ffcebd41f2e0b27 Mon Sep 17 00:00:00 2001 From: chenlinfeng <723609220@qq.com> Date: Tue, 18 Feb 2025 11:35:28 +0800 Subject: [PATCH 4/5] =?UTF-8?q?=E6=B7=BB=E5=8A=A0prefetch?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/storage/smgr/md.c | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c index 935f4ae..4128731 100644 --- a/src/backend/storage/smgr/md.c +++ b/src/backend/storage/smgr/md.c @@ -672,25 +672,30 @@ mdclose(SMgrRelation reln, ForkNumber forknum) nopensegs--; } } -void KPHCCfsMdPrefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, bool skipSync, +static KPHCCfsMdPrefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, bool skipSync, KPHC_CFS_STORAGE_TYPE type) { - KPHCExtentLocation location = cfsLocationConverts[type](reln, forknum, logicBlockNumber, skipSync, KPHC_EXTENT_OPEN_FILE); + KPHCExtentLocation location; + KPHCPciPageCtrl *ctrl; + KPHCCfsExtentHeader *cfsExtentHeader; + KPHCCfsExtentAddress* cfsExtentAddress; + int startOffset; + int chunkSize; + location = cfsLocationConverts[type](reln, forknum, logicBlockNumber, skipSync, KPHC_EXTENT_OPEN_FILE); if (location.fd < 0) { return; } - KPHCPciPageCtrl *ctrl = KPHCPciBufReadPage(location, LW_SHARED, PCI_BUF_NORMAL_READ); + ctrl = KPHCPciBufReadPage(location, LW_SHARED, PCI_BUF_NORMAL_READ); if (ctrl->loadStatus == CTRL_PAGE_LOADED_ERROR) { KPHCPciBufFreePage(ctrl, location, false); ereport(WARNING, (errcode(ERRCODE_DATA_CORRUPTED), errmsg("Failed to KPHCCfsMdPrefetch %s, headerNum: %u.", FilePathName(location.fd), location.headerNum))); return; } - KPHCCfsExtentHeader *cfsExtentHeader = ctrl->pciPage; - - KPHCCfsExtentAddress cfsExtentAddress = KPHCGetExtentAddress(cfsExtentHeader, (uint16)location.extentOffset); - int startOffset = location.extentStart * BLCKSZ; - int chunkSize = cfsExtentHeader->chunk_size; + cfsExtentHeader = ctrl->pciPage; + cfsExtentAddress = KPHCGetExtentAddress(cfsExtentHeader, (uint16)location.extentOffset); + startOffset = location.extentStart * BLCKSZ; + chunkSize = cfsExtentHeader->chunk_size; for (uint8 i = 0; i < cfsExtentAddress->nchunks; i++) { off_t seekPos = startOffset + OffsetOfPageCompressChunk(chunkSize, cfsExtentAddress->chunknos[i]); @@ -713,7 +718,7 @@ mdprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum) #ifdef USE_PREFETCH if (KPHC_IS_COMPRESSED_MAINFORK(reln, forknum)) { KPHCCfsMdPrefetch(reln, forknum, blocknum, false, COMMON_STORAGE); - return; + return true; } off_t seekpos; MdfdVec *v; -- Gitee From b61455bc71f0296b1a5de45d488cd3aec7293e5b Mon Sep 17 00:00:00 2001 From: chenlinfeng <723609220@qq.com> Date: Tue, 25 Feb 2025 17:06:47 +0800 Subject: [PATCH 5/5] =?UTF-8?q?=E6=B7=BB=E5=8A=A0prefetch?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/storage/smgr/md.c | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c index 4128731..bede81e 100644 --- a/src/backend/storage/smgr/md.c +++ b/src/backend/storage/smgr/md.c @@ -672,7 +672,7 @@ mdclose(SMgrRelation reln, ForkNumber forknum) nopensegs--; } } -static KPHCCfsMdPrefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, bool skipSync, +static void KPHCCfsMdPrefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, bool skipSync, KPHC_CFS_STORAGE_TYPE type) { KPHCExtentLocation location; @@ -716,13 +716,12 @@ bool mdprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum) { #ifdef USE_PREFETCH + off_t seekpos; + MdfdVec *v; if (KPHC_IS_COMPRESSED_MAINFORK(reln, forknum)) { KPHCCfsMdPrefetch(reln, forknum, blocknum, false, COMMON_STORAGE); return true; } - off_t seekpos; - MdfdVec *v; - v = _mdfd_getseg(reln, forknum, blocknum, false, InRecovery ? EXTENSION_RETURN_NULL : EXTENSION_FAIL); if (v == NULL) -- Gitee