diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c index f14c48da6cf057aed2c62773d64234ebda110135..75dc2e9215cc49342fa94670e901f3f33a57d017 100644 --- a/src/backend/storage/smgr/md.c +++ b/src/backend/storage/smgr/md.c @@ -40,7 +40,7 @@ #include "storage/sync.h" #include "utils/hsearch.h" #include "utils/memutils.h" - +#include /* * The magnetic disk storage manager keeps track of open file * descriptors in its own descriptor pool. This is done to make it @@ -666,6 +666,204 @@ mdwriteback(SMgrRelation reln, ForkNumber forknum, blocknum += nflush; } } +const unsigned KPHC_CMP_BYTE_CONVERT_INDEX = 0; +const unsigned KPHC_CMP_DIFF_CONVERT_INDEX = 1; +const unsigned KPHC_CMP_PRE_CHUNK_INDEX = 2; +const unsigned KPHC_CMP_COMPRESS_LEVEL_SYMBOL = 3; +const unsigned KPHC_CMP_LEVEL_INDEX = 4; +const unsigned KPHC_CMP_ALGORITHM_INDEX = 5; +const unsigned KPHC_CMP_CHUNK_SIZE_INDEX = 6; +const int KPHC_CFS_EXTENT_SIZE = 129; +const int KPHC_CFS_EXTENT_COUNT_PER_FILE = RELSEG_SIZE / KPHC_CFS_EXTENT_SIZE; +const uint32 CHUNK_SIZE_LIST[4] = {BLCKSZ / 2, BLCKSZ / 4, BLCKSZ / 8, BLCKSZ / 16}; +const int KPHC_CFS_MAX_BLOCK_PER_FILE = KPHC_CFS_EXTENT_COUNT_PER_FILE * KPHC_CFS_EXTENT_SIZE; +const int KPHC_CFS_LOGIC_BLOCKS_PER_EXTENT = KPHC_CFS_EXTENT_SIZE - 1; +const int KPHC_EXTENT_OPEN_FILE = 0; +const int KPHC_WRITE_BACK_OPEN_FILE = 1; +const int KPHC_EXTENT_CREATE_FILE = 2; +const KPHC_CFS_STORAGE_TYPE COMMON_STORAGE = 0; +#define KPHC_COMPRESS_ALGORITHM_PGLZ 1 +#define KPHC_COMPRESS_ALGORITHM_ZSTD 2 +#define KPHC_COMPRESS_UNSUPPORTED_ERROR (-2) +#define KPHC_IS_COMPRESSED_MAINFORK(reln, forkNum) ((reln)->smgr_rnode.node.opt != 0 && (forkNum) == MAIN_FORKNUM) +typedef struct PageCompressData { + char page_header[SizeOfPageHeaderData]; + uint32 crc32; + uint32 size : 16; + uint32 byte_convert : 1; + uint32 diff_convert : 1; + uint32 algorithm : 4; + uint32 unused : 10; + char data[FLEXIBLE_ARRAY_MEMBER]; +} KPHCPageCompressData; + +typedef struct PGLZ_Header { + int32 vl_len_; + int32 rawsize; +} KPHC_PGLZ_Header; + +enum KPHC_SMGR_READ_STATUS { + SMGR_RD_OK = 0, + SMGR_RD_NO_BLOCK = 1, + SMGR_RD_CRC_ERROR = 2 +}; + +const KPHCCmpBitStuct gCmpBitStruct[] = {{KPHC_CMP_BYTE_CONVERT_LEN, 0x01, 15}, + {KPHC_CMP_DIFF_CONVERT_LEN, 0x01, 14}, + {KPHC_CMP_PRE_CHUNK_LEN, 0x07, 11}, + {KPHC_CMP_LEVEL_SYMBOL_LEN, 0x01, 10}, + {KPHC_CMP_LEVEL_LEN, 0x1F, 5}, + {KPHC_CMP_ALGORITHM_LEN, 0x07, 2}, + {KPHC_CMP_CHUNK_SIZE_LEN, 0x03, 0}}; + +typedef KPHCExtentLocation (*KPHCCfsLocationConvert)(SMgrRelation sRel, ForkNumber forknum, BlockNumber logicBlockNumber, bool skipSync, int type); + +inline void KPHCAnalyzeCompressOptions(const RelFileNode node, KPHCRelFileCompressOption* opt) +{ + unsigned short compressOption = node.opt; + opt->compressChunkSize = compressOption & gCmpBitStruct[KPHC_CMP_CHUNK_SIZE_INDEX].mask; + compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_CHUNK_SIZE_INDEX].bitLen; + opt->compressAlgorithm = compressOption & gCmpBitStruct[KPHC_CMP_ALGORITHM_INDEX].mask; + compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_ALGORITHM_INDEX].bitLen; + opt->compressLevel = compressOption & gCmpBitStruct[KPHC_CMP_LEVEL_INDEX].mask; + compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_LEVEL_INDEX].bitLen; + opt->compressLevelSymbol = compressOption & gCmpBitStruct[KPHC_CMP_COMPRESS_LEVEL_SYMBOL].mask; + compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_COMPRESS_LEVEL_SYMBOL].bitLen; + opt->compressPreallocChunks = compressOption & gCmpBitStruct[KPHC_CMP_PRE_CHUNK_INDEX].mask; + compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_PRE_CHUNK_INDEX].bitLen; + opt->diffConvert = compressOption & gCmpBitStruct[KPHC_CMP_DIFF_CONVERT_INDEX].mask; + compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_DIFF_CONVERT_INDEX].bitLen; + opt->byteConvert = compressOption & gCmpBitStruct[KPHC_CMP_BYTE_CONVERT_INDEX].mask; + compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_BYTE_CONVERT_INDEX].bitLen; +} + +KPHCExtentLocation KPHCStorageConvert(SMgrRelation sRel, ForkNumber forknum, BlockNumber logicBlockNumber, bool skipSync, + int type) +{ + KPHCRelFileCompressOption option; + KPHCAnalyzeCompressOptions(sRel->smgr_rnode.node, &option); + + BlockNumber extentNumber = logicBlockNumber / KPHC_CFS_LOGIC_BLOCKS_PER_EXTENT; + BlockNumber extentOffset = logicBlockNumber % KPHC_CFS_LOGIC_BLOCKS_PER_EXTENT; + BlockNumber extentStart = (extentNumber * KPHC_CFS_EXTENT_SIZE) % KPHC_CFS_MAX_BLOCK_PER_FILE; + BlockNumber extentHeader = extentStart + KPHC_CFS_LOGIC_BLOCKS_PER_EXTENT; + MdfdVec *v = NULL; + int fd = -1; + if (type == KPHC_EXTENT_OPEN_FILE) { + v = _mdfd_getseg(sRel, forknum, logicBlockNumber, skipSync, EXTENSION_FAIL); + } else if (type == KPHC_WRITE_BACK_OPEN_FILE) { + v = _mdfd_getseg(sRel, forknum, logicBlockNumber, skipSync, EXTENSION_RETURN_NULL); + } else if (type == KPHC_EXTENT_CREATE_FILE) { + v = _mdfd_getseg(sRel, forknum, logicBlockNumber, skipSync, EXTENSION_CREATE); + } + if (v != NULL) { + fd = v->mdfd_vfd; + } + KPHCExtentLocation location; + location.fd = fd; + location.relFileNode = sRel->smgr_rnode.node; + location.extentNumber = extentNumber; + location.extentStart = extentStart; + location.extentOffset = extentOffset; + location.headerNum = extentHeader; + location.chrunkSize = (uint16)CHUNK_SIZE_LIST[option.compressChunkSize]; + location.algorithm = (uint8)option.compressAlgorithm; + return location; +} + +KPHCCfsLocationConvert cfsLocationConverts[2] = { + KPHCStorageConvert, + NULL +}; + +inline size_t SizeOfKPHCExtentAddress(uint16 chunkSize) { + if (chunkSize == 0) { + return -1; + } + return offsetof(KPHCCfsExtentAddress, chunknos) + sizeof(uint16) * BLCKSZ / chunkSize; +} +inline off_t OffsetOfPageCompressChunk(uint16 chunkSize, int chunkNo) { + return chunkSize * (chunkNo - 1); +} +inline KPHCCfsExtentAddress *KPHCGetExtentAddress(KPHCCfsExtentHeader *header, uint16 blockOffset) { + auto chunkSize = header->chunk_size; + auto headerOffset = offsetof(KPHCCfsExtentHeader, cfsExtentAddress); + auto sizeOfKPHCExtentAddress = SizeOfKPHCExtentAddress(chunkSize); + return (KPHCCfsExtentAddress *) (((char *) header) + headerOffset + blockOffset * sizeOfKPHCExtentAddress); +} +int KPHCDecompressPage(const char* src, char* dst) +{ + int decompressed_size; + char* data; + uint32 size; + int algorithm; + size_t headerSize = SizeOfPageHeaderData; + memcpy(dst, src, headerSize); + data = ((KPHCPageCompressData*)src)->data; + size = ((KPHCPageCompressData*)src)->size; + algorithm = ((KPHCPageCompressData*)src)->algorithm; + + switch (algorithm) { + case KPHC_COMPRESS_ALGORITHM_PGLZ: + decompressed_size = pglz_decompress((const KPHC_PGLZ_Header* )data, dst + headerSize); + if (decompressed_size == -1) { + return -1; + } + break; + case KPHC_COMPRESS_ALGORITHM_ZSTD: +#ifdef USE_ZSTD + decompressed_size = ZSTD_decompress(dst + headerSize, BLCKSZ - headerSize, data, size); + if (ZSTD_isError(decompressed_size)) { + return -1; + } +#else + return -1; +#endif + break; + default: + return KPHC_COMPRESS_UNSUPPORTED_ERROR; + break; + } + return headerSize + decompressed_size; +} +int KPHCCfsReadPage(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, char *buffer, + KPHC_CFS_STORAGE_TYPE type) +{ + KPHCExtentLocation location = cfsLocationConverts[type](reln, forknum, logicBlockNumber, false, KPHC_EXTENT_OPEN_FILE); + KPHCPciPageCtrl *ctrl = KPHCPciBufReadPage(location, LW_SHARED, PCI_BUF_NORMAL_READ); + KPHCCfsExtentHeader *cfsExtentHeader = ctrl->pciPage; + KPHCCfsExtentAddress *cfsExtentAddress = KPHCGetExtentAddress(cfsExtentHeader, (uint16)location.extentOffset); + KPHCRelFileCompressOption option; + KPHCAnalyzeCompressOptions(reln->smgr_rnode.node, &option); + auto chunkSize = CHUNK_SIZE_LIST[option.compressChunkSize]; + auto startOffset = location.extentStart * BLCKSZ; + char *compressedBuffer = (char *) palloc(chunkSize * cfsExtentAddress->nchunks); + char *bufferPos = compressedBuffer; + + for (auto i = 0; i < cfsExtentAddress->nchunks; i++) { + bufferPos = compressedBuffer + (long)chunkSize * i; + off_t seekPos = OffsetOfPageCompressChunk((uint16)chunkSize, cfsExtentAddress->chunknos[i]) + startOffset; + uint8 start = (uint8)i; + while (i < cfsExtentAddress->nchunks - 1 && + cfsExtentAddress->chunknos[i + 1] == cfsExtentAddress->chunknos[i] + 1) { + i++; + } + int readAmount = (int)(chunkSize * ((int)(i - (int)start) + 1)); + FileRead(location.fd, bufferPos, readAmount, seekPos, (uint32)WAIT_EVENT_DATA_FILE_READ); + } + + if (cfsExtentAddress->nchunks == (BLCKSZ / chunkSize)) { + memcpy(buffer, compressedBuffer, BLCKSZ); + } else if (KPHCDecompressPage(compressedBuffer, buffer) != BLCKSZ) { + memset(buffer, 0, BLCKSZ); + KPHCPciBufFreePage(ctrl, location, false); + pfree(compressedBuffer); + return -1; + } + KPHCPciBufFreePage(ctrl, location, false); + pfree(compressedBuffer); + return BLCKSZ; +} /* * mdread() -- Read the specified block from a relation. @@ -687,12 +885,16 @@ mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, v = _mdfd_getseg(reln, forknum, blocknum, false, EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY); - seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE)); - - Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE); - - nbytes = FileRead(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_READ); + if (KPHC_IS_COMPRESSED_MAINFORK(reln, forknum)) { + nbytes = KPHCCfsReadPage(reln, forknum, blocknum, buffer, COMMON_STORAGE); + if (nbytes < 0) { + return SMGR_RD_CRC_ERROR; + } + } else { + seekpos = (off_t)BLCKSZ * (blocknum % ((BlockNumber)RELSEG_SIZE)); + nbytes = FileRead(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_READ); + } TRACE_POSTGRESQL_SMGR_MD_READ_DONE(forknum, blocknum, reln->smgr_rnode.node.spcNode, reln->smgr_rnode.node.dbNode, @@ -754,14 +956,21 @@ mdwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, reln->smgr_rnode.node.relNode, reln->smgr_rnode.backend); - v = _mdfd_getseg(reln, forknum, blocknum, skipFsync, - EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY); + v = _mdfd_getseg(reln, forknum, blocknum, skipFsync, EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY); + if (v == nullptr) { + return; + } seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE)); + if (KPHC_IS_COMPRESSED_MAINFORK(reln, forknum)) { + nbytes = (int)KPHCCfsWritePage(reln, forknum, blocknum, buffer, skipFsync, COMMON_STORAGE); + } else { + seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE)); - Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE); + Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE); - nbytes = FileWrite(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_WRITE); + nbytes = FileWrite(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_WRITE); + } TRACE_POSTGRESQL_SMGR_MD_WRITE_DONE(forknum, blocknum, reln->smgr_rnode.node.spcNode, diff --git a/src/include/storage/md.h b/src/include/storage/md.h index ffffa40db7197a04e0b9f18316a13f096d0f9a7c..c6f7a0f8a09045ca411d2682781919e0403041ff 100644 --- a/src/include/storage/md.h +++ b/src/include/storage/md.h @@ -18,6 +18,33 @@ #include "storage/relfilenode.h" #include "storage/smgr.h" #include "storage/sync.h" +#include "port/atomics.h" +#include "storage/cfs_buffers.h" +typedef size_t KPHC_CFS_STORAGE_TYPE; +#define KPHC_CMP_BYTE_CONVERT_LEN 1 +#define KPHC_CMP_DIFF_CONVERT_LEN 1 +#define KPHC_CMP_PRE_CHUNK_LEN 3 +#define KPHC_CMP_LEVEL_SYMBOL_LEN 1 + +#define KPHC_CMP_LEVEL_LEN 5 +#define KPHC_CMP_ALGORITHM_LEN 3 +#define KPHC_CMP_CHUNK_SIZE_LEN 2 + +typedef struct CmpBitStuct { + const unsigned int bitLen; + const unsigned int mask; + const unsigned int moveBit; +}KPHCCmpBitStuct; + +typedef struct relFileCompressOption { + unsigned byteConvert : KPHC_CMP_BYTE_CONVERT_LEN; + unsigned diffConvert : KPHC_CMP_DIFF_CONVERT_LEN; + unsigned compressPreallocChunks : KPHC_CMP_PRE_CHUNK_LEN; + unsigned compressLevelSymbol : KPHC_CMP_LEVEL_SYMBOL_LEN; + unsigned compressLevel : KPHC_CMP_LEVEL_LEN; + unsigned compressAlgorithm : KPHC_CMP_ALGORITHM_LEN; + unsigned compressChunkSize : KPHC_CMP_CHUNK_SIZE_LEN; +}KPHCRelFileCompressOption; /* md storage manager functionality */ extern void mdinit(void);