From fdfa6e85c81cf088b09182b1307058df4cfdb1cb Mon Sep 17 00:00:00 2001 From: James Hagans Date: Thu, 23 Aug 2018 23:56:38 -0400 Subject: [PATCH] Streamstore impl of Stor interface (#191) * first stab at PUT * only PUT * working on PUT * Put with LimitReader * start of Get * reorder of files and proto meta * working on Meta * working on Meta * add aware limit reader * add size from segment put * rm if for eof * update to proto meta * update gen proto file * working on get * working on get * working on get * working on list * working on delete * working on list * working on meta method * fix merge error and working on feedback from PR * update to proto file * rm size tuple * mv eof limit reader to new file * add toMeta * rm varible names * add updates from PR feedback * updates from PR feedback * updates from PR feedback * add toMeta size based on total size * update toMeta size calculation * rm passthrough * add default to config for segment size * fix get method ranger bug * add object support for nested stream proto * rm nested stream meta data * rm test for another PR --- pkg/miniogw/config.go | 11 +- pkg/storage/streams/EOFAwareLimitReader.go | 29 +++ pkg/storage/streams/passthrough.go | 82 -------- pkg/storage/streams/store.go | 215 ++++++++++++++++++++- protos/streams/gen.go | 6 + protos/streams/meta.pb.go | 102 ++++++++++ protos/streams/meta.proto | 13 ++ 7 files changed, 366 insertions(+), 92 deletions(-) create mode 100644 pkg/storage/streams/EOFAwareLimitReader.go delete mode 100644 pkg/storage/streams/passthrough.go create mode 100644 protos/streams/gen.go create mode 100644 protos/streams/meta.pb.go create mode 100644 protos/streams/meta.proto diff --git a/pkg/miniogw/config.go b/pkg/miniogw/config.go index fbcfd0a89..452b34553 100644 --- a/pkg/miniogw/config.go +++ b/pkg/miniogw/config.go @@ -20,7 +20,7 @@ import ( ecclient "storj.io/storj/pkg/storage/ec" "storj.io/storj/pkg/storage/objects" segment "storj.io/storj/pkg/storage/segments" - "storj.io/storj/pkg/storage/streams" + streams "storj.io/storj/pkg/storage/streams" "storj.io/storj/pkg/transport" ) @@ -52,6 +52,7 @@ type ClientConfig struct { APIKey string `help:"API Key (TODO: this needs to change to macaroons somehow)"` MaxInlineSize int `help:"max inline segment size in bytes" default:"4096"` + SegmentSize int64 `help:"the size of a segment in bytes" default:"64000000"` } // Config is a general miniogw configuration struct. This should be everything @@ -150,9 +151,11 @@ func (c Config) NewGateway(ctx context.Context, segments := segment.NewSegmentStore(oc, ec, pdb, rs, c.MaxInlineSize) - // TODO(jt): wrap segments and turn segments into streams actually - // TODO: passthrough is bad - stream := streams.NewPassthrough(segments) + // segment size 64MB + stream, err := streams.NewStreamStore(segments, c.SegmentSize) + if err != nil { + return nil, err + } obj := objects.NewStore(stream) return NewStorjGateway(buckets.NewStore(obj)), nil diff --git a/pkg/storage/streams/EOFAwareLimitReader.go b/pkg/storage/streams/EOFAwareLimitReader.go new file mode 100644 index 000000000..0464166cf --- /dev/null +++ b/pkg/storage/streams/EOFAwareLimitReader.go @@ -0,0 +1,29 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package streams + +import "io" + +// EOFAwareLimitReader holds reader and status of EOF +type EOFAwareLimitReader struct { + reader io.Reader + eof bool +} + +// EOFAwareReader keeps track of the state, has the internal reader reached EOF +func EOFAwareReader(r io.Reader) *EOFAwareLimitReader { + return &EOFAwareLimitReader{reader: r, eof: false} +} + +func (r *EOFAwareLimitReader) Read(p []byte) (n int, err error) { + n, err = r.reader.Read(p) + if err == io.EOF { + r.eof = true + } + return n, err +} + +func (r *EOFAwareLimitReader) isEOF() bool { + return r.eof +} diff --git a/pkg/storage/streams/passthrough.go b/pkg/storage/streams/passthrough.go deleted file mode 100644 index 4ef3b8c90..000000000 --- a/pkg/storage/streams/passthrough.go +++ /dev/null @@ -1,82 +0,0 @@ -// Copyright (C) 2018 Storj Labs, Inc. -// See LICENSE for copying information. - -package streams - -import ( - "context" - "io" - "time" - - "storj.io/storj/pkg/paths" - "storj.io/storj/pkg/ranger" - "storj.io/storj/pkg/storage/segments" -) - -// Passthrough implementation of stream store -type Passthrough struct { - Segments segments.Store -} - -// NewPassthrough stream store -func NewPassthrough(s segments.Store) *Passthrough { - return &Passthrough{Segments: s} -} - -var _ Store = (*Passthrough)(nil) - -// Meta implements Store.Meta -func (p *Passthrough) Meta(ctx context.Context, path paths.Path) (Meta, error) { - m, err := p.Segments.Meta(ctx, path) - return convertMeta(m), err -} - -// Get implements Store.Get -func (p *Passthrough) Get(ctx context.Context, path paths.Path) ( - ranger.RangeCloser, Meta, error) { - rr, m, err := p.Segments.Get(ctx, path) - return rr, convertMeta(m), err -} - -// Put implements Store.Put -func (p *Passthrough) Put(ctx context.Context, path paths.Path, data io.Reader, - metadata []byte, expiration time.Time) (Meta, error) { - m, err := p.Segments.Put(ctx, path, data, metadata, expiration) - return convertMeta(m), err -} - -// Delete implements Store.Delete -func (p *Passthrough) Delete(ctx context.Context, path paths.Path) error { - return p.Segments.Delete(ctx, path) -} - -// List implements Store.List -func (p *Passthrough) List(ctx context.Context, - prefix, startAfter, endBefore paths.Path, recursive bool, limit int, - metaFlags uint32) (items []ListItem, more bool, err error) { - segItems, more, err := p.Segments.List(ctx, prefix, startAfter, endBefore, - recursive, limit, metaFlags) - if err != nil { - return nil, false, nil - } - - items = make([]ListItem, len(segItems)) - for i, itm := range segItems { - items[i] = ListItem{ - Path: itm.Path, - Meta: convertMeta(itm.Meta), - } - } - - return items, more, nil -} - -// convertMeta converts segment metadata to stream metadata -func convertMeta(m segments.Meta) Meta { - return Meta{ - Modified: m.Modified, - Expiration: m.Expiration, - Size: m.Size, - Data: m.Data, - } -} diff --git a/pkg/storage/streams/store.go b/pkg/storage/streams/store.go index 4e010f1d9..388c346a1 100644 --- a/pkg/storage/streams/store.go +++ b/pkg/storage/streams/store.go @@ -5,13 +5,22 @@ package streams import ( "context" + "fmt" "io" "time" + proto "github.com/gogo/protobuf/proto" + "github.com/zeebo/errs" + monkit "gopkg.in/spacemonkeygo/monkit.v2" + "storj.io/storj/pkg/paths" - "storj.io/storj/pkg/ranger" + ranger "storj.io/storj/pkg/ranger" + "storj.io/storj/pkg/storage/segments" + streamspb "storj.io/storj/protos/streams" ) +var mon = monkit.Package() + // Meta info about a segment type Meta struct { Modified time.Time @@ -20,13 +29,23 @@ type Meta struct { Data []byte } -// ListItem is a single item in a listing -type ListItem struct { - Path paths.Path - Meta Meta +// convertMeta converts segment metadata to stream metadata +func convertMeta(segmentMeta segments.Meta) (Meta, error) { + msi := streamspb.MetaStreamInfo{} + err := proto.Unmarshal(segmentMeta.Data, &msi) + if err != nil { + return Meta{}, err + } + + return Meta{ + Modified: segmentMeta.Modified, + Expiration: segmentMeta.Expiration, + Size: ((msi.NumberOfSegments - 1) * msi.SegmentsSize) + msi.LastSegmentSize, + Data: msi.Metadata, + }, nil } -// Store for streams +// Store interface methods for streams to satisfy to be a store type Store interface { Meta(ctx context.Context, path paths.Path) (Meta, error) Get(ctx context.Context, path paths.Path) (ranger.RangeCloser, Meta, error) @@ -37,3 +56,187 @@ type Store interface { recursive bool, limit int, metaFlags uint32) (items []ListItem, more bool, err error) } + +// streamStore is a store for streams +type streamStore struct { + segments segments.Store + segmentSize int64 +} + +// NewStreamStore stuff +func NewStreamStore(segments segments.Store, segmentSize int64) (Store, error) { + if segmentSize <= 0 { + return nil, errs.New("segment size must be larger than 0") + } + return &streamStore{segments: segments, segmentSize: segmentSize}, nil +} + +// Put breaks up data as it comes in into s.segmentSize length pieces, then +// store the first piece at s0/, second piece at s1/, and the +// *last* piece at l/. Store the given metadata, along with the number +// of segments, in a new protobuf, in the metadata of l/. +func (s *streamStore) Put(ctx context.Context, path paths.Path, data io.Reader, + metadata []byte, expiration time.Time) (m Meta, err error) { + defer mon.Task()(&ctx)(&err) + + var totalSegments int64 + var totalSize int64 + var lastSegmentSize int64 + + awareLimitReader := EOFAwareReader(data) + + for !awareLimitReader.isEOF() { + segmentPath := path.Prepend(fmt.Sprintf("s%d", totalSegments)) + segmentData := io.LimitReader(awareLimitReader, s.segmentSize) + + putMeta, err := s.segments.Put(ctx, segmentPath, segmentData, + nil, expiration) + if err != nil { + return Meta{}, err + } + lastSegmentSize = putMeta.Size + totalSize = totalSize + putMeta.Size + totalSegments = totalSegments + 1 + } + + lastSegmentPath := path.Prepend("l") + + md := streamspb.MetaStreamInfo{ + NumberOfSegments: totalSegments, + SegmentsSize: s.segmentSize, + LastSegmentSize: lastSegmentSize, + Metadata: metadata, + } + lastSegmentMetadata, err := proto.Marshal(&md) + if err != nil { + return Meta{}, err + } + + putMeta, err := s.segments.Put(ctx, lastSegmentPath, data, + lastSegmentMetadata, expiration) + if err != nil { + return Meta{}, err + } + totalSize = totalSize + putMeta.Size + + resultMeta := Meta{ + Modified: putMeta.Modified, + Expiration: expiration, + Size: totalSize, + Data: metadata, + } + + return resultMeta, nil +} + +// Get returns a ranger that knows what the overall size is (from l/) +// and then returns the appropriate data from segments s0/, s1/, +// ..., l/. +func (s *streamStore) Get(ctx context.Context, path paths.Path) ( + rr ranger.RangeCloser, meta Meta, err error) { + defer mon.Task()(&ctx)(&err) + + lastRangerCloser, lastSegmentMeta, err := s.segments.Get(ctx, path.Prepend("l")) + if err != nil { + return nil, Meta{}, err + } + + msi := streamspb.MetaStreamInfo{} + err = proto.Unmarshal(lastSegmentMeta.Data, &msi) + if err != nil { + return nil, Meta{}, err + } + + newMeta, err := convertMeta(lastSegmentMeta) + if err != nil { + return nil, Meta{}, err + } + + var rangers []ranger.Ranger + + for i := 0; i < int(msi.NumberOfSegments); i++ { + currentPath := fmt.Sprintf("s%d", i) + rangeCloser, _, err := s.segments.Get(ctx, path.Prepend(currentPath)) + if err != nil { + return nil, Meta{}, err + } + + rangers = append(rangers, rangeCloser) + } + + rangers = append(rangers, lastRangerCloser) + + catRangers := ranger.Concat(rangers...) + + return ranger.NopCloser(catRangers), newMeta, nil +} + +// Meta implements Store.Meta +func (s *streamStore) Meta(ctx context.Context, path paths.Path) (Meta, error) { + segmentMeta, err := s.segments.Meta(ctx, path.Prepend("l")) + if err != nil { + return Meta{}, err + } + + meta, err := convertMeta(segmentMeta) + if err != nil { + return Meta{}, err + } + + return meta, nil +} + +// Delete all the segments, with the last one last +func (s *streamStore) Delete(ctx context.Context, path paths.Path) (err error) { + defer mon.Task()(&ctx)(&err) + + lastSegmentMeta, err := s.segments.Meta(ctx, path.Prepend("l")) + if err != nil { + return err + } + + msi := streamspb.MetaStreamInfo{} + err = proto.Unmarshal(lastSegmentMeta.Data, &msi) + if err != nil { + return err + } + + for i := 0; i < int(msi.NumberOfSegments); i++ { + currentPath := fmt.Sprintf("s%d", i) + err := s.segments.Delete(ctx, path.Prepend(currentPath)) + if err != nil { + return err + } + } + + return s.segments.Delete(ctx, path.Prepend("l")) +} + +// ListItem is a single item in a listing +type ListItem struct { + Path paths.Path + Meta Meta +} + +// List all the paths inside l/, stripping off the l/ prefix +func (s *streamStore) List(ctx context.Context, prefix, startAfter, endBefore paths.Path, + recursive bool, limit int, metaFlags uint32) (items []ListItem, + more bool, err error) { + defer mon.Task()(&ctx)(&err) + + lItems, more, err := s.segments.List(ctx, prefix.Prepend("l"), startAfter, endBefore, recursive, limit, metaFlags) + if err != nil { + return nil, false, err + } + + items = make([]ListItem, len(lItems)) + for i, item := range lItems { + newMeta, err := convertMeta(item.Meta) + if err != nil { + return nil, false, err + } + items[i] = ListItem{Path: item.Path[1:], Meta: newMeta} + } + + return items, more, nil +} diff --git a/protos/streams/gen.go b/protos/streams/gen.go new file mode 100644 index 000000000..8d6c56094 --- /dev/null +++ b/protos/streams/gen.go @@ -0,0 +1,6 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package streams + +//go:generate protoc --go_out=plugins=grpc:. meta.proto diff --git a/protos/streams/meta.pb.go b/protos/streams/meta.pb.go new file mode 100644 index 000000000..941707847 --- /dev/null +++ b/protos/streams/meta.pb.go @@ -0,0 +1,102 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: meta.proto + +package streams + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type MetaStreamInfo struct { + NumberOfSegments int64 `protobuf:"varint,1,opt,name=number_of_segments,json=numberOfSegments" json:"number_of_segments,omitempty"` + SegmentsSize int64 `protobuf:"varint,2,opt,name=segments_size,json=segmentsSize" json:"segments_size,omitempty"` + LastSegmentSize int64 `protobuf:"varint,3,opt,name=last_segment_size,json=lastSegmentSize" json:"last_segment_size,omitempty"` + Metadata []byte `protobuf:"bytes,4,opt,name=metadata,proto3" json:"metadata,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *MetaStreamInfo) Reset() { *m = MetaStreamInfo{} } +func (m *MetaStreamInfo) String() string { return proto.CompactTextString(m) } +func (*MetaStreamInfo) ProtoMessage() {} +func (*MetaStreamInfo) Descriptor() ([]byte, []int) { + return fileDescriptor_meta_1e6a51dbfd2db316, []int{0} +} +func (m *MetaStreamInfo) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_MetaStreamInfo.Unmarshal(m, b) +} +func (m *MetaStreamInfo) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_MetaStreamInfo.Marshal(b, m, deterministic) +} +func (dst *MetaStreamInfo) XXX_Merge(src proto.Message) { + xxx_messageInfo_MetaStreamInfo.Merge(dst, src) +} +func (m *MetaStreamInfo) XXX_Size() int { + return xxx_messageInfo_MetaStreamInfo.Size(m) +} +func (m *MetaStreamInfo) XXX_DiscardUnknown() { + xxx_messageInfo_MetaStreamInfo.DiscardUnknown(m) +} + +var xxx_messageInfo_MetaStreamInfo proto.InternalMessageInfo + +func (m *MetaStreamInfo) GetNumberOfSegments() int64 { + if m != nil { + return m.NumberOfSegments + } + return 0 +} + +func (m *MetaStreamInfo) GetSegmentsSize() int64 { + if m != nil { + return m.SegmentsSize + } + return 0 +} + +func (m *MetaStreamInfo) GetLastSegmentSize() int64 { + if m != nil { + return m.LastSegmentSize + } + return 0 +} + +func (m *MetaStreamInfo) GetMetadata() []byte { + if m != nil { + return m.Metadata + } + return nil +} + +func init() { + proto.RegisterType((*MetaStreamInfo)(nil), "streams.MetaStreamInfo") +} + +func init() { proto.RegisterFile("meta.proto", fileDescriptor_meta_1e6a51dbfd2db316) } + +var fileDescriptor_meta_1e6a51dbfd2db316 = []byte{ + // 165 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xca, 0x4d, 0x2d, 0x49, + 0xd4, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2f, 0x2e, 0x29, 0x4a, 0x4d, 0xcc, 0x2d, 0x56, + 0x5a, 0xcd, 0xc8, 0xc5, 0xe7, 0x9b, 0x5a, 0x92, 0x18, 0x0c, 0xe6, 0x7b, 0xe6, 0xa5, 0xe5, 0x0b, + 0xe9, 0x70, 0x09, 0xe5, 0x95, 0xe6, 0x26, 0xa5, 0x16, 0xc5, 0xe7, 0xa7, 0xc5, 0x17, 0xa7, 0xa6, + 0xe7, 0xa6, 0xe6, 0x95, 0x14, 0x4b, 0x30, 0x2a, 0x30, 0x6a, 0x30, 0x07, 0x09, 0x40, 0x64, 0xfc, + 0xd3, 0x82, 0xa1, 0xe2, 0x42, 0xca, 0x5c, 0xbc, 0x30, 0x35, 0xf1, 0xc5, 0x99, 0x55, 0xa9, 0x12, + 0x4c, 0x60, 0x85, 0x3c, 0x30, 0xc1, 0xe0, 0xcc, 0xaa, 0x54, 0x21, 0x2d, 0x2e, 0xc1, 0x9c, 0xc4, + 0xe2, 0x12, 0x98, 0x69, 0x10, 0x85, 0xcc, 0x60, 0x85, 0xfc, 0x20, 0x09, 0xa8, 0x69, 0x60, 0xb5, + 0x52, 0x5c, 0x1c, 0x20, 0x87, 0xa6, 0x24, 0x96, 0x24, 0x4a, 0xb0, 0x28, 0x30, 0x6a, 0xf0, 0x04, + 0xc1, 0xf9, 0x49, 0x6c, 0x60, 0xd7, 0x1b, 0x03, 0x02, 0x00, 0x00, 0xff, 0xff, 0x9a, 0x56, 0xdb, + 0x89, 0xcb, 0x00, 0x00, 0x00, +} diff --git a/protos/streams/meta.proto b/protos/streams/meta.proto new file mode 100644 index 000000000..7cc221304 --- /dev/null +++ b/protos/streams/meta.proto @@ -0,0 +1,13 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +syntax = "proto3"; + +package streams; + +message MetaStreamInfo { + int64 number_of_segments = 1; + int64 segments_size = 2; + int64 last_segment_size = 3; + bytes metadata = 4; +} \ No newline at end of file