From b6e7b571e96f0f273bdecb85f124d61b10e53f04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A6=99=E8=95=89=E5=82=A8=E8=93=84=E6=89=80?= <727854256@qq.com> Date: Thu, 16 Jan 2025 16:16:11 +0800 Subject: [PATCH 01/11] =?UTF-8?q?[add][normal]1.=E6=B7=BB=E5=8A=A0WritePag?= =?UTF-8?q?e=E5=87=BD=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/storage/page/page_compression.c | 104 +++++++++++++++++++- src/include/storage/page_compression.h | 40 ++++++++ 2 files changed, 143 insertions(+), 1 deletion(-) diff --git a/src/backend/storage/page/page_compression.c b/src/backend/storage/page/page_compression.c index 45e39ef..fdaf595 100644 --- a/src/backend/storage/page/page_compression.c +++ b/src/backend/storage/page/page_compression.c @@ -2,12 +2,21 @@ #include "storage/page_compression.h" #include "utils/rel.h" +#include "utils/wait_event.h" const uint32 INDEX_OF_HALF_BLCKSZ = 0; const uint32 INDEX_OF_QUARTER_BLCKSZ = 1; const uint32 INDEX_OF_EIGHTH_BLCKSZ = 2; const uint32 INDEX_OF_SIXTEENTHS_BLCKSZ = 3; +const int KPHC_CFS_EXTENT_SIZE = 129; +const int KPHC_CFS_LOGIC_BLOCKS_PER_EXTENT = KPHC_CFS_EXTENT_SIZE - 1; +const int KPHC_CFS_EXTENT_COUNT_PER_FILE = RELSEG_SIZE / KPHC_CFS_EXTENT_SIZE; +const int KPHC_CFS_MAX_BLOCK_PER_FILE = KPHC_CFS_EXTENT_COUNT_PER_FILE * KPHC_CFS_EXTENT_SIZE; +const int KPHC_EXTENT_OPEN_FILE = 0; +const int KPHC_WRITE_BACK_OPEN_FILE = 1; +const int KPHC_EXTENT_CREATE_FILE = 2; + const KPHCCmpBitStuct gCmpBitStruct[] = {{KPHC_CMP_BYTE_CONVERT_LEN, 0x01, 15}, {KPHC_CMP_DIFF_CONVERT_LEN, 0x01, 14}, {KPHC_CMP_PRE_CHUNK_LEN, 0x07, 11}, @@ -75,4 +84,97 @@ void SetupPageCompressForRelation(RelFileNode *node, PageCompressOpts *compressO (int)compressOptions->compressDiffConvert, preallocChunks, (int)symbol, compressLevel, algorithm, chunkSize); } -} \ No newline at end of file +} + +inline size_t SizeOfExtentAddress(uint16 chunkSize) +{ + if (chunkSize == 0) { + return -1; + } + return offsetof(KPHCCfsExtentAddress, chunknos) + sizeof(uint16) * BLCKSZ / chunkSize; +} + +// 获取extent地址 +inline KPHCCfsExtentAddress *GetExtentAddress(KPHCCfsExtentHeader *header, uint16 blockOffset) +{ + uint16 chunkSize = header->chunk_size; + size_t headerOffset = offsetof(KPHCCfsExtentHeader, cfsExtentAddress); + size_t sizeOfExtentAddress = SizeOfExtentAddress(chunkSize); + return (KPHCCfsExtentAddress *)(((char *)header) + headerOffset + blockOffset * sizeOfExtentAddress); +} + +inline off_t OffsetOfPageCompressChunk(uint16 chunkSize, int chunkNo) { + return chunkSize * (chunkNo - 1); +} + +inline void KPHCAnalyzeCompressOptions(const RelFileNode node, KPHCRelFileCompressOption* opt) +{ + unsigned short compressOption = node.opt; + opt->compressChunkSize = compressOption & gCmpBitStruct[KPHC_CMP_CHUNK_SIZE_INDEX].mask; + compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_CHUNK_SIZE_INDEX].bitLen; + opt->compressAlgorithm = compressOption & gCmpBitStruct[KPHC_CMP_ALGORITHM_INDEX].mask; + compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_ALGORITHM_INDEX].bitLen; + opt->compressLevel = compressOption & gCmpBitStruct[KPHC_CMP_LEVEL_INDEX].mask; + compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_LEVEL_INDEX].bitLen; + opt->compressLevelSymbol = compressOption & gCmpBitStruct[KPHC_CMP_COMPERSS_LEVEL_SYMBOL_INDEX].mask; + compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_COMPERSS_LEVEL_SYMBOL_INDEX].bitLen; + opt->compressPreallocChunks = compressOption & gCmpBitStruct[KPHC_CMP_PRE_CHUNK_INDEX].mask; + compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_PRE_CHUNK_INDEX].bitLen; + opt->diffConvert = compressOption & gCmpBitStruct[KPHC_CMP_DIFF_CONVERT_INDEX].mask; + compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_DIFF_CONVERT_INDEX].bitLen; + opt->byteConvert = compressOption & gCmpBitStruct[KPHC_CMP_BYTE_CONVERT_INDEX].mask; + compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_BYTE_CONVERT_INDEX].bitLen; +} + +size_t KPHCCfsWritePage(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, char *buffer, bool sipSync, KPHC_CFS_STORAGE_TYPE type) +{ + KPHCExtentLocation location = cfsLocationConverts[type](reln, forknum, logicBlockNumber, sipSync, KPHC_EXTENT_OPEN_FILE); + KPHCPciPageCtrl *ctrl = KPHCPciBufReadPage(location, LW_SHARED, PCI_BUF_NORMAL_READ); + KPHCCfsExtentHeader *cfsExtentHeader = ctrl->pciPage; + KPHCCfsExtentAddress *cfsExtentAddress = GetExtentAddress(cfsExtentAddress, (uint16)location.extentOffset); + + // 获取压缩参数 + KPHCRelFileCompressOption option; + KPHCAnalyzeCompressOptions(reln->SMgrRelation.node, &option); + uint16 chunkSize = cfsExtentHeader->chunk_size; + + // 执行页压缩 + uint8 nchunks; + char *compressedBuffer = KPHCCfsCompressPage(buffer, &option, &nchunks); + + // 设置地址 + uint8 needChunks = option.compressPreallocChunks > nchunks ? (uint8)option.compressPreallocChunks : (uint8)nchunks; + bool changed = ExtendChunksOfBlock(cfsExtentHeader, &location, needChunks, nchunks); + + off_t extentStartOffset = location.extentStart * BLCKSZ; + + // 写入每个chunk + for (uint8 i = 0; i < nchunks; ++i) { + uint8 bufferPos = compressedBuffer + (long)chunkSize * i; + off_t seekPos = OffsetOfPageCompressChunk(chunkSize, cfsExtentAddress->chunknos[i]); + if (cfsExtentAddress->chunknos[i] > ((BLCKSZ / chunkSize) * KPHC_CFS_LOGIC_BLOCKS_PER_EXTENT)) { + *((uint32 *)nullptr) = 1; // ??? + } + uint8 start = i; + while (i < nchunks - 1 && cfsExtentAddress->chunknos[i + 1] == cfsExtentAddress->chunknos[i] + 1) { + i++; + } + int writeAmount = chunkSize * ((i - start) + 1); + if ((seekPos + extentStartOffset) > (((BlockNumber)RELSEG_SIZE) * BLCKSZ)) { + *((uint32 *)nullptr) = 1; // ??? + } + int nbytes = FilePWrite(location.fd, bufferPos, writeAmount, seekPos + extentStartOffset, (uint32)WAIT_EVENT_DATA_FILE_WRITE); + if (nbytes != writeAmount) { + if (compressedBuffer != nullptr && compressedBuffer != buffer) { + pfree(compressedBuffer); + } + KPHCPciBufFreePage(ctrl, location, false); + } + } + + if (compressedBuffer != nullptr && compressedBuffer != buffer) { + pfree(compressedBuffer); + } + KPHCPciBufFreePage(ctrl, location, false); + return BLCKSZ; +} diff --git a/src/include/storage/page_compression.h b/src/include/storage/page_compression.h index 8513504..893169b 100644 --- a/src/include/storage/page_compression.h +++ b/src/include/storage/page_compression.h @@ -21,6 +21,36 @@ #define KPHC_CMP_ALGORITHM_INDEX 5 #define KPHC_CMP_CHUNK_SIZE_INDEX 6 +typedef size_t KPHC_CFS_STORAGE_TYPE; + +typedef struct ExtentLocation { + int fd; + RelFileNode relFileNode; + BlockNumber extentNumber; + BlockNumber extentStart; + BlockNumber extentOffset; + BlockNumber headerNum; + uint16 chrunkSize; + uint8 algorithm; +}KPHCExtentLocation; + +typedef struct CfsExtentAddress { + uint32 checksum; + volatile uint8 nchunks; + volatile uint8 allocated_chunks; + uint16 chunknos[FLEXIBLE_ARRAY_MEMBER]; +}KPHCCfsExtentAddress; + +typedef struct CfsExtentHeader { + pg_atomic_uint32 nblocks; + pg_atomic_uint32 allocated_chunks; + uint16 chunk_size; + uint8 algorithm : 7; + uint8 recycleInOrder : 1; + uint8 recv; + KPHCCfsExtentAddress cfsExtentAddress[FLEXIBLE_ARRAY_MEMBER]; +}KPHCCfsExtentHeader; + typedef struct CmpBitStuct { const unsigned int bitLen; const unsigned int mask; @@ -66,10 +96,20 @@ extern const uint32 INDEX_OF_QUARTER_BLCKSZ; extern const uint32 INDEX_OF_EIGHTH_BLCKSZ; extern const uint32 INDEX_OF_SIXTEENTHS_BLCKSZ; +extern const int KPHC_CFS_EXTENT_SIZE; +extern const int KPHC_CFS_LOGIC_BLOCKS_PER_EXTENT; +extern const int KPHC_CFS_EXTENT_COUNT_PER_FILE; +extern const int KPHC_CFS_MAX_BLOCK_PER_FILE; +extern const int KPHC_EXTENT_OPEN_FILE; +extern const int KPHC_WRITE_BACK_OPEN_FILE; +extern const int KPHC_EXTENT_CREATE_FILE; + extern const KPHCCmpBitStuct gCmpBitStruct[]; uint8 ConvertChunkSize(uint32 compressedChunkSize, bool *success); void SetupPageCompressForRelation(RelFileNode *node, PageCompressOpts *compress_options, const char *relationName); +size_t KPHCCfsWritePage(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, char *buffer, bool sipSync, KPHC_CFS_STORAGE_TYPE type); + #endif // PAGE_COMPRESSION_H \ No newline at end of file -- Gitee From 7a11bffbeb00bf7b7ca860e73aea130fb2e7ebd4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A6=99=E8=95=89=E5=82=A8=E8=93=84=E6=89=80?= <727854256@qq.com> Date: Thu, 16 Jan 2025 17:07:40 +0800 Subject: [PATCH 02/11] =?UTF-8?q?[add][normal]1.=E8=A1=A5=E5=85=85WritePag?= =?UTF-8?q?e=E4=B8=AD=E8=B0=83=E7=94=A8=E7=9A=84=E5=87=BD=E6=95=B0?= =?UTF-8?q?=E7=9A=84=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/storage/page/page_compression.c | 73 +++++++++++++++++++-- src/include/storage/page_compression.h | 1 + 2 files changed, 70 insertions(+), 4 deletions(-) diff --git a/src/backend/storage/page/page_compression.c b/src/backend/storage/page/page_compression.c index fdaf595..6fba924 100644 --- a/src/backend/storage/page/page_compression.c +++ b/src/backend/storage/page/page_compression.c @@ -3,6 +3,7 @@ #include "storage/page_compression.h" #include "utils/rel.h" #include "utils/wait_event.h" +#include "storage/fd.h" const uint32 INDEX_OF_HALF_BLCKSZ = 0; const uint32 INDEX_OF_QUARTER_BLCKSZ = 1; @@ -94,6 +95,16 @@ inline size_t SizeOfExtentAddress(uint16 chunkSize) return offsetof(KPHCCfsExtentAddress, chunknos) + sizeof(uint16) * BLCKSZ / chunkSize; } +inline off_t OffsetOfPageCompressChunk(uint16 chunkSize, int chunkNo) +{ + return chunkSize * (chunkNo - 1); +} + +inline size_t SizeOfExtentAddressByChunks(uint8 nChunks) +{ + return offsetof(KPHCCfsExtentAddress, chunknos) + sizeof(uint16) * nChunks; +} + // 获取extent地址 inline KPHCCfsExtentAddress *GetExtentAddress(KPHCCfsExtentHeader *header, uint16 blockOffset) { @@ -103,10 +114,6 @@ inline KPHCCfsExtentAddress *GetExtentAddress(KPHCCfsExtentHeader *header, uint1 return (KPHCCfsExtentAddress *)(((char *)header) + headerOffset + blockOffset * sizeOfExtentAddress); } -inline off_t OffsetOfPageCompressChunk(uint16 chunkSize, int chunkNo) { - return chunkSize * (chunkNo - 1); -} - inline void KPHCAnalyzeCompressOptions(const RelFileNode node, KPHCRelFileCompressOption* opt) { unsigned short compressOption = node.opt; @@ -126,6 +133,64 @@ inline void KPHCAnalyzeCompressOptions(const RelFileNode node, KPHCRelFileCompre compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_BYTE_CONVERT_INDEX].bitLen; } +uint32 AddrChecksum32(const KPHCCfsExtentAddress *cfsExtentAddress, const int needChunks) +{ + const size_t uintLen = sizeof(uint32); + uint32 checkSum = 0; + char *addr = ((char *) cfsExtentAddress) + sizeof(uint32); + size_t len = SizeOfExtentAddressByChunks((uint8)needChunks) - sizeof(uint32); + do { + if (len >= uintLen) { + checkSum += *((uint32*)(void*)addr); + addr += uintLen; + len -= uintLen; + } else { + char finalNum[uintLen]; + memset(finalNum, 0, sizeof(finalNum)); + size_t i = 0; + for(; i < len; ++i) { + finalNum[i] = addr[i]; + } + checkSum += *((uint32 *)finalNum); + len -= i; + } + } while (len); + return checkSum; +} + +static inline bool ExtendChunksOfBlockCore(KPHCCfsExtentHeader *cfsExtentHeader, KPHCCfsExtentAddress *cfsExtentAddress, + uint8 needChunks, uint8 actualUse) +{ + bool res = false; + if (cfsExtentAddress->allocated_chunks < needChunks) { + cfsExtentHeader->recycleInOrder = 0; + uint8 allocateNumber = needChunks - cfsExtentAddress->allocated_chunks; + uint32 chunkno = (pc_chunk_number_t)pg_atomic_fetch_add_u32(&cfsExtentHeader->allocated_chunks, (uint32)allocateNumber) + 1; + for (int i = cfsExtentAddress->allocated_chunks; i < needChunks; ++i, ++chunkno) { + cfsExtentAddress->chunknos[i] = (uint16)chunkno; + } + cfsExtentAddress->allocated_chunks = needChunks; + res = true; + } + if (cfsExtentAddress->nchunks != actualUse) { + cfsExtentAddress->nchunks = actualUse; + res = true; + } + uint32 cksm = AddrChecksum32(cfsExtentAddress, cfsExtentAddress->allocated_chunks); + if (cfsExtentAddress->checksum != cksm) { + cfsExtentAddress->checksum = cksm + res = true; + } + return res; +} + +static inline bool ExtendChunksOfBlock(KPHCCfsExtentHeader *cfsExtentHeader, KPHCExtentLocation *location, + uint8 needChunks, uint8 actualUse) +{ + KPHCCfsExtentAddress *cfsExtentAddress = GetExtentAddress(cfsExtentHeader, (uint16)location->extentOffset); + return ExtendChunksOfBlockCore(cfsExtentHeader, cfsExtentAddress, needChunks, actualUse); +} + size_t KPHCCfsWritePage(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, char *buffer, bool sipSync, KPHC_CFS_STORAGE_TYPE type) { KPHCExtentLocation location = cfsLocationConverts[type](reln, forknum, logicBlockNumber, sipSync, KPHC_EXTENT_OPEN_FILE); diff --git a/src/include/storage/page_compression.h b/src/include/storage/page_compression.h index 893169b..1932436 100644 --- a/src/include/storage/page_compression.h +++ b/src/include/storage/page_compression.h @@ -22,6 +22,7 @@ #define KPHC_CMP_CHUNK_SIZE_INDEX 6 typedef size_t KPHC_CFS_STORAGE_TYPE; +typedef uint32 pc_chunk_number_t; typedef struct ExtentLocation { int fd; -- Gitee From ec352ac0dadfbdd45329cc50a3c7189441aa312a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A6=99=E8=95=89=E5=82=A8=E8=93=84=E6=89=80?= <727854256@qq.com> Date: Thu, 16 Jan 2025 19:34:33 +0800 Subject: [PATCH 03/11] =?UTF-8?q?[add|chg][normal]1.=E6=B7=BB=E5=8A=A0Writ?= =?UTF-8?q?ePage=E4=B8=AD=E5=8E=8B=E7=BC=A9=E7=9B=B8=E5=85=B3=E5=87=BD?= =?UTF-8?q?=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/storage/page/page_compression.c | 63 ++++++++++++++++++++- src/include/storage/page_compression.h | 4 ++ 2 files changed, 66 insertions(+), 1 deletion(-) diff --git a/src/backend/storage/page/page_compression.c b/src/backend/storage/page/page_compression.c index 6fba924..905a3f5 100644 --- a/src/backend/storage/page/page_compression.c +++ b/src/backend/storage/page/page_compression.c @@ -1,9 +1,13 @@ -#include "postgres.h" +// #define USE_ZSTD +#include +#include "postgres.h" #include "storage/page_compression.h" #include "utils/rel.h" #include "utils/wait_event.h" #include "storage/fd.h" +#include "storage/bufpage.h" + const uint32 INDEX_OF_HALF_BLCKSZ = 0; const uint32 INDEX_OF_QUARTER_BLCKSZ = 1; @@ -26,6 +30,8 @@ const KPHCCmpBitStuct gCmpBitStruct[] = {{KPHC_CMP_BYTE_CONVERT_LEN, 0x01, 15}, {KPHC_CMP_ALGORITHM_LEN, 0x07, 2}, {KPHC_CMP_CHUNK_SIZE_LEN, 0x03, 0}}; +const uint32 CHUNK_SIZE_LIST[4] = {BLCKSZ / 2, BLCKSZ / 4, BLCKSZ / 8, BLCKSZ / 16}; + uint8 ConvertChunkSize(uint32 compressedChunkSize, bool *success) { uint8 chunkSize = INDEX_OF_HALF_BLCKSZ; @@ -50,6 +56,26 @@ uint8 ConvertChunkSize(uint32 compressedChunkSize, bool *success) return chunkSize; } +inline size_t CompressReservedLen(const char* page) +{ + size_t length = offsetof(HeapPageCompressData, page_header) - offsetof(HeapPageCompressData, data); + return GetPageHeaderSize(page) + length; // 宏不完整 +} + +int CompressPageBufferBound(const char* page, uint8 algorithm) +{ + switch (algorithm) { + case COMPRESS_ALGORITHM_PGLZ: + return BLCKSZ * 2; + case COMPRESS_ALGORITHM_ZSTD: + return ZSTD_compressBound(BLCKSZ - CompressReservedLen(page)); + case COMPRESS_ALGORITHM_PGZSTD: + return BLCKSZ + 4; + default: + return -1; + } +} + void SetupPageCompressForRelation(RelFileNode *node, PageCompressOpts *compressOptions, const char *relationName) { uint32 algorithm = (uint32)compressOptions->compressType; @@ -191,6 +217,41 @@ static inline bool ExtendChunksOfBlock(KPHCCfsExtentHeader *cfsExtentHeader, KPH return ExtendChunksOfBlockCore(cfsExtentHeader, cfsExtentAddress, needChunks, actualUse); } +char *KPHCCfsCompressPage(const char *buffer, KPHCRelFileCompressOption *option, uint8 *nchunks) +{ + uint8 algorithm = option->compressAlgorithm; + uint32 chunkSize = CHUNK_SIZE_LIST[option->compressChunkSize]; + int32 workBufferSize = CompressPageBufferBound(buffer, algorithm); + int8 level = option->compressLevelSymbol ? option->compressLevel : -option->compressLevel; + uint8 preallocChunk = option->compressPreallocChunks; + + if (workBufferSize < 0) { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("mdwrite algorithm:%d, chunkSize:%d, level:%d, preallocChunk:%d", + (int)algorithm, (int)chunkSize, level, (int)preallocChunk))); + } + + char *workBuffer = (char*)palloc((unsigned long)workBufferSize); + int compressBufferSize = CompressPage(buffer, workBuffer, workBufferSize, *option); + if (compressBufferSize < 0) { + compressBufferSize = BLCKSZ; + } + *nchunks = (uint8)((unsigned int)(compressBufferSize - 1) / chunkSize + 1); + uint32 bufferSize = chunkSize * (*nchunks); + if (bufferSize >= BLCKSZ) { + pfree(workBuffer); + workBuffer = (char*)buffer; + (*nchunks) = (uint8)(BLCKSZ / chunkSize); + } else { + if ((uint32)compressBufferSize < bufferSize) { + uint32 leftSize = bufferSize - (uint32)compressBufferSize; + errno_t rc = memset_s(workBuffer + compressBufferSize, leftSize, 0, leftSize); + } + } + + return workBuffer; +} + size_t KPHCCfsWritePage(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, char *buffer, bool sipSync, KPHC_CFS_STORAGE_TYPE type) { KPHCExtentLocation location = cfsLocationConverts[type](reln, forknum, logicBlockNumber, sipSync, KPHC_EXTENT_OPEN_FILE); diff --git a/src/include/storage/page_compression.h b/src/include/storage/page_compression.h index 1932436..b4795bf 100644 --- a/src/include/storage/page_compression.h +++ b/src/include/storage/page_compression.h @@ -21,6 +21,10 @@ #define KPHC_CMP_ALGORITHM_INDEX 5 #define KPHC_CMP_CHUNK_SIZE_INDEX 6 +#define COMPRESS_ALGORITHM_PGLZ 1 +#define COMPRESS_ALGORITHM_ZSTD 2 +#define COMPRESS_ALGORITHM_PGZSTD 3 + typedef size_t KPHC_CFS_STORAGE_TYPE; typedef uint32 pc_chunk_number_t; -- Gitee From c6a90192dc531cb8e1fb4ea1791b3fd331754162 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A6=99=E8=95=89=E5=82=A8=E8=93=84=E6=89=80?= <727854256@qq.com> Date: Thu, 16 Jan 2025 20:21:57 +0800 Subject: [PATCH 04/11] =?UTF-8?q?[del][normal]1.=E5=88=A0=E9=99=A4?= =?UTF-8?q?=E9=83=A8=E5=88=86=E9=87=8D=E5=A4=8D=E5=AE=9A=E4=B9=89=E7=9A=84?= =?UTF-8?q?=E7=BB=93=E6=9E=84=E4=BD=93=E3=80=81=E5=87=BD=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/storage/page/page_compression.c | 19 -------------- src/include/storage/page_compression.h | 29 --------------------- 2 files changed, 48 deletions(-) diff --git a/src/backend/storage/page/page_compression.c b/src/backend/storage/page/page_compression.c index 905a3f5..bc026df 100644 --- a/src/backend/storage/page/page_compression.c +++ b/src/backend/storage/page/page_compression.c @@ -140,25 +140,6 @@ inline KPHCCfsExtentAddress *GetExtentAddress(KPHCCfsExtentHeader *header, uint1 return (KPHCCfsExtentAddress *)(((char *)header) + headerOffset + blockOffset * sizeOfExtentAddress); } -inline void KPHCAnalyzeCompressOptions(const RelFileNode node, KPHCRelFileCompressOption* opt) -{ - unsigned short compressOption = node.opt; - opt->compressChunkSize = compressOption & gCmpBitStruct[KPHC_CMP_CHUNK_SIZE_INDEX].mask; - compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_CHUNK_SIZE_INDEX].bitLen; - opt->compressAlgorithm = compressOption & gCmpBitStruct[KPHC_CMP_ALGORITHM_INDEX].mask; - compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_ALGORITHM_INDEX].bitLen; - opt->compressLevel = compressOption & gCmpBitStruct[KPHC_CMP_LEVEL_INDEX].mask; - compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_LEVEL_INDEX].bitLen; - opt->compressLevelSymbol = compressOption & gCmpBitStruct[KPHC_CMP_COMPERSS_LEVEL_SYMBOL_INDEX].mask; - compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_COMPERSS_LEVEL_SYMBOL_INDEX].bitLen; - opt->compressPreallocChunks = compressOption & gCmpBitStruct[KPHC_CMP_PRE_CHUNK_INDEX].mask; - compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_PRE_CHUNK_INDEX].bitLen; - opt->diffConvert = compressOption & gCmpBitStruct[KPHC_CMP_DIFF_CONVERT_INDEX].mask; - compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_DIFF_CONVERT_INDEX].bitLen; - opt->byteConvert = compressOption & gCmpBitStruct[KPHC_CMP_BYTE_CONVERT_INDEX].mask; - compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_BYTE_CONVERT_INDEX].bitLen; -} - uint32 AddrChecksum32(const KPHCCfsExtentAddress *cfsExtentAddress, const int needChunks) { const size_t uintLen = sizeof(uint32); diff --git a/src/include/storage/page_compression.h b/src/include/storage/page_compression.h index b4795bf..b9cd905 100644 --- a/src/include/storage/page_compression.h +++ b/src/include/storage/page_compression.h @@ -25,37 +25,8 @@ #define COMPRESS_ALGORITHM_ZSTD 2 #define COMPRESS_ALGORITHM_PGZSTD 3 -typedef size_t KPHC_CFS_STORAGE_TYPE; typedef uint32 pc_chunk_number_t; -typedef struct ExtentLocation { - int fd; - RelFileNode relFileNode; - BlockNumber extentNumber; - BlockNumber extentStart; - BlockNumber extentOffset; - BlockNumber headerNum; - uint16 chrunkSize; - uint8 algorithm; -}KPHCExtentLocation; - -typedef struct CfsExtentAddress { - uint32 checksum; - volatile uint8 nchunks; - volatile uint8 allocated_chunks; - uint16 chunknos[FLEXIBLE_ARRAY_MEMBER]; -}KPHCCfsExtentAddress; - -typedef struct CfsExtentHeader { - pg_atomic_uint32 nblocks; - pg_atomic_uint32 allocated_chunks; - uint16 chunk_size; - uint8 algorithm : 7; - uint8 recycleInOrder : 1; - uint8 recv; - KPHCCfsExtentAddress cfsExtentAddress[FLEXIBLE_ARRAY_MEMBER]; -}KPHCCfsExtentHeader; - typedef struct CmpBitStuct { const unsigned int bitLen; const unsigned int mask; -- Gitee From 32b0af78bc98a60877ed67e1049840226b6830f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A6=99=E8=95=89=E5=82=A8=E8=93=84=E6=89=80?= <727854256@qq.com> Date: Fri, 17 Jan 2025 16:54:59 +0800 Subject: [PATCH 05/11] =?UTF-8?q?[dbg|add][normal]1.=E8=A7=A3=E5=86=B3?= =?UTF-8?q?=E6=98=8E=E6=98=BE=E7=9A=84=E7=BC=96=E8=AF=91=E9=97=AE=E9=A2=98?= =?UTF-8?q?=EF=BC=8C=E4=BB=8D=E5=AD=98=E5=9C=A8=E9=93=BE=E6=8E=A5=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 2 + src/backend/storage/page/page_compression.c | 135 ++++++++++++++---- src/backend/storage/smgr/md.c | 54 +------ src/common/pg_lzcompress.c | 147 ++++++++++++++++++++ src/include/c.h | 2 + src/include/common/pg_lzcompress.h | 12 ++ src/include/storage/bufpage.h | 30 ++++ src/include/storage/cfs_buffers.h | 3 + src/include/storage/page_compression.h | 53 ++++++- 9 files changed, 362 insertions(+), 76 deletions(-) diff --git a/.gitignore b/.gitignore index 1c0f3e5..fa2aaa1 100644 --- a/.gitignore +++ b/.gitignore @@ -43,3 +43,5 @@ lib*.pc /Release/ /tmp_install/ /portlock/ +.vscode/settings.json +.vscode/Sftp.json diff --git a/src/backend/storage/page/page_compression.c b/src/backend/storage/page/page_compression.c index bc026df..52fad98 100644 --- a/src/backend/storage/page/page_compression.c +++ b/src/backend/storage/page/page_compression.c @@ -1,23 +1,22 @@ // #define USE_ZSTD -#include +// #include #include "postgres.h" +#include "storage/fd.h" +#include "storage/bufpage.h" +#include "storage/cfs_buffers.h" #include "storage/page_compression.h" #include "utils/rel.h" #include "utils/wait_event.h" -#include "storage/fd.h" -#include "storage/bufpage.h" +#include "common/pg_lzcompress.h" +uint32 AddrChecksum32(const KPHCCfsExtentAddress *cfsExtentAddress, const int needChunks); const uint32 INDEX_OF_HALF_BLCKSZ = 0; const uint32 INDEX_OF_QUARTER_BLCKSZ = 1; const uint32 INDEX_OF_EIGHTH_BLCKSZ = 2; const uint32 INDEX_OF_SIXTEENTHS_BLCKSZ = 3; -const int KPHC_CFS_EXTENT_SIZE = 129; -const int KPHC_CFS_LOGIC_BLOCKS_PER_EXTENT = KPHC_CFS_EXTENT_SIZE - 1; -const int KPHC_CFS_EXTENT_COUNT_PER_FILE = RELSEG_SIZE / KPHC_CFS_EXTENT_SIZE; -const int KPHC_CFS_MAX_BLOCK_PER_FILE = KPHC_CFS_EXTENT_COUNT_PER_FILE * KPHC_CFS_EXTENT_SIZE; const int KPHC_EXTENT_OPEN_FILE = 0; const int KPHC_WRITE_BACK_OPEN_FILE = 1; const int KPHC_EXTENT_CREATE_FILE = 2; @@ -58,11 +57,11 @@ uint8 ConvertChunkSize(uint32 compressedChunkSize, bool *success) inline size_t CompressReservedLen(const char* page) { - size_t length = offsetof(HeapPageCompressData, page_header) - offsetof(HeapPageCompressData, data); + size_t length = offsetof(KPHCHeapPageCompressData, page_header) - offsetof(KPHCHeapPageCompressData, data); return GetPageHeaderSize(page) + length; // 宏不完整 } -int CompressPageBufferBound(const char* page, uint8 algorithm) +inline int CompressPageBufferBound(const char* page, uint8 algorithm) { switch (algorithm) { case COMPRESS_ALGORITHM_PGLZ: @@ -142,6 +141,7 @@ inline KPHCCfsExtentAddress *GetExtentAddress(KPHCCfsExtentHeader *header, uint1 uint32 AddrChecksum32(const KPHCCfsExtentAddress *cfsExtentAddress, const int needChunks) { + size_t i; const size_t uintLen = sizeof(uint32); uint32 checkSum = 0; char *addr = ((char *) cfsExtentAddress) + sizeof(uint32); @@ -152,9 +152,9 @@ uint32 AddrChecksum32(const KPHCCfsExtentAddress *cfsExtentAddress, const int ne addr += uintLen; len -= uintLen; } else { - char finalNum[uintLen]; - memset(finalNum, 0, sizeof(finalNum)); - size_t i = 0; + char *finalNum = (char*)malloc(uintLen * sizeof(char)); + memset(finalNum, 0, uintLen * sizeof(char)); + i = 0; for(; i < len; ++i) { finalNum[i] = addr[i]; } @@ -168,11 +168,16 @@ uint32 AddrChecksum32(const KPHCCfsExtentAddress *cfsExtentAddress, const int ne static inline bool ExtendChunksOfBlockCore(KPHCCfsExtentHeader *cfsExtentHeader, KPHCCfsExtentAddress *cfsExtentAddress, uint8 needChunks, uint8 actualUse) { - bool res = false; + bool res; + uint8 allocateNumber; + uint32 chunkno; + uint32 cksm; + + res = false; if (cfsExtentAddress->allocated_chunks < needChunks) { cfsExtentHeader->recycleInOrder = 0; - uint8 allocateNumber = needChunks - cfsExtentAddress->allocated_chunks; - uint32 chunkno = (pc_chunk_number_t)pg_atomic_fetch_add_u32(&cfsExtentHeader->allocated_chunks, (uint32)allocateNumber) + 1; + allocateNumber = needChunks - cfsExtentAddress->allocated_chunks; + chunkno = (pc_chunk_number_t)pg_atomic_fetch_add_u32(&cfsExtentHeader->allocated_chunks, (uint32)allocateNumber) + 1; for (int i = cfsExtentAddress->allocated_chunks; i < needChunks; ++i, ++chunkno) { cfsExtentAddress->chunknos[i] = (uint16)chunkno; } @@ -183,9 +188,9 @@ static inline bool ExtendChunksOfBlockCore(KPHCCfsExtentHeader *cfsExtentHeader, cfsExtentAddress->nchunks = actualUse; res = true; } - uint32 cksm = AddrChecksum32(cfsExtentAddress, cfsExtentAddress->allocated_chunks); + cksm = AddrChecksum32(cfsExtentAddress, cfsExtentAddress->allocated_chunks); if (cfsExtentAddress->checksum != cksm) { - cfsExtentAddress->checksum = cksm + cfsExtentAddress->checksum = cksm; res = true; } return res; @@ -198,6 +203,63 @@ static inline bool ExtendChunksOfBlock(KPHCCfsExtentHeader *cfsExtentHeader, KPH return ExtendChunksOfBlockCore(cfsExtentHeader, cfsExtentAddress, needChunks, actualUse); } +int KPHCCompressPage(const char* src, char* dst, int dst_size, KPHCRelFileCompressOption option) +{ + int compressed_size; + int8 level = option.compressLevelSymbol ? option.compressLevel : -option.compressLevel; + size_t sizeOfHeaderData = SizeOfPageHeaderData; + bool real_ByteConvert = false; + bool success; + + char* data = ((KPHCPageCompressData*)dst)->data; + PageHeaderData *page = (PageHeaderData *)src; + bool heapPageData = page->pd_special == BLCKSZ; + switch (option.compressAlgorithm) { + case COMPRESS_ALGORITHM_PGLZ: { + success = KPHCLzCompress(src + sizeOfHeaderData, BLCKSZ - sizeOfHeaderData, (KPHC_PGLZ_Header *)data, heapPageData ? PGLZ_strategy_default : PGLZ_strategy_always); + compressed_size = success ? VARSIZE(data) : BLCKSZ; + compressed_size = compressed_size < BLCKSZ ? compressed_size : BLCKSZ; + break; + } + case COMPRESS_ALGORITHM_ZSTD: { + if (level == 0 || level < MIN_ZSTD_COMPRESSION_LEVEL || level > MAX_ZSTD_COMPRESSION_LEVEL) { + level = DEFAULT_ZSTD_COMPRESSION_LEVEL; + } +#ifndef FRONTEND + compressed_size = ZSTD_compress(data, dst_size, src + sizeOfHeaderData, BLCKSZ - sizeOfHeaderData, level); +#else + if (real_ByteConvert) { + compressed_size = + ZSTD_compress(data, dst_size, src_copy + sizeOfHeaderData, BLCKSZ - sizeOfHeaderData, level); + } else { + compressed_size = + ZSTD_compress(data, dst_size, src + sizeOfHeaderData, BLCKSZ - sizeOfHeaderData, level); + } +#endif + if (ZSTD_isError(compressed_size)) { + return -1; + } + break; + } + default: + return KPHC_COMPRESS_UNSUPPORTED_ERROR; + } + + if ((compressed_size < 0) || ((offsetof(KPHCPageCompressData, data) + compressed_size) >= BLCKSZ)) { + return -1; + } + + KPHCPageCompressData* pcdptr = ((KPHCPageCompressData*)dst); + memcpy_s(pcdptr->page_header, sizeOfHeaderData, src, sizeOfHeaderData); + pcdptr->size = compressed_size; + pcdptr->crc32 = DataBlockChecksum(data, compressed_size, true); + pcdptr->byte_convert = real_ByteConvert; + pcdptr->diff_convert = option.diffConvert; + pcdptr->algorithm = option.compressAlgorithm; + + return offsetof(KPHCPageCompressData, data) + compressed_size; +} + char *KPHCCfsCompressPage(const char *buffer, KPHCRelFileCompressOption *option, uint8 *nchunks) { uint8 algorithm = option->compressAlgorithm; @@ -205,6 +267,8 @@ char *KPHCCfsCompressPage(const char *buffer, KPHCRelFileCompressOption *option, int32 workBufferSize = CompressPageBufferBound(buffer, algorithm); int8 level = option->compressLevelSymbol ? option->compressLevel : -option->compressLevel; uint8 preallocChunk = option->compressPreallocChunks; + char *workBuffer; + int compressBufferSize; if (workBufferSize < 0) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), @@ -212,8 +276,8 @@ char *KPHCCfsCompressPage(const char *buffer, KPHCRelFileCompressOption *option, (int)algorithm, (int)chunkSize, level, (int)preallocChunk))); } - char *workBuffer = (char*)palloc((unsigned long)workBufferSize); - int compressBufferSize = CompressPage(buffer, workBuffer, workBufferSize, *option); + workBuffer = (char*)palloc((unsigned long)workBufferSize); + compressBufferSize = KPHCCompressPage(buffer, workBuffer, workBufferSize, *option); if (compressBufferSize < 0) { compressBufferSize = BLCKSZ; } @@ -226,13 +290,34 @@ char *KPHCCfsCompressPage(const char *buffer, KPHCRelFileCompressOption *option, } else { if ((uint32)compressBufferSize < bufferSize) { uint32 leftSize = bufferSize - (uint32)compressBufferSize; - errno_t rc = memset_s(workBuffer + compressBufferSize, leftSize, 0, leftSize); + memset_s(workBuffer + compressBufferSize, leftSize, 0, leftSize); } } return workBuffer; } +inline void KPHCAnalyzeCompressOptions(const RelFileNode node, KPHCRelFileCompressOption* opt) +{ + unsigned short compressOption = node.opt; + opt->compressChunkSize = compressOption & gCmpBitStruct[KPHC_CMP_CHUNK_SIZE_INDEX].mask; + compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_CHUNK_SIZE_INDEX].bitLen; + opt->compressAlgorithm = compressOption & gCmpBitStruct[KPHC_CMP_ALGORITHM_INDEX].mask; + compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_ALGORITHM_INDEX].bitLen; + opt->compressLevel = compressOption & gCmpBitStruct[KPHC_CMP_LEVEL_INDEX].mask; + compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_LEVEL_INDEX].bitLen; + opt->compressLevelSymbol = compressOption & gCmpBitStruct[KPHC_CMP_COMPERSS_LEVEL_SYMBOL_INDEX].mask; + compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_COMPERSS_LEVEL_SYMBOL_INDEX].bitLen; + opt->compressPreallocChunks = compressOption & gCmpBitStruct[KPHC_CMP_PRE_CHUNK_INDEX].mask; + compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_PRE_CHUNK_INDEX].bitLen; + opt->diffConvert = compressOption & gCmpBitStruct[KPHC_CMP_DIFF_CONVERT_INDEX].mask; + compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_DIFF_CONVERT_INDEX].bitLen; + opt->byteConvert = compressOption & gCmpBitStruct[KPHC_CMP_BYTE_CONVERT_INDEX].mask; + compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_BYTE_CONVERT_INDEX].bitLen; +} + +extern KPHCCfsLocationConvert cfsLocationConverts[]; + size_t KPHCCfsWritePage(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, char *buffer, bool sipSync, KPHC_CFS_STORAGE_TYPE type) { KPHCExtentLocation location = cfsLocationConverts[type](reln, forknum, logicBlockNumber, sipSync, KPHC_EXTENT_OPEN_FILE); @@ -242,7 +327,7 @@ size_t KPHCCfsWritePage(SMgrRelation reln, ForkNumber forknum, BlockNumber logic // 获取压缩参数 KPHCRelFileCompressOption option; - KPHCAnalyzeCompressOptions(reln->SMgrRelation.node, &option); + KPHCAnalyzeCompressOptions(reln->smgr_rnode.node, &option); uint16 chunkSize = cfsExtentHeader->chunk_size; // 执行页压缩 @@ -260,7 +345,7 @@ size_t KPHCCfsWritePage(SMgrRelation reln, ForkNumber forknum, BlockNumber logic uint8 bufferPos = compressedBuffer + (long)chunkSize * i; off_t seekPos = OffsetOfPageCompressChunk(chunkSize, cfsExtentAddress->chunknos[i]); if (cfsExtentAddress->chunknos[i] > ((BLCKSZ / chunkSize) * KPHC_CFS_LOGIC_BLOCKS_PER_EXTENT)) { - *((uint32 *)nullptr) = 1; // ??? + *((uint32 *)NULL) = 1; // ??? } uint8 start = i; while (i < nchunks - 1 && cfsExtentAddress->chunknos[i + 1] == cfsExtentAddress->chunknos[i] + 1) { @@ -268,18 +353,18 @@ size_t KPHCCfsWritePage(SMgrRelation reln, ForkNumber forknum, BlockNumber logic } int writeAmount = chunkSize * ((i - start) + 1); if ((seekPos + extentStartOffset) > (((BlockNumber)RELSEG_SIZE) * BLCKSZ)) { - *((uint32 *)nullptr) = 1; // ??? + *((uint32 *)NULL) = 1; // ??? } int nbytes = FilePWrite(location.fd, bufferPos, writeAmount, seekPos + extentStartOffset, (uint32)WAIT_EVENT_DATA_FILE_WRITE); if (nbytes != writeAmount) { - if (compressedBuffer != nullptr && compressedBuffer != buffer) { + if (compressedBuffer != NULL && compressedBuffer != buffer) { pfree(compressedBuffer); } KPHCPciBufFreePage(ctrl, location, false); } } - if (compressedBuffer != nullptr && compressedBuffer != buffer) { + if (compressedBuffer != NULL && compressedBuffer != buffer) { pfree(compressedBuffer); } KPHCPciBufFreePage(ctrl, location, false); diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c index adb18fb..2a18c99 100644 --- a/src/backend/storage/smgr/md.c +++ b/src/backend/storage/smgr/md.c @@ -43,6 +43,7 @@ #include "storage/cfs_buffers.h" #include "storage/page_compression.h" #include "common/pg_lzcompress.h" + /* * The magnetic disk storage manager keeps track of open file * descriptors in its own descriptor pool. This is done to make it @@ -669,61 +670,17 @@ mdwriteback(SMgrRelation reln, ForkNumber forknum, } } -typedef size_t KPHC_CFS_STORAGE_TYPE; -const unsigned KPHC_CMP_COMPRESS_LEVEL_SYMBOL = 3; -const int KPHC_CFS_EXTENT_SIZE = 129; -const uint32 CHUNK_SIZE_LIST[4] = {BLCKSZ / 2, BLCKSZ / 4, BLCKSZ / 8, BLCKSZ / 16}; -const int KPHC_EXTENT_OPEN_FILE = 0; -const int KPHC_WRITE_BACK_OPEN_FILE = 1; -const int KPHC_EXTENT_CREATE_FILE = 2; const KPHC_CFS_STORAGE_TYPE COMMON_STORAGE = 0; -#define KPHC_COMPRESS_ALGORITHM_PGLZ 1 -#define KPHC_COMPRESS_ALGORITHM_ZSTD 2 -#define KPHC_COMPRESS_UNSUPPORTED_ERROR (-2) -#define KPHC_IS_COMPRESSED_MAINFORK(reln, forkNum) ((reln)->smgr_rnode.node.opt != 0 && (forkNum) == MAIN_FORKNUM) -typedef struct PageCompressData { - char page_header[SizeOfPageHeaderData]; - uint32 crc32; - uint32 size : 16; - uint32 byte_convert : 1; - uint32 diff_convert : 1; - uint32 algorithm : 4; - uint32 unused : 10; - char data[FLEXIBLE_ARRAY_MEMBER]; -} KPHCPageCompressData; enum KPHC_SMGR_READ_STATUS { SMGR_RD_OK = 0, SMGR_RD_NO_BLOCK = 1, SMGR_RD_CRC_ERROR = 2 }; -typedef KPHCExtentLocation (*KPHCCfsLocationConvert)(SMgrRelation sRel, ForkNumber forknum, BlockNumber - logicBlockNumber, bool skipSync, int type); -inline void KPHCAnalyzeCompressOptions(const RelFileNode node, KPHCRelFileCompressOption* opt); -KPHCExtentLocation KPHCStorageConvert(SMgrRelation sRel, ForkNumber forknum, BlockNumber logicBlockNumber, bool skipSync, - int type); -int KPHCDecompressPage(const char* src, char* dst); -int KPHCCfsReadPage(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, char *buffer, - KPHC_CFS_STORAGE_TYPE type); -inline void KPHCAnalyzeCompressOptions(const RelFileNode node, KPHCRelFileCompressOption* opt) -{ - unsigned short compressOption = node.opt; - opt->compressChunkSize = compressOption & gCmpBitStruct[KPHC_CMP_CHUNK_SIZE_INDEX].mask; - compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_CHUNK_SIZE_INDEX].bitLen; - opt->compressAlgorithm = compressOption & gCmpBitStruct[KPHC_CMP_ALGORITHM_INDEX].mask; - compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_ALGORITHM_INDEX].bitLen; - opt->compressLevel = compressOption & gCmpBitStruct[KPHC_CMP_LEVEL_INDEX].mask; - compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_LEVEL_INDEX].bitLen; - opt->compressLevelSymbol = compressOption & gCmpBitStruct[KPHC_CMP_COMPRESS_LEVEL_SYMBOL].mask; - compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_COMPRESS_LEVEL_SYMBOL].bitLen; - opt->compressPreallocChunks = compressOption & gCmpBitStruct[KPHC_CMP_PRE_CHUNK_INDEX].mask; - compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_PRE_CHUNK_INDEX].bitLen; - opt->diffConvert = compressOption & gCmpBitStruct[KPHC_CMP_DIFF_CONVERT_INDEX].mask; - compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_DIFF_CONVERT_INDEX].bitLen; - opt->byteConvert = compressOption & gCmpBitStruct[KPHC_CMP_BYTE_CONVERT_INDEX].mask; - compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_BYTE_CONVERT_INDEX].bitLen; -} +KPHCExtentLocation KPHCStorageConvert(SMgrRelation sRel, ForkNumber forknum, BlockNumber logicBlockNumber, bool skipSync, int type); +int KPHCDecompressPage(const char* src, char* dst); +int KPHCCfsReadPage(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, char *buffer, KPHC_CFS_STORAGE_TYPE type); KPHCExtentLocation KPHCStorageConvert(SMgrRelation sRel, ForkNumber forknum, BlockNumber logicBlockNumber, bool skipSync, int type) @@ -769,6 +726,7 @@ KPHCCfsLocationConvert cfsLocationConverts[2] = { KPHCStorageConvert, NULL }; + inline size_t SizeOfKPHCExtentAddress(uint16 chunkSize) { if (chunkSize == 0) { return -1; @@ -784,6 +742,7 @@ inline KPHCCfsExtentAddress *KPHCGetExtentAddress(KPHCCfsExtentHeader *header, u size_t sizeOfKPHCExtentAddress = SizeOfKPHCExtentAddress(chunkSize); return (KPHCCfsExtentAddress *) (((char *) header) + headerOffset + blockOffset * sizeOfKPHCExtentAddress); } + int KPHCDecompressPage(const char* src, char* dst) { int decompressed_size; @@ -823,6 +782,7 @@ int KPHCDecompressPage(const char* src, char* dst) } return headerSize + decompressed_size; } + int KPHCCfsReadPage(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, char *buffer, KPHC_CFS_STORAGE_TYPE type) { diff --git a/src/common/pg_lzcompress.c b/src/common/pg_lzcompress.c index 5667802..1cfa74b 100644 --- a/src/common/pg_lzcompress.c +++ b/src/common/pg_lzcompress.c @@ -386,6 +386,8 @@ do { \ } \ } while (0) +#define HIST_START_LEN (sizeof(PGLZ_HistEntry*) * PGLZ_HISTORY_LISTS) +#define HIST_ENTRIES_LEN (sizeof(PGLZ_HistEntry) * PGLZ_HISTORY_SIZE) /* ---------- * pglz_find_match - @@ -932,4 +934,149 @@ int32 KPHCLzDecompress(const KPHC_PGLZ_Header* source, char* dest) #endif } return source->rawsize; +} + +bool KPHCLzCompress(const char* source, int32 slen, KPHC_PGLZ_Header* dest, const PGLZ_Strategy* strategy) +{ + unsigned char* bp = ((unsigned char*)dest) + sizeof(KPHC_PGLZ_Header); + unsigned char* bstart = bp; + int hist_next = 0; + bool hist_recycle = false; + const char* dp = source; + const char* dend = source + slen; + unsigned char ctrl_dummy = 0; + unsigned char* ctrlp = &ctrl_dummy; + unsigned char ctrlb = 0; + unsigned char ctrl = 0; + bool found_match = false; + int32 match_len; + int32 match_off; + int32 good_match; + int32 good_drop; + int32 result_size; + int32 result_max; + int32 need_rate; + + /* + * Our fallback strategy is the default. + */ + if (strategy == NULL) + strategy = PGLZ_strategy_default; + + /* + * If the strategy forbids compression (at all or if source chunk size out + * of range), fail. + */ + if (strategy->match_size_good <= 0 || slen < strategy->min_input_size || slen > strategy->max_input_size) + return false; + + /* + * Save the original source size in the header. + */ + dest->rawsize = slen; + + /* + * Limit the match parameters to the supported range. + */ + good_match = strategy->match_size_good; + if (good_match > PGLZ_MAX_MATCH) + good_match = PGLZ_MAX_MATCH; + else if (good_match < 17) + good_match = 17; + + good_drop = strategy->match_size_drop; + if (good_drop < 0) + good_drop = 0; + else if (good_drop > 100) + good_drop = 100; + + need_rate = strategy->min_comp_rate; + if (need_rate < 0) + need_rate = 0; + else if (need_rate > 99) + need_rate = 99; + + /* + * Compute the maximum result size allowed by the strategy, namely the + * input size minus the minimum wanted compression rate. This had better + * be <= slen, else we might overrun the provided output buffer. + */ + if (slen > (INT_MAX / 100)) { + /* Approximate to avoid overflow */ + result_max = (slen / 100) * (100 - need_rate); + } else + result_max = (slen * (100 - need_rate)) / 100; + + /* + * Initialize the history lists to empty. We do not need to zero the + * u_sess->utils_cxt.hist_entries[] array; its entries are initialized as they are used. + */ + memset_s(hist_start, HIST_START_LEN, 0, HIST_START_LEN); + + /* + * Compress the source directly into the output buffer. + */ + while (dp < dend) { + /* + * If we already exceeded the maximum result size, fail. + * + * We check once per loop; since the loop body could emit as many as 4 + * bytes (a control byte and 3-byte tag), PGLZ_MAX_OUTPUT() had better + * allow 4 slop bytes. + */ + if (bp - bstart >= result_max) + return false; + + /* + * If we've emitted more than first_success_by bytes without finding + * anything compressible at all, fail. This lets us fall out + * reasonably quickly when looking at incompressible input (such as + * pre-compressed data). + */ + if (!found_match && bp - bstart >= strategy->first_success_by) + return false; + + /* + * Try to find a match in the history + */ + if (pglz_find_match(hist_start, dp, dend, &match_len, &match_off, good_match, good_drop, 0xffffffff)) { // mask可能有错 + /* + * Create the tag and add history entries for all matched + * characters. + */ + pglz_out_tag(ctrlp, ctrlb, ctrl, bp, match_len, match_off); + while (match_len--) { + pglz_hist_add( + hist_start, hist_entries, hist_next, hist_recycle, dp, dend, 0xffffffff); // mask可能有错 + dp++; /* Do not do this ++ in the line above! */ + /* The macro would do it four times - Jan. */ + } + found_match = true; + } else { + /* + * No match found. Copy one literal byte. + */ + pglz_out_literal(ctrlp, ctrlb, ctrl, bp, *dp); + pglz_hist_add( + hist_start, hist_entries, hist_next, hist_recycle, dp, dend, 0xffffffff); // mask可能有错 + dp++; /* Do not do this ++ in the line above! */ + /* The macro would do it four times - Jan. */ + } + } + + /* + * Write out the last control byte and check that we haven't overrun the + * output size allowed by the strategy. + */ + *ctrlp = ctrlb; + result_size = bp - bstart; + if (result_size >= result_max) + return false; + + /* + * Success - need only fill in the actual length of the compressed datum. + */ + SET_VARSIZE_COMPRESSED(dest, result_size + sizeof(KPHC_PGLZ_Header)); + + return true; } \ No newline at end of file diff --git a/src/include/c.h b/src/include/c.h index ffcdb05..c7628cd 100644 --- a/src/include/c.h +++ b/src/include/c.h @@ -612,6 +612,8 @@ typedef double float8; typedef Oid regproc; typedef regproc RegProcedure; +typedef uint32 ShortTransactionId; + typedef uint32 TransactionId; typedef uint32 LocalTransactionId; diff --git a/src/include/common/pg_lzcompress.h b/src/include/common/pg_lzcompress.h index fb660b8..97e517e 100644 --- a/src/include/common/pg_lzcompress.h +++ b/src/include/common/pg_lzcompress.h @@ -68,6 +68,16 @@ typedef struct PGLZ_Header { int32 vl_len_; int32 rawsize; } KPHC_PGLZ_Header; + +/* ---------- + * Local definitions + * ---------- + */ +#define PGLZ_HISTORY_LISTS 8192 /* must be power of 2 */ +#define PGLZ_HISTORY_MASK (PGLZ_HISTORY_LISTS - 1) +#define PGLZ_HISTORY_SIZE 4096 +#define PGLZ_MAX_MATCH 273 + /* ---------- * The standard strategies * @@ -90,6 +100,8 @@ extern int32 pglz_compress(const char *source, int32 slen, char *dest, const PGLZ_Strategy *strategy); extern int32 pglz_decompress(const char *source, int32 slen, char *dest, int32 rawsize, bool check_complete); + +extern bool KPHCLzCompress(const char* source, int32 slen, KPHC_PGLZ_Header* dest, const PGLZ_Strategy* strategy); extern int32 KPHCLzDecompress(const KPHC_PGLZ_Header* source, char* dest); extern int32 pglz_maximum_compressed_size(int32 rawsize, int32 total_compressed_size); diff --git a/src/include/storage/bufpage.h b/src/include/storage/bufpage.h index e9f253f..2b4bad4 100644 --- a/src/include/storage/bufpage.h +++ b/src/include/storage/bufpage.h @@ -165,6 +165,32 @@ typedef struct PageHeaderData typedef PageHeaderData *PageHeader; +/* + * HeapPageHeaderData -- data that stored at the begin of each new version heap page. + * pd_xid_base - base value for transaction IDs on page + * pd_multi_base - base value for multixact IDs on page + * + */ +typedef struct { + /* XXX LSN is member of *any* block, not only page-organized ones */ + PageXLogRecPtr pd_lsn; /* LSN: next byte after last byte of xlog + * record for last change to this page */ + uint16 pd_checksum; /* checksum */ + uint16 pd_flags; /* flag bits, see below */ + LocationIndex pd_lower; /* offset to start of free space */ + LocationIndex pd_upper; /* offset to end of free space */ + LocationIndex pd_special; /* offset to start of special space */ + uint16 pd_pagesize_version; + ShortTransactionId pd_prune_xid; /* oldest prunable XID, or zero if none */ + TransactionId pd_xid_base; /* base value for transaction IDs on page */ + TransactionId pd_multi_base; /* base value for multixact IDs on page */ + ItemIdData pd_linp[FLEXIBLE_ARRAY_MEMBER]; /* beginning of line pointer array */ +} HeapPageHeaderData; + +typedef HeapPageHeaderData* HeapPageHeader; + +#define GetPageHeaderSize(page) (PageIs8BXidHeapVersion(page) ? SizeOfHeapPageHeaderData : SizeOfPageHeaderData) + /* * pd_flags contains the following flag bits. Undefined bits are initialized * to zero and may be used in the future. @@ -198,6 +224,7 @@ typedef PageHeaderData *PageHeader; */ #define PG_PAGE_LAYOUT_VERSION 4 #define PG_DATA_CHECKSUM_VERSION 1 +#define PG_HEAP_PAGE_LAYOUT_VERSION 6 /* ---------------------------------------------------------------- * page support macros @@ -214,6 +241,7 @@ typedef PageHeaderData *PageHeader; * line pointer(s) do not count as part of header */ #define SizeOfPageHeaderData (offsetof(PageHeaderData, pd_linp)) +#define SizeOfHeapPageHeaderData (offsetof(HeapPageHeaderData, pd_linp)) /* * PageIsEmpty @@ -275,6 +303,8 @@ typedef PageHeaderData *PageHeader; #define PageGetPageLayoutVersion(page) \ (((PageHeader) (page))->pd_pagesize_version & 0x00FF) +#define PageIs8BXidHeapVersion(page) (PageGetPageLayoutVersion(page) == PG_HEAP_PAGE_LAYOUT_VERSION) + /* * PageSetPageSizeAndVersion * Sets the page size and page layout version number of a page. diff --git a/src/include/storage/cfs_buffers.h b/src/include/storage/cfs_buffers.h index 31eb691..2f46383 100644 --- a/src/include/storage/cfs_buffers.h +++ b/src/include/storage/cfs_buffers.h @@ -12,6 +12,7 @@ #include "storage/shmem.h" #include "utils/hsearch.h" #include "storage/relfilenode.h" +#include "storage/smgr.h" typedef struct ExtentLocation { int fd; @@ -125,6 +126,8 @@ typedef enum pciBufferReadMode { PCI_BUF_NO_READ } KPHCPciBufferReadMode; +typedef KPHCExtentLocation (*KPHCCfsLocationConvert)(SMgrRelation sRel, ForkNumber forknum, BlockNumber logicBlockNumber, bool skipSync, int type); + extern KPHCPciPageBuffCtx *gPciBufCtx; extern void KPHCPciBufInitCtx(void); diff --git a/src/include/storage/page_compression.h b/src/include/storage/page_compression.h index b9cd905..62eb10c 100644 --- a/src/include/storage/page_compression.h +++ b/src/include/storage/page_compression.h @@ -3,6 +3,7 @@ #include "c.h" #include "storage/relfilenode.h" +#include "storage/bufpage.h" #include "utils/rel.h" #define KPHC_CMP_BYTE_CONVERT_LEN 1 @@ -25,6 +26,16 @@ #define COMPRESS_ALGORITHM_ZSTD 2 #define COMPRESS_ALGORITHM_PGZSTD 3 +#define KPHC_COMPRESS_ALGORITHM_PGLZ 1 +#define KPHC_COMPRESS_ALGORITHM_ZSTD 2 +#define KPHC_COMPRESS_UNSUPPORTED_ERROR (-2) +#define KPHC_IS_COMPRESSED_MAINFORK(reln, forkNum) ((reln)->smgr_rnode.node.opt != 0 && (forkNum) == MAIN_FORKNUM) + +#define DEFAULT_ZSTD_COMPRESSION_LEVEL (1) +#define MIN_ZSTD_COMPRESSION_LEVEL ZSTD_minCLevel() +#define MAX_ZSTD_COMPRESSION_LEVEL ZSTD_maxCLevel() + +typedef size_t KPHC_CFS_STORAGE_TYPE; typedef uint32 pc_chunk_number_t; typedef struct CmpBitStuct { @@ -43,6 +54,28 @@ typedef struct relFileCompressOption { unsigned compressChunkSize : KPHC_CMP_CHUNK_SIZE_LEN; }KPHCRelFileCompressOption; +typedef struct PageCompressData { + char page_header[SizeOfPageHeaderData]; + uint32 crc32; + uint32 size : 16; + uint32 byte_convert : 1; + uint32 diff_convert : 1; + uint32 algorithm : 4; + uint32 unused : 10; + char data[FLEXIBLE_ARRAY_MEMBER]; +} KPHCPageCompressData; + +typedef struct HeapPageCompressData { + char page_header[SizeOfHeapPageHeaderData]; + uint32 crc32; + uint32 size : 16; + uint32 byte_convert : 1; + uint32 diff_convert : 1; + uint32 algorithm : 4; + uint32 unused : 10; + char data[FLEXIBLE_ARRAY_MEMBER]; +} KPHCHeapPageCompressData; + #define SET_COMPRESS_OPTION(node, byteConvert, diffConvert, preChunks, symbol, level ,algorithm, chunkSize) \ do { \ (node).opt = 0; \ @@ -72,20 +105,32 @@ extern const uint32 INDEX_OF_QUARTER_BLCKSZ; extern const uint32 INDEX_OF_EIGHTH_BLCKSZ; extern const uint32 INDEX_OF_SIXTEENTHS_BLCKSZ; -extern const int KPHC_CFS_EXTENT_SIZE; -extern const int KPHC_CFS_LOGIC_BLOCKS_PER_EXTENT; -extern const int KPHC_CFS_EXTENT_COUNT_PER_FILE; -extern const int KPHC_CFS_MAX_BLOCK_PER_FILE; +// extern const int KPHC_CFS_EXTENT_SIZE; +// extern const int KPHC_CFS_LOGIC_BLOCKS_PER_EXTENT; +// extern const int KPHC_CFS_EXTENT_COUNT_PER_FILE; +// extern const int KPHC_CFS_MAX_BLOCK_PER_FILE; +#define KPHC_CFS_EXTENT_SIZE 129 +#define KPHC_CFS_LOGIC_BLOCKS_PER_EXTENT (KPHC_CFS_EXTENT_SIZE - 1) +#define KPHC_CFS_EXTENT_COUNT_PER_FILE (RELSEG_SIZE / KPHC_CFS_EXTENT_SIZE) +#define KPHC_CFS_MAX_BLOCK_PER_FILE (KPHC_CFS_EXTENT_COUNT_PER_FILE * KPHC_CFS_EXTENT_SIZE) + + extern const int KPHC_EXTENT_OPEN_FILE; extern const int KPHC_WRITE_BACK_OPEN_FILE; extern const int KPHC_EXTENT_CREATE_FILE; extern const KPHCCmpBitStuct gCmpBitStruct[]; +extern const uint32 CHUNK_SIZE_LIST[4]; + uint8 ConvertChunkSize(uint32 compressedChunkSize, bool *success); void SetupPageCompressForRelation(RelFileNode *node, PageCompressOpts *compress_options, const char *relationName); +int KPHCCompressPage(const char* src, char* dst, int dst_size, KPHCRelFileCompressOption option); +char *KPHCCfsCompressPage(const char *buffer, KPHCRelFileCompressOption *option, uint8 *nchunks); + +inline void KPHCAnalyzeCompressOptions(const RelFileNode node, KPHCRelFileCompressOption* opt); size_t KPHCCfsWritePage(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, char *buffer, bool sipSync, KPHC_CFS_STORAGE_TYPE type); #endif // PAGE_COMPRESSION_H \ No newline at end of file -- Gitee From f981756d79afd8848cff74de75573d5284fdbe02 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A6=99=E8=95=89=E5=82=A8=E8=93=84=E6=89=80?= <727854256@qq.com> Date: Fri, 17 Jan 2025 17:16:14 +0800 Subject: [PATCH 06/11] =?UTF-8?q?[dbg][normal]1.=E8=A7=A3=E5=86=B3WritePag?= =?UTF-8?q?e=E5=8F=8A=E5=8E=8B=E7=BC=A9=E5=87=BD=E6=95=B0=E6=89=80?= =?UTF-8?q?=E6=9C=89=E7=BC=96=E8=AF=91=E9=97=AE=E9=A2=98=EF=BC=8C=E4=BB=8D?= =?UTF-8?q?=E5=AD=98=E5=9C=A8=E9=93=BE=E6=8E=A5=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/storage/page/page_compression.c | 6 +++--- src/include/storage/page_compression.h | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/backend/storage/page/page_compression.c b/src/backend/storage/page/page_compression.c index 52fad98..5636c76 100644 --- a/src/backend/storage/page/page_compression.c +++ b/src/backend/storage/page/page_compression.c @@ -10,6 +10,8 @@ #include "utils/wait_event.h" #include "common/pg_lzcompress.h" +extern KPHCCfsLocationConvert cfsLocationConverts[]; + uint32 AddrChecksum32(const KPHCCfsExtentAddress *cfsExtentAddress, const int needChunks); const uint32 INDEX_OF_HALF_BLCKSZ = 0; @@ -297,7 +299,7 @@ char *KPHCCfsCompressPage(const char *buffer, KPHCRelFileCompressOption *option, return workBuffer; } -inline void KPHCAnalyzeCompressOptions(const RelFileNode node, KPHCRelFileCompressOption* opt) +void KPHCAnalyzeCompressOptions(const RelFileNode node, KPHCRelFileCompressOption* opt) { unsigned short compressOption = node.opt; opt->compressChunkSize = compressOption & gCmpBitStruct[KPHC_CMP_CHUNK_SIZE_INDEX].mask; @@ -316,8 +318,6 @@ inline void KPHCAnalyzeCompressOptions(const RelFileNode node, KPHCRelFileCompre compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_BYTE_CONVERT_INDEX].bitLen; } -extern KPHCCfsLocationConvert cfsLocationConverts[]; - size_t KPHCCfsWritePage(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, char *buffer, bool sipSync, KPHC_CFS_STORAGE_TYPE type) { KPHCExtentLocation location = cfsLocationConverts[type](reln, forknum, logicBlockNumber, sipSync, KPHC_EXTENT_OPEN_FILE); diff --git a/src/include/storage/page_compression.h b/src/include/storage/page_compression.h index 62eb10c..699316b 100644 --- a/src/include/storage/page_compression.h +++ b/src/include/storage/page_compression.h @@ -131,6 +131,7 @@ int KPHCCompressPage(const char* src, char* dst, int dst_size, KPHCRelFileCompre char *KPHCCfsCompressPage(const char *buffer, KPHCRelFileCompressOption *option, uint8 *nchunks); inline void KPHCAnalyzeCompressOptions(const RelFileNode node, KPHCRelFileCompressOption* opt); + size_t KPHCCfsWritePage(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, char *buffer, bool sipSync, KPHC_CFS_STORAGE_TYPE type); #endif // PAGE_COMPRESSION_H \ No newline at end of file -- Gitee From 81136ae4a113d892e1aa570cdd4e9da5cd37db9a Mon Sep 17 00:00:00 2001 From: chenlinfeng <723609220@qq.com> Date: Sat, 18 Jan 2025 16:21:30 +0800 Subject: [PATCH 07/11] add write --- src/backend/access/common/reloptions.c | 27 +++++- src/backend/storage/page/page_compression.c | 73 ++++++++++------ src/backend/storage/smgr/md.c | 25 ++---- src/common/pg_lzcompress.c | 97 ++++----------------- src/include/common/pg_lzcompress.h | 9 -- src/include/storage/page_compression.h | 20 ++--- 6 files changed, 103 insertions(+), 148 deletions(-) diff --git a/src/backend/access/common/reloptions.c b/src/backend/access/common/reloptions.c index 6527eab..5edaf10 100644 --- a/src/backend/access/common/reloptions.c +++ b/src/backend/access/common/reloptions.c @@ -382,7 +382,32 @@ static relopt_int intRelOpts[] = }, -1, 0, 1024 }, - + { + { + "compress_level", + "Level of page compression.", + RELOPT_KIND_HEAP | RELOPT_KIND_BTREE + }, + 0, -31, 31 + }, + { + { + "compresstype", + "compress type (none, pglz or zstd. pgzstd isn't available now).", + RELOPT_KIND_HEAP | RELOPT_KIND_BTREE + }, + 0, 0, 2 + }, + { + { + "compress_chunk_size", + "Size of chunk to store compressed page.", + RELOPT_KIND_HEAP | RELOPT_KIND_BTREE + }, + BLCKSZ / 2, + BLCKSZ / 16, + BLCKSZ / 2 + }, /* list terminator */ {{NULL}} }; diff --git a/src/backend/storage/page/page_compression.c b/src/backend/storage/page/page_compression.c index 5636c76..6213b20 100644 --- a/src/backend/storage/page/page_compression.c +++ b/src/backend/storage/page/page_compression.c @@ -1,15 +1,13 @@ -// #define USE_ZSTD -// #include - #include "postgres.h" #include "storage/fd.h" #include "storage/bufpage.h" -#include "storage/cfs_buffers.h" #include "storage/page_compression.h" #include "utils/rel.h" #include "utils/wait_event.h" #include "common/pg_lzcompress.h" - +#ifdef USE_ZSTD +#include +#endif extern KPHCCfsLocationConvert cfsLocationConverts[]; uint32 AddrChecksum32(const KPHCCfsExtentAddress *cfsExtentAddress, const int needChunks); @@ -69,7 +67,11 @@ inline int CompressPageBufferBound(const char* page, uint8 algorithm) case COMPRESS_ALGORITHM_PGLZ: return BLCKSZ * 2; case COMPRESS_ALGORITHM_ZSTD: +#ifdef USE_ZSTD return ZSTD_compressBound(BLCKSZ - CompressReservedLen(page)); +#else + return -1; +#endif case COMPRESS_ALGORITHM_PGZSTD: return BLCKSZ + 4; default: @@ -122,7 +124,7 @@ inline size_t SizeOfExtentAddress(uint16 chunkSize) return offsetof(KPHCCfsExtentAddress, chunknos) + sizeof(uint16) * BLCKSZ / chunkSize; } -inline off_t OffsetOfPageCompressChunk(uint16 chunkSize, int chunkNo) +off_t OffsetOfPageCompressChunk(uint16 chunkSize, int chunkNo) { return chunkSize * (chunkNo - 1); } @@ -133,7 +135,7 @@ inline size_t SizeOfExtentAddressByChunks(uint8 nChunks) } // 获取extent地址 -inline KPHCCfsExtentAddress *GetExtentAddress(KPHCCfsExtentHeader *header, uint16 blockOffset) +KPHCCfsExtentAddress *GetExtentAddress(KPHCCfsExtentHeader *header, uint16 blockOffset) { uint16 chunkSize = header->chunk_size; size_t headerOffset = offsetof(KPHCCfsExtentHeader, cfsExtentAddress); @@ -201,18 +203,20 @@ static inline bool ExtendChunksOfBlockCore(KPHCCfsExtentHeader *cfsExtentHeader, static inline bool ExtendChunksOfBlock(KPHCCfsExtentHeader *cfsExtentHeader, KPHCExtentLocation *location, uint8 needChunks, uint8 actualUse) { - KPHCCfsExtentAddress *cfsExtentAddress = GetExtentAddress(cfsExtentHeader, (uint16)location->extentOffset); + KPHCCfsExtentAddress *cfsExtentAddress = KPHCGetExtentAddress(cfsExtentHeader, (uint16)location->extentOffset); return ExtendChunksOfBlockCore(cfsExtentHeader, cfsExtentAddress, needChunks, actualUse); } -int KPHCCompressPage(const char* src, char* dst, int dst_size, KPHCRelFileCompressOption option) +int KPHCCompressPage(const char* src, char* dst, int dst_size, const KPHCRelFileCompressOption option) { int compressed_size; +#ifdef USE_ZSTD int8 level = option.compressLevelSymbol ? option.compressLevel : -option.compressLevel; +#endif size_t sizeOfHeaderData = SizeOfPageHeaderData; bool real_ByteConvert = false; bool success; - + KPHCPageCompressData* pcdptr; char* data = ((KPHCPageCompressData*)dst)->data; PageHeaderData *page = (PageHeaderData *)src; bool heapPageData = page->pd_special == BLCKSZ; @@ -224,6 +228,7 @@ int KPHCCompressPage(const char* src, char* dst, int dst_size, KPHCRelFileCompre break; } case COMPRESS_ALGORITHM_ZSTD: { +#ifdef USE_ZSTD if (level == 0 || level < MIN_ZSTD_COMPRESSION_LEVEL || level > MAX_ZSTD_COMPRESSION_LEVEL) { level = DEFAULT_ZSTD_COMPRESSION_LEVEL; } @@ -242,6 +247,9 @@ int KPHCCompressPage(const char* src, char* dst, int dst_size, KPHCRelFileCompre return -1; } break; +#else + return -1; +#endif } default: return KPHC_COMPRESS_UNSUPPORTED_ERROR; @@ -251,10 +259,9 @@ int KPHCCompressPage(const char* src, char* dst, int dst_size, KPHCRelFileCompre return -1; } - KPHCPageCompressData* pcdptr = ((KPHCPageCompressData*)dst); - memcpy_s(pcdptr->page_header, sizeOfHeaderData, src, sizeOfHeaderData); + pcdptr = ((KPHCPageCompressData*)dst); + memcpy(pcdptr->page_header, src, sizeOfHeaderData); pcdptr->size = compressed_size; - pcdptr->crc32 = DataBlockChecksum(data, compressed_size, true); pcdptr->byte_convert = real_ByteConvert; pcdptr->diff_convert = option.diffConvert; pcdptr->algorithm = option.compressAlgorithm; @@ -271,7 +278,7 @@ char *KPHCCfsCompressPage(const char *buffer, KPHCRelFileCompressOption *option, uint8 preallocChunk = option->compressPreallocChunks; char *workBuffer; int compressBufferSize; - + uint32 bufferSize; if (workBufferSize < 0) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("mdwrite algorithm:%d, chunkSize:%d, level:%d, preallocChunk:%d", @@ -284,7 +291,7 @@ char *KPHCCfsCompressPage(const char *buffer, KPHCRelFileCompressOption *option, compressBufferSize = BLCKSZ; } *nchunks = (uint8)((unsigned int)(compressBufferSize - 1) / chunkSize + 1); - uint32 bufferSize = chunkSize * (*nchunks); + bufferSize = chunkSize * (*nchunks); if (bufferSize >= BLCKSZ) { pfree(workBuffer); workBuffer = (char*)buffer; @@ -292,7 +299,7 @@ char *KPHCCfsCompressPage(const char *buffer, KPHCRelFileCompressOption *option, } else { if ((uint32)compressBufferSize < bufferSize) { uint32 leftSize = bufferSize - (uint32)compressBufferSize; - memset_s(workBuffer + compressBufferSize, leftSize, 0, leftSize); + memset(workBuffer + compressBufferSize, 0, leftSize); } } @@ -320,42 +327,50 @@ void KPHCAnalyzeCompressOptions(const RelFileNode node, KPHCRelFileCompressOptio size_t KPHCCfsWritePage(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, char *buffer, bool sipSync, KPHC_CFS_STORAGE_TYPE type) { + uint16 chunkSize; + uint8 start; + int writeAmount; + uint8 nchunks; + char *compressedBuffer; + uint8 needChunksl; + bool changed; + off_t extentStartOffset; + int nbytes; KPHCExtentLocation location = cfsLocationConverts[type](reln, forknum, logicBlockNumber, sipSync, KPHC_EXTENT_OPEN_FILE); KPHCPciPageCtrl *ctrl = KPHCPciBufReadPage(location, LW_SHARED, PCI_BUF_NORMAL_READ); KPHCCfsExtentHeader *cfsExtentHeader = ctrl->pciPage; - KPHCCfsExtentAddress *cfsExtentAddress = GetExtentAddress(cfsExtentAddress, (uint16)location.extentOffset); + KPHCCfsExtentAddress *cfsExtentAddress = KPHCGetExtentAddress(cfsExtentHeader, (uint16)location.extentOffset); // 获取压缩参数 KPHCRelFileCompressOption option; KPHCAnalyzeCompressOptions(reln->smgr_rnode.node, &option); - uint16 chunkSize = cfsExtentHeader->chunk_size; + chunkSize = cfsExtentHeader->chunk_size; // 执行页压缩 - uint8 nchunks; - char *compressedBuffer = KPHCCfsCompressPage(buffer, &option, &nchunks); + compressedBuffer = KPHCCfsCompressPage(buffer, &option, &nchunks); // 设置地址 - uint8 needChunks = option.compressPreallocChunks > nchunks ? (uint8)option.compressPreallocChunks : (uint8)nchunks; - bool changed = ExtendChunksOfBlock(cfsExtentHeader, &location, needChunks, nchunks); + needChunks = option.compressPreallocChunks > nchunks ? (uint8)option.compressPreallocChunks : (uint8)nchunks; + changed = ExtendChunksOfBlock(cfsExtentHeader, &location, needChunks, nchunks); - off_t extentStartOffset = location.extentStart * BLCKSZ; + extentStartOffset = location.extentStart * BLCKSZ; // 写入每个chunk - for (uint8 i = 0; i < nchunks; ++i) { - uint8 bufferPos = compressedBuffer + (long)chunkSize * i; + for (size_t i = 0; i < nchunks; ++i) { + size_t bufferPos = (size_t)compressedBuffer + (long)chunkSize * i; off_t seekPos = OffsetOfPageCompressChunk(chunkSize, cfsExtentAddress->chunknos[i]); if (cfsExtentAddress->chunknos[i] > ((BLCKSZ / chunkSize) * KPHC_CFS_LOGIC_BLOCKS_PER_EXTENT)) { *((uint32 *)NULL) = 1; // ??? } - uint8 start = i; + start = i; while (i < nchunks - 1 && cfsExtentAddress->chunknos[i + 1] == cfsExtentAddress->chunknos[i] + 1) { i++; } - int writeAmount = chunkSize * ((i - start) + 1); + writeAmount = chunkSize * ((i - start) + 1); if ((seekPos + extentStartOffset) > (((BlockNumber)RELSEG_SIZE) * BLCKSZ)) { *((uint32 *)NULL) = 1; // ??? } - int nbytes = FilePWrite(location.fd, bufferPos, writeAmount, seekPos + extentStartOffset, (uint32)WAIT_EVENT_DATA_FILE_WRITE); + nbytes = FileWrite(location.fd, (char *)bufferPos, writeAmount, seekPos + extentStartOffset, (uint32)WAIT_EVENT_DATA_FILE_WRITE); if (nbytes != writeAmount) { if (compressedBuffer != NULL && compressedBuffer != buffer) { pfree(compressedBuffer); @@ -367,6 +382,6 @@ size_t KPHCCfsWritePage(SMgrRelation reln, ForkNumber forknum, BlockNumber logic if (compressedBuffer != NULL && compressedBuffer != buffer) { pfree(compressedBuffer); } - KPHCPciBufFreePage(ctrl, location, false); + KPHCPciBufFreePage(ctrl, location, changed); return BLCKSZ; } diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c index 2a18c99..3aff4ed 100644 --- a/src/backend/storage/smgr/md.c +++ b/src/backend/storage/smgr/md.c @@ -40,7 +40,6 @@ #include "storage/sync.h" #include "utils/hsearch.h" #include "utils/memutils.h" -#include "storage/cfs_buffers.h" #include "storage/page_compression.h" #include "common/pg_lzcompress.h" @@ -733,15 +732,6 @@ inline size_t SizeOfKPHCExtentAddress(uint16 chunkSize) { } return offsetof(KPHCCfsExtentAddress, chunknos) + sizeof(uint16) * BLCKSZ / chunkSize; } -inline off_t OffsetOfPageCompressChunk(uint16 chunkSize, int chunkNo) { - return chunkSize * (chunkNo - 1); -} -inline KPHCCfsExtentAddress *KPHCGetExtentAddress(KPHCCfsExtentHeader *header, uint16 blockOffset) { - uint16 chunkSize = header->chunk_size; - size_t headerOffset = offsetof(KPHCCfsExtentHeader, cfsExtentAddress); - size_t sizeOfKPHCExtentAddress = SizeOfKPHCExtentAddress(chunkSize); - return (KPHCCfsExtentAddress *) (((char *) header) + headerOffset + blockOffset * sizeOfKPHCExtentAddress); -} int KPHCDecompressPage(const char* src, char* dst) { @@ -919,13 +909,14 @@ mdwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, v = _mdfd_getseg(reln, forknum, blocknum, skipFsync, EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY); - - seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE)); - - Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE); - - nbytes = FileWrite(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_WRITE); - + bool compressed = KPHC_IS_COMPRESSED_MAINFORK(reln, forknum); + if (compressed) { + nbytes = (int)KPHCCfsWritePage(reln, forknum, blocknum, buffer, skipFsync, COMMON_STORAGE); + } else { + seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE)); + Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE); + nbytes = FileWrite(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_WRITE); + } TRACE_POSTGRESQL_SMGR_MD_WRITE_DONE(forknum, blocknum, reln->smgr_rnode.node.spcNode, reln->smgr_rnode.node.dbNode, diff --git a/src/common/pg_lzcompress.c b/src/common/pg_lzcompress.c index 1cfa74b..363a1f9 100644 --- a/src/common/pg_lzcompress.c +++ b/src/common/pg_lzcompress.c @@ -386,9 +386,6 @@ do { \ } \ } while (0) -#define HIST_START_LEN (sizeof(PGLZ_HistEntry*) * PGLZ_HISTORY_LISTS) -#define HIST_ENTRIES_LEN (sizeof(PGLZ_HistEntry) * PGLZ_HISTORY_SIZE) - /* ---------- * pglz_find_match - * @@ -957,27 +954,11 @@ bool KPHCLzCompress(const char* source, int32 slen, KPHC_PGLZ_Header* dest, cons int32 result_max; int32 need_rate; - /* - * Our fallback strategy is the default. - */ if (strategy == NULL) strategy = PGLZ_strategy_default; - - /* - * If the strategy forbids compression (at all or if source chunk size out - * of range), fail. - */ if (strategy->match_size_good <= 0 || slen < strategy->min_input_size || slen > strategy->max_input_size) return false; - - /* - * Save the original source size in the header. - */ dest->rawsize = slen; - - /* - * Limit the match parameters to the supported range. - */ good_match = strategy->match_size_good; if (good_match > PGLZ_MAX_MATCH) good_match = PGLZ_MAX_MATCH; @@ -995,88 +976,42 @@ bool KPHCLzCompress(const char* source, int32 slen, KPHC_PGLZ_Header* dest, cons need_rate = 0; else if (need_rate > 99) need_rate = 99; - - /* - * Compute the maximum result size allowed by the strategy, namely the - * input size minus the minimum wanted compression rate. This had better - * be <= slen, else we might overrun the provided output buffer. - */ if (slen > (INT_MAX / 100)) { - /* Approximate to avoid overflow */ result_max = (slen / 100) * (100 - need_rate); - } else + } else { result_max = (slen * (100 - need_rate)) / 100; - - /* - * Initialize the history lists to empty. We do not need to zero the - * u_sess->utils_cxt.hist_entries[] array; its entries are initialized as they are used. - */ - memset_s(hist_start, HIST_START_LEN, 0, HIST_START_LEN); - - /* - * Compress the source directly into the output buffer. - */ + } + memset(hist_start, 0, PGLZ_MAX_HISTORY_LISTS * sizeof(int16)); while (dp < dend) { - /* - * If we already exceeded the maximum result size, fail. - * - * We check once per loop; since the loop body could emit as many as 4 - * bytes (a control byte and 3-byte tag), PGLZ_MAX_OUTPUT() had better - * allow 4 slop bytes. - */ if (bp - bstart >= result_max) return false; - - /* - * If we've emitted more than first_success_by bytes without finding - * anything compressible at all, fail. This lets us fall out - * reasonably quickly when looking at incompressible input (such as - * pre-compressed data). - */ if (!found_match && bp - bstart >= strategy->first_success_by) return false; - - /* - * Try to find a match in the history - */ - if (pglz_find_match(hist_start, dp, dend, &match_len, &match_off, good_match, good_drop, 0xffffffff)) { // mask可能有错 - /* - * Create the tag and add history entries for all matched - * characters. - */ + if (pglz_find_match(hist_start, dp, dend, &match_len, &match_off, good_match, good_drop, 0xffffffff)) { pglz_out_tag(ctrlp, ctrlb, ctrl, bp, match_len, match_off); while (match_len--) { pglz_hist_add( - hist_start, hist_entries, hist_next, hist_recycle, dp, dend, 0xffffffff); // mask可能有错 - dp++; /* Do not do this ++ in the line above! */ - /* The macro would do it four times - Jan. */ + hist_start, hist_entries, hist_next, hist_recycle, dp, dend, 0xffffffff); + dp++; } found_match = true; } else { - /* - * No match found. Copy one literal byte. - */ pglz_out_literal(ctrlp, ctrlb, ctrl, bp, *dp); pglz_hist_add( - hist_start, hist_entries, hist_next, hist_recycle, dp, dend, 0xffffffff); // mask可能有错 - dp++; /* Do not do this ++ in the line above! */ - /* The macro would do it four times - Jan. */ + hist_start, hist_entries, hist_next, hist_recycle, dp, dend, 0xffffffff); + dp++; } } - - /* - * Write out the last control byte and check that we haven't overrun the - * output size allowed by the strategy. - */ *ctrlp = ctrlb; result_size = bp - bstart; - if (result_size >= result_max) + if (result_size >= result_max) { return false; - - /* - * Success - need only fill in the actual length of the compressed datum. - */ - SET_VARSIZE_COMPRESSED(dest, result_size + sizeof(KPHC_PGLZ_Header)); - + } + result_size = result_size + sizeof(KPHC_PGLZ_Header); +#ifndef FRONTEND + SET_VARSIZE_COMPRESSED(dest, result_size); +#else + return false; +#endif return true; } \ No newline at end of file diff --git a/src/include/common/pg_lzcompress.h b/src/include/common/pg_lzcompress.h index 97e517e..2172204 100644 --- a/src/include/common/pg_lzcompress.h +++ b/src/include/common/pg_lzcompress.h @@ -69,15 +69,6 @@ typedef struct PGLZ_Header { int32 rawsize; } KPHC_PGLZ_Header; -/* ---------- - * Local definitions - * ---------- - */ -#define PGLZ_HISTORY_LISTS 8192 /* must be power of 2 */ -#define PGLZ_HISTORY_MASK (PGLZ_HISTORY_LISTS - 1) -#define PGLZ_HISTORY_SIZE 4096 -#define PGLZ_MAX_MATCH 273 - /* ---------- * The standard strategies * diff --git a/src/include/storage/page_compression.h b/src/include/storage/page_compression.h index 699316b..856b752 100644 --- a/src/include/storage/page_compression.h +++ b/src/include/storage/page_compression.h @@ -5,7 +5,7 @@ #include "storage/relfilenode.h" #include "storage/bufpage.h" #include "utils/rel.h" - +#include "storage/cfs_buffers.h" #define KPHC_CMP_BYTE_CONVERT_LEN 1 #define KPHC_CMP_DIFF_CONVERT_LEN 1 #define KPHC_CMP_PRE_CHUNK_LEN 3 @@ -32,9 +32,10 @@ #define KPHC_IS_COMPRESSED_MAINFORK(reln, forkNum) ((reln)->smgr_rnode.node.opt != 0 && (forkNum) == MAIN_FORKNUM) #define DEFAULT_ZSTD_COMPRESSION_LEVEL (1) +#ifdef USE_ZSTD #define MIN_ZSTD_COMPRESSION_LEVEL ZSTD_minCLevel() #define MAX_ZSTD_COMPRESSION_LEVEL ZSTD_maxCLevel() - +#endif typedef size_t KPHC_CFS_STORAGE_TYPE; typedef uint32 pc_chunk_number_t; @@ -105,15 +106,9 @@ extern const uint32 INDEX_OF_QUARTER_BLCKSZ; extern const uint32 INDEX_OF_EIGHTH_BLCKSZ; extern const uint32 INDEX_OF_SIXTEENTHS_BLCKSZ; -// extern const int KPHC_CFS_EXTENT_SIZE; -// extern const int KPHC_CFS_LOGIC_BLOCKS_PER_EXTENT; -// extern const int KPHC_CFS_EXTENT_COUNT_PER_FILE; -// extern const int KPHC_CFS_MAX_BLOCK_PER_FILE; + #define KPHC_CFS_EXTENT_SIZE 129 #define KPHC_CFS_LOGIC_BLOCKS_PER_EXTENT (KPHC_CFS_EXTENT_SIZE - 1) -#define KPHC_CFS_EXTENT_COUNT_PER_FILE (RELSEG_SIZE / KPHC_CFS_EXTENT_SIZE) -#define KPHC_CFS_MAX_BLOCK_PER_FILE (KPHC_CFS_EXTENT_COUNT_PER_FILE * KPHC_CFS_EXTENT_SIZE) - extern const int KPHC_EXTENT_OPEN_FILE; extern const int KPHC_WRITE_BACK_OPEN_FILE; @@ -127,11 +122,14 @@ uint8 ConvertChunkSize(uint32 compressedChunkSize, bool *success); void SetupPageCompressForRelation(RelFileNode *node, PageCompressOpts *compress_options, const char *relationName); -int KPHCCompressPage(const char* src, char* dst, int dst_size, KPHCRelFileCompressOption option); +int KPHCCompressPage(const char* src, char* dst, int dst_size, const KPHCRelFileCompressOption option); char *KPHCCfsCompressPage(const char *buffer, KPHCRelFileCompressOption *option, uint8 *nchunks); -inline void KPHCAnalyzeCompressOptions(const RelFileNode node, KPHCRelFileCompressOption* opt); +void KPHCAnalyzeCompressOptions(const RelFileNode node, KPHCRelFileCompressOption* opt); size_t KPHCCfsWritePage(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, char *buffer, bool sipSync, KPHC_CFS_STORAGE_TYPE type); +off_t OffsetOfPageCompressChunk(uint16 chunkSize, int chunkNo); + +KPHCCfsExtentAddress *GetExtentAddress(KPHCCfsExtentHeader *header, uint16 blockOffset); #endif // PAGE_COMPRESSION_H \ No newline at end of file -- Gitee From 48cb41b4378884e23b1a0eff5f2881500bf7c6c9 Mon Sep 17 00:00:00 2001 From: chenlinfeng <723609220@qq.com> Date: Sat, 18 Jan 2025 16:32:31 +0800 Subject: [PATCH 08/11] add write --- src/backend/storage/page/page_compression.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/backend/storage/page/page_compression.c b/src/backend/storage/page/page_compression.c index 6213b20..e21eb0a 100644 --- a/src/backend/storage/page/page_compression.c +++ b/src/backend/storage/page/page_compression.c @@ -135,7 +135,7 @@ inline size_t SizeOfExtentAddressByChunks(uint8 nChunks) } // 获取extent地址 -KPHCCfsExtentAddress *GetExtentAddress(KPHCCfsExtentHeader *header, uint16 blockOffset) +KPHCCfsExtentAddress *KPHCGetExtentAddress(KPHCCfsExtentHeader *header, uint16 blockOffset) { uint16 chunkSize = header->chunk_size; size_t headerOffset = offsetof(KPHCCfsExtentHeader, cfsExtentAddress); @@ -332,7 +332,7 @@ size_t KPHCCfsWritePage(SMgrRelation reln, ForkNumber forknum, BlockNumber logic int writeAmount; uint8 nchunks; char *compressedBuffer; - uint8 needChunksl; + uint8 needChunks; bool changed; off_t extentStartOffset; int nbytes; -- Gitee From fce906b4a9562bfaa866d67e78e47e0ffa991691 Mon Sep 17 00:00:00 2001 From: chenlinfeng <723609220@qq.com> Date: Sat, 18 Jan 2025 16:35:39 +0800 Subject: [PATCH 09/11] add write --- src/include/storage/page_compression.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/include/storage/page_compression.h b/src/include/storage/page_compression.h index 856b752..a435f79 100644 --- a/src/include/storage/page_compression.h +++ b/src/include/storage/page_compression.h @@ -131,5 +131,5 @@ size_t KPHCCfsWritePage(SMgrRelation reln, ForkNumber forknum, BlockNumber logic off_t OffsetOfPageCompressChunk(uint16 chunkSize, int chunkNo); -KPHCCfsExtentAddress *GetExtentAddress(KPHCCfsExtentHeader *header, uint16 blockOffset); +KPHCCfsExtentAddress *KPHCGetExtentAddress(KPHCCfsExtentHeader *header, uint16 blockOffset); #endif // PAGE_COMPRESSION_H \ No newline at end of file -- Gitee From ec4bf0393381d811e2a2ab80028b0e58064cfb5c Mon Sep 17 00:00:00 2001 From: chenlinfeng <723609220@qq.com> Date: Sat, 18 Jan 2025 16:57:57 +0800 Subject: [PATCH 10/11] add write --- src/backend/storage/smgr/md.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c index 3aff4ed..c8d7371 100644 --- a/src/backend/storage/smgr/md.c +++ b/src/backend/storage/smgr/md.c @@ -895,7 +895,7 @@ mdwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, off_t seekpos; int nbytes; MdfdVec *v; - + bool compressed; /* This assert is too expensive to have on normally ... */ #ifdef CHECK_WRITE_VS_EXTEND Assert(blocknum < mdnblocks(reln, forknum)); @@ -909,7 +909,7 @@ mdwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, v = _mdfd_getseg(reln, forknum, blocknum, skipFsync, EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY); - bool compressed = KPHC_IS_COMPRESSED_MAINFORK(reln, forknum); + compressed = KPHC_IS_COMPRESSED_MAINFORK(reln, forknum); if (compressed) { nbytes = (int)KPHCCfsWritePage(reln, forknum, blocknum, buffer, skipFsync, COMMON_STORAGE); } else { -- Gitee From 7c144405fc8335b227f31075b84344cf24c2bd4a Mon Sep 17 00:00:00 2001 From: chenlinfeng <723609220@qq.com> Date: Tue, 21 Jan 2025 09:36:49 +0800 Subject: [PATCH 11/11] =?UTF-8?q?=E4=BC=98=E5=8C=96=E8=AE=BE=E7=BD=AE?= =?UTF-8?q?=E5=8E=8B=E7=BC=A9=E4=BF=A1=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/include/storage/page_compression.h | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/include/storage/page_compression.h b/src/include/storage/page_compression.h index a435f79..25598e9 100644 --- a/src/include/storage/page_compression.h +++ b/src/include/storage/page_compression.h @@ -81,19 +81,19 @@ typedef struct HeapPageCompressData { do { \ (node).opt = 0; \ (node).opt = (node).opt << gCmpBitStruct[KPHC_CMP_BYTE_CONVERT_INDEX].bitLen; \ - (node).opt = (byteConvert)&gCmpBitStruct[KPHC_CMP_BYTE_CONVERT_INDEX].mask; \ + (node).opt += (byteConvert)&gCmpBitStruct[KPHC_CMP_BYTE_CONVERT_INDEX].mask; \ (node).opt = (node).opt << gCmpBitStruct[KPHC_CMP_DIFF_CONVERT_INDEX].bitLen; \ - (node).opt = (diffConvert)&gCmpBitStruct[KPHC_CMP_DIFF_CONVERT_INDEX].mask; \ + (node).opt += (diffConvert)&gCmpBitStruct[KPHC_CMP_DIFF_CONVERT_INDEX].mask; \ (node).opt = (node).opt << gCmpBitStruct[KPHC_CMP_PRE_CHUNK_INDEX].bitLen; \ - (node).opt = (preChunks)&gCmpBitStruct[KPHC_CMP_PRE_CHUNK_INDEX].mask; \ + (node).opt += (preChunks)&gCmpBitStruct[KPHC_CMP_PRE_CHUNK_INDEX].mask; \ (node).opt = (node).opt << gCmpBitStruct[KPHC_CMP_COMPERSS_LEVEL_SYMBOL_INDEX].bitLen; \ - (node).opt = (symbol)&gCmpBitStruct[KPHC_CMP_COMPERSS_LEVEL_SYMBOL_INDEX].mask; \ + (node).opt += (symbol)&gCmpBitStruct[KPHC_CMP_COMPERSS_LEVEL_SYMBOL_INDEX].mask; \ (node).opt = (node).opt << gCmpBitStruct[KPHC_CMP_LEVEL_INDEX].bitLen; \ - (node).opt = (level)&gCmpBitStruct[KPHC_CMP_LEVEL_INDEX].mask; \ + (node).opt += (level)&gCmpBitStruct[KPHC_CMP_LEVEL_INDEX].mask; \ (node).opt = (node).opt << gCmpBitStruct[KPHC_CMP_ALGORITHM_INDEX].bitLen; \ - (node).opt = (algorithm)&gCmpBitStruct[KPHC_CMP_ALGORITHM_INDEX].mask; \ + (node).opt += (algorithm)&gCmpBitStruct[KPHC_CMP_ALGORITHM_INDEX].mask; \ (node).opt = (node).opt << gCmpBitStruct[KPHC_CMP_CHUNK_SIZE_INDEX].bitLen; \ - (node).opt = (chunkSize)&gCmpBitStruct[KPHC_CMP_CHUNK_SIZE_INDEX].mask; \ + (node).opt += (chunkSize)&gCmpBitStruct[KPHC_CMP_CHUNK_SIZE_INDEX].mask; \ } while (0) #define SUPPORT_COMPRESSED(relKind, relam) \ -- Gitee