RELATEED CONSULTING
相关咨询
选择下列产品马上在线沟通
服务时间:8:30-17:00
你可能遇到了下面的问题
关闭右侧工具栏

新闻中心

这里有您想知道的互联网营销解决方案
PostgreSQL中函数AssignTransactionId的实现逻辑是什么

本篇内容介绍了“PostgreSQL中函数AssignTransactionId的实现逻辑是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

成都创新互联长期为数千家客户提供的网站建设服务,团队从业经验10年,关注不同地域、不同群体,并针对不同对象提供差异化的产品和服务;打造开放共赢平台,与合作伙伴共同营造健康的互联网生态环境。为忻城企业提供专业的做网站、网站设计,忻城网站改版等技术服务。拥有十多年丰富建站经验和众多成功案例,为您定制开发。

一、数据结构

静态变量
当前事务状态CurrentTransactionState

/*
 * CurrentTransactionState always points to the current transaction state
 * block.  It will point to TopTransactionStateData when not in a
 * transaction at all, or when in a top-level transaction.
 * CurrentTransactionState通常指向当前事务块.
 * 如不处于事务中或者处于顶层事务中,则指向TopTransactionStateData
 */
static TransactionStateData TopTransactionStateData = {
    .state = TRANS_DEFAULT,
    .blockState = TBLOCK_DEFAULT,
};
/*
 * unreportedXids holds XIDs of all subtransactions that have not yet been
 * reported in an XLOG_XACT_ASSIGNMENT record.
 * unreportedXids保存所有尚未在XLOG_XACT_ASSIGNMENT记录的子事务.
 */
static int  nUnreportedXids;
static TransactionId unreportedXids[PGPROC_MAX_CACHED_SUBXIDS];
static TransactionState CurrentTransactionState = &TopTransactionStateData;
/*
 * The subtransaction ID and command ID assignment counters are global
 * to a whole transaction, so we do not keep them in the state stack.
 * subtransaction ID和command ID全局计数器,对事务可见,在state栈中不记录这些信息.
 */
static SubTransactionId currentSubTransactionId;
static CommandId currentCommandId;
static bool currentCommandIdUsed;

TransactionState
事务状态结构体

/*
 *  transaction states - transaction state from server perspective
 *  事务状态枚举 - 服务器视角的事务状态
 */
typedef enum TransState
{
    TRANS_DEFAULT,              /* idle 空闲 */
    TRANS_START,                /* transaction starting 事务启动 */
    TRANS_INPROGRESS,           /* inside a valid transaction 进行中 */
    TRANS_COMMIT,               /* commit in progress 提交中 */
    TRANS_ABORT,                /* abort in progress 回滚中 */
    TRANS_PREPARE               /* prepare in progress 准备中 */
} TransState;
/*
 *  transaction block states - transaction state of client queries
 *  事务块状态 - 客户端查询的事务状态
 *
 * Note: the subtransaction states are used only for non-topmost
 * transactions; the others appear only in the topmost transaction.
 * 注意:subtransaction只用于非顶层事务;其他字段用于顶层事务.
 */
typedef enum TBlockState
{
    /* not-in-transaction-block states 未进入事务块状态 */
    TBLOCK_DEFAULT,             /* idle 空闲  */
    TBLOCK_STARTED,             /* running single-query transaction 单个查询事务 */
    /* transaction block states 事务块状态 */
    TBLOCK_BEGIN,               /* starting transaction block 开始事务块 */
    TBLOCK_INPROGRESS,          /* live transaction 进行中 */
    TBLOCK_IMPLICIT_INPROGRESS, /* live transaction after implicit BEGIN 隐式事务,进行中 */
    TBLOCK_PARALLEL_INPROGRESS, /* live transaction inside parallel worker 并行worker中的事务,进行中 */
    TBLOCK_END,                 /* COMMIT received 接收到COMMIT */
    TBLOCK_ABORT,               /* failed xact, awaiting ROLLBACK 失败,等待ROLLBACK */
    TBLOCK_ABORT_END,           /* failed xact, ROLLBACK received 失败,已接收ROLLBACK */
    TBLOCK_ABORT_PENDING,       /* live xact, ROLLBACK received 进行中,接收到ROLLBACK */
    TBLOCK_PREPARE,             /* live xact, PREPARE received 进行中,接收到PREPARE */
    /* subtransaction states 子事务状态 */
    TBLOCK_SUBBEGIN,            /* starting a subtransaction 开启 */
    TBLOCK_SUBINPROGRESS,       /* live subtransaction 进行中 */
    TBLOCK_SUBRELEASE,          /* RELEASE received 接收到RELEASE */
    TBLOCK_SUBCOMMIT,           /* COMMIT received while TBLOCK_SUBINPROGRESS 进行中,接收到COMMIT */
    TBLOCK_SUBABORT,            /* failed subxact, awaiting ROLLBACK 失败,等待ROLLBACK */
    TBLOCK_SUBABORT_END,        /* failed subxact, ROLLBACK received 失败,已接收ROLLBACK */
    TBLOCK_SUBABORT_PENDING,    /* live subxact, ROLLBACK received 进行中,接收到ROLLBACK */
    TBLOCK_SUBRESTART,          /* live subxact, ROLLBACK TO received 进行中,接收到ROLLBACK TO */
    TBLOCK_SUBABORT_RESTART     /* failed subxact, ROLLBACK TO received 失败,已接收ROLLBACK TO */
} TBlockState;
/*
 *  transaction state structure
 *  事务状态结构体
 */
