代码拉取完成,页面将自动刷新
From 06216228c3af8998940bdb175ef9c80a83b64d17 Mon Sep 17 00:00:00 2001
From: Lucas Rodriguez <[email protected]>
Date: Mon, 9 Sep 2024 12:40:10 -0500
Subject: [PATCH] Backport TestLessorRenewExtendPileup race condition fix for
release-3.4
Signed-off-by: Lucas Rodriguez <[email protected]>
---
lease/lessor.go | 21 ++++++++++++++++-----
lease/lessor_test.go | 19 +++++++++----------
2 files changed, 25 insertions(+), 15 deletions(-)
diff --git a/lease/lessor.go b/lease/lessor.go
index b16099f..a9fdd6c 100644
--- a/lease/lessor.go
+++ b/lease/lessor.go
@@ -41,8 +41,8 @@ var (
leaseBucketName = []byte("lease")
- // maximum number of leases to revoke per second; configurable for tests
- leaseRevokeRate = 1000
+ // default number of leases to revoke per second; configurable for tests
+ defaultLeaseRevokeRate = 1000
// maximum number of lease checkpoints recorded to the consensus log per second; configurable for tests
leaseCheckpointRate = 1000
@@ -169,6 +169,9 @@ type lessor struct {
// requests for shorter TTLs are extended to the minimum TTL.
minLeaseTTL int64
+ // maximum number of leases to revoke per second
+ leaseRevokeRate int
+
expiredC chan []*Lease
// stopC is a channel whose closure indicates that the lessor should be stopped.
stopC chan struct{}
@@ -187,6 +190,8 @@ type LessorConfig struct {
MinLeaseTTL int64
CheckpointInterval time.Duration
ExpiredLeasesRetryInterval time.Duration
+
+ leaseRevokeRate int
}
func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor {
@@ -196,12 +201,16 @@ func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor {
func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor {
checkpointInterval := cfg.CheckpointInterval
expiredLeaseRetryInterval := cfg.ExpiredLeasesRetryInterval
+ leaseRevokeRate := cfg.leaseRevokeRate
if checkpointInterval == 0 {
checkpointInterval = defaultLeaseCheckpointInterval
}
if expiredLeaseRetryInterval == 0 {
expiredLeaseRetryInterval = defaultExpiredleaseRetryInterval
}
+ if leaseRevokeRate == 0 {
+ leaseRevokeRate = defaultLeaseRevokeRate
+ }
l := &lessor{
leaseMap: make(map[LeaseID]*Lease),
itemMap: make(map[LeaseItem]LeaseID),
@@ -209,6 +218,7 @@ func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor {
leaseCheckpointHeap: make(LeaseQueue, 0),
b: b,
minLeaseTTL: cfg.MinLeaseTTL,
+ leaseRevokeRate: leaseRevokeRate,
checkpointInterval: checkpointInterval,
expiredLeaseRetryInterval: expiredLeaseRetryInterval,
// expiredC is a small buffered chan to avoid unnecessary blocking.
@@ -449,7 +459,7 @@ func (le *lessor) Promote(extend time.Duration) {
le.leaseExpiredNotifier.RegisterOrUpdate(item)
}
- if len(le.leaseMap) < leaseRevokeRate {
+ if len(le.leaseMap) < le.leaseRevokeRate {
// no possibility of lease pile-up
return
}
@@ -463,7 +473,7 @@ func (le *lessor) Promote(extend time.Duration) {
expires := 0
// have fewer expires than the total revoke rate so piled up leases
// don't consume the entire revoke limit
- targetExpiresPerSecond := (3 * leaseRevokeRate) / 4
+ targetExpiresPerSecond := (3 * le.leaseRevokeRate) / 4
for _, l := range leases {
remaining := l.Remaining()
if remaining > nextWindow {
@@ -487,6 +497,7 @@ func (le *lessor) Promote(extend time.Duration) {
le.leaseExpiredNotifier.RegisterOrUpdate(item)
le.scheduleCheckpointIfNeeded(l)
}
+
}
type leasesByExpiry []*Lease
@@ -602,7 +613,7 @@ func (le *lessor) revokeExpiredLeases() {
var ls []*Lease
// rate limit
- revokeLimit := leaseRevokeRate / 2
+ revokeLimit := le.leaseRevokeRate / 2
le.mu.RLock()
if le.isPrimary() {
diff --git a/lease/lessor_test.go b/lease/lessor_test.go
index c79c32e..66497c7 100644
--- a/lease/lessor_test.go
+++ b/lease/lessor_test.go
@@ -283,17 +283,16 @@ func TestLessorRenewWithCheckpointer(t *testing.T) {
// TestLessorRenewExtendPileup ensures Lessor extends leases on promotion if too many
// expire at the same time.
func TestLessorRenewExtendPileup(t *testing.T) {
- oldRevokeRate := leaseRevokeRate
- defer func() { leaseRevokeRate = oldRevokeRate }()
+ leaseRevokeRate := 10
lg := zap.NewNop()
- leaseRevokeRate = 10
+
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)
- le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+ le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL,leaseRevokeRate: leaseRevokeRate})
ttl := int64(10)
- for i := 1; i <= leaseRevokeRate*10; i++ {
+ for i := 1; i <= le.leaseRevokeRate*10; i++ {
if _, err := le.Grant(LeaseID(2*i), ttl); err != nil {
t.Fatal(err)
}
@@ -310,7 +309,7 @@ func TestLessorRenewExtendPileup(t *testing.T) {
bcfg.Path = filepath.Join(dir, "be")
be = backend.New(bcfg)
defer be.Close()
- le = newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+ le = newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL,leaseRevokeRate: leaseRevokeRate})
defer le.Stop()
// extend after recovery should extend expiration on lease pile-up
@@ -325,11 +324,11 @@ func TestLessorRenewExtendPileup(t *testing.T) {
for i := ttl; i < ttl+20; i++ {
c := windowCounts[i]
- if c > leaseRevokeRate {
- t.Errorf("expected at most %d expiring at %ds, got %d", leaseRevokeRate, i, c)
+ if c > le.leaseRevokeRate {
+ t.Errorf("expected at most %d expiring at %ds, got %d", le.leaseRevokeRate, i, c)
}
- if c < leaseRevokeRate/2 {
- t.Errorf("expected at least %d expiring at %ds, got %d", leaseRevokeRate/2, i, c)
+ if c < le.leaseRevokeRate/2 {
+ t.Errorf("expected at least %d expiring at %ds, got %d", le.leaseRevokeRate/2, i, c)
}
}
}
--
2.9.3.windows.1
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。