satellite/metabase,cmd/tools/segment-verify: simplify interface
Change-Id: Icdd445b1713bc26cee3b3a125b68b0cde0739837
This commit is contained in:
parent
ac6bb1e187
commit
c8506cdda3
@ -104,13 +104,21 @@ func (service *Service) VerifyBatches(ctx context.Context, batches []*Batch) err
|
|||||||
func (service *Service) convertAliasToNodeURL(ctx context.Context, alias metabase.NodeAlias) (_ storj.NodeURL, err error) {
|
func (service *Service) convertAliasToNodeURL(ctx context.Context, alias metabase.NodeAlias) (_ storj.NodeURL, err error) {
|
||||||
nodeURL, ok := service.aliasToNodeURL[alias]
|
nodeURL, ok := service.aliasToNodeURL[alias]
|
||||||
if !ok {
|
if !ok {
|
||||||
// not in cache, use the slow path
|
nodeID, ok := service.aliasMap.Node(alias)
|
||||||
nodeIDs, err := service.metabase.ConvertAliasesToNodes(ctx, []metabase.NodeAlias{alias})
|
if !ok {
|
||||||
if err != nil {
|
latest, err := service.metabase.LatestNodesAliasMap(ctx)
|
||||||
return storj.NodeURL{}, Error.Wrap(err)
|
if !ok {
|
||||||
|
return storj.NodeURL{}, Error.Wrap(err)
|
||||||
|
}
|
||||||
|
service.aliasMap = latest
|
||||||
|
|
||||||
|
nodeID, ok = service.aliasMap.Node(alias)
|
||||||
|
if !ok {
|
||||||
|
return storj.NodeURL{}, Error.Wrap(err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
info, err := service.overlay.Get(ctx, nodeIDs[0])
|
info, err := service.overlay.Get(ctx, nodeID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return storj.NodeURL{}, Error.Wrap(err)
|
return storj.NodeURL{}, Error.Wrap(err)
|
||||||
}
|
}
|
||||||
|
@ -29,7 +29,6 @@ var Error = errs.Class("segment-verify")
|
|||||||
// Metabase defines implementation dependencies we need from metabase.
|
// Metabase defines implementation dependencies we need from metabase.
|
||||||
type Metabase interface {
|
type Metabase interface {
|
||||||
LatestNodesAliasMap(ctx context.Context) (*metabase.NodeAliasMap, error)
|
LatestNodesAliasMap(ctx context.Context) (*metabase.NodeAliasMap, error)
|
||||||
ConvertAliasesToNodes(ctx context.Context, aliases []metabase.NodeAlias) ([]storj.NodeID, error)
|
|
||||||
GetSegmentByPosition(ctx context.Context, opts metabase.GetSegmentByPosition) (segment metabase.Segment, err error)
|
GetSegmentByPosition(ctx context.Context, opts metabase.GetSegmentByPosition) (segment metabase.Segment, err error)
|
||||||
ListVerifySegments(ctx context.Context, opts metabase.ListVerifySegments) (result metabase.ListVerifySegmentsResult, err error)
|
ListVerifySegments(ctx context.Context, opts metabase.ListVerifySegments) (result metabase.ListVerifySegmentsResult, err error)
|
||||||
}
|
}
|
||||||
@ -77,6 +76,7 @@ type Service struct {
|
|||||||
verifier Verifier
|
verifier Verifier
|
||||||
overlay Overlay
|
overlay Overlay
|
||||||
|
|
||||||
|
aliasMap *metabase.NodeAliasMap
|
||||||
aliasToNodeURL map[metabase.NodeAlias]storj.NodeURL
|
aliasToNodeURL map[metabase.NodeAlias]storj.NodeURL
|
||||||
priorityNodes NodeAliasSet
|
priorityNodes NodeAliasSet
|
||||||
onlineNodes NodeAliasSet
|
onlineNodes NodeAliasSet
|
||||||
@ -132,13 +132,8 @@ func (service *Service) loadOnlineNodes(ctx context.Context) (err error) {
|
|||||||
return Error.Wrap(err)
|
return Error.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
aliasMap, err := service.metabase.LatestNodesAliasMap(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return Error.Wrap(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, node := range nodes {
|
for _, node := range nodes {
|
||||||
alias, ok := aliasMap.Alias(node.ID)
|
alias, ok := service.aliasMap.Alias(node.ID)
|
||||||
if !ok {
|
if !ok {
|
||||||
// This means the node does not hold any data in metabase.
|
// This means the node does not hold any data in metabase.
|
||||||
continue
|
continue
|
||||||
@ -165,11 +160,6 @@ func (service *Service) loadPriorityNodes(ctx context.Context) (err error) {
|
|||||||
return Error.New("unable to read priority nodes: %w", err)
|
return Error.New("unable to read priority nodes: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
aliasMap, err := service.metabase.LatestNodesAliasMap(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return Error.Wrap(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, line := range strings.Split(string(data), "\n") {
|
for _, line := range strings.Split(string(data), "\n") {
|
||||||
line = strings.TrimSpace(line)
|
line = strings.TrimSpace(line)
|
||||||
if line == "" || strings.HasPrefix(line, "#") {
|
if line == "" || strings.HasPrefix(line, "#") {
|
||||||
@ -181,7 +171,7 @@ func (service *Service) loadPriorityNodes(ctx context.Context) (err error) {
|
|||||||
return Error.Wrap(err)
|
return Error.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
alias, ok := aliasMap.Alias(nodeID)
|
alias, ok := service.aliasMap.Alias(nodeID)
|
||||||
if !ok {
|
if !ok {
|
||||||
service.log.Info("priority node ID not used", zap.Stringer("node id", nodeID), zap.Error(err))
|
service.log.Info("priority node ID not used", zap.Stringer("node id", nodeID), zap.Error(err))
|
||||||
continue
|
continue
|
||||||
@ -197,6 +187,12 @@ func (service *Service) loadPriorityNodes(ctx context.Context) (err error) {
|
|||||||
func (service *Service) ProcessRange(ctx context.Context, low, high uuid.UUID) (err error) {
|
func (service *Service) ProcessRange(ctx context.Context, low, high uuid.UUID) (err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
|
aliasMap, err := service.metabase.LatestNodesAliasMap(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return Error.Wrap(err)
|
||||||
|
}
|
||||||
|
service.aliasMap = aliasMap
|
||||||
|
|
||||||
err = service.loadOnlineNodes(ctx)
|
err = service.loadOnlineNodes(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Error.Wrap(err)
|
return Error.Wrap(err)
|
||||||
|
@ -274,18 +274,6 @@ func (db *metabaseMock) LatestNodesAliasMap(ctx context.Context) (*metabase.Node
|
|||||||
return metabase.NewNodeAliasMap(entries), nil
|
return metabase.NewNodeAliasMap(entries), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *metabaseMock) ConvertAliasesToNodes(ctx context.Context, aliases []metabase.NodeAlias) ([]storj.NodeID, error) {
|
|
||||||
xs := make([]storj.NodeID, len(aliases))
|
|
||||||
for i, alias := range aliases {
|
|
||||||
id, ok := db.aliasToNodeID[alias]
|
|
||||||
if !ok {
|
|
||||||
return nil, errs.New("alias %v not found", alias)
|
|
||||||
}
|
|
||||||
xs[i] = id
|
|
||||||
}
|
|
||||||
return xs, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (db *metabaseMock) DeleteSegmentByPosition(ctx context.Context, opts metabase.GetSegmentByPosition) error {
|
func (db *metabaseMock) DeleteSegmentByPosition(ctx context.Context, opts metabase.GetSegmentByPosition) error {
|
||||||
for i, s := range db.segments {
|
for i, s := range db.segments {
|
||||||
if opts.StreamID == s.StreamID && opts.Position == s.Position {
|
if opts.StreamID == s.StreamID && opts.Position == s.Position {
|
||||||
|
@ -79,17 +79,3 @@ func (db *DB) LatestNodesAliasMap(ctx context.Context) (_ *NodeAliasMap, err err
|
|||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
return db.aliasCache.Latest(ctx)
|
return db.aliasCache.Latest(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ConvertNodesToAliases converts nodeIDs to node aliases.
|
|
||||||
// Returns an error when an alias is missing.
|
|
||||||
func (db *DB) ConvertNodesToAliases(ctx context.Context, nodeIDs []storj.NodeID) (_ []NodeAlias, err error) {
|
|
||||||
defer mon.Task()(&ctx)(&err)
|
|
||||||
return db.aliasCache.Aliases(ctx, nodeIDs)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ConvertAliasesToNodes converts aliases to node ID-s.
|
|
||||||
// Returns an error when a node alias is missing.
|
|
||||||
func (db *DB) ConvertAliasesToNodes(ctx context.Context, aliases []NodeAlias) (_ []storj.NodeID, err error) {
|
|
||||||
defer mon.Task()(&ctx)(&err)
|
|
||||||
return db.aliasCache.Nodes(ctx, aliases)
|
|
||||||
}
|
|
||||||
|
Loading…
Reference in New Issue
Block a user