storagenode/piecestore: add workgroup to endpoint to prevent stray goroutine after shutdown
Change-Id: Ie8444c3c8f870745b73342de2e9a93027fcad371
This commit is contained in:
parent
92d86fa044
commit
d578102672
@ -441,6 +441,12 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
|
||||
if err != nil {
|
||||
return nil, errs.Combine(err, peer.Close())
|
||||
}
|
||||
// TODO remove this once workgroup is removed from piecestore endpoint
|
||||
peer.Services.Add(lifecycle.Item{
|
||||
Name: "piecestore",
|
||||
Close: peer.Storage2.Endpoint.Close,
|
||||
})
|
||||
|
||||
pb.RegisterPiecestoreServer(peer.Server.GRPC(), peer.Storage2.Endpoint)
|
||||
pb.DRPCRegisterPiecestore(peer.Server.DRPC(), peer.Storage2.Endpoint.DRPC())
|
||||
|
||||
|
@ -88,6 +88,8 @@ type Endpoint struct {
|
||||
usage bandwidth.DB
|
||||
usedSerials UsedSerials
|
||||
|
||||
group sync2.WorkGroup // temporary fix for uncontrolled goroutine at end of doUpload
|
||||
|
||||
reportCapacity func(context.Context)
|
||||
|
||||
// liveRequests tracks the total number of incoming rpc requests. For gRPC
|
||||
@ -285,10 +287,11 @@ func (endpoint *Endpoint) doUpload(stream uploadStream, requestLimit int) (err e
|
||||
// if availableSpace has fallen below ReportCapacityThreshold, report capacity to satellites
|
||||
defer func() {
|
||||
if availableSpace < endpoint.config.ReportCapacityThreshold.Int64() {
|
||||
go func() {
|
||||
// workgroup is a temporary fix to clean up goroutine when peer shuts down
|
||||
endpoint.group.Go(func() {
|
||||
endpoint.monitor.Loop.TriggerWait()
|
||||
endpoint.reportCapacity(ctx)
|
||||
}()
|
||||
})
|
||||
}
|
||||
}()
|
||||
|
||||
@ -823,3 +826,9 @@ func min(a, b int64) int64 {
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
// Close is a temporary fix to clean up uncontrolled goroutine in doUpload
|
||||
func (endpoint *Endpoint) Close() error {
|
||||
endpoint.group.Close()
|
||||
return nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user