diff --git a/.gitignore b/.gitignore index 1c0f3e5e3512288eaaa63c4f402421f86c5c4025..fa2aaa171ecbe04b7c00e9f6c4e5653fcb5ecb85 100644 --- a/.gitignore +++ b/.gitignore @@ -43,3 +43,5 @@ lib*.pc /Release/ /tmp_install/ /portlock/ +.vscode/settings.json +.vscode/Sftp.json diff --git a/src/backend/storage/page/page_compression.c b/src/backend/storage/page/page_compression.c index 45e39ef694e7de42551229a075545c35e5e0e753..5636c766d6cddc6ab6ed950bb2b2827db8de7f4f 100644 --- a/src/backend/storage/page/page_compression.c +++ b/src/backend/storage/page/page_compression.c @@ -1,13 +1,28 @@ -#include "postgres.h" +// #define USE_ZSTD +// #include +#include "postgres.h" +#include "storage/fd.h" +#include "storage/bufpage.h" +#include "storage/cfs_buffers.h" #include "storage/page_compression.h" #include "utils/rel.h" +#include "utils/wait_event.h" +#include "common/pg_lzcompress.h" + +extern KPHCCfsLocationConvert cfsLocationConverts[]; + +uint32 AddrChecksum32(const KPHCCfsExtentAddress *cfsExtentAddress, const int needChunks); const uint32 INDEX_OF_HALF_BLCKSZ = 0; const uint32 INDEX_OF_QUARTER_BLCKSZ = 1; const uint32 INDEX_OF_EIGHTH_BLCKSZ = 2; const uint32 INDEX_OF_SIXTEENTHS_BLCKSZ = 3; +const int KPHC_EXTENT_OPEN_FILE = 0; +const int KPHC_WRITE_BACK_OPEN_FILE = 1; +const int KPHC_EXTENT_CREATE_FILE = 2; + const KPHCCmpBitStuct gCmpBitStruct[] = {{KPHC_CMP_BYTE_CONVERT_LEN, 0x01, 15}, {KPHC_CMP_DIFF_CONVERT_LEN, 0x01, 14}, {KPHC_CMP_PRE_CHUNK_LEN, 0x07, 11}, @@ -16,6 +31,8 @@ const KPHCCmpBitStuct gCmpBitStruct[] = {{KPHC_CMP_BYTE_CONVERT_LEN, 0x01, 15}, {KPHC_CMP_ALGORITHM_LEN, 0x07, 2}, {KPHC_CMP_CHUNK_SIZE_LEN, 0x03, 0}}; +const uint32 CHUNK_SIZE_LIST[4] = {BLCKSZ / 2, BLCKSZ / 4, BLCKSZ / 8, BLCKSZ / 16}; + uint8 ConvertChunkSize(uint32 compressedChunkSize, bool *success) { uint8 chunkSize = INDEX_OF_HALF_BLCKSZ; @@ -40,6 +57,26 @@ uint8 ConvertChunkSize(uint32 compressedChunkSize, bool *success) return chunkSize; } +inline size_t CompressReservedLen(const char* page) +{ + size_t length = offsetof(KPHCHeapPageCompressData, page_header) - offsetof(KPHCHeapPageCompressData, data); + return GetPageHeaderSize(page) + length; // 宏不完整 +} + +inline int CompressPageBufferBound(const char* page, uint8 algorithm) +{ + switch (algorithm) { + case COMPRESS_ALGORITHM_PGLZ: + return BLCKSZ * 2; + case COMPRESS_ALGORITHM_ZSTD: + return ZSTD_compressBound(BLCKSZ - CompressReservedLen(page)); + case COMPRESS_ALGORITHM_PGZSTD: + return BLCKSZ + 4; + default: + return -1; + } +} + void SetupPageCompressForRelation(RelFileNode *node, PageCompressOpts *compressOptions, const char *relationName) { uint32 algorithm = (uint32)compressOptions->compressType; @@ -75,4 +112,261 @@ void SetupPageCompressForRelation(RelFileNode *node, PageCompressOpts *compressO (int)compressOptions->compressDiffConvert, preallocChunks, (int)symbol, compressLevel, algorithm, chunkSize); } -} \ No newline at end of file +} + +inline size_t SizeOfExtentAddress(uint16 chunkSize) +{ + if (chunkSize == 0) { + return -1; + } + return offsetof(KPHCCfsExtentAddress, chunknos) + sizeof(uint16) * BLCKSZ / chunkSize; +} + +inline off_t OffsetOfPageCompressChunk(uint16 chunkSize, int chunkNo) +{ + return chunkSize * (chunkNo - 1); +} + +inline size_t SizeOfExtentAddressByChunks(uint8 nChunks) +{ + return offsetof(KPHCCfsExtentAddress, chunknos) + sizeof(uint16) * nChunks; +} + +// 获取extent地址 +inline KPHCCfsExtentAddress *GetExtentAddress(KPHCCfsExtentHeader *header, uint16 blockOffset) +{ + uint16 chunkSize = header->chunk_size; + size_t headerOffset = offsetof(KPHCCfsExtentHeader, cfsExtentAddress); + size_t sizeOfExtentAddress = SizeOfExtentAddress(chunkSize); + return (KPHCCfsExtentAddress *)(((char *)header) + headerOffset + blockOffset * sizeOfExtentAddress); +} + +uint32 AddrChecksum32(const KPHCCfsExtentAddress *cfsExtentAddress, const int needChunks) +{ + size_t i; + const size_t uintLen = sizeof(uint32); + uint32 checkSum = 0; + char *addr = ((char *) cfsExtentAddress) + sizeof(uint32); + size_t len = SizeOfExtentAddressByChunks((uint8)needChunks) - sizeof(uint32); + do { + if (len >= uintLen) { + checkSum += *((uint32*)(void*)addr); + addr += uintLen; + len -= uintLen; + } else { + char *finalNum = (char*)malloc(uintLen * sizeof(char)); + memset(finalNum, 0, uintLen * sizeof(char)); + i = 0; + for(; i < len; ++i) { + finalNum[i] = addr[i]; + } + checkSum += *((uint32 *)finalNum); + len -= i; + } + } while (len); + return checkSum; +} + +static inline bool ExtendChunksOfBlockCore(KPHCCfsExtentHeader *cfsExtentHeader, KPHCCfsExtentAddress *cfsExtentAddress, + uint8 needChunks, uint8 actualUse) +{ + bool res; + uint8 allocateNumber; + uint32 chunkno; + uint32 cksm; + + res = false; + if (cfsExtentAddress->allocated_chunks < needChunks) { + cfsExtentHeader->recycleInOrder = 0; + allocateNumber = needChunks - cfsExtentAddress->allocated_chunks; + chunkno = (pc_chunk_number_t)pg_atomic_fetch_add_u32(&cfsExtentHeader->allocated_chunks, (uint32)allocateNumber) + 1; + for (int i = cfsExtentAddress->allocated_chunks; i < needChunks; ++i, ++chunkno) { + cfsExtentAddress->chunknos[i] = (uint16)chunkno; + } + cfsExtentAddress->allocated_chunks = needChunks; + res = true; + } + if (cfsExtentAddress->nchunks != actualUse) { + cfsExtentAddress->nchunks = actualUse; + res = true; + } + cksm = AddrChecksum32(cfsExtentAddress, cfsExtentAddress->allocated_chunks); + if (cfsExtentAddress->checksum != cksm) { + cfsExtentAddress->checksum = cksm; + res = true; + } + return res; +} + +static inline bool ExtendChunksOfBlock(KPHCCfsExtentHeader *cfsExtentHeader, KPHCExtentLocation *location, + uint8 needChunks, uint8 actualUse) +{ + KPHCCfsExtentAddress *cfsExtentAddress = GetExtentAddress(cfsExtentHeader, (uint16)location->extentOffset); + return ExtendChunksOfBlockCore(cfsExtentHeader, cfsExtentAddress, needChunks, actualUse); +} + +int KPHCCompressPage(const char* src, char* dst, int dst_size, KPHCRelFileCompressOption option) +{ + int compressed_size; + int8 level = option.compressLevelSymbol ? option.compressLevel : -option.compressLevel; + size_t sizeOfHeaderData = SizeOfPageHeaderData; + bool real_ByteConvert = false; + bool success; + + char* data = ((KPHCPageCompressData*)dst)->data; + PageHeaderData *page = (PageHeaderData *)src; + bool heapPageData = page->pd_special == BLCKSZ; + switch (option.compressAlgorithm) { + case COMPRESS_ALGORITHM_PGLZ: { + success = KPHCLzCompress(src + sizeOfHeaderData, BLCKSZ - sizeOfHeaderData, (KPHC_PGLZ_Header *)data, heapPageData ? PGLZ_strategy_default : PGLZ_strategy_always); + compressed_size = success ? VARSIZE(data) : BLCKSZ; + compressed_size = compressed_size < BLCKSZ ? compressed_size : BLCKSZ; + break; + } + case COMPRESS_ALGORITHM_ZSTD: { + if (level == 0 || level < MIN_ZSTD_COMPRESSION_LEVEL || level > MAX_ZSTD_COMPRESSION_LEVEL) { + level = DEFAULT_ZSTD_COMPRESSION_LEVEL; + } +#ifndef FRONTEND + compressed_size = ZSTD_compress(data, dst_size, src + sizeOfHeaderData, BLCKSZ - sizeOfHeaderData, level); +#else + if (real_ByteConvert) { + compressed_size = + ZSTD_compress(data, dst_size, src_copy + sizeOfHeaderData, BLCKSZ - sizeOfHeaderData, level); + } else { + compressed_size = + ZSTD_compress(data, dst_size, src + sizeOfHeaderData, BLCKSZ - sizeOfHeaderData, level); + } +#endif + if (ZSTD_isError(compressed_size)) { + return -1; + } + break; + } + default: + return KPHC_COMPRESS_UNSUPPORTED_ERROR; + } + + if ((compressed_size < 0) || ((offsetof(KPHCPageCompressData, data) + compressed_size) >= BLCKSZ)) { + return -1; + } + + KPHCPageCompressData* pcdptr = ((KPHCPageCompressData*)dst); + memcpy_s(pcdptr->page_header, sizeOfHeaderData, src, sizeOfHeaderData); + pcdptr->size = compressed_size; + pcdptr->crc32 = DataBlockChecksum(data, compressed_size, true); + pcdptr->byte_convert = real_ByteConvert; + pcdptr->diff_convert = option.diffConvert; + pcdptr->algorithm = option.compressAlgorithm; + + return offsetof(KPHCPageCompressData, data) + compressed_size; +} + +char *KPHCCfsCompressPage(const char *buffer, KPHCRelFileCompressOption *option, uint8 *nchunks) +{ + uint8 algorithm = option->compressAlgorithm; + uint32 chunkSize = CHUNK_SIZE_LIST[option->compressChunkSize]; + int32 workBufferSize = CompressPageBufferBound(buffer, algorithm); + int8 level = option->compressLevelSymbol ? option->compressLevel : -option->compressLevel; + uint8 preallocChunk = option->compressPreallocChunks; + char *workBuffer; + int compressBufferSize; + + if (workBufferSize < 0) { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("mdwrite algorithm:%d, chunkSize:%d, level:%d, preallocChunk:%d", + (int)algorithm, (int)chunkSize, level, (int)preallocChunk))); + } + + workBuffer = (char*)palloc((unsigned long)workBufferSize); + compressBufferSize = KPHCCompressPage(buffer, workBuffer, workBufferSize, *option); + if (compressBufferSize < 0) { + compressBufferSize = BLCKSZ; + } + *nchunks = (uint8)((unsigned int)(compressBufferSize - 1) / chunkSize + 1); + uint32 bufferSize = chunkSize * (*nchunks); + if (bufferSize >= BLCKSZ) { + pfree(workBuffer); + workBuffer = (char*)buffer; + (*nchunks) = (uint8)(BLCKSZ / chunkSize); + } else { + if ((uint32)compressBufferSize < bufferSize) { + uint32 leftSize = bufferSize - (uint32)compressBufferSize; + memset_s(workBuffer + compressBufferSize, leftSize, 0, leftSize); + } + } + + return workBuffer; +} + +void KPHCAnalyzeCompressOptions(const RelFileNode node, KPHCRelFileCompressOption* opt) +{ + unsigned short compressOption = node.opt; + opt->compressChunkSize = compressOption & gCmpBitStruct[KPHC_CMP_CHUNK_SIZE_INDEX].mask; + compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_CHUNK_SIZE_INDEX].bitLen; + opt->compressAlgorithm = compressOption & gCmpBitStruct[KPHC_CMP_ALGORITHM_INDEX].mask; + compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_ALGORITHM_INDEX].bitLen; + opt->compressLevel = compressOption & gCmpBitStruct[KPHC_CMP_LEVEL_INDEX].mask; + compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_LEVEL_INDEX].bitLen; + opt->compressLevelSymbol = compressOption & gCmpBitStruct[KPHC_CMP_COMPERSS_LEVEL_SYMBOL_INDEX].mask; + compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_COMPERSS_LEVEL_SYMBOL_INDEX].bitLen; + opt->compressPreallocChunks = compressOption & gCmpBitStruct[KPHC_CMP_PRE_CHUNK_INDEX].mask; + compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_PRE_CHUNK_INDEX].bitLen; + opt->diffConvert = compressOption & gCmpBitStruct[KPHC_CMP_DIFF_CONVERT_INDEX].mask; + compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_DIFF_CONVERT_INDEX].bitLen; + opt->byteConvert = compressOption & gCmpBitStruct[KPHC_CMP_BYTE_CONVERT_INDEX].mask; + compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_BYTE_CONVERT_INDEX].bitLen; +} + +size_t KPHCCfsWritePage(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, char *buffer, bool sipSync, KPHC_CFS_STORAGE_TYPE type) +{ + KPHCExtentLocation location = cfsLocationConverts[type](reln, forknum, logicBlockNumber, sipSync, KPHC_EXTENT_OPEN_FILE); + KPHCPciPageCtrl *ctrl = KPHCPciBufReadPage(location, LW_SHARED, PCI_BUF_NORMAL_READ); + KPHCCfsExtentHeader *cfsExtentHeader = ctrl->pciPage; + KPHCCfsExtentAddress *cfsExtentAddress = GetExtentAddress(cfsExtentAddress, (uint16)location.extentOffset); + + // 获取压缩参数 + KPHCRelFileCompressOption option; + KPHCAnalyzeCompressOptions(reln->smgr_rnode.node, &option); + uint16 chunkSize = cfsExtentHeader->chunk_size; + + // 执行页压缩 + uint8 nchunks; + char *compressedBuffer = KPHCCfsCompressPage(buffer, &option, &nchunks); + + // 设置地址 + uint8 needChunks = option.compressPreallocChunks > nchunks ? (uint8)option.compressPreallocChunks : (uint8)nchunks; + bool changed = ExtendChunksOfBlock(cfsExtentHeader, &location, needChunks, nchunks); + + off_t extentStartOffset = location.extentStart * BLCKSZ; + + // 写入每个chunk + for (uint8 i = 0; i < nchunks; ++i) { + uint8 bufferPos = compressedBuffer + (long)chunkSize * i; + off_t seekPos = OffsetOfPageCompressChunk(chunkSize, cfsExtentAddress->chunknos[i]); + if (cfsExtentAddress->chunknos[i] > ((BLCKSZ / chunkSize) * KPHC_CFS_LOGIC_BLOCKS_PER_EXTENT)) { + *((uint32 *)NULL) = 1; // ??? + } + uint8 start = i; + while (i < nchunks - 1 && cfsExtentAddress->chunknos[i + 1] == cfsExtentAddress->chunknos[i] + 1) { + i++; + } + int writeAmount = chunkSize * ((i - start) + 1); + if ((seekPos + extentStartOffset) > (((BlockNumber)RELSEG_SIZE) * BLCKSZ)) { + *((uint32 *)NULL) = 1; // ??? + } + int nbytes = FilePWrite(location.fd, bufferPos, writeAmount, seekPos + extentStartOffset, (uint32)WAIT_EVENT_DATA_FILE_WRITE); + if (nbytes != writeAmount) { + if (compressedBuffer != NULL && compressedBuffer != buffer) { + pfree(compressedBuffer); + } + KPHCPciBufFreePage(ctrl, location, false); + } + } + + if (compressedBuffer != NULL && compressedBuffer != buffer) { + pfree(compressedBuffer); + } + KPHCPciBufFreePage(ctrl, location, false); + return BLCKSZ; +} diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c index adb18fb7427247dd47eca2801d9624f25103b7b7..2a18c995f213001254d0bcfdb480b6d75f2ae7f9 100644 --- a/src/backend/storage/smgr/md.c +++ b/src/backend/storage/smgr/md.c @@ -43,6 +43,7 @@ #include "storage/cfs_buffers.h" #include "storage/page_compression.h" #include "common/pg_lzcompress.h" + /* * The magnetic disk storage manager keeps track of open file * descriptors in its own descriptor pool. This is done to make it @@ -669,61 +670,17 @@ mdwriteback(SMgrRelation reln, ForkNumber forknum, } } -typedef size_t KPHC_CFS_STORAGE_TYPE; -const unsigned KPHC_CMP_COMPRESS_LEVEL_SYMBOL = 3; -const int KPHC_CFS_EXTENT_SIZE = 129; -const uint32 CHUNK_SIZE_LIST[4] = {BLCKSZ / 2, BLCKSZ / 4, BLCKSZ / 8, BLCKSZ / 16}; -const int KPHC_EXTENT_OPEN_FILE = 0; -const int KPHC_WRITE_BACK_OPEN_FILE = 1; -const int KPHC_EXTENT_CREATE_FILE = 2; const KPHC_CFS_STORAGE_TYPE COMMON_STORAGE = 0; -#define KPHC_COMPRESS_ALGORITHM_PGLZ 1 -#define KPHC_COMPRESS_ALGORITHM_ZSTD 2 -#define KPHC_COMPRESS_UNSUPPORTED_ERROR (-2) -#define KPHC_IS_COMPRESSED_MAINFORK(reln, forkNum) ((reln)->smgr_rnode.node.opt != 0 && (forkNum) == MAIN_FORKNUM) -typedef struct PageCompressData { - char page_header[SizeOfPageHeaderData]; - uint32 crc32; - uint32 size : 16; - uint32 byte_convert : 1; - uint32 diff_convert : 1; - uint32 algorithm : 4; - uint32 unused : 10; - char data[FLEXIBLE_ARRAY_MEMBER]; -} KPHCPageCompressData; enum KPHC_SMGR_READ_STATUS { SMGR_RD_OK = 0, SMGR_RD_NO_BLOCK = 1, SMGR_RD_CRC_ERROR = 2 }; -typedef KPHCExtentLocation (*KPHCCfsLocationConvert)(SMgrRelation sRel, ForkNumber forknum, BlockNumber - logicBlockNumber, bool skipSync, int type); -inline void KPHCAnalyzeCompressOptions(const RelFileNode node, KPHCRelFileCompressOption* opt); -KPHCExtentLocation KPHCStorageConvert(SMgrRelation sRel, ForkNumber forknum, BlockNumber logicBlockNumber, bool skipSync, - int type); -int KPHCDecompressPage(const char* src, char* dst); -int KPHCCfsReadPage(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, char *buffer, - KPHC_CFS_STORAGE_TYPE type); -inline void KPHCAnalyzeCompressOptions(const RelFileNode node, KPHCRelFileCompressOption* opt) -{ - unsigned short compressOption = node.opt; - opt->compressChunkSize = compressOption & gCmpBitStruct[KPHC_CMP_CHUNK_SIZE_INDEX].mask; - compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_CHUNK_SIZE_INDEX].bitLen; - opt->compressAlgorithm = compressOption & gCmpBitStruct[KPHC_CMP_ALGORITHM_INDEX].mask; - compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_ALGORITHM_INDEX].bitLen; - opt->compressLevel = compressOption & gCmpBitStruct[KPHC_CMP_LEVEL_INDEX].mask; - compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_LEVEL_INDEX].bitLen; - opt->compressLevelSymbol = compressOption & gCmpBitStruct[KPHC_CMP_COMPRESS_LEVEL_SYMBOL].mask; - compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_COMPRESS_LEVEL_SYMBOL].bitLen; - opt->compressPreallocChunks = compressOption & gCmpBitStruct[KPHC_CMP_PRE_CHUNK_INDEX].mask; - compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_PRE_CHUNK_INDEX].bitLen; - opt->diffConvert = compressOption & gCmpBitStruct[KPHC_CMP_DIFF_CONVERT_INDEX].mask; - compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_DIFF_CONVERT_INDEX].bitLen; - opt->byteConvert = compressOption & gCmpBitStruct[KPHC_CMP_BYTE_CONVERT_INDEX].mask; - compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_BYTE_CONVERT_INDEX].bitLen; -} +KPHCExtentLocation KPHCStorageConvert(SMgrRelation sRel, ForkNumber forknum, BlockNumber logicBlockNumber, bool skipSync, int type); +int KPHCDecompressPage(const char* src, char* dst); +int KPHCCfsReadPage(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, char *buffer, KPHC_CFS_STORAGE_TYPE type); KPHCExtentLocation KPHCStorageConvert(SMgrRelation sRel, ForkNumber forknum, BlockNumber logicBlockNumber, bool skipSync, int type) @@ -769,6 +726,7 @@ KPHCCfsLocationConvert cfsLocationConverts[2] = { KPHCStorageConvert, NULL }; + inline size_t SizeOfKPHCExtentAddress(uint16 chunkSize) { if (chunkSize == 0) { return -1; @@ -784,6 +742,7 @@ inline KPHCCfsExtentAddress *KPHCGetExtentAddress(KPHCCfsExtentHeader *header, u size_t sizeOfKPHCExtentAddress = SizeOfKPHCExtentAddress(chunkSize); return (KPHCCfsExtentAddress *) (((char *) header) + headerOffset + blockOffset * sizeOfKPHCExtentAddress); } + int KPHCDecompressPage(const char* src, char* dst) { int decompressed_size; @@ -823,6 +782,7 @@ int KPHCDecompressPage(const char* src, char* dst) } return headerSize + decompressed_size; } + int KPHCCfsReadPage(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, char *buffer, KPHC_CFS_STORAGE_TYPE type) { diff --git a/src/common/pg_lzcompress.c b/src/common/pg_lzcompress.c index 5667802c763c84c3de123128841fb6e20eb84af9..1cfa74bfe218db1dec3537df7e75f1b494af263e 100644 --- a/src/common/pg_lzcompress.c +++ b/src/common/pg_lzcompress.c @@ -386,6 +386,8 @@ do { \ } \ } while (0) +#define HIST_START_LEN (sizeof(PGLZ_HistEntry*) * PGLZ_HISTORY_LISTS) +#define HIST_ENTRIES_LEN (sizeof(PGLZ_HistEntry) * PGLZ_HISTORY_SIZE) /* ---------- * pglz_find_match - @@ -932,4 +934,149 @@ int32 KPHCLzDecompress(const KPHC_PGLZ_Header* source, char* dest) #endif } return source->rawsize; +} + +bool KPHCLzCompress(const char* source, int32 slen, KPHC_PGLZ_Header* dest, const PGLZ_Strategy* strategy) +{ + unsigned char* bp = ((unsigned char*)dest) + sizeof(KPHC_PGLZ_Header); + unsigned char* bstart = bp; + int hist_next = 0; + bool hist_recycle = false; + const char* dp = source; + const char* dend = source + slen; + unsigned char ctrl_dummy = 0; + unsigned char* ctrlp = &ctrl_dummy; + unsigned char ctrlb = 0; + unsigned char ctrl = 0; + bool found_match = false; + int32 match_len; + int32 match_off; + int32 good_match; + int32 good_drop; + int32 result_size; + int32 result_max; + int32 need_rate; + + /* + * Our fallback strategy is the default. + */ + if (strategy == NULL) + strategy = PGLZ_strategy_default; + + /* + * If the strategy forbids compression (at all or if source chunk size out + * of range), fail. + */ + if (strategy->match_size_good <= 0 || slen < strategy->min_input_size || slen > strategy->max_input_size) + return false; + + /* + * Save the original source size in the header. + */ + dest->rawsize = slen; + + /* + * Limit the match parameters to the supported range. + */ + good_match = strategy->match_size_good; + if (good_match > PGLZ_MAX_MATCH) + good_match = PGLZ_MAX_MATCH; + else if (good_match < 17) + good_match = 17; + + good_drop = strategy->match_size_drop; + if (good_drop < 0) + good_drop = 0; + else if (good_drop > 100) + good_drop = 100; + + need_rate = strategy->min_comp_rate; + if (need_rate < 0) + need_rate = 0; + else if (need_rate > 99) + need_rate = 99; + + /* + * Compute the maximum result size allowed by the strategy, namely the + * input size minus the minimum wanted compression rate. This had better + * be <= slen, else we might overrun the provided output buffer. + */ + if (slen > (INT_MAX / 100)) { + /* Approximate to avoid overflow */ + result_max = (slen / 100) * (100 - need_rate); + } else + result_max = (slen * (100 - need_rate)) / 100; + + /* + * Initialize the history lists to empty. We do not need to zero the + * u_sess->utils_cxt.hist_entries[] array; its entries are initialized as they are used. + */ + memset_s(hist_start, HIST_START_LEN, 0, HIST_START_LEN); + + /* + * Compress the source directly into the output buffer. + */ + while (dp < dend) { + /* + * If we already exceeded the maximum result size, fail. + * + * We check once per loop; since the loop body could emit as many as 4 + * bytes (a control byte and 3-byte tag), PGLZ_MAX_OUTPUT() had better + * allow 4 slop bytes. + */ + if (bp - bstart >= result_max) + return false; + + /* + * If we've emitted more than first_success_by bytes without finding + * anything compressible at all, fail. This lets us fall out + * reasonably quickly when looking at incompressible input (such as + * pre-compressed data). + */ + if (!found_match && bp - bstart >= strategy->first_success_by) + return false; + + /* + * Try to find a match in the history + */ + if (pglz_find_match(hist_start, dp, dend, &match_len, &match_off, good_match, good_drop, 0xffffffff)) { // mask可能有错 + /* + * Create the tag and add history entries for all matched + * characters. + */ + pglz_out_tag(ctrlp, ctrlb, ctrl, bp, match_len, match_off); + while (match_len--) { + pglz_hist_add( + hist_start, hist_entries, hist_next, hist_recycle, dp, dend, 0xffffffff); // mask可能有错 + dp++; /* Do not do this ++ in the line above! */ + /* The macro would do it four times - Jan. */ + } + found_match = true; + } else { + /* + * No match found. Copy one literal byte. + */ + pglz_out_literal(ctrlp, ctrlb, ctrl, bp, *dp); + pglz_hist_add( + hist_start, hist_entries, hist_next, hist_recycle, dp, dend, 0xffffffff); // mask可能有错 + dp++; /* Do not do this ++ in the line above! */ + /* The macro would do it four times - Jan. */ + } + } + + /* + * Write out the last control byte and check that we haven't overrun the + * output size allowed by the strategy. + */ + *ctrlp = ctrlb; + result_size = bp - bstart; + if (result_size >= result_max) + return false; + + /* + * Success - need only fill in the actual length of the compressed datum. + */ + SET_VARSIZE_COMPRESSED(dest, result_size + sizeof(KPHC_PGLZ_Header)); + + return true; } \ No newline at end of file diff --git a/src/include/c.h b/src/include/c.h index ffcdb0520e2f0a1ae345ce4c664943e135d7a57b..c7628cd2b087d04ffec4a33d1689b0e26f94eb26 100644 --- a/src/include/c.h +++ b/src/include/c.h @@ -612,6 +612,8 @@ typedef double float8; typedef Oid regproc; typedef regproc RegProcedure; +typedef uint32 ShortTransactionId; + typedef uint32 TransactionId; typedef uint32 LocalTransactionId; diff --git a/src/include/common/pg_lzcompress.h b/src/include/common/pg_lzcompress.h index fb660b85ec0221849fe696aafa36678f8b1fb56c..97e517e07bd725acef3e506ebf1895bb32c0fc0e 100644 --- a/src/include/common/pg_lzcompress.h +++ b/src/include/common/pg_lzcompress.h @@ -68,6 +68,16 @@ typedef struct PGLZ_Header { int32 vl_len_; int32 rawsize; } KPHC_PGLZ_Header; + +/* ---------- + * Local definitions + * ---------- + */ +#define PGLZ_HISTORY_LISTS 8192 /* must be power of 2 */ +#define PGLZ_HISTORY_MASK (PGLZ_HISTORY_LISTS - 1) +#define PGLZ_HISTORY_SIZE 4096 +#define PGLZ_MAX_MATCH 273 + /* ---------- * The standard strategies * @@ -90,6 +100,8 @@ extern int32 pglz_compress(const char *source, int32 slen, char *dest, const PGLZ_Strategy *strategy); extern int32 pglz_decompress(const char *source, int32 slen, char *dest, int32 rawsize, bool check_complete); + +extern bool KPHCLzCompress(const char* source, int32 slen, KPHC_PGLZ_Header* dest, const PGLZ_Strategy* strategy); extern int32 KPHCLzDecompress(const KPHC_PGLZ_Header* source, char* dest); extern int32 pglz_maximum_compressed_size(int32 rawsize, int32 total_compressed_size); diff --git a/src/include/storage/bufpage.h b/src/include/storage/bufpage.h index e9f253f2c8ad68575e34ea826705148a65b31a03..2b4bad4d68342170251bc01aedbf833fc684bedc 100644 --- a/src/include/storage/bufpage.h +++ b/src/include/storage/bufpage.h @@ -165,6 +165,32 @@ typedef struct PageHeaderData typedef PageHeaderData *PageHeader; +/* + * HeapPageHeaderData -- data that stored at the begin of each new version heap page. + * pd_xid_base - base value for transaction IDs on page + * pd_multi_base - base value for multixact IDs on page + * + */ +typedef struct { + /* XXX LSN is member of *any* block, not only page-organized ones */ + PageXLogRecPtr pd_lsn; /* LSN: next byte after last byte of xlog + * record for last change to this page */ + uint16 pd_checksum; /* checksum */ + uint16 pd_flags; /* flag bits, see below */ + LocationIndex pd_lower; /* offset to start of free space */ + LocationIndex pd_upper; /* offset to end of free space */ + LocationIndex pd_special; /* offset to start of special space */ + uint16 pd_pagesize_version; + ShortTransactionId pd_prune_xid; /* oldest prunable XID, or zero if none */ + TransactionId pd_xid_base; /* base value for transaction IDs on page */ + TransactionId pd_multi_base; /* base value for multixact IDs on page */ + ItemIdData pd_linp[FLEXIBLE_ARRAY_MEMBER]; /* beginning of line pointer array */ +} HeapPageHeaderData; + +typedef HeapPageHeaderData* HeapPageHeader; + +#define GetPageHeaderSize(page) (PageIs8BXidHeapVersion(page) ? SizeOfHeapPageHeaderData : SizeOfPageHeaderData) + /* * pd_flags contains the following flag bits. Undefined bits are initialized * to zero and may be used in the future. @@ -198,6 +224,7 @@ typedef PageHeaderData *PageHeader; */ #define PG_PAGE_LAYOUT_VERSION 4 #define PG_DATA_CHECKSUM_VERSION 1 +#define PG_HEAP_PAGE_LAYOUT_VERSION 6 /* ---------------------------------------------------------------- * page support macros @@ -214,6 +241,7 @@ typedef PageHeaderData *PageHeader; * line pointer(s) do not count as part of header */ #define SizeOfPageHeaderData (offsetof(PageHeaderData, pd_linp)) +#define SizeOfHeapPageHeaderData (offsetof(HeapPageHeaderData, pd_linp)) /* * PageIsEmpty @@ -275,6 +303,8 @@ typedef PageHeaderData *PageHeader; #define PageGetPageLayoutVersion(page) \ (((PageHeader) (page))->pd_pagesize_version & 0x00FF) +#define PageIs8BXidHeapVersion(page) (PageGetPageLayoutVersion(page) == PG_HEAP_PAGE_LAYOUT_VERSION) + /* * PageSetPageSizeAndVersion * Sets the page size and page layout version number of a page. diff --git a/src/include/storage/cfs_buffers.h b/src/include/storage/cfs_buffers.h index 31eb691eecd343b9700f7b843a17338a2e798e85..2f46383994d1b0ac0bc015fbdd71f2775fa2f1cb 100644 --- a/src/include/storage/cfs_buffers.h +++ b/src/include/storage/cfs_buffers.h @@ -12,6 +12,7 @@ #include "storage/shmem.h" #include "utils/hsearch.h" #include "storage/relfilenode.h" +#include "storage/smgr.h" typedef struct ExtentLocation { int fd; @@ -125,6 +126,8 @@ typedef enum pciBufferReadMode { PCI_BUF_NO_READ } KPHCPciBufferReadMode; +typedef KPHCExtentLocation (*KPHCCfsLocationConvert)(SMgrRelation sRel, ForkNumber forknum, BlockNumber logicBlockNumber, bool skipSync, int type); + extern KPHCPciPageBuffCtx *gPciBufCtx; extern void KPHCPciBufInitCtx(void); diff --git a/src/include/storage/page_compression.h b/src/include/storage/page_compression.h index 8513504bb1276a79b0dc9642761d812b9b102a90..699316b0d07e068f1b73c4feab79148779f25dea 100644 --- a/src/include/storage/page_compression.h +++ b/src/include/storage/page_compression.h @@ -3,6 +3,7 @@ #include "c.h" #include "storage/relfilenode.h" +#include "storage/bufpage.h" #include "utils/rel.h" #define KPHC_CMP_BYTE_CONVERT_LEN 1 @@ -21,6 +22,22 @@ #define KPHC_CMP_ALGORITHM_INDEX 5 #define KPHC_CMP_CHUNK_SIZE_INDEX 6 +#define COMPRESS_ALGORITHM_PGLZ 1 +#define COMPRESS_ALGORITHM_ZSTD 2 +#define COMPRESS_ALGORITHM_PGZSTD 3 + +#define KPHC_COMPRESS_ALGORITHM_PGLZ 1 +#define KPHC_COMPRESS_ALGORITHM_ZSTD 2 +#define KPHC_COMPRESS_UNSUPPORTED_ERROR (-2) +#define KPHC_IS_COMPRESSED_MAINFORK(reln, forkNum) ((reln)->smgr_rnode.node.opt != 0 && (forkNum) == MAIN_FORKNUM) + +#define DEFAULT_ZSTD_COMPRESSION_LEVEL (1) +#define MIN_ZSTD_COMPRESSION_LEVEL ZSTD_minCLevel() +#define MAX_ZSTD_COMPRESSION_LEVEL ZSTD_maxCLevel() + +typedef size_t KPHC_CFS_STORAGE_TYPE; +typedef uint32 pc_chunk_number_t; + typedef struct CmpBitStuct { const unsigned int bitLen; const unsigned int mask; @@ -37,6 +54,28 @@ typedef struct relFileCompressOption { unsigned compressChunkSize : KPHC_CMP_CHUNK_SIZE_LEN; }KPHCRelFileCompressOption; +typedef struct PageCompressData { + char page_header[SizeOfPageHeaderData]; + uint32 crc32; + uint32 size : 16; + uint32 byte_convert : 1; + uint32 diff_convert : 1; + uint32 algorithm : 4; + uint32 unused : 10; + char data[FLEXIBLE_ARRAY_MEMBER]; +} KPHCPageCompressData; + +typedef struct HeapPageCompressData { + char page_header[SizeOfHeapPageHeaderData]; + uint32 crc32; + uint32 size : 16; + uint32 byte_convert : 1; + uint32 diff_convert : 1; + uint32 algorithm : 4; + uint32 unused : 10; + char data[FLEXIBLE_ARRAY_MEMBER]; +} KPHCHeapPageCompressData; + #define SET_COMPRESS_OPTION(node, byteConvert, diffConvert, preChunks, symbol, level ,algorithm, chunkSize) \ do { \ (node).opt = 0; \ @@ -66,10 +105,33 @@ extern const uint32 INDEX_OF_QUARTER_BLCKSZ; extern const uint32 INDEX_OF_EIGHTH_BLCKSZ; extern const uint32 INDEX_OF_SIXTEENTHS_BLCKSZ; +// extern const int KPHC_CFS_EXTENT_SIZE; +// extern const int KPHC_CFS_LOGIC_BLOCKS_PER_EXTENT; +// extern const int KPHC_CFS_EXTENT_COUNT_PER_FILE; +// extern const int KPHC_CFS_MAX_BLOCK_PER_FILE; +#define KPHC_CFS_EXTENT_SIZE 129 +#define KPHC_CFS_LOGIC_BLOCKS_PER_EXTENT (KPHC_CFS_EXTENT_SIZE - 1) +#define KPHC_CFS_EXTENT_COUNT_PER_FILE (RELSEG_SIZE / KPHC_CFS_EXTENT_SIZE) +#define KPHC_CFS_MAX_BLOCK_PER_FILE (KPHC_CFS_EXTENT_COUNT_PER_FILE * KPHC_CFS_EXTENT_SIZE) + + +extern const int KPHC_EXTENT_OPEN_FILE; +extern const int KPHC_WRITE_BACK_OPEN_FILE; +extern const int KPHC_EXTENT_CREATE_FILE; + extern const KPHCCmpBitStuct gCmpBitStruct[]; +extern const uint32 CHUNK_SIZE_LIST[4]; + uint8 ConvertChunkSize(uint32 compressedChunkSize, bool *success); void SetupPageCompressForRelation(RelFileNode *node, PageCompressOpts *compress_options, const char *relationName); +int KPHCCompressPage(const char* src, char* dst, int dst_size, KPHCRelFileCompressOption option); +char *KPHCCfsCompressPage(const char *buffer, KPHCRelFileCompressOption *option, uint8 *nchunks); + +inline void KPHCAnalyzeCompressOptions(const RelFileNode node, KPHCRelFileCompressOption* opt); + +size_t KPHCCfsWritePage(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, char *buffer, bool sipSync, KPHC_CFS_STORAGE_TYPE type); + #endif // PAGE_COMPRESSION_H \ No newline at end of file