diff --git a/src/backend/storage/page/page_compression.c b/src/backend/storage/page/page_compression.c index 7cd45892e29f3a085ff964dcb5fa983a407201df..3a5083b0bf7134f75219a1a5981c3e327eb30486 100644 --- a/src/backend/storage/page/page_compression.c +++ b/src/backend/storage/page/page_compression.c @@ -10,6 +10,8 @@ #include #endif +#define MIN_FALLOCATE_SIZE (4096) + const KPHC_CFS_STORAGE_TYPE COMMON_STORAGE = 0; uint32 AddrChecksum32(const KPHCCfsExtentAddress *cfsExtentAddress, const int needChunks); @@ -57,13 +59,13 @@ uint8 ConvertChunkSize(uint32 compressedChunkSize, bool *success) return chunkSize; } -inline size_t CompressReservedLen(const char* page) +size_t CompressReservedLen(const char* page) { size_t length = offsetof(KPHCHeapPageCompressData, page_header) - offsetof(KPHCHeapPageCompressData, data); return GetPageHeaderSize(page) + length; // 宏不完整 } -inline int CompressPageBufferBound(const char* page, uint8 algorithm) +int CompressPageBufferBound(const char* page, uint8 algorithm) { switch (algorithm) { case COMPRESS_ALGORITHM_PGLZ: @@ -118,7 +120,7 @@ void SetupPageCompressForRelation(RelFileNode *node, PageCompressOpts *compressO } } -inline size_t SizeOfExtentAddress(uint16 chunkSize) +size_t SizeOfExtentAddress(uint16 chunkSize) { if (chunkSize == 0) { return -1; @@ -131,7 +133,7 @@ off_t OffsetOfPageCompressChunk(uint16 chunkSize, int chunkNo) return chunkSize * (chunkNo - 1); } -inline size_t SizeOfExtentAddressByChunks(uint8 nChunks) +size_t SizeOfExtentAddressByChunks(uint8 nChunks) { return offsetof(KPHCCfsExtentAddress, chunknos) + sizeof(uint16) * nChunks; } @@ -472,3 +474,52 @@ int KPHCCfsReadPage(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBloc pfree(compressedBuffer); return BLCKSZ; } + +off_t KPHCCfsMdTruncate(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, bool skipSync, KPHC_CFS_STORAGE_TYPE type) +{ + KPHCExtentLocation location = cfsLocationConverts[type](reln, forknum, logicBlockNumber, skipSync, KPHC_EXTENT_OPEN_FILE); + /* if logicBlockNumber is the first block of extent, truncate all after blocks */ + if (logicBlockNumber % KPHC_CFS_LOGIC_BLOCKS_PER_EXTENT == 0) { + return location.extentStart * BLCKSZ; + } + + off_t truncateOffset = (location.headerNum + 1) * BLCKSZ; + KPHCPciPageCtrl *ctrl = KPHCPciBufReadPage(location, LW_SHARED, PCI_BUF_NORMAL_READ); + if (ctrl->loadStatus == CTRL_PAGE_LOADED_ERROR) { + KPHCPciBufFreePage(ctrl, location, false); + ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("Failed to CfsMdTruncate %s, headerNum: %u.", FilePathName(location.fd), location.headerNum))); + } + KPHCCfsExtentHeader *cfsExtentHeader = ctrl->pciPage; + + for (int i = (int)location.extentOffset; i < KPHC_CFS_LOGIC_BLOCKS_PER_EXTENT; i++) { + KPHCCfsExtentAddress *cfsExtentAddress = KPHCGetExtentAddress(cfsExtentHeader, (uint16)i); + + cfsExtentAddress->nchunks = 0; + cfsExtentAddress->checksum = AddrChecksum32(cfsExtentAddress, cfsExtentAddress->allocated_chunks); + } + + uint16 max = 0; + for (uint16 i = 0; i < location.extentOffset; i++) { + KPHCCfsExtentAddress *cfsExtentAddress = KPHCGetExtentAddress(cfsExtentHeader, i); + for (int j = 0; j < cfsExtentAddress->allocated_chunks; j++) { + max = (max > cfsExtentAddress->chunknos[j]) ? max : cfsExtentAddress->chunknos[j]; + } + } + + pg_atomic_write_u32(&cfsExtentHeader->nblocks, location.extentOffset); + + /* need sync cfs header */ + KPHCPciBufFreePage(ctrl, location, true); + + /* File allocate (file hole) */ + uint32 start = location.extentStart * BLCKSZ + max * cfsExtentHeader->chunk_size; + uint32 len = (uint32)((KPHC_CFS_MAX_LOGIC_CHUNKS_NUMBER(cfsExtentHeader->chunk_size) - max) * + cfsExtentHeader->chunk_size); + if (len >= MIN_FALLOCATE_SIZE) { + start += (len % MIN_FALLOCATE_SIZE); + len -= (len % MIN_FALLOCATE_SIZE); + KPHCFileAllocate(location.fd, start, len); + } + return truncateOffset; +} diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c index 1cfad80e9ae062027195474d726483ae1afb4c8a..8b41ab1e7c610a54e4b272fcdbac83495ebc4db4 100644 --- a/src/backend/storage/smgr/md.c +++ b/src/backend/storage/smgr/md.c @@ -143,6 +143,8 @@ static MdfdVec *_mdfd_getseg(SMgrRelation reln, ForkNumber forkno, BlockNumber blkno, bool skipFsync, int behavior); static BlockNumber _mdnblocks(SMgrRelation reln, ForkNumber forknum, MdfdVec *seg); +static int _mdfd_getfd(SMgrRelation sRel, ForkNumber forknum, + BlockNumber logicBlockNumber, bool skipSync, int type); /* @@ -975,12 +977,13 @@ mdtruncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks) * Truncate segments, starting at the last one. Starting at the end makes * managing the memory for the fd array easier, should there be errors. */ + BlockNumber relSegSize = KPHC_IS_COMPRESSED_MAINFORK(reln, forknum) ? KPHC_CFS_LOGIC_BLOCKS_PER_FILE: RELSEG_SIZE; curopensegs = reln->md_num_open_segs[forknum]; while (curopensegs > 0) { MdfdVec *v; - priorblocks = (curopensegs - 1) * RELSEG_SIZE; + priorblocks = (curopensegs - 1) * relSegSize; v = &reln->md_seg_fds[forknum][curopensegs - 1]; @@ -1005,7 +1008,7 @@ mdtruncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks) FileClose(v->mdfd_vfd); _fdvec_resize(reln, forknum, curopensegs - 1); } - else if (priorblocks + ((BlockNumber) RELSEG_SIZE) > nblocks) + else if (priorblocks + ((BlockNumber)relSegSize) > nblocks) { /* * This is the last segment we want to keep. Truncate the file to @@ -1016,7 +1019,12 @@ mdtruncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks) */ BlockNumber lastsegblocks = nblocks - priorblocks; - if (FileTruncate(v->mdfd_vfd, (off_t) lastsegblocks * BLCKSZ, WAIT_EVENT_DATA_FILE_TRUNCATE) < 0) + off_t truncateOffset = (off_t)lastsegblocks * BLCKSZ; + if (KPHC_IS_COMPRESSED_MAINFORK(reln, forknum)) { + truncateOffset = KPHCCfsMdTruncate(reln, forknum, nblocks, false, COMMON_STORAGE); + } + + if (FileTruncate(v->mdfd_vfd, truncateOffset, WAIT_EVENT_DATA_FILE_TRUNCATE) < 0) ereport(ERROR, (errcode_for_file_access(), errmsg("could not truncate file \"%s\" to %u blocks: %m", @@ -1588,3 +1596,20 @@ mdfiletagmatches(const FileTag *ftag, const FileTag *candidate) */ return ftag->rnode.dbNode == candidate->rnode.dbNode; } + +static int _mdfd_getfd(SMgrRelation sRel, ForkNumber forknum, BlockNumber logicBlockNumber, bool skipSync, int type) +{ + int fd = -1; + MdfdVec *v = NULL; + if (type == KPHC_EXTENT_OPEN_FILE) { + v = _mdfd_getseg(sRel, forknum, logicBlockNumber, skipSync, EXTENSION_FAIL); + } else if (type == KPHC_WRITE_BACK_OPEN_FILE) { + v = _mdfd_getseg(sRel, forknum, logicBlockNumber, skipSync, EXTENSION_RETURN_NULL); + } else if (type == KPHC_EXTENT_CREATE_FILE) { + v = _mdfd_getseg(sRel, forknum, logicBlockNumber, skipSync, EXTENSION_CREATE); + } + if (v != NULL) { + fd = v->mdfd_vfd; + } + return fd; +} diff --git a/src/include/storage/page_compression.h b/src/include/storage/page_compression.h index 16d53b067e3e4e3a4d8587f35bade17f502e35e3..57615cb6c3fbb704a06ead45b1cd5fa7f7d10ee2 100644 --- a/src/include/storage/page_compression.h +++ b/src/include/storage/page_compression.h @@ -121,6 +121,8 @@ extern const uint32 INDEX_OF_SIXTEENTHS_BLCKSZ; #define KPHC_CFS_LOGIC_BLOCKS_PER_FILE (KPHC_CFS_LOGIC_BLOCKS_PER_EXTENT * KPHC_CFS_LOGIC_COUNT_PER_FILE) #define KPHC_CFS_LOGIC_BLOCKS_PER_EXTENT (KPHC_CFS_EXTENT_SIZE - 1) +#define KPHC_CFS_MAX_LOGIC_CHUNKS_NUMBER(chunk_size) (KPHC_CFS_LOGIC_BLOCKS_PER_EXTENT * (BLCKSZ / (chunk_size))) + extern const int KPHC_EXTENT_OPEN_FILE; extern const int KPHC_WRITE_BACK_OPEN_FILE; extern const int KPHC_EXTENT_CREATE_FILE; @@ -146,5 +148,6 @@ KPHCCfsExtentAddress *KPHCGetExtentAddress(KPHCCfsExtentHeader *header, uint16 b size_t KPHCCfsWritePage(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, char *buffer, bool sipSync, KPHC_CFS_STORAGE_TYPE type); int KPHCCfsReadPage(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, char *buffer, KPHC_CFS_STORAGE_TYPE type); +off_t KPHCCfsMdTruncate(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, bool skipSync, KPHC_CFS_STORAGE_TYPE type); #endif // PAGE_COMPRESSION_H \ No newline at end of file