continue agreement sender loop on error (#1020)

* continue agreement sender loop on error
* added REJECTED grpc status
This commit is contained in:
Bill Thorp 2019-01-11 13:15:49 -05:00 committed by GitHub
parent 10005d869c
commit 273c3924ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 38 additions and 31 deletions

View File

@ -59,7 +59,7 @@ func (s *Server) BandwidthAgreements(ctx context.Context, ba *pb.RenterBandwidth
s.logger.Debug("Received Agreement...")
reply = &pb.AgreementsSummary{
Status: pb.AgreementsSummary_FAIL,
Status: pb.AgreementsSummary_REJECTED,
}
// storagenode signature is empty
@ -106,13 +106,12 @@ func (s *Server) BandwidthAgreements(ctx context.Context, ba *pb.RenterBandwidth
})
if err != nil {
//todo: better classify transport errors (AgreementsSummary_FAIL) vs logical (AgreementsSummary_REJECTED)
return reply, BwAgreementError.New("SerialNumber already exists in the PayerBandwidthAllocation")
}
reply.Status = pb.AgreementsSummary_OK
s.logger.Debug("Stored Agreement...")
return reply, nil
}

View File

@ -68,14 +68,14 @@ func TestSameSerialNumberBandwidthAgreements(t *testing.T) {
reply, err = server.BandwidthAgreements(ctx, rbaNode1)
assert.EqualError(t, err, "bwagreement error: SerialNumber already exists in the PayerBandwidthAllocation")
assert.Equal(t, pb.AgreementsSummary_FAIL, reply.Status)
assert.Equal(t, pb.AgreementsSummary_REJECTED, reply.Status)
/* Storage nodes can't submit the same bwagreement twice.
This test is kind of duplicate cause it will most likely trigger the same sequence error.
For safety we will try it anyway to make sure nothing strange will happen */
reply, err = server.BandwidthAgreements(ctx, rbaNode2)
assert.EqualError(t, err, "bwagreement error: SerialNumber already exists in the PayerBandwidthAllocation")
assert.Equal(t, pb.AgreementsSummary_FAIL, reply.Status)
assert.Equal(t, pb.AgreementsSummary_REJECTED, reply.Status)
})
}
@ -107,7 +107,7 @@ func TestManipulatedBandwidthAgreements(t *testing.T) {
reply, err = server.BandwidthAgreements(ctx, rba)
assert.Error(t, err)
assert.Equal(t, pb.AgreementsSummary_FAIL, reply.Status)
assert.Equal(t, pb.AgreementsSummary_REJECTED, reply.Status)
/* Todo: Add more tests for bwagreement manipulations
@ -143,7 +143,7 @@ func TestInvalidBandwidthAgreements(t *testing.T) {
reply, err = server.BandwidthAgreements(ctx, rba)
assert.EqualError(t, err, "bwagreement error: Invalid Renter's Signature Length")
assert.Equal(t, pb.AgreementsSummary_FAIL, reply.Status)
assert.Equal(t, pb.AgreementsSummary_REJECTED, reply.Status)
})
}

View File

@ -26,24 +26,27 @@ const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package
type AgreementsSummary_Status int32
const (
AgreementsSummary_FAIL AgreementsSummary_Status = 0
AgreementsSummary_OK AgreementsSummary_Status = 1
AgreementsSummary_FAIL AgreementsSummary_Status = 0
AgreementsSummary_OK AgreementsSummary_Status = 1
AgreementsSummary_REJECTED AgreementsSummary_Status = 2
)
var AgreementsSummary_Status_name = map[int32]string{
0: "FAIL",
1: "OK",
2: "REJECTED",
}
var AgreementsSummary_Status_value = map[string]int32{
"FAIL": 0,
"OK": 1,
"FAIL": 0,
"OK": 1,
"REJECTED": 2,
}
func (x AgreementsSummary_Status) String() string {
return proto.EnumName(AgreementsSummary_Status_name, int32(x))
}
func (AgreementsSummary_Status) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_bandwidth_01db992f91c47bae, []int{0, 0}
return fileDescriptor_bandwidth_1560d6d92a271e58, []int{0, 0}
}
type AgreementsSummary struct {
@ -57,7 +60,7 @@ func (m *AgreementsSummary) Reset() { *m = AgreementsSummary{} }
func (m *AgreementsSummary) String() string { return proto.CompactTextString(m) }
func (*AgreementsSummary) ProtoMessage() {}
func (*AgreementsSummary) Descriptor() ([]byte, []int) {
return fileDescriptor_bandwidth_01db992f91c47bae, []int{0}
return fileDescriptor_bandwidth_1560d6d92a271e58, []int{0}
}
func (m *AgreementsSummary) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_AgreementsSummary.Unmarshal(m, b)
@ -161,21 +164,22 @@ var _Bandwidth_serviceDesc = grpc.ServiceDesc{
Metadata: "bandwidth.proto",
}
func init() { proto.RegisterFile("bandwidth.proto", fileDescriptor_bandwidth_01db992f91c47bae) }
func init() { proto.RegisterFile("bandwidth.proto", fileDescriptor_bandwidth_1560d6d92a271e58) }
var fileDescriptor_bandwidth_01db992f91c47bae = []byte{
// 196 bytes of a gzipped FileDescriptorProto
var fileDescriptor_bandwidth_1560d6d92a271e58 = []byte{
// 210 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4f, 0x4a, 0xcc, 0x4b,
0x29, 0xcf, 0x4c, 0x29, 0xc9, 0xd0, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x84, 0x0b, 0x48,
0x09, 0x14, 0x64, 0xa6, 0x26, 0xa7, 0x16, 0x97, 0xe4, 0x17, 0xa5, 0x42, 0x24, 0x95, 0x72, 0xb8,
0x09, 0x14, 0x64, 0xa6, 0x26, 0xa7, 0x16, 0x97, 0xe4, 0x17, 0xa5, 0x42, 0x24, 0x95, 0xaa, 0xb8,
0x04, 0x1d, 0xd3, 0x8b, 0x52, 0x53, 0x73, 0x53, 0xf3, 0x4a, 0x8a, 0x83, 0x4b, 0x73, 0x73, 0x13,
0x8b, 0x2a, 0x85, 0xac, 0xb9, 0xd8, 0x8a, 0x4b, 0x12, 0x4b, 0x4a, 0x8b, 0x25, 0x18, 0x15, 0x18,
0x35, 0xf8, 0x8c, 0x94, 0xf5, 0x10, 0x66, 0x62, 0xa8, 0xd6, 0x0b, 0x06, 0x2b, 0x0d, 0x82, 0x6a,
0x51, 0x92, 0xe2, 0x62, 0x83, 0x88, 0x08, 0x71, 0x70, 0xb1, 0xb8, 0x39, 0x7a, 0xfa, 0x08, 0x30,
0x08, 0xb1, 0x71, 0x31, 0xf9, 0x7b, 0x0b, 0x30, 0x1a, 0xe5, 0x73, 0x71, 0x3a, 0xc1, 0x4c, 0x12,
0x4a, 0xe2, 0x12, 0x86, 0x73, 0x10, 0xa6, 0x0a, 0x69, 0xeb, 0x21, 0x1c, 0x59, 0x94, 0x5f, 0x5a,
0x92, 0x5a, 0xac, 0x17, 0x94, 0x9a, 0x57, 0x92, 0x5a, 0x84, 0x50, 0x9c, 0x93, 0x93, 0x9f, 0x9c,
0x58, 0x92, 0x99, 0x9f, 0x27, 0x25, 0x83, 0xcf, 0x65, 0x4a, 0x0c, 0x4e, 0x2c, 0x51, 0x4c, 0x05,
0x49, 0x49, 0x6c, 0x60, 0xbf, 0x1a, 0x03, 0x02, 0x00, 0x00, 0xff, 0xff, 0xdc, 0x38, 0x8e, 0x59,
0x1b, 0x01, 0x00, 0x00,
0x51, 0xd2, 0xe0, 0x62, 0x83, 0x88, 0x08, 0x71, 0x70, 0xb1, 0xb8, 0x39, 0x7a, 0xfa, 0x08, 0x30,
0x08, 0xb1, 0x71, 0x31, 0xf9, 0x7b, 0x0b, 0x30, 0x0a, 0xf1, 0x70, 0x71, 0x04, 0xb9, 0x7a, 0xb9,
0x3a, 0x87, 0xb8, 0xba, 0x08, 0x30, 0x19, 0xe5, 0x73, 0x71, 0x3a, 0xc1, 0xcc, 0x15, 0x4a, 0xe2,
0x12, 0x86, 0x73, 0x10, 0x76, 0x08, 0x69, 0xeb, 0x21, 0x9c, 0x5c, 0x94, 0x5f, 0x5a, 0x92, 0x5a,
0xac, 0x17, 0x94, 0x9a, 0x57, 0x92, 0x5a, 0x84, 0x50, 0x9c, 0x93, 0x93, 0x9f, 0x9c, 0x58, 0x92,
0x99, 0x9f, 0x27, 0x25, 0x83, 0xcf, 0x9d, 0x4a, 0x0c, 0x4e, 0x2c, 0x51, 0x4c, 0x05, 0x49, 0x49,
0x6c, 0x60, 0x9f, 0x1b, 0x03, 0x02, 0x00, 0x00, 0xff, 0xff, 0x2a, 0x6a, 0xd9, 0xfd, 0x29, 0x01,
0x00, 0x00,
}

View File

@ -16,6 +16,7 @@ message AgreementsSummary {
enum Status {
FAIL = 0;
OK = 1;
REJECTED = 2;
}
Status status = 1;

View File

@ -67,23 +67,24 @@ func (as *AgreementSender) sendAgreementsToSatellite(ctx context.Context, satID
// Get satellite ip from kademlia
satellite, err := as.kad.FindNode(ctx, satID)
if err != nil {
as.log.Error("Agreementsender could not find satellite", zap.Error(err))
as.log.Warn("Agreementsender could not find satellite", zap.Error(err))
return
}
// Create client from satellite ip
conn, err := as.transport.DialNode(ctx, &satellite)
if err != nil {
as.log.Error("Agreementsender could not dial satellite", zap.Error(err))
as.log.Warn("Agreementsender could not dial satellite", zap.Error(err))
return
}
client := pb.NewBandwidthClient(conn)
defer func() {
err := conn.Close()
if err != nil {
as.log.Error("Agreementsender failed to close connection", zap.Error(err))
as.log.Warn("Agreementsender failed to close connection", zap.Error(err))
}
}()
//todo: stop sending these one-by-one, send all at once
for _, agreement := range agreements {
msg := &pb.RenterBandwidthAllocation{
Data: agreement.Agreement,
@ -91,14 +92,16 @@ func (as *AgreementSender) sendAgreementsToSatellite(ctx context.Context, satID
}
// Send agreement to satellite
r, err := client.BandwidthAgreements(ctx, msg)
if err != nil || r.GetStatus() != pb.AgreementsSummary_OK {
as.log.Error("Agreementsender failed to send agreement to satellite", zap.Error(err))
return
if err != nil || r.GetStatus() == pb.AgreementsSummary_FAIL {
as.log.Warn("Agreementsender failed to send agreement to satellite : will retry", zap.Error(err))
continue
} else if r.GetStatus() != pb.AgreementsSummary_REJECTED {
//todo: something better than a delete here?
as.log.Error("Agreementsender had agreement explicitly rejected by satellite : will delete", zap.Error(err))
}
// Delete from PSDB by signature
if err = as.DB.DeleteBandwidthAllocationBySignature(agreement.Signature); err != nil {
as.log.Error("Agreementsender failed to delete bandwidth allocation", zap.Error(err))
return
}
}
}