typedef struct TransactionStateData
{
    //事务ID
    TransactionId transactionId;    /* my XID, or Invalid if none */
    //子事务ID
    SubTransactionId subTransactionId;  /* my subxact ID */
    //保存点名称
    char       *name;           /* savepoint name, if any */
    //保存点级别
    int         savepointLevel; /* savepoint level */
    //低级别的事务状态
    TransState  state;          /* low-level state */
    //高级别的事务状态
    TBlockState blockState;     /* high-level state */
    //事务嵌套深度
    int         nestingLevel;   /* transaction nesting depth */
    //GUC上下文嵌套深度
    int         gucNestLevel;   /* GUC context nesting depth */
    //事务生命周期上下文
    MemoryContext curTransactionContext;    /* my xact-lifetime context */
    //查询资源
    ResourceOwner curTransactionOwner;  /* my query resources */
    //按XID顺序保存的已提交的子事务ID
    TransactionId *childXids;   /* subcommitted child XIDs, in XID order */
    //childXids数组大小
    int         nChildXids;     /* # of subcommitted child XIDs */
    //分配的childXids数组空间
    int         maxChildXids;   /* allocated size of childXids[] */
    //上一个CurrentUserId
    Oid         prevUser;       /* previous CurrentUserId setting */
    //上一个SecurityRestrictionContext
    int         prevSecContext; /* previous SecurityRestrictionContext */
    //上一事务是否只读?
    bool        prevXactReadOnly;   /* entry-time xact r/o state */
    //是否处于Recovery?
    bool        startedInRecovery;  /* did we start in recovery? */
    //XID是否已保存在WAL Record中?
    bool        didLogXid;      /* has xid been included in WAL record? */
    //Enter/ExitParallelMode计数器
    int         parallelModeLevel;  /* Enter/ExitParallelMode counter */
    //父事务状态
    struct TransactionStateData *parent;    /* back link to parent */
} TransactionStateData;
//结构体指针
typedef TransactionStateData *TransactionState;

二、源码解读

AssignTransactionId函数,给定的TransactionState分配一个新的持久化事务号XID,在此函数调用前,不会为事务分配XIDs.

/*
 * AssignTransactionId
 *
 * Assigns a new permanent XID to the given TransactionState.
 * We do not assign XIDs to transactions until/unless this is called.
 * Also, any parent TransactionStates that don't yet have XIDs are assigned
 * one; this maintains the invariant that a child transaction has an XID
 * following its parent's.
 * 为给定的TransactionState分配一个新的持久化事务号XID.
 * 在此函数调用前,我们不会为事务分配XIDs.
 * 同时,所有尚未获得XIDs的父TransactionStates也会分配事务号,
 *   这可以确保子事务的事务号在父事务之后.
 */
