storj/pkg/overlay/server.go
Bryan White 5d20cf8829
Node Identity (#193)
* peertls: don't log errors for double close

understood that this part of the code is undergoing heavy change
right now, but just want to make sure this fix gets incorporated
somewhere

* git cleanup: node-id stuff

* cleanup

* rename identity_util.go

* wip `CertificateAuthority` refactor

* refactoring

* gitignore update

* wip

* Merge remote-tracking branch 'storj/doubleclose' into node-id3

* storj/doubleclose:
  peertls: don't log errors for double close

* add peertls tests & goimports

* wip:

+ refactor
+ style changes
+ cleanup
+ [wip] add version to CA and identity configs
+ [wip] heavy client setup

* refactor

* wip:

+ refactor
+ style changes
+ add `CAConfig.Load`
+ add `CAConfig.Save`

* wip:

+ add `LoadOrCreate` and `Create` to CA and Identity configs
+ add overwrite to CA and identity configs
+ heavy client setup
+ refactor
+ style changes
+ cleanup

* wip

* fixing things

* fixing things

* wip hc setup

* hc setup:

+ refactor
+ bugfixing

* improvements based on review feedback

* goimports

* improvements:

+ responding to review feedback
+ refactor

* feedback-based improvements

* feedback-based improvements

* feedback-based improvements

* feedback-based improvements

* feedback-based improvements

* feedback-based improvements

* cleanup

* refactoring CA and Identity structs

* Merge branch 'master' into node-id3

* move version field to setup config structs for CA and identity

* fix typo

* responding to review feedback

* responding to review feedback

* responding to review feedback

* responding to review feedback

* responding to review feedback

* responding to review feedback

* Merge branch 'master' into node-id3

* fix gateway setup finally

* go imports

* fix `FullCertificateAuthority.GenerateIdentity`

* cleanup overlay tests

* bugfixing

* update ca/identity setup

* go imports

* fix peertls test copy/paste fail

* responding to review feedback

* setup tweaking

* update farmer setup
2018-08-13 10:39:45 +02:00

144 lines
3.5 KiB
Go

// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package overlay
import (
"context"
"fmt"
protob "github.com/gogo/protobuf/proto"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/pkg/dht"
proto "storj.io/storj/protos/overlay" // naming proto to avoid confusion with this package
"storj.io/storj/storage"
)
// Server implements our overlay RPC service.
// The methods in this file read node records through cache and report
// failures through logger.
type Server struct {
// dht is not referenced by the methods visible in this file.
// NOTE(review): presumably the DHT used to discover nodes — confirm against the constructor.
dht dht.DHT
// cache backs Lookup (via Get) and the node listing helpers (via its DB field).
cache *Cache
// logger receives the error and info messages emitted by the RPC handlers.
logger *zap.Logger
// metrics is not referenced by the methods visible in this file.
// NOTE(review): presumably the monkit registry for instrumentation — confirm.
metrics *monkit.Registry
}
// Lookup resolves the node identified by req.NodeID through the overlay cache.
// On success the cached node is returned inside a LookupResponse; on failure
// the cache error is logged together with the node ID and returned unchanged.
func (o *Server) Lookup(ctx context.Context, req *proto.LookupRequest) (*proto.LookupResponse, error) {
	node, lookupErr := o.cache.Get(ctx, req.NodeID)
	if lookupErr == nil {
		return &proto.LookupResponse{Node: node}, nil
	}

	o.logger.Error(
		"Error looking up node",
		zap.Error(lookupErr),
		zap.String("nodeID", req.NodeID),
	)
	return nil, lookupErr
}
// FindStorageNodes searches the overlay network for nodes that meet the
// provided requirements.
//
// The request options supply the number of nodes wanted and the minimum free
// bandwidth and free disk each node must advertise. Nodes are gathered page by
// page through populate until enough have been collected or populate reports
// that there is nothing left to list.
//
// Returns:
//   - a response holding exactly the requested number of nodes;
//   - an InvalidArgument status error when the requested amount is negative;
//   - a ResourceExhausted status error when fewer nodes than requested match;
//   - a wrapped error when listing or loading nodes fails.
func (o *Server) FindStorageNodes(ctx context.Context, req *proto.FindStorageNodesRequest) (resp *proto.FindStorageNodesResponse, err error) {
	opts := req.GetOpts()
	maxNodes := opts.GetAmount()

	// The amount comes straight from the client. A negative value would reach
	// the result[:maxNodes] truncation below and panic, so reject it up front.
	if maxNodes < 0 {
		return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("requested node amount must not be negative, got %d", maxNodes))
	}

	restrictions := opts.GetRestrictions()
	restrictedBandwidth := restrictions.GetFreeBandwidth()
	restrictedSpace := restrictions.GetFreeDisk()

	var start storage.Key
	result := []*proto.Node{}
	for {
		var nodes []*proto.Node
		nodes, start, err = o.populate(ctx, start, maxNodes, restrictedBandwidth, restrictedSpace)
		if err != nil {
			return nil, Error.Wrap(err)
		}

		// NOTE(review): a page in which no node satisfies the restrictions also
		// ends the search here, even if more keys remain to be listed — confirm
		// that this is intended.
		if len(nodes) == 0 {
			break
		}

		result = append(result, nodes...)
		if len(result) >= int(maxNodes) || start == nil {
			break
		}
	}

	if len(result) < int(maxNodes) {
		// status.Error takes a finished message; the text is formatted once and
		// never reinterpreted as a format string.
		return nil, status.Error(codes.ResourceExhausted, fmt.Sprintf("requested %d nodes, only %d nodes matched the criteria requested", maxNodes, len(result)))
	}

	// A page may over-deliver; trim to exactly what was asked for.
	if len(result) > int(maxNodes) {
		result = result[:maxNodes]
	}

	return &proto.FindStorageNodesResponse{
		Nodes: result,
	}, nil
}
// getNodes fetches the stored values for keys from the cache database and
// decodes each one into a proto.Node. The first fetch or decode failure is
// wrapped and returned; on success the slice is non-nil even when it is empty.
func (o *Server) getNodes(ctx context.Context, keys storage.Keys) ([]*proto.Node, error) {
	values, err := o.cache.DB.GetAll(keys)
	if err != nil {
		return nil, Error.Wrap(err)
	}

	decoded := make([]*proto.Node, 0, len(values))
	for _, raw := range values {
		node := new(proto.Node)
		err = protob.Unmarshal(raw, node)
		if err != nil {
			return nil, Error.Wrap(err)
		}
		decoded = append(decoded, node)
	}

	return decoded, nil
}
// populate loads one page of nodes and filters it by the given restrictions.
//
// It lists up to 2*maxNodes keys from the cache database beginning at
// starting, loads the corresponding nodes, and keeps those whose advertised
// free bandwidth and free disk are at least restrictedBandwidth and
// restrictedSpace.
//
// Returns the matching nodes and the key to list the next page from. That key
// is nil when the page came back shorter than the limit, and is the unchanged
// starting key (with an empty, non-nil slice) when no keys were listed at all.
// Listing and loading failures are logged and returned wrapped.
func (o *Server) populate(ctx context.Context, starting storage.Key, maxNodes, restrictedBandwidth, restrictedSpace int64) ([]*proto.Node, storage.Key, error) {
	// Over-fetch by a factor of two, since some nodes will be filtered out.
	limit := storage.Limit(maxNodes) * 2
	keys, err := o.cache.DB.List(starting, limit)
	if err != nil {
		o.logger.Error("Error listing nodes", zap.Error(err))
		return nil, nil, Error.Wrap(err)
	}
	if len(keys) == 0 {
		o.logger.Info("No Keys returned from List operation")
		return []*proto.Node{}, starting, nil
	}

	nodes, err := o.getNodes(ctx, keys)
	if err != nil {
		o.logger.Error("Error getting nodes", zap.Error(err))
		return nil, nil, Error.Wrap(err)
	}

	result := make([]*proto.Node, 0, len(nodes))
	for _, v := range nodes {
		rest := v.GetRestrictions()
		if rest.GetFreeBandwidth() < restrictedBandwidth || rest.GetFreeDisk() < restrictedSpace {
			continue
		}
		result = append(result, v)
	}

	// A short page means the listing is exhausted; otherwise resume from the
	// last key seen.
	// NOTE(review): if List treats its first argument as inclusive, that last
	// key is returned again at the head of the next page — confirm.
	nextStart := keys[len(keys)-1]
	if len(keys) < int(limit) {
		nextStart = nil
	}
	return result, nextStart, nil
}