Revised patch: read path for compressed relations in the md storage manager.

Text ahead of the first "diff --git" line is commentary; patch(1) and
git-apply skip it.  The "index" lines still carry the blob ids of the
original submission, so the post-image ids do not match this revision.

Changes made during review:
  * md.c, KPHCCfsReadPage: check the storage type index and the converter
    slot, the control block returned by KPHCPciBufReadPage, the extent
    address, the chunk count, each chunk number and every FileRead result;
    compute the extent start offset in off_t; release resources on one path.
    A block with no chunks is returned as an all-zero page.
  * md.c, KPHCGetExtentAddress: return NULL for a zero chunk size instead of
    striding by (size_t) -1.
  * md.c, KPHCDecompressPage: bound the on-disk rawsize by the space left in
    the output page; keep the ZSTD result in a size_t for ZSTD_isError().
  * md.c: drop "inline" from non-static functions.  With C99 inline rules an
    inline definition alone provides no external definition, so a call that
    is not inlined fails to link.
  * md.c, mdread: report a failed compressed read instead of returning
    silently, except when zero_damaged_pages is set or in recovery; restore
    the seekpos Assert on the uncompressed path.
  * pg_lzcompress.c, KPHCLzDecompress: bounds-check match tags and match
    offsets; add the missing newline at end of file.
  * cfs_buffers.h: parenthesize macro arguments.
  * cfs_buffers.c: document that all three functions are stubs.

diff --git a/src/backend/storage/smgr/Makefile b/src/backend/storage/smgr/Makefile
index 596b564656fa27a27184a2c313847662ce5d7525..cc02a8a0c22aafd373e8e28523bcce56f30e2664 100644
--- a/src/backend/storage/smgr/Makefile
+++ b/src/backend/storage/smgr/Makefile
@@ -14,6 +14,8 @@ include $(top_builddir)/src/Makefile.global
 
 OBJS = \
 	md.o \
-	smgr.o
+	smgr.o \
+	cfs_buffers.o
+
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/smgr/cfs_buffers.c b/src/backend/storage/smgr/cfs_buffers.c
new file mode 100644
index 0000000000000000000000000000000000000000..d288b9912203d879a21e7cec5805da59612b8f8b
--- /dev/null
+++ b/src/backend/storage/smgr/cfs_buffers.c
@@ -0,0 +1,23 @@
+/*
+ * cfs_buffers.c
+ *    Placeholder for the cache of compressed-file extent header pages.
+ *
+ * Every function here is a stub with no behavior beyond what is written.
+ */
+#include "storage/cfs_buffers.h"
+
+/* Stub: performs no initialization. */
+void KPHCPciBufInitCtx(void)
+{
+}
+
+/* Stub: ignores its arguments and always returns NULL; callers must check. */
+KPHCPciPageCtrl *KPHCPciBufReadPage(const KPHCExtentLocation location, LWLockMode lockMode, KPHCPciBufferReadMode readMode)
+{
+    return NULL;
+}
+
+/* Stub: does nothing. */
+void KPHCPciBufFreePage(KPHCPciPageCtrl *ctrl, const KPHCExtentLocation location, bool need_write)
+{
+}
diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index f14c48da6cf057aed2c62773d64234ebda110135..adb18fb7427247dd47eca2801d9624f25103b7b7 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -40,7 +40,9 @@
 #include "storage/sync.h"
 #include "utils/hsearch.h"
 #include "utils/memutils.h"
