storj/pkg/overlay/server.go
Dennis Coyle 5de5428d3a
First pass at node restrictions (#146)
* WIP First pass at node restrictions

* adjustments to storage clients

* fix cover

* undo previous import change

* adding copyright

* fix the tests

* check for keys

* moar fixes to tests

* linter

* change how we handle restrictions

* add generated pb.go file

* PR comments addressed

* MockKeyValueStore

* pr comments addressed

* missing )

* past my bedtime

* moar merge issues

* cleanup
2018-08-01 10:15:38 -04:00

133 lines
3.0 KiB
Go

// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package overlay
import (
"context"
protob "github.com/gogo/protobuf/proto"
"go.uber.org/zap"
"gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/pkg/dht"
proto "storj.io/storj/protos/overlay" // naming proto to avoid confusion with this package
"storj.io/storj/storage"
)
// Server implements our overlay RPC service
type Server struct {
dht dht.DHT
cache *Cache
logger *zap.Logger
metrics *monkit.Registry
}
// Lookup finds the address of a node in our overlay network
func (o *Server) Lookup(ctx context.Context, req *proto.LookupRequest) (*proto.LookupResponse, error) {
na, err := o.cache.Get(ctx, req.NodeID)
if err != nil {
o.logger.Error("Error looking up node", zap.Error(err), zap.String("nodeID", req.NodeID))
return nil, err
}
return &proto.LookupResponse{
Node: na,
}, nil
}
// FindStorageNodes searches the overlay network for nodes that meet the provided requirements
func (o *Server) FindStorageNodes(ctx context.Context, req *proto.FindStorageNodesRequest) (resp *proto.FindStorageNodesResponse, err error) {
opts := req.GetOpts()
maxNodes := opts.GetLimit()
restrictions := opts.GetRestrictions()
restrictedBandwidth := restrictions.GetFreeBandwidth()
restrictedSpace := restrictions.GetFreeDisk()
var start storage.Key
result := []*proto.Node{}
for {
var nodes []*proto.Node
nodes, start, err = o.populate(ctx, start, maxNodes, restrictedBandwidth, restrictedSpace)
if err != nil {
return nil, err
}
if len(nodes) <= 0 {
break
}
result = append(result, nodes...)
if len(result) >= int(maxNodes) || start == nil {
break
}
}
if len(result) > int(maxNodes) {
result = result[:maxNodes]
}
return &proto.FindStorageNodesResponse{
Nodes: result,
}, nil
}
func (o *Server) getNodes(ctx context.Context, keys storage.Keys) ([]*proto.Node, error) {
values, err := o.cache.DB.GetAll(keys)
if err != nil {
return nil, err
}
nodes := []*proto.Node{}
for _, v := range values {
n := &proto.Node{}
if err := protob.Unmarshal(v, n); err != nil {
return nil, err
}
nodes = append(nodes, n)
}
return nodes, nil
}
func (o *Server) populate(ctx context.Context, starting storage.Key, maxNodes, restrictedBandwidth, restrictedSpace int64) ([]*proto.Node, storage.Key, error) {
limit := storage.Limit(maxNodes) * 2
keys, err := o.cache.DB.List(nil, limit)
if err != nil {
o.logger.Error("Error listing nodes", zap.Error(err))
return nil, nil, err
}
if len(keys) <= 0 {
o.logger.Info("No Keys returned from List operation")
return []*proto.Node{}, starting, nil
}
result := []*proto.Node{}
nodes, err := o.getNodes(ctx, keys)
if err != nil {
return nil, nil, err
}
for _, v := range nodes {
rest := v.GetRestrictions()
if rest.GetFreeBandwidth() < restrictedBandwidth || rest.GetFreeDisk() < restrictedSpace {
continue
}
result = append(result, v)
}
nextStart := keys[len(keys)-1]
if len(keys) < int(limit) {
nextStart = nil
}
return result, nextStart, nil
}