static void
AssignTransactionId(TransactionState s)
{
    bool        isSubXact = (s->parent != NULL);
    ResourceOwner currentOwner;
    bool        log_unknown_top = false;
    /* Assert that caller didn't screw up */
    //确保调用者没有搞砸
    Assert(!TransactionIdIsValid(s->transactionId));
    Assert(s->state == TRANS_INPROGRESS);
    /*
     * Workers synchronize transaction state at the beginning of each parallel
     * operation, so we can't account for new XIDs at this point.
     * 在每个并行操作前,Parallel Workers同步事务状态,
     *   因此我们不能在这时候请求XIDs
     */
    if (IsInParallelMode() || IsParallelWorker())
        elog(ERROR, "cannot assign XIDs during a parallel operation");
    /*
     * Ensure parent(s) have XIDs, so that a child always has an XID later
     * than its parent.  Mustn't recurse here, or we might get a stack overflow
     * if we're at the bottom of a huge stack of subtransactions none of which
     * have XIDs yet.
     * 确保父事务已分配XIDs,这样可以确保子事务的XID后于父事务.
     * 不能在这里递归执行,否则如果我们在一个未分配XID子事务大栈的底部,可能会遇到栈溢出
     */
    if (isSubXact && !TransactionIdIsValid(s->parent->transactionId))
    {
        TransactionState p = s->parent;
        TransactionState *parents;
        size_t      parentOffset = 0;
        parents = palloc(sizeof(TransactionState) * s->nestingLevel);
        while (p != NULL && !TransactionIdIsValid(p->transactionId))
        {
            parents[parentOffset++] = p;
            p = p->parent;
        }
        /*
         * This is technically a recursive call, but the recursion will never
         * be more than one layer deep.
         * 递归调用,但递归不会超过一层
         */
        while (parentOffset != 0)
            AssignTransactionId(parents[--parentOffset]);
        pfree(parents);
    }
    /*
     * When wal_level=logical, guarantee that a subtransaction's xid can only
     * be seen in the WAL stream if its toplevel xid has been logged before.
     * If necessary we log an xact_assignment record with fewer than
     * PGPROC_MAX_CACHED_SUBXIDS. Note that it is fine if didLogXid isn't set
     * for a transaction even though it appears in a WAL record, we just might
     * superfluously log something. That can happen when an xid is included
     * somewhere inside a wal record, but not in XLogRecord->xl_xid, like in
     * xl_standby_locks.
     * 如wal_level=logical,确保子事务XID在顶层XID已被"日志"的情况只可以被WAL stream看到.
     * 如果有必要,我们将使用小于PGPROC_MAX_CACHED_SUBXIDS的值来记录xact_assignment记录.
     * 请注意,即使didLogXid出现在WAL记录中,如果它没有为事务设置,也没有问题,
     *   我们可能只是多余地记录了一些东西。
     * 当一个xid出现在WAL Record中的某个地方,但不在XLogRecord->xl_xid中
     *   (比如在xl_standby_locks中)时,就会发生这种情况。
     */
    if (isSubXact && XLogLogicalInfoActive() &&
        !TopTransactionStateData.didLogXid)
        log_unknown_top = true;
    /*
     * Generate a new Xid and record it in PG_PROC and pg_subtrans.
     * 生成一个新的XID,并记录在PG_PROC和pg_subtrans中
     *
     * NB: we must make the subtrans entry BEFORE the Xid appears anywhere in
     * shared storage other than PG_PROC; because if there's no room for it in
     * PG_PROC, the subtrans entry is needed to ensure that other backends see
     * the Xid as "running".  See GetNewTransactionId.
     * 注意:我们必须在Xid出现在除PG_PROC之外的共享存储之前构造subtrans条目.
     * 因为如果在PG_PROC没有空闲空间,子事务条目需要确保其他进程看到该XID正在运行.
     * 参考函数GetNewTransactionId说明.
     * 
     */
    s->transactionId = GetNewTransactionId(isSubXact);
    if (!isSubXact)
        XactTopTransactionId = s->transactionId;
    if (isSubXact)
        SubTransSetParent(s->transactionId, s->parent->transactionId);
    /*
     * If it's a top-level transaction, the predicate locking system needs to
     * be told about it too.
     * 如为顶层事务,谓词锁系统也需要了解此事务.
     */
    if (!isSubXact)
        RegisterPredicateLockingXid(s->transactionId);
    /*
     * Acquire lock on the transaction XID.  (We assume this cannot block.) We
     * have to ensure that the lock is assigned to the transaction's own
     * ResourceOwner.
     * 请求锁(我们假定这样做不好引起阻塞).我们必须确保锁已被分配给事务自己的ResourceOwner.
     */
    currentOwner = CurrentResourceOwner;
    CurrentResourceOwner = s->curTransactionOwner;
    XactLockTableInsert(s->transactionId);
    CurrentResourceOwner = currentOwner;
    /*
     * Every PGPROC_MAX_CACHED_SUBXIDS assigned transaction ids within each
     * top-level transaction we issue a WAL record for the assignment. We
     * include the top-level xid and all the subxids that have not yet been
     * reported using XLOG_XACT_ASSIGNMENT records.
     * 在每个顶级事务中分配的每个PGPROC_MAX_CACHED_SUBXIDS事务id,
     *   我们都会为分配记录一条WAL记录。
     * 该记录包括顶级的xid和所有尚未使用XLOG_XACT_ASSIGNMENT记录报告的子xid。
     *
     * This is required to limit the amount of shared memory required in a hot
     * standby server to keep track of in-progress XIDs. See notes for
     * RecordKnownAssignedTransactionIds().
     * 在跟踪进行中的XIDs的备机上,需要控制共享内存的大小.
     * 参见RecordKnownAssignedTransactionIds()函数说明.
     *
     * We don't keep track of the immediate parent of each subxid, only the
     * top-level transaction that each subxact belongs to. This is correct in
     * recovery only because aborted subtransactions are separately WAL
     * logged.
     * 我们不需要跟踪父事务的每个子事务,只需要跟踪子事务归属的顶层事务即可.
     * 这样可行是因为在恢复中,已回滚的子事务是通过WAL单独记录的.
     *
     * This is correct even for the case where several levels above us didn't
     * have an xid assigned as we recursed up to them beforehand.
     * 即使在我们之前递归到上面的几个级别时没有分配xid的情况下,这也是正确的。
     */
    if (isSubXact && XLogStandbyInfoActive())
    {
        unreportedXids[nUnreportedXids] = s->transactionId;
        nUnreportedXids++;
        /*
         * ensure this test matches similar one in
         * RecoverPreparedTransactions()
         * 确保在RecoverPreparedTransactions()中可以匹配到相似的.
         */
        if (nUnreportedXids >= PGPROC_MAX_CACHED_SUBXIDS ||
            log_unknown_top)
        {
            xl_xact_assignment xlrec;
            /*
             * xtop is always set by now because we recurse up transaction
             * stack to the highest unassigned xid and then come back down
             * xtop现在已经设置好了,因为我们将事务堆栈递归到最高的未分配的xid,然后再返回
             */
            xlrec.xtop = GetTopTransactionId();
            Assert(TransactionIdIsValid(xlrec.xtop));
            xlrec.nsubxacts = nUnreportedXids;
            XLogBeginInsert();
            XLogRegisterData((char *) &xlrec, MinSizeOfXactAssignment);
            XLogRegisterData((char *) unreportedXids,
                             nUnreportedXids * sizeof(TransactionId));
            (void) XLogInsert(RM_XACT_ID, XLOG_XACT_ASSIGNMENT);
            nUnreportedXids = 0;
            /* mark top, not current xact as having been logged */
            //标记为最顶层,而不是当前已记录日志的xact
            TopTransactionStateData.didLogXid = true;
        }
    }
}

