2018-05-15 01:31:26 +01:00
|
|
|
// Copyright (C) 2018 Storj Labs, Inc.
|
|
|
|
// See LICENSE for copying information.
|
|
|
|
|
2018-07-06 20:43:53 +01:00
|
|
|
package pointerdb
|
2018-05-15 01:31:26 +01:00
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2018-07-27 07:02:59 +01:00
|
|
|
"reflect"
|
|
|
|
"strings"
|
2018-05-15 01:31:26 +01:00
|
|
|
|
2018-05-30 03:47:40 +01:00
|
|
|
"github.com/golang/protobuf/proto"
|
2018-07-27 07:02:59 +01:00
|
|
|
"github.com/golang/protobuf/ptypes"
|
2018-05-15 01:31:26 +01:00
|
|
|
"go.uber.org/zap"
|
2018-06-04 17:45:07 +01:00
|
|
|
"google.golang.org/grpc"
|
2018-05-30 03:47:40 +01:00
|
|
|
"google.golang.org/grpc/codes"
|
|
|
|
"google.golang.org/grpc/status"
|
2018-05-15 01:31:26 +01:00
|
|
|
|
2018-07-27 07:02:59 +01:00
|
|
|
"storj.io/storj/pkg/paths"
|
2018-07-30 19:57:50 +01:00
|
|
|
"storj.io/storj/pkg/storage/meta"
|
2018-07-06 20:43:53 +01:00
|
|
|
"storj.io/storj/pointerdb/auth"
|
|
|
|
pb "storj.io/storj/protos/pointerdb"
|
2018-06-13 19:22:32 +01:00
|
|
|
"storj.io/storj/storage"
|
2018-05-15 01:31:26 +01:00
|
|
|
)
|
|
|
|
|
2018-07-27 07:02:59 +01:00
|
|
|
// ListPageLimit is the maximum number of items that will be returned by a list
|
|
|
|
// request.
|
|
|
|
// TODO(kaloyan): make it configurable
|
|
|
|
const ListPageLimit = 1000 // List also falls back to this value when the requested limit is <= 0
|
|
|
|
|
2018-05-15 01:31:26 +01:00
|
|
|
// Server implements the network state RPC service
|
|
|
|
type Server struct {
|
2018-06-13 19:22:32 +01:00
|
|
|
DB storage.KeyValueStore
|
2018-05-15 01:31:26 +01:00
|
|
|
logger *zap.Logger
|
|
|
|
}
|
|
|
|
|
|
|
|
// NewServer creates instance of Server
|
2018-06-13 19:22:32 +01:00
|
|
|
func NewServer(db storage.KeyValueStore, logger *zap.Logger) *Server {
|
2018-05-15 01:31:26 +01:00
|
|
|
return &Server{
|
|
|
|
DB: db,
|
|
|
|
logger: logger,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-07-27 07:02:59 +01:00
|
|
|
func (s *Server) validateAuth(APIKey []byte) error {
|
|
|
|
if !auth.ValidateAPIKey(string(APIKey)) {
|
2018-06-04 17:45:07 +01:00
|
|
|
s.logger.Error("unauthorized request: ", zap.Error(grpc.Errorf(codes.Unauthenticated, "Invalid API credential")))
|
|
|
|
return grpc.Errorf(codes.Unauthenticated, "Invalid API credential")
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2018-06-29 21:06:25 +01:00
|
|
|
// Put formats and hands off a key/value (path/pointer) to be saved to boltdb
|
2018-07-27 07:02:59 +01:00
|
|
|
func (s *Server) Put(ctx context.Context, req *pb.PutRequest) (resp *pb.PutResponse, err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
2018-07-06 20:43:53 +01:00
|
|
|
s.logger.Debug("entering pointerdb put")
|
2018-05-15 01:31:26 +01:00
|
|
|
|
2018-07-27 07:02:59 +01:00
|
|
|
if err = s.validateAuth(req.GetAPIKey()); err != nil {
|
2018-06-04 17:45:07 +01:00
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2018-07-27 07:02:59 +01:00
|
|
|
// Update the pointer with the creation date
|
|
|
|
req.GetPointer().CreationDate = ptypes.TimestampNow()
|
|
|
|
|
|
|
|
pointerBytes, err := proto.Marshal(req.GetPointer())
|
2018-05-30 03:47:40 +01:00
|
|
|
if err != nil {
|
|
|
|
s.logger.Error("err marshaling pointer", zap.Error(err))
|
|
|
|
return nil, status.Errorf(codes.Internal, err.Error())
|
2018-05-15 01:31:26 +01:00
|
|
|
}
|
|
|
|
|
2018-07-27 07:02:59 +01:00
|
|
|
// TODO(kaloyan): make sure that we know we are overwriting the pointer!
|
|
|
|
// In such case we should delete the pieces of the old segment if it was
|
|
|
|
// a remote one.
|
|
|
|
if err = s.DB.Put([]byte(req.GetPath()), pointerBytes); err != nil {
|
2018-05-30 03:47:40 +01:00
|
|
|
s.logger.Error("err putting pointer", zap.Error(err))
|
|
|
|
return nil, status.Errorf(codes.Internal, err.Error())
|
|
|
|
}
|
2018-07-27 07:02:59 +01:00
|
|
|
s.logger.Debug("put to the db: " + string(req.GetPath()))
|
2018-05-30 03:47:40 +01:00
|
|
|
|
|
|
|
return &pb.PutResponse{}, nil
|
2018-05-15 01:31:26 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
// Get formats and hands off a file path to get from boltdb
|
2018-07-27 07:02:59 +01:00
|
|
|
func (s *Server) Get(ctx context.Context, req *pb.GetRequest) (resp *pb.GetResponse, err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
2018-07-06 20:43:53 +01:00
|
|
|
s.logger.Debug("entering pointerdb get")
|
2018-05-15 01:31:26 +01:00
|
|
|
|
2018-07-27 07:02:59 +01:00
|
|
|
if err = s.validateAuth(req.GetAPIKey()); err != nil {
|
2018-06-04 17:45:07 +01:00
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2018-07-27 07:02:59 +01:00
|
|
|
pointerBytes, err := s.DB.Get([]byte(req.GetPath()))
|
2018-05-15 01:31:26 +01:00
|
|
|
if err != nil {
|
2018-07-27 07:02:59 +01:00
|
|
|
s.logger.Error("err getting pointer", zap.Error(err))
|
2018-05-30 03:47:40 +01:00
|
|
|
return nil, status.Errorf(codes.Internal, err.Error())
|
2018-05-15 01:31:26 +01:00
|
|
|
}
|
|
|
|
|
2018-05-30 03:47:40 +01:00
|
|
|
return &pb.GetResponse{
|
|
|
|
Pointer: pointerBytes,
|
2018-05-15 01:31:26 +01:00
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
2018-05-30 03:47:40 +01:00
|
|
|
// List calls the bolt client's List function and returns all Path keys in the Pointers bucket
|
2018-07-27 07:02:59 +01:00
|
|
|
func (s *Server) List(ctx context.Context, req *pb.ListRequest) (resp *pb.ListResponse, err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
2018-07-06 20:43:53 +01:00
|
|
|
s.logger.Debug("entering pointerdb list")
|
2018-05-15 01:31:26 +01:00
|
|
|
|
2018-07-27 07:02:59 +01:00
|
|
|
limit := int(req.GetLimit())
|
|
|
|
if limit <= 0 || limit > ListPageLimit {
|
|
|
|
limit = ListPageLimit
|
2018-06-29 21:06:25 +01:00
|
|
|
}
|
|
|
|
|
2018-07-27 07:02:59 +01:00
|
|
|
if err = s.validateAuth(req.GetAPIKey()); err != nil {
|
2018-06-04 17:45:07 +01:00
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2018-07-27 07:02:59 +01:00
|
|
|
prefix := paths.New(req.GetPrefix())
|
|
|
|
|
|
|
|
// TODO(kaloyan): here we query the DB without limit. We must optimize it!
|
|
|
|
keys, err := s.DB.List(prefix.Bytes(), 0)
|
|
|
|
if err != nil {
|
|
|
|
return nil, status.Errorf(codes.Internal, err.Error())
|
|
|
|
}
|
|
|
|
|
|
|
|
var more bool
|
|
|
|
var items []*pb.ListResponse_Item
|
|
|
|
if req.GetEndBefore() != "" && req.GetStartAfter() == "" {
|
|
|
|
items, more = s.processKeysBackwards(ctx, keys, prefix,
|
|
|
|
req.GetEndBefore(), req.GetRecursive(), limit, req.GetMetaFlags())
|
|
|
|
} else {
|
|
|
|
items, more = s.processKeysForwards(ctx, keys, prefix, req.GetStartAfter(),
|
|
|
|
req.GetEndBefore(), req.GetRecursive(), limit, req.GetMetaFlags())
|
|
|
|
}
|
|
|
|
|
|
|
|
s.logger.Debug("path keys retrieved")
|
|
|
|
return &pb.ListResponse{Items: items, More: more}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// processKeysForwards iterates forwards through given keys, and returns them
|
|
|
|
// as list items
|
|
|
|
func (s *Server) processKeysForwards(ctx context.Context, keys storage.Keys,
|
|
|
|
prefix paths.Path, startAfter, endBefore string, recursive bool, limit int,
|
2018-07-30 19:57:50 +01:00
|
|
|
metaFlags uint32) (items []*pb.ListResponse_Item, more bool) {
|
2018-07-27 07:02:59 +01:00
|
|
|
skip := startAfter != ""
|
|
|
|
startAfterPath := prefix.Append(startAfter)
|
|
|
|
endBeforePath := prefix.Append(endBefore)
|
|
|
|
|
|
|
|
for _, key := range keys {
|
|
|
|
p := paths.New(string(key))
|
|
|
|
|
|
|
|
if skip {
|
|
|
|
if reflect.DeepEqual(p, startAfterPath) {
|
|
|
|
// TODO(kaloyan): Better check - what if there is no path equal to startAfter?
|
|
|
|
// TODO(kaloyan): Add Equal method in Path type
|
|
|
|
skip = false
|
|
|
|
}
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
// TODO(kaloyan): Better check - what if there is no path equal to endBefore?
|
|
|
|
// TODO(kaloyan): Add Equal method in Path type
|
|
|
|
if reflect.DeepEqual(p, endBeforePath) {
|
|
|
|
break
|
2018-06-29 21:06:25 +01:00
|
|
|
}
|
2018-07-27 07:02:59 +01:00
|
|
|
|
|
|
|
// TODO(kaloyan): add HasPrefix method to Path type
|
|
|
|
if !strings.HasPrefix(p.String(), prefix.String()) {
|
|
|
|
// We went through all keys that start with the prefix
|
|
|
|
break
|
|
|
|
}
|
|
|
|
|
|
|
|
if !recursive && len(p) > len(prefix)+1 {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
item := s.createListItem(ctx, p, metaFlags)
|
|
|
|
items = append(items, item)
|
|
|
|
|
|
|
|
if len(items) == limit {
|
|
|
|
more = true
|
|
|
|
break
|
2018-06-29 21:06:25 +01:00
|
|
|
}
|
2018-05-15 01:31:26 +01:00
|
|
|
}
|
2018-07-27 07:02:59 +01:00
|
|
|
return items, more
|
|
|
|
}
|
2018-05-15 01:31:26 +01:00
|
|
|
|
2018-07-27 07:02:59 +01:00
|
|
|
// processKeysBackwards iterates backwards through given keys, and returns them
|
|
|
|
// as list items
|
|
|
|
func (s *Server) processKeysBackwards(ctx context.Context, keys storage.Keys,
|
|
|
|
prefix paths.Path, endBefore string, recursive bool, limit int,
|
2018-07-30 19:57:50 +01:00
|
|
|
metaFlags uint32) (items []*pb.ListResponse_Item, more bool) {
|
2018-07-27 07:02:59 +01:00
|
|
|
skip := endBefore != ""
|
|
|
|
endBeforePath := prefix.Append(endBefore)
|
2018-06-29 21:06:25 +01:00
|
|
|
|
2018-07-27 07:02:59 +01:00
|
|
|
for i := len(keys) - 1; i >= 0; i-- {
|
|
|
|
key := keys[i]
|
|
|
|
p := paths.New(string(key))
|
|
|
|
|
|
|
|
if skip {
|
|
|
|
if reflect.DeepEqual(p, endBeforePath) {
|
|
|
|
// TODO(kaloyan): Better check - what if there is no path equal to endBefore?
|
|
|
|
// TODO(kaloyan): Add Equal method in Path type
|
|
|
|
skip = false
|
|
|
|
}
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
// TODO(kaloyan): add HasPrefix method to Path type
|
|
|
|
if !strings.HasPrefix(p.String(), prefix.String()) {
|
|
|
|
// We went through all keys that start with the prefix
|
|
|
|
break
|
|
|
|
}
|
|
|
|
|
|
|
|
if !recursive && len(p) > len(prefix)+1 {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
item := s.createListItem(ctx, p, metaFlags)
|
|
|
|
items = append([]*pb.ListResponse_Item{item}, items...)
|
|
|
|
|
|
|
|
if len(items) == limit {
|
|
|
|
more = true
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return items, more
|
|
|
|
}
|
|
|
|
|
|
|
|
// createListItem creates a new list item with the given path. It also adds
|
|
|
|
// the metadata according to the given metaFlags.
|
|
|
|
func (s *Server) createListItem(ctx context.Context, p paths.Path,
|
2018-07-30 19:57:50 +01:00
|
|
|
metaFlags uint32) *pb.ListResponse_Item {
|
2018-07-27 07:02:59 +01:00
|
|
|
item := &pb.ListResponse_Item{Path: p.String()}
|
|
|
|
err := s.getMetadata(ctx, item, metaFlags)
|
|
|
|
if err != nil {
|
|
|
|
s.logger.Warn("err retrieving metadata", zap.Error(err))
|
|
|
|
}
|
|
|
|
return item
|
2018-05-15 01:31:26 +01:00
|
|
|
}
|
|
|
|
|
2018-07-27 07:02:59 +01:00
|
|
|
// getMetadata adds the metadata to the given item pointer according to the
|
|
|
|
// given metaFlags
|
|
|
|
func (s *Server) getMetadata(ctx context.Context, item *pb.ListResponse_Item,
|
2018-07-30 19:57:50 +01:00
|
|
|
metaFlags uint32) (err error) {
|
2018-07-27 07:02:59 +01:00
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
|
2018-07-30 19:57:50 +01:00
|
|
|
if metaFlags == meta.None {
|
2018-07-27 07:02:59 +01:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
b, err := s.DB.Get([]byte(item.GetPath()))
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
pr := &pb.Pointer{}
|
|
|
|
err = proto.Unmarshal(b, pr)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
2018-06-29 21:06:25 +01:00
|
|
|
}
|
2018-07-27 07:02:59 +01:00
|
|
|
|
2018-07-30 19:57:50 +01:00
|
|
|
// Start with an empty pointer to and add only what's requested in
|
|
|
|
// metaFlags to safe to transfer payload
|
|
|
|
item.Pointer = &pb.Pointer{}
|
|
|
|
if metaFlags&meta.Modified != 0 {
|
|
|
|
item.Pointer.CreationDate = pr.GetCreationDate()
|
2018-07-27 07:02:59 +01:00
|
|
|
}
|
2018-07-30 19:57:50 +01:00
|
|
|
if metaFlags&meta.Expiration != 0 {
|
|
|
|
item.Pointer.ExpirationDate = pr.GetExpirationDate()
|
2018-07-27 07:02:59 +01:00
|
|
|
}
|
2018-07-30 19:57:50 +01:00
|
|
|
if metaFlags&meta.Size != 0 {
|
|
|
|
item.Pointer.Size = pr.GetSize()
|
|
|
|
}
|
|
|
|
if metaFlags&meta.UserDefined != 0 {
|
|
|
|
item.Pointer.Metadata = pr.GetMetadata()
|
2018-07-27 07:02:59 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
2018-06-29 21:06:25 +01:00
|
|
|
}
|
|
|
|
|
2018-05-15 01:31:26 +01:00
|
|
|
// Delete formats and hands off a file path to delete from boltdb
|
2018-07-27 07:02:59 +01:00
|
|
|
func (s *Server) Delete(ctx context.Context, req *pb.DeleteRequest) (resp *pb.DeleteResponse, err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
2018-07-06 20:43:53 +01:00
|
|
|
s.logger.Debug("entering pointerdb delete")
|
2018-05-15 01:31:26 +01:00
|
|
|
|
2018-07-27 07:02:59 +01:00
|
|
|
if err = s.validateAuth(req.GetAPIKey()); err != nil {
|
2018-06-04 17:45:07 +01:00
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2018-07-27 07:02:59 +01:00
|
|
|
err = s.DB.Delete([]byte(req.GetPath()))
|
2018-05-15 01:31:26 +01:00
|
|
|
if err != nil {
|
2018-06-29 21:06:25 +01:00
|
|
|
s.logger.Error("err deleting path and pointer", zap.Error(err))
|
2018-05-30 03:47:40 +01:00
|
|
|
return nil, status.Errorf(codes.Internal, err.Error())
|
2018-05-15 01:31:26 +01:00
|
|
|
}
|
2018-07-27 07:02:59 +01:00
|
|
|
s.logger.Debug("deleted pointer at path: " + string(req.GetPath()))
|
2018-05-30 03:47:40 +01:00
|
|
|
return &pb.DeleteResponse{}, nil
|
2018-05-15 01:31:26 +01:00
|
|
|
}
|