-
+#include "storage/cfs_buffers.h"
+#include "storage/page_compression.h"
+#include "common/pg_lzcompress.h"
 /*
  *	The magnetic disk storage manager keeps track of open file
  *	descriptors in its own descriptor pool.  This is done to make it
@@ -667,6 +669,303 @@ mdwriteback(SMgrRelation reln, ForkNumber forknum,
 	}
 }
 
+typedef size_t KPHC_CFS_STORAGE_TYPE;
+const unsigned KPHC_CMP_COMPRESS_LEVEL_SYMBOL = 3;
+const int KPHC_CFS_EXTENT_SIZE = 129;
+const uint32 CHUNK_SIZE_LIST[4] = {BLCKSZ / 2, BLCKSZ / 4, BLCKSZ / 8, BLCKSZ / 16};
+const int KPHC_EXTENT_OPEN_FILE = 0;
+const int KPHC_WRITE_BACK_OPEN_FILE = 1;
+const int KPHC_EXTENT_CREATE_FILE = 2;
+const KPHC_CFS_STORAGE_TYPE COMMON_STORAGE = 0;
+#define KPHC_COMPRESS_ALGORITHM_PGLZ 1
+#define KPHC_COMPRESS_ALGORITHM_ZSTD 2
+#define KPHC_COMPRESS_UNSUPPORTED_ERROR (-2)
+#define KPHC_IS_COMPRESSED_MAINFORK(reln, forkNum) ((reln)->smgr_rnode.node.opt != 0 && (forkNum) == MAIN_FORKNUM)
+
+/* Layout that KPHCDecompressPage() expects at the start of a compressed page image. */
+typedef struct PageCompressData {
+    char page_header[SizeOfPageHeaderData];
+    uint32 crc32;
+    uint32 size : 16;
+    uint32 byte_convert : 1;
+    uint32 diff_convert : 1;
+    uint32 algorithm : 4;
+    uint32 unused : 10;
+    char data[FLEXIBLE_ARRAY_MEMBER];
+} KPHCPageCompressData;
+
+enum KPHC_SMGR_READ_STATUS {
+    SMGR_RD_OK = 0,
+    SMGR_RD_NO_BLOCK = 1,
+    SMGR_RD_CRC_ERROR = 2
+};
+
+typedef KPHCExtentLocation (*KPHCCfsLocationConvert)(SMgrRelation sRel, ForkNumber forknum, BlockNumber
+    logicBlockNumber, bool skipSync, int type);
+void KPHCAnalyzeCompressOptions(const RelFileNode node, KPHCRelFileCompressOption* opt);
+KPHCExtentLocation KPHCStorageConvert(SMgrRelation sRel, ForkNumber forknum, BlockNumber logicBlockNumber, bool skipSync,
+    int type);
+int KPHCDecompressPage(const char* src, char* dst);
+int KPHCCfsReadPage(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, char *buffer,
+    KPHC_CFS_STORAGE_TYPE type);
+
+/*
+ * Unpack the bit-packed compression options stored in node.opt into *opt.
+ * Each field is masked off the low bits and then shifted out, so the order
+ * of the statements below defines the bit layout.
+ */
+void KPHCAnalyzeCompressOptions(const RelFileNode node, KPHCRelFileCompressOption* opt)
+{
+    unsigned short compressOption = node.opt;
+    opt->compressChunkSize = compressOption & gCmpBitStruct[KPHC_CMP_CHUNK_SIZE_INDEX].mask;
+    compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_CHUNK_SIZE_INDEX].bitLen;
+    opt->compressAlgorithm = compressOption & gCmpBitStruct[KPHC_CMP_ALGORITHM_INDEX].mask;
+    compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_ALGORITHM_INDEX].bitLen;
+    opt->compressLevel = compressOption & gCmpBitStruct[KPHC_CMP_LEVEL_INDEX].mask;
+    compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_LEVEL_INDEX].bitLen;
+    opt->compressLevelSymbol = compressOption & gCmpBitStruct[KPHC_CMP_COMPRESS_LEVEL_SYMBOL].mask;
+    compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_COMPRESS_LEVEL_SYMBOL].bitLen;
+    opt->compressPreallocChunks = compressOption & gCmpBitStruct[KPHC_CMP_PRE_CHUNK_INDEX].mask;
+    compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_PRE_CHUNK_INDEX].bitLen;
+    opt->diffConvert = compressOption & gCmpBitStruct[KPHC_CMP_DIFF_CONVERT_INDEX].mask;
+    compressOption = compressOption >> gCmpBitStruct[KPHC_CMP_DIFF_CONVERT_INDEX].bitLen;
+    opt->byteConvert = compressOption & gCmpBitStruct[KPHC_CMP_BYTE_CONVERT_INDEX].mask;
+}
+
+/*
+ * Translate a logical block number into the extent that stores it and open
+ * the backing segment.  'type' is one of the KPHC_*_FILE constants and picks
+ * the _mdfd_getseg() behavior; for any other value, or when no segment is
+ * returned, location.fd is left at -1.
+ */
+KPHCExtentLocation KPHCStorageConvert(SMgrRelation sRel, ForkNumber forknum, BlockNumber logicBlockNumber, bool skipSync,
+    int type)
+{
+    KPHCRelFileCompressOption option;
+    BlockNumber extentNumber;
+    BlockNumber extentOffset;
+    BlockNumber extentStart;
+    BlockNumber extentHeader;
+    MdfdVec *v;
+    int fd;
+    KPHCExtentLocation location;
+    KPHCAnalyzeCompressOptions(sRel->smgr_rnode.node, &option);
+    /* headerNum is the extent's last block; the other blocks hold data. */
+    extentNumber = logicBlockNumber / (KPHC_CFS_EXTENT_SIZE - 1);
+    extentOffset = logicBlockNumber % (KPHC_CFS_EXTENT_SIZE - 1);
+    extentStart = (extentNumber * KPHC_CFS_EXTENT_SIZE) %
+        (RELSEG_SIZE / KPHC_CFS_EXTENT_SIZE * KPHC_CFS_EXTENT_SIZE);
+    extentHeader = extentStart + KPHC_CFS_EXTENT_SIZE - 1;
+    v = NULL;
+    fd = -1;
+    if (type == KPHC_EXTENT_OPEN_FILE) {
+        v = _mdfd_getseg(sRel, forknum, logicBlockNumber, skipSync, EXTENSION_FAIL);
+    } else if (type == KPHC_WRITE_BACK_OPEN_FILE) {
+        v = _mdfd_getseg(sRel, forknum, logicBlockNumber, skipSync, EXTENSION_RETURN_NULL);
+    } else if (type == KPHC_EXTENT_CREATE_FILE) {
+        v = _mdfd_getseg(sRel, forknum, logicBlockNumber, skipSync, EXTENSION_CREATE);
+    }
+    if (v != NULL) {
+        fd = v->mdfd_vfd;
+    }
+    location.fd = fd;
+    location.relFileNode = sRel->smgr_rnode.node;
+    location.extentNumber = extentNumber;
+    location.extentStart = extentStart;
+    location.extentOffset = extentOffset;
+    location.headerNum = extentHeader;
+    location.chrunkSize = (uint16)CHUNK_SIZE_LIST[option.compressChunkSize];
+    location.algorithm = (uint8)option.compressAlgorithm;
+    return location;
+}
+
+KPHCCfsLocationConvert cfsLocationConverts[2] = {
+    KPHCStorageConvert,
+    NULL
+};
+
+/*
+ * Size in bytes of one KPHCCfsExtentAddress entry for the given chunk size,
+ * or (size_t) -1 when chunkSize is 0.
+ */
+size_t SizeOfKPHCExtentAddress(uint16 chunkSize) {
+    if (chunkSize == 0) {
+        return (size_t)-1;
+    }
+    return offsetof(KPHCCfsExtentAddress, chunknos) + sizeof(uint16) * BLCKSZ / chunkSize;
+}
+
+/* Byte offset, within an extent, of the chunk with 1-based number chunkNo. */
+off_t OffsetOfPageCompressChunk(uint16 chunkSize, int chunkNo) {
+    return (off_t)chunkSize * (chunkNo - 1);
+}
+
+/*
+ * Locate the address entry for block 'blockOffset' inside an extent header.
+ * Returns NULL when the header carries a zero chunk size, because the entry
+ * stride cannot be computed in that case.
+ */
+KPHCCfsExtentAddress *KPHCGetExtentAddress(KPHCCfsExtentHeader *header, uint16 blockOffset) {
+    size_t headerOffset = offsetof(KPHCCfsExtentHeader, cfsExtentAddress);
+    size_t sizeOfKPHCExtentAddress;
+
+    if (header->chunk_size == 0) {
+        return NULL;
+    }
+    sizeOfKPHCExtentAddress = SizeOfKPHCExtentAddress(header->chunk_size);
+    return (KPHCCfsExtentAddress *) (((char *) header) + headerOffset + blockOffset * sizeOfKPHCExtentAddress);
+}
+
+/*
+ * Decompress the page image at 'src' into the BLCKSZ-byte buffer 'dst'.
+ * The page header is copied verbatim; the payload is expanded after it.
+ *
+ * Returns the number of bytes produced (header included), -1 when
+ * decompression fails or the algorithm is not compiled in, or
+ * KPHC_COMPRESS_UNSUPPORTED_ERROR for an unknown algorithm id.
+ */
+int KPHCDecompressPage(const char* src, char* dst)
+{
+    const KPHCPageCompressData *page = (const KPHCPageCompressData *)src;
+    const size_t headerSize = SizeOfPageHeaderData;
+    const size_t maxRawSize = BLCKSZ - headerSize;
+    int decompressed_size;
+
+    memcpy(dst, src, headerSize);
+
+    switch (page->algorithm) {
+        case KPHC_COMPRESS_ALGORITHM_PGLZ: {
+            const KPHC_PGLZ_Header *lzHeader = (const KPHC_PGLZ_Header *)page->data;
+
+            /* rawsize comes from disk; never let it run past the end of dst. */
+            if (lzHeader->rawsize < 0 || (size_t)lzHeader->rawsize > maxRawSize) {
+                return -1;
+            }
+            decompressed_size = KPHCLzDecompress(lzHeader, dst + headerSize);
+            if (decompressed_size == -1) {
+                return -1;
+            }
+            break;
+        }
+        case KPHC_COMPRESS_ALGORITHM_ZSTD: {
+#ifdef USE_ZSTD
+            /* Keep the size_t result: ZSTD_isError() inspects the full value. */
+            size_t zstdResult = ZSTD_decompress(dst + headerSize, maxRawSize, page->data, page->size);
+
+            if (ZSTD_isError(zstdResult)) {
+                return -1;
+            }
+            decompressed_size = (int)zstdResult;
+            break;
+#else
+            return -1;
+#endif
+        }
+        default:
+            return KPHC_COMPRESS_UNSUPPORTED_ERROR;
+    }
+    return (int)headerSize + decompressed_size;
+}
+
+/*
+ * Read one logical block of a compressed relation into 'buffer' (BLCKSZ
+ * bytes), decompressing it when it occupies fewer chunks than a full page.
+ *
+ * Returns BLCKSZ on success.  On any failure the buffer is zero-filled and
+ * -1 is returned.  A block whose address entry lists no chunks is returned
+ * as an all-zero page.
+ */
+int KPHCCfsReadPage(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, char *buffer,
+    KPHC_CFS_STORAGE_TYPE type)
+{
+    KPHCExtentLocation location;
+    KPHCPciPageCtrl *ctrl;
+    KPHCCfsExtentHeader *cfsExtentHeader;
+    KPHCCfsExtentAddress *cfsExtentAddress;
+    KPHCRelFileCompressOption option;
+    uint32 chunkSize;
+    uint32 nchunks;
+    off_t startOffset;
+    char *compressedBuffer = NULL;
+    int result = -1;
+
+    /* cfsLocationConverts has an unused (NULL) slot; never call through it. */
+    if (type >= lengthof(cfsLocationConverts) || cfsLocationConverts[type] == NULL) {
+        memset(buffer, 0, BLCKSZ);
+        return -1;
+    }
+    location = cfsLocationConverts[type](reln, forknum, logicBlockNumber, false, KPHC_EXTENT_OPEN_FILE);
+    ctrl = KPHCPciBufReadPage(location, LW_SHARED, PCI_BUF_NORMAL_READ);
+    if (ctrl == NULL) {
+        /* Nothing was handed out, so there is nothing to release. */
+        memset(buffer, 0, BLCKSZ);
+        return -1;
+    }
+    cfsExtentHeader = ctrl->pciPage;
+    if (cfsExtentHeader == NULL) {
+        goto done;
+    }
+    cfsExtentAddress = KPHCGetExtentAddress(cfsExtentHeader, (uint16)location.extentOffset);
+    if (cfsExtentAddress == NULL) {
+        goto done;
+    }
+    KPHCAnalyzeCompressOptions(reln->smgr_rnode.node, &option);
+    chunkSize = CHUNK_SIZE_LIST[option.compressChunkSize];
+    /* Widen before multiplying so large segment sizes cannot wrap 32 bits. */
+    startOffset = (off_t)BLCKSZ * location.extentStart;
+    /* nchunks is volatile: read it once so every check sees the same value. */
+    nchunks = cfsExtentAddress->nchunks;
+    if (nchunks == 0) {
+        memset(buffer, 0, BLCKSZ);
+        result = BLCKSZ;
+        goto done;
+    }
+    if (nchunks > BLCKSZ / chunkSize || nchunks > (uint32)BLCKSZ / cfsExtentHeader->chunk_size) {
+        goto done;
+    }
+    compressedBuffer = (char *) palloc((Size)chunkSize * nchunks);
+
+    for (uint32 i = 0; i < nchunks; i++) {
+        uint32 first = i;
+        int readAmount;
+        off_t seekPos;
+
+        /* Chunk numbers are 1-based; 0 would yield a negative file offset. */
+        if (cfsExtentAddress->chunknos[first] == 0) {
+            goto done;
+        }
+        /* Coalesce consecutively numbered chunks into a single read. */
+        while (i + 1 < nchunks &&
+            cfsExtentAddress->chunknos[i + 1] == cfsExtentAddress->chunknos[i] + 1) {
+            i++;
+        }
+        readAmount = (int)(chunkSize * (i - first + 1));
+        seekPos = OffsetOfPageCompressChunk((uint16)chunkSize, cfsExtentAddress->chunknos[first]) + startOffset;
+        if (FileRead(location.fd, compressedBuffer + (Size)chunkSize * first, readAmount, seekPos,
+            (uint32)WAIT_EVENT_DATA_FILE_READ) != readAmount) {
+            goto done;
+        }
+    }
+
+    if (nchunks == (BLCKSZ / chunkSize)) {
+        /* A page stored in the full number of chunks is copied as is. */
+        memcpy(buffer, compressedBuffer, BLCKSZ);
+        result = BLCKSZ;
+    } else if (KPHCDecompressPage(compressedBuffer, buffer) == BLCKSZ) {
+        result = BLCKSZ;
+    }
+
+done:
+    if (result < 0) {
+        memset(buffer, 0, BLCKSZ);
+    }
+    KPHCPciBufFreePage(ctrl, location, false);
+    if (compressedBuffer != NULL) {
+        pfree(compressedBuffer);
+    }
+    return result;
+}
+
 /*
  *	mdread() -- Read the specified block from a relation.
  */
