satellite/reputation: don't need 3 identical AuditHistory types

The two protobuf types are identical except that one is in our common/pb
package, and the other is in internalpb. Since the type is public
already, and there is no difference in the internal one, it seems better
to use the public one for all satellite needs.

There is also another type which is essentially identical, but which is
not a protobuf type, also called "AuditHistory". It looks like we don't
ever actually need to have a separate type from the protobuf one.

This change makes us use "storj/common/pb".AuditHistory for all of our
AuditHistory needs.

Refs: https://github.com/storj/storj/issues/4601

Change-Id: If845fde21bb31c801db6d67ffc9a146d1617b991
This commit is contained in:
paul cannon 2022-05-04 16:12:01 -05:00 committed by Jeff Wendling
parent 87f6a3dcda
commit 2ee9463e34
8 changed files with 29 additions and 237 deletions

View File

@ -1,153 +0,0 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: audithistory.proto
package internalpb
import (
fmt "fmt"
math "math"
time "time"
proto "github.com/gogo/protobuf/proto"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
var _ = time.Kitchen
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
type AuditHistory struct {
Windows []*AuditWindow `protobuf:"bytes,1,rep,name=windows,proto3" json:"windows,omitempty"`
Score float64 `protobuf:"fixed64,2,opt,name=score,proto3" json:"score,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *AuditHistory) Reset() { *m = AuditHistory{} }
func (m *AuditHistory) String() string { return proto.CompactTextString(m) }
func (*AuditHistory) ProtoMessage() {}
func (*AuditHistory) Descriptor() ([]byte, []int) {
return fileDescriptor_2ab8e94de62e54ec, []int{0}
}
func (m *AuditHistory) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_AuditHistory.Unmarshal(m, b)
}
func (m *AuditHistory) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_AuditHistory.Marshal(b, m, deterministic)
}
func (m *AuditHistory) XXX_Merge(src proto.Message) {
xxx_messageInfo_AuditHistory.Merge(m, src)
}
func (m *AuditHistory) XXX_Size() int {
return xxx_messageInfo_AuditHistory.Size(m)
}
func (m *AuditHistory) XXX_DiscardUnknown() {
xxx_messageInfo_AuditHistory.DiscardUnknown(m)
}
var xxx_messageInfo_AuditHistory proto.InternalMessageInfo
func (m *AuditHistory) GetWindows() []*AuditWindow {
if m != nil {
return m.Windows
}
return nil
}
func (m *AuditHistory) GetScore() float64 {
if m != nil {
return m.Score
}
return 0
}
type AuditWindow struct {
WindowStart time.Time `protobuf:"bytes,1,opt,name=window_start,json=windowStart,proto3,stdtime" json:"window_start"`
OnlineCount int32 `protobuf:"varint,2,opt,name=online_count,json=onlineCount,proto3" json:"online_count,omitempty"`
TotalCount int32 `protobuf:"varint,3,opt,name=total_count,json=totalCount,proto3" json:"total_count,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *AuditWindow) Reset() { *m = AuditWindow{} }
func (m *AuditWindow) String() string { return proto.CompactTextString(m) }
func (*AuditWindow) ProtoMessage() {}
func (*AuditWindow) Descriptor() ([]byte, []int) {
return fileDescriptor_2ab8e94de62e54ec, []int{1}
}
func (m *AuditWindow) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_AuditWindow.Unmarshal(m, b)
}
func (m *AuditWindow) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_AuditWindow.Marshal(b, m, deterministic)
}
func (m *AuditWindow) XXX_Merge(src proto.Message) {
xxx_messageInfo_AuditWindow.Merge(m, src)
}
func (m *AuditWindow) XXX_Size() int {
return xxx_messageInfo_AuditWindow.Size(m)
}
func (m *AuditWindow) XXX_DiscardUnknown() {
xxx_messageInfo_AuditWindow.DiscardUnknown(m)
}
var xxx_messageInfo_AuditWindow proto.InternalMessageInfo
func (m *AuditWindow) GetWindowStart() time.Time {
if m != nil {
return m.WindowStart
}
return time.Time{}
}
func (m *AuditWindow) GetOnlineCount() int32 {
if m != nil {
return m.OnlineCount
}
return 0
}
func (m *AuditWindow) GetTotalCount() int32 {
if m != nil {
return m.TotalCount
}
return 0
}
func init() {
proto.RegisterType((*AuditHistory)(nil), "satellite.audithistory.AuditHistory")
proto.RegisterType((*AuditWindow)(nil), "satellite.audithistory.AuditWindow")
}
func init() { proto.RegisterFile("audithistory.proto", fileDescriptor_2ab8e94de62e54ec) }
var fileDescriptor_2ab8e94de62e54ec = []byte{
// 274 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x8f, 0xc1, 0x4a, 0xc4, 0x30,
0x10, 0x86, 0x8d, 0xcb, 0xaa, 0x24, 0x7b, 0x0a, 0x22, 0xa5, 0x97, 0xd6, 0x5d, 0x84, 0x9e, 0x52,
0x58, 0xcf, 0x1e, 0x5c, 0x0f, 0x7a, 0xae, 0x82, 0xe0, 0x65, 0x49, 0xbb, 0xb1, 0x46, 0xb2, 0x99,
0x92, 0x4c, 0x59, 0x7c, 0x0b, 0xcf, 0x3e, 0x91, 0x4f, 0xa1, 0xaf, 0x22, 0x4d, 0xac, 0xec, 0xc1,
0x5b, 0xe6, 0xcf, 0xf7, 0x31, 0xff, 0x50, 0x2e, 0xfb, 0x8d, 0xc6, 0x17, 0xed, 0x11, 0xdc, 0x9b,
0xe8, 0x1c, 0x20, 0xf0, 0x33, 0x2f, 0x51, 0x19, 0xa3, 0x51, 0x89, 0xfd, 0xdf, 0x94, 0xb6, 0xd0,
0x42, 0x64, 0xd2, 0xac, 0x05, 0x68, 0x8d, 0x2a, 0xc3, 0x54, 0xf7, 0xcf, 0x25, 0xea, 0xad, 0xf2,
0x28, 0xb7, 0x5d, 0x04, 0xe6, 0x0d, 0x9d, 0x5d, 0x0f, 0xf2, 0x5d, 0x94, 0xf9, 0x15, 0x3d, 0xde,
0x69, 0xbb, 0x81, 0x9d, 0x4f, 0x48, 0x3e, 0x29, 0xd8, 0x72, 0x21, 0xfe, 0x5f, 0x23, 0x82, 0xf6,
0x18, 0xd8, 0x6a, 0x74, 0xf8, 0x29, 0x9d, 0xfa, 0x06, 0x9c, 0x4a, 0x0e, 0x73, 0x52, 0x90, 0x2a,
0x0e, 0xf3, 0x0f, 0x42, 0xd9, 0x1e, 0xce, 0x6f, 0xe9, 0x2c, 0x0a, 0x6b, 0x8f, 0xd2, 0x61, 0x42,
0x72, 0x52, 0xb0, 0x65, 0x2a, 0x62, 0x59, 0x31, 0x96, 0x15, 0x0f, 0x63, 0xd9, 0xd5, 0xc9, 0xe7,
0x57, 0x76, 0xf0, 0xfe, 0x9d, 0x91, 0x8a, 0x45, 0xf3, 0x7e, 0x10, 0xf9, 0x39, 0x9d, 0x81, 0x35,
0xda, 0xaa, 0x75, 0x03, 0xbd, 0xc5, 0xb0, 0x75, 0x5a, 0xb1, 0x98, 0xdd, 0x0c, 0x11, 0xcf, 0x28,
0x43, 0x40, 0x69, 0x7e, 0x89, 0x49, 0x20, 0x68, 0x88, 0x02, 0xb0, 0xba, 0x78, 0x5a, 0x0c, 0x07,
0xbd, 0x0a, 0x0d, 0x65, 0x78, 0x94, 0x7f, 0x07, 0x97, 0xda, 0xa2, 0x72, 0x56, 0x9a, 0xae, 0xae,
0x8f, 0x42, 0xab, 0xcb, 0x9f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xbc, 0xff, 0x7f, 0x76, 0x8a, 0x01,
0x00, 0x00,
}

