Decreases refresh time per bucket in Kademlia (#1150)
This commit is contained in:
parent
d50c07e56c
commit
df903ea215
@ -221,8 +221,15 @@ func (k *Kademlia) Bootstrap(ctx context.Context) error {
|
|||||||
k.routingTable.mutex.Lock()
|
k.routingTable.mutex.Lock()
|
||||||
id := k.routingTable.self.Id
|
id := k.routingTable.self.Id
|
||||||
k.routingTable.mutex.Unlock()
|
k.routingTable.mutex.Unlock()
|
||||||
|
|
||||||
_, err = k.lookup(ctx, id, true)
|
_, err = k.lookup(ctx, id, true)
|
||||||
|
|
||||||
|
// TODO(dylan): We do not currently handle this last bit of behavior.
|
||||||
|
// ```
|
||||||
|
// Finally, u refreshes all k-buckets further away than its closest neighbor.
|
||||||
|
// During the refreshes, u both populates its own k-buckets and inserts
|
||||||
|
// itself into other nodes' k-buckets as necessary.
|
||||||
|
// ``
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -338,7 +345,7 @@ func (k *Kademlia) RunRefresh(ctx context.Context) error {
|
|||||||
|
|
||||||
ticker := time.NewTicker(5 * time.Minute)
|
ticker := time.NewTicker(5 * time.Minute)
|
||||||
for {
|
for {
|
||||||
if err := k.refresh(ctx); err != nil {
|
if err := k.refresh(ctx, time.Minute); err != nil {
|
||||||
k.log.Warn("bucket refresh failed", zap.Error(err))
|
k.log.Warn("bucket refresh failed", zap.Error(err))
|
||||||
}
|
}
|
||||||
select {
|
select {
|
||||||
@ -351,7 +358,7 @@ func (k *Kademlia) RunRefresh(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// refresh updates each Kademlia bucket not contacted in the last hour
|
// refresh updates each Kademlia bucket not contacted in the last hour
|
||||||
func (k *Kademlia) refresh(ctx context.Context) error {
|
func (k *Kademlia) refresh(ctx context.Context, threshold time.Duration) error {
|
||||||
bIDs, err := k.routingTable.GetBucketIds()
|
bIDs, err := k.routingTable.GetBucketIds()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Error.Wrap(err)
|
return Error.Wrap(err)
|
||||||
@ -363,7 +370,7 @@ func (k *Kademlia) refresh(ctx context.Context) error {
|
|||||||
ts, tErr := k.routingTable.GetBucketTimestamp(bID)
|
ts, tErr := k.routingTable.GetBucketTimestamp(bID)
|
||||||
if tErr != nil {
|
if tErr != nil {
|
||||||
errors.Add(tErr)
|
errors.Add(tErr)
|
||||||
} else if now.After(ts.Add(time.Hour)) {
|
} else if now.After(ts.Add(threshold)) {
|
||||||
rID, _ := randomIDInRange(startID, keyToBucketID(bID))
|
rID, _ := randomIDInRange(startID, keyToBucketID(bID))
|
||||||
_, _ = k.FindNode(ctx, rID) // ignore node not found
|
_, _ = k.FindNode(ctx, rID) // ignore node not found
|
||||||
}
|
}
|
||||||
|
@ -205,13 +205,13 @@ func TestRefresh(t *testing.T) {
|
|||||||
err := rt.SetBucketTimestamp(bID[:], now.Add(-2*time.Hour))
|
err := rt.SetBucketTimestamp(bID[:], now.Add(-2*time.Hour))
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
//refresh should call FindNode, updating the time
|
//refresh should call FindNode, updating the time
|
||||||
err = k.refresh(ctx)
|
err = k.refresh(ctx, time.Minute)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
ts1, err := rt.GetBucketTimestamp(bID[:])
|
ts1, err := rt.GetBucketTimestamp(bID[:])
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.True(t, now.Add(-5*time.Minute).Before(ts1))
|
assert.True(t, now.Add(-5*time.Minute).Before(ts1))
|
||||||
//refresh should not call FindNode, leaving the previous time
|
//refresh should not call FindNode, leaving the previous time
|
||||||
err = k.refresh(ctx)
|
err = k.refresh(ctx, time.Minute)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
ts2, err := rt.GetBucketTimestamp(bID[:])
|
ts2, err := rt.GetBucketTimestamp(bID[:])
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
Loading…
Reference in New Issue
Block a user