@@ -686,13 +985,28 @@ mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 
 	v = _mdfd_getseg(reln, forknum, blocknum, false,
 					 EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY);
-
-	seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
-
-	Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
-
-	nbytes = FileRead(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_READ);
-
+    if (KPHC_IS_COMPRESSED_MAINFORK(reln, forknum)) {
+        nbytes = KPHCCfsReadPage(reln, forknum, blocknum, buffer, COMMON_STORAGE);
+        if (nbytes < 0) {
+            /*
+             * KPHCCfsReadPage() has already zero-filled the buffer.  Return
+             * the zeroes only when zero_damaged_pages is set or we are in
+             * recovery; otherwise raise an error rather than silently
+             * returning an empty page.
+             */
+            if (!(zero_damaged_pages || InRecovery)) {
+                ereport(ERROR,
+                        (errcode(ERRCODE_DATA_CORRUPTED),
+                         errmsg("could not read compressed block %u in file \"%s\"",
+                                blocknum, FilePathName(v->mdfd_vfd))));
+            }
+            nbytes = BLCKSZ;
+        }
+    } else {
+        seekpos = (off_t)BLCKSZ * (blocknum % ((BlockNumber)RELSEG_SIZE));
+        Assert(seekpos < (off_t)BLCKSZ * RELSEG_SIZE);
+        nbytes = FileRead(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_READ);
+    }
 	TRACE_POSTGRESQL_SMGR_MD_READ_DONE(forknum, blocknum,
 									   reln->smgr_rnode.node.spcNode,
 									   reln->smgr_rnode.node.dbNode,
diff --git a/src/common/pg_lzcompress.c b/src/common/pg_lzcompress.c
index ad3970c7a967e42b8cd7412ff92e51d7a6b0d9ba..5667802c763c84c3de123128841fb6e20eb84af9 100644
--- a/src/common/pg_lzcompress.c
+++ b/src/common/pg_lzcompress.c
@@ -874,3 +874,85 @@ pglz_maximum_compressed_size(int32 rawsize, int32 total_compressed_size)
 
 	return (int32) compressed_size;
 }
