博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【Postgresql源码分析之二】同步复制源码分析
阅读量:4282 次
发布时间:2019-05-27

本文共 3003 字,大约阅读时间需要 10 分钟。

前面说明了pg中同步复制的设置方法,主要是设置参数synchronous_commit和synchronous_standby_names,下面介绍同步复制的代码实现流程(这里先介绍PG9.6之前的同步复制方式,即只支持一个同步备机)。

    同步复制的源代码在pg中的位置为src/backend/replication/syncrep.c,该程序中包含的函数主要分为三部分:

  •   提供给后台程序(Synchronous Replication functions for normal user backends)
  •   提供给walsender进程(Synchronous Replication functions for wal sender processes)
  •   提供给其它程序使用(Synchronous Replication functions executed by any process)
      其中第三部分比较简单,主要包括下面两个函数check_synchronous_standby_names和assign_synchronous_commit,从名字中可以看出,分别是检查设置的
synchronous_standby_names参数是否合法以及赋值参数synchronous_commit,这里不做详细说明。

后台函数

后台应用程序在提交或者回滚事物,在返回结果给客户端之前,会调用函数SyncRepWaitForLSN,在该函数中会等待备机的响应的消息,否则一直等待。
        函数实现流程如下:
步骤1:检查是否开启同步复制模式
if (!SyncRepRequested() || !SyncStandbysDefined())		return;
步骤2:判断全局变量中sync_standbys_defined是否定义,该变量表示是否设置全局变量,在checkpoint进程中更新该字段, 同时检查当前的LSN与全局变量中记录的等待LSN之间的关系,如果提交的LSN小于或者等于记录的等待等待提交LSN,表明已经接收到备机的响应,无须等待
if (!WalSndCtl->sync_standbys_defined ||		XLByteLE(XactCommitLSN, WalSndCtl->lsn[mode]))	{		LWLockRelease(SyncRepLock);		return;	}
步骤3:更新等待的LSN号以及等待状态
MyProc->waitLSN = XactCommitLSN;	MyProc->syncRepState = SYNC_REP_WAITING;	SyncRepQueueInsert(mode);
步骤4:一直等待同步复制状态的更新,如果为SYNC_REP_WAIT_COMPLETE,表示备机已经作出响应,在备机上提交完成
if (syncRepState == SYNC_REP_WAIT_COMPLETE)			break;
     从代码中可以看出,在同步复制模式下,主机上产生事物日志后,在返回给客户端消息之前会等待备机的消息响应,判断进程的同步复制状态,直到其变为SYNC_REP_WAIT_COMPLETE,表示同步提交在备机上完成。
那么该状态的更新是通过walsender函数SyncRepReleaseWaiters完成的。下面介绍该函数的实现方式。

Walsender进程函数

SyncRepReleaseWaiters函数表示释放同步复制中的等待者,该函数由walsender进程调用,且只在walsender进程处理walreceiver进程回应消息时候调用(ProcessStandbyReplyMessage函数)。
SyncRepReleaseWaiters实现步骤主要如下:
step1:检查是否需要等待备机提交
if (MyWalSnd->sync_standby_priority == 0 ||		MyWalSnd->state < WALSNDSTATE_STREAMING ||		XLogRecPtrIsInvalid(MyWalSnd->flush))		return;
step2:判断是否为同步复制进程(PG9.6之前的版本只支持一个同步备)
for (i = 0; i < max_wal_senders; i++)	{		/* use volatile pointer to prevent code rearrangement */		volatile WalSnd *walsnd = &walsndctl->walsnds[i];		if (walsnd->pid != 0 &&			walsnd->state == WALSNDSTATE_STREAMING &&			walsnd->sync_standby_priority > 0 &&			(priority == 0 ||			 priority > walsnd->sync_standby_priority) &&			!XLogRecPtrIsInvalid(walsnd->flush))		{			priority = walsnd->sync_standby_priority;			syncWalSnd = walsnd;		}	}
对所有的walsender进程循环判断,找到最高优先级对应的walsender进程。然后判断当前进程是否为同步复制
if (syncWalSnd != MyWalSnd)	{		LWLockRelease(SyncRepLock);		announce_next_takeover = true;		return;	}
step3:如果Wie同步复制模式,唤醒等待的队列
/*	 * Set the lsn first so that when we wake backends they will release up to	 * this location.	 */	if (XLByteLT(walsndctl->lsn[SYNC_REP_WAIT_WRITE], MyWalSnd->write))	{		walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write;		numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);	}	if (XLByteLT(walsndctl->lsn[SYNC_REP_WAIT_FLUSH], MyWalSnd->flush))	{		walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;		numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);	}
在唤醒同步等待队列函数SyncRepWakeQueue中,改变同步复制的状态。
至此,在同步复制模式下,后台进程在提交事物后将结果返回给客户端前会等待备机上的回应消息,当备机回应消息后,walsender进程唤醒等待队列,从而返回结果给客户端,不阻塞其它事物的操作。

转载地址:http://xibgi.baihongyu.com/

你可能感兴趣的文章
kafka消费者报错:Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
查看>>
(转)Spark与Hadoop的shuffle的异同
查看>>
(转)Redis 持久化之RDB和AOF
查看>>
Redis创建集群报错:[ERR] Sorry, can't connect to node 192.168.0.9:6380
查看>>
Redis(4):集群搭建和java连接
查看>>
pom报错:Element' dependency' cannot have character [ children], because the type's content type is e
查看>>
(转)面试必备:HashMap、Hashtable、ConcurrentHashMap的原理与区别
查看>>
CDH5.15.2替换JDK1.7到1.8
查看>>
JAVA多线程(9):多线程依次打印ABC
查看>>
(转)github在git push之后不记录Contributions
查看>>
(转)IDEA导入Git项目后无Git选项
查看>>
Tomcat的GC优化实践
查看>>
idea多模块项目间通过配置pom.xml相互引用
查看>>
(转)MYSQL如何设置大小写敏感
查看>>
SpringBoot单元测试,无法导入@RunWith
查看>>
(转)hbase balance命令走过的坑
查看>>
Linux环境cpu过高,定位问题步骤(附实例)
查看>>
(转)java final关键字使用及面试题重点
查看>>
(转)CDH下集成spark2.2.0与kafka(四十一):在spark+kafka流处理程序中抛出错误java.lang.NoSuchMethodError:
查看>>
(转)maven打包时跳过测试
查看>>