Metainfo RPC segment methods (part 1) (#2567)

This commit is contained in:
Michal Niewrzal 2019-07-22 16:45:18 +02:00 committed by GitHub
parent 30eeb64816
commit 6f2b85603d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 2698 additions and 329 deletions

2
go.mod
View File

@ -115,7 +115,7 @@ require (
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.10.0
golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8
golang.org/x/net v0.0.0-20190628185345-da137c7871d7
golang.org/x/net v0.0.0-20190628185345-da137c7871d7 // indirect
golang.org/x/sync v0.0.0-20190423024810-112230192c58
golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb
golang.org/x/text v0.3.2 // indirect

View File

@ -93,9 +93,18 @@ func EncodeVoucher(ctx context.Context, voucher *pb.Voucher) (_ []byte, err erro
func EncodeStreamID(ctx context.Context, streamID *pb.SatStreamID) (_ []byte, err error) {
defer mon.Task()(&ctx)(&err)
signature := streamID.SatelliteSignature
// TODO verify if that can cause race
streamID.SatelliteSignature = nil
out, err := proto.Marshal(streamID)
streamID.SatelliteSignature = signature
return out, err
}
// EncodeSegmentID encodes segment ID into bytes for signing.
func EncodeSegmentID(ctx context.Context, segmentID *pb.SatSegmentID) (_ []byte, err error) {
defer mon.Task()(&ctx)(&err)
signature := segmentID.SatelliteSignature
segmentID.SatelliteSignature = nil
out, err := proto.Marshal(segmentID)
segmentID.SatelliteSignature = signature
return out, err
}

View File

@ -127,3 +127,21 @@ func SignStreamID(ctx context.Context, signer Signer, unsigned *pb.SatStreamID)
return &signed, nil
}
// SignSegmentID signs the segment ID using the specified signer
// Signer is a satellite
func SignSegmentID(ctx context.Context, signer Signer, unsigned *pb.SatSegmentID) (_ *pb.SatSegmentID, err error) {
defer mon.Task()(&ctx)(&err)
bytes, err := EncodeSegmentID(ctx, unsigned)
if err != nil {
return nil, Error.Wrap(err)
}
signed := *unsigned
signed.SatelliteSignature, err = signer.HashAndSign(ctx, bytes)
if err != nil {
return nil, Error.Wrap(err)
}
return &signed, nil
}

View File

@ -93,3 +93,14 @@ func VerifyStreamID(ctx context.Context, satellite Signee, signed *pb.SatStreamI
return satellite.HashAndVerifySignature(ctx, bytes, signed.SatelliteSignature)
}
// VerifySegmentID verifies that the signature inside segment ID belongs to the satellite
func VerifySegmentID(ctx context.Context, satellite Signee, signed *pb.SatSegmentID) (err error) {
defer mon.Task()(&ctx)(&err)
bytes, err := EncodeSegmentID(ctx, signed)
if err != nil {
return Error.Wrap(err)
}
return satellite.HashAndVerifySignature(ctx, bytes, signed.SatelliteSignature)
}

File diff suppressed because it is too large Load Diff

View File

@ -28,6 +28,14 @@ service Metainfo {
rpc BeginDeleteObject(ObjectBeginDeleteRequest) returns (ObjectBeginDeleteResponse);
rpc FinishDeleteObject(ObjectFinishDeleteRequest) returns (ObjectFinishDeleteResponse);
rpc BeginSegment(SegmentBeginRequest) returns (SegmentBeginResponse);
rpc CommitSegment(SegmentCommitRequest) returns (SegmentCommitResponse);
rpc MakeInlineSegment(SegmentMakeInlineRequest) returns (SegmentMakeInlineResponse);
rpc BeginDeleteSegment(SegmentBeginDeleteRequest) returns (SegmentBeginDeleteResponse);
rpc FinishDeleteSegment(SegmentFinishDeleteRequest) returns (SegmentFinishDeleteResponse);
rpc ListSegments(SegmentListRequest) returns (SegmentListResponse);
rpc DownloadSegment(SegmentDownloadRequest) returns (SegmentDownloadResponse);
rpc CreateSegmentOld(SegmentWriteRequestOld) returns (SegmentWriteResponseOld);
rpc CommitSegmentOld(SegmentCommitRequestOld) returns (SegmentCommitResponseOld);
rpc SegmentInfoOld(SegmentInfoRequestOld) returns (SegmentInfoResponseOld);
@ -206,6 +214,10 @@ message ProjectInfoResponse {
bytes project_salt = 1;
}
//---------------------------
// Object
//---------------------------
message Object {
enum Status {
INVALID = 0;
@ -330,4 +342,140 @@ message SatStreamID {
google.protobuf.Timestamp expiration_date = 6 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
bytes satellite_signature = 7;
}
//---------------------------
// Segment
//---------------------------
message Segment {
bytes stream_id = 1 [(gogoproto.customtype) = "StreamID", (gogoproto.nullable) = false];
SegmentPosition position = 2;
bytes encrypted_key_nonce = 3 [(gogoproto.customtype) = "Nonce", (gogoproto.nullable) = false];
bytes encrypted_key = 4;
int64 size_encrypted_data = 5; // refers to segment size not piece size
bytes encrypted_inline_data = 6;
repeated Piece pieces = 7;
}
message Piece {
int32 piece_num = 1;
bytes node = 2[(gogoproto.customtype) = "NodeID", (gogoproto.nullable) = false];
}
message SegmentPosition {
int32 part_number = 1;
int32 index = 2;
}
message SegmentBeginRequest {
bytes stream_id = 1 [(gogoproto.customtype) = "StreamID", (gogoproto.nullable) = false];
SegmentPosition position = 2;
int64 max_order_limit = 3;
}
message SegmentBeginResponse {
bytes segment_id = 1 [(gogoproto.customtype) = "SegmentID", (gogoproto.nullable) = false];
repeated AddressedOrderLimit addressed_limits = 2;
bytes private_key = 3 [(gogoproto.customtype) = "PiecePrivateKey", (gogoproto.nullable) = false];
}
message SegmentCommitRequest {
bytes segment_id = 1 [(gogoproto.customtype) = "SegmentID", (gogoproto.nullable) = false];
bytes encrypted_key_nonce = 2 [(gogoproto.customtype) = "Nonce", (gogoproto.nullable) = false];
bytes encrypted_key = 3;
int64 size_encrypted_data = 4; // refers to segment size not piece size
repeated SegmentPieceUploadResult upload_result = 5;
}
message SegmentPieceUploadResult {
int32 piece_num = 1;
bytes node_id = 2 [(gogoproto.customtype) = "NodeID", (gogoproto.nullable) = false];
orders.PieceHash hash = 3;
}
// only for satellite use
message SatSegmentID {
SatStreamID stream_id = 1;
int32 part_number = 2;
int32 index = 3;
// TODO we have redundancy in SatStreamID, do we need it here?
// pointerdb.RedundancyScheme redundancy = 4;
bytes root_piece_id = 5 [(gogoproto.customtype) = "PieceID", (gogoproto.nullable) = false];
repeated AddressedOrderLimit original_order_limits = 6;
google.protobuf.Timestamp creation_date = 7 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
bytes satellite_signature = 8;
}
message SegmentCommitResponse {}
message SegmentMakeInlineRequest {
bytes stream_id = 1 [(gogoproto.customtype) = "StreamID", (gogoproto.nullable) = false];
SegmentPosition position = 2;
bytes encrypted_key_nonce = 3 [(gogoproto.customtype) = "Nonce", (gogoproto.nullable) = false];
bytes encrypted_key = 4;
bytes encrypted_inline_data = 5;
}
message SegmentMakeInlineResponse {}
message SegmentBeginDeleteRequest {
bytes stream_id = 1;
SegmentPosition position = 2;
}
message SegmentBeginDeleteResponse {
bytes segment_id = 1 [(gogoproto.customtype) = "SegmentID", (gogoproto.nullable) = false];
repeated AddressedOrderLimit addressed_limits = 2;
}
message SegmentFinishDeleteRequest {
bytes segment_id = 1 [(gogoproto.customtype) = "SegmentID", (gogoproto.nullable) = false];
repeated SegmentPieceDeleteResult results = 2;
}
message SegmentPieceDeleteResult {
int32 piece_num = 1;
bytes node_id = 2 [(gogoproto.customtype) = "NodeID", (gogoproto.nullable) = false];
orders.PieceHash hash = 3;
}
message SegmentFinishDeleteResponse {}
message SegmentListRequest {
bytes stream_id = 1;
SegmentPosition cursor_position = 2;
int32 limit = 3;
}
message SegmentListResponse {
repeated SegmentListItem items = 1;
bool more = 2;
}
message SegmentListItem {
SegmentPosition position = 1;
}
message SegmentDownloadRequest {
bytes stream_id = 1 [(gogoproto.customtype) = "StreamID", (gogoproto.nullable) = false];
SegmentPosition cursor_position = 2;
}
message SegmentDownloadResponse {
bytes segment_id = 1 [(gogoproto.customtype) = "SegmentID", (gogoproto.nullable) = false];
repeated AddressedOrderLimit addressed_limits = 2;
bytes encrypted_inline_data = 3;
SegmentPosition next = 4; // can be nil
}

View File

@ -31,3 +31,6 @@ type StreamID = storj.StreamID
// Nonce is an alias to storj.Nonce for use in generated protobuf code
type Nonce = storj.Nonce
// SegmentID is an alias to storj.SegmentID for use in generated protobuf code
type SegmentID = storj.SegmentID

81
pkg/storj/segmentid.go Normal file
View File

@ -0,0 +1,81 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package storj
import (
"encoding/base32"
"github.com/zeebo/errs"
)
// ErrSegmentID is used when something goes wrong with a segment ID
var ErrSegmentID = errs.Class("segment ID error")
// segmentIDEncoding is base32 without padding
var segmentIDEncoding = base32.StdEncoding.WithPadding(base32.NoPadding)
// SegmentID is the unique identifier for segment related to object
type SegmentID []byte
// SegmentIDFromString decodes an base32 encoded
func SegmentIDFromString(s string) (SegmentID, error) {
idBytes, err := segmentIDEncoding.DecodeString(s)
if err != nil {
return SegmentID{}, ErrSegmentID.Wrap(err)
}
return SegmentIDFromBytes(idBytes)
}
// SegmentIDFromBytes converts a byte slice into a segment ID
func SegmentIDFromBytes(b []byte) (SegmentID, error) {
// return error will be used in future implementation
id := make([]byte, len(b))
copy(id, b)
return id, nil
}
// IsZero returns whether segment ID is unassigned
func (id SegmentID) IsZero() bool {
return len(id) == 0
}
// String representation of the segment ID
func (id SegmentID) String() string { return segmentIDEncoding.EncodeToString(id.Bytes()) }
// Bytes returns bytes of the segment ID
func (id SegmentID) Bytes() []byte { return id[:] }
// Marshal serializes a segment ID (implements gogo's custom type interface)
func (id SegmentID) Marshal() ([]byte, error) {
return id.Bytes(), nil
}
// MarshalTo serializes a segment ID into the passed byte slice (implements gogo's custom type interface)
func (id *SegmentID) MarshalTo(data []byte) (n int, err error) {
return copy(data, id.Bytes()), nil
}
// Unmarshal deserializes a segment ID (implements gogo's custom type interface)
func (id *SegmentID) Unmarshal(data []byte) error {
var err error
*id, err = SegmentIDFromBytes(data)
return err
}
// Size returns the length of a segment ID (implements gogo's custom type interface)
func (id SegmentID) Size() int {
return len(id)
}
// MarshalJSON serializes a segment ID to a json string as bytes (implements gogo's custom type interface)
func (id SegmentID) MarshalJSON() ([]byte, error) {
return []byte(`"` + id.String() + `"`), nil
}
// UnmarshalJSON deserializes a json string (as bytes) to a segment ID (implements gogo's custom type interface)
func (id *SegmentID) UnmarshalJSON(data []byte) error {
var err error
*id, err = SegmentIDFromString(string(data))
return err
}

View File

@ -2712,6 +2712,583 @@
"type": "bytes"
}
]
},
{
"name": "Segment",
"fields": [
{
"id": 1,
"name": "stream_id",
"type": "bytes",
"options": [
{
"name": "(gogoproto.customtype)",
"value": "StreamID"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
},
{
"id": 2,
"name": "position",
"type": "SegmentPosition"
},
{
"id": 3,
"name": "encrypted_key_nonce",
"type": "bytes",
"options": [
{
"name": "(gogoproto.customtype)",
"value": "Nonce"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
},
{
"id": 4,
"name": "encrypted_key",
"type": "bytes"
},
{
"id": 5,
"name": "size_encrypted_data",
"type": "int64"
},
{
"id": 6,
"name": "encrypted_inline_data",
"type": "bytes"
},
{
"id": 7,
"name": "pieces",
"type": "Piece",
"is_repeated": true
}
]
},
{
"name": "Piece",
"fields": [
{
"id": 1,
"name": "piece_num",
"type": "int32"
},
{
"id": 2,
"name": "node",
"type": "bytes",
"options": [
{
"name": "(gogoproto.customtype)",
"value": "NodeID"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
}
]
},
{
"name": "SegmentPosition",
"fields": [
{
"id": 1,
"name": "part_number",
"type": "int32"
},
{
"id": 2,
"name": "index",
"type": "int32"
}
]
},
{
"name": "SegmentBeginRequest",
"fields": [
{
"id": 1,
"name": "stream_id",
"type": "bytes",
"options": [
{
"name": "(gogoproto.customtype)",
"value": "StreamID"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
},
{
"id": 2,
"name": "position",
"type": "SegmentPosition"
},
{
"id": 3,
"name": "max_order_limit",
"type": "int64"
}
]
},
{
"name": "SegmentBeginResponse",
"fields": [
{
"id": 1,
"name": "segment_id",
"type": "bytes",
"options": [
{
"name": "(gogoproto.customtype)",
"value": "SegmentID"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
},
{
"id": 2,
"name": "addressed_limits",
"type": "AddressedOrderLimit",
"is_repeated": true
},
{
"id": 3,
"name": "private_key",
"type": "bytes",
"options": [
{
"name": "(gogoproto.customtype)",
"value": "PiecePrivateKey"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
}
]
},
{
"name": "SegmentCommitRequest",
"fields": [
{
"id": 1,
"name": "segment_id",
"type": "bytes",
"options": [
{
"name": "(gogoproto.customtype)",
"value": "SegmentID"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
},
{
"id": 2,
"name": "encrypted_key_nonce",
"type": "bytes",
"options": [
{
"name": "(gogoproto.customtype)",
"value": "Nonce"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
},
{
"id": 3,
"name": "encrypted_key",
"type": "bytes"
},
{
"id": 4,
"name": "size_encrypted_data",
"type": "int64"
},
{
"id": 5,
"name": "upload_result",
"type": "SegmentPieceUploadResult",
"is_repeated": true
}
]
},
{
"name": "SegmentPieceUploadResult",
"fields": [
{
"id": 1,
"name": "piece_num",
"type": "int32"
},
{
"id": 2,
"name": "node_id",
"type": "bytes",
"options": [
{
"name": "(gogoproto.customtype)",
"value": "NodeID"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
},
{
"id": 3,
"name": "hash",
"type": "orders.PieceHash"
}
]
},
{
"name": "SatSegmentID",
"fields": [
{
"id": 1,
"name": "stream_id",
"type": "SatStreamID"
},
{
"id": 2,
"name": "part_number",
"type": "int32"
},
{
"id": 3,
"name": "index",
"type": "int32"
},
{
"id": 5,
"name": "root_piece_id",
"type": "bytes",
"options": [
{
"name": "(gogoproto.customtype)",
"value": "PieceID"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
},
{
"id": 6,
"name": "original_order_limits",
"type": "AddressedOrderLimit",
"is_repeated": true
},
{
"id": 7,
"name": "creation_date",
"type": "google.protobuf.Timestamp",
"options": [
{
"name": "(gogoproto.stdtime)",
"value": "true"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
},
{
"id": 8,
"name": "satellite_signature",
"type": "bytes"
}
]
},
{
"name": "SegmentCommitResponse"
},
{
"name": "SegmentMakeInlineRequest",
"fields": [
{
"id": 1,
"name": "stream_id",
"type": "bytes",
"options": [
{
"name": "(gogoproto.customtype)",
"value": "StreamID"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
},
{
"id": 2,
"name": "position",
"type": "SegmentPosition"
},
{
"id": 3,
"name": "encrypted_key_nonce",
"type": "bytes",
"options": [
{
"name": "(gogoproto.customtype)",
"value": "Nonce"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
},
{
"id": 4,
"name": "encrypted_key",
"type": "bytes"
},
{
"id": 5,
"name": "encrypted_inline_data",
"type": "bytes"
}
]
},
{
"name": "SegmentMakeInlineResponse"
},
{
"name": "SegmentBeginDeleteRequest",
"fields": [
{
"id": 1,
"name": "stream_id",
"type": "bytes"
},
{
"id": 2,
"name": "position",
"type": "SegmentPosition"
}
]
},
{
"name": "SegmentBeginDeleteResponse",
"fields": [
{
"id": 1,
"name": "segment_id",
"type": "bytes",
"options": [
{
"name": "(gogoproto.customtype)",
"value": "SegmentID"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
},
{
"id": 2,
"name": "addressed_limits",
"type": "AddressedOrderLimit",
"is_repeated": true
}
]
},
{
"name": "SegmentFinishDeleteRequest",
"fields": [
{
"id": 1,
"name": "segment_id",
"type": "bytes",
"options": [
{
"name": "(gogoproto.customtype)",
"value": "SegmentID"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
},
{
"id": 2,
"name": "results",
"type": "SegmentPieceDeleteResult",
"is_repeated": true
}
]
},
{
"name": "SegmentPieceDeleteResult",
"fields": [
{
"id": 1,
"name": "piece_num",
"type": "int32"
},
{
"id": 2,
"name": "node_id",
"type": "bytes",
"options": [
{
"name": "(gogoproto.customtype)",
"value": "NodeID"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
},
{
"id": 3,
"name": "hash",
"type": "orders.PieceHash"
}
]
},
{
"name": "SegmentFinishDeleteResponse"
},
{
"name": "SegmentListRequest",
"fields": [
{
"id": 1,
"name": "stream_id",
"type": "bytes"
},
{
"id": 2,
"name": "cursor_position",
"type": "SegmentPosition"
},
{
"id": 3,
"name": "limit",
"type": "int32"
}
]
},
{
"name": "SegmentListResponse",
"fields": [
{
"id": 1,
"name": "items",
"type": "SegmentListItem",
"is_repeated": true
},
{
"id": 2,
"name": "more",
"type": "bool"
}
]
},
{
"name": "SegmentListItem",
"fields": [
{
"id": 1,
"name": "position",
"type": "SegmentPosition"
}
]
},
{
"name": "SegmentDownloadRequest",
"fields": [
{
"id": 1,
"name": "stream_id",
"type": "bytes",
"options": [
{
"name": "(gogoproto.customtype)",
"value": "StreamID"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
},
{
"id": 2,
"name": "cursor_position",
"type": "SegmentPosition"
}
]
},
{
"name": "SegmentDownloadResponse",
"fields": [
{
"id": 1,
"name": "segment_id",
"type": "bytes",
"options": [
{
"name": "(gogoproto.customtype)",
"value": "SegmentID"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
},
{
"id": 2,
"name": "addressed_limits",
"type": "AddressedOrderLimit",
"is_repeated": true
},
{
"id": 3,
"name": "encrypted_inline_data",
"type": "bytes"
},
{
"id": 4,
"name": "next",
"type": "SegmentPosition"
}
]
}
],
"services": [
@ -2768,6 +3345,41 @@
"in_type": "ObjectFinishDeleteRequest",
"out_type": "ObjectFinishDeleteResponse"
},
{
"name": "BeginSegment",
"in_type": "SegmentBeginRequest",
"out_type": "SegmentBeginResponse"
},
{
"name": "CommitSegment",
"in_type": "SegmentCommitRequest",
"out_type": "SegmentCommitResponse"
},
{
"name": "MakeInlineSegment",
"in_type": "SegmentMakeInlineRequest",
"out_type": "SegmentMakeInlineResponse"
},
{
"name": "BeginDeleteSegment",
"in_type": "SegmentBeginDeleteRequest",
"out_type": "SegmentBeginDeleteResponse"
},
{
"name": "FinishDeleteSegment",
"in_type": "SegmentFinishDeleteRequest",
"out_type": "SegmentFinishDeleteResponse"
},
{
"name": "ListSegments",
"in_type": "SegmentListRequest",
"out_type": "SegmentListResponse"
},
{
"name": "DownloadSegment",
"in_type": "SegmentDownloadRequest",
"out_type": "SegmentDownloadResponse"
},
{
"name": "CreateSegmentOld",
"in_type": "SegmentWriteRequestOld",

View File

@ -1158,3 +1158,213 @@ func (endpoint *Endpoint) FinishDeleteObject(ctx context.Context, req *pb.Object
return &pb.ObjectFinishDeleteResponse{}, nil
}
// BeginSegment begins segment uploading
func (endpoint *Endpoint) BeginSegment(ctx context.Context, req *pb.SegmentBeginRequest) (resp *pb.SegmentBeginResponse, err error) {
defer mon.Task()(&ctx)(&err)
streamID, err := endpoint.unmarshalSatStreamID(ctx, req.StreamId)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, err.Error())
}
_, err = endpoint.validateAuth(ctx, macaroon.Action{
Op: macaroon.ActionWrite,
Bucket: streamID.Bucket,
EncryptedPath: streamID.EncryptedPath,
Time: time.Now(),
})
if err != nil {
return nil, status.Errorf(codes.Unauthenticated, err.Error())
}
// TODO implement logic
return &pb.SegmentBeginResponse{}, status.Error(codes.Unimplemented, "not implemented")
}
// CommitSegment commits segment after uploading
func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentCommitRequest) (resp *pb.SegmentCommitResponse, err error) {
defer mon.Task()(&ctx)(&err)
segmentID, err := endpoint.unmarshalSatSegmentID(ctx, req.SegmentId)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, err.Error())
}
streamID := segmentID.StreamId
_, err = endpoint.validateAuth(ctx, macaroon.Action{
Op: macaroon.ActionWrite,
Bucket: streamID.Bucket,
EncryptedPath: streamID.EncryptedPath,
Time: time.Now(),
})
if err != nil {
return nil, status.Errorf(codes.Unauthenticated, err.Error())
}
// TODO implement logic
return &pb.SegmentCommitResponse{}, status.Error(codes.Unimplemented, "not implemented")
}
// MakeInlineSegment makes inline segment on satellite
func (endpoint *Endpoint) MakeInlineSegment(ctx context.Context, req *pb.SegmentMakeInlineRequest) (resp *pb.SegmentMakeInlineResponse, err error) {
defer mon.Task()(&ctx)(&err)
streamID, err := endpoint.unmarshalSatStreamID(ctx, req.StreamId)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, err.Error())
}
_, err = endpoint.validateAuth(ctx, macaroon.Action{
Op: macaroon.ActionWrite,
Bucket: streamID.Bucket,
EncryptedPath: streamID.EncryptedPath,
Time: time.Now(),
})
if err != nil {
return nil, status.Errorf(codes.Unauthenticated, err.Error())
}
// TODO implement logic
return &pb.SegmentMakeInlineResponse{}, status.Error(codes.Unimplemented, "not implemented")
}
// BeginDeleteSegment begins segment deletion process
func (endpoint *Endpoint) BeginDeleteSegment(ctx context.Context, req *pb.SegmentBeginDeleteRequest) (resp *pb.SegmentBeginDeleteResponse, err error) {
defer mon.Task()(&ctx)(&err)
streamID, err := endpoint.unmarshalSatStreamID(ctx, req.StreamId)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, err.Error())
}
_, err = endpoint.validateAuth(ctx, macaroon.Action{
Op: macaroon.ActionDelete,
Bucket: streamID.Bucket,
EncryptedPath: streamID.EncryptedPath,
Time: time.Now(),
})
if err != nil {
return nil, status.Errorf(codes.Unauthenticated, err.Error())
}
// TODO implement logic
return &pb.SegmentBeginDeleteResponse{}, status.Error(codes.Unimplemented, "not implemented")
}
// FinishDeleteSegment finishes segment deletion process
func (endpoint *Endpoint) FinishDeleteSegment(ctx context.Context, req *pb.SegmentFinishDeleteRequest) (resp *pb.SegmentFinishDeleteResponse, err error) {
defer mon.Task()(&ctx)(&err)
segmentID, err := endpoint.unmarshalSatSegmentID(ctx, req.SegmentId)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, err.Error())
}
streamID := segmentID.StreamId
_, err = endpoint.validateAuth(ctx, macaroon.Action{
Op: macaroon.ActionDelete,
Bucket: streamID.Bucket,
EncryptedPath: streamID.EncryptedPath,
Time: time.Now(),
})
if err != nil {
return nil, status.Errorf(codes.Unauthenticated, err.Error())
}
// TODO implement logic
return &pb.SegmentFinishDeleteResponse{}, status.Error(codes.Unimplemented, "not implemented")
}
// ListSegments list segments
func (endpoint *Endpoint) ListSegments(ctx context.Context, req *pb.SegmentListRequest) (resp *pb.SegmentListResponse, err error) {
defer mon.Task()(&ctx)(&err)
streamID, err := endpoint.unmarshalSatStreamID(ctx, req.StreamId)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, err.Error())
}
_, err = endpoint.validateAuth(ctx, macaroon.Action{
Op: macaroon.ActionList,
Bucket: streamID.Bucket,
EncryptedPath: streamID.EncryptedPath,
Time: time.Now(),
})
if err != nil {
return nil, status.Errorf(codes.Unauthenticated, err.Error())
}
// TODO implement logic
return &pb.SegmentListResponse{}, status.Error(codes.Unimplemented, "not implemented")
}
// DownloadSegment returns data necessary to download segment
func (endpoint *Endpoint) DownloadSegment(ctx context.Context, req *pb.SegmentDownloadRequest) (resp *pb.SegmentDownloadResponse, err error) {
defer mon.Task()(&ctx)(&err)
streamID, err := endpoint.unmarshalSatStreamID(ctx, req.StreamId)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, err.Error())
}
_, err = endpoint.validateAuth(ctx, macaroon.Action{
Op: macaroon.ActionRead,
Bucket: streamID.Bucket,
EncryptedPath: streamID.EncryptedPath,
Time: time.Now(),
})
if err != nil {
return nil, status.Errorf(codes.Unauthenticated, err.Error())
}
// TODO implement logic
return &pb.SegmentDownloadResponse{}, status.Error(codes.Unimplemented, "not implemented")
}
func (endpoint *Endpoint) unmarshalSatStreamID(ctx context.Context, streamID storj.StreamID) (*pb.SatStreamID, error) {
satStreamID := &pb.SatStreamID{}
err := proto.Unmarshal(streamID, satStreamID)
if err != nil {
return nil, err
}
err = signing.VerifyStreamID(ctx, endpoint.satellite, satStreamID)
if err != nil {
return nil, err
}
if satStreamID.CreationDate.Before(time.Now().Add(-satIDExpiration)) {
return nil, errs.New("stream ID expired")
}
return satStreamID, nil
}
func (endpoint *Endpoint) unmarshalSatSegmentID(ctx context.Context, segmentID storj.SegmentID) (*pb.SatSegmentID, error) {
satSegmentID := &pb.SatSegmentID{}
err := proto.Unmarshal(segmentID, satSegmentID)
if err != nil {
return nil, err
}
err = signing.VerifySegmentID(ctx, endpoint.satellite, satSegmentID)
if err != nil {
return nil, err
}
if satSegmentID.CreationDate.Before(time.Now().Add(-satIDExpiration)) {
return nil, errs.New("segment ID expired")
}
return satSegmentID, nil
}