View File

@ -1,21 +0,0 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
syntax = "proto3";
option go_package = "storj.io/storj/satellite/internalpb";
package satellite.audithistory;
import "gogo.proto";
import "google/protobuf/timestamp.proto";
message AuditHistory {
repeated AuditWindow windows = 1;
double score = 2;
}
message AuditWindow {
google.protobuf.Timestamp window_start = 1 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
int32 online_count = 2;
int32 total_count = 3;
}

View File

@ -93,7 +93,7 @@ func (e *Endpoint) GetStats(ctx context.Context, req *pb.GetStatsRequest) (_ *pb
OfflineSuspended: node.OfflineSuspended, OfflineSuspended: node.OfflineSuspended,
OfflineUnderReview: reputationInfo.UnderReview, OfflineUnderReview: reputationInfo.UnderReview,
VettedAt: node.Reputation.Status.VettedAt, VettedAt: node.Reputation.Status.VettedAt,
AuditHistory: reputation.AuditHistoryToPB(reputationInfo.AuditHistory), AuditHistory: reputation.DuplicateAuditHistory(reputationInfo.AuditHistory),
JoinedAt: node.CreatedAt, JoinedAt: node.CreatedAt,
}, nil }, nil
} }

View File

@ -7,15 +7,8 @@ import (
"time" "time"
"storj.io/common/pb" "storj.io/common/pb"
"storj.io/storj/satellite/internalpb"
) )
// AuditHistory represents a node's audit history for the most recent tracking period.
type AuditHistory struct {
Score float64
Windows []*AuditWindow
}
// UpdateAuditHistoryResponse contains information returned by UpdateAuditHistory. // UpdateAuditHistoryResponse contains information returned by UpdateAuditHistory.
type UpdateAuditHistoryResponse struct { type UpdateAuditHistoryResponse struct {
NewScore float64 NewScore float64
@ -23,52 +16,26 @@ type UpdateAuditHistoryResponse struct {
History []byte History []byte
} }
// AuditWindow represents the number of online and total audits a node received for a specific time period. // DuplicateAuditHistory creates a duplicate (deep copy) of an AuditHistory object.
type AuditWindow struct { func DuplicateAuditHistory(auditHistory *pb.AuditHistory) *pb.AuditHistory {
WindowStart time.Time // argument is not a pointer type, so auditHistory is already a copy.
TotalCount int32 // Just need to copy the slice.
OnlineCount int32 if auditHistory == nil {
return nil
} }
windows := make([]*pb.AuditWindow, len(auditHistory.Windows))
// AuditHistoryToPB converts an overlay.AuditHistory to a pb.AuditHistory. for i := range windows {
func AuditHistoryToPB(auditHistory AuditHistory) (historyPB *pb.AuditHistory) { windows[i] = &pb.AuditWindow{}
historyPB = &pb.AuditHistory{ *windows[i] = *auditHistory.Windows[i]
Score: auditHistory.Score,
Windows: make([]*pb.AuditWindow, len(auditHistory.Windows)),
} }
for i, window := range auditHistory.Windows { auditHistory.Windows = windows
historyPB.Windows[i] = &pb.AuditWindow{ return auditHistory
TotalCount: window.TotalCount,
OnlineCount: window.OnlineCount,
WindowStart: window.WindowStart,
}
}
return historyPB
}
// AuditHistoryFromBytes returns the corresponding AuditHistory for the given
// encoded internalpb.AuditHistory.
func AuditHistoryFromBytes(encodedHistory []byte) (history AuditHistory, err error) {
var pbHistory internalpb.AuditHistory
if err := pb.Unmarshal(encodedHistory, &pbHistory); err != nil {
return AuditHistory{}, err
}
history.Score = pbHistory.Score
history.Windows = make([]*AuditWindow, len(pbHistory.Windows))
for i, window := range pbHistory.Windows {
history.Windows[i] = &AuditWindow{
TotalCount: window.TotalCount,
OnlineCount: window.OnlineCount,
WindowStart: window.WindowStart,
}
}
return history, nil
} }
// AddAuditToHistory adds a single online/not-online event to an AuditHistory. // AddAuditToHistory adds a single online/not-online event to an AuditHistory.
// If the AuditHistory contains windows that are now outside the tracking // If the AuditHistory contains windows that are now outside the tracking
// period, those windows will be trimmed. // period, those windows will be trimmed.
func AddAuditToHistory(a *internalpb.AuditHistory, online bool, auditTime time.Time, config AuditHistoryConfig) error { func AddAuditToHistory(a *pb.AuditHistory, online bool, auditTime time.Time, config AuditHistoryConfig) error {
newAuditWindowStartTime := auditTime.Truncate(config.WindowSize) newAuditWindowStartTime := auditTime.Truncate(config.WindowSize)
earliestWindow := newAuditWindowStartTime.Add(-config.TrackingPeriod) earliestWindow := newAuditWindowStartTime.Add(-config.TrackingPeriod)
// windowsModified is used to determine whether we will need to recalculate the score because windows have been added or removed. // windowsModified is used to determine whether we will need to recalculate the score because windows have been added or removed.
@ -90,7 +57,7 @@ func AddAuditToHistory(a *internalpb.AuditHistory, online bool, auditTime time.T
// if there are no windows or the latest window has passed, add another window // if there are no windows or the latest window has passed, add another window
if len(a.Windows) == 0 || a.Windows[len(a.Windows)-1].WindowStart.Before(newAuditWindowStartTime) { if len(a.Windows) == 0 || a.Windows[len(a.Windows)-1].WindowStart.Before(newAuditWindowStartTime) {
windowsModified = true windowsModified = true
a.Windows = append(a.Windows, &internalpb.AuditWindow{WindowStart: newAuditWindowStartTime}) a.Windows = append(a.Windows, &pb.AuditWindow{WindowStart: newAuditWindowStartTime})
} }
latestIndex := len(a.Windows) - 1 latestIndex := len(a.Windows) - 1

View File

@ -9,7 +9,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"storj.io/storj/satellite/internalpb" "storj.io/common/pb"
"storj.io/storj/satellite/reputation" "storj.io/storj/satellite/reputation"
) )
@ -27,7 +27,7 @@ func TestAddAuditToHistory(t *testing.T) {
windowsInTrackingPeriod := int(config.TrackingPeriod.Seconds() / config.WindowSize.Seconds()) windowsInTrackingPeriod := int(config.TrackingPeriod.Seconds() / config.WindowSize.Seconds())
currentWindow := startingWindow currentWindow := startingWindow
history := &internalpb.AuditHistory{} history := &pb.AuditHistory{}
// online score should be 1 until the first window is finished // online score should be 1 until the first window is finished
err := reputation.AddAuditToHistory(history, false, currentWindow.Add(2*time.Minute), config) err := reputation.AddAuditToHistory(history, false, currentWindow.Add(2*time.Minute), config)

View File

@ -9,6 +9,7 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
"storj.io/common/pb"
"storj.io/common/storj" "storj.io/common/storj"
"storj.io/storj/satellite/overlay" "storj.io/storj/satellite/overlay"
) )
@ -37,7 +38,7 @@ type Info struct {
Disqualified *time.Time Disqualified *time.Time
DisqualificationReason overlay.DisqualificationReason DisqualificationReason overlay.DisqualificationReason
OnlineScore float64 OnlineScore float64
AuditHistory AuditHistory AuditHistory *pb.AuditHistory
AuditReputationAlpha float64 AuditReputationAlpha float64
AuditReputationBeta float64 AuditReputationBeta float64
UnknownAuditReputationAlpha float64 UnknownAuditReputationAlpha float64

View File

@ -8,7 +8,6 @@ import (
"time" "time"
"storj.io/common/pb" "storj.io/common/pb"
"storj.io/storj/satellite/internalpb"
"storj.io/storj/satellite/reputation" "storj.io/storj/satellite/reputation"
) )
@ -21,7 +20,7 @@ func updateAuditHistory(ctx context.Context, oldHistory []byte, config reputatio
} }
// deserialize node audit history // deserialize node audit history
history := &internalpb.AuditHistory{} history := &pb.AuditHistory{}
err = pb.Unmarshal(oldHistory, history) err = pb.Unmarshal(oldHistory, history)
if err != nil { if err != nil {
return res, err return res, err

View File

@ -14,7 +14,6 @@ import (
"storj.io/common/pb" "storj.io/common/pb"
"storj.io/common/storj" "storj.io/common/storj"
"storj.io/storj/satellite/internalpb"
"storj.io/storj/satellite/overlay" "storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/reputation" "storj.io/storj/satellite/reputation"
"storj.io/storj/satellite/satellitedb/dbx" "storj.io/storj/satellite/satellitedb/dbx"
@ -46,7 +45,7 @@ func (reputations *reputations) Update(ctx context.Context, updateReq reputation
// if this is a new node, we will insert a new entry into the table // if this is a new node, we will insert a new entry into the table
if dbNode == nil { if dbNode == nil {
historyBytes, err := pb.Marshal(&internalpb.AuditHistory{}) historyBytes, err := pb.Marshal(&pb.AuditHistory{})
if err != nil { if err != nil {
return nil, Error.Wrap(err) return nil, Error.Wrap(err)
} }
@ -126,8 +125,8 @@ func (reputations *reputations) Get(ctx context.Context, nodeID storj.NodeID) (*
return nil, Error.Wrap(err) return nil, Error.Wrap(err)
} }
history, err := reputation.AuditHistoryFromBytes(res.AuditHistory) history := &pb.AuditHistory{}
if err != nil { if err := pb.Unmarshal(res.AuditHistory, history); err != nil {
return nil, Error.Wrap(err) return nil, Error.Wrap(err)
} }
var dqReason overlay.DisqualificationReason var dqReason overlay.DisqualificationReason
@ -165,7 +164,7 @@ func (reputations *reputations) DisqualifyNode(ctx context.Context, nodeID storj
_, err = tx.Get_Reputation_By_Id(ctx, dbx.Reputation_Id(nodeID.Bytes())) _, err = tx.Get_Reputation_By_Id(ctx, dbx.Reputation_Id(nodeID.Bytes()))
if errors.Is(err, sql.ErrNoRows) { if errors.Is(err, sql.ErrNoRows) {
historyBytes, err := pb.Marshal(&internalpb.AuditHistory{}) historyBytes, err := pb.Marshal(&pb.AuditHistory{})
if err != nil { if err != nil {
return err return err
} }
@ -204,7 +203,7 @@ func (reputations *reputations) SuspendNodeUnknownAudit(ctx context.Context, nod
_, err = tx.Get_Reputation_By_Id(ctx, dbx.Reputation_Id(nodeID.Bytes())) _, err = tx.Get_Reputation_By_Id(ctx, dbx.Reputation_Id(nodeID.Bytes()))
if errors.Is(err, sql.ErrNoRows) { if errors.Is(err, sql.ErrNoRows) {
historyBytes, err := pb.Marshal(&internalpb.AuditHistory{}) historyBytes, err := pb.Marshal(&pb.AuditHistory{})
if err != nil { if err != nil {
return err return err
} }
@ -242,7 +241,7 @@ func (reputations *reputations) UnsuspendNodeUnknownAudit(ctx context.Context, n
_, err = tx.Get_Reputation_By_Id(ctx, dbx.Reputation_Id(nodeID.Bytes())) _, err = tx.Get_Reputation_By_Id(ctx, dbx.Reputation_Id(nodeID.Bytes()))
if errors.Is(err, sql.ErrNoRows) { if errors.Is(err, sql.ErrNoRows) {
historyBytes, err := pb.Marshal(&internalpb.AuditHistory{}) historyBytes, err := pb.Marshal(&pb.AuditHistory{})
if err != nil { if err != nil {
return err return err
} }
@ -327,6 +326,7 @@ func (reputations *reputations) populateCreateFields(update updateNodeStats) dbx
return createFields return createFields
} }
func (reputations *reputations) populateUpdateFields(update updateNodeStats, history []byte) dbx.Reputation_Update_Fields { func (reputations *reputations) populateUpdateFields(update updateNodeStats, history []byte) dbx.Reputation_Update_Fields {
updateFields := dbx.Reputation_Update_Fields{ updateFields := dbx.Reputation_Update_Fields{
AuditHistory: dbx.Reputation_AuditHistory(history), AuditHistory: dbx.Reputation_AuditHistory(history),
@ -643,11 +643,10 @@ func dbxToReputationInfo(dbNode *dbx.Reputation) (reputation.Info, error) {
info.DisqualificationReason = overlay.DisqualificationReason(*dbNode.DisqualificationReason) info.DisqualificationReason = overlay.DisqualificationReason(*dbNode.DisqualificationReason)
} }
if dbNode.AuditHistory != nil { if dbNode.AuditHistory != nil {
auditHistory, err := reputation.AuditHistoryFromBytes(dbNode.AuditHistory) info.AuditHistory = &pb.AuditHistory{}
if err != nil { if err := pb.Unmarshal(dbNode.AuditHistory, info.AuditHistory); err != nil {
return info, err return info, err
} }
info.AuditHistory = auditHistory
} }
return info, nil return info, nil
} }