diff --git a/src/backend/storage/page/page_compression.c b/src/backend/storage/page/page_compression.c index 45e39ef694e7de42551229a075545c35e5e0e753..bc026dffab12220659ebad84d2952dd8a28bbdd8 100644 --- a/src/backend/storage/page/page_compression.c +++ b/src/backend/storage/page/page_compression.c @@ -1,13 +1,27 @@ -#include "postgres.h" +// #define USE_ZSTD +#include +#include "postgres.h" #include "storage/page_compression.h" #include "utils/rel.h" +#include "utils/wait_event.h" +#include "storage/fd.h" +#include "storage/bufpage.h" + const uint32 INDEX_OF_HALF_BLCKSZ = 0; const uint32 INDEX_OF_QUARTER_BLCKSZ = 1; const uint32 INDEX_OF_EIGHTH_BLCKSZ = 2; const uint32 INDEX_OF_SIXTEENTHS_BLCKSZ = 3; +const int KPHC_CFS_EXTENT_SIZE = 129; +const int KPHC_CFS_LOGIC_BLOCKS_PER_EXTENT = KPHC_CFS_EXTENT_SIZE - 1; +const int KPHC_CFS_EXTENT_COUNT_PER_FILE = RELSEG_SIZE / KPHC_CFS_EXTENT_SIZE; +const int KPHC_CFS_MAX_BLOCK_PER_FILE = KPHC_CFS_EXTENT_COUNT_PER_FILE * KPHC_CFS_EXTENT_SIZE; +const int KPHC_EXTENT_OPEN_FILE = 0; +const int KPHC_WRITE_BACK_OPEN_FILE = 1; +const int KPHC_EXTENT_CREATE_FILE = 2; + const KPHCCmpBitStuct gCmpBitStruct[] = {{KPHC_CMP_BYTE_CONVERT_LEN, 0x01, 15}, {KPHC_CMP_DIFF_CONVERT_LEN, 0x01, 14}, {KPHC_CMP_PRE_CHUNK_LEN, 0x07, 11}, @@ -16,6 +30,8 @@ const KPHCCmpBitStuct gCmpBitStruct[] = {{KPHC_CMP_BYTE_CONVERT_LEN, 0x01, 15}, {KPHC_CMP_ALGORITHM_LEN, 0x07, 2}, {KPHC_CMP_CHUNK_SIZE_LEN, 0x03, 0}}; +const uint32 CHUNK_SIZE_LIST[4] = {BLCKSZ / 2, BLCKSZ / 4, BLCKSZ / 8, BLCKSZ / 16}; + uint8 ConvertChunkSize(uint32 compressedChunkSize, bool *success) { uint8 chunkSize = INDEX_OF_HALF_BLCKSZ; @@ -40,6 +56,26 @@ uint8 ConvertChunkSize(uint32 compressedChunkSize, bool *success) return chunkSize; } +inline size_t CompressReservedLen(const char* page) +{ + size_t length = offsetof(HeapPageCompressData, page_header) - offsetof(HeapPageCompressData, data); + return GetPageHeaderSize(page) + length; // 宏不完整 +} + +int CompressPageBufferBound(const char* page, uint8 algorithm) +{ + switch (algorithm) { + case COMPRESS_ALGORITHM_PGLZ: + return BLCKSZ * 2; + case COMPRESS_ALGORITHM_ZSTD: + return ZSTD_compressBound(BLCKSZ - CompressReservedLen(page)); + case COMPRESS_ALGORITHM_PGZSTD: + return BLCKSZ + 4; + default: + return -1; + } +} + void SetupPageCompressForRelation(RelFileNode *node, PageCompressOpts *compressOptions, const char *relationName) { uint32 algorithm = (uint32)compressOptions->compressType; @@ -75,4 +111,177 @@ void SetupPageCompressForRelation(RelFileNode *node, PageCompressOpts *compressO (int)compressOptions->compressDiffConvert, preallocChunks, (int)symbol, compressLevel, algorithm, chunkSize); } -} \ No newline at end of file +} + +inline size_t SizeOfExtentAddress(uint16 chunkSize) +{ + if (chunkSize == 0) { + return -1; + } + return offsetof(KPHCCfsExtentAddress, chunknos) + sizeof(uint16) * BLCKSZ / chunkSize; +} + +inline off_t OffsetOfPageCompressChunk(uint16 chunkSize, int chunkNo) +{ + return chunkSize * (chunkNo - 1); +} + +inline size_t SizeOfExtentAddressByChunks(uint8 nChunks) +{ + return offsetof(KPHCCfsExtentAddress, chunknos) + sizeof(uint16) * nChunks; +} + +// 获取extent地址 +inline KPHCCfsExtentAddress *GetExtentAddress(KPHCCfsExtentHeader *header, uint16 blockOffset) +{ + uint16 chunkSize = header->chunk_size; + size_t headerOffset = offsetof(KPHCCfsExtentHeader, cfsExtentAddress); + size_t sizeOfExtentAddress = SizeOfExtentAddress(chunkSize); + return (KPHCCfsExtentAddress *)(((char *)header) + headerOffset + blockOffset * sizeOfExtentAddress); +} + +uint32 AddrChecksum32(const KPHCCfsExtentAddress *cfsExtentAddress, const int needChunks) +{ + const size_t uintLen = sizeof(uint32); + uint32 checkSum = 0; + char *addr = ((char *) cfsExtentAddress) + sizeof(uint32); + size_t len = SizeOfExtentAddressByChunks((uint8)needChunks) - sizeof(uint32); + do { + if (len >= uintLen) { + checkSum += *((uint32*)(void*)addr); + addr += uintLen; + len -= uintLen; + } else { + char finalNum[uintLen]; + memset(finalNum, 0, sizeof(finalNum)); + size_t i = 0; + for(; i < len; ++i) { + finalNum[i] = addr[i]; + } + checkSum += *((uint32 *)finalNum); + len -= i; + } + } while (len); + return checkSum; +} + +static inline bool ExtendChunksOfBlockCore(KPHCCfsExtentHeader *cfsExtentHeader, KPHCCfsExtentAddress *cfsExtentAddress, + uint8 needChunks, uint8 actualUse) +{ + bool res = false; + if (cfsExtentAddress->allocated_chunks < needChunks) { + cfsExtentHeader->recycleInOrder = 0; + uint8 allocateNumber = needChunks - cfsExtentAddress->allocated_chunks; + uint32 chunkno = (pc_chunk_number_t)pg_atomic_fetch_add_u32(&cfsExtentHeader->allocated_chunks, (uint32)allocateNumber) + 1; + for (int i = cfsExtentAddress->allocated_chunks; i < needChunks; ++i, ++chunkno) { + cfsExtentAddress->chunknos[i] = (uint16)chunkno; + } + cfsExtentAddress->allocated_chunks = needChunks; + res = true; + } + if (cfsExtentAddress->nchunks != actualUse) { + cfsExtentAddress->nchunks = actualUse; + res = true; + } + uint32 cksm = AddrChecksum32(cfsExtentAddress, cfsExtentAddress->allocated_chunks); + if (cfsExtentAddress->checksum != cksm) { + cfsExtentAddress->checksum = cksm + res = true; + } + return res; +} + +static inline bool ExtendChunksOfBlock(KPHCCfsExtentHeader *cfsExtentHeader, KPHCExtentLocation *location, + uint8 needChunks, uint8 actualUse) +{ + KPHCCfsExtentAddress *cfsExtentAddress = GetExtentAddress(cfsExtentHeader, (uint16)location->extentOffset); + return ExtendChunksOfBlockCore(cfsExtentHeader, cfsExtentAddress, needChunks, actualUse); +} + +char *KPHCCfsCompressPage(const char *buffer, KPHCRelFileCompressOption *option, uint8 *nchunks) +{ + uint8 algorithm = option->compressAlgorithm; + uint32 chunkSize = CHUNK_SIZE_LIST[option->compressChunkSize]; + int32 workBufferSize = CompressPageBufferBound(buffer, algorithm); + int8 level = option->compressLevelSymbol ? option->compressLevel : -option->compressLevel; + uint8 preallocChunk = option->compressPreallocChunks; + + if (workBufferSize < 0) { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("mdwrite algorithm:%d, chunkSize:%d, level:%d, preallocChunk:%d", + (int)algorithm, (int)chunkSize, level, (int)preallocChunk))); + } + + char *workBuffer = (char*)palloc((unsigned long)workBufferSize); + int compressBufferSize = CompressPage(buffer, workBuffer, workBufferSize, *option); + if (compressBufferSize < 0) { + compressBufferSize = BLCKSZ; + } + *nchunks = (uint8)((unsigned int)(compressBufferSize - 1) / chunkSize + 1); + uint32 bufferSize = chunkSize * (*nchunks); + if (bufferSize >= BLCKSZ) { + pfree(workBuffer); + workBuffer = (char*)buffer; + (*nchunks) = (uint8)(BLCKSZ / chunkSize); + } else { + if ((uint32)compressBufferSize < bufferSize) { + uint32 leftSize = bufferSize - (uint32)compressBufferSize; + errno_t rc = memset_s(workBuffer + compressBufferSize, leftSize, 0, leftSize); + } + } + + return workBuffer; +} + +size_t KPHCCfsWritePage(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, char *buffer, bool sipSync, KPHC_CFS_STORAGE_TYPE type) +{ + KPHCExtentLocation location = cfsLocationConverts[type](reln, forknum, logicBlockNumber, sipSync, KPHC_EXTENT_OPEN_FILE); + KPHCPciPageCtrl *ctrl = KPHCPciBufReadPage(location, LW_SHARED, PCI_BUF_NORMAL_READ); + KPHCCfsExtentHeader *cfsExtentHeader = ctrl->pciPage; + KPHCCfsExtentAddress *cfsExtentAddress = GetExtentAddress(cfsExtentAddress, (uint16)location.extentOffset); + + // 获取压缩参数 + KPHCRelFileCompressOption option; + KPHCAnalyzeCompressOptions(reln->SMgrRelation.node, &option); + uint16 chunkSize = cfsExtentHeader->chunk_size; + + // 执行页压缩 + uint8 nchunks; + char *compressedBuffer = KPHCCfsCompressPage(buffer, &option, &nchunks); + + // 设置地址 + uint8 needChunks = option.compressPreallocChunks > nchunks ? (uint8)option.compressPreallocChunks : (uint8)nchunks; + bool changed = ExtendChunksOfBlock(cfsExtentHeader, &location, needChunks, nchunks); + + off_t extentStartOffset = location.extentStart * BLCKSZ; + + // 写入每个chunk + for (uint8 i = 0; i < nchunks; ++i) { + uint8 bufferPos = compressedBuffer + (long)chunkSize * i; + off_t seekPos = OffsetOfPageCompressChunk(chunkSize, cfsExtentAddress->chunknos[i]); + if (cfsExtentAddress->chunknos[i] > ((BLCKSZ / chunkSize) * KPHC_CFS_LOGIC_BLOCKS_PER_EXTENT)) { + *((uint32 *)nullptr) = 1; // ??? + } + uint8 start = i; + while (i < nchunks - 1 && cfsExtentAddress->chunknos[i + 1] == cfsExtentAddress->chunknos[i] + 1) { + i++; + } + int writeAmount = chunkSize * ((i - start) + 1); + if ((seekPos + extentStartOffset) > (((BlockNumber)RELSEG_SIZE) * BLCKSZ)) { + *((uint32 *)nullptr) = 1; // ??? + } + int nbytes = FilePWrite(location.fd, bufferPos, writeAmount, seekPos + extentStartOffset, (uint32)WAIT_EVENT_DATA_FILE_WRITE); + if (nbytes != writeAmount) { + if (compressedBuffer != nullptr && compressedBuffer != buffer) { + pfree(compressedBuffer); + } + KPHCPciBufFreePage(ctrl, location, false); + } + } + + if (compressedBuffer != nullptr && compressedBuffer != buffer) { + pfree(compressedBuffer); + } + KPHCPciBufFreePage(ctrl, location, false); + return BLCKSZ; +} diff --git a/src/include/storage/page_compression.h b/src/include/storage/page_compression.h index 8513504bb1276a79b0dc9642761d812b9b102a90..b9cd905715c2a17259de8edec52d034fb39876ba 100644 --- a/src/include/storage/page_compression.h +++ b/src/include/storage/page_compression.h @@ -21,6 +21,12 @@ #define KPHC_CMP_ALGORITHM_INDEX 5 #define KPHC_CMP_CHUNK_SIZE_INDEX 6 +#define COMPRESS_ALGORITHM_PGLZ 1 +#define COMPRESS_ALGORITHM_ZSTD 2 +#define COMPRESS_ALGORITHM_PGZSTD 3 + +typedef uint32 pc_chunk_number_t; + typedef struct CmpBitStuct { const unsigned int bitLen; const unsigned int mask; @@ -66,10 +72,20 @@ extern const uint32 INDEX_OF_QUARTER_BLCKSZ; extern const uint32 INDEX_OF_EIGHTH_BLCKSZ; extern const uint32 INDEX_OF_SIXTEENTHS_BLCKSZ; +extern const int KPHC_CFS_EXTENT_SIZE; +extern const int KPHC_CFS_LOGIC_BLOCKS_PER_EXTENT; +extern const int KPHC_CFS_EXTENT_COUNT_PER_FILE; +extern const int KPHC_CFS_MAX_BLOCK_PER_FILE; +extern const int KPHC_EXTENT_OPEN_FILE; +extern const int KPHC_WRITE_BACK_OPEN_FILE; +extern const int KPHC_EXTENT_CREATE_FILE; + extern const KPHCCmpBitStuct gCmpBitStruct[]; uint8 ConvertChunkSize(uint32 compressedChunkSize, bool *success); void SetupPageCompressForRelation(RelFileNode *node, PageCompressOpts *compress_options, const char *relationName); +size_t KPHCCfsWritePage(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, char *buffer, bool sipSync, KPHC_CFS_STORAGE_TYPE type); + #endif // PAGE_COMPRESSION_H \ No newline at end of file