三、跟踪分析

执行txid_current,触发函数调用

11:10:36 (xdb@[local]:5432)testdb=# begin;
BEGIN
11:40:20 (xdb@[local]:5432)testdb=#* select txid_current_if_assigned();
 txid_current_if_assigned 
--------------------------
(1 row)
11:40:43 (xdb@[local]:5432)testdb=#* select txid_current();

启动gdb,设置断点

(gdb) b AssignTransactionId
Breakpoint 5 at 0x546a4c: file xact.c, line 491.
(gdb) c
Continuing.
Breakpoint 5, AssignTransactionId (s=0xf9c720 ) at xact.c:491
491     bool        isSubXact = (s->parent != NULL);
(gdb)

查看调用栈

(gdb) bt
#0  AssignTransactionId (s=0xf9c720 ) at xact.c:491
#1  0x000000000054693d in GetTopTransactionId () at xact.c:392
#2  0x00000000009fe1f3 in txid_current (fcinfo=0x25835a0) at txid.c:443
#3  0x00000000006cfebd in ExecInterpExpr (state=0x25834b8, econtext=0x25831a8, isnull=0x7ffe3d4a31f7)
    at execExprInterp.c:654
#4  0x00000000006d1ac6 in ExecInterpExprStillValid (state=0x25834b8, econtext=0x25831a8, isNull=0x7ffe3d4a31f7)
    at execExprInterp.c:1786
#5  0x00000000007140dd in ExecEvalExprSwitchContext (state=0x25834b8, econtext=0x25831a8, isNull=0x7ffe3d4a31f7)
    at ../../../src/include/executor/executor.h:303
#6  0x000000000071414b in ExecProject (projInfo=0x25834b0) at ../../../src/include/executor/executor.h:337
#7  0x0000000000714323 in ExecResult (pstate=0x2583090) at nodeResult.c:136
#8  0x00000000006e4c30 in ExecProcNodeFirst (node=0x2583090) at execProcnode.c:445
#9  0x00000000006d9974 in ExecProcNode (node=0x2583090) at ../../../src/include/executor/executor.h:237
#10 0x00000000006dc22d in ExecutePlan (estate=0x2582e78, planstate=0x2583090, use_parallel_mode=false, 
    operation=CMD_SELECT, sendTuples=true, numberTuples=0, direction=ForwardScanDirection, dest=0x24cd0a0, 
    execute_once=true) at execMain.c:1723
