From a1275746b4b819161394d41137f0b978cfc579f1 Mon Sep 17 00:00:00 2001 From: Ethan Adams Date: Fri, 11 Oct 2019 17:18:05 -0400 Subject: [PATCH] satellite/gracefulexit: Implement the 'process' endpoint on the satellite (#3223) --- internal/testplanet/satellite.go | 3 + pkg/pb/gracefulexit.pb.go | 1 + pkg/pb/gracefulexit.proto | 9 +- pkg/pb/orders.pb.go | 142 +++--- pkg/pb/orders.proto | 1 + proto.lock | 19 + satellite/gracefulexit/chore.go | 11 +- satellite/gracefulexit/common.go | 27 ++ satellite/gracefulexit/db.go | 16 +- satellite/gracefulexit/db_test.go | 7 +- satellite/gracefulexit/endpoint.go | 468 +++++++++++++++++++ satellite/gracefulexit/endpoint_test.go | 260 +++++++++++ satellite/gracefulexit/pathcollector.go | 10 +- satellite/orders/service.go | 66 +++ satellite/overlay/service.go | 11 + satellite/overlay/service_test.go | 3 +- satellite/peer.go | 10 +- satellite/satellitedb/dbx/satellitedb.dbx | 6 - satellite/satellitedb/dbx/satellitedb.dbx.go | 88 ---- satellite/satellitedb/gracefulexit.go | 114 ++++- satellite/satellitedb/locked.go | 22 +- satellite/satellitedb/overlaycache.go | 23 + scripts/testdata/satellite-config.yaml.lock | 6 + 23 files changed, 1109 insertions(+), 214 deletions(-) create mode 100644 satellite/gracefulexit/common.go create mode 100644 satellite/gracefulexit/endpoint.go create mode 100644 satellite/gracefulexit/endpoint_test.go diff --git a/internal/testplanet/satellite.go b/internal/testplanet/satellite.go index 51a53da4a..1358e4b4a 100644 --- a/internal/testplanet/satellite.go +++ b/internal/testplanet/satellite.go @@ -208,6 +208,9 @@ func (planet *Planet) newSatellites(count int) ([]*SatelliteSystem, error) { GracefulExit: gracefulexit.Config{ ChoreBatchSize: 10, ChoreInterval: defaultInterval, + + EndpointBatchSize: 100, + EndpointMaxFailures: 5, }, } if planet.config.Reconfigure.Satellite != nil { diff --git a/pkg/pb/gracefulexit.pb.go b/pkg/pb/gracefulexit.pb.go index 035ee18e6..ad581a73a 100644 --- 
a/pkg/pb/gracefulexit.pb.go +++ b/pkg/pb/gracefulexit.pb.go @@ -309,6 +309,7 @@ type TransferSucceeded struct { AddressedOrderLimit *AddressedOrderLimit `protobuf:"bytes,1,opt,name=addressed_order_limit,json=addressedOrderLimit,proto3" json:"addressed_order_limit,omitempty"` OriginalPieceHash *PieceHash `protobuf:"bytes,2,opt,name=original_piece_hash,json=originalPieceHash,proto3" json:"original_piece_hash,omitempty"` ReplacementPieceHash *PieceHash `protobuf:"bytes,3,opt,name=replacement_piece_hash,json=replacementPieceHash,proto3" json:"replacement_piece_hash,omitempty"` + PieceId PieceID `protobuf:"bytes,4,opt,name=piece_id,json=pieceId,proto3,customtype=PieceID" json:"piece_id"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` diff --git a/pkg/pb/gracefulexit.proto b/pkg/pb/gracefulexit.proto index dd444e614..dfc197963 100644 --- a/pkg/pb/gracefulexit.proto +++ b/pkg/pb/gracefulexit.proto @@ -55,10 +55,11 @@ message TransferSucceeded { metainfo.AddressedOrderLimit addressed_order_limit = 1; orders.PieceHash original_piece_hash = 2; orders.PieceHash replacement_piece_hash = 3; + bytes piece_id = 4 [(gogoproto.customtype) = "PieceID", (gogoproto.nullable) = false]; // the original piece ID } message TransferFailed { - bytes piece_id = 1 [(gogoproto.customtype) = "PieceID", (gogoproto.nullable) = false]; + bytes piece_id = 1 [(gogoproto.customtype) = "PieceID", (gogoproto.nullable) = false]; // the original piece ID enum Error { NOT_FOUND = 0; STORAGE_NODE_UNAVAILABLE = 1; @@ -77,14 +78,15 @@ message StorageNodeMessage { message NotReady {} message TransferPiece { - bytes piece_id = 1 [(gogoproto.customtype) = "PieceID", (gogoproto.nullable) = false]; // the current piece-id + bytes piece_id = 1 [(gogoproto.customtype) = "PieceID", (gogoproto.nullable) = false]; // the original piece ID bytes private_key = 2 [(gogoproto.customtype) = "PiecePrivateKey", (gogoproto.nullable) = false]; + // 
addressed_order_limit contains the new piece id. metainfo.AddressedOrderLimit addressed_order_limit =3; } message DeletePiece { - bytes piece_id = 1 [(gogoproto.customtype) = "PieceID", (gogoproto.nullable) = false]; + bytes piece_id = 1 [(gogoproto.customtype) = "PieceID", (gogoproto.nullable) = false]; // the original piece ID } message ExitCompleted { @@ -92,7 +94,6 @@ message ExitCompleted { bytes exit_complete_signature = 1; } - message ExitFailed { enum Reason { VERIFICATION_FAILED = 0; diff --git a/pkg/pb/orders.pb.go b/pkg/pb/orders.pb.go index e167e349e..46f872d82 100644 --- a/pkg/pb/orders.pb.go +++ b/pkg/pb/orders.pb.go @@ -31,13 +31,14 @@ const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package type PieceAction int32 const ( - PieceAction_INVALID PieceAction = 0 - PieceAction_PUT PieceAction = 1 - PieceAction_GET PieceAction = 2 - PieceAction_GET_AUDIT PieceAction = 3 - PieceAction_GET_REPAIR PieceAction = 4 - PieceAction_PUT_REPAIR PieceAction = 5 - PieceAction_DELETE PieceAction = 6 + PieceAction_INVALID PieceAction = 0 + PieceAction_PUT PieceAction = 1 + PieceAction_GET PieceAction = 2 + PieceAction_GET_AUDIT PieceAction = 3 + PieceAction_GET_REPAIR PieceAction = 4 + PieceAction_PUT_REPAIR PieceAction = 5 + PieceAction_DELETE PieceAction = 6 + PieceAction_PUT_GRACEFUL_EXIT PieceAction = 7 ) var PieceAction_name = map[int32]string{ @@ -48,16 +49,18 @@ var PieceAction_name = map[int32]string{ 4: "GET_REPAIR", 5: "PUT_REPAIR", 6: "DELETE", + 7: "PUT_GRACEFUL_EXIT", } var PieceAction_value = map[string]int32{ - "INVALID": 0, - "PUT": 1, - "GET": 2, - "GET_AUDIT": 3, - "GET_REPAIR": 4, - "PUT_REPAIR": 5, - "DELETE": 6, + "INVALID": 0, + "PUT": 1, + "GET": 2, + "GET_AUDIT": 3, + "GET_REPAIR": 4, + "PUT_REPAIR": 5, + "DELETE": 6, + "PUT_GRACEFUL_EXIT": 7, } func (x PieceAction) String() string { @@ -659,61 +662,62 @@ func init() { func init() { proto.RegisterFile("orders.proto", fileDescriptor_e0f5d4cf0fc9e41b) } var 
fileDescriptor_e0f5d4cf0fc9e41b = []byte{ - // 859 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xdc, 0x55, 0xcf, 0x6f, 0xe3, 0x44, - 0x14, 0xee, 0x34, 0x89, 0xd3, 0xbc, 0x38, 0xa9, 0x33, 0x5b, 0xad, 0x42, 0x04, 0x6a, 0x09, 0x97, - 0xb0, 0x48, 0x29, 0x1b, 0x24, 0xa4, 0x95, 0x50, 0xa5, 0xa4, 0xb1, 0x8a, 0x69, 0xd5, 0x8d, 0x26, - 0x29, 0x07, 0x2e, 0x91, 0x13, 0x0f, 0xae, 0xb5, 0x8e, 0x6d, 0x3c, 0x63, 0x89, 0xdd, 0x2b, 0xe2, - 0xc6, 0x81, 0x7f, 0x88, 0x3b, 0x07, 0x24, 0xee, 0x1c, 0x96, 0xff, 0x83, 0x13, 0x9a, 0xe7, 0x5f, - 0x29, 0x74, 0x05, 0xed, 0x2e, 0x12, 0x70, 0xf3, 0x9b, 0xf7, 0xbe, 0xf7, 0xe6, 0xcd, 0xfb, 0xbe, - 0x67, 0xd0, 0xc3, 0xd8, 0xe1, 0xb1, 0x18, 0x46, 0x71, 0x28, 0x43, 0xaa, 0xa5, 0x56, 0x0f, 0xdc, - 0xd0, 0x0d, 0xd3, 0xb3, 0xde, 0xa1, 0x1b, 0x86, 0xae, 0xcf, 0x8f, 0xd1, 0x5a, 0x25, 0x5f, 0x1e, - 0x4b, 0x6f, 0xc3, 0x85, 0xb4, 0x37, 0x51, 0x16, 0x00, 0x41, 0xe8, 0xf0, 0xf4, 0xbb, 0xff, 0x8d, - 0x06, 0xf0, 0x54, 0xe5, 0xb8, 0xf0, 0x36, 0x9e, 0xa4, 0x4f, 0xa0, 0x25, 0x78, 0xec, 0xd9, 0xfe, - 0x32, 0x48, 0x36, 0x2b, 0x1e, 0x77, 0xc9, 0x11, 0x19, 0xe8, 0x93, 0x83, 0x1f, 0x5f, 0x1e, 0xee, - 0xfc, 0xf2, 0xf2, 0x50, 0x9f, 0xa3, 0xf3, 0x12, 0x7d, 0x4c, 0x17, 0x5b, 0x16, 0x7d, 0x0c, 0xba, - 0xb0, 0x25, 0xf7, 0x7d, 0x4f, 0xf2, 0xa5, 0xe7, 0x74, 0x77, 0x11, 0xd9, 0xce, 0x90, 0xda, 0x65, - 0xe8, 0x70, 0x6b, 0xca, 0x9a, 0x45, 0x8c, 0xe5, 0xd0, 0x4f, 0xe0, 0xc0, 0xe1, 0x51, 0xcc, 0xd7, - 0xb6, 0xe4, 0xce, 0x32, 0x89, 0x7c, 0x2f, 0x78, 0xa6, 0xa0, 0x15, 0x84, 0xc2, 0x16, 0x8c, 0x96, - 0x71, 0x57, 0x18, 0x66, 0x39, 0x74, 0x02, 0x9d, 0x0c, 0x12, 0x25, 0x2b, 0xdf, 0x5b, 0x2f, 0x9f, - 0xf1, 0xe7, 0xdd, 0x16, 0x42, 0x1f, 0x66, 0x55, 0xdb, 0x33, 0x8f, 0xaf, 0xf9, 0x0c, 0xdd, 0xe7, - 0xfc, 0x39, 0xdb, 0x4f, 0x01, 0xc5, 0x01, 0xfd, 0x18, 0xf6, 0x85, 0x0c, 0x63, 0xdb, 0xe5, 0x4b, - 0xf5, 0x28, 0xaa, 0x78, 0xf5, 0xd6, 0x7b, 0xb7, 0xb2, 0x30, 0x34, 0x1d, 0xfa, 0x08, 0xf6, 0x22, - 0x95, 0x5a, 0x01, 0x6a, 0x08, 0xd8, 0xcf, 
0x00, 0x75, 0x2c, 0x69, 0x4d, 0x59, 0x1d, 0x03, 0x2c, - 0x87, 0x1e, 0x40, 0xcd, 0x57, 0x8f, 0xdb, 0xd5, 0x8e, 0xc8, 0xa0, 0xc2, 0x52, 0x83, 0x7e, 0x00, - 0x9a, 0xbd, 0x96, 0x5e, 0x18, 0x74, 0xeb, 0x47, 0x64, 0xd0, 0x1e, 0x3d, 0x18, 0x66, 0x83, 0x45, - 0xfc, 0x18, 0x5d, 0x2c, 0x0b, 0xa1, 0x4f, 0xc1, 0x48, 0xcb, 0xf1, 0xaf, 0x23, 0x2f, 0xb6, 0x11, - 0xb6, 0x77, 0x44, 0x06, 0xcd, 0x51, 0x6f, 0x98, 0x4e, 0x7b, 0x98, 0x4f, 0x7b, 0xb8, 0xc8, 0xa7, - 0x3d, 0xd9, 0x53, 0x57, 0xfa, 0xfe, 0xd7, 0x43, 0xc2, 0xf6, 0x11, 0x6d, 0x16, 0x60, 0x95, 0x10, - 0xcb, 0x6d, 0x27, 0x6c, 0xdc, 0x25, 0x21, 0xa2, 0xb7, 0x12, 0x9e, 0x43, 0x3b, 0x4d, 0xb8, 0x8e, - 0x79, 0x9a, 0x4e, 0xbf, 0x43, 0xba, 0x16, 0x62, 0x4f, 0x33, 0x28, 0x3d, 0x86, 0x07, 0x25, 0x95, - 0x84, 0xe7, 0x06, 0xb6, 0x4c, 0x62, 0xde, 0x05, 0xf5, 0xd0, 0x8c, 0x16, 0xae, 0x79, 0xee, 0xa1, - 0x27, 0xd0, 0x29, 0x01, 0xb6, 0xe3, 0xc4, 0x5c, 0x88, 0x6e, 0x13, 0x2f, 0xd0, 0x19, 0x22, 0xdb, - 0xd5, 0xdc, 0xc6, 0xa9, 0x83, 0x19, 0x45, 0x6c, 0x76, 0xd2, 0xff, 0xad, 0x06, 0x9d, 0x52, 0x05, - 0x2a, 0xaf, 0x17, 0xb8, 0xff, 0x29, 0x31, 0x9c, 0xbc, 0x5a, 0x0c, 0xf4, 0x7f, 0x24, 0x84, 0xf3, - 0x7b, 0x09, 0xa1, 0x7a, 0xbb, 0x08, 0xce, 0xef, 0x25, 0x82, 0xea, 0xed, 0x02, 0x38, 0xbb, 0x87, - 0x00, 0xaa, 0xff, 0x0a, 0xf2, 0x7f, 0x4b, 0xa0, 0x86, 0xe4, 0x7f, 0x1d, 0xc2, 0x3f, 0x04, 0xcd, - 0xde, 0x84, 0x49, 0x20, 0x91, 0xea, 0x15, 0x96, 0x59, 0xf4, 0x7d, 0x30, 0x32, 0x5e, 0x96, 0xad, - 0x20, 0xa3, 0x73, 0x0a, 0x16, 0x7d, 0xf4, 0xbf, 0x23, 0xa0, 0xe3, 0x3d, 0xde, 0x80, 0xfe, 0xde, - 0xc0, 0x75, 0x7e, 0x22, 0xd0, 0x40, 0x0a, 0x7e, 0x6a, 0x8b, 0xeb, 0x1b, 0x3c, 0x27, 0x7f, 0xc1, - 0x73, 0x0a, 0xd5, 0x6b, 0x5b, 0x5c, 0xa7, 0xa2, 0x67, 0xf8, 0x4d, 0xdf, 0x01, 0x48, 0xf1, 0xc2, - 0x7b, 0xc1, 0x51, 0x5a, 0x15, 0xd6, 0xc0, 0x93, 0xb9, 0xf7, 0x82, 0xd3, 0x09, 0x34, 0x8a, 0xbf, - 0x34, 0xea, 0xe8, 0xef, 0x6e, 0xce, 0x12, 0x46, 0xdf, 0x86, 0xc6, 0x1f, 0x9b, 0x2a, 0x0f, 0xfa, - 0x3f, 0x13, 0x30, 0x8a, 0x76, 0xf2, 0x17, 0xfe, 0x87, 0xbb, 0x3a, 0xb9, 0x5b, 
0x57, 0xd5, 0xbb, - 0x75, 0xb4, 0x82, 0xce, 0x9c, 0x4b, 0xe9, 0xf3, 0x0d, 0x0f, 0x24, 0xe3, 0x5f, 0x25, 0x5c, 0x48, - 0x3a, 0xc8, 0x77, 0x0c, 0xc1, 0x72, 0x34, 0x5f, 0x26, 0xe5, 0x76, 0xcf, 0xf7, 0xce, 0x7b, 0x50, - 0x43, 0x1f, 0x36, 0xd4, 0x1c, 0xb5, 0x6e, 0x44, 0xb2, 0xd4, 0xd7, 0xff, 0x81, 0x00, 0xdd, 0x2e, - 0x22, 0xa2, 0x30, 0x10, 0xfc, 0x75, 0x98, 0xf9, 0x04, 0x34, 0x21, 0x6d, 0x99, 0x08, 0xac, 0xdb, - 0x1e, 0xbd, 0x9b, 0xd7, 0xfd, 0x73, 0x99, 0xe1, 0x1c, 0x03, 0x59, 0x06, 0xe8, 0x3f, 0x06, 0x2d, - 0x3d, 0xa1, 0x4d, 0xa8, 0x5b, 0x97, 0x9f, 0x8f, 0x2f, 0xac, 0xa9, 0xb1, 0x43, 0x75, 0xd8, 0x1b, - 0x9f, 0x9e, 0x9a, 0xb3, 0x85, 0x39, 0x35, 0x88, 0xb2, 0x98, 0xf9, 0x99, 0x79, 0xaa, 0xac, 0xdd, - 0x47, 0x2e, 0x34, 0xb7, 0xd6, 0xe8, 0x4d, 0x5c, 0x1d, 0x2a, 0xb3, 0xab, 0x85, 0x41, 0xd4, 0xc7, - 0x99, 0xb9, 0x30, 0x76, 0x69, 0x0b, 0x1a, 0x67, 0xe6, 0x62, 0x39, 0xbe, 0x9a, 0x5a, 0x0b, 0xa3, - 0x42, 0xdb, 0x00, 0xca, 0x64, 0xe6, 0x6c, 0x6c, 0x31, 0xa3, 0xaa, 0xec, 0xd9, 0x55, 0x61, 0xd7, - 0x28, 0x80, 0x36, 0x35, 0x2f, 0xcc, 0x85, 0x69, 0x68, 0xa3, 0x39, 0x68, 0xf8, 0x70, 0x82, 0x5a, - 0x00, 0x65, 0x2b, 0xf4, 0xad, 0xdb, 0xda, 0xc3, 0x51, 0xf5, 0x7a, 0xaf, 0xee, 0xbc, 0xbf, 0x33, - 0x20, 0x1f, 0x92, 0x49, 0xf5, 0x8b, 0xdd, 0x68, 0xb5, 0xd2, 0x90, 0x2a, 0x1f, 0xfd, 0x1e, 0x00, - 0x00, 0xff, 0xff, 0x87, 0x49, 0x03, 0x47, 0xfa, 0x0a, 0x00, 0x00, + // 879 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xdc, 0x55, 0x4b, 0x6f, 0x23, 0x45, + 0x10, 0x4e, 0xc7, 0xf6, 0x38, 0x2e, 0x3f, 0x32, 0xee, 0x0d, 0x2b, 0x63, 0x81, 0x12, 0xcc, 0xc5, + 0x2c, 0x92, 0xc3, 0x1a, 0x09, 0x69, 0x25, 0x14, 0xc9, 0x8f, 0x21, 0x0c, 0x89, 0xb2, 0x56, 0xdb, + 0x46, 0x88, 0x8b, 0x35, 0xf6, 0x34, 0xce, 0x68, 0xc7, 0x33, 0xc3, 0x74, 0x8f, 0xc4, 0xee, 0x81, + 0x0b, 0xe2, 0xc6, 0x81, 0x3f, 0xc4, 0x9d, 0x03, 0x12, 0x77, 0x0e, 0xcb, 0xff, 0xe0, 0x84, 0xba, + 0xe6, 0xe5, 0x40, 0x56, 0xe0, 0xec, 0x22, 0x01, 0xb7, 0xa9, 0xae, 0xfa, 0xaa, 0xba, 0xba, 0xbe, + 
0xaf, 0x06, 0x6a, 0x7e, 0x68, 0xf3, 0x50, 0xf4, 0x82, 0xd0, 0x97, 0x3e, 0xd5, 0x62, 0xab, 0x0d, + 0x6b, 0x7f, 0xed, 0xc7, 0x67, 0xed, 0xe3, 0xb5, 0xef, 0xaf, 0x5d, 0x7e, 0x8a, 0xd6, 0x32, 0xfa, + 0xe2, 0x54, 0x3a, 0x1b, 0x2e, 0xa4, 0xb5, 0x09, 0x92, 0x00, 0xf0, 0x7c, 0x9b, 0xc7, 0xdf, 0x9d, + 0x6f, 0x34, 0x80, 0xc7, 0x2a, 0xc7, 0xa5, 0xb3, 0x71, 0x24, 0x7d, 0x04, 0x75, 0xc1, 0x43, 0xc7, + 0x72, 0x17, 0x5e, 0xb4, 0x59, 0xf2, 0xb0, 0x45, 0x4e, 0x48, 0xb7, 0x36, 0x3c, 0xfa, 0xf1, 0xf9, + 0xf1, 0xde, 0x2f, 0xcf, 0x8f, 0x6b, 0x53, 0x74, 0x5e, 0xa1, 0x8f, 0xd5, 0xc4, 0x96, 0x45, 0x1f, + 0x42, 0x4d, 0x58, 0x92, 0xbb, 0xae, 0x23, 0xf9, 0xc2, 0xb1, 0x5b, 0xfb, 0x88, 0x6c, 0x24, 0x48, + 0xed, 0xca, 0xb7, 0xb9, 0x39, 0x66, 0xd5, 0x2c, 0xc6, 0xb4, 0xe9, 0x87, 0x70, 0x64, 0xf3, 0x20, + 0xe4, 0x2b, 0x4b, 0x72, 0x7b, 0x11, 0x05, 0xae, 0xe3, 0x3d, 0x51, 0xd0, 0x02, 0x42, 0x61, 0x0b, + 0x46, 0xf3, 0xb8, 0x39, 0x86, 0x99, 0x36, 0x1d, 0x42, 0x33, 0x81, 0x04, 0xd1, 0xd2, 0x75, 0x56, + 0x8b, 0x27, 0xfc, 0x69, 0xab, 0x8e, 0xd0, 0xfb, 0x49, 0xd5, 0xc6, 0xc4, 0xe1, 0x2b, 0x3e, 0x41, + 0xf7, 0x05, 0x7f, 0xca, 0x0e, 0x63, 0x40, 0x76, 0x40, 0x3f, 0x80, 0x43, 0x21, 0xfd, 0xd0, 0x5a, + 0xf3, 0x85, 0x7a, 0x14, 0x55, 0xbc, 0x78, 0xeb, 0xbd, 0xeb, 0x49, 0x18, 0x9a, 0x36, 0x7d, 0x00, + 0x07, 0x81, 0x4a, 0xad, 0x00, 0x25, 0x04, 0x1c, 0x26, 0x80, 0x32, 0x96, 0x34, 0xc7, 0xac, 0x8c, + 0x01, 0xa6, 0x4d, 0x8f, 0xa0, 0xe4, 0xaa, 0xc7, 0x6d, 0x69, 0x27, 0xa4, 0x5b, 0x60, 0xb1, 0x41, + 0xdf, 0x05, 0xcd, 0x5a, 0x49, 0xc7, 0xf7, 0x5a, 0xe5, 0x13, 0xd2, 0x6d, 0xf4, 0xef, 0xf5, 0x92, + 0xc1, 0x22, 0x7e, 0x80, 0x2e, 0x96, 0x84, 0xd0, 0xc7, 0xa0, 0xc7, 0xe5, 0xf8, 0x57, 0x81, 0x13, + 0x5a, 0x08, 0x3b, 0x38, 0x21, 0xdd, 0x6a, 0xbf, 0xdd, 0x8b, 0xa7, 0xdd, 0x4b, 0xa7, 0xdd, 0x9b, + 0xa5, 0xd3, 0x1e, 0x1e, 0xa8, 0x2b, 0x7d, 0xff, 0xeb, 0x31, 0x61, 0x87, 0x88, 0x36, 0x32, 0xb0, + 0x4a, 0x88, 0xe5, 0xb6, 0x13, 0x56, 0x76, 0x49, 0x88, 0xe8, 0xad, 0x84, 0x17, 0xd0, 0x88, 0x13, + 0xae, 0x42, 0x1e, 0xa7, 0xab, 0xed, 
0x90, 0xae, 0x8e, 0xd8, 0x51, 0x02, 0xa5, 0xa7, 0x70, 0x2f, + 0xa7, 0x92, 0x70, 0xd6, 0x9e, 0x25, 0xa3, 0x90, 0xb7, 0x40, 0x3d, 0x34, 0xa3, 0x99, 0x6b, 0x9a, + 0x7a, 0xe8, 0x19, 0x34, 0x73, 0x80, 0x65, 0xdb, 0x21, 0x17, 0xa2, 0x55, 0xc5, 0x0b, 0x34, 0x7b, + 0xc8, 0x76, 0x35, 0xb7, 0x41, 0xec, 0x60, 0x7a, 0x16, 0x9b, 0x9c, 0x74, 0x7e, 0x2b, 0x41, 0x33, + 0x57, 0x81, 0xca, 0xeb, 0x78, 0xeb, 0xff, 0x94, 0x18, 0xce, 0x5e, 0x2c, 0x06, 0xfa, 0x3f, 0x12, + 0xc2, 0xc5, 0x9d, 0x84, 0x50, 0xbc, 0x5d, 0x04, 0x17, 0x77, 0x12, 0x41, 0xf1, 0x76, 0x01, 0x9c, + 0xdf, 0x41, 0x00, 0xc5, 0x7f, 0x05, 0xf9, 0xbf, 0x25, 0x50, 0x42, 0xf2, 0xbf, 0x0c, 0xe1, 0xef, + 0x83, 0x66, 0x6d, 0xfc, 0xc8, 0x93, 0x48, 0xf5, 0x02, 0x4b, 0x2c, 0xfa, 0x0e, 0xe8, 0x09, 0x2f, + 0xf3, 0x56, 0x90, 0xd1, 0x29, 0x05, 0xb3, 0x3e, 0x3a, 0xdf, 0x11, 0xa8, 0xe1, 0x3d, 0x5e, 0x81, + 0xfe, 0x5e, 0xc1, 0x75, 0x7e, 0x22, 0x50, 0x41, 0x0a, 0x7e, 0x6c, 0x89, 0xeb, 0x1b, 0x3c, 0x27, + 0x7f, 0xc1, 0x73, 0x0a, 0xc5, 0x6b, 0x4b, 0x5c, 0xc7, 0xa2, 0x67, 0xf8, 0x4d, 0xdf, 0x04, 0x88, + 0xf1, 0xc2, 0x79, 0xc6, 0x51, 0x5a, 0x05, 0x56, 0xc1, 0x93, 0xa9, 0xf3, 0x8c, 0xd3, 0x21, 0x54, + 0xb2, 0xbf, 0x34, 0xea, 0xe8, 0xef, 0x6e, 0xce, 0x1c, 0x46, 0xdf, 0x80, 0xca, 0x1f, 0x9b, 0xca, + 0x0f, 0x3a, 0x3f, 0x13, 0xd0, 0xb3, 0x76, 0xd2, 0x17, 0xfe, 0x87, 0xbb, 0x3a, 0xdb, 0xad, 0xab, + 0xe2, 0x6e, 0x1d, 0x2d, 0xa1, 0x39, 0xe5, 0x52, 0xba, 0x7c, 0xc3, 0x3d, 0xc9, 0xf8, 0x97, 0x11, + 0x17, 0x92, 0x76, 0xd3, 0x1d, 0x43, 0xb0, 0x1c, 0x4d, 0x97, 0x49, 0xbe, 0xdd, 0xd3, 0xbd, 0xf3, + 0x36, 0x94, 0xd0, 0x87, 0x0d, 0x55, 0xfb, 0xf5, 0x1b, 0x91, 0x2c, 0xf6, 0x75, 0x7e, 0x20, 0x40, + 0xb7, 0x8b, 0x88, 0xc0, 0xf7, 0x04, 0x7f, 0x19, 0x66, 0x3e, 0x02, 0x4d, 0x48, 0x4b, 0x46, 0x02, + 0xeb, 0x36, 0xfa, 0x6f, 0xa5, 0x75, 0xff, 0x5c, 0xa6, 0x37, 0xc5, 0x40, 0x96, 0x00, 0x3a, 0x0f, + 0x41, 0x8b, 0x4f, 0x68, 0x15, 0xca, 0xe6, 0xd5, 0xa7, 0x83, 0x4b, 0x73, 0xac, 0xef, 0xd1, 0x1a, + 0x1c, 0x0c, 0x46, 0x23, 0x63, 0x32, 0x33, 0xc6, 0x3a, 0x51, 0x16, 0x33, 
0x3e, 0x31, 0x46, 0xca, + 0xda, 0x7f, 0xf0, 0x35, 0x54, 0xb7, 0xd6, 0xe8, 0x4d, 0x5c, 0x19, 0x0a, 0x93, 0xf9, 0x4c, 0x27, + 0xea, 0xe3, 0xdc, 0x98, 0xe9, 0xfb, 0xb4, 0x0e, 0x95, 0x73, 0x63, 0xb6, 0x18, 0xcc, 0xc7, 0xe6, + 0x4c, 0x2f, 0xd0, 0x06, 0x80, 0x32, 0x99, 0x31, 0x19, 0x98, 0x4c, 0x2f, 0x2a, 0x7b, 0x32, 0xcf, + 0xec, 0x12, 0x05, 0xd0, 0xc6, 0xc6, 0xa5, 0x31, 0x33, 0x74, 0x8d, 0xbe, 0x06, 0x4d, 0xe5, 0x3b, + 0x67, 0x83, 0x91, 0xf1, 0xd1, 0xfc, 0x72, 0x61, 0x7c, 0x66, 0xce, 0xf4, 0x72, 0x7f, 0x0a, 0x1a, + 0xbe, 0xa7, 0xa0, 0x26, 0x40, 0xde, 0x21, 0x7d, 0xfd, 0xb6, 0xae, 0x71, 0x82, 0xed, 0xf6, 0x8b, + 0x1f, 0xa4, 0xb3, 0xd7, 0x25, 0xef, 0x91, 0x61, 0xf1, 0xf3, 0xfd, 0x60, 0xb9, 0xd4, 0x90, 0x41, + 0xef, 0xff, 0x1e, 0x00, 0x00, 0xff, 0xff, 0xc5, 0x1d, 0x6c, 0x8e, 0x11, 0x0b, 0x00, 0x00, } type DRPCOrdersClient interface { diff --git a/pkg/pb/orders.proto b/pkg/pb/orders.proto index a3532d961..9bb76f447 100644 --- a/pkg/pb/orders.proto +++ b/pkg/pb/orders.proto @@ -19,6 +19,7 @@ enum PieceAction { GET_REPAIR = 4; PUT_REPAIR = 5; DELETE = 6; + PUT_GRACEFUL_EXIT = 7; } // OrderLimit is provided by satellite to execute specific action on storage node within some limits diff --git a/proto.lock b/proto.lock index 7271e4517..e72b07e3e 100644 --- a/proto.lock +++ b/proto.lock @@ -985,6 +985,21 @@ "id": 3, "name": "replacement_piece_hash", "type": "orders.PieceHash" + }, + { + "id": 4, + "name": "piece_id", + "type": "bytes", + "options": [ + { + "name": "(gogoproto.customtype)", + "value": "PieceID" + }, + { + "name": "(gogoproto.nullable)", + "value": "false" + } + ] } ] }, @@ -4501,6 +4516,10 @@ { "name": "DELETE", "integer": 6 + }, + { + "name": "PUT_GRACEFUL_EXIT", + "integer": 7 } ] }, diff --git a/satellite/gracefulexit/chore.go b/satellite/gracefulexit/chore.go index de92246d3..750d5db5a 100644 --- a/satellite/gracefulexit/chore.go +++ b/satellite/gracefulexit/chore.go @@ -8,15 +8,12 @@ import ( "time" "go.uber.org/zap" - "gopkg.in/spacemonkeygo/monkit.v2" 
"storj.io/storj/internal/sync2" "storj.io/storj/satellite/metainfo" "storj.io/storj/satellite/overlay" ) -var mon = monkit.Package() - // Chore populates the graceful exit transfer queue. // // architecture: Chore @@ -29,14 +26,8 @@ type Chore struct { metainfoLoop *metainfo.Loop } -// Config for the chore -type Config struct { - ChoreBatchSize int `help:"size of the buffer used to batch inserts into the transfer queue." default:"500"` - ChoreInterval time.Duration `help:"how often to run the transfer queue chore." releaseDefault:"30s" devDefault:"10s"` -} - // NewChore instantiates Chore. -func NewChore(log *zap.Logger, db DB, overlay overlay.DB, config Config, metaLoop *metainfo.Loop) *Chore { +func NewChore(log *zap.Logger, db DB, overlay overlay.DB, metaLoop *metainfo.Loop, config Config) *Chore { return &Chore{ log: log, Loop: *sync2.NewCycle(config.ChoreInterval), diff --git a/satellite/gracefulexit/common.go b/satellite/gracefulexit/common.go new file mode 100644 index 000000000..30c48352c --- /dev/null +++ b/satellite/gracefulexit/common.go @@ -0,0 +1,27 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package gracefulexit + +import ( + "time" + + "github.com/zeebo/errs" + "gopkg.in/spacemonkeygo/monkit.v2" +) + +var ( + // Error is the default error class for graceful exit package. + Error = errs.Class("gracefulexit") + + mon = monkit.Package() +) + +// Config for the chore +type Config struct { + ChoreBatchSize int `help:"size of the buffer used to batch inserts into the transfer queue." default:"500"` + ChoreInterval time.Duration `help:"how often to run the transfer queue chore." releaseDefault:"30s" devDefault:"10s"` + + EndpointBatchSize int `help:"size of the buffer used to batch transfer queue reads and sends to the storage node." default:"100"` + EndpointMaxFailures int `help:"maximum number of transfer failures per piece." 
default:"3"` +} diff --git a/satellite/gracefulexit/db.go b/satellite/gracefulexit/db.go index c5d44c664..395704460 100644 --- a/satellite/gracefulexit/db.go +++ b/satellite/gracefulexit/db.go @@ -26,11 +26,11 @@ type TransferQueueItem struct { PieceNum int32 DurabilityRatio float64 QueuedAt time.Time - RequestedAt time.Time - LastFailedAt time.Time - LastFailedCode int - FailedCount int - FinishedAt time.Time + RequestedAt *time.Time + LastFailedAt *time.Time + LastFailedCode *int + FailedCount *int + FinishedAt *time.Time } // DB implements CRUD operations for graceful exit service @@ -54,6 +54,10 @@ type DB interface { DeleteFinishedTransferQueueItems(ctx context.Context, nodeID storj.NodeID) error // GetTransferQueueItem gets a graceful exit transfer queue entry. GetTransferQueueItem(ctx context.Context, nodeID storj.NodeID, path []byte) (*TransferQueueItem, error) - // GetIncomplete gets incomplete graceful exit transfer queue entries ordered by the queued date ascending. + // GetIncomplete gets incomplete graceful exit transfer queue entries ordered by durability ratio and queued date ascending. GetIncomplete(ctx context.Context, nodeID storj.NodeID, limit int, offset int64) ([]*TransferQueueItem, error) + // GetIncompleteNotFailed gets incomplete graceful exit transfer queue entries in the database ordered by durability ratio and queued date ascending. + GetIncompleteNotFailed(ctx context.Context, nodeID storj.NodeID, limit int, offset int64) ([]*TransferQueueItem, error) + // GetIncompleteNotFailed gets incomplete graceful exit transfer queue entries that have failed <= maxFailures times, ordered by durability ratio and queued date ascending. 
+ GetIncompleteFailed(ctx context.Context, nodeID storj.NodeID, maxFailures int, limit int, offset int64) ([]*TransferQueueItem, error) } diff --git a/satellite/gracefulexit/db_test.go b/satellite/gracefulexit/db_test.go index 8611531ec..de86122c6 100644 --- a/satellite/gracefulexit/db_test.go +++ b/satellite/gracefulexit/db_test.go @@ -103,8 +103,9 @@ func TestTransferQueueItem(t *testing.T) { item, err := geDB.GetTransferQueueItem(ctx, tqi.NodeID, tqi.Path) require.NoError(t, err) + now := time.Now().UTC() item.DurabilityRatio = 1.2 - item.RequestedAt = time.Now() + item.RequestedAt = &now err = geDB.UpdateTransferQueueItem(ctx, *item) require.NoError(t, err) @@ -124,7 +125,9 @@ func TestTransferQueueItem(t *testing.T) { item, err := geDB.GetTransferQueueItem(ctx, nodeID1, path1) require.NoError(t, err) - item.FinishedAt = time.Now() + now := time.Now().UTC() + item.FinishedAt = &now + err = geDB.UpdateTransferQueueItem(ctx, *item) require.NoError(t, err) diff --git a/satellite/gracefulexit/endpoint.go b/satellite/gracefulexit/endpoint.go new file mode 100644 index 000000000..40fe63cc4 --- /dev/null +++ b/satellite/gracefulexit/endpoint.go @@ -0,0 +1,468 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package gracefulexit + +import ( + "context" + "io" + "sync" + "sync/atomic" + "time" + + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + + "storj.io/storj/pkg/identity" + "storj.io/storj/pkg/pb" + "storj.io/storj/pkg/rpc/rpcstatus" + "storj.io/storj/pkg/storj" + "storj.io/storj/satellite/metainfo" + "storj.io/storj/satellite/orders" + "storj.io/storj/satellite/overlay" + "storj.io/storj/uplink/eestream" +) + +// millis for the transfer queue building ticker +const buildQueueMillis = 100 + +// drpcEndpoint wraps streaming methods so that they can be used with drpc +type drpcEndpoint struct{ *Endpoint } + +// processStream is the minimum interface required to process requests. 
+type processStream interface { + Context() context.Context + Send(*pb.SatelliteMessage) error + Recv() (*pb.StorageNodeMessage, error) +} + +// Endpoint for handling the transfer of pieces for Graceful Exit. +type Endpoint struct { + log *zap.Logger + interval time.Duration + db DB + overlaydb overlay.DB + overlay *overlay.Service + metainfo *metainfo.Service + orders *orders.Service + config Config +} + +type pendingTransfer struct { + path []byte + pieceSize int64 + satelliteMessage *pb.SatelliteMessage +} + +// pendingMap for managing concurrent access to the pending transfer map. +type pendingMap struct { + mu sync.RWMutex + data map[storj.PieceID]*pendingTransfer +} + +// newPendingMap creates a new pendingMap and instantiates the map. +func newPendingMap() *pendingMap { + newData := make(map[storj.PieceID]*pendingTransfer) + return &pendingMap{ + data: newData, + } +} + +// put adds to the map. +func (pm *pendingMap) put(pieceID storj.PieceID, pendingTransfer *pendingTransfer) { + pm.mu.Lock() + defer pm.mu.Unlock() + + pm.data[pieceID] = pendingTransfer +} + +// get returns the pending transfer item from the map, if it exists. +func (pm *pendingMap) get(pieceID storj.PieceID) (pendingTransfer *pendingTransfer, ok bool) { + pm.mu.RLock() + defer pm.mu.RUnlock() + + pendingTransfer, ok = pm.data[pieceID] + return pendingTransfer, ok +} + +// length returns the number of elements in the map. +func (pm *pendingMap) length() int { + pm.mu.RLock() + defer pm.mu.RUnlock() + + return len(pm.data) +} + +// delete removes the pending transfer item from the map. +func (pm *pendingMap) delete(pieceID storj.PieceID) { + pm.mu.Lock() + defer pm.mu.Unlock() + + delete(pm.data, pieceID) +} + +// DRPC returns a DRPC form of the endpoint. +func (endpoint *Endpoint) DRPC() pb.DRPCSatelliteGracefulExitServer { + return &drpcEndpoint{Endpoint: endpoint} +} + +// NewEndpoint creates a new graceful exit endpoint. 
+func NewEndpoint(log *zap.Logger, db DB, overlaydb overlay.DB, overlay *overlay.Service, metainfo *metainfo.Service, orders *orders.Service, config Config) *Endpoint { + return &Endpoint{ + log: log, + interval: time.Millisecond * buildQueueMillis, + db: db, + overlaydb: overlaydb, + overlay: overlay, + metainfo: metainfo, + orders: orders, + config: config, + } +} + +// Process is called by storage nodes to receive pieces to transfer to new nodes and get exit status. +func (endpoint *Endpoint) Process(stream pb.SatelliteGracefulExit_ProcessServer) error { + return endpoint.doProcess(stream) +} + +// Process is called by storage nodes to receive pieces to transfer to new nodes and get exit status. +func (endpoint *drpcEndpoint) Process(stream pb.DRPCSatelliteGracefulExit_ProcessStream) error { + return endpoint.doProcess(stream) +} + +func (endpoint *Endpoint) doProcess(stream processStream) (err error) { + ctx := stream.Context() + defer mon.Task()(&ctx)(&err) + + peer, err := identity.PeerIdentityFromContext(ctx) + if err != nil { + return rpcstatus.Error(rpcstatus.Unauthenticated, err.Error()) + } + // TODO should we error if the node is DQ'd? + + nodeID := peer.ID + endpoint.log.Debug("graceful exit process.", zap.String("nodeID", nodeID.String())) + + eofHandler := func(err error) error { + if err == io.EOF { + endpoint.log.Debug("received EOF when trying to receive messages from storage node.", zap.String("nodeID", nodeID.String())) + return nil + } + return rpcstatus.Error(rpcstatus.Unknown, err.Error()) + } + + exitStatus, err := endpoint.overlaydb.GetExitStatus(ctx, nodeID) + if err != nil { + return Error.Wrap(err) + } + + if exitStatus.ExitFinishedAt != nil { + // TODO revisit this. 
Should check if signature was sent + completed := &pb.SatelliteMessage{Message: &pb.SatelliteMessage_ExitCompleted{ExitCompleted: &pb.ExitCompleted{}}} + err = stream.Send(completed) + return Error.Wrap(err) + } + + if exitStatus.ExitInitiatedAt == nil { + request := &overlay.ExitStatusRequest{NodeID: nodeID, ExitInitiatedAt: time.Now().UTC()} + _, err = endpoint.overlaydb.UpdateExitStatus(ctx, request) + if err != nil { + return Error.Wrap(err) + } + + err = stream.Send(&pb.SatelliteMessage{Message: &pb.SatelliteMessage_NotReady{NotReady: &pb.NotReady{}}}) + return Error.Wrap(err) + } + + if exitStatus.ExitLoopCompletedAt == nil { + err = stream.Send(&pb.SatelliteMessage{Message: &pb.SatelliteMessage_NotReady{NotReady: &pb.NotReady{}}}) + return Error.Wrap(err) + } + + pending := newPendingMap() + + var morePiecesFlag int32 = 1 + + var group errgroup.Group + group.Go(func() error { + ticker := time.NewTicker(endpoint.interval) + defer ticker.Stop() + + for range ticker.C { + if pending.length() == 0 { + incomplete, err := endpoint.db.GetIncompleteNotFailed(ctx, nodeID, endpoint.config.EndpointBatchSize, 0) + if err != nil { + return Error.Wrap(err) + } + + if len(incomplete) == 0 { + incomplete, err = endpoint.db.GetIncompleteFailed(ctx, nodeID, endpoint.config.EndpointMaxFailures, endpoint.config.EndpointBatchSize, 0) + if err != nil { + return Error.Wrap(err) + } + } + + if len(incomplete) == 0 { + endpoint.log.Debug("no more pieces to transfer for node.", zap.String("node ID", nodeID.String())) + atomic.StoreInt32(&morePiecesFlag, 0) + break + } + + for _, inc := range incomplete { + err = endpoint.processIncomplete(ctx, stream, pending, inc) + if err != nil { + return Error.Wrap(err) + } + } + } + } + return nil + }) + + for { + pendingCount := pending.length() + // if there are no more transfers and the pending queue is empty, send complete + if atomic.LoadInt32(&morePiecesFlag) == 0 && pendingCount == 0 { + // TODO check whether failure threshold is met 
before sending completed + // TODO needs exit signature + transferMsg := &pb.SatelliteMessage{ + Message: &pb.SatelliteMessage_ExitCompleted{ + ExitCompleted: &pb.ExitCompleted{}, + }, + } + err = stream.Send(transferMsg) + if err != nil { + return Error.Wrap(err) + } + break + } + // skip if there are none pending + if pendingCount == 0 { + continue + } + + request, err := stream.Recv() + if err != nil { + return eofHandler(err) + } + + switch m := request.GetMessage().(type) { + case *pb.StorageNodeMessage_Succeeded: + err = endpoint.handleSucceeded(ctx, pending, nodeID, m) + if err != nil { + return Error.Wrap(err) + } + + case *pb.StorageNodeMessage_Failed: + err = endpoint.handleFailed(ctx, pending, nodeID, m) + if err != nil { + return Error.Wrap(err) + } + default: + return Error.New("unknown storage node message: %v", m) + } + } + + if err := group.Wait(); err != nil { + return err + } + + return nil +} + +func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream processStream, pending *pendingMap, incomplete *TransferQueueItem) error { + nodeID := incomplete.NodeID + pointer, err := endpoint.metainfo.Get(ctx, string(incomplete.Path)) + if err != nil { + return Error.Wrap(err) + } + remote := pointer.GetRemote() + + pieces := remote.GetRemotePieces() + var nodePiece *pb.RemotePiece + excludedNodeIDs := make([]storj.NodeID, len(pieces)) + for i, piece := range pieces { + if piece.NodeId == nodeID && piece.PieceNum == incomplete.PieceNum { + nodePiece = piece + } + excludedNodeIDs[i] = piece.NodeId + } + + if nodePiece == nil { + endpoint.log.Debug("piece no longer held by node.", zap.String("node ID", nodeID.String()), zap.ByteString("path", incomplete.Path), zap.Int32("piece num", incomplete.PieceNum)) + + err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.Path) + if err != nil { + return Error.Wrap(err) + } + + return nil + } + + redundancy, err := eestream.NewRedundancyStrategyFromProto(pointer.GetRemote().GetRedundancy()) + 
if err != nil { + return Error.Wrap(err) + } + + if len(remote.GetRemotePieces()) > redundancy.OptimalThreshold() { + endpoint.log.Debug("piece has more pieces than required. removing node from pointer.", zap.String("node ID", nodeID.String()), zap.ByteString("path", incomplete.Path), zap.Int32("piece num", incomplete.PieceNum)) + + _, err = endpoint.metainfo.UpdatePieces(ctx, string(incomplete.Path), pointer, nil, []*pb.RemotePiece{nodePiece}) + if err != nil { + return Error.Wrap(err) + } + + err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.Path) + if err != nil { + return Error.Wrap(err) + } + + return nil + } + + pieceSize := eestream.CalcPieceSize(pointer.GetSegmentSize(), redundancy) + + request := overlay.FindStorageNodesRequest{ + RequestedCount: 1, + FreeBandwidth: pieceSize, + FreeDisk: pieceSize, + ExcludedNodes: excludedNodeIDs, + } + + newNodes, err := endpoint.overlay.FindStorageNodes(ctx, request) + if err != nil { + return Error.Wrap(err) + } + + if len(newNodes) == 0 { + return Error.New("could not find a node to transfer this piece to. 
nodeID %v, path %v, pieceNum %v.", nodeID.String(), zap.ByteString("path", incomplete.Path), incomplete.PieceNum) + } + newNode := newNodes[0] + endpoint.log.Debug("found new node for piece transfer.", zap.String("original node ID", nodeID.String()), zap.String("replacement node ID", newNode.Id.String()), + zap.ByteString("path", incomplete.Path), zap.Int32("piece num", incomplete.PieceNum)) + + pieceID := remote.RootPieceId.Derive(nodeID, incomplete.PieceNum) + + parts := storj.SplitPath(storj.Path(incomplete.Path)) + if len(parts) < 2 { + return Error.New("invalid path for %v %v.", zap.String("node ID", incomplete.NodeID.String()), zap.String("pieceID", pieceID.String())) + } + + bucketID := []byte(storj.JoinPaths(parts[0], parts[1])) + limit, privateKey, err := endpoint.orders.CreateGracefulExitPutOrderLimit(ctx, bucketID, newNode.Id, incomplete.PieceNum, remote.RootPieceId, remote.Redundancy.GetErasureShareSize()) + if err != nil { + return Error.Wrap(err) + } + + transferMsg := &pb.SatelliteMessage{ + Message: &pb.SatelliteMessage_TransferPiece{ + TransferPiece: &pb.TransferPiece{ + PieceId: pieceID, // original piece ID + AddressedOrderLimit: limit, + PrivateKey: privateKey, + }, + }, + } + err = stream.Send(transferMsg) + if err != nil { + return Error.Wrap(err) + } + pending.put(pieceID, &pendingTransfer{ + path: incomplete.Path, + pieceSize: pieceSize, + satelliteMessage: transferMsg, + }) + + return nil +} + +func (endpoint *Endpoint) handleSucceeded(ctx context.Context, pending *pendingMap, nodeID storj.NodeID, message *pb.StorageNodeMessage_Succeeded) (err error) { + defer mon.Task()(&ctx)(&err) + if message.Succeeded.GetAddressedOrderLimit() == nil { + return Error.New("Addressed order limit cannot be nil.") + } + if message.Succeeded.GetOriginalPieceHash() == nil { + return Error.New("Original piece hash cannot be nil.") + } + + pieceID := message.Succeeded.PieceId + endpoint.log.Debug("transfer succeeded.", zap.String("piece ID", pieceID.String())) + 
+ // TODO validation + + transfer, ok := pending.get(pieceID) + if !ok { + endpoint.log.Debug("could not find transfer message in pending queue. skipping .", zap.String("piece ID", pieceID.String())) + + // TODO we should probably error out here so we don't get stuck in a loop with a SN that is not behaving properly + } + + transferQueueItem, err := endpoint.db.GetTransferQueueItem(ctx, nodeID, transfer.path) + if err != nil { + return Error.Wrap(err) + } + + var failed int64 + if transferQueueItem.FailedCount != nil && *transferQueueItem.FailedCount > 0 { + failed = -1 + } + + err = endpoint.db.IncrementProgress(ctx, nodeID, transfer.pieceSize, 1, failed) + if err != nil { + return Error.Wrap(err) + } + + err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, transfer.path) + if err != nil { + return Error.Wrap(err) + } + + pending.delete(pieceID) + + return nil +} + +func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *pendingMap, nodeID storj.NodeID, message *pb.StorageNodeMessage_Failed) (err error) { + defer mon.Task()(&ctx)(&err) + endpoint.log.Warn("transfer failed.", zap.String("piece ID", message.Failed.PieceId.String()), zap.String("transfer error", message.Failed.GetError().String())) + pieceID := message.Failed.PieceId + transfer, ok := pending.get(pieceID) + if !ok { + endpoint.log.Debug("could not find transfer message in pending queue. 
skipping .", zap.String("piece ID", pieceID.String())) + + // TODO we should probably error out here so we don't get stuck in a loop with a SN that is not behaving properly + } + transferQueueItem, err := endpoint.db.GetTransferQueueItem(ctx, nodeID, transfer.path) + if err != nil { + return Error.Wrap(err) + } + now := time.Now().UTC() + failedCount := 1 + if transferQueueItem.FailedCount != nil { + failedCount = *transferQueueItem.FailedCount + 1 + } + + errorCode := int(pb.TransferFailed_Error_value[message.Failed.Error.String()]) + + // TODO if error code is NOT_FOUND, the node no longer has the piece. remove the queue item and the pointer + + transferQueueItem.LastFailedAt = &now + transferQueueItem.FailedCount = &failedCount + transferQueueItem.LastFailedCode = &errorCode + err = endpoint.db.UpdateTransferQueueItem(ctx, *transferQueueItem) + if err != nil { + return Error.Wrap(err) + } + + // only increment failed if it hasn't failed before + if failedCount == 1 { + err = endpoint.db.IncrementProgress(ctx, nodeID, 0, 0, 1) + if err != nil { + return Error.Wrap(err) + } + } + + pending.delete(pieceID) + + return nil +} diff --git a/satellite/gracefulexit/endpoint_test.go b/satellite/gracefulexit/endpoint_test.go new file mode 100644 index 000000000..4df1ef073 --- /dev/null +++ b/satellite/gracefulexit/endpoint_test.go @@ -0,0 +1,260 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information.
+ +package gracefulexit_test + +import ( + "context" + "io" + "strconv" + "testing" + + "github.com/stretchr/testify/require" + "github.com/zeebo/errs" + "google.golang.org/grpc" + + "storj.io/storj/internal/memory" + "storj.io/storj/internal/testcontext" + "storj.io/storj/internal/testplanet" + "storj.io/storj/internal/testrand" + "storj.io/storj/pkg/pb" + "storj.io/storj/pkg/storj" + "storj.io/storj/storagenode" + "storj.io/storj/uplink" +) + +const numObjects = 6 + +func TestSuccess(t *testing.T) { + testTransfers(t, numObjects, func(ctx *testcontext.Context, satellite *testplanet.SatelliteSystem, processClient pb.SatelliteGracefulExit_ProcessClient, exitingNode *storagenode.Peer, numPieces int) { + var pieceID storj.PieceID + failedCount := 0 + for { + response, err := processClient.Recv() + if err == io.EOF { + // Done + break + } + require.NoError(t, err) + + switch m := response.GetMessage().(type) { + case *pb.SatelliteMessage_TransferPiece: + require.NotNil(t, m) + + // pick the first one to fail + if pieceID.IsZero() { + pieceID = m.TransferPiece.PieceId + } + + if failedCount > 0 || pieceID != m.TransferPiece.PieceId { + success := &pb.StorageNodeMessage{ + Message: &pb.StorageNodeMessage_Succeeded{ + Succeeded: &pb.TransferSucceeded{ + PieceId: m.TransferPiece.PieceId, + OriginalPieceHash: &pb.PieceHash{PieceId: m.TransferPiece.PieceId}, + AddressedOrderLimit: &pb.AddressedOrderLimit{ + Limit: &pb.OrderLimit{ + PieceId: m.TransferPiece.AddressedOrderLimit.Limit.PieceId, + }, + }, + }, + }, + } + err = processClient.Send(success) + require.NoError(t, err) + } else { + failedCount++ + failed := &pb.StorageNodeMessage{ + Message: &pb.StorageNodeMessage_Failed{ + Failed: &pb.TransferFailed{ + PieceId: m.TransferPiece.PieceId, + Error: pb.TransferFailed_UNKNOWN, + }, + }, + } + err = processClient.Send(failed) + require.NoError(t, err) + } + case *pb.SatelliteMessage_ExitCompleted: + // TODO test completed signature stuff + break + default: + t.FailNow() + } 
+ } + + // check that the exit has completed and we have the correct transferred/failed values + progress, err := satellite.DB.GracefulExit().GetProgress(ctx, exitingNode.ID()) + require.NoError(t, err) + + require.EqualValues(t, numPieces, progress.PiecesTransferred) + // even though we failed 1, it eventually succeeded, so the count should be 0 + require.EqualValues(t, 0, progress.PiecesFailed) + }) +} + +func TestFailure(t *testing.T) { + testTransfers(t, 1, func(ctx *testcontext.Context, satellite *testplanet.SatelliteSystem, processClient pb.SatelliteGracefulExit_ProcessClient, exitingNode *storagenode.Peer, numPieces int) { + for { + response, err := processClient.Recv() + if err == io.EOF { + // Done + break + } + require.NoError(t, err) + + switch m := response.GetMessage().(type) { + case *pb.SatelliteMessage_TransferPiece: + require.NotNil(t, m) + + failed := &pb.StorageNodeMessage{ + Message: &pb.StorageNodeMessage_Failed{ + Failed: &pb.TransferFailed{ + PieceId: m.TransferPiece.PieceId, + Error: pb.TransferFailed_UNKNOWN, + }, + }, + } + err = processClient.Send(failed) + require.NoError(t, err) + case *pb.SatelliteMessage_ExitCompleted: + // TODO test completed signature stuff + break + default: + t.FailNow() + } + } + + // check that the exit has completed and we have the correct transferred/failed values + progress, err := satellite.DB.GracefulExit().GetProgress(ctx, exitingNode.ID()) + require.NoError(t, err) + + require.Equal(t, int64(0), progress.PiecesTransferred) + require.Equal(t, int64(1), progress.PiecesFailed) + }) +} + +func testTransfers(t *testing.T, objects int, verifier func(ctx *testcontext.Context, satellite *testplanet.SatelliteSystem, processClient pb.SatelliteGracefulExit_ProcessClient, exitingNode *storagenode.Peer, numPieces int)) { + successThreshold := 8 + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, + StorageNodeCount: successThreshold + 1, + UplinkCount: 1, + }, func(t *testing.T, ctx *testcontext.Context, planet 
*testplanet.Planet) { + uplinkPeer := planet.Uplinks[0] + satellite := planet.Satellites[0] + + satellite.GracefulExit.Chore.Loop.Pause() + + rs := &uplink.RSConfig{ + MinThreshold: 4, + RepairThreshold: 6, + SuccessThreshold: successThreshold, + MaxThreshold: successThreshold, + } + + for i := 0; i < objects; i++ { + err := uplinkPeer.UploadWithConfig(ctx, satellite, rs, "testbucket", "test/path"+strconv.Itoa(i), testrand.Bytes(5*memory.KiB)) + require.NoError(t, err) + } + // check that there are no exiting nodes. + exitingNodeIDs, err := satellite.DB.OverlayCache().GetExitingNodes(ctx) + require.NoError(t, err) + require.Len(t, exitingNodeIDs, 0) + + exitingNode, err := findNodeToExit(ctx, planet, objects) + require.NoError(t, err) + + // connect to satellite so we initiate the exit. + conn, err := exitingNode.Dialer.DialAddressID(ctx, satellite.Addr(), satellite.Identity.ID) + require.NoError(t, err) + defer func() { + err = errs.Combine(err, conn.Close()) + }() + + client := conn.SatelliteGracefulExitClient() + + c, err := client.Process(ctx, grpc.EmptyCallOption{}) + require.NoError(t, err) + + response, err := c.Recv() + require.NoError(t, err) + + // should get a NotReady since the metainfo loop would not be finished at this point. + switch response.GetMessage().(type) { + case *pb.SatelliteMessage_NotReady: + // now check that the exiting node is initiated. 
+ exitingNodeIDs, err := satellite.DB.OverlayCache().GetExitingNodes(ctx) + require.NoError(t, err) + require.Len(t, exitingNodeIDs, 1) + + require.Equal(t, exitingNode.ID(), exitingNodeIDs[0]) + default: + t.FailNow() + } + // close the old client + require.NoError(t, c.CloseSend()) + + // trigger the metainfo loop chore so we can get some pieces to transfer + satellite.GracefulExit.Chore.Loop.TriggerWait() + + // make sure all the pieces are in the transfer queue + incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), objects, 0) + require.NoError(t, err) + + // connect to satellite again to start receiving transfers + c, err = client.Process(ctx, grpc.EmptyCallOption{}) + require.NoError(t, err) + defer func() { + err = errs.Combine(err, c.CloseSend()) + }() + + verifier(ctx, satellite, c, exitingNode, len(incompleteTransfers)) + }) +} + +func findNodeToExit(ctx context.Context, planet *testplanet.Planet, objects int) (*storagenode.Peer, error) { + satellite := planet.Satellites[0] + keys, err := satellite.Metainfo.Database.List(ctx, nil, objects) + if err != nil { + return nil, err + } + + pieceCountMap := make(map[storj.NodeID]int, len(planet.StorageNodes)) + for _, sn := range planet.StorageNodes { + pieceCountMap[sn.ID()] = 0 + } + + for _, key := range keys { + pointer, err := satellite.Metainfo.Service.Get(ctx, string(key)) + if err != nil { + return nil, err + } + pieces := pointer.GetRemote().GetRemotePieces() + for _, piece := range pieces { + pieceCountMap[piece.NodeId]++ + } + } + + var exitingNodeID storj.NodeID + maxCount := 0 + for k, v := range pieceCountMap { + if exitingNodeID.IsZero() { + exitingNodeID = k + maxCount = v + continue + } + if v > maxCount { + exitingNodeID = k + maxCount = v + } + } + + for _, sn := range planet.StorageNodes { + if sn.ID() == exitingNodeID { + return sn, nil + } + } + + return nil, nil +} diff --git a/satellite/gracefulexit/pathcollector.go 
b/satellite/gracefulexit/pathcollector.go index 2a168740e..981642b6c 100644 --- a/satellite/gracefulexit/pathcollector.go +++ b/satellite/gracefulexit/pathcollector.go @@ -21,7 +21,7 @@ var _ metainfo.Observer = (*PathCollector)(nil) // architecture: Observer type PathCollector struct { db DB - nodeIDMap map[storj.NodeID]struct{} + nodeIDs map[storj.NodeID]struct{} buffer []TransferQueueItem log *zap.Logger batchSize int @@ -38,9 +38,9 @@ func NewPathCollector(db DB, nodeIDs storj.NodeIDList, log *zap.Logger, batchSiz } if len(nodeIDs) > 0 { - collector.nodeIDMap = make(map[storj.NodeID]struct{}, len(nodeIDs)) + collector.nodeIDs = make(map[storj.NodeID]struct{}, len(nodeIDs)) for _, nodeID := range nodeIDs { - collector.nodeIDMap[nodeID] = struct{}{} + collector.nodeIDs[nodeID] = struct{}{} } } @@ -54,13 +54,13 @@ func (collector *PathCollector) Flush(ctx context.Context) (err error) { // RemoteSegment takes a remote segment found in metainfo and creates a graceful exit transfer queue item if it doesn't exist already func (collector *PathCollector) RemoteSegment(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error) { - if len(collector.nodeIDMap) == 0 { + if len(collector.nodeIDs) == 0 { return nil } numPieces := int32(len(pointer.GetRemote().GetRemotePieces())) for _, piece := range pointer.GetRemote().GetRemotePieces() { - if _, ok := collector.nodeIDMap[piece.NodeId]; !ok { + if _, ok := collector.nodeIDs[piece.NodeId]; !ok { continue } diff --git a/satellite/orders/service.go b/satellite/orders/service.go index 5f3822d03..ad0ce05e8 100644 --- a/satellite/orders/service.go +++ b/satellite/orders/service.go @@ -707,6 +707,72 @@ func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, bucketID return limits, piecePrivateKey, nil } +// CreateGracefulExitPutOrderLimit creates an order limit for graceful exit put transfers. 
+func (service *Service) CreateGracefulExitPutOrderLimit(ctx context.Context, bucketID []byte, nodeID storj.NodeID, pieceNum int32, rootPieceID storj.PieceID, shareSize int32) (limit *pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error) { + defer mon.Task()(&ctx)(&err) + + orderExpiration := time.Now().UTC().Add(service.orderExpiration) + + piecePublicKey, piecePrivateKey, err := storj.NewPieceKey() + if err != nil { + return nil, storj.PiecePrivateKey{}, Error.Wrap(err) + } + + serialNumber, err := service.createSerial(ctx) + if err != nil { + return nil, storj.PiecePrivateKey{}, Error.Wrap(err) + } + + node, err := service.overlay.Get(ctx, nodeID) + if err != nil { + return nil, storj.PiecePrivateKey{}, Error.Wrap(err) + } + + if node.Disqualified != nil { + return nil, storj.PiecePrivateKey{}, overlay.ErrNodeDisqualified.New("%v", nodeID) + } + + if !service.overlay.IsOnline(node) { + return nil, storj.PiecePrivateKey{}, overlay.ErrNodeOffline.New("%v", nodeID) + } + + orderLimit, err := signing.SignOrderLimit(ctx, service.satellite, &pb.OrderLimit{ + SerialNumber: serialNumber, + SatelliteId: service.satellite.ID(), + SatelliteAddress: service.satelliteAddress, + UplinkPublicKey: piecePublicKey, + StorageNodeId: nodeID, + PieceId: rootPieceID.Derive(nodeID, pieceNum), + Action: pb.PieceAction_PUT_GRACEFUL_EXIT, + Limit: int64(shareSize), + OrderCreation: time.Now().UTC(), + OrderExpiration: orderExpiration, + }) + if err != nil { + return nil, storj.PiecePrivateKey{}, Error.Wrap(err) + } + + limit = &pb.AddressedOrderLimit{ + Limit: orderLimit, + StorageNodeAddress: node.Address, + } + + err = service.saveSerial(ctx, serialNumber, bucketID, orderExpiration) + if err != nil { + return nil, storj.PiecePrivateKey{}, Error.Wrap(err) + } + + projectID, bucketName, err := SplitBucketID(bucketID) + if err != nil { + return limit, storj.PiecePrivateKey{}, Error.Wrap(err) + } + if err := service.updateBandwidth(ctx, *projectID, bucketName, limit); err != nil { + 
return nil, storj.PiecePrivateKey{}, Error.Wrap(err) + } + + return limit, piecePrivateKey, nil +} + // UpdateGetInlineOrder updates amount of inline GET bandwidth for given bucket func (service *Service) UpdateGetInlineOrder(ctx context.Context, projectID uuid.UUID, bucketName []byte, amount int64) (err error) { defer mon.Task()(&ctx)(&err) diff --git a/satellite/overlay/service.go b/satellite/overlay/service.go index 6a69355df..fb389a422 100644 --- a/satellite/overlay/service.go +++ b/satellite/overlay/service.go @@ -77,6 +77,8 @@ type DB interface { GetExitingNodes(ctx context.Context) (exitingNodes storj.NodeIDList, err error) // GetExitingNodesLoopIncomplete returns exiting nodes who haven't completed the metainfo loop iteration. GetExitingNodesLoopIncomplete(ctx context.Context) (exitingNodes storj.NodeIDList, err error) + + GetExitStatus(ctx context.Context, nodeID storj.NodeID) (exitStatus *ExitStatus, err error) } // NodeCheckInInfo contains all the info that will be updated when a node checkins @@ -129,6 +131,14 @@ type UpdateRequest struct { UptimeDQ float64 } +// ExitStatus is used for reading graceful exit status. +type ExitStatus struct { + NodeID storj.NodeID + ExitInitiatedAt *time.Time + ExitLoopCompletedAt *time.Time + ExitFinishedAt *time.Time +} + // ExitStatusRequest is used to update a node's graceful exit status. type ExitStatusRequest struct { NodeID storj.NodeID @@ -148,6 +158,7 @@ type NodeDossier struct { Contained bool Disqualified *time.Time PieceCount int64 + ExitStatus ExitStatus } // NodeStats contains statistics about a node. 
diff --git a/satellite/overlay/service_test.go b/satellite/overlay/service_test.go index fb125853b..5f897df52 100644 --- a/satellite/overlay/service_test.go +++ b/satellite/overlay/service_test.go @@ -375,6 +375,7 @@ func TestUpdateCheckIn(t *testing.T) { Contained: false, Disqualified: nil, PieceCount: 0, + ExitStatus: overlay.ExitStatus{NodeID: nodeID}, } config := overlay.NodeSelectionConfig{ UptimeReputationLambda: 0.99, @@ -404,7 +405,7 @@ func TestUpdateCheckIn(t *testing.T) { expectedNode.Reputation.LastContactSuccess = actualNode.Reputation.LastContactSuccess expectedNode.Reputation.LastContactFailure = actualNode.Reputation.LastContactFailure expectedNode.Version.Timestamp = actualNode.Version.Timestamp - require.Equal(t, actualNode, expectedNode) + require.Equal(t, expectedNode, actualNode) // confirm that we can update the address field startOfUpdateTest := time.Now().UTC() diff --git a/satellite/peer.go b/satellite/peer.go index 084372a12..a3a24e63c 100644 --- a/satellite/peer.go +++ b/satellite/peer.go @@ -241,7 +241,8 @@ type Peer struct { } GracefulExit struct { - Chore *gracefulexit.Chore + Endpoint *gracefulexit.Endpoint + Chore *gracefulexit.Chore } } @@ -660,7 +661,12 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo { // setup graceful exit log.Debug("Setting up graceful") - peer.GracefulExit.Chore = gracefulexit.NewChore(peer.Log.Named("graceful exit chore"), peer.DB.GracefulExit(), peer.Overlay.DB, config.GracefulExit, peer.Metainfo.Loop) + peer.GracefulExit.Chore = gracefulexit.NewChore(peer.Log.Named("graceful exit chore"), peer.DB.GracefulExit(), peer.Overlay.DB, peer.Metainfo.Loop, config.GracefulExit) + + peer.GracefulExit.Endpoint = gracefulexit.NewEndpoint(peer.Log.Named("gracefulexit:endpoint"), peer.DB.GracefulExit(), peer.Overlay.DB, peer.Overlay.Service, peer.Metainfo.Service, peer.Orders.Service, config.GracefulExit) + + pb.RegisterSatelliteGracefulExitServer(peer.Server.GRPC(), 
peer.GracefulExit.Endpoint) + pb.DRPCRegisterSatelliteGracefulExit(peer.Server.DRPC(), peer.GracefulExit.Endpoint.DRPC()) } return peer, nil diff --git a/satellite/satellitedb/dbx/satellitedb.dbx b/satellite/satellitedb/dbx/satellitedb.dbx index fb8fdc36a..c1064f7f8 100644 --- a/satellite/satellitedb/dbx/satellitedb.dbx +++ b/satellite/satellitedb/dbx/satellitedb.dbx @@ -875,12 +875,6 @@ read one ( where graceful_exit_transfer_queue.node_id = ? where graceful_exit_transfer_queue.path = ? ) -read limitoffset ( - select graceful_exit_transfer_queue - where graceful_exit_transfer_queue.node_id = ? - where graceful_exit_transfer_queue.finished_at = null - orderby asc graceful_exit_transfer_queue.queued_at -) //--- satellite payments ---// diff --git a/satellite/satellitedb/dbx/satellitedb.dbx.go b/satellite/satellitedb/dbx/satellitedb.dbx.go index dd0cec7cf..3e5d06c26 100644 --- a/satellite/satellitedb/dbx/satellitedb.dbx.go +++ b/satellite/satellitedb/dbx/satellitedb.dbx.go @@ -8352,42 +8352,6 @@ func (obj *postgresImpl) Get_GracefulExitTransferQueue_By_NodeId_And_Path(ctx co } -func (obj *postgresImpl) Limited_GracefulExitTransferQueue_By_NodeId_And_FinishedAt_Is_Null_OrderBy_Asc_QueuedAt(ctx context.Context, - graceful_exit_transfer_queue_node_id GracefulExitTransferQueue_NodeId_Field, - limit int, offset int64) ( - rows []*GracefulExitTransferQueue, err error) { - - var __embed_stmt = __sqlbundle_Literal("SELECT graceful_exit_transfer_queue.node_id, graceful_exit_transfer_queue.path, graceful_exit_transfer_queue.piece_num, graceful_exit_transfer_queue.durability_ratio, graceful_exit_transfer_queue.queued_at, graceful_exit_transfer_queue.requested_at, graceful_exit_transfer_queue.last_failed_at, graceful_exit_transfer_queue.last_failed_code, graceful_exit_transfer_queue.failed_count, graceful_exit_transfer_queue.finished_at FROM graceful_exit_transfer_queue WHERE graceful_exit_transfer_queue.node_id = ? 
AND graceful_exit_transfer_queue.finished_at is NULL ORDER BY graceful_exit_transfer_queue.queued_at LIMIT ? OFFSET ?") - - var __values []interface{} - __values = append(__values, graceful_exit_transfer_queue_node_id.value()) - - __values = append(__values, limit, offset) - - var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) - obj.logStmt(__stmt, __values...) - - __rows, err := obj.driver.Query(__stmt, __values...) - if err != nil { - return nil, obj.makeErr(err) - } - defer __rows.Close() - - for __rows.Next() { - graceful_exit_transfer_queue := &GracefulExitTransferQueue{} - err = __rows.Scan(&graceful_exit_transfer_queue.NodeId, &graceful_exit_transfer_queue.Path, &graceful_exit_transfer_queue.PieceNum, &graceful_exit_transfer_queue.DurabilityRatio, &graceful_exit_transfer_queue.QueuedAt, &graceful_exit_transfer_queue.RequestedAt, &graceful_exit_transfer_queue.LastFailedAt, &graceful_exit_transfer_queue.LastFailedCode, &graceful_exit_transfer_queue.FailedCount, &graceful_exit_transfer_queue.FinishedAt) - if err != nil { - return nil, obj.makeErr(err) - } - rows = append(rows, graceful_exit_transfer_queue) - } - if err := __rows.Err(); err != nil { - return nil, obj.makeErr(err) - } - return rows, nil - -} - func (obj *postgresImpl) Get_StripeCustomers_CustomerId_By_UserId(ctx context.Context, stripe_customers_user_id StripeCustomers_UserId_Field) ( row *CustomerId_Row, err error) { @@ -12572,42 +12536,6 @@ func (obj *sqlite3Impl) Get_GracefulExitTransferQueue_By_NodeId_And_Path(ctx con } -func (obj *sqlite3Impl) Limited_GracefulExitTransferQueue_By_NodeId_And_FinishedAt_Is_Null_OrderBy_Asc_QueuedAt(ctx context.Context, - graceful_exit_transfer_queue_node_id GracefulExitTransferQueue_NodeId_Field, - limit int, offset int64) ( - rows []*GracefulExitTransferQueue, err error) { - - var __embed_stmt = __sqlbundle_Literal("SELECT graceful_exit_transfer_queue.node_id, graceful_exit_transfer_queue.path, graceful_exit_transfer_queue.piece_num, 
graceful_exit_transfer_queue.durability_ratio, graceful_exit_transfer_queue.queued_at, graceful_exit_transfer_queue.requested_at, graceful_exit_transfer_queue.last_failed_at, graceful_exit_transfer_queue.last_failed_code, graceful_exit_transfer_queue.failed_count, graceful_exit_transfer_queue.finished_at FROM graceful_exit_transfer_queue WHERE graceful_exit_transfer_queue.node_id = ? AND graceful_exit_transfer_queue.finished_at is NULL ORDER BY graceful_exit_transfer_queue.queued_at LIMIT ? OFFSET ?") - - var __values []interface{} - __values = append(__values, graceful_exit_transfer_queue_node_id.value()) - - __values = append(__values, limit, offset) - - var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) - obj.logStmt(__stmt, __values...) - - __rows, err := obj.driver.Query(__stmt, __values...) - if err != nil { - return nil, obj.makeErr(err) - } - defer __rows.Close() - - for __rows.Next() { - graceful_exit_transfer_queue := &GracefulExitTransferQueue{} - err = __rows.Scan(&graceful_exit_transfer_queue.NodeId, &graceful_exit_transfer_queue.Path, &graceful_exit_transfer_queue.PieceNum, &graceful_exit_transfer_queue.DurabilityRatio, &graceful_exit_transfer_queue.QueuedAt, &graceful_exit_transfer_queue.RequestedAt, &graceful_exit_transfer_queue.LastFailedAt, &graceful_exit_transfer_queue.LastFailedCode, &graceful_exit_transfer_queue.FailedCount, &graceful_exit_transfer_queue.FinishedAt) - if err != nil { - return nil, obj.makeErr(err) - } - rows = append(rows, graceful_exit_transfer_queue) - } - if err := __rows.Err(); err != nil { - return nil, obj.makeErr(err) - } - return rows, nil - -} - func (obj *sqlite3Impl) Get_StripeCustomers_CustomerId_By_UserId(ctx context.Context, stripe_customers_user_id StripeCustomers_UserId_Field) ( row *CustomerId_Row, err error) { @@ -16082,17 +16010,6 @@ func (rx *Rx) Limited_BucketUsage_By_BucketId_And_RollupEndTime_Greater_And_Roll return 
tx.Limited_BucketUsage_By_BucketId_And_RollupEndTime_Greater_And_RollupEndTime_LessOrEqual_OrderBy_Desc_RollupEndTime(ctx, bucket_usage_bucket_id, bucket_usage_rollup_end_time_greater, bucket_usage_rollup_end_time_less_or_equal, limit, offset) } -func (rx *Rx) Limited_GracefulExitTransferQueue_By_NodeId_And_FinishedAt_Is_Null_OrderBy_Asc_QueuedAt(ctx context.Context, - graceful_exit_transfer_queue_node_id GracefulExitTransferQueue_NodeId_Field, - limit int, offset int64) ( - rows []*GracefulExitTransferQueue, err error) { - var tx *Tx - if tx, err = rx.getTx(ctx); err != nil { - return - } - return tx.Limited_GracefulExitTransferQueue_By_NodeId_And_FinishedAt_Is_Null_OrderBy_Asc_QueuedAt(ctx, graceful_exit_transfer_queue_node_id, limit, offset) -} - func (rx *Rx) Limited_Irreparabledb_By_Segmentpath_Greater_OrderBy_Asc_Segmentpath(ctx context.Context, irreparabledb_segmentpath_greater Irreparabledb_Segmentpath_Field, limit int, offset int64) ( @@ -16819,11 +16736,6 @@ type Methods interface { limit int, offset int64) ( rows []*BucketUsage, err error) - Limited_GracefulExitTransferQueue_By_NodeId_And_FinishedAt_Is_Null_OrderBy_Asc_QueuedAt(ctx context.Context, - graceful_exit_transfer_queue_node_id GracefulExitTransferQueue_NodeId_Field, - limit int, offset int64) ( - rows []*GracefulExitTransferQueue, err error) - Limited_Irreparabledb_By_Segmentpath_Greater_OrderBy_Asc_Segmentpath(ctx context.Context, irreparabledb_segmentpath_greater Irreparabledb_Segmentpath_Field, limit int, offset int64) ( diff --git a/satellite/satellitedb/gracefulexit.go b/satellite/satellitedb/gracefulexit.go index 4c78f3286..29d578b9c 100644 --- a/satellite/satellitedb/gracefulexit.go +++ b/satellite/satellitedb/gracefulexit.go @@ -6,11 +6,13 @@ package satellitedb import ( "bytes" "context" + "database/sql" "sort" "time" "github.com/lib/pq" sqlite3 "github.com/mattn/go-sqlite3" + "github.com/zeebo/errs" "storj.io/storj/pkg/storj" "storj.io/storj/satellite/gracefulexit" @@ -121,18 +123,18 
@@ func (db *gracefulexitDB) UpdateTransferQueueItem(ctx context.Context, item grac defer mon.Task()(&ctx)(&err) update := dbx.GracefulExitTransferQueue_Update_Fields{ DurabilityRatio: dbx.GracefulExitTransferQueue_DurabilityRatio(item.DurabilityRatio), - LastFailedCode: dbx.GracefulExitTransferQueue_LastFailedCode_Raw(&item.LastFailedCode), - FailedCount: dbx.GracefulExitTransferQueue_FailedCount_Raw(&item.FailedCount), + LastFailedCode: dbx.GracefulExitTransferQueue_LastFailedCode_Raw(item.LastFailedCode), + FailedCount: dbx.GracefulExitTransferQueue_FailedCount_Raw(item.FailedCount), } - if !item.RequestedAt.IsZero() { - update.RequestedAt = dbx.GracefulExitTransferQueue_RequestedAt_Raw(&item.RequestedAt) + if item.RequestedAt != nil { + update.RequestedAt = dbx.GracefulExitTransferQueue_RequestedAt_Raw(item.RequestedAt) } - if !item.LastFailedAt.IsZero() { - update.LastFailedAt = dbx.GracefulExitTransferQueue_LastFailedAt_Raw(&item.LastFailedAt) + if item.LastFailedAt != nil { + update.LastFailedAt = dbx.GracefulExitTransferQueue_LastFailedAt_Raw(item.LastFailedAt) } - if !item.FinishedAt.IsZero() { - update.FinishedAt = dbx.GracefulExitTransferQueue_FinishedAt_Raw(&item.FinishedAt) + if item.FinishedAt != nil { + update.FinishedAt = dbx.GracefulExitTransferQueue_FinishedAt_Raw(item.FinishedAt) } return db.db.UpdateNoReturn_GracefulExitTransferQueue_By_NodeId_And_Path(ctx, @@ -181,23 +183,95 @@ func (db *gracefulexitDB) GetTransferQueueItem(ctx context.Context, nodeID storj return transferQueueItem, Error.Wrap(err) } -// GetIncomplete gets incomplete graceful exit transfer queue entries in the database ordered by the queued date ascending. +// GetIncomplete gets incomplete graceful exit transfer queue entries ordered by durability ratio and queued date ascending. 
func (db *gracefulexitDB) GetIncomplete(ctx context.Context, nodeID storj.NodeID, limit int, offset int64) (_ []*gracefulexit.TransferQueueItem, err error) { defer mon.Task()(&ctx)(&err) - dbxTransferQueueItemRows, err := db.db.Limited_GracefulExitTransferQueue_By_NodeId_And_FinishedAt_Is_Null_OrderBy_Asc_QueuedAt(ctx, dbx.GracefulExitTransferQueue_NodeId(nodeID.Bytes()), limit, offset) + sql := `SELECT node_id, path, piece_num, durability_ratio, queued_at, requested_at, last_failed_at, last_failed_code, failed_count, finished_at + FROM graceful_exit_transfer_queue + WHERE node_id = ? + AND finished_at is NULL + ORDER BY durability_ratio asc, queued_at asc LIMIT ? OFFSET ?` + rows, err := db.db.Query(db.db.Rebind(sql), nodeID.Bytes(), limit, offset) if err != nil { return nil, Error.Wrap(err) } - var transferQueueItemRows = make([]*gracefulexit.TransferQueueItem, len(dbxTransferQueueItemRows)) - for i, dbxTransferQueue := range dbxTransferQueueItemRows { - transferQueueItem, err := dbxToTransferQueueItem(dbxTransferQueue) + defer func() { + err = errs.Combine(err, rows.Close()) + }() + + transferQueueItemRows, err := scanRows(rows) + if err != nil { + return nil, Error.Wrap(err) + } + + return transferQueueItemRows, nil +} + +// GetIncompleteNotFailed gets incomplete graceful exit transfer queue entries that haven't failed, ordered by durability ratio and queued date ascending. +func (db *gracefulexitDB) GetIncompleteNotFailed(ctx context.Context, nodeID storj.NodeID, limit int, offset int64) (_ []*gracefulexit.TransferQueueItem, err error) { + defer mon.Task()(&ctx)(&err) + sql := `SELECT node_id, path, piece_num, durability_ratio, queued_at, requested_at, last_failed_at, last_failed_code, failed_count, finished_at + FROM graceful_exit_transfer_queue + WHERE node_id = ? + AND finished_at is NULL + AND last_failed_at is NULL + ORDER BY durability_ratio asc, queued_at asc LIMIT ? 
OFFSET ?` + rows, err := db.db.Query(db.db.Rebind(sql), nodeID.Bytes(), limit, offset) + if err != nil { + return nil, Error.Wrap(err) + } + + defer func() { + err = errs.Combine(err, rows.Close()) + }() + + transferQueueItemRows, err := scanRows(rows) + if err != nil { + return nil, Error.Wrap(err) + } + + return transferQueueItemRows, nil +} + +// GetIncompleteFailed gets incomplete graceful exit transfer queue entries that have failed <= maxFailures times, ordered by durability ratio and queued date ascending. +func (db *gracefulexitDB) GetIncompleteFailed(ctx context.Context, nodeID storj.NodeID, maxFailures int, limit int, offset int64) (_ []*gracefulexit.TransferQueueItem, err error) { + defer mon.Task()(&ctx)(&err) + sql := `SELECT node_id, path, piece_num, durability_ratio, queued_at, requested_at, last_failed_at, last_failed_code, failed_count, finished_at + FROM graceful_exit_transfer_queue + WHERE node_id = ? + AND finished_at is NULL + AND last_failed_at is not NULL + AND failed_count <= ? + ORDER BY durability_ratio asc, queued_at asc LIMIT ? 
OFFSET ?` + rows, err := db.db.Query(db.db.Rebind(sql), nodeID.Bytes(), maxFailures, limit, offset) + if err != nil { + return nil, Error.Wrap(err) + } + + defer func() { + err = errs.Combine(err, rows.Close()) + }() + + transferQueueItemRows, err := scanRows(rows) + if err != nil { + return nil, Error.Wrap(err) + } + + return transferQueueItemRows, nil +} + +func scanRows(rows *sql.Rows) (transferQueueItemRows []*gracefulexit.TransferQueueItem, err error) { + for rows.Next() { + transferQueueItem := &gracefulexit.TransferQueueItem{} + err = rows.Scan(&transferQueueItem.NodeID, &transferQueueItem.Path, &transferQueueItem.PieceNum, &transferQueueItem.DurabilityRatio, &transferQueueItem.QueuedAt, &transferQueueItem.RequestedAt, + &transferQueueItem.LastFailedAt, &transferQueueItem.LastFailedCode, &transferQueueItem.FailedCount, &transferQueueItem.FinishedAt) if err != nil { return nil, Error.Wrap(err) } - transferQueueItemRows[i] = transferQueueItem - } + transferQueueItemRows = append(transferQueueItemRows, transferQueueItem) + } return transferQueueItemRows, nil } @@ -215,19 +289,19 @@ func dbxToTransferQueueItem(dbxTransferQueue *dbx.GracefulExitTransferQueue) (it QueuedAt: dbxTransferQueue.QueuedAt, } if dbxTransferQueue.LastFailedCode != nil { - item.LastFailedCode = *dbxTransferQueue.LastFailedCode + item.LastFailedCode = dbxTransferQueue.LastFailedCode } if dbxTransferQueue.FailedCount != nil { - item.FailedCount = *dbxTransferQueue.FailedCount + item.FailedCount = dbxTransferQueue.FailedCount } if dbxTransferQueue.RequestedAt != nil && !dbxTransferQueue.RequestedAt.IsZero() { - item.RequestedAt = *dbxTransferQueue.RequestedAt + item.RequestedAt = dbxTransferQueue.RequestedAt } if dbxTransferQueue.LastFailedAt != nil && !dbxTransferQueue.LastFailedAt.IsZero() { - item.LastFailedAt = *dbxTransferQueue.LastFailedAt + item.LastFailedAt = dbxTransferQueue.LastFailedAt } if dbxTransferQueue.FinishedAt != nil && !dbxTransferQueue.FinishedAt.IsZero() { - 
item.FinishedAt = *dbxTransferQueue.FinishedAt + item.FinishedAt = dbxTransferQueue.FinishedAt } return item, nil diff --git a/satellite/satellitedb/locked.go b/satellite/satellitedb/locked.go index 5e6dba1da..9a8367de4 100644 --- a/satellite/satellitedb/locked.go +++ b/satellite/satellitedb/locked.go @@ -665,13 +665,27 @@ func (m *lockedGracefulExit) Enqueue(ctx context.Context, items []gracefulexit.T return m.db.Enqueue(ctx, items) } -// GetIncomplete gets incomplete graceful exit transfer queue entries ordered by the queued date ascending. +// GetIncomplete gets incomplete graceful exit transfer queue entries ordered by durability ratio and queued date ascending. func (m *lockedGracefulExit) GetIncomplete(ctx context.Context, nodeID storj.NodeID, limit int, offset int64) ([]*gracefulexit.TransferQueueItem, error) { m.Lock() defer m.Unlock() return m.db.GetIncomplete(ctx, nodeID, limit, offset) } +// GetIncompleteFailed gets incomplete graceful exit transfer queue entries that have failed <= maxFailures times, ordered by durability ratio and queued date ascending. +func (m *lockedGracefulExit) GetIncompleteFailed(ctx context.Context, nodeID storj.NodeID, maxFailures int, limit int, offset int64) ([]*gracefulexit.TransferQueueItem, error) { + m.Lock() + defer m.Unlock() + return m.db.GetIncompleteFailed(ctx, nodeID, maxFailures, limit, offset) +} + +// GetIncompleteNotFailed gets incomplete graceful exit transfer queue entries that haven't failed, ordered by durability ratio and queued date ascending. +func (m *lockedGracefulExit) GetIncompleteNotFailed(ctx context.Context, nodeID storj.NodeID, limit int, offset int64) ([]*gracefulexit.TransferQueueItem, error) { + m.Lock() + defer m.Unlock() + return m.db.GetIncompleteNotFailed(ctx, nodeID, limit, offset) +} + // GetProgress gets a graceful exit progress entry. 
func (m *lockedGracefulExit) GetProgress(ctx context.Context, nodeID storj.NodeID) (*gracefulexit.Progress, error) { m.Lock() @@ -872,6 +886,12 @@ func (m *lockedOverlayCache) Get(ctx context.Context, nodeID storj.NodeID) (*ove return m.db.Get(ctx, nodeID) } +func (m *lockedOverlayCache) GetExitStatus(ctx context.Context, nodeID storj.NodeID) (exitStatus *overlay.ExitStatus, err error) { + m.Lock() + defer m.Unlock() + return m.db.GetExitStatus(ctx, nodeID) +} + // GetExitingNodes returns nodes who have initiated a graceful exit, but have not completed it. func (m *lockedOverlayCache) GetExitingNodes(ctx context.Context) (exitingNodes storj.NodeIDList, err error) { m.Lock() diff --git a/satellite/satellitedb/overlaycache.go b/satellite/satellitedb/overlaycache.go index 6f5330d3d..ac87373a3 100644 --- a/satellite/satellitedb/overlaycache.go +++ b/satellite/satellitedb/overlaycache.go @@ -1022,6 +1022,23 @@ func (cache *overlaycache) GetExitingNodesLoopIncomplete(ctx context.Context) (e } return exitingNodes, nil } +func (cache *overlaycache) GetExitStatus(ctx context.Context, nodeID storj.NodeID) (_ *overlay.ExitStatus, err error) { + defer mon.Task()(&ctx)(&err) + + rows, err := cache.db.Query(cache.db.Rebind("select id, exit_initiated_at, exit_loop_completed_at, exit_finished_at from nodes where id = ?"), nodeID) + if err != nil { + return nil, Error.Wrap(err) + } + defer func() { + err = errs.Combine(err, rows.Close()) + }() + exitStatus := &overlay.ExitStatus{} + if rows.Next() { + err = rows.Scan(&exitStatus.NodeID, &exitStatus.ExitInitiatedAt, &exitStatus.ExitLoopCompletedAt, &exitStatus.ExitFinishedAt) + } + + return exitStatus, Error.Wrap(err) +} // UpdateExitStatus is used to update a node's graceful exit status. 
func (cache *overlaycache) UpdateExitStatus(ctx context.Context, request *overlay.ExitStatusRequest) (stats *overlay.NodeStats, err error) { @@ -1080,6 +1097,11 @@ func convertDBNode(ctx context.Context, info *dbx.Node) (_ *overlay.NodeDossier, Patch: info.Patch, } + exitStatus := overlay.ExitStatus{NodeID: id} + exitStatus.ExitInitiatedAt = info.ExitInitiatedAt + exitStatus.ExitLoopCompletedAt = info.ExitLoopCompletedAt + exitStatus.ExitFinishedAt = info.ExitFinishedAt + node := &overlay.NodeDossier{ Node: pb.Node{ Id: id, @@ -1108,6 +1130,7 @@ func convertDBNode(ctx context.Context, info *dbx.Node) (_ *overlay.NodeDossier, Contained: info.Contained, Disqualified: info.Disqualified, PieceCount: info.PieceCount, + ExitStatus: exitStatus, } return node, nil diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock index 2168da16f..3e8ddcee1 100644 --- a/scripts/testdata/satellite-config.yaml.lock +++ b/scripts/testdata/satellite-config.yaml.lock @@ -124,6 +124,12 @@ contact.external-address: "" # how often to run the transfer queue chore. # graceful-exit.chore-interval: 30s +# size of the buffer used to batch transfer queue reads and sends to the storage node. +# graceful-exit.endpoint-batch-size: 100 + +# maximum number of transfer failures per piece. +# graceful-exit.endpoint-max-failures: 3 + # path to the certificate chain for this identity identity.cert-path: /root/.local/share/storj/identity/satellite/identity.cert