+
+/* ----------
+ * KPHCLzDecompress -
+ *
+ *		Decompress pglz data that starts with a KPHC_PGLZ_Header.  The
+ *		compressed length is taken from the header's varlena length word
+ *		and the output length from its rawsize field; dest must have room
+ *		for rawsize bytes.
+ *
+ *		Returns rawsize on success.  Corrupt input raises an ERROR in the
+ *		backend.  VARSIZE() is only used in backend builds; frontend builds
+ *		always return -1.
+ * ----------
+ */
+int32 KPHCLzDecompress(const KPHC_PGLZ_Header* source, char* dest)
+{
+#ifdef FRONTEND
+    return -1;
+#else
+    const unsigned char* sp = ((const unsigned char*)source) + sizeof(KPHC_PGLZ_Header);
+    const unsigned char* srcend = ((const unsigned char*)source) + VARSIZE(source);
+    unsigned char* dp = (unsigned char*)dest;
+    unsigned char* destend = dp;
+    bool corrupt = (source->rawsize < 0 || srcend < sp);
+
+    if (!corrupt) {
+        destend = dp + source->rawsize;
+    }
+
+    while (!corrupt && sp < srcend && dp < destend) {
+        unsigned char ctrl = *sp++;
+        int ctrlc;
+
+        for (ctrlc = 0; ctrlc < 8 && sp < srcend; ctrlc++) {
+            if (ctrl & 1) {
+                int32 len;
+                int32 off;
+
+                /* A match tag is two bytes, plus one more for long matches. */
+                if (srcend - sp < 2) {
+                    corrupt = true;
+                    break;
+                }
+                len = (sp[0] & 0x0f) + 3;
+                off = ((sp[0] & 0xf0) << 4) | sp[1];
+                sp += 2;
+                if (len == 18) {
+                    if (sp >= srcend) {
+                        corrupt = true;
+                        break;
+                    }
+                    len += *sp++;
+                }
+                /*
+                 * Reject matches that would read before the start of dest
+                 * or write past its end.  An Assert alone is not enough,
+                 * since it is compiled out of production builds.
+                 */
+                if (off == 0 || off > dp - (unsigned char*)dest || len > destend - dp) {
+                    corrupt = true;
+                    break;
+                }
+                while (len--) {
+                    *dp = dp[-off];
+                    dp++;
+                }
+            } else {
+                if (dp >= destend)
+                    break;
+
+                *dp++ = *sp++;
+            }
+            ctrl >>= 1;
+        }
+    }
+
+    if (corrupt || dp != destend || sp != srcend) {
+        ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), errmsg("compressed data is corrupt")));
+    }
+    return source->rawsize;
+#endif
+}
diff --git a/src/include/common/pg_lzcompress.h b/src/include/common/pg_lzcompress.h
index 2a12b33a008024e1eefcafe202b97b37cdd561c0..fb660b85ec0221849fe696aafa36678f8b1fb56c 100644
--- a/src/include/common/pg_lzcompress.h
+++ b/src/include/common/pg_lzcompress.h
@@ -64,7 +64,10 @@ typedef struct PGLZ_Strategy
 	int32		match_size_drop;
 } PGLZ_Strategy;
 