#11 0x00000000006d9f5c in standard_ExecutorRun (queryDesc=0x256b0c8, direction=ForwardScanDirection, count=0, 
    execute_once=true) at execMain.c:364
#12 0x00000000006d9d7f in ExecutorRun (queryDesc=0x256b0c8, direction=ForwardScanDirection, count=0, execute_once=true)
    at execMain.c:307
#13 0x00000000008ccf5a in PortalRunSelect (portal=0x250c748, forward=true, count=0, dest=0x24cd0a0) at pquery.c:932
#14 0x00000000008ccbf3 in PortalRun (portal=0x250c748, count=9223372036854775807, isTopLevel=true, run_once=true, 
    dest=0x24cd0a0, altdest=0x24cd0a0, completionTag=0x7ffe3d4a3570 "") at pquery.c:773
#15 0x00000000008c6b1e in exec_simple_query (query_string=0x24a6ec8 "select txid_current();") at postgres.c:1145
#16 0x00000000008cae70 in PostgresMain (argc=1, argv=0x24d2dc8, dbname=0x24d2c30 "testdb", username=0x24a3ba8 "xdb")
    at postgres.c:4182
    #17 0x000000000082642b in BackendRun (port=0x24c8c00) at postmaster.c:4361
---Type  to continue, or q  to quit---
#18 0x0000000000825b8f in BackendStartup (port=0x24c8c00) at postmaster.c:4033
#19 0x0000000000821f1c in ServerLoop () at postmaster.c:1706
#20 0x00000000008217b4 in PostmasterMain (argc=1, argv=0x24a1b60) at postmaster.c:1379
#21 0x00000000007488ef in main (argc=1, argv=0x24a1b60) at main.c:228

输入参数TransactionState(全局变量,指向TopTransactionStateData)

(gdb) p s
$13 = (TransactionState) 0xf9c720 
(gdb) p *s
$14 = {transactionId = 0, subTransactionId = 1, name = 0x0, savepointLevel = 0, state = TRANS_INPROGRESS, 
  blockState = TBLOCK_INPROGRESS, nestingLevel = 1, gucNestLevel = 1, curTransactionContext = 0x2523850, 
  curTransactionOwner = 0x24d4868, childXids = 0x0, nChildXids = 0, maxChildXids = 0, prevUser = 10, prevSecContext = 0, 
  prevXactReadOnly = false, startedInRecovery = false, didLogXid = false, parallelModeLevel = 0, parent = 0x0}
(gdb)

初始化部分变量并验证

(gdb) n
493     bool        log_unknown_top = false;
(gdb) 
496     Assert(!TransactionIdIsValid(s->transactionId));
(gdb) 
497     Assert(s->state == TRANS_INPROGRESS);
(gdb)

获取事务号

(gdb) 
503     if (IsInParallelMode() || IsParallelWorker())
(gdb) 
512     if (isSubXact && !TransactionIdIsValid(s->parent->transactionId))
(gdb) 
545     if (isSubXact && XLogLogicalInfoActive() &&
(gdb) 
557     s->transactionId = GetNewTransactionId(isSubXact);
(gdb) 
558     if (!isSubXact)
(gdb) p s->transactionId
$15 = 2407
(gdb)

注册,并设置其他信息

(gdb) n
559         XactTopTransactionId = s->transactionId;
(gdb) 
561     if (isSubXact)
(gdb) 
568     if (!isSubXact)
(gdb) 
569         RegisterPredicateLockingXid(s->transactionId);
(gdb) 
576     currentOwner = CurrentResourceOwner;
(gdb) 
577     CurrentResourceOwner = s->curTransactionOwner;
(gdb) 
579     XactLockTableInsert(s->transactionId);
(gdb) 
581     CurrentResourceOwner = currentOwner;
(gdb) 
601     if (isSubXact && XLogStandbyInfoActive())
(gdb) 
635 }

完成调用

(gdb) 
GetTopTransactionId () at xact.c:393
393     return XactTopTransactionId;
(gdb)

“PostgreSQL中函数AssignTransactionId的实现逻辑是什么”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注创新互联网站,小编将为大家输出更多高质量的实用文章!


当前题目:PostgreSQL中函数AssignTransactionId的实现逻辑是什么
网站地址:http://scpingwu.com/article/pjjecp.html