satellite/gracefulexit: Add signatures for success/failed exit finished messages. (#3368)

* add signatures, fix process loop bug, move delete to on success

* added tests for signatures

* PR comment updates

* fixed setting reason by default.

* updates for PR comments

* added signed failure when verificationi fails

* moved to sign_test

* fix panic

* removed testplanet from test
This commit is contained in:
Ethan Adams 2019-10-25 16:36:26 -04:00 committed by Yingrong Zhao
parent 6df4d7bc73
commit e54d290d2e
10 changed files with 467 additions and 123 deletions

View File

@ -8,15 +8,18 @@ import (
fmt "fmt"
_ "github.com/gogo/protobuf/gogoproto"
proto "github.com/gogo/protobuf/proto"
_ "github.com/golang/protobuf/ptypes/timestamp"
grpc "google.golang.org/grpc"
math "math"
drpc "storj.io/drpc"
time "time"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
var _ = time.Kitchen
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
@ -694,10 +697,16 @@ var xxx_messageInfo_DeletePiece proto.InternalMessageInfo
type ExitCompleted struct {
// when everything is completed
ExitCompleteSignature []byte `protobuf:"bytes,1,opt,name=exit_complete_signature,json=exitCompleteSignature,proto3" json:"exit_complete_signature,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
ExitCompleteSignature []byte `protobuf:"bytes,1,opt,name=exit_complete_signature,json=exitCompleteSignature,proto3" json:"exit_complete_signature,omitempty"`
// satellite who issued this exit completed
SatelliteId NodeID `protobuf:"bytes,2,opt,name=satellite_id,json=satelliteId,proto3,customtype=NodeID" json:"satellite_id"`
// storage node this exit completed was issued to
NodeId NodeID `protobuf:"bytes,3,opt,name=node_id,json=nodeId,proto3,customtype=NodeID" json:"node_id"`
// timestamp when the exit completed
Completed time.Time `protobuf:"bytes,4,opt,name=completed,proto3,stdtime" json:"completed"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *ExitCompleted) Reset() { *m = ExitCompleted{} }
@ -731,13 +740,26 @@ func (m *ExitCompleted) GetExitCompleteSignature() []byte {
return nil
}
func (m *ExitCompleted) GetCompleted() time.Time {
if m != nil {
return m.Completed
}
return time.Time{}
}
type ExitFailed struct {
// on failure
ExitFailureSignature []byte `protobuf:"bytes,1,opt,name=exit_failure_signature,json=exitFailureSignature,proto3" json:"exit_failure_signature,omitempty"`
Reason ExitFailed_Reason `protobuf:"varint,2,opt,name=reason,proto3,enum=gracefulexit.ExitFailed_Reason" json:"reason,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
// satellite who issued this exit failed
SatelliteId NodeID `protobuf:"bytes,3,opt,name=satellite_id,json=satelliteId,proto3,customtype=NodeID" json:"satellite_id"`
// storage node this exit failed was issued to
NodeId NodeID `protobuf:"bytes,4,opt,name=node_id,json=nodeId,proto3,customtype=NodeID" json:"node_id"`
// timestamp when the exit failed
Failed time.Time `protobuf:"bytes,5,opt,name=failed,proto3,stdtime" json:"failed"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *ExitFailed) Reset() { *m = ExitFailed{} }
@ -778,6 +800,13 @@ func (m *ExitFailed) GetReason() ExitFailed_Reason {
return ExitFailed_VERIFICATION_FAILED
}
func (m *ExitFailed) GetFailed() time.Time {
if m != nil {
return m.Failed
}
return time.Time{}
}
type SatelliteMessage struct {
// Types that are valid to be assigned to Message:
// *SatelliteMessage_NotReady
@ -1038,78 +1067,84 @@ func init() {
func init() { proto.RegisterFile("gracefulexit.proto", fileDescriptor_8f0acbf2ce5fa631) }
var fileDescriptor_8f0acbf2ce5fa631 = []byte{
// 1128 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x56, 0xef, 0x6e, 0xe3, 0x44,
0x10, 0x8f, 0x73, 0xd7, 0xb4, 0x99, 0xa4, 0x69, 0xba, 0xfd, 0x17, 0xd2, 0xbb, 0x26, 0x18, 0xd0,
0xe5, 0x24, 0xa8, 0x50, 0x80, 0x03, 0xe9, 0x24, 0x90, 0xdb, 0x38, 0x89, 0xb9, 0xd4, 0xe9, 0x6d,
0xd2, 0x72, 0x42, 0x02, 0xcb, 0x17, 0x4f, 0x53, 0xeb, 0x12, 0x3b, 0xac, 0x9d, 0xd3, 0xf5, 0x0b,
0xcf, 0xc1, 0x67, 0x78, 0x19, 0x78, 0x04, 0x90, 0x38, 0x89, 0x6f, 0x3c, 0x06, 0x5a, 0x7b, 0xe3,
0xda, 0x49, 0x5a, 0x2a, 0xdd, 0xa7, 0xc4, 0x33, 0xbf, 0x99, 0x9d, 0x9d, 0xf9, 0xcd, 0xec, 0x00,
0x19, 0x32, 0x73, 0x80, 0x17, 0xd3, 0x11, 0xbe, 0xb1, 0xfd, 0xc3, 0x09, 0x73, 0x7d, 0x97, 0xe4,
0xe3, 0xb2, 0x32, 0x0c, 0xdd, 0xa1, 0x1b, 0x6a, 0xca, 0x85, 0x31, 0xfa, 0xa6, 0xed, 0x5c, 0xcc,
0xbe, 0xf3, 0x2e, 0xb3, 0x90, 0x79, 0xe1, 0x97, 0x5c, 0x85, 0x83, 0x16, 0xfa, 0xba, 0xeb, 0xa8,
0x6f, 0x6c, 0xdf, 0x76, 0x86, 0x3d, 0xd3, 0xc7, 0xd1, 0xc8, 0xf6, 0xd1, 0xa3, 0xf8, 0xd3, 0x14,
0x3d, 0x5f, 0x6e, 0xc2, 0xbe, 0xe6, 0xd8, 0xbe, 0x6d, 0xfa, 0xd8, 0x12, 0x67, 0x70, 0xac, 0x50,
0x93, 0x47, 0xb0, 0xea, 0xb8, 0x16, 0x1a, 0xb6, 0x55, 0x92, 0xaa, 0x52, 0x2d, 0x7f, 0x54, 0xf8,
0xfd, 0x6d, 0x25, 0xf5, 0xd7, 0xdb, 0x4a, 0x46, 0x77, 0x2d, 0xd4, 0x1a, 0x34, 0xc3, 0xd5, 0x9a,
0x25, 0xff, 0x0c, 0x5b, 0x4b, 0x8e, 0xb9, 0xb3, 0x3d, 0xa9, 0x40, 0xce, 0x72, 0xc7, 0xa6, 0xed,
0x18, 0x8e, 0x39, 0xc6, 0x52, 0xba, 0x2a, 0xd5, 0xb2, 0x14, 0x42, 0x91, 0x6e, 0x8e, 0x91, 0x3c,
0x04, 0xf0, 0x26, 0xe6, 0x00, 0x8d, 0xa9, 0x87, 0x56, 0xe9, 0x5e, 0x55, 0xaa, 0x49, 0x34, 0x1b,
0x48, 0xce, 0x3c, 0xb4, 0x64, 0x0b, 0x2a, 0x37, 0xde, 0xd4, 0x9b, 0xb8, 0x8e, 0x87, 0x44, 0x01,
0xf0, 0x22, 0x69, 0x49, 0xaa, 0xde, 0xab, 0xe5, 0xea, 0xef, 0x1f, 0x26, 0xb2, 0xbd, 0xc4, 0x9e,
0xc6, 0x8c, 0xe4, 0x12, 0xec, 0xb6, 0xd0, 0xe7, 0x90, 0x53, 0xe6, 0x0e, 0x19, 0x7a, 0x51, 0x1e,
0x9f, 0xc3, 0xde, 0x82, 0x46, 0x9c, 0xfb, 0x04, 0xd6, 0x26, 0x42, 0x26, 0x4e, 0x2d, 0x27, 0x4f,
0x4d, 0x58, 0x45, 0x58, 0xf9, 0x37, 0x09, 0xf2, 0x71, 0xd5, 0x7c, 0x8e, 0xa4, 0x85, 0x1c, 0xc5,
0xb2, 0x9d, 0xbe, 0x35, 0xdb, 0x8f, 0xa1, 0x38, 0x41, 0x36, 0x40, 0xc7, 0x37, 0x06, 0xee, 0x78,
0x32, 0x42, 0x1f, 0x83, 0x94, 0xa6, 0xe9, 0x86, 0x90, 0x1f, 0x0b, 0x31, 0x39, 0x00, 0xf0, 0xa6,
0x83, 0x01, 0x7a, 0xde, 0xc5, 0x74, 0x54, 0xba, 0x5f, 0x95, 0x6a, 0x6b, 0x34, 0x26, 0x91, 0x7f,
0x4d, 0xc3, 0x66, 0x9f, 0x99, 0x8e, 0x77, 0x81, 0xac, 0xc7, 0xc5, 0x68, 0xa1, 0x45, 0x1a, 0xb0,
0xed, 0x32, 0x7b, 0x68, 0x3b, 0xe6, 0xc8, 0x08, 0x18, 0x69, 0x8c, 0xec, 0xb1, 0xed, 0x07, 0x31,
0xe7, 0xea, 0xe4, 0x50, 0xb0, 0xb4, 0xcb, 0x7f, 0x3a, 0x5c, 0x43, 0xc9, 0x0c, 0x7f, 0x2d, 0x23,
0x0a, 0x6c, 0x45, 0x5e, 0x26, 0x36, 0x0e, 0xd0, 0xb8, 0x34, 0xbd, 0xcb, 0xe0, 0x6e, 0xb9, 0xfa,
0xe6, 0xcc, 0xc9, 0x29, 0xd7, 0xb4, 0x4d, 0xef, 0x92, 0x6e, 0xce, 0xd0, 0x91, 0x88, 0xb4, 0x60,
0x97, 0xe1, 0x64, 0x64, 0x0e, 0x70, 0xcc, 0x6f, 0x1b, 0xf3, 0x72, 0xef, 0x26, 0x2f, 0xdb, 0x31,
0x83, 0x6b, 0x47, 0x4f, 0x61, 0x73, 0x2e, 0x16, 0xdb, 0x0a, 0xd2, 0x91, 0x3f, 0xda, 0x10, 0x59,
0x5e, 0x0d, 0xd0, 0x5a, 0x83, 0x6e, 0x24, 0xe2, 0xd0, 0x2c, 0xf9, 0x1f, 0x09, 0x0a, 0xb3, 0x24,
0x35, 0x4d, 0x7b, 0x84, 0xd6, 0x72, 0x7f, 0xd2, 0xdd, 0xfc, 0x91, 0xaf, 0x60, 0x05, 0x19, 0x73,
0x59, 0x90, 0x8a, 0x42, 0x5d, 0x4e, 0xf2, 0x29, 0x79, 0xd2, 0xa1, 0xca, 0x91, 0x34, 0x34, 0x90,
0x5f, 0xc0, 0x4a, 0xf0, 0x4d, 0xd6, 0x21, 0xab, 0x77, 0xfb, 0x46, 0xb3, 0x7b, 0xa6, 0x37, 0x8a,
0x29, 0xf2, 0x00, 0x4a, 0xbd, 0x7e, 0x97, 0x2a, 0x2d, 0xd5, 0xd0, 0xbb, 0x0d, 0xd5, 0x38, 0xd3,
0x95, 0x73, 0x45, 0xeb, 0x28, 0x47, 0x1d, 0xb5, 0x28, 0x91, 0x1d, 0xd8, 0x6c, 0x2b, 0xbd, 0xb6,
0x71, 0xae, 0x52, 0xad, 0xa9, 0x1d, 0x2b, 0x7d, 0xad, 0xab, 0x17, 0xd3, 0x24, 0x07, 0xab, 0x67,
0xfa, 0x33, 0xbd, 0xfb, 0x9d, 0x5e, 0x04, 0xf9, 0x17, 0x09, 0x48, 0xcf, 0x77, 0x99, 0x39, 0x44,
0xce, 0xb6, 0x13, 0xf4, 0x3c, 0x73, 0x88, 0xe4, 0x1b, 0xc8, 0x7a, 0x33, 0x5a, 0x88, 0xf2, 0x57,
0x96, 0x87, 0x1b, 0xb1, 0xa7, 0x9d, 0xa2, 0xd7, 0x36, 0xe4, 0x09, 0x64, 0x2e, 0x82, 0x8b, 0x88,
0xba, 0x3f, 0xb8, 0xed, 0xb2, 0xed, 0x14, 0x15, 0xe8, 0xa3, 0x2c, 0xac, 0x8a, 0x18, 0x64, 0x80,
0x35, 0xdd, 0xf5, 0x29, 0x9a, 0xd6, 0x95, 0xfc, 0xa7, 0x04, 0xeb, 0x33, 0x9b, 0x20, 0x9d, 0xef,
0x5a, 0x89, 0xdc, 0x84, 0xd9, 0xaf, 0x4d, 0x1f, 0x8d, 0x57, 0x78, 0x25, 0xda, 0x6e, 0x4f, 0x98,
0x6d, 0x04, 0xa8, 0xd3, 0x50, 0xff, 0x0c, 0xaf, 0x28, 0x4c, 0xa2, 0xff, 0xe4, 0x39, 0xec, 0x98,
0x96, 0xc5, 0x1b, 0x1b, 0xad, 0x44, 0x8f, 0x84, 0xc4, 0x7c, 0x78, 0x18, 0x4d, 0x76, 0x65, 0x06,
0x8b, 0xb5, 0xcb, 0x96, 0xb9, 0x28, 0x94, 0xbf, 0x85, 0x5c, 0x03, 0x79, 0xd7, 0xbe, 0xfb, 0xc5,
0xe4, 0x16, 0xac, 0xf3, 0xe1, 0x33, 0x9b, 0x03, 0xbc, 0x0e, 0x7b, 0x3c, 0xe1, 0xd1, 0xc0, 0x30,
0x3c, 0x7b, 0xe8, 0x98, 0xfe, 0x94, 0x85, 0x93, 0x28, 0x4f, 0x77, 0x30, 0x86, 0xef, 0xcd, 0x94,
0xf2, 0xbf, 0x12, 0x00, 0xf7, 0x24, 0x78, 0xff, 0x39, 0xec, 0x06, 0x6e, 0x78, 0x95, 0xa6, 0x6c,
0xd1, 0xcb, 0x36, 0x0a, 0xec, 0x94, 0x5d, 0x3b, 0x21, 0x5f, 0x42, 0x86, 0xa1, 0xe9, 0xb9, 0x8e,
0x60, 0x7c, 0x65, 0x71, 0x82, 0x0a, 0xb6, 0xd3, 0x00, 0x46, 0x05, 0x5c, 0x7e, 0x05, 0x99, 0x50,
0x42, 0xf6, 0x60, 0x2b, 0x4e, 0x5f, 0xa3, 0xa9, 0x68, 0x1d, 0x95, 0x53, 0xbf, 0x02, 0xfb, 0x9a,
0xae, 0x1c, 0xf7, 0xb5, 0x73, 0xd5, 0xe8, 0x6b, 0x27, 0x6a, 0x93, 0x2a, 0x27, 0xaa, 0xa1, 0xbe,
0x38, 0x56, 0xd5, 0x86, 0xda, 0x28, 0x4a, 0xe4, 0x11, 0x7c, 0xd0, 0x3d, 0x57, 0xa9, 0xd2, 0xe9,
0x04, 0x46, 0x67, 0x54, 0x35, 0x4e, 0x55, 0x7a, 0xac, 0xea, 0x7d, 0xde, 0x2e, 0x11, 0x30, 0x2d,
0xff, 0x9d, 0x86, 0x62, 0xf4, 0x70, 0xcc, 0x1a, 0xe0, 0x0b, 0xc8, 0x3a, 0xae, 0x6f, 0x30, 0xce,
0x3e, 0xd1, 0x00, 0xbb, 0xf3, 0xaf, 0x4e, 0xc8, 0xcd, 0x76, 0x8a, 0xae, 0x39, 0xe2, 0x3f, 0x69,
0x40, 0xc1, 0x17, 0x34, 0x0d, 0x8b, 0x27, 0xe8, 0xbf, 0xbf, 0x9c, 0xfe, 0xe1, 0xa0, 0x4a, 0xd1,
0x75, 0x3f, 0xc1, 0xed, 0xaf, 0x21, 0x6f, 0x05, 0x8c, 0x10, 0x3e, 0x42, 0x6e, 0xbd, 0x97, 0xf4,
0x11, 0xe3, 0x4c, 0x3b, 0x45, 0x73, 0x56, 0x8c, 0x42, 0x0d, 0x28, 0x24, 0x8a, 0x1e, 0x8e, 0xbc,
0x85, 0x28, 0x12, 0x4c, 0xe1, 0x51, 0x60, 0x82, 0x3a, 0x4f, 0x21, 0x17, 0xd5, 0x1c, 0xad, 0xd2,
0x4a, 0xe0, 0xa2, 0x74, 0x53, 0x09, 0xdb, 0x29, 0x0a, 0x18, 0x7d, 0xc5, 0xfa, 0xb8, 0xfe, 0x47,
0x1a, 0x8a, 0x7c, 0xb6, 0xc4, 0x37, 0x15, 0xf2, 0x3a, 0x78, 0x79, 0x97, 0xbd, 0xfc, 0xe4, 0xe3,
0xe4, 0x11, 0xb7, 0xaf, 0x42, 0xe5, 0x4f, 0xee, 0x88, 0x16, 0xcf, 0xfa, 0x0f, 0xb0, 0xbd, 0x6c,
0x73, 0x22, 0x8f, 0x93, 0x6e, 0x6e, 0xd9, 0xae, 0xca, 0xb7, 0xec, 0x01, 0xe4, 0x47, 0xd8, 0x98,
0x5b, 0x28, 0xc8, 0x87, 0x0b, 0x01, 0x2e, 0xd9, 0x44, 0xca, 0x1f, 0xfd, 0x0f, 0x2a, 0x0c, 0xbf,
0x7e, 0x09, 0x3b, 0xd1, 0xa5, 0x12, 0xf1, 0x77, 0x61, 0xf5, 0x94, 0xb9, 0xfc, 0x79, 0x27, 0xd5,
0xa4, 0xab, 0xc5, 0xe9, 0x5e, 0x3e, 0x98, 0x43, 0xcc, 0x91, 0xbf, 0x26, 0x7d, 0x2a, 0x1d, 0xdd,
0xff, 0x3e, 0x3d, 0x79, 0xf9, 0x32, 0x13, 0x6c, 0xa4, 0x9f, 0xfd, 0x17, 0x00, 0x00, 0xff, 0xff,
0x42, 0x7c, 0x67, 0xa5, 0xdf, 0x0a, 0x00, 0x00,
// 1223 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x56, 0xdf, 0x72, 0xdb, 0xc4,
0x17, 0xb6, 0x9c, 0x36, 0x89, 0x8f, 0x9d, 0xc4, 0xd9, 0xfc, 0xf3, 0xcf, 0x69, 0x6b, 0xff, 0x04,
0x4c, 0xd3, 0x19, 0x70, 0xc1, 0x40, 0x61, 0xa6, 0x0c, 0x8c, 0x1c, 0xcb, 0xb1, 0xa8, 0x2b, 0xa7,
0x6b, 0x27, 0x74, 0x98, 0x01, 0x8d, 0x6a, 0x1d, 0x3b, 0x9a, 0xda, 0x92, 0x90, 0xe4, 0x4e, 0x7b,
0xc3, 0x05, 0x4f, 0xc0, 0x25, 0xd7, 0xf0, 0x32, 0xf0, 0x08, 0x30, 0x43, 0x19, 0xae, 0x78, 0x0d,
0x66, 0xa5, 0x95, 0x22, 0xd9, 0x8e, 0x69, 0xa7, 0x57, 0xb6, 0xce, 0x7e, 0xe7, 0xec, 0xd9, 0xb3,
0xe7, 0xfb, 0xf6, 0x00, 0x19, 0xb9, 0xfa, 0x00, 0x87, 0xd3, 0x31, 0x3e, 0x37, 0xfd, 0x9a, 0xe3,
0xda, 0xbe, 0x4d, 0x0a, 0x49, 0x5b, 0x19, 0x46, 0xf6, 0xc8, 0x0e, 0x57, 0xca, 0x95, 0x91, 0x6d,
0x8f, 0xc6, 0x78, 0x37, 0xf8, 0x7a, 0x32, 0x1d, 0xde, 0xf5, 0xcd, 0x09, 0x7a, 0xbe, 0x3e, 0x71,
0x38, 0x60, 0x73, 0x82, 0xbe, 0x6e, 0x5a, 0xc3, 0xc8, 0xa1, 0x60, 0xbb, 0x06, 0xba, 0x5e, 0xf8,
0x25, 0x56, 0xe1, 0xd6, 0x09, 0xfa, 0xaa, 0x6d, 0xc9, 0xcf, 0x4d, 0xdf, 0xb4, 0x46, 0x3d, 0xdd,
0xc7, 0xf1, 0xd8, 0xf4, 0xd1, 0xa3, 0xf8, 0xdd, 0x14, 0x3d, 0x5f, 0x6c, 0xc1, 0xa1, 0x62, 0x99,
0xbe, 0xa9, 0xfb, 0x78, 0xc2, 0x93, 0x60, 0x58, 0xbe, 0x4c, 0x6e, 0xc3, 0x9a, 0x65, 0x1b, 0xa8,
0x99, 0x46, 0x49, 0xa8, 0x0a, 0x47, 0x85, 0xc6, 0xe6, 0xaf, 0x2f, 0x2b, 0x99, 0x3f, 0x5e, 0x56,
0x56, 0x55, 0xdb, 0x40, 0xa5, 0x49, 0x57, 0xd9, 0xb2, 0x62, 0x88, 0xdf, 0xc3, 0xce, 0x82, 0x6d,
0x5e, 0xd9, 0x9f, 0x54, 0x20, 0x6f, 0xd8, 0x13, 0xdd, 0xb4, 0x34, 0x4b, 0x9f, 0x60, 0x29, 0x5b,
0x15, 0x8e, 0x72, 0x14, 0x42, 0x93, 0xaa, 0x4f, 0x90, 0xdc, 0x04, 0xf0, 0x1c, 0x7d, 0x80, 0xda,
0xd4, 0x43, 0xa3, 0xb4, 0x52, 0x15, 0x8e, 0x04, 0x9a, 0x0b, 0x2c, 0x67, 0x1e, 0x1a, 0xa2, 0x01,
0x95, 0x2b, 0x4f, 0xea, 0x39, 0xb6, 0xe5, 0x21, 0x91, 0x00, 0xbc, 0xd8, 0x5a, 0x12, 0xaa, 0x2b,
0x47, 0xf9, 0xfa, 0xff, 0x6b, 0xa9, 0xeb, 0x58, 0xe0, 0x4f, 0x13, 0x4e, 0x62, 0x09, 0xf6, 0x4f,
0xd0, 0x67, 0x90, 0x53, 0xd7, 0x1e, 0xb9, 0xe8, 0xc5, 0x75, 0x7c, 0x04, 0x07, 0x73, 0x2b, 0x7c,
0xdf, 0x7b, 0xb0, 0xee, 0x70, 0x1b, 0xdf, 0xb5, 0x9c, 0xde, 0x35, 0xe5, 0x15, 0x63, 0xc5, 0x5f,
0x04, 0x28, 0x24, 0x97, 0x66, 0x6b, 0x24, 0xcc, 0xd5, 0x28, 0x51, 0xed, 0xec, 0xd2, 0x6a, 0xdf,
0x81, 0xa2, 0x83, 0xee, 0x00, 0x2d, 0x5f, 0x1b, 0xd8, 0x13, 0x67, 0x8c, 0x3e, 0x06, 0x25, 0xcd,
0xd2, 0x2d, 0x6e, 0x3f, 0xe6, 0x66, 0x72, 0x0b, 0xc0, 0x9b, 0x0e, 0x06, 0xe8, 0x79, 0xc3, 0xe9,
0xb8, 0x74, 0xad, 0x2a, 0x1c, 0xad, 0xd3, 0x84, 0x45, 0xfc, 0x39, 0x0b, 0xdb, 0x7d, 0x57, 0xb7,
0xbc, 0x21, 0xba, 0x3d, 0x66, 0x46, 0x03, 0x0d, 0xd2, 0x84, 0x5d, 0xdb, 0x35, 0x47, 0xa6, 0xa5,
0x8f, 0xb5, 0xa0, 0x23, 0xb5, 0xb1, 0x39, 0x31, 0xfd, 0x20, 0xe7, 0x7c, 0x9d, 0xd4, 0x78, 0x97,
0x76, 0xd9, 0x4f, 0x87, 0xad, 0x50, 0x12, 0xe1, 0x2f, 0x6d, 0x44, 0x82, 0x9d, 0x38, 0x8a, 0x63,
0xe2, 0x00, 0xb5, 0x0b, 0xdd, 0xbb, 0x08, 0xce, 0x96, 0xaf, 0x6f, 0x47, 0x41, 0x4e, 0xd9, 0x4a,
0x5b, 0xf7, 0x2e, 0xe8, 0x76, 0x84, 0x8e, 0x4d, 0xe4, 0x04, 0xf6, 0x5d, 0x74, 0xc6, 0xfa, 0x00,
0x27, 0xec, 0xb4, 0x89, 0x28, 0x2b, 0x57, 0x45, 0xd9, 0x4d, 0x38, 0x5c, 0x06, 0xba, 0x0f, 0xdb,
0x33, 0xb9, 0x98, 0x46, 0x50, 0x8e, 0x42, 0x63, 0x8b, 0x57, 0x79, 0x2d, 0x40, 0x2b, 0x4d, 0xba,
0x95, 0xca, 0x43, 0x31, 0xc4, 0xbf, 0x05, 0xd8, 0x8c, 0x8a, 0xd4, 0xd2, 0xcd, 0x31, 0x1a, 0x8b,
0xe3, 0x09, 0xaf, 0x16, 0x8f, 0x7c, 0x0a, 0xd7, 0xd1, 0x75, 0x6d, 0x37, 0x28, 0xc5, 0x66, 0x5d,
0x4c, 0xf7, 0x53, 0x7a, 0xa7, 0x9a, 0xcc, 0x90, 0x34, 0x74, 0x10, 0x1f, 0xc3, 0xf5, 0xe0, 0x9b,
0x6c, 0x40, 0x4e, 0xed, 0xf6, 0xb5, 0x56, 0xf7, 0x4c, 0x6d, 0x16, 0x33, 0xe4, 0x06, 0x94, 0x7a,
0xfd, 0x2e, 0x95, 0x4e, 0x64, 0x4d, 0xed, 0x36, 0x65, 0xed, 0x4c, 0x95, 0xce, 0x25, 0xa5, 0x23,
0x35, 0x3a, 0x72, 0x51, 0x20, 0x7b, 0xb0, 0xdd, 0x96, 0x7a, 0x6d, 0xed, 0x5c, 0xa6, 0x4a, 0x4b,
0x39, 0x96, 0xfa, 0x4a, 0x57, 0x2d, 0x66, 0x49, 0x1e, 0xd6, 0xce, 0xd4, 0x07, 0x6a, 0xf7, 0x2b,
0xb5, 0x08, 0xe2, 0x4f, 0x02, 0x90, 0x9e, 0x6f, 0xbb, 0xfa, 0x08, 0x59, 0xb7, 0x3d, 0x44, 0xcf,
0xd3, 0x47, 0x48, 0xbe, 0x80, 0x9c, 0x17, 0xb5, 0x05, 0xbf, 0xfe, 0xca, 0xe2, 0x74, 0xe3, 0xee,
0x69, 0x67, 0xe8, 0xa5, 0x0f, 0xb9, 0x07, 0xab, 0xc3, 0xe0, 0x20, 0xfc, 0xde, 0x6f, 0x2c, 0x3b,
0x6c, 0x3b, 0x43, 0x39, 0xba, 0x91, 0x83, 0x35, 0x9e, 0x83, 0x08, 0xb0, 0xae, 0xda, 0x3e, 0x45,
0xdd, 0x78, 0x21, 0xfe, 0x2e, 0xc0, 0x46, 0xe4, 0x13, 0x94, 0xf3, 0x4d, 0x6f, 0x22, 0xef, 0xb8,
0xe6, 0x33, 0xdd, 0x47, 0xed, 0x29, 0xbe, 0xe0, 0xb4, 0x3b, 0xe0, 0x6e, 0x5b, 0x01, 0xea, 0x34,
0x5c, 0x7f, 0x80, 0x2f, 0x28, 0x38, 0xf1, 0x7f, 0xf2, 0x08, 0xf6, 0x74, 0xc3, 0x60, 0xc4, 0x46,
0x23, 0xc5, 0x91, 0xb0, 0x31, 0x6f, 0xd6, 0x62, 0x65, 0x97, 0x22, 0x58, 0x82, 0x2e, 0x3b, 0xfa,
0xbc, 0x51, 0xfc, 0x12, 0xf2, 0x4d, 0x64, 0xac, 0x7d, 0xf3, 0x83, 0x89, 0xff, 0x08, 0xb0, 0xc1,
0xd4, 0x27, 0x12, 0x02, 0x76, 0x11, 0x07, 0xac, 0xe2, 0xb1, 0x62, 0x68, 0x9e, 0x39, 0xb2, 0x74,
0x7f, 0xea, 0x86, 0x52, 0x54, 0xa0, 0x7b, 0x98, 0xc0, 0xf7, 0xa2, 0x45, 0xf2, 0x01, 0x14, 0x62,
0x09, 0xbd, 0x5a, 0x9a, 0xf2, 0x31, 0x46, 0x31, 0x92, 0x42, 0xb6, 0xb2, 0x54, 0xc8, 0x1a, 0x90,
0x8b, 0xd2, 0x09, 0xd9, 0x18, 0x88, 0x6b, 0xf0, 0x66, 0xd6, 0xa2, 0x37, 0xb3, 0xd6, 0x8f, 0xde,
0xcc, 0xc6, 0x3a, 0x0b, 0xf3, 0xe3, 0x5f, 0x15, 0x81, 0x5e, 0xba, 0x89, 0x3f, 0xac, 0x00, 0xb0,
0x93, 0x72, 0x62, 0x7e, 0x04, 0xfb, 0xc1, 0x31, 0x59, 0x1b, 0x4d, 0xdd, 0xf9, 0x53, 0xee, 0x22,
0xc7, 0x4e, 0xdd, 0xc4, 0x21, 0x3f, 0x81, 0x55, 0x17, 0x75, 0xcf, 0xb6, 0x38, 0x25, 0x2b, 0xf3,
0x12, 0xcf, 0xe9, 0x48, 0x03, 0x18, 0xe5, 0xf0, 0xb9, 0xea, 0xac, 0xbc, 0x56, 0x75, 0xae, 0x2d,
0xad, 0xce, 0x67, 0x31, 0x75, 0xae, 0xbf, 0x46, 0x69, 0xb8, 0x8f, 0xf8, 0x14, 0x56, 0xc3, 0x5c,
0xc9, 0x01, 0xec, 0x24, 0x99, 0xaf, 0xb5, 0x24, 0xa5, 0x23, 0x33, 0xd5, 0xa8, 0xc0, 0xa1, 0xa2,
0x4a, 0xc7, 0x7d, 0xe5, 0x5c, 0xd6, 0xfa, 0xca, 0x43, 0xb9, 0x45, 0xa5, 0x87, 0xb2, 0x26, 0x3f,
0x3e, 0x96, 0xe5, 0xa6, 0xdc, 0x2c, 0x0a, 0xe4, 0x36, 0xbc, 0xd5, 0x3d, 0x97, 0xa9, 0xd4, 0xe9,
0x04, 0x4e, 0x67, 0x54, 0xd6, 0x4e, 0x65, 0x7a, 0x2c, 0xab, 0x7d, 0xa6, 0x34, 0x31, 0x30, 0x2b,
0xfe, 0x99, 0x85, 0x62, 0xfc, 0xe6, 0x46, 0xda, 0xf1, 0x31, 0xe4, 0x2c, 0xdb, 0xd7, 0x5c, 0x46,
0x5c, 0xae, 0x1d, 0xfb, 0xb3, 0x0f, 0x76, 0x48, 0xeb, 0x76, 0x86, 0xae, 0x5b, 0xfc, 0x3f, 0x69,
0xc2, 0xa6, 0xcf, 0x19, 0x1e, 0xf6, 0x3d, 0x57, 0x8e, 0xc3, 0xc5, 0xca, 0x11, 0x6a, 0x7c, 0x86,
0x6e, 0xf8, 0x29, 0x59, 0xf8, 0x1c, 0x0a, 0x46, 0x40, 0x26, 0x1e, 0x23, 0xa4, 0xe5, 0xff, 0xd2,
0x31, 0x12, 0x74, 0x6b, 0x67, 0x68, 0xde, 0x48, 0xb0, 0xaf, 0x09, 0x9b, 0x29, 0xba, 0x44, 0xfd,
0x79, 0x38, 0xdf, 0x19, 0x31, 0xc7, 0x58, 0x16, 0x98, 0x22, 0xdd, 0x7d, 0xc8, 0xc7, 0xdd, 0x18,
0xdf, 0x63, 0xe9, 0xaa, 0xe6, 0x6a, 0x67, 0x28, 0x60, 0xfc, 0x95, 0x90, 0xc0, 0xfa, 0x6f, 0x59,
0x28, 0xb2, 0xee, 0x48, 0x0e, 0x79, 0xe4, 0x59, 0x30, 0xb4, 0x2c, 0x1a, 0x9a, 0xc8, 0xbb, 0xe9,
0x2d, 0x96, 0x4f, 0x91, 0xe5, 0xf7, 0x5e, 0x11, 0xcd, 0x27, 0xa2, 0x6f, 0x60, 0x77, 0xd1, 0xd0,
0x49, 0xee, 0xa4, 0xc3, 0x2c, 0x19, 0x4c, 0xcb, 0x4b, 0x46, 0x28, 0xf2, 0x2d, 0x6c, 0xcd, 0xcc,
0x62, 0xe4, 0xed, 0xb9, 0x04, 0x17, 0x0c, 0x71, 0xe5, 0x77, 0xfe, 0x03, 0x15, 0xa6, 0x5f, 0xbf,
0x80, 0xbd, 0xf8, 0x50, 0xa9, 0xfc, 0xbb, 0xb0, 0x76, 0xea, 0xda, 0x6c, 0x32, 0x22, 0xd5, 0x74,
0xa8, 0xf9, 0x87, 0xb1, 0x7c, 0x6b, 0x06, 0x31, 0xd3, 0xfc, 0x47, 0xc2, 0xfb, 0x42, 0xe3, 0xda,
0xd7, 0x59, 0xe7, 0xc9, 0x93, 0xd5, 0x80, 0xae, 0x1f, 0xfe, 0x1b, 0x00, 0x00, 0xff, 0xff, 0x55,
0x72, 0xec, 0xc3, 0x3b, 0x0c, 0x00, 0x00,
}
type DRPCNodeGracefulExitClient interface {

View File

@ -5,6 +5,7 @@ syntax = "proto3";
option go_package = "pb";
import "gogo.proto";
import "google/protobuf/timestamp.proto";
import "metainfo.proto";
import "orders.proto";
@ -98,6 +99,12 @@ message DeletePiece {
message ExitCompleted {
// when everything is completed
bytes exit_complete_signature = 1;
// satellite who issued this exit completed
bytes satellite_id = 2 [(gogoproto.customtype) = "NodeID", (gogoproto.nullable) = false];
// storage node this exit completed was issued to
bytes node_id = 3 [(gogoproto.customtype) = "NodeID", (gogoproto.nullable) = false];
// timestamp when the exit completed
google.protobuf.Timestamp completed = 4 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
}
message ExitFailed {
@ -109,6 +116,12 @@ message ExitFailed {
// on failure
bytes exit_failure_signature = 1;
Reason reason = 2;
// satellite who issued this exit failed
bytes satellite_id = 3 [(gogoproto.customtype) = "NodeID", (gogoproto.nullable) = false];
// storage node this exit failed was issued to
bytes node_id = 4 [(gogoproto.customtype) = "NodeID", (gogoproto.nullable) = false];
// timestamp when the exit failed
google.protobuf.Timestamp failed = 5 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
}
message SatelliteMessage {

View File

@ -108,3 +108,25 @@ func EncodeSegmentID(ctx context.Context, segmentID *pb.SatSegmentID) (_ []byte,
segmentID.SatelliteSignature = signature
return out, err
}
// EncodeExitCompleted encodes ExitCompleted into bytes for signing.
func EncodeExitCompleted(ctx context.Context, exitCompleted *pb.ExitCompleted) (_ []byte, err error) {
defer mon.Task()(&ctx)(&err)
signature := exitCompleted.ExitCompleteSignature
exitCompleted.ExitCompleteSignature = nil
out, err := proto.Marshal(exitCompleted)
exitCompleted.ExitCompleteSignature = signature
return out, err
}
// EncodeExitFailed encodes ExitFailed into bytes for signing.
func EncodeExitFailed(ctx context.Context, exitFailed *pb.ExitFailed) (_ []byte, err error) {
defer mon.Task()(&ctx)(&err)
signature := exitFailed.ExitFailureSignature
exitFailed.ExitFailureSignature = nil
out, err := proto.Marshal(exitFailed)
exitFailed.ExitFailureSignature = signature
return out, err
}

View File

@ -145,3 +145,39 @@ func SignSegmentID(ctx context.Context, signer Signer, unsigned *pb.SatSegmentID
return &signed, nil
}
// SignExitCompleted signs the ExitCompleted using the specified signer
// Signer is a satellite
func SignExitCompleted(ctx context.Context, signer Signer, unsigned *pb.ExitCompleted) (_ *pb.ExitCompleted, err error) {
defer mon.Task()(&ctx)(&err)
bytes, err := EncodeExitCompleted(ctx, unsigned)
if err != nil {
return nil, Error.Wrap(err)
}
signed := *unsigned
signed.ExitCompleteSignature, err = signer.HashAndSign(ctx, bytes)
if err != nil {
return nil, Error.Wrap(err)
}
return &signed, nil
}
// SignExitFailed signs the ExitFailed using the specified signer
// Signer is a satellite
func SignExitFailed(ctx context.Context, signer Signer, unsigned *pb.ExitFailed) (_ *pb.ExitFailed, err error) {
defer mon.Task()(&ctx)(&err)
bytes, err := EncodeExitFailed(ctx, unsigned)
if err != nil {
return nil, Error.Wrap(err)
}
signed := *unsigned
signed.ExitFailureSignature, err = signer.HashAndSign(ctx, bytes)
if err != nil {
return nil, Error.Wrap(err)
}
return &signed, nil
}

View File

@ -6,12 +6,15 @@ package signing_test
import (
"encoding/hex"
"testing"
"time"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/testidentity"
"storj.io/storj/internal/testrand"
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/signing"
@ -206,3 +209,62 @@ func TestPieceHashVerification(t *testing.T) {
assert.Equal(t, unsignedBytes, encoded)
}
}
func TestSignExitCompleted(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
satIdentity, err := testidentity.NewTestIdentity(ctx)
nodeID := testrand.NodeID()
require.NoError(t, err)
finishedAt := time.Now().UTC()
signer := signing.SignerFromFullIdentity(satIdentity)
signee := signing.SigneeFromPeerIdentity(satIdentity.PeerIdentity())
unsigned := &pb.ExitCompleted{
SatelliteId: satIdentity.ID,
NodeId: nodeID,
Completed: finishedAt,
}
signed, err := signing.SignExitCompleted(ctx, signer, unsigned)
require.NoError(t, err)
err = signing.VerifyExitCompleted(ctx, signee, signed)
require.NoError(t, err)
signed.SatelliteId = testrand.NodeID()
err = signing.VerifyExitCompleted(ctx, signee, signed)
require.Error(t, err)
}
func TestSignExitFailed(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
satIdentity, err := testidentity.NewTestIdentity(ctx)
nodeID := testrand.NodeID()
require.NoError(t, err)
finishedAt := time.Now().UTC()
signer := signing.SignerFromFullIdentity(satIdentity)
signee := signing.SigneeFromPeerIdentity(satIdentity.PeerIdentity())
unsigned := &pb.ExitFailed{
SatelliteId: satIdentity.ID,
NodeId: nodeID,
Failed: finishedAt,
Reason: pb.ExitFailed_INACTIVE_TIMEFRAME_EXCEEDED,
}
signed, err := signing.SignExitFailed(ctx, signer, unsigned)
require.NoError(t, err)
err = signing.VerifyExitFailed(ctx, signee, signed)
require.NoError(t, err)
signed.Reason = pb.ExitFailed_OVERALL_FAILURE_PERCENTAGE_EXCEEDED
err = signing.VerifyExitFailed(ctx, signee, signed)
require.Error(t, err)
}

View File

@ -104,3 +104,25 @@ func VerifySegmentID(ctx context.Context, satellite Signee, signed *pb.SatSegmen
return satellite.HashAndVerifySignature(ctx, bytes, signed.SatelliteSignature)
}
// VerifyExitCompleted verifies that the signature inside ExitCompleted belongs to the satellite
func VerifyExitCompleted(ctx context.Context, satellite Signee, signed *pb.ExitCompleted) (err error) {
defer mon.Task()(&ctx)(&err)
bytes, err := EncodeExitCompleted(ctx, signed)
if err != nil {
return Error.Wrap(err)
}
return Error.Wrap(satellite.HashAndVerifySignature(ctx, bytes, signed.ExitCompleteSignature))
}
// VerifyExitFailed verifies that the signature inside ExitFailed belongs to the satellite
func VerifyExitFailed(ctx context.Context, satellite Signee, signed *pb.ExitFailed) (err error) {
defer mon.Task()(&ctx)(&err)
bytes, err := EncodeExitFailed(ctx, signed)
if err != nil {
return Error.Wrap(err)
}
return Error.Wrap(satellite.HashAndVerifySignature(ctx, bytes, signed.ExitFailureSignature))
}

View File

@ -1147,6 +1147,51 @@
"id": 1,
"name": "exit_complete_signature",
"type": "bytes"
},
{
"id": 2,
"name": "satellite_id",
"type": "bytes",
"options": [
{
"name": "(gogoproto.customtype)",
"value": "NodeID"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
},
{
"id": 3,
"name": "node_id",
"type": "bytes",
"options": [
{
"name": "(gogoproto.customtype)",
"value": "NodeID"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
},
{
"id": 4,
"name": "completed",
"type": "google.protobuf.Timestamp",
"options": [
{
"name": "(gogoproto.stdtime)",
"value": "true"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
}
]
},
@ -1162,6 +1207,51 @@
"id": 2,
"name": "reason",
"type": "Reason"
},
{
"id": 3,
"name": "satellite_id",
"type": "bytes",
"options": [
{
"name": "(gogoproto.customtype)",
"value": "NodeID"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
},
{
"id": 4,
"name": "node_id",
"type": "bytes",
"options": [
{
"name": "(gogoproto.customtype)",
"value": "NodeID"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
},
{
"id": 5,
"name": "failed",
"type": "google.protobuf.Timestamp",
"options": [
{
"name": "(gogoproto.stdtime)",
"value": "true"
},
{
"name": "(gogoproto.nullable)",
"value": "false"
}
]
}
]
},
@ -1234,6 +1324,9 @@
{
"path": "gogo.proto"
},
{
"path": "google/protobuf/timestamp.proto"
},
{
"path": "metainfo.proto"
},

View File

@ -441,7 +441,17 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metai
{ // setup graceful exit
log.Debug("Satellite API Process setting up graceful exit endpoint")
peer.GracefulExit.Endpoint = gracefulexit.NewEndpoint(peer.Log.Named("gracefulexit:endpoint"), peer.DB.GracefulExit(), peer.Overlay.DB, peer.Overlay.Service, peer.Metainfo.Service, peer.Orders.Service, peer.DB.PeerIdentities(), config.GracefulExit)
peer.GracefulExit.Endpoint = gracefulexit.NewEndpoint(
peer.Log.Named("gracefulexit:endpoint"),
signing.SignerFromFullIdentity(peer.Identity),
peer.DB.GracefulExit(),
peer.Overlay.DB,
peer.Overlay.Service,
peer.Metainfo.Service,
peer.Orders.Service,
peer.DB.PeerIdentities(),
config.GracefulExit)
pb.RegisterSatelliteGracefulExitServer(peer.Server.GRPC(), peer.GracefulExit.Endpoint)
pb.DRPCRegisterSatelliteGracefulExit(peer.Server.DRPC(), peer.GracefulExit.Endpoint.DRPC())
}

View File

@ -48,6 +48,7 @@ type processStream interface {
type Endpoint struct {
log *zap.Logger
interval time.Duration
signer signing.Signer
db DB
overlaydb overlay.DB
overlay *overlay.Service
@ -118,11 +119,12 @@ func (endpoint *Endpoint) DRPC() pb.DRPCSatelliteGracefulExitServer {
}
// NewEndpoint creates a new graceful exit endpoint.
func NewEndpoint(log *zap.Logger, db DB, overlaydb overlay.DB, overlay *overlay.Service, metainfo *metainfo.Service, orders *orders.Service,
func NewEndpoint(log *zap.Logger, signer signing.Signer, db DB, overlaydb overlay.DB, overlay *overlay.Service, metainfo *metainfo.Service, orders *orders.Service,
peerIdentities overlay.PeerIdentities, config Config) *Endpoint {
return &Endpoint{
log: log,
interval: time.Millisecond * buildQueueMillis,
signer: signer,
db: db,
overlaydb: overlaydb,
overlay: overlay,
@ -173,9 +175,13 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {
}
if exitStatus.ExitFinishedAt != nil {
// TODO revisit this. Should check if signature was sent
completed := &pb.SatelliteMessage{Message: &pb.SatelliteMessage_ExitCompleted{ExitCompleted: &pb.ExitCompleted{}}}
err = stream.Send(completed)
// TODO maybe we should store the reason in the DB so we know how it originally failed.
finishedMsg, err := endpoint.getFinishedMessage(ctx, endpoint.signer, nodeID, *exitStatus.ExitFinishedAt, exitStatus.ExitSuccess, -1)
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
err = stream.Send(finishedMsg)
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, Error.Wrap(err).Error())
}
@ -211,6 +217,8 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {
var morePiecesFlag int32 = 1
errChan := make(chan error, 1)
processChan := make(chan bool, 1)
handleError := func(err error) error {
errChan <- err
close(errChan)
@ -221,6 +229,10 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {
group.Go(func() error {
ticker := time.NewTicker(endpoint.interval)
defer ticker.Stop()
defer func() {
processChan <- true
close(processChan)
}()
for range ticker.C {
if pending.length() == 0 {
@ -248,6 +260,7 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {
return handleError(err)
}
}
processChan <- true
}
}
return nil
@ -262,6 +275,13 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {
pendingCount := pending.length()
// wait if there are none pending
if pendingCount == 0 {
select {
case <-processChan:
}
}
// if there are no more transfers and the pending queue is empty, send complete
if atomic.LoadInt32(&morePiecesFlag) == 0 && pendingCount == 0 {
exitStatusRequest := &overlay.ExitStatusRequest{
@ -280,21 +300,15 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {
if processed > 0 && float64(progress.PiecesFailed)/float64(processed)*100 >= float64(endpoint.config.OverallMaxFailuresPercentage) {
exitStatusRequest.ExitSuccess = false
// TODO needs signature
transferMsg = &pb.SatelliteMessage{
Message: &pb.SatelliteMessage_ExitFailed{
ExitFailed: &pb.ExitFailed{
Reason: pb.ExitFailed_OVERALL_FAILURE_PERCENTAGE_EXCEEDED,
},
},
transferMsg, err = endpoint.getFinishedMessage(ctx, endpoint.signer, nodeID, exitStatusRequest.ExitFinishedAt, exitStatusRequest.ExitSuccess, pb.ExitFailed_OVERALL_FAILURE_PERCENTAGE_EXCEEDED)
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
} else {
exitStatusRequest.ExitSuccess = true
// TODO needs signature
transferMsg = &pb.SatelliteMessage{
Message: &pb.SatelliteMessage_ExitCompleted{
ExitCompleted: &pb.ExitCompleted{},
},
transferMsg, err = endpoint.getFinishedMessage(ctx, endpoint.signer, nodeID, exitStatusRequest.ExitFinishedAt, exitStatusRequest.ExitSuccess, -1)
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
}
@ -315,10 +329,6 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {
}
break
}
// skip if there are none pending
if pendingCount == 0 {
continue
}
request, err := stream.Recv()
if err != nil {
@ -327,23 +337,19 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {
switch m := request.GetMessage().(type) {
case *pb.StorageNodeMessage_Succeeded:
err = endpoint.handleSucceeded(ctx, pending, nodeID, m)
err = endpoint.handleSucceeded(ctx, stream, pending, nodeID, m)
if err != nil {
if ErrInvalidArgument.Has(err) {
// immediately fail and complete graceful exit for nodes that fail satellite validation
// TODO(moby) use getFinishedMessage helper after 3368 is merged
exitStatusRequest := &overlay.ExitStatusRequest{
NodeID: nodeID,
ExitFinishedAt: time.Now().UTC(),
ExitSuccess: false,
}
// TODO needs signature
transferMsg := &pb.SatelliteMessage{
Message: &pb.SatelliteMessage_ExitFailed{
ExitFailed: &pb.ExitFailed{
Reason: pb.ExitFailed_VERIFICATION_FAILED,
},
},
finishedMsg, err := endpoint.getFinishedMessage(ctx, endpoint.signer, nodeID, exitStatusRequest.ExitFinishedAt, exitStatusRequest.ExitSuccess, pb.ExitFailed_VERIFICATION_FAILED)
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
_, err = endpoint.overlaydb.UpdateExitStatus(ctx, exitStatusRequest)
@ -351,7 +357,7 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
err = stream.Send(transferMsg)
err = stream.Send(finishedMsg)
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, Error.Wrap(err).Error())
}
@ -366,17 +372,6 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {
}
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
deleteMsg := &pb.SatelliteMessage{
Message: &pb.SatelliteMessage_DeletePiece{
DeletePiece: &pb.DeletePiece{
OriginalPieceId: m.Succeeded.OriginalPieceId,
},
},
}
err = stream.Send(deleteMsg)
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, Error.Wrap(err).Error())
}
case *pb.StorageNodeMessage_Failed:
err = endpoint.handleFailed(ctx, pending, nodeID, m)
if err != nil {
@ -502,7 +497,7 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream processS
return nil
}
func (endpoint *Endpoint) handleSucceeded(ctx context.Context, pending *pendingMap, exitingNodeID storj.NodeID, message *pb.StorageNodeMessage_Succeeded) (err error) {
func (endpoint *Endpoint) handleSucceeded(ctx context.Context, stream processStream, pending *pendingMap, exitingNodeID storj.NodeID, message *pb.StorageNodeMessage_Succeeded) (err error) {
defer mon.Task()(&ctx)(&err)
originalPieceID := message.Succeeded.OriginalPieceId
@ -610,6 +605,18 @@ func (endpoint *Endpoint) handleSucceeded(ctx context.Context, pending *pendingM
pending.delete(originalPieceID)
deleteMsg := &pb.SatelliteMessage{
Message: &pb.SatelliteMessage_DeletePiece{
DeletePiece: &pb.DeletePiece{
OriginalPieceId: originalPieceID,
},
},
}
err = stream.Send(deleteMsg)
if err != nil {
return Error.Wrap(err)
}
return nil
}
@ -658,6 +665,41 @@ func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *pendingMap,
return nil
}
func (endpoint *Endpoint) getFinishedMessage(ctx context.Context, signer signing.Signer, nodeID storj.NodeID, finishedAt time.Time, success bool, reason pb.ExitFailed_Reason) (message *pb.SatelliteMessage, err error) {
if success {
unsigned := &pb.ExitCompleted{
SatelliteId: endpoint.signer.ID(),
NodeId: nodeID,
Completed: finishedAt,
}
signed, err := signing.SignExitCompleted(ctx, endpoint.signer, unsigned)
if err != nil {
return nil, Error.Wrap(err)
}
message = &pb.SatelliteMessage{Message: &pb.SatelliteMessage_ExitCompleted{
ExitCompleted: signed,
}}
} else {
unsigned := &pb.ExitFailed{
SatelliteId: endpoint.signer.ID(),
NodeId: nodeID,
Failed: finishedAt,
}
if reason >= 0 {
unsigned.Reason = reason
}
signed, err := signing.SignExitFailed(ctx, endpoint.signer, unsigned)
if err != nil {
return nil, Error.Wrap(err)
}
message = &pb.SatelliteMessage{Message: &pb.SatelliteMessage_ExitFailed{
ExitFailed: signed,
}}
}
return message, nil
}
func (endpoint *Endpoint) updatePointer(ctx context.Context, exitingNodeID storj.NodeID, receivingNodeID storj.NodeID, path []byte, pieceNum int32) (err error) {
defer mon.Task()(&ctx)(&err)

View File

@ -677,7 +677,16 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo
log.Debug("Setting up graceful")
peer.GracefulExit.Chore = gracefulexit.NewChore(peer.Log.Named("graceful exit chore"), peer.DB.GracefulExit(), peer.Overlay.DB, peer.Metainfo.Loop, config.GracefulExit)
peer.GracefulExit.Endpoint = gracefulexit.NewEndpoint(peer.Log.Named("gracefulexit:endpoint"), peer.DB.GracefulExit(), peer.Overlay.DB, peer.Overlay.Service, peer.Metainfo.Service, peer.Orders.Service, peer.DB.PeerIdentities(), config.GracefulExit)
peer.GracefulExit.Endpoint = gracefulexit.NewEndpoint(
peer.Log.Named("gracefulexit:endpoint"),
signing.SignerFromFullIdentity(peer.Identity),
peer.DB.GracefulExit(),
peer.Overlay.DB,
peer.Overlay.Service,
peer.Metainfo.Service,
peer.Orders.Service,
peer.DB.PeerIdentities(),
config.GracefulExit)
pb.RegisterSatelliteGracefulExitServer(peer.Server.GRPC(), peer.GracefulExit.Endpoint)
pb.DRPCRegisterSatelliteGracefulExit(peer.Server.DRPC(), peer.GracefulExit.Endpoint.DRPC())