From f9b032100b9e84ebf35c8d7f40c6f5bf8177e328 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B9=B8=E8=BF=90=E7=9A=84=E7=B1=B3=E7=B2=A5912?= <2903807914@qq.com> Date: Fri, 6 Jan 2023 03:07:28 +0000 Subject: [PATCH 1/2] =?UTF-8?q?!18=20=E5=A2=9E=E5=8A=A0int32=E8=BE=B9?= =?UTF-8?q?=E7=95=8C=E5=80=BC=E5=AE=8F=20*=20=E5=A2=9E=E5=8A=A0int32?= =?UTF-8?q?=E8=BE=B9=E7=95=8C=E5=80=BC=E5=AE=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- walminer/.gitignore | 4 ++++ walminer/imagemanage.c | 2 +- walminer/results/wal2sql_other.out | 36 ------------------------------ walminer/wm_utils.h | 2 ++ 4 files changed, 7 insertions(+), 37 deletions(-) create mode 100644 walminer/.gitignore delete mode 100644 walminer/results/wal2sql_other.out diff --git a/walminer/.gitignore b/walminer/.gitignore new file mode 100644 index 0000000..688193c --- /dev/null +++ b/walminer/.gitignore @@ -0,0 +1,4 @@ +*.o +*.so +results/* + diff --git a/walminer/imagemanage.c b/walminer/imagemanage.c index 8e15909..8458db3 100755 --- a/walminer/imagemanage.c +++ b/walminer/imagemanage.c @@ -527,7 +527,7 @@ put_image_index(ImageStore *image, bool *use_existed_imageindex) { memcpy(&imageStoreHashEntry->key,&key,sizeof(ImageStore)); imageStoreHashEntry->index = hash_get_num_entries(walminer_decode_context->anapro.imageStoreHash) - 1; - if(__INT32_MAX__ <= imageStoreHashEntry->index + 1) + if(WALMINER_INT32_MAX <= imageStoreHashEntry->index + 1) { elog(ERROR, "Can not support page numer greater than %d", imageStoreHashEntry->index); } diff --git a/walminer/results/wal2sql_other.out b/walminer/results/wal2sql_other.out deleted file mode 100644 index b416cd1..0000000 --- a/walminer/results/wal2sql_other.out +++ /dev/null @@ -1,36 +0,0 @@ -DROP EXTENSION IF EXISTS walminer; -CREATE EXTENSION IF NOT EXISTS walminer; -DROP TABLE t1; -SELECT walminer_stop(); - walminer_stop ------------------- - walminer stoped! -(1 row) - -CHECKPOINT; -SELECT pg_current_wal_lsn() AS lsn1 \gset -CREATE TABLE t1(i INT, j INT, k VARCHAR); -INSERT INTO t1 VALUES(1,1,NULL); -UPDATE t1 SET i = 10 WHERE i = 1; -DELETE FROM t1 WHERE i = 10; -SELECT pg_current_wal_lsn() AS lsn2 \gset -SELECT walminer_regression_mode(); - walminer_regression_mode --------------------------- - t -(1 row) - -SELECT wal2sql(:'lsn1'::pg_lsn, :'lsn2'::pg_lsn, 'true'); - wal2sql ---------------------- - pg_minerwal success -(1 row) - -SELECT sqlno, topxid=0 as istopxid, op_text FROM walminer_contents; - sqlno | istopxid | op_text --------+----------+----------------------------------------------------------- - 1 | t | INSERT INTO public.t1(i ,j ,k) VALUES(1 ,1 ,null) - 1 | t | UPDATE public.t1 SET i=10 WHERE i=1 AND j=1 AND k is null - 1 | t | DELETE FROM public.t1 WHERE i=10 AND j=1 AND k is null -(3 rows) - diff --git a/walminer/wm_utils.h b/walminer/wm_utils.h index 34c2c98..ffc997d 100755 --- a/walminer/wm_utils.h +++ b/walminer/wm_utils.h @@ -13,6 +13,8 @@ extern bool debug_mode; +#define WALMINER_INT32_MAX 2147483647 +#define WALMINER_INT32_MIN (-WALMINER_INT32_MAX -1) #define WALMINER_VERSION_NUM "3.0.0" #define PATH_KIND_INVALID 0 -- Gitee From 8b676a0a7180c09e15d89a134680e7e30ed7af9e Mon Sep 17 00:00:00 2001 From: movead Date: Sat, 13 Jan 2024 17:50:03 +0800 Subject: [PATCH 2/2] PG16 support --- walminer/Makefile | 11 ++++++--- walminer/datadictionary.c | 12 ++++++---- walminer/datadictionary.h | 12 +++++++--- walminer/imagemanage.c | 19 +++++++++------ walminer/pagecollect.c | 7 +++--- walminer/wal2sql.c | 30 ++++++++++++++---------- walminer/wallist.c | 6 ++--- walminer/walminer.c | 10 +++++++- walminer/walminer_decode.c | 47 ++++++++++++++++++++------------------ walminer/walminer_decode.h | 1 + walminer/walreader.c | 18 +++++++-------- walminer/wm_compatible.c | 20 ++++++++++++++++ walminer/wm_compatible.h | 40 ++++++++++++++++++++++++++++++++ walminer/wm_utils.h | 1 + 14 files changed, 166 insertions(+), 68 deletions(-) create mode 100755 walminer/wm_compatible.c create mode 100755 walminer/wm_compatible.h diff --git a/walminer/Makefile b/walminer/Makefile index 2791e6b..043cbe1 100755 --- a/walminer/Makefile +++ b/walminer/Makefile @@ -3,7 +3,7 @@ MODULE_big = walminer OBJS = walminer.o wm_utils.o datadictionary.o fetchcatalogtable.o wallist.o walreader.o \ walminer_decode.o imagemanage.o wal2sql.o walminer_contents.o walminer_thread.o \ - wal2sql_spi.o wal2sql_ddl.o pagecollect.o + wal2sql_spi.o wal2sql_ddl.o pagecollect.o wm_compatible.o EXTENSION = walminer @@ -20,7 +20,9 @@ ifdef USE_PGXS PG_CONFIG = pg_config PGXS := $(shell $(PG_CONFIG) --pgxs) -ifeq ($(MAJORVERSION), 15) +ifeq ($(MAJORVERSION), 16) + PG_CPPFLAGS = -DPG_VERSION_16 +else ifeq ($(MAJORVERSION), 15) PG_CPPFLAGS = -DPG_VERSION_15 else ifeq ($(MAJORVERSION), 14) PG_CPPFLAGS = -DPG_VERSION_14 @@ -45,7 +47,10 @@ else subdir = contrib/walminer top_builddir = ../.. include $(top_builddir)/src/Makefile.global -ifeq ($(MAJORVERSION), 15) + +ifeq ($(MAJORVERSION), 16) + PG_CPPFLAGS = -DPG_VERSION_16 +else ifeq ($(MAJORVERSION), 15) PG_CPPFLAGS = -DPG_VERSION_15 else ifeq ($(MAJORVERSION), 14) PG_CPPFLAGS = -DPG_VERSION_14 diff --git a/walminer/datadictionary.c b/walminer/datadictionary.c index d015178..fa0df1c 100755 --- a/walminer/datadictionary.c +++ b/walminer/datadictionary.c @@ -435,7 +435,7 @@ built_ddh(char *target_path) { FILE *fp = NULL; DataDicHeader ddh; -#if (defined PG_VERSION_15) +#if ((defined PG_VERSION_15) || (defined PG_VERSION_16)) ControlFileData *ControlFile = NULL; bool crc_ok = true; #endif @@ -445,13 +445,13 @@ built_ddh(char *target_path) ereport(ERROR, (errcode(ERRCODE_SYSTEM_ERROR), errmsg("Can not create dictionary file %s",target_path))); -#if (defined PG_VERSION_15) +#if ((defined PG_VERSION_15) || (defined PG_VERSION_16)) ControlFile = get_controlfile(DataDir, &crc_ok); if (!crc_ok) ereport(ERROR, (errmsg("calculated CRC checksum does not match value stored in file"))); #endif -#if (defined PG_VERSION_15) +#if ((defined PG_VERSION_15) || (defined PG_VERSION_16)) ddh.timeline = ControlFile->checkPointCopy.ThisTimeLineID; #else ddh.timeline = ThisTimeLineID; @@ -1104,9 +1104,11 @@ get_relnode_by_reloid(Oid reloid, RelFileNode* rnode) { elog(ERROR, "could not open relation with OID %u", reloid); } - +#if (defined PG_VERSION_16) + memcpy(rnode, &rel->rd_locator, sizeof(RelFileNode)); +#else memcpy(rnode, &rel->rd_node, sizeof(RelFileNode)); - +#endif RelationClose(rel); } /* diff --git a/walminer/datadictionary.h b/walminer/datadictionary.h index 5bb1571..2917350 100755 --- a/walminer/datadictionary.h +++ b/walminer/datadictionary.h @@ -30,8 +30,9 @@ #include "port/pg_crc32c.h" #include "access/xlog_internal.h" #include "replication/reorderbuffer.h" +#include "wm_compatible.h" -#if (defined PG_VERSION_12) || (defined PG_VERSION_13) || (defined PG_VERSION_14) || (defined PG_VERSION_15) +#if (defined PG_VERSION_12) || (defined PG_VERSION_13) || (defined PG_VERSION_14) || (defined PG_VERSION_15) || (defined PG_VERSION_16) #include "access/table.h" #endif @@ -51,8 +52,11 @@ #define WALMINER_LOST_RECORD 1 - -#define MAX_MAPPINGS 62 /* 62 * 8 + 16 = 512 */ +#if (defined PG_VERSION_16) +#define MAX_MAPPINGS 64 +#else +#define MAX_MAPPINGS 62 +#endif #define RELMAPPER_FILEMAGIC 0x592717 /* version ID value */ #define WALMINER_SYSCLASS_MAX 80 @@ -112,7 +116,9 @@ typedef struct RelMapFile int32 num_mappings; /* number of valid RelMapping entries */ RelMapping mappings[MAX_MAPPINGS]; pg_crc32c crc; /* CRC of all above */ +#ifndef PG_VERSION_16 int32 pad; /* to make the struct size be 512 exactly */ +#endif } RelMapFile; diff --git a/walminer/imagemanage.c b/walminer/imagemanage.c index 8458db3..6a0ae26 100755 --- a/walminer/imagemanage.c +++ b/walminer/imagemanage.c @@ -49,7 +49,7 @@ get_block_image(XLogReaderState *record, uint8 block_id, char *page) { DecodedBkpBlock *bkpb; char *ptr; -#if (defined PG_VERSION_15) +#if ((defined PG_VERSION_15) || (defined PG_VERSION_16)) PGAlignedBlock tmp; if (!record->record->blocks[block_id].in_use) return false; @@ -89,7 +89,7 @@ get_block_image(XLogReaderState *record, uint8 block_id, char *page) ptr = tmp; } #endif -#if (defined PG_VERSION_15) +#if ((defined PG_VERSION_15) || (defined PG_VERSION_16)) if (BKPIMAGE_COMPRESSED(bkpb->bimg_info)) { bool decomp_success = true; @@ -327,23 +327,23 @@ record_store_image(XLogReaderState *record) int max_block = 0; memset(&target_node, 0, sizeof(RelFileNode)); -#if (defined PG_VERSION_15) +#if ((defined PG_VERSION_15) || (defined PG_VERSION_16)) if(!XLogRecHasBlockRef(record, 0)) return; #endif - XLogRecGetBlockTag(record, 0, &target_node, NULL, NULL); + wm_XLogRecGetBlockTag(record, 0, &target_node, NULL, NULL); if(!filter_in_decode(&target_node)) { return; } -#if (defined PG_VERSION_15) +#if ((defined PG_VERSION_15) || (defined PG_VERSION_16)) max_block = XLogRecMaxBlockId(record); #else max_block = XLR_MAX_BLOCK_ID; #endif for(block_id = 0; block_id <= max_block; block_id++) { -#if (defined PG_VERSION_15) +#if ((defined PG_VERSION_15) || (defined PG_VERSION_16)) bkpb = &record->record->blocks[block_id]; #else bkpb = &record->blocks[block_id]; @@ -354,10 +354,15 @@ record_store_image(XLogReaderState *record) continue; memset(page, 0, BLCKSZ); image = palloc0(sizeof(ImageStore)); +#if (defined PG_VERSION_16) + image->rnode.dbNode = bkpb->rlocator.dbOid; + image->rnode.relNode = bkpb->rlocator.relNumber; + image->rnode.spcNode = bkpb->rlocator.spcOid; +#else image->rnode.dbNode = bkpb->rnode.dbNode; image->rnode.relNode = bkpb->rnode.relNode; image->rnode.spcNode = bkpb->rnode.spcNode; - +#endif image->forknum = bkpb->forknum; image->blkno = bkpb->blkno; diff --git a/walminer/pagecollect.c b/walminer/pagecollect.c index 86f7f59..565a530 100644 --- a/walminer/pagecollect.c +++ b/walminer/pagecollect.c @@ -18,7 +18,7 @@ #include "utils/rel.h" #include "storage/smgr.h" -#if ((defined PG_VERSION_13) || (defined PG_VERSION_14) || (defined PG_VERSION_15)) +#if ((defined PG_VERSION_13) || (defined PG_VERSION_14) || (defined PG_VERSION_15) || (defined PG_VERSION_16)) #include "access/relation.h" #else #include "catalog/catalog.h" @@ -253,7 +253,7 @@ pc_search_wal(void* temp) wdc->swpro.read_lsn = prepare_read(true, InvalidXLogRecPtr); wdc->swpro.decode_lsn = wdc->swpro.read_lsn; wdc->swpro.get_decode_lsn = true; -#if (defined PG_VERSION_13) || (defined PG_VERSION_14) || (defined PG_VERSION_15) +#if (defined PG_VERSION_13) || (defined PG_VERSION_14) || (defined PG_VERSION_15) || (defined PG_VERSION_16) wdc->swpro.timeline_id = wdc->reader_search->latestPageTLI; wdc->swpro.wal_seg_size = wdc->reader_search->segcxt.ws_segsize; #elif (defined PG_VERSION_11) || (defined PG_VERSION_12) @@ -328,6 +328,7 @@ pc_handle_argument(PG_FUNCTION_ARGS) Datum page_collect_internal(PG_FUNCTION_ARGS) { + const char* result = "page collect success"; check_all(); memset(&wdecoder, 0, sizeof(WalminerDecode)); @@ -336,5 +337,5 @@ page_collect_internal(PG_FUNCTION_ARGS) pg_walminer(fcinfo); - PG_RETURN_CSTRING(cstring_to_text("page collect success")); + PG_RETURN_TEXT_P(cstring_to_text(result)); } diff --git a/walminer/wal2sql.c b/walminer/wal2sql.c index bb6a75c..59ee6ee 100644 --- a/walminer/wal2sql.c +++ b/walminer/wal2sql.c @@ -30,7 +30,7 @@ #include "wm_utils.h" #include "utils/builtins.h" #include "catalog/namespace.h" -#if (defined PG_VERSION_13) || (defined PG_VERSION_14 || (defined PG_VERSION_15)) +#if (defined PG_VERSION_13) || (defined PG_VERSION_14) || (defined PG_VERSION_15) || (defined PG_VERSION_16) #include "access/heaptoast.h" #include "access/detoast.h" #else @@ -684,7 +684,7 @@ check_varlena(Datum attr,struct varlena** att_return, List *toastlist) } VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr); -#if ((defined PG_VERSION_14) || (defined PG_VERSION_15)) +#if ((defined PG_VERSION_14) || (defined PG_VERSION_15) || (defined PG_VERSION_16)) ressize = VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer); #else ressize = toast_pointer.va_extsize; @@ -706,7 +706,7 @@ check_varlena(Datum attr,struct varlena** att_return, List *toastlist) memcpy(VARDATA(result) + tent->chunk_seq * TOAST_MAX_CHUNK_SIZE, tent->chunk_data, tent->datalength); } -#if (defined PG_VERSION_13) || (defined PG_VERSION_14) || (defined PG_VERSION_15) +#if (defined PG_VERSION_13) || (defined PG_VERSION_14) || (defined PG_VERSION_15) || (defined PG_VERSION_16) cell = lnext(toastlist, cell); #else cell = lnext(cell); @@ -721,7 +721,7 @@ check_varlena(Datum attr,struct varlena** att_return, List *toastlist) if (VARATT_IS_COMPRESSED(result)) { struct varlena *tmp = result; -#if (defined PG_VERSION_13) || (defined PG_VERSION_14) || (defined PG_VERSION_15) +#if (defined PG_VERSION_13) || (defined PG_VERSION_14) || (defined PG_VERSION_15) || (defined PG_VERSION_16) result = detoast_attr(tmp); #else result = heap_tuple_untoast_attr(tmp); @@ -812,7 +812,7 @@ collect_info_by_change(ReorderBufferChange *change, TransactionEntry *te) { Oid reloid = InvalidOid; - reloid = get_reloid_by_relfilenode(&change->data.tp.relnode); + reloid = get_reloid_by_relfilenode(GET_CHANGE_RELNODE(change)); Assert(0 != reloid); /* 如果不需要解析系统表,且当前为系统表,则不解析这个record */ @@ -1353,7 +1353,7 @@ collect_toast(ReorderBufferChange *change, TransactionEntry *te, ReorderBufferTu if(!tuplebuf) return; - reloid = get_reloid_by_relfilenode(&change->data.tp.relnode); + reloid = get_reloid_by_relfilenode(GET_CHANGE_RELNODE(change)); Assert(0 != reloid); /* 如果不需要解析系统表,且当前为系统表,则不解析这个record */ @@ -1754,7 +1754,7 @@ wal2sql_search_wal(void* temp) elog(ERROR, "Wrong analyse type %d", wal2sql_struct.at); break; } -#if (defined PG_VERSION_13) || (defined PG_VERSION_14) || (defined PG_VERSION_15) +#if (defined PG_VERSION_13) || (defined PG_VERSION_14) || (defined PG_VERSION_15) || (defined PG_VERSION_16) XLogBeginRead(walminer_decode_context->reader_search, first_record); #endif while(true) @@ -1770,7 +1770,7 @@ wal2sql_search_wal(void* temp) * 在DECODE_TYPE_LSN/DECODE_TYPE_TIME/DECODE_TYPE_XID解析类型时 * 需要别的地方决定end_lsn和get_search_end */ -#if (defined PG_VERSION_15) +#if ((defined PG_VERSION_15) || (defined PG_VERSION_16)) if(!walminer_decode_context->reader_search->record) #else if(!walminer_decode_context->reader_search->decoded_record) @@ -1794,7 +1794,7 @@ wal2sql_search_wal(void* temp) (uint32)wdc->swpro.decode_lsn); wdc->swpro.get_decode_lsn = true; wdc->swpro.decode_ckp_cell = list_tail(wdc->swpro.checkpoint_list); -#if (defined PG_VERSION_13) || (defined PG_VERSION_14) || (defined PG_VERSION_15) +#if (defined PG_VERSION_13) || (defined PG_VERSION_14) || (defined PG_VERSION_15) || (defined PG_VERSION_16) wdc->swpro.timeline_id = wdc->reader_search->latestPageTLI; wdc->swpro.wal_seg_size = wdc->reader_search->segcxt.ws_segsize; #elif (defined PG_VERSION_11) || (defined PG_VERSION_12) @@ -1976,7 +1976,7 @@ wal2sql_check_statue(void) else { cell = walminer_decode_context->anapro.ckp_cell; -#if (defined PG_VERSION_13) || (defined PG_VERSION_14) || (defined PG_VERSION_15) +#if (defined PG_VERSION_13) || (defined PG_VERSION_14) || (defined PG_VERSION_15) || (defined PG_VERSION_16) nextcell = lnext(walminer_decode_context->swpro.checkpoint_list, cell); #else nextcell = lnext(cell); @@ -2037,7 +2037,7 @@ wal2sql_check_statue(void) return true; } - walminer_debug("[check_search_statue]curlsn=%d,ReadRecPtr=%x/%x,end_lsn=%x/%x",get_search_end, + walminer_debug("[check_search_statue]end=%d,curlsn=%x/%x,end_lsn=%x/%x",get_search_end, (uint32)(curlsn >> 32), (uint32)(curlsn), (uint32)(end_lsn >> 32), (uint32)(end_lsn)); if(get_search_end && curlsn >= end_lsn) @@ -2106,6 +2106,12 @@ wal2sql_self_apply(PG_FUNCTION_ARGS) Datum wal2sql_internal(PG_FUNCTION_ARGS) { + const char* result = "wal2sql success"; + + if(regression_mode) + { + result = "pg_minerwal success"; + } check_all(); memset(&wdecoder, 0, sizeof(WalminerDecode)); @@ -2115,5 +2121,5 @@ wal2sql_internal(PG_FUNCTION_ARGS) wal2sql_prepare(); pg_walminer(fcinfo); wal2sql_end(); - PG_RETURN_CSTRING(cstring_to_text("pg_minerwal success")); + PG_RETURN_TEXT_P(cstring_to_text(result)); } diff --git a/walminer/wallist.c b/walminer/wallist.c index e43a6e6..260cc20 100755 --- a/walminer/wallist.c +++ b/walminer/wallist.c @@ -13,7 +13,7 @@ List *wal_file_list = NULL; static bool check_walfile_valid(char *path,int pathkind); static void add_walfile_to_list(char *path); -#if (defined PG_VERSION_13) || (defined PG_VERSION_14) || (defined PG_VERSION_15) +#if (defined PG_VERSION_13) || (defined PG_VERSION_14) || (defined PG_VERSION_15) || (defined PG_VERSION_16) static int wal_file_compare(const ListCell *v1,const ListCell *v2); #else static int wal_file_compare(const void *v1,const void *v2); @@ -352,7 +352,7 @@ add_wal(char *path) } } } -#if (defined PG_VERSION_13) || (defined PG_VERSION_14) || (defined PG_VERSION_15) +#if (defined PG_VERSION_13) || (defined PG_VERSION_14) || (defined PG_VERSION_15) || (defined PG_VERSION_16) list_sort(wal_file_list, wal_file_compare); #else wal_file_list = list_qsort(wal_file_list, wal_file_compare); @@ -372,7 +372,7 @@ add_walfile_to_list(char *path) wal_file_list = lappend(wal_file_list, wf); } -#if (defined PG_VERSION_13) || (defined PG_VERSION_14) || (defined PG_VERSION_15) +#if (defined PG_VERSION_13) || (defined PG_VERSION_14) || (defined PG_VERSION_15) || (defined PG_VERSION_16) static int wal_file_compare(const ListCell *v1,const ListCell *v2) { diff --git a/walminer/walminer.c b/walminer/walminer.c index 24e45ff..aa15279 100755 --- a/walminer/walminer.c +++ b/walminer/walminer.c @@ -152,6 +152,14 @@ pg_walminer(PG_FUNCTION_ARGS) { MemoryContext oldcontex = NULL; + if(!regression_mode) + { + elog(NOTICE, "==========================================="); + elog(NOTICE, "======OPEN SOURCE PROJECT WALMINER========="); + elog(NOTICE, "======Author mail lchch1990@sina.cn========"); + elog(NOTICE, "==========================================="); + } + pg_usleep(1000000L); if(!wal_file_list || !wdd.loaded) { self_load_dic_and_wal(); @@ -321,7 +329,7 @@ walminer_wal_list(PG_FUNCTION_ARGS) values[0] = wf->filepath; tuple = BuildTupleFromCStrings(funcctx->attinmeta, values); -#if (defined PG_VERSION_13) || (defined PG_VERSION_14) || (defined PG_VERSION_15) +#if (defined PG_VERSION_13) || (defined PG_VERSION_14) || (defined PG_VERSION_15) || (defined PG_VERSION_16) funcctx->user_fctx = lnext(wal_file_list, cell); #else funcctx->user_fctx = lnext(cell); diff --git a/walminer/walminer_decode.c b/walminer/walminer_decode.c index f6de183..64a50a2 100755 --- a/walminer/walminer_decode.c +++ b/walminer/walminer_decode.c @@ -50,6 +50,7 @@ #include "storage/standbydefs.h" #endif #include +#include "wm_compatible.h" typedef struct TxDiskHead { @@ -59,7 +60,7 @@ typedef struct TxDiskHead int new_len; int old_len; }TxDiskHead; -#if (defined PG_VERSION_14) || (defined PG_VERSION_15) +#if (defined PG_VERSION_14) || (defined PG_VERSION_15) || (defined PG_VERSION_16) static void walminer_decode_prune(WalRecordBuffer *buf); static void walminer_decode_vacuum(WalRecordBuffer *buf); #else @@ -471,7 +472,7 @@ walminer_decode_xact(WalRecordBuffer *buf) break; case XLOG_XACT_PREPARE: -#if (defined PG_VERSION_14) || (defined PG_VERSION_15) +#if (defined PG_VERSION_14) || (defined PG_VERSION_15) || (defined PG_VERSION_16) case XLOG_XACT_INVALIDATIONS: #endif /* 对解析过程无意义的wal记录无意义 */ @@ -525,7 +526,7 @@ walminer_decode_heap2(WalRecordBuffer *buf) * 目前walminer不需要这个信息 */ break; -#if (defined PG_VERSION_14) || (defined PG_VERSION_15) +#if (defined PG_VERSION_14) || (defined PG_VERSION_15) || (defined PG_VERSION_16) case XLOG_HEAP2_VACUUM: walminer_decode_vacuum(buf); break; @@ -755,7 +756,7 @@ reassemble_tuplenew_from_wal_data(char *data, Size len, ReorderBufferChange *cha ItemId id = NULL; walminer_debug("[reassemble_tuplenew_from_wal_data] have prefix or suffix"); - get_old_page= get_image_from_store(&change->data.tp.relnode, MAIN_FORKNUM, blknum_old, + get_old_page= get_image_from_store(GET_CHANGE_RELNODE(change), MAIN_FORKNUM, blknum_old, (char*)page_old, &pageindex_old); walminer_debug("[reassemble_tuplenew_from_wal_data] get_old_page=%d", get_old_page); /* 如果没有找到旧的page,那么就不解析这个update了 */ @@ -854,7 +855,7 @@ reassemble_tuplenew_from_wal_data(char *data, Size len, ReorderBufferChange *cha return true; } -#if (defined PG_VERSION_14) || (defined PG_VERSION_15) +#if (defined PG_VERSION_14) || (defined PG_VERSION_15) || (defined PG_VERSION_16) static void walminer_decode_vacuum(WalRecordBuffer *buf) { @@ -874,7 +875,7 @@ walminer_decode_vacuum(WalRecordBuffer *buf) xlrec = (xl_heap_vacuum *) XLogRecGetData(record); - XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno); + wm_XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno); if (!filter_in_decode(&rnode)) return; @@ -917,7 +918,7 @@ walminer_decode_vacuum(WalRecordBuffer *buf) #endif static void -#if (defined PG_VERSION_14) || (defined PG_VERSION_15) +#if (defined PG_VERSION_14) || (defined PG_VERSION_15) || (defined PG_VERSION_16) walminer_decode_prune(WalRecordBuffer *buf) #else walminer_decode_clean(WalRecordBuffer *buf) @@ -925,7 +926,7 @@ walminer_decode_clean(WalRecordBuffer *buf) { RelFileNode rnode; BlockNumber blkno = 0; -#if (defined PG_VERSION_14) || (defined PG_VERSION_15) +#if (defined PG_VERSION_14) || (defined PG_VERSION_15) || (defined PG_VERSION_16) xl_heap_prune *xlrec = NULL; #else xl_heap_clean *xlrec = NULL; @@ -938,13 +939,13 @@ walminer_decode_clean(WalRecordBuffer *buf) memset(&rnode, 0, sizeof(RelFileNode)); record = buf->record; -#if (defined PG_VERSION_14) || (defined PG_VERSION_15) +#if (defined PG_VERSION_14) || (defined PG_VERSION_15) || (defined PG_VERSION_16) xlrec = (xl_heap_prune *) XLogRecGetData(record); #else xlrec = (xl_heap_clean *) XLogRecGetData(record); #endif - XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno); + wm_XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno); if (!filter_in_decode(&rnode)) return; @@ -1007,7 +1008,7 @@ walminer_decode_mutiinsert(WalRecordBuffer *buf) xlrec = (xl_heap_multi_insert *) XLogRecGetData(r); - XLogRecGetBlockTag(r, 0, &rnode, NULL, &blknum); + wm_XLogRecGetBlockTag(r, 0, &rnode, NULL, &blknum); if (!filter_in_decode(&rnode)) return; @@ -1046,7 +1047,7 @@ walminer_decode_mutiinsert(WalRecordBuffer *buf) change = get_change_space(); change->action = REORDER_BUFFER_CHANGE_INSERT; change->origin_id = XLogRecGetOrigin(r); - memcpy(&change->data.tp.relnode, &rnode, sizeof(RelFileNode)); + memcpy(GET_CHANGE_RELNODE(change), &rnode, sizeof(RelFileNode)); if (has_image) { @@ -1142,7 +1143,7 @@ walminer_decode_insert(WalRecordBuffer *buf) xlrec = (xl_heap_insert *) XLogRecGetData(r); /* 不关心其他数据库 */ - XLogRecGetBlockTag(r, 0, &target_node, &forknum, &blknum); + wm_XLogRecGetBlockTag(r, 0, &target_node, &forknum, &blknum); if (!filter_in_decode(&target_node)) return; @@ -1152,7 +1153,7 @@ walminer_decode_insert(WalRecordBuffer *buf) else change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT; change->origin_id = XLogRecGetOrigin(r); - memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); + memcpy(GET_CHANGE_RELNODE(change), &target_node, sizeof(RelFileNode)); change->lsn = r->ReadRecPtr; xid = XLogRecGetXid(r); @@ -1253,7 +1254,7 @@ walminer_decode_delete(WalRecordBuffer *buf) xlrec = (xl_heap_delete *) XLogRecGetData(r); /* only interested in our database */ - XLogRecGetBlockTag(r, 0, &target_node, &forknum, &blknum); + wm_XLogRecGetBlockTag(r, 0, &target_node, &forknum, &blknum); if (!filter_in_decode(&target_node)) return; @@ -1261,7 +1262,7 @@ walminer_decode_delete(WalRecordBuffer *buf) change->action = REORDER_BUFFER_CHANGE_DELETE; change->origin_id = XLogRecGetOrigin(r); - memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); + memcpy(GET_CHANGE_RELNODE(change), &target_node, sizeof(RelFileNode)); change->lsn = r->ReadRecPtr; get_image = XLogRecHasBlockImage(r, 0); @@ -1368,11 +1369,11 @@ walminer_decode_update(WalRecordBuffer *buf) xlrec = (xl_heap_update *) XLogRecGetData(r); /* only interested in our database */ - XLogRecGetBlockTag(r, 0, &target_node, &forknum, &blknum_new); -#if (defined PG_VERSION_15) + wm_XLogRecGetBlockTag(r, 0, &target_node, &forknum, &blknum_new); +#if ((defined PG_VERSION_15) || (defined PG_VERSION_16)) if (!XLogRecGetBlockTagExtended(r, 1, NULL, NULL, &blknum_old, NULL)) #else - if (!XLogRecGetBlockTag(r, 1, NULL, NULL, &blknum_old)) + if (!wm_XLogRecGetBlockTag(r, 1, NULL, NULL, &blknum_old)) #endif { blknum_old = blknum_new; @@ -1387,7 +1388,8 @@ walminer_decode_update(WalRecordBuffer *buf) change = get_change_space(); change->action = REORDER_BUFFER_CHANGE_UPDATE; change->origin_id = XLogRecGetOrigin(r); - memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); + memcpy(GET_CHANGE_RELNODE(change), &target_node, sizeof(RelFileNode)); + change->lsn = r->ReadRecPtr; if (XLogRecHasBlockImage(r, 0)) @@ -1816,7 +1818,7 @@ decode_wal(void) first_record = prepare_read(false, InvalidXLogRecPtr); clean_image(); -#if (defined PG_VERSION_13) || (defined PG_VERSION_14) || (defined PG_VERSION_15) +#if (defined PG_VERSION_13) || (defined PG_VERSION_14) || (defined PG_VERSION_15) || (defined PG_VERSION_16) XLogBeginRead(walminer_decode_context->reader, first_record); #endif @@ -1828,7 +1830,7 @@ decode_wal(void) if(wdecoder.w_call_funcs.walminer_check_statue) if(!wdecoder.w_call_funcs.walminer_check_statue()) break; -#if (defined PG_VERSION_15) +#if ((defined PG_VERSION_15) || (defined PG_VERSION_16)) if(!walminer_decode_context->reader->record) #else if(!walminer_decode_context->reader->decoded_record) @@ -1840,5 +1842,6 @@ decode_wal(void) walminer_decode_record(walminer_decode_context->reader); } + walminer_debug("decode_wal end"); return NULL; } diff --git a/walminer/walminer_decode.h b/walminer/walminer_decode.h index bbe17ec..c27d372 100755 --- a/walminer/walminer_decode.h +++ b/walminer/walminer_decode.h @@ -11,6 +11,7 @@ #include "datadictionary.h" #include "walminer_contents.h" #include "access/xact.h" +#include "wm_compatible.h" #define DECODE_SQL_KIND_INSERT 1 #define DECODE_SQL_KIND_UPDATE 2 diff --git a/walminer/walreader.c b/walminer/walreader.c index 5f735a8..786f2fd 100755 --- a/walminer/walreader.c +++ b/walminer/walreader.c @@ -25,7 +25,7 @@ static ListCell* walminer_get_first_valid_walfile(WalminerPrivate *private); /* * 此为从src/backend/access/transam/xlogreader.c文件拷贝来的函数,没有改动 */ -#if (defined PG_VERSION_13) || (defined PG_VERSION_14)|| (defined PG_VERSION_15) +#if (defined PG_VERSION_13) || (defined PG_VERSION_14) || (defined PG_VERSION_15) || (defined PG_VERSION_16) static int ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) { @@ -312,7 +312,7 @@ WalFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) break; } } -#if (defined PG_VERSION_13) || (defined PG_VERSION_14) || (defined PG_VERSION_15) +#if (defined PG_VERSION_13) || (defined PG_VERSION_14) || (defined PG_VERSION_15) || (defined PG_VERSION_16) XLogBeginRead(state, tmpRecPtr); while (XLogReadRecord(state, &errormsg) != NULL) #else @@ -335,7 +335,7 @@ out: /* Reset state to what we had before finding the record */ state->ReadRecPtr = saved_state.ReadRecPtr; state->EndRecPtr = saved_state.EndRecPtr; -#if (defined PG_VERSION_13) || (defined PG_VERSION_14)|| (defined PG_VERSION_15) +#if (defined PG_VERSION_13) || (defined PG_VERSION_14)|| (defined PG_VERSION_15) || (defined PG_VERSION_16) state->seg.ws_segno = 0; state->segoff = 0; #else @@ -400,7 +400,7 @@ walminer_get_first_valid_walfile(WalminerPrivate *private) if(!inseg) { walminer_debug("Skip read walfile %s", filename); -#if (defined PG_VERSION_13) || (defined PG_VERSION_14)|| (defined PG_VERSION_15) +#if (defined PG_VERSION_13) || (defined PG_VERSION_14)|| (defined PG_VERSION_15) || (defined PG_VERSION_16) cell = lnext(wal_file_list, cell); #else cell = lnext(cell); @@ -496,7 +496,7 @@ walminer_wal_read(const char *dirtemp, TimeLineID timeline_id, #else *wal_seg_sz = XLogSegSize; #endif -#if (defined PG_VERSION_13) || (defined PG_VERSION_14)|| (defined PG_VERSION_15) +#if (defined PG_VERSION_13) || (defined PG_VERSION_14)|| (defined PG_VERSION_15) || (defined PG_VERSION_16) cell = lnext(wal_file_list, cell); #else cell = lnext(cell); @@ -596,7 +596,7 @@ walminer_wal_read(const char *dirtemp, TimeLineID timeline_id, } } -#if (defined PG_VERSION_13) || (defined PG_VERSION_14)|| (defined PG_VERSION_15) +#if (defined PG_VERSION_13) || (defined PG_VERSION_14)|| (defined PG_VERSION_15) || (defined PG_VERSION_16) static int Walminer_read_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetPtr, char *readBuff) @@ -680,7 +680,7 @@ prepare_read(bool issearch, XLogRecPtr point_end_lsn) { private_temp->endptr = point_end_lsn; } -#if (defined PG_VERSION_13) || (defined PG_VERSION_14)|| (defined PG_VERSION_15) +#if (defined PG_VERSION_13) || (defined PG_VERSION_14)|| (defined PG_VERSION_15) || (defined PG_VERSION_16) walminer_decode_context->reader_search = XLogReaderAllocate(get_wal_seg_size(wfs->filepath), NULL, XL_ROUTINE(.page_read = &Walminer_read_page), private_temp); @@ -702,7 +702,7 @@ prepare_read(bool issearch, XLogRecPtr point_end_lsn) swal = &walminer_decode_context->swpro; if(wdecoder.w_call_funcs.walminer_front_read) wdecoder.w_call_funcs.walminer_front_read(); -#if (defined PG_VERSION_13) || (defined PG_VERSION_14)|| (defined PG_VERSION_15) +#if (defined PG_VERSION_13) || (defined PG_VERSION_14)|| (defined PG_VERSION_15) || (defined PG_VERSION_16) walminer_decode_context->reader = XLogReaderAllocate(swal->wal_seg_size, NULL, XL_ROUTINE(.page_read = &Walminer_read_page), private_temp); @@ -737,7 +737,7 @@ get_next_record(XLogReaderState *reader ,void *private_temp ,XLogRecPtr first_re walminerprivate = (WalminerPrivate*)private_temp; if(walminerprivate->endptr_reached) return NULL; -#if (defined PG_VERSION_13) || (defined PG_VERSION_14)|| (defined PG_VERSION_15) +#if (defined PG_VERSION_13) || (defined PG_VERSION_14)|| (defined PG_VERSION_15) || (defined PG_VERSION_16) record = XLogReadRecord(reader, &errormsg); #else record = XLogReadRecord(reader, first_record, &errormsg); diff --git a/walminer/wm_compatible.c b/walminer/wm_compatible.c new file mode 100755 index 0000000..11c1236 --- /dev/null +++ b/walminer/wm_compatible.c @@ -0,0 +1,20 @@ +/*------------------------------------------------------------------------- + * + * IDENTIFICATION + * wm_compatible.c + * + *------------------------------------------------------------------------- + */ +#include "wm_compatible.h" + +void +wm_XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id, + RelFileNode *rlocator, ForkNumber *forknum, + BlockNumber *blknum) +{ +#if (defined PG_VERSION_16) + XLogRecGetBlockTag(record, block_id, (RelFileLocator*)rlocator, forknum, blknum); +#else + XLogRecGetBlockTag(record, block_id, rlocator, forknum, blknum); +#endif +} \ No newline at end of file diff --git a/walminer/wm_compatible.h b/walminer/wm_compatible.h new file mode 100755 index 0000000..1813aef --- /dev/null +++ b/walminer/wm_compatible.h @@ -0,0 +1,40 @@ +/*------------------------------------------------------------------------- + * + * IDENTIFICATION + * wm_compatible.h + * + *------------------------------------------------------------------------- + */ +#ifndef WM_COMPATIBLE_H +#define WM_COMPATIBLE_H 1 + +#include "postgres.h" +#include "access/xlogreader.h" +#include "storage/block.h" + +#if (defined PG_VERSION_16) +#include "storage/relfilelocator.h" +#endif + +#if (defined PG_VERSION_16) +typedef struct RelFileNode +{ + Oid spcNode; /* tablespace */ + Oid dbNode; /* database */ + Oid relNode; /* relation */ +} RelFileNode; +#endif + +#if (defined PG_VERSION_16) +#define GET_CHANGE_RELNODE(change) ((RelFileNode*)&change->data.tp.rlocator) +#else +#define GET_CHANGE_RELNODE(change) ((RelFileNode*)&change->data.tp.relnode) +#endif + +void +wm_XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id, + RelFileNode *rlocator, ForkNumber *forknum, + BlockNumber *blknum); + + +#endif \ No newline at end of file diff --git a/walminer/wm_utils.h b/walminer/wm_utils.h index ffc997d..cd6b572 100755 --- a/walminer/wm_utils.h +++ b/walminer/wm_utils.h @@ -10,6 +10,7 @@ #include "postgres.h" #include "walminer_decode.h" +#include "wm_compatible.h" extern bool debug_mode; -- Gitee