diff --git a/src/backend/access/common/reloptions.c b/src/backend/access/common/reloptions.c index 6527eabfed2fb279d8581cd26d77c7905a02a6e5..5edaf106550f372d918a5303c7c2464b83d12cad 100644 --- a/src/backend/access/common/reloptions.c +++ b/src/backend/access/common/reloptions.c @@ -382,7 +382,32 @@ static relopt_int intRelOpts[] = }, -1, 0, 1024 }, - + { + { + "compress_level", + "Level of page compression.", + RELOPT_KIND_HEAP | RELOPT_KIND_BTREE + }, + 0, -31, 31 + }, + { + { + "compresstype", + "compress type (none, pglz or zstd. pgzstd isn't available now).", + RELOPT_KIND_HEAP | RELOPT_KIND_BTREE + }, + 0, 0, 2 + }, + { + { + "compress_chunk_size", + "Size of chunk to store compressed page.", + RELOPT_KIND_HEAP | RELOPT_KIND_BTREE + }, + BLCKSZ / 2, + BLCKSZ / 16, + BLCKSZ / 2 + }, /* list terminator */ {{NULL}} }; diff --git a/src/backend/storage/page/page_compression.c b/src/backend/storage/page/page_compression.c index 5636c766d6cddc6ab6ed950bb2b2827db8de7f4f..e21eb0ae604f521c5852d72c5a9a2258c6d27daf 100644 --- a/src/backend/storage/page/page_compression.c +++ b/src/backend/storage/page/page_compression.c @@ -1,15 +1,13 @@ -// #define USE_ZSTD -// #include - #include "postgres.h" #include "storage/fd.h" #include "storage/bufpage.h" -#include "storage/cfs_buffers.h" #include "storage/page_compression.h" #include "utils/rel.h" #include "utils/wait_event.h" #include "common/pg_lzcompress.h" - +#ifdef USE_ZSTD +#include +#endif extern KPHCCfsLocationConvert cfsLocationConverts[]; uint32 AddrChecksum32(const KPHCCfsExtentAddress *cfsExtentAddress, const int needChunks); @@ -69,7 +67,11 @@ inline int CompressPageBufferBound(const char* page, uint8 algorithm) case COMPRESS_ALGORITHM_PGLZ: return BLCKSZ * 2; case COMPRESS_ALGORITHM_ZSTD: +#ifdef USE_ZSTD return ZSTD_compressBound(BLCKSZ - CompressReservedLen(page)); +#else + return -1; +#endif case COMPRESS_ALGORITHM_PGZSTD: return BLCKSZ + 4; default: @@ -122,7 +124,7 @@ inline size_t SizeOfExtentAddress(uint16 chunkSize) return offsetof(KPHCCfsExtentAddress, chunknos) + sizeof(uint16) * BLCKSZ / chunkSize; } -inline off_t OffsetOfPageCompressChunk(uint16 chunkSize, int chunkNo) +off_t OffsetOfPageCompressChunk(uint16 chunkSize, int chunkNo) { return chunkSize * (chunkNo - 1); } @@ -133,7 +135,7 @@ inline size_t SizeOfExtentAddressByChunks(uint8 nChunks) } // 获取extent地址 -inline KPHCCfsExtentAddress *GetExtentAddress(KPHCCfsExtentHeader *header, uint16 blockOffset) +KPHCCfsExtentAddress *KPHCGetExtentAddress(KPHCCfsExtentHeader *header, uint16 blockOffset) { uint16 chunkSize = header->chunk_size; size_t headerOffset = offsetof(KPHCCfsExtentHeader, cfsExtentAddress); @@ -201,18 +203,20 @@ static inline bool ExtendChunksOfBlockCore(KPHCCfsExtentHeader *cfsExtentHeader, static inline bool ExtendChunksOfBlock(KPHCCfsExtentHeader *cfsExtentHeader, KPHCExtentLocation *location, uint8 needChunks, uint8 actualUse) { - KPHCCfsExtentAddress *cfsExtentAddress = GetExtentAddress(cfsExtentHeader, (uint16)location->extentOffset); + KPHCCfsExtentAddress *cfsExtentAddress = KPHCGetExtentAddress(cfsExtentHeader, (uint16)location->extentOffset); return ExtendChunksOfBlockCore(cfsExtentHeader, cfsExtentAddress, needChunks, actualUse); } -int KPHCCompressPage(const char* src, char* dst, int dst_size, KPHCRelFileCompressOption option) +int KPHCCompressPage(const char* src, char* dst, int dst_size, const KPHCRelFileCompressOption option) { int compressed_size; +#ifdef USE_ZSTD int8 level = option.compressLevelSymbol ? option.compressLevel : -option.compressLevel; +#endif size_t sizeOfHeaderData = SizeOfPageHeaderData; bool real_ByteConvert = false; bool success; - + KPHCPageCompressData* pcdptr; char* data = ((KPHCPageCompressData*)dst)->data; PageHeaderData *page = (PageHeaderData *)src; bool heapPageData = page->pd_special == BLCKSZ; @@ -224,6 +228,7 @@ int KPHCCompressPage(const char* src, char* dst, int dst_size, KPHCRelFileCompre break; } case COMPRESS_ALGORITHM_ZSTD: { +#ifdef USE_ZSTD if (level == 0 || level < MIN_ZSTD_COMPRESSION_LEVEL || level > MAX_ZSTD_COMPRESSION_LEVEL) { level = DEFAULT_ZSTD_COMPRESSION_LEVEL; } @@ -242,6 +247,9 @@ int KPHCCompressPage(const char* src, char* dst, int dst_size, KPHCRelFileCompre return -1; } break; +#else + return -1; +#endif } default: return KPHC_COMPRESS_UNSUPPORTED_ERROR; @@ -251,10 +259,9 @@ int KPHCCompressPage(const char* src, char* dst, int dst_size, KPHCRelFileCompre return -1; } - KPHCPageCompressData* pcdptr = ((KPHCPageCompressData*)dst); - memcpy_s(pcdptr->page_header, sizeOfHeaderData, src, sizeOfHeaderData); + pcdptr = ((KPHCPageCompressData*)dst); + memcpy(pcdptr->page_header, src, sizeOfHeaderData); pcdptr->size = compressed_size; - pcdptr->crc32 = DataBlockChecksum(data, compressed_size, true); pcdptr->byte_convert = real_ByteConvert; pcdptr->diff_convert = option.diffConvert; pcdptr->algorithm = option.compressAlgorithm; @@ -271,7 +278,7 @@ char *KPHCCfsCompressPage(const char *buffer, KPHCRelFileCompressOption *option, uint8 preallocChunk = option->compressPreallocChunks; char *workBuffer; int compressBufferSize; - + uint32 bufferSize; if (workBufferSize < 0) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("mdwrite algorithm:%d, chunkSize:%d, level:%d, preallocChunk:%d", @@ -284,7 +291,7 @@ char *KPHCCfsCompressPage(const char *buffer, KPHCRelFileCompressOption *option, compressBufferSize = BLCKSZ; } *nchunks = (uint8)((unsigned int)(compressBufferSize - 1) / chunkSize + 1); - uint32 bufferSize = chunkSize * (*nchunks); + bufferSize = chunkSize * (*nchunks); if (bufferSize >= BLCKSZ) { pfree(workBuffer); workBuffer = (char*)buffer; @@ -292,7 +299,7 @@ char *KPHCCfsCompressPage(const char *buffer, KPHCRelFileCompressOption *option, } else { if ((uint32)compressBufferSize < bufferSize) { uint32 leftSize = bufferSize - (uint32)compressBufferSize; - memset_s(workBuffer + compressBufferSize, leftSize, 0, leftSize); + memset(workBuffer + compressBufferSize, 0, leftSize); } } @@ -320,42 +327,50 @@ void KPHCAnalyzeCompressOptions(const RelFileNode node, KPHCRelFileCompressOptio size_t KPHCCfsWritePage(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, char *buffer, bool sipSync, KPHC_CFS_STORAGE_TYPE type) { + uint16 chunkSize; + uint8 start; + int writeAmount; + uint8 nchunks; + char *compressedBuffer; + uint8 needChunks; + bool changed; + off_t extentStartOffset; + int nbytes; KPHCExtentLocation location = cfsLocationConverts[type](reln, forknum, logicBlockNumber, sipSync, KPHC_EXTENT_OPEN_FILE); KPHCPciPageCtrl *ctrl = KPHCPciBufReadPage(location, LW_SHARED, PCI_BUF_NORMAL_READ); KPHCCfsExtentHeader *cfsExtentHeader = ctrl->pciPage; - KPHCCfsExtentAddress *cfsExtentAddress = GetExtentAddress(cfsExtentAddress, (uint16)location.extentOffset); + KPHCCfsExtentAddress *cfsExtentAddress = KPHCGetExtentAddress(cfsExtentHeader, (uint16)location.extentOffset); // 获取压缩参数 KPHCRelFileCompressOption option; KPHCAnalyzeCompressOptions(reln->smgr_rnode.node, &option); - uint16 chunkSize = cfsExtentHeader->chunk_size; + chunkSize = cfsExtentHeader->chunk_size; // 执行页压缩 - uint8 nchunks; - char *compressedBuffer = KPHCCfsCompressPage(buffer, &option, &nchunks); + compressedBuffer = KPHCCfsCompressPage(buffer, &option, &nchunks); // 设置地址 - uint8 needChunks = option.compressPreallocChunks > nchunks ? (uint8)option.compressPreallocChunks : (uint8)nchunks; - bool changed = ExtendChunksOfBlock(cfsExtentHeader, &location, needChunks, nchunks); + needChunks = option.compressPreallocChunks > nchunks ? (uint8)option.compressPreallocChunks : (uint8)nchunks; + changed = ExtendChunksOfBlock(cfsExtentHeader, &location, needChunks, nchunks); - off_t extentStartOffset = location.extentStart * BLCKSZ; + extentStartOffset = location.extentStart * BLCKSZ; // 写入每个chunk - for (uint8 i = 0; i < nchunks; ++i) { - uint8 bufferPos = compressedBuffer + (long)chunkSize * i; + for (size_t i = 0; i < nchunks; ++i) { + size_t bufferPos = (size_t)compressedBuffer + (long)chunkSize * i; off_t seekPos = OffsetOfPageCompressChunk(chunkSize, cfsExtentAddress->chunknos[i]); if (cfsExtentAddress->chunknos[i] > ((BLCKSZ / chunkSize) * KPHC_CFS_LOGIC_BLOCKS_PER_EXTENT)) { *((uint32 *)NULL) = 1; // ??? } - uint8 start = i; + start = i; while (i < nchunks - 1 && cfsExtentAddress->chunknos[i + 1] == cfsExtentAddress->chunknos[i] + 1) { i++; } - int writeAmount = chunkSize * ((i - start) + 1); + writeAmount = chunkSize * ((i - start) + 1); if ((seekPos + extentStartOffset) > (((BlockNumber)RELSEG_SIZE) * BLCKSZ)) { *((uint32 *)NULL) = 1; // ??? } - int nbytes = FilePWrite(location.fd, bufferPos, writeAmount, seekPos + extentStartOffset, (uint32)WAIT_EVENT_DATA_FILE_WRITE); + nbytes = FileWrite(location.fd, (char *)bufferPos, writeAmount, seekPos + extentStartOffset, (uint32)WAIT_EVENT_DATA_FILE_WRITE); if (nbytes != writeAmount) { if (compressedBuffer != NULL && compressedBuffer != buffer) { pfree(compressedBuffer); @@ -367,6 +382,6 @@ size_t KPHCCfsWritePage(SMgrRelation reln, ForkNumber forknum, BlockNumber logic if (compressedBuffer != NULL && compressedBuffer != buffer) { pfree(compressedBuffer); } - KPHCPciBufFreePage(ctrl, location, false); + KPHCPciBufFreePage(ctrl, location, changed); return BLCKSZ; } diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c index 2a18c995f213001254d0bcfdb480b6d75f2ae7f9..c8d737162008c96c1d9b8eed90040f70205c9af2 100644 --- a/src/backend/storage/smgr/md.c +++ b/src/backend/storage/smgr/md.c @@ -40,7 +40,6 @@ #include "storage/sync.h" #include "utils/hsearch.h" #include "utils/memutils.h" -#include "storage/cfs_buffers.h" #include "storage/page_compression.h" #include "common/pg_lzcompress.h" @@ -733,15 +732,6 @@ inline size_t SizeOfKPHCExtentAddress(uint16 chunkSize) { } return offsetof(KPHCCfsExtentAddress, chunknos) + sizeof(uint16) * BLCKSZ / chunkSize; } -inline off_t OffsetOfPageCompressChunk(uint16 chunkSize, int chunkNo) { - return chunkSize * (chunkNo - 1); -} -inline KPHCCfsExtentAddress *KPHCGetExtentAddress(KPHCCfsExtentHeader *header, uint16 blockOffset) { - uint16 chunkSize = header->chunk_size; - size_t headerOffset = offsetof(KPHCCfsExtentHeader, cfsExtentAddress); - size_t sizeOfKPHCExtentAddress = SizeOfKPHCExtentAddress(chunkSize); - return (KPHCCfsExtentAddress *) (((char *) header) + headerOffset + blockOffset * sizeOfKPHCExtentAddress); -} int KPHCDecompressPage(const char* src, char* dst) { @@ -905,7 +895,7 @@ mdwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, off_t seekpos; int nbytes; MdfdVec *v; - + bool compressed; /* This assert is too expensive to have on normally ... */ #ifdef CHECK_WRITE_VS_EXTEND Assert(blocknum < mdnblocks(reln, forknum)); @@ -919,13 +909,14 @@ mdwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, v = _mdfd_getseg(reln, forknum, blocknum, skipFsync, EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY); - - seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE)); - - Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE); - - nbytes = FileWrite(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_WRITE); - + compressed = KPHC_IS_COMPRESSED_MAINFORK(reln, forknum); + if (compressed) { + nbytes = (int)KPHCCfsWritePage(reln, forknum, blocknum, buffer, skipFsync, COMMON_STORAGE); + } else { + seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE)); + Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE); + nbytes = FileWrite(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_WRITE); + } TRACE_POSTGRESQL_SMGR_MD_WRITE_DONE(forknum, blocknum, reln->smgr_rnode.node.spcNode, reln->smgr_rnode.node.dbNode, diff --git a/src/common/pg_lzcompress.c b/src/common/pg_lzcompress.c index 1cfa74bfe218db1dec3537df7e75f1b494af263e..363a1f94db378e9ec9d49b0be056275e8e502124 100644 --- a/src/common/pg_lzcompress.c +++ b/src/common/pg_lzcompress.c @@ -386,9 +386,6 @@ do { \ } \ } while (0) -#define HIST_START_LEN (sizeof(PGLZ_HistEntry*) * PGLZ_HISTORY_LISTS) -#define HIST_ENTRIES_LEN (sizeof(PGLZ_HistEntry) * PGLZ_HISTORY_SIZE) - /* ---------- * pglz_find_match - * @@ -957,27 +954,11 @@ bool KPHCLzCompress(const char* source, int32 slen, KPHC_PGLZ_Header* dest, cons int32 result_max; int32 need_rate; - /* - * Our fallback strategy is the default. - */ if (strategy == NULL) strategy = PGLZ_strategy_default; - - /* - * If the strategy forbids compression (at all or if source chunk size out - * of range), fail. - */ if (strategy->match_size_good <= 0 || slen < strategy->min_input_size || slen > strategy->max_input_size) return false; - - /* - * Save the original source size in the header. - */ dest->rawsize = slen; - - /* - * Limit the match parameters to the supported range. - */ good_match = strategy->match_size_good; if (good_match > PGLZ_MAX_MATCH) good_match = PGLZ_MAX_MATCH; @@ -995,88 +976,42 @@ bool KPHCLzCompress(const char* source, int32 slen, KPHC_PGLZ_Header* dest, cons need_rate = 0; else if (need_rate > 99) need_rate = 99; - - /* - * Compute the maximum result size allowed by the strategy, namely the - * input size minus the minimum wanted compression rate. This had better - * be <= slen, else we might overrun the provided output buffer. - */ if (slen > (INT_MAX / 100)) { - /* Approximate to avoid overflow */ result_max = (slen / 100) * (100 - need_rate); - } else + } else { result_max = (slen * (100 - need_rate)) / 100; - - /* - * Initialize the history lists to empty. We do not need to zero the - * u_sess->utils_cxt.hist_entries[] array; its entries are initialized as they are used. - */ - memset_s(hist_start, HIST_START_LEN, 0, HIST_START_LEN); - - /* - * Compress the source directly into the output buffer. - */ + } + memset(hist_start, 0, PGLZ_MAX_HISTORY_LISTS * sizeof(int16)); while (dp < dend) { - /* - * If we already exceeded the maximum result size, fail. - * - * We check once per loop; since the loop body could emit as many as 4 - * bytes (a control byte and 3-byte tag), PGLZ_MAX_OUTPUT() had better - * allow 4 slop bytes. - */ if (bp - bstart >= result_max) return false; - - /* - * If we've emitted more than first_success_by bytes without finding - * anything compressible at all, fail. This lets us fall out - * reasonably quickly when looking at incompressible input (such as - * pre-compressed data). - */ if (!found_match && bp - bstart >= strategy->first_success_by) return false; - - /* - * Try to find a match in the history - */ - if (pglz_find_match(hist_start, dp, dend, &match_len, &match_off, good_match, good_drop, 0xffffffff)) { // mask可能有错 - /* - * Create the tag and add history entries for all matched - * characters. - */ + if (pglz_find_match(hist_start, dp, dend, &match_len, &match_off, good_match, good_drop, 0xffffffff)) { pglz_out_tag(ctrlp, ctrlb, ctrl, bp, match_len, match_off); while (match_len--) { pglz_hist_add( - hist_start, hist_entries, hist_next, hist_recycle, dp, dend, 0xffffffff); // mask可能有错 - dp++; /* Do not do this ++ in the line above! */ - /* The macro would do it four times - Jan. */ + hist_start, hist_entries, hist_next, hist_recycle, dp, dend, 0xffffffff); + dp++; } found_match = true; } else { - /* - * No match found. Copy one literal byte. - */ pglz_out_literal(ctrlp, ctrlb, ctrl, bp, *dp); pglz_hist_add( - hist_start, hist_entries, hist_next, hist_recycle, dp, dend, 0xffffffff); // mask可能有错 - dp++; /* Do not do this ++ in the line above! */ - /* The macro would do it four times - Jan. */ + hist_start, hist_entries, hist_next, hist_recycle, dp, dend, 0xffffffff); + dp++; } } - - /* - * Write out the last control byte and check that we haven't overrun the - * output size allowed by the strategy. - */ *ctrlp = ctrlb; result_size = bp - bstart; - if (result_size >= result_max) + if (result_size >= result_max) { return false; - - /* - * Success - need only fill in the actual length of the compressed datum. - */ - SET_VARSIZE_COMPRESSED(dest, result_size + sizeof(KPHC_PGLZ_Header)); - + } + result_size = result_size + sizeof(KPHC_PGLZ_Header); +#ifndef FRONTEND + SET_VARSIZE_COMPRESSED(dest, result_size); +#else + return false; +#endif return true; } \ No newline at end of file diff --git a/src/include/common/pg_lzcompress.h b/src/include/common/pg_lzcompress.h index 97e517e07bd725acef3e506ebf1895bb32c0fc0e..21722042af33b0f677645cefabc5007b07b69515 100644 --- a/src/include/common/pg_lzcompress.h +++ b/src/include/common/pg_lzcompress.h @@ -69,15 +69,6 @@ typedef struct PGLZ_Header { int32 rawsize; } KPHC_PGLZ_Header; -/* ---------- - * Local definitions - * ---------- - */ -#define PGLZ_HISTORY_LISTS 8192 /* must be power of 2 */ -#define PGLZ_HISTORY_MASK (PGLZ_HISTORY_LISTS - 1) -#define PGLZ_HISTORY_SIZE 4096 -#define PGLZ_MAX_MATCH 273 - /* ---------- * The standard strategies * diff --git a/src/include/storage/page_compression.h b/src/include/storage/page_compression.h index 699316b0d07e068f1b73c4feab79148779f25dea..a435f79a61c8dd44cd37415394c531409eab9e22 100644 --- a/src/include/storage/page_compression.h +++ b/src/include/storage/page_compression.h @@ -5,7 +5,7 @@ #include "storage/relfilenode.h" #include "storage/bufpage.h" #include "utils/rel.h" - +#include "storage/cfs_buffers.h" #define KPHC_CMP_BYTE_CONVERT_LEN 1 #define KPHC_CMP_DIFF_CONVERT_LEN 1 #define KPHC_CMP_PRE_CHUNK_LEN 3 @@ -32,9 +32,10 @@ #define KPHC_IS_COMPRESSED_MAINFORK(reln, forkNum) ((reln)->smgr_rnode.node.opt != 0 && (forkNum) == MAIN_FORKNUM) #define DEFAULT_ZSTD_COMPRESSION_LEVEL (1) +#ifdef USE_ZSTD #define MIN_ZSTD_COMPRESSION_LEVEL ZSTD_minCLevel() #define MAX_ZSTD_COMPRESSION_LEVEL ZSTD_maxCLevel() - +#endif typedef size_t KPHC_CFS_STORAGE_TYPE; typedef uint32 pc_chunk_number_t; @@ -105,15 +106,9 @@ extern const uint32 INDEX_OF_QUARTER_BLCKSZ; extern const uint32 INDEX_OF_EIGHTH_BLCKSZ; extern const uint32 INDEX_OF_SIXTEENTHS_BLCKSZ; -// extern const int KPHC_CFS_EXTENT_SIZE; -// extern const int KPHC_CFS_LOGIC_BLOCKS_PER_EXTENT; -// extern const int KPHC_CFS_EXTENT_COUNT_PER_FILE; -// extern const int KPHC_CFS_MAX_BLOCK_PER_FILE; + #define KPHC_CFS_EXTENT_SIZE 129 #define KPHC_CFS_LOGIC_BLOCKS_PER_EXTENT (KPHC_CFS_EXTENT_SIZE - 1) -#define KPHC_CFS_EXTENT_COUNT_PER_FILE (RELSEG_SIZE / KPHC_CFS_EXTENT_SIZE) -#define KPHC_CFS_MAX_BLOCK_PER_FILE (KPHC_CFS_EXTENT_COUNT_PER_FILE * KPHC_CFS_EXTENT_SIZE) - extern const int KPHC_EXTENT_OPEN_FILE; extern const int KPHC_WRITE_BACK_OPEN_FILE; @@ -127,11 +122,14 @@ uint8 ConvertChunkSize(uint32 compressedChunkSize, bool *success); void SetupPageCompressForRelation(RelFileNode *node, PageCompressOpts *compress_options, const char *relationName); -int KPHCCompressPage(const char* src, char* dst, int dst_size, KPHCRelFileCompressOption option); +int KPHCCompressPage(const char* src, char* dst, int dst_size, const KPHCRelFileCompressOption option); char *KPHCCfsCompressPage(const char *buffer, KPHCRelFileCompressOption *option, uint8 *nchunks); -inline void KPHCAnalyzeCompressOptions(const RelFileNode node, KPHCRelFileCompressOption* opt); +void KPHCAnalyzeCompressOptions(const RelFileNode node, KPHCRelFileCompressOption* opt); size_t KPHCCfsWritePage(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, char *buffer, bool sipSync, KPHC_CFS_STORAGE_TYPE type); +off_t OffsetOfPageCompressChunk(uint16 chunkSize, int chunkNo); + +KPHCCfsExtentAddress *KPHCGetExtentAddress(KPHCCfsExtentHeader *header, uint16 blockOffset); #endif // PAGE_COMPRESSION_H \ No newline at end of file