satellite/*: use typed lrucache and ReadCache
Change-Id: Ieee535dd8735a95dd196a77413e4a25a6a72342c
This commit is contained in:
parent
50afaa411e
commit
f40a0cb7ba
@ -45,7 +45,7 @@ type ProjectLimitCache struct {
|
||||
defaultMaxBandwidth memory.Size
|
||||
defaultMaxSegments int64
|
||||
|
||||
state *lrucache.ExpiringLRU
|
||||
state *lrucache.ExpiringLRUOf[ProjectLimits]
|
||||
}
|
||||
|
||||
// NewProjectLimitCache creates a new project limit cache to store the project limits for each project ID.
|
||||
@ -55,7 +55,7 @@ func NewProjectLimitCache(db ProjectLimitDB, defaultMaxUsage, defaultMaxBandwidt
|
||||
defaultMaxUsage: defaultMaxUsage,
|
||||
defaultMaxBandwidth: defaultMaxBandwidth,
|
||||
defaultMaxSegments: defaultMaxSegments,
|
||||
state: lrucache.New(lrucache.Options{
|
||||
state: lrucache.NewOf[ProjectLimits](lrucache.Options{
|
||||
Capacity: config.CacheCapacity,
|
||||
Expiration: config.CacheExpiration,
|
||||
Name: "accounting-projectlimit",
|
||||
@ -65,17 +65,13 @@ func NewProjectLimitCache(db ProjectLimitDB, defaultMaxUsage, defaultMaxBandwidt
|
||||
|
||||
// GetLimits returns the project limits from cache.
|
||||
func (c *ProjectLimitCache) GetLimits(ctx context.Context, projectID uuid.UUID) (ProjectLimits, error) {
|
||||
fn := func() (interface{}, error) {
|
||||
return c.getProjectLimits(ctx, projectID)
|
||||
}
|
||||
projectLimits, err := c.state.Get(ctx, projectID.String(), fn)
|
||||
limits, err := c.state.Get(ctx, projectID.String(),
|
||||
func() (ProjectLimits, error) {
|
||||
return c.getProjectLimits(ctx, projectID)
|
||||
})
|
||||
if err != nil {
|
||||
return ProjectLimits{}, ErrGetProjectLimitCache.Wrap(err)
|
||||
}
|
||||
limits, ok := projectLimits.(ProjectLimits)
|
||||
if !ok {
|
||||
return ProjectLimits{}, ErrProjectLimitType.New("cache Get error")
|
||||
}
|
||||
return limits, nil
|
||||
}
|
||||
|
||||
|
@ -15,7 +15,7 @@ import (
|
||||
|
||||
// RateLimiter allows to prevent multiple events in fixed period of time.
|
||||
type RateLimiter struct {
|
||||
limiters *lrucache.ExpiringLRU
|
||||
limiters *lrucache.ExpiringLRUOf[*rate.Limiter]
|
||||
interval time.Duration // interval during which events are not limiting.
|
||||
burst int // maximum number of events allowed during duration.
|
||||
}
|
||||
@ -23,7 +23,7 @@ type RateLimiter struct {
|
||||
// NewRateLimiter is a constructor for RateLimiter.
|
||||
func NewRateLimiter(interval time.Duration, burst, numLimits int) *RateLimiter {
|
||||
return &RateLimiter{
|
||||
limiters: lrucache.New(lrucache.Options{
|
||||
limiters: lrucache.NewOf[*rate.Limiter](lrucache.Options{
|
||||
Expiration: -1,
|
||||
Capacity: numLimits,
|
||||
Name: "contact-ratelimit",
|
||||
@ -35,7 +35,7 @@ func NewRateLimiter(interval time.Duration, burst, numLimits int) *RateLimiter {
|
||||
|
||||
// IsAllowed indicates if event is allowed to happen.
|
||||
func (rateLimiter *RateLimiter) IsAllowed(ctx context.Context, key string) bool {
|
||||
limiter, err := rateLimiter.limiters.Get(ctx, key, func() (interface{}, error) {
|
||||
limiter, err := rateLimiter.limiters.Get(ctx, key, func() (*rate.Limiter, error) {
|
||||
return rate.NewLimiter(
|
||||
rate.Limit(time.Second)/rate.Limit(rateLimiter.interval),
|
||||
rateLimiter.burst,
|
||||
@ -45,5 +45,5 @@ func (rateLimiter *RateLimiter) IsAllowed(ctx context.Context, key string) bool
|
||||
panic(fmt.Sprintf("unreachable: %+v", err))
|
||||
}
|
||||
|
||||
return limiter.(*rate.Limiter).Allow()
|
||||
return limiter.Allow()
|
||||
}
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
"github.com/spacemonkeygo/monkit/v3"
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"storj.io/common/encryption"
|
||||
"storj.io/common/lrucache"
|
||||
@ -71,8 +72,8 @@ type Endpoint struct {
|
||||
projects console.Projects
|
||||
apiKeys APIKeys
|
||||
satellite signing.Signer
|
||||
limiterCache *lrucache.ExpiringLRU
|
||||
singleObjectLimitCache *lrucache.ExpiringLRU
|
||||
limiterCache *lrucache.ExpiringLRUOf[*rate.Limiter]
|
||||
singleObjectLimitCache *lrucache.ExpiringLRUOf[struct{}]
|
||||
encInlineSegmentSize int64 // max inline segment size + encryption overhead
|
||||
revocations revocation.DB
|
||||
defaultRS *pb.RedundancyScheme
|
||||
@ -119,12 +120,12 @@ func NewEndpoint(log *zap.Logger, buckets *buckets.Service, metabaseDB *metabase
|
||||
projectLimits: projectLimits,
|
||||
projects: projects,
|
||||
satellite: satellite,
|
||||
limiterCache: lrucache.New(lrucache.Options{
|
||||
limiterCache: lrucache.NewOf[*rate.Limiter](lrucache.Options{
|
||||
Capacity: config.RateLimiter.CacheCapacity,
|
||||
Expiration: config.RateLimiter.CacheExpiration,
|
||||
Name: "metainfo-ratelimit",
|
||||
}),
|
||||
singleObjectLimitCache: lrucache.New(lrucache.Options{
|
||||
singleObjectLimitCache: lrucache.NewOf[struct{}](lrucache.Options{
|
||||
Expiration: config.UploadLimiter.SingleObjectLimit,
|
||||
Capacity: config.UploadLimiter.CacheCapacity,
|
||||
}),
|
||||
|
@ -210,13 +210,13 @@ func (endpoint *Endpoint) checkRate(ctx context.Context, projectID uuid.UUID) (e
|
||||
if !endpoint.config.RateLimiter.Enabled {
|
||||
return nil
|
||||
}
|
||||
limiter, err := endpoint.limiterCache.Get(ctx, projectID.String(), func() (interface{}, error) {
|
||||
limiter, err := endpoint.limiterCache.Get(ctx, projectID.String(), func() (*rate.Limiter, error) {
|
||||
rateLimit := rate.Limit(endpoint.config.RateLimiter.Rate)
|
||||
burstLimit := int(endpoint.config.RateLimiter.Rate)
|
||||
|
||||
limits, err := endpoint.projectLimits.GetLimits(ctx, projectID)
|
||||
if err != nil {
|
||||
return false, err
|
||||
return nil, err
|
||||
}
|
||||
if limits.RateLimit != nil {
|
||||
rateLimit = rate.Limit(*limits.RateLimit)
|
||||
@ -233,11 +233,11 @@ func (endpoint *Endpoint) checkRate(ctx context.Context, projectID uuid.UUID) (e
|
||||
return rpcstatus.Error(rpcstatus.Unavailable, err.Error())
|
||||
}
|
||||
|
||||
if !limiter.(*rate.Limiter).Allow() {
|
||||
if !limiter.Allow() {
|
||||
endpoint.log.Warn("too many requests for project",
|
||||
zap.Stringer("projectID", projectID),
|
||||
zap.Float64("rate limit", float64(limiter.(*rate.Limiter).Limit())),
|
||||
zap.Float64("burst limit", float64(limiter.(*rate.Limiter).Burst())))
|
||||
zap.Float64("rate limit", float64(limiter.Limit())),
|
||||
zap.Float64("burst limit", float64(limiter.Burst())))
|
||||
|
||||
mon.Event("metainfo_rate_limit_exceeded") //mon:locked
|
||||
|
||||
@ -504,7 +504,7 @@ func (endpoint *Endpoint) checkObjectUploadRate(ctx context.Context, projectID u
|
||||
// if object location is in cache it means that we won't allow to upload yet here,
|
||||
// if it's not or internally key expired we are good to go
|
||||
key := strings.Join([]string{string(projectID[:]), string(bucketName), string(objectKey)}, "/")
|
||||
_, _ = endpoint.singleObjectLimitCache.Get(ctx, key, func() (interface{}, error) {
|
||||
_, _ = endpoint.singleObjectLimitCache.Get(ctx, key, func() (struct{}, error) {
|
||||
limited = false
|
||||
return struct{}{}, nil
|
||||
})
|
||||
|
@ -35,7 +35,7 @@ type DownloadSelectionCache struct {
|
||||
db DownloadSelectionDB
|
||||
config DownloadSelectionCacheConfig
|
||||
|
||||
cache sync2.ReadCache
|
||||
cache sync2.ReadCacheOf[*DownloadSelectionCacheState]
|
||||
}
|
||||
|
||||
// NewDownloadSelectionCache creates a new cache that keeps a list of all the storage nodes that are qualified to download data from.
|
||||
@ -62,7 +62,7 @@ func (cache *DownloadSelectionCache) Refresh(ctx context.Context) (err error) {
|
||||
}
|
||||
|
||||
// read loads the latest download selection state.
|
||||
func (cache *DownloadSelectionCache) read(ctx context.Context) (_ interface{}, err error) {
|
||||
func (cache *DownloadSelectionCache) read(ctx context.Context) (_ *DownloadSelectionCacheState, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
onlineNodes, err := cache.db.SelectAllStorageNodesDownload(ctx, cache.config.OnlineWindow, cache.config.AsOfSystemTime)
|
||||
@ -79,11 +79,10 @@ func (cache *DownloadSelectionCache) read(ctx context.Context) (_ interface{}, e
|
||||
func (cache *DownloadSelectionCache) GetNodeIPs(ctx context.Context, nodes []storj.NodeID) (_ map[storj.NodeID]string, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
stateAny, err := cache.cache.Get(ctx, time.Now())
|
||||
state, err := cache.cache.Get(ctx, time.Now())
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
state := stateAny.(*DownloadSelectionCacheState)
|
||||
|
||||
return state.IPs(nodes), nil
|
||||
}
|
||||
@ -92,22 +91,19 @@ func (cache *DownloadSelectionCache) GetNodeIPs(ctx context.Context, nodes []sto
|
||||
func (cache *DownloadSelectionCache) GetNodes(ctx context.Context, nodes []storj.NodeID) (_ map[storj.NodeID]*SelectedNode, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
stateAny, err := cache.cache.Get(ctx, time.Now())
|
||||
state, err := cache.cache.Get(ctx, time.Now())
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
state := stateAny.(*DownloadSelectionCacheState)
|
||||
|
||||
return state.Nodes(nodes), nil
|
||||
}
|
||||
|
||||
// Size returns how many nodes are in the cache.
|
||||
func (cache *DownloadSelectionCache) Size(ctx context.Context) (int, error) {
|
||||
stateAny, err := cache.cache.Get(ctx, time.Now())
|
||||
if stateAny == nil || err != nil {
|
||||
state, err := cache.cache.Get(ctx, time.Now())
|
||||
if state == nil || err != nil {
|
||||
return 0, Error.Wrap(err)
|
||||
}
|
||||
state := stateAny.(*DownloadSelectionCacheState)
|
||||
return state.Size(), nil
|
||||
}
|
||||
|
||||
|
@ -36,7 +36,7 @@ type UploadSelectionCache struct {
|
||||
db UploadSelectionDB
|
||||
selectionConfig NodeSelectionConfig
|
||||
|
||||
cache sync2.ReadCache
|
||||
cache sync2.ReadCacheOf[*uploadselection.State]
|
||||
}
|
||||
|
||||
// NewUploadSelectionCache creates a new cache that keeps a list of all the storage nodes that are qualified to store data.
|
||||
@ -65,7 +65,7 @@ func (cache *UploadSelectionCache) Refresh(ctx context.Context) (err error) {
|
||||
// refresh calls out to the database and refreshes the cache with the most up-to-date
|
||||
// data from the nodes table, then sets time that the last refresh occurred so we know when
|
||||
// to refresh again in the future.
|
||||
func (cache *UploadSelectionCache) read(ctx context.Context) (_ interface{}, err error) {
|
||||
func (cache *UploadSelectionCache) read(ctx context.Context) (_ *uploadselection.State, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
reputableNodes, newNodes, err := cache.db.SelectAllStorageNodesUpload(ctx, cache.selectionConfig)
|
||||
@ -87,11 +87,10 @@ func (cache *UploadSelectionCache) read(ctx context.Context) (_ interface{}, err
|
||||
func (cache *UploadSelectionCache) GetNodes(ctx context.Context, req FindStorageNodesRequest) (_ []*SelectedNode, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
stateAny, err := cache.cache.Get(ctx, time.Now())
|
||||
state, err := cache.cache.Get(ctx, time.Now())
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
state := stateAny.(*uploadselection.State)
|
||||
|
||||
selected, err := state.Select(ctx, uploadselection.Request{
|
||||
Count: req.RequestedCount,
|
||||
@ -109,11 +108,10 @@ func (cache *UploadSelectionCache) GetNodes(ctx context.Context, req FindStorage
|
||||
|
||||
// Size returns how many reputable nodes and new nodes are in the cache.
|
||||
func (cache *UploadSelectionCache) Size(ctx context.Context) (reputableNodeCount int, newNodeCount int, _ error) {
|
||||
stateAny, err := cache.cache.Get(ctx, time.Now())
|
||||
state, err := cache.cache.Get(ctx, time.Now())
|
||||
if err != nil {
|
||||
return 0, 0, Error.Wrap(err)
|
||||
}
|
||||
state := stateAny.(*uploadselection.State)
|
||||
stats := state.Stats()
|
||||
return stats.Reputable, stats.New, nil
|
||||
}
|
||||
|
@ -21,7 +21,7 @@ var _ console.APIKeys = (*apikeys)(nil)
|
||||
// apikeys is an implementation of satellite.APIKeys.
|
||||
type apikeys struct {
|
||||
methods dbx.Methods
|
||||
lru *lrucache.ExpiringLRU
|
||||
lru *lrucache.ExpiringLRUOf[*dbx.ApiKey]
|
||||
db *satelliteDB
|
||||
}
|
||||
|
||||
@ -139,16 +139,12 @@ func (keys *apikeys) Get(ctx context.Context, id uuid.UUID) (_ *console.APIKeyIn
|
||||
func (keys *apikeys) GetByHead(ctx context.Context, head []byte) (_ *console.APIKeyInfo, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
dbKeyI, err := keys.lru.Get(ctx, string(head), func() (interface{}, error) {
|
||||
dbKey, err := keys.lru.Get(ctx, string(head), func() (*dbx.ApiKey, error) {
|
||||
return keys.methods.Get_ApiKey_By_Head(ctx, dbx.ApiKey_Head(head))
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
dbKey, ok := dbKeyI.(*dbx.ApiKey)
|
||||
if !ok {
|
||||
return nil, Error.New("invalid key type: %T", dbKeyI)
|
||||
}
|
||||
return fromDBXAPIKey(ctx, dbKey)
|
||||
}
|
||||
|
||||
|
@ -53,7 +53,7 @@ func (db *ConsoleDB) APIKeys() console.APIKeys {
|
||||
options.Name = "satellitedb-apikeys"
|
||||
db.apikeys = &apikeys{
|
||||
methods: db.methods,
|
||||
lru: lrucache.New(options),
|
||||
lru: lrucache.NewOf[*dbx.ApiKey](options),
|
||||
db: db.db,
|
||||
}
|
||||
})
|
||||
|
@ -222,7 +222,7 @@ func (dbc *satelliteDBCollection) Revocation() revocation.DB {
|
||||
options.Name = "satellitedb-revocations"
|
||||
db.revocationDB = &revocationDB{
|
||||
db: db,
|
||||
lru: lrucache.New(options),
|
||||
lru: lrucache.NewOf[bool](options),
|
||||
methods: db,
|
||||
}
|
||||
})
|
||||
|
@ -15,7 +15,7 @@ import (
|
||||
|
||||
type revocationDB struct {
|
||||
db *satelliteDB
|
||||
lru *lrucache.ExpiringLRU
|
||||
lru *lrucache.ExpiringLRUOf[bool]
|
||||
methods dbx.Methods
|
||||
}
|
||||
|
||||
@ -37,7 +37,7 @@ func (db *revocationDB) Check(ctx context.Context, tails [][]byte) (bool, error)
|
||||
// again.
|
||||
finalTail := tails[numTails-1]
|
||||
|
||||
val, err := db.lru.Get(ctx, string(finalTail), func() (interface{}, error) {
|
||||
revoked, err := db.lru.Get(ctx, string(finalTail), func() (bool, error) {
|
||||
const query = "SELECT EXISTS(SELECT 1 FROM revocations WHERE revoked IN (%s))"
|
||||
|
||||
var (
|
||||
@ -57,7 +57,7 @@ func (db *revocationDB) Check(ctx context.Context, tails [][]byte) (bool, error)
|
||||
row := db.db.QueryRowContext(ctx, fmt.Sprintf(query, tailQuery), tailsForQuery...)
|
||||
err := row.Scan(&revoked)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return false, err
|
||||
}
|
||||
|
||||
return revoked, nil
|
||||
@ -66,10 +66,5 @@ func (db *revocationDB) Check(ctx context.Context, tails [][]byte) (bool, error)
|
||||
return false, errs.Wrap(err)
|
||||
}
|
||||
|
||||
revoked, ok := val.(bool)
|
||||
if !ok {
|
||||
return false, errs.New("Revoked not a bool")
|
||||
}
|
||||
|
||||
return revoked, nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user