From b6e7b571e96f0f273bdecb85f124d61b10e53f04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A6=99=E8=95=89=E5=82=A8=E8=93=84=E6=89=80?= <727854256@qq.com> Date: Thu, 16 Jan 2025 16:16:11 +0800 Subject: [PATCH] =?UTF-8?q?[add][normal]1.=E6=B7=BB=E5=8A=A0WritePage?= =?UTF-8?q?=E5=87=BD=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/storage/page/page_compression.c | 104 +++++++++++++++++++- src/include/storage/page_compression.h | 40 ++++++++ 2 files changed, 143 insertions(+), 1 deletion(-) diff --git a/src/backend/storage/page/page_compression.c b/src/backend/storage/page/page_compression.c index 45e39ef..fdaf595 100644 --- a/src/backend/storage/page/page_compression.c +++ b/src/backend/storage/page/page_compression.c @@ -2,12 +2,21 @@ #include "storage/page_compression.h" #include "utils/rel.h" +#include "utils/wait_event.h" const uint32 INDEX_OF_HALF_BLCKSZ = 0; const uint32 INDEX_OF_QUARTER_BLCKSZ = 1; const uint32 INDEX_OF_EIGHTH_BLCKSZ = 2; const uint32 INDEX_OF_SIXTEENTHS_BLCKSZ = 3; +const int KPHC_CFS_EXTENT_SIZE = 129; +const int KPHC_CFS_LOGIC_BLOCKS_PER_EXTENT = KPHC_CFS_EXTENT_SIZE - 1; +const int KPHC_CFS_EXTENT_COUNT_PER_FILE = RELSEG_SIZE / KPHC_CFS_EXTENT_SIZE; +const int KPHC_CFS_MAX_BLOCK_PER_FILE = KPHC_CFS_EXTENT_COUNT_PER_FILE * KPHC_CFS_EXTENT_SIZE; +const int KPHC_EXTENT_OPEN_FILE = 0; +const int KPHC_WRITE_BACK_OPEN_FILE = 1; +const int KPHC_EXTENT_CREATE_FILE = 2; + const KPHCCmpBitStuct gCmpBitStruct[] = {{KPHC_CMP_BYTE_CONVERT_LEN, 0x01, 15}, {KPHC_CMP_DIFF_CONVERT_LEN, 0x01, 14}, {KPHC_CMP_PRE_CHUNK_LEN, 0x07, 11}, @@ -75,4 +84,97 @@ void SetupPageCompressForRelation(RelFileNode *node, PageCompressOpts *compressO (int)compressOptions->compressDiffConvert, preallocChunks, (int)symbol, compressLevel, algorithm, chunkSize); } -} \ No newline at end of file +} + +inline size_t SizeOfExtentAddress(uint16 chunkSize) +{ + if (chunkSize == 0) { + return -1; + } + return offsetof(KPHCCfsExtentAddress, chunknos) + sizeof(uint16) * BLCKSZ / chunkSize; +} + +// 获取extent地址 +inline KPHCCfsExtentAddress *GetExtentAddress(KPHCCfsExtentHeader *header, uint16 blockOffset) +{ + uint16 chunkSize = header->chunk_size; + size_t headerOffset = offsetof(KPHCCfsExtentHeader, cfsExtentAddress); + size_t sizeOfExtentAddress = SizeOfExtentAddress(chunkSize); + return (KPHCCfsExtentAddress *)(((char *)header) + headerOffset + blockOffset * sizeOfExtentAddress); +} + +inline off_t OffsetOfPageCompressChunk(uint16 chunkSize, int chunkNo) { + return chunkSize * (chunkNo - 1); +} + +inline void KPHCAnalyzeCompressOptions(const RelFileNode node, KPHCRelFileCompressOption* opt) +{ + unsigned short compressOption = node.opt; + opt->compressChunkSize = compressOption & gCmpBitStruct[KPHC_CMP_CHUNK_SIZE_INDEX].mask; + compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_CHUNK_SIZE_INDEX].bitLen; + opt->compressAlgorithm = compressOption & gCmpBitStruct[KPHC_CMP_ALGORITHM_INDEX].mask; + compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_ALGORITHM_INDEX].bitLen; + opt->compressLevel = compressOption & gCmpBitStruct[KPHC_CMP_LEVEL_INDEX].mask; + compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_LEVEL_INDEX].bitLen; + opt->compressLevelSymbol = compressOption & gCmpBitStruct[KPHC_CMP_COMPERSS_LEVEL_SYMBOL_INDEX].mask; + compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_COMPERSS_LEVEL_SYMBOL_INDEX].bitLen; + opt->compressPreallocChunks = compressOption & gCmpBitStruct[KPHC_CMP_PRE_CHUNK_INDEX].mask; + compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_PRE_CHUNK_INDEX].bitLen; + opt->diffConvert = compressOption & gCmpBitStruct[KPHC_CMP_DIFF_CONVERT_INDEX].mask; + compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_DIFF_CONVERT_INDEX].bitLen; + opt->byteConvert = compressOption & gCmpBitStruct[KPHC_CMP_BYTE_CONVERT_INDEX].mask; + compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_BYTE_CONVERT_INDEX].bitLen; +} + +size_t KPHCCfsWritePage(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, char *buffer, bool sipSync, KPHC_CFS_STORAGE_TYPE type) +{ + KPHCExtentLocation location = cfsLocationConverts[type](reln, forknum, logicBlockNumber, sipSync, KPHC_EXTENT_OPEN_FILE); + KPHCPciPageCtrl *ctrl = KPHCPciBufReadPage(location, LW_SHARED, PCI_BUF_NORMAL_READ); + KPHCCfsExtentHeader *cfsExtentHeader = ctrl->pciPage; + KPHCCfsExtentAddress *cfsExtentAddress = GetExtentAddress(cfsExtentAddress, (uint16)location.extentOffset); + + // 获取压缩参数 + KPHCRelFileCompressOption option; + KPHCAnalyzeCompressOptions(reln->SMgrRelation.node, &option); + uint16 chunkSize = cfsExtentHeader->chunk_size; + + // 执行页压缩 + uint8 nchunks; + char *compressedBuffer = KPHCCfsCompressPage(buffer, &option, &nchunks); + + // 设置地址 + uint8 needChunks = option.compressPreallocChunks > nchunks ? (uint8)option.compressPreallocChunks : (uint8)nchunks; + bool changed = ExtendChunksOfBlock(cfsExtentHeader, &location, needChunks, nchunks); + + off_t extentStartOffset = location.extentStart * BLCKSZ; + + // 写入每个chunk + for (uint8 i = 0; i < nchunks; ++i) { + uint8 bufferPos = compressedBuffer + (long)chunkSize * i; + off_t seekPos = OffsetOfPageCompressChunk(chunkSize, cfsExtentAddress->chunknos[i]); + if (cfsExtentAddress->chunknos[i] > ((BLCKSZ / chunkSize) * KPHC_CFS_LOGIC_BLOCKS_PER_EXTENT)) { + *((uint32 *)nullptr) = 1; // ??? + } + uint8 start = i; + while (i < nchunks - 1 && cfsExtentAddress->chunknos[i + 1] == cfsExtentAddress->chunknos[i] + 1) { + i++; + } + int writeAmount = chunkSize * ((i - start) + 1); + if ((seekPos + extentStartOffset) > (((BlockNumber)RELSEG_SIZE) * BLCKSZ)) { + *((uint32 *)nullptr) = 1; // ??? + } + int nbytes = FilePWrite(location.fd, bufferPos, writeAmount, seekPos + extentStartOffset, (uint32)WAIT_EVENT_DATA_FILE_WRITE); + if (nbytes != writeAmount) { + if (compressedBuffer != nullptr && compressedBuffer != buffer) { + pfree(compressedBuffer); + } + KPHCPciBufFreePage(ctrl, location, false); + } + } + + if (compressedBuffer != nullptr && compressedBuffer != buffer) { + pfree(compressedBuffer); + } + KPHCPciBufFreePage(ctrl, location, false); + return BLCKSZ; +} diff --git a/src/include/storage/page_compression.h b/src/include/storage/page_compression.h index 8513504..893169b 100644 --- a/src/include/storage/page_compression.h +++ b/src/include/storage/page_compression.h @@ -21,6 +21,36 @@ #define KPHC_CMP_ALGORITHM_INDEX 5 #define KPHC_CMP_CHUNK_SIZE_INDEX 6 +typedef size_t KPHC_CFS_STORAGE_TYPE; + +typedef struct ExtentLocation { + int fd; + RelFileNode relFileNode; + BlockNumber extentNumber; + BlockNumber extentStart; + BlockNumber extentOffset; + BlockNumber headerNum; + uint16 chrunkSize; + uint8 algorithm; +}KPHCExtentLocation; + +typedef struct CfsExtentAddress { + uint32 checksum; + volatile uint8 nchunks; + volatile uint8 allocated_chunks; + uint16 chunknos[FLEXIBLE_ARRAY_MEMBER]; +}KPHCCfsExtentAddress; + +typedef struct CfsExtentHeader { + pg_atomic_uint32 nblocks; + pg_atomic_uint32 allocated_chunks; + uint16 chunk_size; + uint8 algorithm : 7; + uint8 recycleInOrder : 1; + uint8 recv; + KPHCCfsExtentAddress cfsExtentAddress[FLEXIBLE_ARRAY_MEMBER]; +}KPHCCfsExtentHeader; + typedef struct CmpBitStuct { const unsigned int bitLen; const unsigned int mask; @@ -66,10 +96,20 @@ extern const uint32 INDEX_OF_QUARTER_BLCKSZ; extern const uint32 INDEX_OF_EIGHTH_BLCKSZ; extern const uint32 INDEX_OF_SIXTEENTHS_BLCKSZ; +extern const int KPHC_CFS_EXTENT_SIZE; +extern const int KPHC_CFS_LOGIC_BLOCKS_PER_EXTENT; +extern const int KPHC_CFS_EXTENT_COUNT_PER_FILE; +extern const int KPHC_CFS_MAX_BLOCK_PER_FILE; +extern const int KPHC_EXTENT_OPEN_FILE; +extern const int KPHC_WRITE_BACK_OPEN_FILE; +extern const int KPHC_EXTENT_CREATE_FILE; + extern const KPHCCmpBitStuct gCmpBitStruct[]; uint8 ConvertChunkSize(uint32 compressedChunkSize, bool *success); void SetupPageCompressForRelation(RelFileNode *node, PageCompressOpts *compress_options, const char *relationName); +size_t KPHCCfsWritePage(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, char *buffer, bool sipSync, KPHC_CFS_STORAGE_TYPE type); + #endif // PAGE_COMPRESSION_H \ No newline at end of file -- Gitee