diff --git a/src/backend/storage/buffer/Makefile b/src/backend/storage/buffer/Makefile index fd7c40dcb089de06025792783d8ba7fb990a2ba6..33f22e5123ca0ca7cd7b0cd86e16b2ad600f5aa0 100644 --- a/src/backend/storage/buffer/Makefile +++ b/src/backend/storage/buffer/Makefile @@ -17,6 +17,7 @@ OBJS = \ buf_table.o \ bufmgr.o \ freelist.o \ - localbuf.o + localbuf.o \ + buf_init_ext.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/storage/buffer/buf_init.c b/src/backend/storage/buffer/buf_init.c index ef1774383e52193e97fa538da7b00d83f1058ba3..61c92be2f4ecf9876942dbf27fa51ac9117b7e18 100644 --- a/src/backend/storage/buffer/buf_init.c +++ b/src/backend/storage/buffer/buf_init.c @@ -179,6 +179,4 @@ BufferShmemSize(void) size = add_size(size, mul_size(NBuffers, sizeof(CkptSortItem))); return size; -} - -#include "buf_init_ext.c" +} \ No newline at end of file diff --git a/src/backend/storage/buffer/bufmgr_ext.c b/src/backend/storage/buffer/bufmgr_ext.c index 2f9c9e032d5e209030bce17c3abc771483089bbc..95800b63f41ce306d55be33e4d9b75a603ef7898 100644 --- a/src/backend/storage/buffer/bufmgr_ext.c +++ b/src/backend/storage/buffer/bufmgr_ext.c @@ -21,7 +21,7 @@ inline uint32 KPHCPciKeyCmp(KPHCCfsBufferKey *keya, KPHCCfsBufferKey *keyb) * * Getting Lock Count */ -static +/*static uint32 KPHCPciLockCount() { Size buffer_size = 0; @@ -30,7 +30,7 @@ uint32 KPHCPciLockCount() ctrlCount = (uint32)((buffer_size - sizeof(KPHCPciPageBuffCtx)) / (sizeof(KPHCPciPageCtrl) + BLCKSZ + 3 * sizeof(KPHCPciHashBucket))); return (PCI_LRU_LIST_NUM + ctrlCount + ctrlCount * 3); -} +}*/ /* * KPHCPciHashCode diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c index 8d7ef91785e70a0ecf3458c7299b686f734b1265..498d2561a9b86cc4d1c1f69fb84476477a6eb0c1 100644 --- a/src/backend/storage/file/fd.c +++ b/src/backend/storage/file/fd.c @@ -3891,3 +3891,10 @@ pg_pwritev_with_retry(int fd, const struct iovec *iov, int iovcnt, off_t offset) return sum; } +void KPHCFileAllocate(File file, uint32 offset, uint32 size) +{ + if (fallocate(VfdCache[file].fd, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, offset, size) < 0) { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("fallocate failed on relation: \"%s\": ", FilePathName(file)))); + } +} \ No newline at end of file diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 1a6f527051845ff2dac3d46a987ff68943cb3e5e..89ab47f8bcccb322d8c0449d9bbbdaf115e275bf 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -47,6 +47,7 @@ #include "storage/procsignal.h" #include "storage/sinvaladt.h" #include "storage/spin.h" +#include "storage/buf_ext.h" #include "utils/snapmgr.h" /* GUCs */ @@ -150,7 +151,7 @@ CalculateShmemSize(int *num_semaphores) /* might as well round it off to a multiple of a typical page size */ size = add_size(size, 8192 - (size % 8192)); - + size = add_size(size, KPHCPciBufferSize()); return size; } @@ -245,7 +246,7 @@ CreateSharedMemoryAndSemaphores(void) SUBTRANSShmemInit(); MultiXactShmemInit(); InitBufferPool(); - + KPHCPciBufInitCtx(); /* * Set up lock manager */ diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c index a9051d1f518c2fca46998256c07c1dc55721c1c4..1c8910563180e2c6f4258e42531915f40aba1e23 100644 --- a/src/backend/storage/smgr/md.c +++ b/src/backend/storage/smgr/md.c @@ -441,6 +441,88 @@ mdunlinkfork(RelFileNodeBackend rnode, ForkNumber forkNum, bool isRedo) pfree(path); } +KPHCExtentLocation KPHCStorageConvert(SMgrRelation sRel, ForkNumber forknum, BlockNumber logicBlockNumber, bool skipSync, + int type) +{ + KPHCRelFileCompressOption option; + BlockNumber extentNumber; + BlockNumber extentOffset; + BlockNumber extentStart; + BlockNumber extentHeader; + MdfdVec *v; + int fd; + KPHCExtentLocation location; + KPHCAnalyzeCompressOptions(sRel->smgr_rnode.node, &option); + extentNumber = logicBlockNumber / (KPHC_CFS_EXTENT_SIZE - 1); + extentOffset = logicBlockNumber % (KPHC_CFS_EXTENT_SIZE - 1); + extentStart = (extentNumber * KPHC_CFS_EXTENT_SIZE) % + (RELSEG_SIZE / KPHC_CFS_EXTENT_SIZE * KPHC_CFS_EXTENT_SIZE); + extentHeader = extentStart + KPHC_CFS_EXTENT_SIZE - 1; + v = NULL; + fd = -1; + if (type == KPHC_EXTENT_OPEN_FILE) { + v = _mdfd_getseg(sRel, forknum, logicBlockNumber, skipSync, EXTENSION_FAIL); + } else if (type == KPHC_WRITE_BACK_OPEN_FILE) { + v = _mdfd_getseg(sRel, forknum, logicBlockNumber, skipSync, EXTENSION_RETURN_NULL); + } else if (type == KPHC_EXTENT_CREATE_FILE) { + v = _mdfd_getseg(sRel, forknum, logicBlockNumber, skipSync, EXTENSION_CREATE); + } + if (v != NULL) { + fd = v->mdfd_vfd; + } + location.fd = fd; + location.relFileNode = sRel->smgr_rnode.node; + location.extentNumber = extentNumber; + location.extentStart = extentStart; + location.extentOffset = extentOffset; + location.headerNum = extentHeader; + location.chrunkSize = (uint16)CHUNK_SIZE_LIST[option.compressChunkSize]; + location.algorithm = (uint8)option.compressAlgorithm; + return location; +} + +KPHCCfsLocationConvert cfsLocationConverts[2] = { + KPHCStorageConvert, + NULL +}; +static void KPHCInitExtentHeader(const KPHCExtentLocation location) +{ + KPHCPciPageCtrl *ctrl = KPHCPciBufReadPage(location, LW_SHARED, PCI_BUF_NO_READ); + KPHCCfsExtentHeader *cfsExtentHeader = ctrl->pciPage; + memset((char *)cfsExtentHeader, 0, BLCKSZ); + cfsExtentHeader->algorithm = location.algorithm; + cfsExtentHeader->chunk_size = location.chrunkSize; + KPHCPciBufFreePage(ctrl, location, true); +} + +static void KPHCCfsExtendExtent(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, char *buffer, + KPHC_CFS_STORAGE_TYPE type) +{ + KPHCPciPageCtrl *ctrl; + KPHCCfsExtentHeader *cfsExtentHeader; + KPHCExtentLocation location = cfsLocationConverts[type](reln, forknum, logicBlockNumber, true, KPHC_EXTENT_OPEN_FILE); + if (location.fd < 0) { + return; + } + if (location.extentOffset == 0) { + size_t start = location.extentStart * BLCKSZ; + KPHCInitExtentHeader(location); + KPHCFileAllocate(location.fd, start, KPHC_CFS_LOGIC_BLOCKS_PER_EXTENT * BLCKSZ); + } + (void)KPHCCfsWritePage(reln, forknum, logicBlockNumber, buffer, true, type); + ctrl = KPHCPciBufReadPage(location, LW_SHARED, PCI_BUF_NORMAL_READ); + if (ctrl->loadStatus == CTRL_PAGE_LOADED_ERROR) { + KPHCPciBufFreePage(ctrl, location, false); + ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("Failed to KPHCCfsExtendExtent %s, headerNum: %u.", FilePathName(location.fd), location.headerNum))); + } + cfsExtentHeader = ctrl->pciPage; + if (pg_atomic_read_u32(&cfsExtentHeader->nblocks) < location.extentOffset + 1) { + pg_atomic_write_u32(&cfsExtentHeader->nblocks, location.extentOffset + 1); + } + + KPHCPciBufFreePage(ctrl, location, true); +} /* * mdextend() -- Add a block to the specified relation. * @@ -477,28 +559,30 @@ mdextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, InvalidBlockNumber))); v = _mdfd_getseg(reln, forknum, blocknum, skipFsync, EXTENSION_CREATE); + if (KPHC_IS_COMPRESSED_MAINFORK(reln, forknum)) { + KPHCCfsExtendExtent(reln, forknum, blocknum, buffer, COMMON_STORAGE); + } else { + seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE)); - seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE)); - - Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE); + Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE); - if ((nbytes = FileWrite(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_EXTEND)) != BLCKSZ) - { - if (nbytes < 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not extend file \"%s\": %m", - FilePathName(v->mdfd_vfd)), - errhint("Check free disk space."))); - /* short write: complain appropriately */ - ereport(ERROR, - (errcode(ERRCODE_DISK_FULL), - errmsg("could not extend file \"%s\": wrote only %d of %d bytes at block %u", - FilePathName(v->mdfd_vfd), - nbytes, BLCKSZ, blocknum), - errhint("Check free disk space."))); + if ((nbytes = FileWrite(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_EXTEND)) != BLCKSZ) + { + if (nbytes < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not extend file \"%s\": %m", + FilePathName(v->mdfd_vfd)), + errhint("Check free disk space."))); + /* short write: complain appropriately */ + ereport(ERROR, + (errcode(ERRCODE_DISK_FULL), + errmsg("could not extend file \"%s\": wrote only %d of %d bytes at block %u", + FilePathName(v->mdfd_vfd), + nbytes, BLCKSZ, blocknum), + errhint("Check free disk space."))); + } } - if (!skipFsync && !SmgrIsTemp(reln)) register_dirty_segment(reln, forknum, v); @@ -669,51 +753,6 @@ mdwriteback(SMgrRelation reln, ForkNumber forknum, } } -KPHCExtentLocation KPHCStorageConvert(SMgrRelation sRel, ForkNumber forknum, BlockNumber logicBlockNumber, bool skipSync, - int type) -{ - KPHCRelFileCompressOption option; - BlockNumber extentNumber; - BlockNumber extentOffset; - BlockNumber extentStart; - BlockNumber extentHeader; - MdfdVec *v; - int fd; - KPHCExtentLocation location; - KPHCAnalyzeCompressOptions(sRel->smgr_rnode.node, &option); - extentNumber = logicBlockNumber / (KPHC_CFS_EXTENT_SIZE - 1); - extentOffset = logicBlockNumber % (KPHC_CFS_EXTENT_SIZE - 1); - extentStart = (extentNumber * KPHC_CFS_EXTENT_SIZE) % - (RELSEG_SIZE / KPHC_CFS_EXTENT_SIZE * KPHC_CFS_EXTENT_SIZE); - extentHeader = extentStart + KPHC_CFS_EXTENT_SIZE - 1; - v = NULL; - fd = -1; - if (type == KPHC_EXTENT_OPEN_FILE) { - v = _mdfd_getseg(sRel, forknum, logicBlockNumber, skipSync, EXTENSION_FAIL); - } else if (type == KPHC_WRITE_BACK_OPEN_FILE) { - v = _mdfd_getseg(sRel, forknum, logicBlockNumber, skipSync, EXTENSION_RETURN_NULL); - } else if (type == KPHC_EXTENT_CREATE_FILE) { - v = _mdfd_getseg(sRel, forknum, logicBlockNumber, skipSync, EXTENSION_CREATE); - } - if (v != NULL) { - fd = v->mdfd_vfd; - } - location.fd = fd; - location.relFileNode = sRel->smgr_rnode.node; - location.extentNumber = extentNumber; - location.extentStart = extentStart; - location.extentOffset = extentOffset; - location.headerNum = extentHeader; - location.chrunkSize = (uint16)CHUNK_SIZE_LIST[option.compressChunkSize]; - location.algorithm = (uint8)option.compressAlgorithm; - return location; -} - -KPHCCfsLocationConvert cfsLocationConverts[2] = { - KPHCStorageConvert, - NULL -}; - /* * mdread() -- Read the specified block from a relation. */ @@ -1402,6 +1441,48 @@ _mdfd_getseg(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno, return v; } +static BlockNumber KPHCCfsNBlock(const RelFileNode relFileNode, int fd, BlockNumber segNo, off_t len) +{ + KPHCRelFileCompressOption option; + BlockNumber fileBlockNum; + BlockNumber extentCount; + BlockNumber result; + KPHCExtentLocation location; + KPHCPciPageCtrl *ctrl; + KPHCCfsExtentHeader *cfsExtentHeader; + KPHCAnalyzeCompressOptions(relFileNode, &option); + + fileBlockNum = (BlockNumber) len / BLCKSZ; + extentCount = fileBlockNum / KPHC_CFS_EXTENT_SIZE; + if (extentCount == 0) { + return (BlockNumber)0; + } + result = (extentCount - 1) * (KPHC_CFS_EXTENT_SIZE - 1); + //location = {fd, relFileNode, ((extentCount - 1) + segNo * RELSEG_SIZE / KPHC_CFS_EXTENT_SIZE), 0, 0, extentCount * KPHC_CFS_EXTENT_SIZE - 1, + // (uint16)CHUNK_SIZE_LIST[option.compressChunkSize], + // (uint8)option.compressAlgorithm}; + location.fd = fd; + location.relFileNode = relFileNode; + location.extentNumber = extentCount - 1 + segNo * RELSEG_SIZE / KPHC_CFS_EXTENT_SIZE; + location.extentStart = 0; + location.extentOffset = 0; + location.headerNum = extentCount * KPHC_CFS_EXTENT_SIZE - 1; + location.chrunkSize = (uint16)CHUNK_SIZE_LIST[option.compressChunkSize]; + location.algorithm = (uint8)option.compressAlgorithm; + ctrl = KPHCPciBufReadPage(location, LW_SHARED, PCI_BUF_NORMAL_READ); + if (ctrl->loadStatus == CTRL_PAGE_LOADED_ERROR) { + KPHCPciBufFreePage(ctrl, location, false); + ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("Failed to KPHCCfsNBlock %s, headerNum: %u.", FilePathName(location.fd), location.headerNum))); + } + cfsExtentHeader = ctrl->pciPage; + + result = result + pg_atomic_read_u32(&(cfsExtentHeader->nblocks)); + Assert(result <= ((BlockNumber)RELSEG_SIZE)); + + KPHCPciBufFreePage(ctrl, location, false); + return result; +} /* * Get number of blocks present in a single disk file */ @@ -1416,6 +1497,10 @@ _mdnblocks(SMgrRelation reln, ForkNumber forknum, MdfdVec *seg) (errcode_for_file_access(), errmsg("could not seek to end of file \"%s\": %m", FilePathName(seg->mdfd_vfd)))); + if (KPHC_IS_COMPRESSED_MAINFORK(reln, forknum)) { + return KPHCCfsNBlock(reln->smgr_rnode.node, seg->mdfd_vfd, seg->mdfd_segno, len); + } + /* note that this calculation will ignore any partial block at EOF */ return (BlockNumber) (len / BLCKSZ); } diff --git a/src/common/pg_lzcompress.c b/src/common/pg_lzcompress.c index 363a1f94db378e9ec9d49b0be056275e8e502124..a9572bf829983dc1b0e314f259d038a5d9b4e261 100644 --- a/src/common/pg_lzcompress.c +++ b/src/common/pg_lzcompress.c @@ -932,7 +932,8 @@ int32 KPHCLzDecompress(const KPHC_PGLZ_Header* source, char* dest) } return source->rawsize; } - +#define PGLZ_HISTORU_LISTS 8192 +#define PGLZ_HISTORY_MASK (PGLZ_HISTORU_LISTS - 1) bool KPHCLzCompress(const char* source, int32 slen, KPHC_PGLZ_Header* dest, const PGLZ_Strategy* strategy) { unsigned char* bp = ((unsigned char*)dest) + sizeof(KPHC_PGLZ_Header); @@ -987,18 +988,18 @@ bool KPHCLzCompress(const char* source, int32 slen, KPHC_PGLZ_Header* dest, cons return false; if (!found_match && bp - bstart >= strategy->first_success_by) return false; - if (pglz_find_match(hist_start, dp, dend, &match_len, &match_off, good_match, good_drop, 0xffffffff)) { + if (pglz_find_match(hist_start, dp, dend, &match_len, &match_off, good_match, good_drop, PGLZ_HISTORY_MASK)) { pglz_out_tag(ctrlp, ctrlb, ctrl, bp, match_len, match_off); while (match_len--) { pglz_hist_add( - hist_start, hist_entries, hist_next, hist_recycle, dp, dend, 0xffffffff); + hist_start, hist_entries, hist_next, hist_recycle, dp, dend, PGLZ_HISTORY_MASK); dp++; } found_match = true; } else { pglz_out_literal(ctrlp, ctrlb, ctrl, bp, *dp); pglz_hist_add( - hist_start, hist_entries, hist_next, hist_recycle, dp, dend, 0xffffffff); + hist_start, hist_entries, hist_next, hist_recycle, dp, dend, PGLZ_HISTORY_MASK); dp++; } } diff --git a/src/include/storage/buf_ext.h b/src/include/storage/buf_ext.h index 10d4884b40e24d68af864b81e475cf23e8baf564..42cc118e731fe3413078c5098b50af6b247c2b08 100644 --- a/src/include/storage/buf_ext.h +++ b/src/include/storage/buf_ext.h @@ -11,10 +11,10 @@ */ #ifndef BUF_EXT_H #define BUF_EXT_H - +#include "postgres.h" #include "buf_internals.h" #include "utils/hsearch.h" -#include "postgres.h" + // 待确定 #define LWTRANCHE_PCI_BUFFER_CONTENT (8) @@ -133,8 +133,8 @@ typedef enum pciBufferReadMode { extern KPHCPciPageBuffCtx *gPciBufCtx; -void KPHCPciBufInitCtx(); -Size KPHCPciBufferSize(); +void KPHCPciBufInitCtx(void); +extern Size KPHCPciBufferSize(void); void KPHCPciBufFreePage(KPHCPciPageCtrl *ctrl, const KPHCExtentLocation location, bool need_write); void KPHCPciLruPushNoLock(KPHCPciLruList *lru, KPHCPciPageCtrl *item, KPHCCtrlState state); diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h index 69549b000fa39c26c047a78cf24da590a9d5213c..20fb65bb44f52734e807d8ff2222b51195de48ae 100644 --- a/src/include/storage/fd.h +++ b/src/include/storage/fd.h @@ -111,6 +111,7 @@ extern int FileSync(File file, uint32 wait_event_info); extern off_t FileSize(File file); extern int FileTruncate(File file, off_t offset, uint32 wait_event_info); extern void FileWriteback(File file, off_t offset, off_t nbytes, uint32 wait_event_info); +extern void KPHCFileAllocate(File file, uint32 offset, uint32 size); extern char *FilePathName(File file); extern int FileGetRawDesc(File file); extern int FileGetRawFlags(File file);