-
+typedef struct PGLZ_Header {
+    int32 vl_len_;
+    int32 rawsize;
+} KPHC_PGLZ_Header;
 /* ----------
  * The standard strategies
  *
@@ -87,6 +90,7 @@ extern int32 pglz_compress(const char *source, int32 slen, char *dest,
 							const PGLZ_Strategy *strategy);
 extern int32 pglz_decompress(const char *source, int32 slen, char *dest,
 							 int32 rawsize, bool check_complete);
+extern int32 KPHCLzDecompress(const KPHC_PGLZ_Header* source, char* dest);
 extern int32 pglz_maximum_compressed_size(int32 rawsize,
 										   int32 total_compressed_size);
 
diff --git a/src/include/storage/cfs_buffers.h b/src/include/storage/cfs_buffers.h
new file mode 100644
index 0000000000000000000000000000000000000000..31eb691eecd343b9700f7b843a17338a2e798e85
--- /dev/null
+++ b/src/include/storage/cfs_buffers.h
@@ -0,0 +1,142 @@
+/*
+ * cfs_buffers.h
+ *    Types and entry points for the cache of compressed-file extent headers.
+ */
+#ifndef CFS_BUFFERS_H
+#define CFS_BUFFERS_H
+
+#include "postgres.h"
+#include "postgres_ext.h"
+#include "storage/block.h"
+#include "port/atomics.h"
+#include "storage/lwlock.h"
+#include "utils/wait_event.h"
+#include "storage/fd.h"
+#include "utils/elog.h"
+#include "storage/shmem.h"
+#include "utils/hsearch.h"
+#include "storage/relfilenode.h"
+
+/* Where one logical block lives: open file, owning extent, offset within it. */
+typedef struct ExtentLocation {
+    int fd;
+    RelFileNode relFileNode;
+    BlockNumber extentNumber;
+    BlockNumber extentStart;
+    BlockNumber extentOffset;
+    BlockNumber headerNum;
+    uint16 chrunkSize;
+    uint8 algorithm;
+}KPHCExtentLocation;
+
+typedef struct CfsBufferKey {
+    RelFileNode relFileNode;
+    uint32 extentCount;
+}KPHCCfsBufferKey;
+
+typedef enum enCtrlState {
+    CTRL_STATE_INIT = 0,
+    CTRL_STATE_USED = 1,
+    CTRL_STATE_FREE = 2,
+} KPHCCtrlState;
+
+typedef enum enCtrlLoadStatus {
+    CTRL_PAGE_NO_LOAD,
+    CTRL_PAGE_IS_LOADED,
+    CTRL_PAGE_LOADED_ERROR
+} KPHCCtrlLoadStatus;
+
+/* Per-block entry of an extent header: the chunk numbers holding the block. */
+typedef struct CfsExtentAddress {
+    uint32 checksum;
+    volatile uint8 nchunks;
+    volatile uint8 allocated_chunks;
+    uint16 chunknos[FLEXIBLE_ARRAY_MEMBER];
+}KPHCCfsExtentAddress;
+
+typedef struct CfsExtentHeader {
+    pg_atomic_uint32 nblocks;
+    pg_atomic_uint32 allocated_chunks;
+    uint16 chunk_size;
+    uint8 algorithm : 7;
+    uint8 recycleInOrder : 1;
+    uint8 recv;
+    KPHCCfsExtentAddress cfsExtentAddress[FLEXIBLE_ARRAY_MEMBER];
+}KPHCCfsExtentHeader;
+
+typedef struct __attribute__((aligned(128))) stPciPageCtrl
+{
+    uint32 ctrlId;
+    KPHCCtrlState state;
+    volatile uint32 lruPrev;
+    volatile uint32 lruNext;
+    volatile uint32 bckPrev;
+    volatile uint32 bckNext;
+    volatile uint32 bckId;
+    pg_atomic_uint32 touchNum;
+    pg_atomic_uint32 refNum;
+
+    LWLock *contentLock;
+    KPHCCtrlLoadStatus loadStatus;
+
+    KPHCCfsBufferKey pciKey;
+    KPHCCfsExtentHeader *pciPage;
+} KPHCPciPageCtrl;
+
+typedef struct stPciHashBucket {
+    LWLock *lock;
+    uint32 bckId;
+    uint32 ctrlCount;
+    uint32 firstCtrlId;
+} KPHCPciHashBucket;
+
+typedef struct stPciPageHashtbl {
+    HashValueFunc hash;
+    HashCompareFunc match;
+
+    uint32 bucketNum;
+    KPHCPciHashBucket *buckets;
+} KPHCPciPageHashtbl;
+
+typedef struct stPciLruList {
+    LWLock *lock;
+    volatile uint32 count;
+    volatile uint32 first;
+    volatile uint32 last;
+} KPHCPciLruList;
+
+#define PCI_PART_LIST_NUM (8)
+#define PCI_LRU_LIST_NUM (PCI_PART_LIST_NUM * 2)
+
+typedef struct __attribute__((aligned(128))) stPciPageBuffCtx
+{
+    KPHCPciPageCtrl *ctrlBuf;
+    char *pageBuf;
+    KPHCPciPageHashtbl hashtbl;
+
+    uint32 maxCount;
+    KPHCPciLruList mainLru[PCI_PART_LIST_NUM];
+    KPHCPciLruList freeLru[PCI_PART_LIST_NUM];
+} KPHCPciPageBuffCtx;
+
+#define PCI_INVALID_ID (0)
+/* Ids are 1-based (PCI_INVALID_ID is 0), hence the "- 1" when indexing. */
+#define PCI_GET_CTRL_BY_ID(ctx, ctrlId) ((KPHCPciPageCtrl *)(&((ctx)->ctrlBuf[(ctrlId) - 1])))
+#define PCI_GET_BUCKET_BY_ID(ctx, bckId) ((KPHCPciHashBucket *)(&((ctx)->hashtbl.buckets[(bckId) - 1])))
+#define PCI_GET_BUCKET_BY_HASH(ctx, hashcode) (PCI_GET_BUCKET_BY_ID(ctx, (((hashcode) % (ctx)->hashtbl.bucketNum) + 1)))
+
+#define PCI_SET_NO_READ(ctrl) ((ctrl)->loadStatus = CTRL_PAGE_NO_LOAD)
+
+typedef enum pciBufferReadMode {
+    PCI_BUF_NORMAL_READ,
+    PCI_BUF_NO_READ
+} KPHCPciBufferReadMode;
+
+extern KPHCPciPageBuffCtx *gPciBufCtx;
+
+extern void KPHCPciBufInitCtx(void);
+
+extern KPHCPciPageCtrl *KPHCPciBufReadPage(const KPHCExtentLocation location, LWLockMode lockMode, KPHCPciBufferReadMode readMode);
+
+extern void KPHCPciBufFreePage(KPHCPciPageCtrl *ctrl, const KPHCExtentLocation location, bool need_write);
+#endif /* CFS_BUFFERS_H */