diff --git a/internal/date/utils.go b/internal/date/utils.go index 067c18c35..ae7f5f90e 100644 --- a/internal/date/utils.go +++ b/internal/date/utils.go @@ -6,20 +6,15 @@ package date import "time" -// MonthBoundary return first and last day of current month -func MonthBoundary() (firstDay, lastDay time.Time) { - now := time.Now() - currentYear, currentMonth, _ := now.Date() - currentLocation := now.Location() - - firstDay = time.Date(currentYear, currentMonth, 1, 0, 0, 0, 0, currentLocation) - lastDay = firstDay.AddDate(0, 1, -1) - - return +// MonthBoundary extract month from the provided date and returns its edges +func MonthBoundary(t time.Time) (time.Time, time.Time) { + startDate := time.Date(t.Year(), t.Month(), 1, 0, 0, 0, 0, t.Location()) + endDate := time.Date(t.Year(), t.Month()+1, 1, 0, 0, 0, -1, t.Location()) + return startDate, endDate } // DayBoundary returns start and end of the provided day func DayBoundary(t time.Time) (time.Time, time.Time) { - return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC), - time.Date(t.Year(), t.Month(), t.Day()+1, 0, 0, 0, -1, time.UTC) + return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location()), + time.Date(t.Year(), t.Month(), t.Day()+1, 0, 0, 0, -1, t.Location()) } diff --git a/internal/date/utils_test.go b/internal/date/utils_test.go new file mode 100644 index 000000000..d4e0d8f76 --- /dev/null +++ b/internal/date/utils_test.go @@ -0,0 +1,29 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package date_test + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "storj.io/storj/internal/date" +) + +func TestMonthBoundary(t *testing.T) { + now := time.Now().UTC() + + start, end := date.MonthBoundary(now) + assert.Equal(t, start, time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, time.UTC)) + assert.Equal(t, end, time.Date(now.Year(), now.Month()+1, 1, 0, 0, 0, -1, time.UTC)) +} + +func TestDayBoundary(t *testing.T) { + now := time.Now().UTC() + + start, end := date.DayBoundary(now) + assert.Equal(t, start, time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)) + assert.Equal(t, end, time.Date(now.Year(), now.Month(), now.Day()+1, 0, 0, 0, -1, time.UTC)) +} diff --git a/internal/dbutil/nulltime.go b/internal/dbutil/nulltime.go new file mode 100644 index 000000000..4f6e392e5 --- /dev/null +++ b/internal/dbutil/nulltime.go @@ -0,0 +1,50 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package dbutil + +import ( + "database/sql/driver" + "time" +) + +const ( + sqliteTimeLayout = "2006-01-02 15:04:05-07:00" +) + +// NullTime time helps convert nil to time.Time +type NullTime struct { + time.Time + Valid bool +} + +// Scan implements the Scanner interface. +func (nt *NullTime) Scan(value interface{}) error { + // check if it's time.Time which is what postgres returns + // for lagged time values + if nt.Time, nt.Valid = value.(time.Time); nt.Valid { + return nil + } + + // try to parse time from bytes which is what sqlite returns + date, ok := value.([]byte) + if !ok { + return nil + } + + times, err := time.Parse(sqliteTimeLayout, string(date)) + if err != nil { + return nil + } + + nt.Time, nt.Valid = times, true + return nil +} + +// Value implements the driver Valuer interface. +func (nt NullTime) Value() (driver.Value, error) { + if !nt.Valid { + return nil, nil + } + return nt.Time, nil +} diff --git a/internal/testplanet/storagenode.go b/internal/testplanet/storagenode.go index b57c424e2..7292e08f6 100644 --- a/internal/testplanet/storagenode.go +++ b/internal/testplanet/storagenode.go @@ -22,6 +22,7 @@ import ( "storj.io/storj/storagenode/collector" "storj.io/storj/storagenode/console/consoleserver" "storj.io/storj/storagenode/monitor" + "storj.io/storj/storagenode/nodestats" "storj.io/storj/storagenode/orders" "storj.io/storj/storagenode/piecestore" "storj.io/storj/storagenode/storagenodedb" @@ -87,6 +88,11 @@ func (planet *Planet) newStorageNodes(count int, whitelistedSatellites storj.Nod Collector: collector.Config{ Interval: time.Minute, }, + Nodestats: nodestats.Config{ + MaxSleep: time.Second, + ReputationSync: time.Second, + StorageSync: time.Second, + }, Console: consoleserver.Config{ Address: "127.0.0.1:0", StaticDir: filepath.Join(developmentRoot, "web/operator/"), diff --git a/internal/testrand/rand.go b/internal/testrand/rand.go index 51724fb76..049f7c2b9 100644 --- a/internal/testrand/rand.go +++ b/internal/testrand/rand.go @@ -26,6 +26,12 @@ func Int63n(n int64) int64 { return rand.Int63n(n) } +// Float64n returns floating point pseudo-random number in [-n,0] || [0,n] +// based on the sign of the input +func Float64n(n int64) float64 { + return rand.Float64() * float64(n) +} + // Read reads pseudo-random data into data. func Read(data []byte) { const newSourceThreshold = 64 diff --git a/pkg/pb/nodestats.pb.go b/pkg/pb/nodestats.pb.go index f7a8b33a9..aff8be9dc 100644 --- a/pkg/pb/nodestats.pb.go +++ b/pkg/pb/nodestats.pb.go @@ -258,8 +258,8 @@ func (m *DailyStorageUsageResponse) GetDailyStorageUsage() []*DailyStorageUsageR } type DailyStorageUsageResponse_StorageUsage struct { - AtRestTotal float64 `protobuf:"fixed64,2,opt,name=at_rest_total,json=atRestTotal,proto3" json:"at_rest_total,omitempty"` - TimeStamp time.Time `protobuf:"bytes,3,opt,name=time_stamp,json=timeStamp,proto3,stdtime" json:"time_stamp"` + AtRestTotal float64 `protobuf:"fixed64,1,opt,name=at_rest_total,json=atRestTotal,proto3" json:"at_rest_total,omitempty"` + Timestamp time.Time `protobuf:"bytes,2,opt,name=timestamp,proto3,stdtime" json:"timestamp"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -298,9 +298,9 @@ func (m *DailyStorageUsageResponse_StorageUsage) GetAtRestTotal() float64 { return 0 } -func (m *DailyStorageUsageResponse_StorageUsage) GetTimeStamp() time.Time { +func (m *DailyStorageUsageResponse_StorageUsage) GetTimestamp() time.Time { if m != nil { - return m.TimeStamp + return m.Timestamp } return time.Time{} } @@ -317,39 +317,39 @@ func init() { func init() { proto.RegisterFile("nodestats.proto", fileDescriptor_e0b184ee117142aa) } var fileDescriptor_e0b184ee117142aa = []byte{ - // 509 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x93, 0xcf, 0x6e, 0xd3, 0x40, - 0x10, 0xc6, 0x59, 0x27, 0x84, 0x66, 0x9c, 0x36, 0xcd, 0x72, 0x31, 0xe1, 0xe0, 0xc8, 0x45, 0x6a, - 0xb8, 0xb8, 0x22, 0x70, 0x40, 0x42, 0x1c, 0x48, 0x2a, 0xa1, 0x5e, 0x38, 0x6c, 0xca, 0x85, 0x03, - 0xab, 0x8d, 0xbd, 0x75, 0x2d, 0x92, 0xac, 0xf1, 0x8e, 0x85, 0x78, 0x05, 0x0e, 0x88, 0x47, 0xe0, - 0x11, 0x78, 0x0c, 0x9e, 0x80, 0x03, 0x87, 0xf2, 0x2a, 0x68, 0xd7, 0xce, 0x1f, 0x4c, 0xa1, 0x39, - 0xfa, 0xb7, 0xdf, 0x7c, 0x99, 0xf9, 0x66, 0x02, 0xdd, 0xa5, 0x8a, 0xa5, 0x46, 0x81, 0x3a, 0xcc, - 0x72, 0x85, 0x8a, 0xb6, 0xd7, 0xa0, 0x0f, 0x89, 0x4a, 0x54, 0x89, 0xfb, 0x7e, 0xa2, 0x54, 0x32, - 0x97, 0x27, 0xf6, 0x6b, 0x56, 0x5c, 0x9c, 0x60, 0xba, 0x30, 0xb2, 0x45, 0x56, 0x0a, 0x82, 0x1f, - 0x04, 0xba, 0x4c, 0x66, 0x05, 0x0a, 0x4c, 0xd5, 0x72, 0x6a, 0x0c, 0xa8, 0x0f, 0x2e, 0x2a, 0x14, - 0x73, 0x1e, 0xa9, 0x62, 0x89, 0x1e, 0x19, 0x90, 0x61, 0x83, 0x81, 0x45, 0x13, 0x43, 0xe8, 0x11, - 0xec, 0xeb, 0x22, 0x8a, 0xa4, 0xd6, 0x95, 0xc4, 0xb1, 0x92, 0x4e, 0x05, 0x4b, 0xd1, 0x43, 0x38, - 0xcc, 0xd7, 0xc6, 0x5c, 0xcc, 0xb3, 0x4b, 0xe1, 0x35, 0x06, 0x64, 0x48, 0x58, 0x77, 0xc3, 0x5f, - 0x18, 0x4c, 0x8f, 0x61, 0x0b, 0xf1, 0x99, 0x44, 0xe1, 0x35, 0xad, 0xf2, 0x60, 0x83, 0xc7, 0x12, - 0x45, 0xcd, 0x53, 0x47, 0x2a, 0x97, 0xde, 0xed, 0xba, 0xe7, 0xd4, 0xe0, 0xa0, 0x07, 0xdd, 0x97, - 0x12, 0xed, 0x40, 0x4c, 0xbe, 0x2f, 0xa4, 0xc6, 0xe0, 0x33, 0x81, 0xc3, 0x0d, 0xd3, 0x99, 0x5a, - 0x6a, 0x49, 0x9f, 0x43, 0xa7, 0xc8, 0x4c, 0x2a, 0x3c, 0xba, 0x94, 0xd1, 0x3b, 0x3b, 0xad, 0x3b, - 0xea, 0x87, 0x9b, 0x80, 0x6b, 0xf1, 0x30, 0xb7, 0xd4, 0x4f, 0x8c, 0x9c, 0x3e, 0x03, 0x57, 0x14, - 0x71, 0x8a, 0x55, 0xb5, 0x73, 0x63, 0x35, 0x58, 0xb9, 0x2d, 0x0e, 0x3e, 0x11, 0xf0, 0x4e, 0x45, - 0x3a, 0xff, 0x38, 0x45, 0x95, 0x8b, 0x44, 0xbe, 0xd6, 0x22, 0x91, 0x55, 0xb7, 0xf4, 0x29, 0x34, - 0x2f, 0x72, 0xb5, 0x58, 0x37, 0x54, 0x6e, 0x32, 0x5c, 0x6d, 0x32, 0x3c, 0x5f, 0x6d, 0x72, 0xbc, - 0xf7, 0xfd, 0xca, 0xbf, 0xf5, 0xe5, 0x97, 0x4f, 0x98, 0xad, 0xa0, 0x4f, 0xc0, 0x41, 0xb5, 0x6e, - 0x65, 0x97, 0x3a, 0x07, 0x55, 0xf0, 0xd5, 0x81, 0x7b, 0xd7, 0x34, 0x53, 0xc5, 0x74, 0x0c, 0x77, - 0xcc, 0x4c, 0x3c, 0x8d, 0x6d, 0x43, 0x9d, 0xf1, 0x81, 0x29, 0xfe, 0x79, 0xe5, 0xb7, 0x5e, 0xa9, - 0x58, 0x9e, 0x9d, 0xb2, 0x96, 0x79, 0x3e, 0x8b, 0xa9, 0x80, 0xbb, 0xb1, 0x71, 0xe1, 0xba, 0xb4, - 0xe1, 0x85, 0xf1, 0xf1, 0x9c, 0x41, 0x63, 0xe8, 0x8e, 0x1e, 0x6d, 0x05, 0xf3, 0xcf, 0xdf, 0x0a, - 0xff, 0x80, 0xbd, 0xb8, 0xae, 0xeb, 0x7f, 0x80, 0xce, 0xf6, 0x37, 0x0d, 0x60, 0x5f, 0x20, 0xcf, - 0xa5, 0x46, 0x6e, 0x8f, 0xd4, 0x8e, 0x4e, 0x98, 0x2b, 0x90, 0x49, 0x8d, 0xe7, 0x06, 0xd1, 0x09, - 0x80, 0x5d, 0xb2, 0x9d, 0xdc, 0xde, 0xe1, 0xae, 0xd9, 0xb4, 0x4d, 0xdd, 0xd4, 0xc0, 0xd1, 0x37, - 0x02, 0x6d, 0x33, 0x6e, 0xf9, 0x37, 0x99, 0xc0, 0xde, 0xea, 0x9a, 0xe8, 0xf6, 0xc6, 0x6b, 0x67, - 0xd7, 0xbf, 0x7f, 0xed, 0x5b, 0x95, 0xeb, 0x5b, 0xe8, 0xfd, 0x15, 0x04, 0x3d, 0xfa, 0x7f, 0x4c, - 0xa5, 0xed, 0x83, 0x5d, 0xb2, 0x1c, 0x37, 0xdf, 0x38, 0xd9, 0x6c, 0xd6, 0xb2, 0x13, 0x3e, 0xfe, - 0x1d, 0x00, 0x00, 0xff, 0xff, 0xf4, 0xc8, 0x68, 0x4c, 0x37, 0x04, 0x00, 0x00, + // 503 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x93, 0xc1, 0x6e, 0xd3, 0x40, + 0x10, 0x86, 0x59, 0x27, 0x84, 0x66, 0x9c, 0x36, 0xcd, 0x72, 0x31, 0xe6, 0xe0, 0xc8, 0x45, 0x6a, + 0xb8, 0xb8, 0x22, 0x70, 0x40, 0x42, 0x1c, 0x70, 0x2a, 0xa1, 0x5e, 0x38, 0x38, 0xe5, 0xc2, 0x01, + 0x6b, 0x63, 0x6f, 0x5d, 0x8b, 0x24, 0x6b, 0xbc, 0x63, 0x24, 0x5e, 0x81, 0x03, 0xe2, 0x0d, 0x78, + 0x05, 0x1e, 0x83, 0x27, 0xe0, 0xc0, 0xa1, 0xbc, 0x0a, 0xda, 0xb5, 0x63, 0x07, 0x53, 0x20, 0x1c, + 0xfd, 0xed, 0x3f, 0x7f, 0x66, 0xfe, 0x99, 0xc0, 0x70, 0x2d, 0x62, 0x2e, 0x91, 0xa1, 0xf4, 0xb2, + 0x5c, 0xa0, 0xa0, 0xfd, 0x1a, 0xd8, 0x90, 0x88, 0x44, 0x94, 0xd8, 0x76, 0x12, 0x21, 0x92, 0x25, + 0x3f, 0xd1, 0x5f, 0x8b, 0xe2, 0xe2, 0x04, 0xd3, 0x95, 0x92, 0xad, 0xb2, 0x52, 0xe0, 0x7e, 0x23, + 0x30, 0x0c, 0x78, 0x56, 0x20, 0xc3, 0x54, 0xac, 0xe7, 0xca, 0x80, 0x3a, 0x60, 0xa2, 0x40, 0xb6, + 0x0c, 0x23, 0x51, 0xac, 0xd1, 0x22, 0x63, 0x32, 0xe9, 0x04, 0xa0, 0xd1, 0x4c, 0x11, 0x7a, 0x04, + 0xfb, 0xb2, 0x88, 0x22, 0x2e, 0x65, 0x25, 0x31, 0xb4, 0x64, 0x50, 0xc1, 0x52, 0x74, 0x1f, 0x0e, + 0xf3, 0xda, 0x38, 0x64, 0xcb, 0xec, 0x92, 0x59, 0x9d, 0x31, 0x99, 0x90, 0x60, 0xd8, 0xf0, 0x67, + 0x0a, 0xd3, 0x63, 0xd8, 0x42, 0xe1, 0x82, 0x23, 0xb3, 0xba, 0x5a, 0x79, 0xd0, 0x60, 0x9f, 0x23, + 0x6b, 0x79, 0xca, 0x48, 0xe4, 0xdc, 0xba, 0xd9, 0xf6, 0x9c, 0x2b, 0xec, 0x8e, 0x60, 0xf8, 0x9c, + 0xa3, 0x1e, 0x28, 0xe0, 0x6f, 0x0b, 0x2e, 0xd1, 0xfd, 0x48, 0xe0, 0xb0, 0x61, 0x32, 0x13, 0x6b, + 0xc9, 0xe9, 0x53, 0x18, 0x14, 0x99, 0x4a, 0x25, 0x8c, 0x2e, 0x79, 0xf4, 0x46, 0x4f, 0x6b, 0x4e, + 0x6d, 0xaf, 0x09, 0xb8, 0x15, 0x4f, 0x60, 0x96, 0xfa, 0x99, 0x92, 0xd3, 0x27, 0x60, 0xb2, 0x22, + 0x4e, 0xb1, 0xaa, 0x36, 0xfe, 0x59, 0x0d, 0x5a, 0xae, 0x8b, 0xdd, 0x0f, 0x04, 0xac, 0x53, 0x96, + 0x2e, 0xdf, 0xcf, 0x51, 0xe4, 0x2c, 0xe1, 0x2f, 0x25, 0x4b, 0x78, 0xd5, 0x2d, 0x7d, 0x0c, 0xdd, + 0x8b, 0x5c, 0xac, 0xea, 0x86, 0xca, 0x4d, 0x7a, 0x9b, 0x4d, 0x7a, 0xe7, 0x9b, 0x4d, 0xfa, 0x7b, + 0x5f, 0xaf, 0x9c, 0x1b, 0x9f, 0x7e, 0x38, 0x24, 0xd0, 0x15, 0xf4, 0x11, 0x18, 0x28, 0xea, 0x56, + 0x76, 0xa9, 0x33, 0x50, 0xb8, 0x9f, 0x0d, 0xb8, 0x73, 0x4d, 0x33, 0x55, 0x4c, 0xc7, 0x70, 0x4b, + 0xcd, 0x14, 0xa6, 0xb1, 0x6e, 0x68, 0xe0, 0x1f, 0xa8, 0xe2, 0xef, 0x57, 0x4e, 0xef, 0x85, 0x88, + 0xf9, 0xd9, 0x69, 0xd0, 0x53, 0xcf, 0x67, 0x31, 0x65, 0x70, 0x3b, 0x56, 0x2e, 0xa1, 0x2c, 0x6d, + 0xc2, 0x42, 0xf9, 0x58, 0xc6, 0xb8, 0x33, 0x31, 0xa7, 0x0f, 0xb6, 0x82, 0xf9, 0xe3, 0x6f, 0x79, + 0xbf, 0xc0, 0x51, 0xdc, 0xd6, 0xd9, 0xef, 0x60, 0xb0, 0xfd, 0x4d, 0x5d, 0xd8, 0x67, 0x18, 0xe6, + 0x5c, 0x62, 0xa8, 0x8f, 0x54, 0x77, 0x48, 0x02, 0x93, 0x61, 0xc0, 0x25, 0x9e, 0x2b, 0x44, 0x7d, + 0xe8, 0xd7, 0xa7, 0xff, 0x5f, 0xd1, 0x34, 0x65, 0xd3, 0x2f, 0x04, 0xfa, 0x6a, 0xda, 0xf2, 0x5f, + 0x32, 0x83, 0xbd, 0xcd, 0x31, 0xd1, 0xed, 0x85, 0xb7, 0xae, 0xce, 0xbe, 0x7b, 0xed, 0x5b, 0x15, + 0xeb, 0x6b, 0x18, 0xfd, 0x96, 0x03, 0x3d, 0xfa, 0x7b, 0x4a, 0xa5, 0xed, 0xbd, 0x5d, 0xa2, 0xf4, + 0xbb, 0xaf, 0x8c, 0x6c, 0xb1, 0xe8, 0xe9, 0x09, 0x1f, 0xfe, 0x0c, 0x00, 0x00, 0xff, 0xff, 0x83, + 0x66, 0x1e, 0x03, 0x36, 0x04, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. diff --git a/pkg/pb/nodestats.proto b/pkg/pb/nodestats.proto index 3603a46fc..bd6134983 100644 --- a/pkg/pb/nodestats.proto +++ b/pkg/pb/nodestats.proto @@ -36,8 +36,8 @@ message DailyStorageUsageRequest { message DailyStorageUsageResponse { message StorageUsage { - double at_rest_total = 2; - google.protobuf.Timestamp time_stamp = 3 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; + double at_rest_total = 1; + google.protobuf.Timestamp timestamp = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; } bytes node_id = 1 [(gogoproto.customtype) = "NodeID", (gogoproto.nullable) = false]; diff --git a/proto.lock b/proto.lock index 7c95843d2..8fc133402 100644 --- a/proto.lock +++ b/proto.lock @@ -4107,13 +4107,13 @@ "name": "StorageUsage", "fields": [ { - "id": 2, + "id": 1, "name": "at_rest_total", "type": "double" }, { - "id": 3, - "name": "time_stamp", + "id": 2, + "name": "timestamp", "type": "google.protobuf.Timestamp", "options": [ { diff --git a/satellite/accounting/db.go b/satellite/accounting/db.go index 0513ca227..a1e2097ff 100644 --- a/satellite/accounting/db.go +++ b/satellite/accounting/db.go @@ -45,12 +45,12 @@ type Rollup struct { AtRestTotal float64 } -// NodeSpaceUsage is node at rest space usage over a period of time -type NodeSpaceUsage struct { +// StorageNodeUsage is node at rest space usage over a period of time +type StorageNodeUsage struct { NodeID storj.NodeID - AtRestTotal float64 + StorageUsed float64 - TimeStamp time.Time + Timestamp time.Time } // StoragenodeAccounting stores information about bandwidth and storage usage for storage nodes @@ -69,8 +69,8 @@ type StoragenodeAccounting interface { LastTimestamp(ctx context.Context, timestampType string) (time.Time, error) // QueryPaymentInfo queries Nodes and Accounting_Rollup on nodeID QueryPaymentInfo(ctx context.Context, start time.Time, end time.Time) ([]*CSVRow, error) - // QueryNodeDailySpaceUsage returns slice of NodeSpaceUsage for given period - QueryNodeDailySpaceUsage(ctx context.Context, nodeID storj.NodeID, start time.Time, end time.Time) ([]NodeSpaceUsage, error) + // QueryStorageNodeUsage returns slice of StorageNodeUsage for given period + QueryStorageNodeUsage(ctx context.Context, nodeID storj.NodeID, start time.Time, end time.Time) ([]StorageNodeUsage, error) // DeleteTalliesBefore deletes all tallies prior to some time DeleteTalliesBefore(ctx context.Context, latestRollup time.Time) error } diff --git a/satellite/accounting/db_test.go b/satellite/accounting/db_test.go index 355af4ae8..35f8f9bd5 100644 --- a/satellite/accounting/db_test.go +++ b/satellite/accounting/db_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/skyrings/skyring-common/tools/uuid" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "storj.io/storj/internal/testcontext" @@ -19,6 +20,10 @@ import ( "storj.io/storj/satellite/satellitedb/satellitedbtest" ) +const ( + rollupsCount = 25 +) + func TestSaveBucketTallies(t *testing.T) { satellitedbtest.Run(t, func(t *testing.T, db satellite.DB) { ctx := testcontext.New(t) @@ -41,6 +46,47 @@ func TestSaveBucketTallies(t *testing.T) { }) } +func TestStorageNodeUsage(t *testing.T) { + satellitedbtest.Run(t, func(t *testing.T, db satellite.DB) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + + nodeID := testrand.NodeID() + + var nodes storj.NodeIDList + nodes = append(nodes, nodeID) + nodes = append(nodes, testrand.NodeID()) + nodes = append(nodes, testrand.NodeID()) + nodes = append(nodes, testrand.NodeID()) + + rollups, dates := createRollups(nodes) + + // run 2 rollups for the same day + err := db.StoragenodeAccounting().SaveRollup(ctx, time.Now(), rollups) + require.NoError(t, err) + err = db.StoragenodeAccounting().SaveRollup(ctx, time.Now(), rollups) + require.NoError(t, err) + + nodeStorageUsages, err := db.StoragenodeAccounting().QueryStorageNodeUsage(ctx, nodeID, time.Time{}, time.Now()) + require.NoError(t, err) + assert.NotNil(t, nodeStorageUsages) + assert.Equal(t, rollupsCount-1, len(nodeStorageUsages)) + + for _, usage := range nodeStorageUsages { + assert.Equal(t, nodeID, usage.NodeID) + } + + lastDate, prevDate := dates[len(dates)-1], dates[len(dates)-2] + lastRollup, prevRollup := rollups[lastDate][nodeID], rollups[prevDate][nodeID] + + testValue := lastRollup.AtRestTotal - prevRollup.AtRestTotal + testValue /= lastRollup.StartTime.Sub(prevRollup.StartTime).Hours() + + assert.Equal(t, testValue, nodeStorageUsages[len(nodeStorageUsages)-1].StorageUsed) + assert.Equal(t, lastDate, nodeStorageUsages[len(nodeStorageUsages)-1].Timestamp.UTC()) + }) +} + func createBucketStorageTallies(projectID uuid.UUID) (map[string]*accounting.BucketTally, []accounting.BucketTally, error) { bucketTallies := make(map[string]*accounting.BucketTally) var expectedTallies []accounting.BucketTally @@ -66,3 +112,38 @@ func createBucketStorageTallies(projectID uuid.UUID) (map[string]*accounting.Buc } return bucketTallies, expectedTallies, nil } + +func createRollups(nodes storj.NodeIDList) (accounting.RollupStats, []time.Time) { + var dates []time.Time + rollups := make(accounting.RollupStats) + now := time.Now().UTC() + + var rollupCounter int64 + for i := 0; i < rollupsCount; i++ { + startDate := time.Date(now.Year(), now.Month()-1, 1+i, 0, 0, 0, 0, now.Location()) + if rollups[startDate] == nil { + rollups[startDate] = make(map[storj.NodeID]*accounting.Rollup) + } + + for _, nodeID := range nodes { + rollup := &accounting.Rollup{ + ID: rollupCounter, + NodeID: nodeID, + StartTime: startDate, + PutTotal: testrand.Int63n(10000), + GetTotal: testrand.Int63n(10000), + GetAuditTotal: testrand.Int63n(10000), + GetRepairTotal: testrand.Int63n(10000), + PutRepairTotal: testrand.Int63n(10000), + AtRestTotal: testrand.Float64n(10000), + } + + rollupCounter++ + rollups[startDate][nodeID] = rollup + } + + dates = append(dates, startDate) + } + + return rollups, dates +} diff --git a/satellite/nodestats/endpoint.go b/satellite/nodestats/endpoint.go index adb31d8dc..1aefb6be7 100644 --- a/satellite/nodestats/endpoint.go +++ b/satellite/nodestats/endpoint.go @@ -93,25 +93,25 @@ func (e *Endpoint) DailyStorageUsage(ctx context.Context, req *pb.DailyStorageUs return nil, NodeStatsEndpointErr.Wrap(err) } - nodeSpaceUsages, err := e.accounting.QueryNodeDailySpaceUsage(ctx, node.Id, req.GetFrom(), req.GetTo()) + nodeSpaceUsages, err := e.accounting.QueryStorageNodeUsage(ctx, node.Id, req.GetFrom(), req.GetTo()) if err != nil { return nil, NodeStatsEndpointErr.Wrap(err) } return &pb.DailyStorageUsageResponse{ NodeId: node.Id, - DailyStorageUsage: toPBDailyStorageUsage(nodeSpaceUsages), + DailyStorageUsage: toProtoDailyStorageUsage(nodeSpaceUsages), }, nil } -// toPBDailyStorageUsage converts NodeSpaceUsage to PB DailyStorageUsageResponse_StorageUsage -func toPBDailyStorageUsage(usages []accounting.NodeSpaceUsage) []*pb.DailyStorageUsageResponse_StorageUsage { +// toProtoDailyStorageUsage converts StorageNodeUsage to PB DailyStorageUsageResponse_StorageUsage +func toProtoDailyStorageUsage(usages []accounting.StorageNodeUsage) []*pb.DailyStorageUsageResponse_StorageUsage { var pbUsages []*pb.DailyStorageUsageResponse_StorageUsage for _, usage := range usages { pbUsages = append(pbUsages, &pb.DailyStorageUsageResponse_StorageUsage{ - AtRestTotal: usage.AtRestTotal, - TimeStamp: usage.TimeStamp, + AtRestTotal: usage.StorageUsed, + Timestamp: usage.Timestamp, }) } diff --git a/satellite/satellitedb/locked.go b/satellite/satellitedb/locked.go index db67e5441..0601e0451 100644 --- a/satellite/satellitedb/locked.go +++ b/satellite/satellitedb/locked.go @@ -1150,13 +1150,6 @@ func (m *lockedStoragenodeAccounting) LastTimestamp(ctx context.Context, timesta return m.db.LastTimestamp(ctx, timestampType) } -// QueryNodeDailySpaceUsage returns slice of NodeSpaceUsage for given period -func (m *lockedStoragenodeAccounting) QueryNodeDailySpaceUsage(ctx context.Context, nodeID storj.NodeID, start time.Time, end time.Time) ([]accounting.NodeSpaceUsage, error) { - m.Lock() - defer m.Unlock() - return m.db.QueryNodeDailySpaceUsage(ctx, nodeID, start, end) -} - // QueryPaymentInfo queries Nodes and Accounting_Rollup on nodeID func (m *lockedStoragenodeAccounting) QueryPaymentInfo(ctx context.Context, start time.Time, end time.Time) ([]*accounting.CSVRow, error) { m.Lock() @@ -1164,6 +1157,13 @@ func (m *lockedStoragenodeAccounting) QueryPaymentInfo(ctx context.Context, star return m.db.QueryPaymentInfo(ctx, start, end) } +// QueryStorageNodeUsage returns slice of StorageNodeUsage for given period +func (m *lockedStoragenodeAccounting) QueryStorageNodeUsage(ctx context.Context, nodeID storj.NodeID, start time.Time, end time.Time) ([]accounting.StorageNodeUsage, error) { + m.Lock() + defer m.Unlock() + return m.db.QueryStorageNodeUsage(ctx, nodeID, start, end) +} + // SaveRollup records tally and bandwidth rollup aggregations to the database func (m *lockedStoragenodeAccounting) SaveRollup(ctx context.Context, latestTally time.Time, stats accounting.RollupStats) error { m.Lock() diff --git a/satellite/satellitedb/storagenodeaccounting.go b/satellite/satellitedb/storagenodeaccounting.go index 12b573fbe..676e23966 100644 --- a/satellite/satellitedb/storagenodeaccounting.go +++ b/satellite/satellitedb/storagenodeaccounting.go @@ -10,6 +10,7 @@ import ( "github.com/zeebo/errs" + "storj.io/storj/internal/dbutil" "storj.io/storj/pkg/storj" "storj.io/storj/satellite/accounting" dbx "storj.io/storj/satellite/satellitedb/dbx" @@ -195,20 +196,25 @@ func (db *StoragenodeAccounting) QueryPaymentInfo(ctx context.Context, start tim return csv, nil } -// QueryNodeDailySpaceUsage returns slice of NodeSpaceUsage for given period -func (db *StoragenodeAccounting) QueryNodeDailySpaceUsage(ctx context.Context, nodeID storj.NodeID, start time.Time, end time.Time) (_ []accounting.NodeSpaceUsage, err error) { +// QueryStorageNodeUsage returns slice of StorageNodeUsage for given period +func (db *StoragenodeAccounting) QueryStorageNodeUsage(ctx context.Context, nodeID storj.NodeID, start time.Time, end time.Time) (_ []accounting.StorageNodeUsage, err error) { defer mon.Task()(&ctx)(&err) - // as entries are stored on daily basis we don't need - // to extract DATE from start_time - query := `SELECT at_rest_total, start_time + query := `SELECT at_rest_total, start_time, + LAG(at_rest_total) OVER win AS prev_at_rest, + LAG(start_time) OVER win AS prev_start_time FROM accounting_rollups - WHERE node_id = ? - AND ? <= start_time AND start_time <= ? - GROUP BY start_time - ORDER BY start_time ASC` + WHERE id IN ( + SELECT MAX(id) + FROM accounting_rollups + WHERE node_id = ? + AND ? <= start_time AND start_time <= ? + GROUP BY start_time + ORDER BY start_time ASC + ) + WINDOW win AS (ORDER BY start_time)` - rows, err := db.db.QueryContext(ctx, db.db.Rebind(query), nodeID, start, end) + rows, err := db.db.QueryContext(ctx, db.db.Rebind(query), nodeID, start.UTC(), end.UTC()) if err != nil { return nil, Error.Wrap(err) } @@ -217,24 +223,39 @@ func (db *StoragenodeAccounting) QueryNodeDailySpaceUsage(ctx context.Context, n err = errs.Combine(err, rows.Close()) }() - var nodeSpaceUsages []accounting.NodeSpaceUsage + var nodeStorageUsages []accounting.StorageNodeUsage for rows.Next() { var atRestTotal float64 var startTime time.Time + var prevAtRestTotal sql.NullFloat64 + var prevStartTime dbutil.NullTime - err = rows.Scan(atRestTotal, startTime) + err = rows.Scan(&atRestTotal, &startTime, &prevAtRestTotal, &prevStartTime) if err != nil { return nil, Error.Wrap(err) } - nodeSpaceUsages = append(nodeSpaceUsages, accounting.NodeSpaceUsage{ + // skip first entry as we can not extract hours + // properly without storagenode storage tallies + // which formed this value + if !prevStartTime.Valid { + continue + } + + atRest := atRestTotal - prevAtRestTotal.Float64 + hours := startTime.Sub(prevStartTime.Time).Hours() + if hours != 0 { + atRest /= hours + } + + nodeStorageUsages = append(nodeStorageUsages, accounting.StorageNodeUsage{ NodeID: nodeID, - AtRestTotal: atRestTotal, - TimeStamp: startTime, + StorageUsed: atRest, + Timestamp: startTime, }) } - return nodeSpaceUsages, nil + return nodeStorageUsages, nil } // DeleteTalliesBefore deletes all raw tallies prior to some time diff --git a/storagenode/console/bandwidth.go b/storagenode/console/bandwidth.go index 81b37dabe..9ffb073f6 100644 --- a/storagenode/console/bandwidth.go +++ b/storagenode/console/bandwidth.go @@ -4,13 +4,25 @@ package console import ( + "context" "time" "github.com/zeebo/errs" + "storj.io/storj/pkg/storj" "storj.io/storj/storagenode/bandwidth" ) +// Bandwidth is interface for querying bandwidth from the db +type Bandwidth interface { + // GetDaily returns slice of daily bandwidth usage for provided time range, + // sorted in ascending order for particular satellite + GetDaily(ctx context.Context, satelliteID storj.NodeID, from, to time.Time) ([]BandwidthUsed, error) + // GetDailyTotal returns slice of daily bandwidth usage for provided time range, + // sorted in ascending order + GetDailyTotal(ctx context.Context, from, to time.Time) ([]BandwidthUsed, error) +} + // Egress stores info about storage node egress usage type Egress struct { Repair int64 `json:"repair"` diff --git a/storagenode/console/consoleserver/server.go b/storagenode/console/consoleserver/server.go index 61a733fca..cdc783863 100644 --- a/storagenode/console/consoleserver/server.go +++ b/storagenode/console/consoleserver/server.go @@ -20,7 +20,8 @@ import ( "storj.io/storj/internal/version" "storj.io/storj/pkg/storj" "storj.io/storj/storagenode/console" - "storj.io/storj/storagenode/nodestats" + "storj.io/storj/storagenode/reputation" + "storj.io/storj/storagenode/storageusage" ) const ( @@ -49,18 +50,18 @@ type DashboardResponse struct { // DashboardData stores all needed information about storagenode type DashboardData struct { - Bandwidth console.BandwidthInfo `json:"bandwidth"` - DiskSpace console.DiskSpaceInfo `json:"diskSpace"` - WalletAddress string `json:"walletAddress"` - VersionInfo version.Info `json:"versionInfo"` - IsLastVersion bool `json:"isLastVersion"` - Uptime time.Duration `json:"uptime"` - NodeID string `json:"nodeId"` - Satellites storj.NodeIDList `json:"satellites"` - UptimeCheck nodestats.ReputationStats `json:"uptimeCheck"` - AuditCheck nodestats.ReputationStats `json:"auditCheck"` - BandwidthChartData []console.BandwidthUsed `json:"bandwidthChartData"` - DiskSpaceChartData []nodestats.SpaceUsageStamp `json:"diskSpaceChartData"` + Bandwidth console.BandwidthInfo `json:"bandwidth"` + DiskSpace console.DiskSpaceInfo `json:"diskSpace"` + WalletAddress string `json:"walletAddress"` + VersionInfo version.Info `json:"versionInfo"` + IsLastVersion bool `json:"isLastVersion"` + Uptime time.Duration `json:"uptime"` + NodeID string `json:"nodeId"` + Satellites storj.NodeIDList `json:"satellites"` + UptimeCheck reputation.Metric `json:"uptimeCheck"` + AuditCheck reputation.Metric `json:"auditCheck"` + BandwidthChartData []console.BandwidthUsed `json:"bandwidthChartData"` + DiskSpaceChartData []storageusage.Stamp `json:"diskSpaceChartData"` } // Server represents storagenode console web server @@ -175,14 +176,11 @@ func (server *Server) dashboardHandler(writer http.ResponseWriter, request *http func (server *Server) getDashboardData(ctx context.Context, satelliteID *storj.NodeID) (DashboardData, error) { var response = DashboardData{} - satellites, err := server.service.GetSatellites(ctx) - if err != nil { - return response, err - } + satellites := server.service.GetSatellites(ctx) // checks if current satellite id is related to current storage node if satelliteID != nil { - if err = server.checkSatelliteID(satellites, *satelliteID); err != nil { + if err := server.checkSatelliteID(satellites, *satelliteID); err != nil { return response, err } } @@ -254,7 +252,7 @@ func (server *Server) getBandwidth(ctx context.Context, satelliteID *storj.NodeI } func (server *Server) getBandwidthChartData(ctx context.Context, satelliteID *storj.NodeID) (_ []console.BandwidthUsed, err error) { - from, to := date.MonthBoundary() + from, to := date.MonthBoundary(time.Now().UTC()) if satelliteID != nil { return server.service.GetDailyBandwidthUsed(ctx, *satelliteID, from, to) diff --git a/storagenode/console/db_test.go b/storagenode/console/db_test.go index c9b714aea..cc66ea527 100644 --- a/storagenode/console/db_test.go +++ b/storagenode/console/db_test.go @@ -20,18 +20,16 @@ func TestDB_Trivial(t *testing.T) { ctx := testcontext.New(t) defer ctx.Cleanup() - { // Ensure GetSatelliteIDs works at all - _, err := db.Console().GetSatelliteIDs(ctx, time.Now(), time.Now()) + satelliteID := testrand.NodeID() + now := time.Now() + + { // Ensure Bandwidth GetDailyTotal works at all + _, err := db.Console().Bandwidth().GetDailyTotal(ctx, now, now) require.NoError(t, err) } - { // Ensure GetDailyTotalBandwidthUsed works at all - _, err := db.Console().GetDailyTotalBandwidthUsed(ctx, time.Now(), time.Now()) - require.NoError(t, err) - } - - { // Ensure GetDailyBandwidthUsed works at all - _, err := db.Console().GetDailyBandwidthUsed(ctx, testrand.NodeID(), time.Now(), time.Now()) + { // Ensure Bandwidth GetDaily works at all + _, err := db.Console().Bandwidth().GetDaily(ctx, satelliteID, now, now) require.NoError(t, err) } }) diff --git a/storagenode/console/service.go b/storagenode/console/service.go index 53f810701..30db762ee 100644 --- a/storagenode/console/service.go +++ b/storagenode/console/service.go @@ -16,8 +16,10 @@ import ( "storj.io/storj/pkg/kademlia" "storj.io/storj/pkg/storj" "storj.io/storj/storagenode/bandwidth" - "storj.io/storj/storagenode/nodestats" "storj.io/storj/storagenode/pieces" + "storj.io/storj/storagenode/reputation" + "storj.io/storj/storagenode/storageusage" + "storj.io/storj/storagenode/trust" ) var ( @@ -29,27 +31,22 @@ var ( // DB exposes methods for managing SNO dashboard related data. type DB interface { - // GetSatelliteIDs returns list of satelliteIDs that storagenode has interacted with - // at least once - GetSatelliteIDs(ctx context.Context, from, to time.Time) (storj.NodeIDList, error) - // GetDailyBandwidthUsed returns slice of daily bandwidth usage for provided time range, - // sorted in ascending order - GetDailyTotalBandwidthUsed(ctx context.Context, from, to time.Time) ([]BandwidthUsed, error) - // GetDailyBandwidthUsed returns slice of daily bandwidth usage for provided time range, - // sorted in ascending order for particular satellite - GetDailyBandwidthUsed(ctx context.Context, satelliteID storj.NodeID, from, to time.Time) ([]BandwidthUsed, error) + // Bandwidth is a getter for Bandwidth db + Bandwidth() Bandwidth } // Service is handling storage node operator related logic type Service struct { log *zap.Logger - consoleDB DB - bandwidthDB bandwidth.DB - pieceStore *pieces.Store - kademlia *kademlia.Kademlia - version *version.Service - nodestats *nodestats.Service + trust *trust.Pool + consoleDB DB + bandwidthDB bandwidth.DB + reputationDB reputation.DB + storageusageDB storageusage.DB + pieceStore *pieces.Store + kademlia *kademlia.Kademlia + version *version.Service allocatedBandwidth memory.Size allocatedDiskSpace memory.Size @@ -60,7 +57,7 @@ type Service struct { // NewService returns new instance of Service func NewService(log *zap.Logger, consoleDB DB, bandwidth bandwidth.DB, pieceStore *pieces.Store, kademlia *kademlia.Kademlia, version *version.Service, - nodestats *nodestats.Service, allocatedBandwidth, allocatedDiskSpace memory.Size, walletAddress string, versionInfo version.Info) (*Service, error) { + allocatedBandwidth, allocatedDiskSpace memory.Size, walletAddress string, versionInfo version.Info) (*Service, error) { if log == nil { return nil, errs.New("log can't be nil") } @@ -92,7 +89,6 @@ func NewService(log *zap.Logger, consoleDB DB, bandwidth bandwidth.DB, pieceStor pieceStore: pieceStore, kademlia: kademlia, version: version, - nodestats: nodestats, allocatedBandwidth: allocatedBandwidth, allocatedDiskSpace: allocatedDiskSpace, walletAddress: walletAddress, @@ -118,7 +114,7 @@ func (s *Service) GetUsedBandwidthTotal(ctx context.Context) (_ *BandwidthInfo, func (s *Service) GetDailyTotalBandwidthUsed(ctx context.Context, from, to time.Time) (_ []BandwidthUsed, err error) { defer mon.Task()(&ctx)(&err) - return s.consoleDB.GetDailyTotalBandwidthUsed(ctx, from, to) + return s.consoleDB.Bandwidth().GetDailyTotal(ctx, from, to) } // GetDailyBandwidthUsed returns slice of daily bandwidth usage for provided time range, @@ -126,7 +122,7 @@ func (s *Service) GetDailyTotalBandwidthUsed(ctx context.Context, from, to time. func (s *Service) GetDailyBandwidthUsed(ctx context.Context, satelliteID storj.NodeID, from, to time.Time) (_ []BandwidthUsed, err error) { defer mon.Task()(&ctx)(&err) - return s.consoleDB.GetDailyBandwidthUsed(ctx, satelliteID, from, to) + return s.consoleDB.Bandwidth().GetDaily(ctx, satelliteID, from, to) } // GetBandwidthBySatellite returns all info about storage node bandwidth usage by satellite @@ -179,27 +175,17 @@ func (s *Service) GetUptime(ctx context.Context) time.Duration { } // GetStatsFromSatellite returns storagenode stats from the satellite -func (s *Service) GetStatsFromSatellite(ctx context.Context, satelliteID storj.NodeID) (_ *nodestats.Stats, err error) { +func (s *Service) GetStatsFromSatellite(ctx context.Context, satelliteID storj.NodeID) (_ *reputation.Stats, err error) { defer mon.Task()(&ctx)(&err) - stats, err := s.nodestats.GetStatsFromSatellite(ctx, satelliteID) - if err != nil { - return nil, SNOServiceErr.Wrap(err) - } - - return stats, nil + return s.reputationDB.Get(ctx, satelliteID) } // GetDailyStorageUsedForSatellite returns daily SpaceUsageStamps for a particular satellite -func (s *Service) GetDailyStorageUsedForSatellite(ctx context.Context, satelliteID storj.NodeID, from, to time.Time) (_ []nodestats.SpaceUsageStamp, err error) { +func (s *Service) GetDailyStorageUsedForSatellite(ctx context.Context, satelliteID storj.NodeID, from, to time.Time) (_ []storageusage.Stamp, err error) { defer mon.Task()(&ctx)(&err) - stamps, err := s.nodestats.GetDailyStorageUsedForSatellite(ctx, satelliteID, from, to) - if err != nil { - return nil, SNOServiceErr.Wrap(err) - } - - return stamps, nil + return s.storageusageDB.GetDaily(ctx, satelliteID, from, to) } // GetNodeID return current node id @@ -221,7 +207,7 @@ func (s *Service) CheckVersion(ctx context.Context) (err error) { } // GetSatellites used to retrieve satellites list -func (s *Service) GetSatellites(ctx context.Context) (_ storj.NodeIDList, err error) { - defer mon.Task()(&ctx)(&err) - return s.consoleDB.GetSatelliteIDs(ctx, time.Time{}, time.Now()) +func (s *Service) GetSatellites(ctx context.Context) (_ storj.NodeIDList) { + defer mon.Task()(&ctx)(nil) + return s.trust.GetSatellites(ctx) } diff --git a/storagenode/nodestats/cache.go b/storagenode/nodestats/cache.go new file mode 100644 index 000000000..5967a1579 --- /dev/null +++ b/storagenode/nodestats/cache.go @@ -0,0 +1,167 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package nodestats + +import ( + "context" + "math/rand" + "time" + + "github.com/zeebo/errs" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + + "storj.io/storj/internal/date" + "storj.io/storj/internal/sync2" + "storj.io/storj/pkg/storj" + "storj.io/storj/storagenode/reputation" + "storj.io/storj/storagenode/storageusage" + "storj.io/storj/storagenode/trust" +) + +var ( + // NodeStatsCacheErr defines node stats cache loop error + NodeStatsCacheErr = errs.Class("node stats cache error") +) + +// Config defines nodestats cache configuration +type Config struct { + MaxSleep time.Duration `help:"maximum duration to wait before requesting data" releaseDefault:"300s" devDefault:"1s"` + ReputationSync time.Duration `help:"how often to sync reputation" releaseDefault:"4h" devDefault:"1m"` + StorageSync time.Duration `help:"how often to sync storage" releaseDefault:"12h" devDefault:"2m"` +} + +// CacheStorage encapsulates cache DBs +type CacheStorage struct { + Reputation reputation.DB + StorageUsage storageusage.DB +} + +// Cache runs cache loop and stores reputation stats +// and storage usage into db +type Cache struct { + log *zap.Logger + + db CacheStorage + service *Service + trust *trust.Pool + + maxSleep time.Duration + reputationCycle sync2.Cycle + storageCycle sync2.Cycle +} + +// NewCache creates new caching service instance +func NewCache(log *zap.Logger, config Config, db CacheStorage, service *Service, trust *trust.Pool) *Cache { + return &Cache{ + log: log, + db: db, + service: service, + trust: trust, + maxSleep: config.MaxSleep, + reputationCycle: *sync2.NewCycle(config.ReputationSync), + storageCycle: *sync2.NewCycle(config.StorageSync), + } +} + +// Run runs loop +func (cache *Cache) Run(ctx context.Context) error { + var group errgroup.Group + + cache.reputationCycle.Start(ctx, &group, func(ctx context.Context) error { + jitter := time.Duration(rand.Intn(int(cache.maxSleep))) + if !sync2.Sleep(ctx, jitter) { + return ctx.Err() + } + + err := cache.CacheReputationStats(ctx) + if err != nil { + cache.log.Error("Get stats query failed", zap.Error(err)) + } + + return nil + }) + cache.storageCycle.Start(ctx, &group, func(ctx context.Context) error { + jitter := time.Duration(rand.Intn(int(cache.maxSleep))) + if !sync2.Sleep(ctx, jitter) { + return ctx.Err() + } + + err := cache.CacheSpaceUsage(ctx) + if err != nil { + cache.log.Error("Get disk space usage query failed", zap.Error(err)) + } + + return nil + }) + + return group.Wait() +} + +// CacheReputationStats queries node stats from all the satellites +// known to the storagenode and stores information into db +func (cache *Cache) CacheReputationStats(ctx context.Context) (err error) { + defer mon.Task()(&ctx)(&err) + + return cache.satelliteLoop(ctx, func(satellite storj.NodeID) error { + stats, err := cache.service.GetReputationStats(ctx, satellite) + if err != nil { + return err + } + + if err = cache.db.Reputation.Store(ctx, *stats); err != nil { + return err + } + + return nil + }) +} + +// CacheSpaceUsage queries disk space usage from all the satellites +// known to the storagenode and stores information into db +func (cache *Cache) CacheSpaceUsage(ctx context.Context) (err error) { + defer mon.Task()(&ctx)(&err) + + // get current month edges + startDate, endDate := date.MonthBoundary(time.Now().UTC()) + + return cache.satelliteLoop(ctx, func(satellite storj.NodeID) error { + spaceUsages, err := cache.service.GetDailyStorageUsage(ctx, satellite, startDate, endDate) + if err != nil { + return err + } + + err = cache.db.StorageUsage.Store(ctx, spaceUsages) + if err != nil { + return err + } + + return nil + }) +} + +// satelliteLoop loops over all satellites from trust pool executing provided fn, caching errors if occurred, +// on each step checks if context has been cancelled +func (cache *Cache) satelliteLoop(ctx context.Context, fn func(id storj.NodeID) error) error { + var groupErr errs.Group + for _, satellite := range cache.trust.GetSatellites(ctx) { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + groupErr.Add(fn(satellite)) + } + + return groupErr.Err() +} + +// Close closes underlying cycles +func (cache *Cache) Close() error { + defer mon.Task()(nil)(nil) + cache.reputationCycle.Close() + cache.storageCycle.Close() + return nil +} diff --git a/storagenode/nodestats/service.go b/storagenode/nodestats/service.go index 4aaff609d..191bf9dfe 100644 --- a/storagenode/nodestats/service.go +++ b/storagenode/nodestats/service.go @@ -12,10 +12,12 @@ import ( "google.golang.org/grpc" "gopkg.in/spacemonkeygo/monkit.v2" - "storj.io/storj/pkg/kademlia" "storj.io/storj/pkg/pb" "storj.io/storj/pkg/storj" "storj.io/storj/pkg/transport" + "storj.io/storj/storagenode/reputation" + "storj.io/storj/storagenode/storageusage" + "storj.io/storj/storagenode/trust" ) var ( @@ -25,32 +27,6 @@ var ( mon = monkit.Package() ) -// Stats encapsulates storagenode stats retrieved from the satellite -type Stats struct { - SatelliteID storj.NodeID - - UptimeCheck ReputationStats - AuditCheck ReputationStats -} - -// ReputationStats encapsulates storagenode reputation metrics -type ReputationStats struct { - TotalCount int64 - SuccessCount int64 - - ReputationAlpha float64 - ReputationBeta float64 - ReputationScore float64 -} - -// SpaceUsageStamp is space usage for satellite at some point in time -type SpaceUsageStamp struct { - SatelliteID storj.NodeID - AtRestTotal float64 - - TimeStamp time.Time -} - // Client encapsulates NodeStatsClient with underlying connection type Client struct { conn *grpc.ClientConn @@ -67,23 +43,23 @@ type Service struct { log *zap.Logger transport transport.Client - kademlia *kademlia.Kademlia + trust *trust.Pool } // NewService creates new instance of service -func NewService(log *zap.Logger, transport transport.Client, kademlia *kademlia.Kademlia) *Service { +func NewService(log *zap.Logger, transport transport.Client, trust *trust.Pool) *Service { return &Service{ log: log, transport: transport, - kademlia: kademlia, + trust: trust, } } -// GetStatsFromSatellite retrieves node stats from particular satellite -func (s *Service) GetStatsFromSatellite(ctx context.Context, satelliteID storj.NodeID) (_ *Stats, err error) { +// GetReputationStats retrieves reputation stats from particular satellite +func (s *Service) GetReputationStats(ctx context.Context, satelliteID storj.NodeID) (_ *reputation.Stats, err error) { defer mon.Task()(&ctx)(&err) - client, err := s.DialNodeStats(ctx, satelliteID) + client, err := s.dial(ctx, satelliteID) if err != nil { return nil, NodeStatsServiceErr.Wrap(err) } @@ -102,30 +78,31 @@ func (s *Service) GetStatsFromSatellite(ctx context.Context, satelliteID storj.N uptime := resp.GetUptimeCheck() audit := resp.GetAuditCheck() - return &Stats{ + return &reputation.Stats{ SatelliteID: satelliteID, - UptimeCheck: ReputationStats{ - TotalCount: uptime.GetTotalCount(), - SuccessCount: uptime.GetSuccessCount(), - ReputationAlpha: uptime.GetReputationAlpha(), - ReputationBeta: uptime.GetReputationBeta(), - ReputationScore: uptime.GetReputationScore(), + Uptime: reputation.Metric{ + TotalCount: uptime.GetTotalCount(), + SuccessCount: uptime.GetSuccessCount(), + Alpha: uptime.GetReputationAlpha(), + Beta: uptime.GetReputationBeta(), + Score: uptime.GetReputationScore(), }, - AuditCheck: ReputationStats{ - TotalCount: audit.GetTotalCount(), - SuccessCount: audit.GetSuccessCount(), - ReputationAlpha: audit.GetReputationAlpha(), - ReputationBeta: audit.GetReputationBeta(), - ReputationScore: audit.GetReputationScore(), + Audit: reputation.Metric{ + TotalCount: audit.GetTotalCount(), + SuccessCount: audit.GetSuccessCount(), + Alpha: audit.GetReputationAlpha(), + Beta: audit.GetReputationBeta(), + Score: audit.GetReputationScore(), }, + UpdatedAt: time.Now(), }, nil } -// GetDailyStorageUsedForSatellite returns daily SpaceUsageStamps over a period of time for a particular satellite -func (s *Service) GetDailyStorageUsedForSatellite(ctx context.Context, satelliteID storj.NodeID, from, to time.Time) (_ []SpaceUsageStamp, err error) { +// GetDailyStorageUsage returns daily storage usage over a period of time for a particular satellite +func (s *Service) GetDailyStorageUsage(ctx context.Context, satelliteID storj.NodeID, from, to time.Time) (_ []storageusage.Stamp, err error) { defer mon.Task()(&ctx)(&err) - client, err := s.DialNodeStats(ctx, satelliteID) + client, err := s.dial(ctx, satelliteID) if err != nil { return nil, NodeStatsServiceErr.Wrap(err) } @@ -136,7 +113,7 @@ func (s *Service) GetDailyStorageUsedForSatellite(ctx context.Context, satellite } }() - resp, err := client.DailyStorageUsage(ctx, &pb.DailyStorageUsageRequest{}) + resp, err := client.DailyStorageUsage(ctx, &pb.DailyStorageUsageRequest{From: from, To: to}) if err != nil { return nil, NodeStatsServiceErr.Wrap(err) } @@ -144,13 +121,23 @@ func (s *Service) GetDailyStorageUsedForSatellite(ctx context.Context, satellite return fromSpaceUsageResponse(resp, satelliteID), nil } -// DialNodeStats dials GRPC NodeStats client for the satellite by id -func (s *Service) DialNodeStats(ctx context.Context, satelliteID storj.NodeID) (*Client, error) { - satellite, err := s.kademlia.FindNode(ctx, satelliteID) +// dial dials GRPC NodeStats client for the satellite by id +func (s *Service) dial(ctx context.Context, satelliteID storj.NodeID) (_ *Client, err error) { + defer mon.Task()(&ctx)(&err) + + address, err := s.trust.GetAddress(ctx, satelliteID) if err != nil { return nil, errs.New("unable to find satellite %s: %v", satelliteID, err) } + satellite := pb.Node{ + Id: satelliteID, + Address: &pb.NodeAddress{ + Transport: pb.NodeTransport_TCP_TLS_GRPC, + Address: address, + }, + } + conn, err := s.transport.DialNode(ctx, &satellite) if err != nil { return nil, errs.New("unable to connect to the satellite %s: %v", satelliteID, err) @@ -162,15 +149,15 @@ func (s *Service) DialNodeStats(ctx context.Context, satelliteID storj.NodeID) ( }, nil } -// fromSpaceUsageResponse get SpaceUsageStamp slice from pb.SpaceUsageResponse -func fromSpaceUsageResponse(resp *pb.DailyStorageUsageResponse, satelliteID storj.NodeID) []SpaceUsageStamp { - var stamps []SpaceUsageStamp +// fromSpaceUsageResponse get DiskSpaceUsage slice from pb.SpaceUsageResponse +func fromSpaceUsageResponse(resp *pb.DailyStorageUsageResponse, satelliteID storj.NodeID) []storageusage.Stamp { + var stamps []storageusage.Stamp for _, pbUsage := range resp.GetDailyStorageUsage() { - stamps = append(stamps, SpaceUsageStamp{ + stamps = append(stamps, storageusage.Stamp{ SatelliteID: satelliteID, AtRestTotal: pbUsage.AtRestTotal, - TimeStamp: pbUsage.TimeStamp, + Timestamp: pbUsage.Timestamp, }) } diff --git a/storagenode/peer.go b/storagenode/peer.go index cf42a5de2..3736fbdd5 100644 --- a/storagenode/peer.go +++ b/storagenode/peer.go @@ -35,6 +35,8 @@ import ( "storj.io/storj/storagenode/orders" "storj.io/storj/storagenode/pieces" "storj.io/storj/storagenode/piecestore" + "storj.io/storj/storagenode/reputation" + "storj.io/storj/storagenode/storageusage" "storj.io/storj/storagenode/trust" "storj.io/storj/storagenode/vouchers" ) @@ -59,6 +61,8 @@ type DB interface { UsedSerials() piecestore.UsedSerials Vouchers() vouchers.DB Console() console.DB + Reputation() reputation.DB + StorageUsage() storageusage.DB // TODO: use better interfaces RoutingTable() (kdb, ndb, adb storage.KeyValueStore) @@ -78,6 +82,8 @@ type Config struct { Vouchers vouchers.Config + Nodestats nodestats.Config + Console consoleserver.Config Version version.Config @@ -126,7 +132,10 @@ type Peer struct { Collector *collector.Service - NodeStats *nodestats.Service + NodeStats struct { + Service *nodestats.Service + Cache *nodestats.Cache + } // Web server with web UI Console struct { @@ -266,10 +275,20 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config Config, ver } { // setup node stats service - peer.NodeStats = nodestats.NewService( - peer.Log.Named("nodestats"), + peer.NodeStats.Service = nodestats.NewService( + peer.Log.Named("nodestats:service"), peer.Transport, - peer.Kademlia.Service) + peer.Storage2.Trust) + + peer.NodeStats.Cache = nodestats.NewCache( + peer.Log.Named("nodestats:cache"), + config.Nodestats, + nodestats.CacheStorage{ + Reputation: peer.DB.Reputation(), + StorageUsage: peer.DB.StorageUsage(), + }, + peer.NodeStats.Service, + peer.Storage2.Trust) } { // setup vouchers @@ -287,7 +306,6 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config Config, ver peer.Storage2.Store, peer.Kademlia.Service, peer.Version, - peer.NodeStats, config.Storage.AllocatedBandwidth, config.Storage.AllocatedDiskSpace, config.Kademlia.Operator.Wallet, @@ -372,6 +390,9 @@ func (peer *Peer) Run(ctx context.Context) (err error) { return errs2.IgnoreCanceled(peer.Server.Run(ctx)) }) + group.Go(func() error { + return errs2.IgnoreCanceled(peer.NodeStats.Cache.Run(ctx)) + }) group.Go(func() error { return errs2.IgnoreCanceled(peer.Console.Endpoint.Run(ctx)) }) @@ -420,6 +441,10 @@ func (peer *Peer) Close() error { errlist.Add(peer.Console.Listener.Close()) } + if peer.NodeStats.Cache != nil { + errlist.Add(peer.NodeStats.Cache.Close()) + } + return errlist.Err() } diff --git a/storagenode/reputation/reputation.go b/storagenode/reputation/reputation.go new file mode 100644 index 000000000..06f64431b --- /dev/null +++ b/storagenode/reputation/reputation.go @@ -0,0 +1,39 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package reputation + +import ( + "context" + "time" + + "storj.io/storj/pkg/storj" +) + +// DB works with reputation database +type DB interface { + // Store inserts or updates reputation stats into the DB + Store(ctx context.Context, stats Stats) error + // Get retrieves stats for specific satellite + Get(ctx context.Context, satelliteID storj.NodeID) (*Stats, error) +} + +// Stats consist of reputation metrics +type Stats struct { + SatelliteID storj.NodeID + + Uptime Metric + Audit Metric + + UpdatedAt time.Time +} + +// Metric encapsulates storagenode reputation metrics +type Metric struct { + TotalCount int64 + SuccessCount int64 + + Alpha float64 + Beta float64 + Score float64 +} diff --git a/storagenode/storagenodedb/consoledb.go b/storagenode/storagenodedb/consoledb.go index 066830ac7..de323dc76 100644 --- a/storagenode/storagenodedb/consoledb.go +++ b/storagenode/storagenodedb/consoledb.go @@ -5,7 +5,6 @@ package storagenodedb import ( "context" - "database/sql" "time" "github.com/zeebo/errs" @@ -26,56 +25,14 @@ func (db *InfoDB) Console() console.DB { return &consoledb{db} } // Console returns console.DB func (db *DB) Console() console.DB { return db.info.Console() } -// GetSatelliteIDs returns list of satelliteIDs that storagenode has interacted with -// at least once -func (db *consoledb) GetSatelliteIDs(ctx context.Context, from, to time.Time) (_ storj.NodeIDList, err error) { - defer mon.Task()(&ctx)(&err) - - var satellites storj.NodeIDList - - rows, err := db.db.QueryContext(ctx, db.Rebind(` - SELECT DISTINCT satellite_id - FROM bandwidth_usage - WHERE ? <= created_at AND created_at <= ?`), from.UTC(), to.UTC()) - - if err != nil { - if err == sql.ErrNoRows { - return satellites, nil - } - return nil, err - } - defer func() { - err = errs.Combine(err, rows.Close()) - }() - - for rows.Next() { - var satelliteID storj.NodeID - if err = rows.Scan(&satelliteID); err != nil { - return nil, err - } - - satellites = append(satellites, satelliteID) - } - - return satellites, nil +// Bandwidth returns consoledb as console.Bandwidth +func (db *consoledb) Bandwidth() console.Bandwidth { + return db } -// GetDailyBandwidthUsed returns slice of daily bandwidth usage for provided time range, -// sorted in ascending order -func (db *consoledb) GetDailyTotalBandwidthUsed(ctx context.Context, from, to time.Time) (_ []console.BandwidthUsed, err error) { - defer mon.Task()(&ctx)(&err) - - since, _ := date.DayBoundary(from.UTC()) - _, before := date.DayBoundary(to.UTC()) - - return db.getDailyBandwidthUsed(ctx, - "WHERE ? <= created_at AND created_at <= ?", - since.UTC(), before.UTC()) -} - -// GetDailyBandwidthUsed returns slice of daily bandwidth usage for provided time range, +// GetDaily returns slice of daily bandwidth usage for provided time range, // sorted in ascending order for particular satellite -func (db *consoledb) GetDailyBandwidthUsed(ctx context.Context, satelliteID storj.NodeID, from, to time.Time) (_ []console.BandwidthUsed, err error) { +func (db *consoledb) GetDaily(ctx context.Context, satelliteID storj.NodeID, from, to time.Time) (_ []console.BandwidthUsed, err error) { defer mon.Task()(&ctx)(&err) since, _ := date.DayBoundary(from.UTC()) @@ -83,7 +40,20 @@ func (db *consoledb) GetDailyBandwidthUsed(ctx context.Context, satelliteID stor return db.getDailyBandwidthUsed(ctx, "WHERE satellite_id = ? AND ? <= created_at AND created_at <= ?", - satelliteID, since.UTC(), before.UTC()) + satelliteID, since, before) +} + +// GetDaily returns slice of daily bandwidth usage for provided time range, +// sorted in ascending order +func (db *consoledb) GetDailyTotal(ctx context.Context, from, to time.Time) (_ []console.BandwidthUsed, err error) { + defer mon.Task()(&ctx)(&err) + + since, _ := date.DayBoundary(from.UTC()) + _, before := date.DayBoundary(to.UTC()) + + return db.getDailyBandwidthUsed(ctx, + "WHERE ? <= created_at AND created_at <= ?", + since, before) } // getDailyBandwidthUsed returns slice of grouped by date bandwidth usage @@ -91,13 +61,11 @@ func (db *consoledb) GetDailyBandwidthUsed(ctx context.Context, satelliteID stor func (db *consoledb) getDailyBandwidthUsed(ctx context.Context, cond string, args ...interface{}) (_ []console.BandwidthUsed, err error) { defer mon.Task()(&ctx)(&err) - query := db.Rebind(` - SELECT action, SUM(amount), created_at - FROM bandwidth_usage - ` + cond + ` - GROUP BY DATE(created_at), action - ORDER BY created_at ASC - `) + query := `SELECT action, SUM(amount), created_at + FROM bandwidth_usage + ` + cond + ` + GROUP BY DATE(created_at), action + ORDER BY created_at ASC` rows, err := db.db.QueryContext(ctx, query, args...) if err != nil { diff --git a/storagenode/storagenodedb/infodb.go b/storagenode/storagenodedb/infodb.go index a16681dea..f1635adae 100644 --- a/storagenode/storagenodedb/infodb.go +++ b/storagenode/storagenodedb/infodb.go @@ -423,6 +423,52 @@ func (db *InfoDB) Migration() *migrate.Migration { `CREATE INDEX idx_piece_expirations_deletion_failed_at ON piece_expirations(deletion_failed_at)`, }, }, + { + Description: "Add reputation and storage usage cache tables", + Version: 16, + Action: migrate.SQL{ + `CREATE TABLE reputation ( + satellite_id BLOB NOT NULL, + uptime_success_count INTEGER NOT NULL, + uptime_total_count INTEGER NOT NULL, + uptime_reputation_alpha REAL NOT NULL, + uptime_reputation_beta REAL NOT NULL, + uptime_reputation_score REAL NOT NULL, + audit_success_count INTEGER NOT NULL, + audit_total_count INTEGER NOT NULL, + audit_reputation_alpha REAL NOT NULL, + audit_reputation_beta REAL NOT NULL, + audit_reputation_score REAL NOT NULL, + updated_at TIMESTAMP NOT NULL, + PRIMARY KEY (satellite_id) + )`, + `CREATE TABLE storage_usage ( + satellite_id BLOB NOT NULL, + at_rest_total REAL NOT NUll, + timestamp TIMESTAMP NOT NULL, + PRIMARY KEY (satellite_id, timestamp) + )`, + }, + }, }, } } + +// withTx is a helper method which executes callback in transaction scope +func (db *InfoDB) withTx(ctx context.Context, cb func(tx *sql.Tx) error) error { + tx, err := db.Begin() + if err != nil { + return err + } + + defer func() { + if err != nil { + err = errs.Combine(err, tx.Rollback()) + return + } + + err = tx.Commit() + }() + + return cb(tx) +} diff --git a/storagenode/storagenodedb/reputation.go b/storagenode/storagenodedb/reputation.go new file mode 100644 index 000000000..23803eed6 --- /dev/null +++ b/storagenode/storagenodedb/reputation.go @@ -0,0 +1,90 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package storagenodedb + +import ( + "context" + + "storj.io/storj/pkg/storj" + "storj.io/storj/storagenode/reputation" +) + +// Reputation returns reputation.DB +func (db *InfoDB) Reputation() reputation.DB { return &reputationDB{db} } + +// Reputation returns reputation.DB +func (db *DB) Reputation() reputation.DB { return db.info.Reputation() } + +// reputation works with node reputation DB +type reputationDB struct { + *InfoDB +} + +// Store inserts or updates reputation stats into the db +func (db *reputationDB) Store(ctx context.Context, stats reputation.Stats) (err error) { + defer mon.Task()(&ctx)(&err) + + stmt := `INSERT OR REPLACE INTO reputation ( + satellite_id, + uptime_success_count, + uptime_total_count, + uptime_reputation_alpha, + uptime_reputation_beta, + uptime_reputation_score, + audit_success_count, + audit_total_count, + audit_reputation_alpha, + audit_reputation_beta, + audit_reputation_score, + updated_at + ) VALUES(?,?,?,?,?,?,?,?,?,?,?,?)` + + _, err = db.db.ExecContext(ctx, stmt, + stats.SatelliteID, + stats.Uptime.SuccessCount, + stats.Uptime.TotalCount, + stats.Uptime.Alpha, + stats.Uptime.Beta, + stats.Uptime.Score, + stats.Audit.SuccessCount, + stats.Audit.TotalCount, + stats.Audit.Alpha, + stats.Audit.Beta, + stats.Audit.Score, + stats.UpdatedAt.UTC(), + ) + + return nil +} + +// Get retrieves stats for specific satellite +func (db *reputationDB) Get(ctx context.Context, satelliteID storj.NodeID) (_ *reputation.Stats, err error) { + defer mon.Task()(&ctx)(&err) + + stats := reputation.Stats{} + + row := db.db.QueryRowContext(ctx, + `SELECT * FROM reputation WHERE satellite_id = ?`, + satelliteID, + ) + + err = row.Scan(&stats.SatelliteID, + &stats.Uptime.SuccessCount, + &stats.Uptime.TotalCount, + &stats.Uptime.Alpha, + &stats.Uptime.Beta, + &stats.Uptime.Score, + &stats.Audit.SuccessCount, + &stats.Audit.TotalCount, + &stats.Audit.Alpha, + &stats.Audit.Beta, + &stats.Audit.Score, + &stats.UpdatedAt, + ) + if err != nil { + return nil, err + } + + return &stats, nil +} diff --git a/storagenode/storagenodedb/storageusage.go b/storagenode/storagenodedb/storageusage.go new file mode 100644 index 000000000..9788bb221 --- /dev/null +++ b/storagenode/storagenodedb/storageusage.go @@ -0,0 +1,137 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package storagenodedb + +import ( + "context" + "database/sql" + "time" + + "github.com/zeebo/errs" + + "storj.io/storj/pkg/storj" + "storj.io/storj/storagenode/storageusage" +) + +// StorageUsage returns storageusage.DB +func (db *InfoDB) StorageUsage() storageusage.DB { return &storageusageDB{db} } + +// StorageUsage returns storageusage.DB +func (db *DB) StorageUsage() storageusage.DB { return db.info.StorageUsage() } + +// storageusageDB storage usage DB +type storageusageDB struct { + *InfoDB +} + +// Store stores storage usage stamps to db replacing conflicting entries +func (db *storageusageDB) Store(ctx context.Context, stamps []storageusage.Stamp) (err error) { + defer mon.Task()(&ctx)(&err) + + if len(stamps) == 0 { + return nil + } + + query := `INSERT OR REPLACE INTO storage_usage(satellite_id, at_rest_total, timestamp) + VALUES(?,?,?)` + + return db.withTx(ctx, func(tx *sql.Tx) error { + for _, stamp := range stamps { + _, err = db.db.ExecContext(ctx, query, stamp.SatelliteID, stamp.AtRestTotal, stamp.Timestamp.UTC()) + + if err != nil { + return err + } + } + + return nil + }) +} + +// GetDaily returns daily storage usage stamps for particular satellite +// for provided time range +func (db *storageusageDB) GetDaily(ctx context.Context, satelliteID storj.NodeID, from, to time.Time) (_ []storageusage.Stamp, err error) { + defer mon.Task()(&ctx)(&err) + + query := `SELECT * + FROM storage_usage + WHERE timestamp IN ( + SELECT MAX(timestamp) + FROM storage_usage + WHERE satellite_id = ? + AND ? <= timestamp AND timestamp <= ? + GROUP BY DATE(timestamp) + )` + + rows, err := db.db.QueryContext(ctx, query, satelliteID, from.UTC(), to.UTC()) + if err != nil { + return nil, err + } + + defer func() { + err = errs.Combine(err, rows.Close()) + }() + + var stamps []storageusage.Stamp + for rows.Next() { + var satellite storj.NodeID + var atRestTotal float64 + var timeStamp time.Time + + err = rows.Scan(&satellite, &atRestTotal, &timeStamp) + if err != nil { + return nil, err + } + + stamps = append(stamps, storageusage.Stamp{ + SatelliteID: satellite, + AtRestTotal: atRestTotal, + Timestamp: timeStamp, + }) + } + + return stamps, nil +} + +// GetDailyTotal returns daily storage usage stamps summed across all known satellites +// for provided time range +func (db *storageusageDB) GetDailyTotal(ctx context.Context, from, to time.Time) (_ []storageusage.Stamp, err error) { + defer mon.Task()(&ctx)(&err) + + query := `SELECT SUM(at_rest_total), timestamp + FROM storage_usage + WHERE timestamp IN ( + SELECT MAX(timestamp) + FROM storage_usage + WHERE ? <= timestamp AND timestamp <= ? + GROUP BY DATE(timestamp), satellite_id + ) GROUP BY DATE(timestamp)` + + rows, err := db.db.QueryContext(ctx, query, from.UTC(), to.UTC()) + if err != nil { + return nil, err + } + + defer func() { + err = errs.Combine(err, rows.Close()) + }() + + var stamps []storageusage.Stamp + for rows.Next() { + var atRestTotal float64 + var timeStamp time.Time + + err = rows.Scan(&atRestTotal, &timeStamp) + if err != nil { + return nil, err + } + + stamps = append(stamps, storageusage.Stamp{ + AtRestTotal: atRestTotal, + Timestamp: timeStamp, + }) + } + + return stamps, nil +} diff --git a/storagenode/storagenodedb/testdata/sqlite.v16.sql b/storagenode/storagenodedb/testdata/sqlite.v16.sql new file mode 100644 index 000000000..27afd3a3a --- /dev/null +++ b/storagenode/storagenodedb/testdata/sqlite.v16.sql @@ -0,0 +1,178 @@ +-- table for keeping serials that need to be verified against +CREATE TABLE used_serial_ ( + satellite_id BLOB NOT NULL, + serial_number BLOB NOT NULL, + expiration TIMESTAMP NOT NULL +); +-- primary key on satellite id and serial number +CREATE UNIQUE INDEX pk_used_serial_ ON used_serial_(satellite_id, serial_number); +-- expiration index to allow fast deletion +CREATE INDEX idx_used_serial_ ON used_serial_(expiration); + +-- certificate table for storing uplink/satellite certificates +CREATE TABLE certificate ( + cert_id INTEGER +); + +-- table for storing piece meta info +CREATE TABLE pieceinfo_ ( + satellite_id BLOB NOT NULL, + piece_id BLOB NOT NULL, + piece_size BIGINT NOT NULL, + piece_expiration TIMESTAMP, + + order_limit BLOB NOT NULL, + uplink_piece_hash BLOB NOT NULL, + uplink_cert_id INTEGER NOT NULL, + + deletion_failed_at TIMESTAMP, + piece_creation TIMESTAMP NOT NULL, + + FOREIGN KEY(uplink_cert_id) REFERENCES certificate(cert_id) +); +-- primary key by satellite id and piece id +CREATE UNIQUE INDEX pk_pieceinfo_ ON pieceinfo_(satellite_id, piece_id); +-- fast queries for expiration for pieces that have one +CREATE INDEX idx_pieceinfo__expiration ON pieceinfo_(piece_expiration) WHERE piece_expiration IS NOT NULL; + +-- table for storing bandwidth usage +CREATE TABLE bandwidth_usage ( + satellite_id BLOB NOT NULL, + action INTEGER NOT NULL, + amount BIGINT NOT NULL, + created_at TIMESTAMP NOT NULL +); +CREATE INDEX idx_bandwidth_usage_satellite ON bandwidth_usage(satellite_id); +CREATE INDEX idx_bandwidth_usage_created ON bandwidth_usage(created_at); + +-- table for storing all unsent orders +CREATE TABLE unsent_order ( + satellite_id BLOB NOT NULL, + serial_number BLOB NOT NULL, + + order_limit_serialized BLOB NOT NULL, + order_serialized BLOB NOT NULL, + order_limit_expiration TIMESTAMP NOT NULL, + + uplink_cert_id INTEGER NOT NULL, + + FOREIGN KEY(uplink_cert_id) REFERENCES certificate(cert_id) +); +CREATE UNIQUE INDEX idx_orders ON unsent_order(satellite_id, serial_number); + +-- table for storing all sent orders +CREATE TABLE order_archive_ ( + satellite_id BLOB NOT NULL, + serial_number BLOB NOT NULL, + + order_limit_serialized BLOB NOT NULL, + order_serialized BLOB NOT NULL, + + uplink_cert_id INTEGER NOT NULL, + + status INTEGER NOT NULL, + archived_at TIMESTAMP NOT NULL, + + FOREIGN KEY(uplink_cert_id) REFERENCES certificate(cert_id) +); + +-- table for storing vouchers +CREATE TABLE vouchers ( + satellite_id BLOB PRIMARY KEY NOT NULL, + voucher_serialized BLOB NOT NULL, + expiration TIMESTAMP NOT NULL +); + +CREATE TABLE bandwidth_usage_rollups ( + interval_start TIMESTAMP NOT NULL, + satellite_id BLOB NOT NULL, + action INTEGER NOT NULL, + amount BIGINT NOT NULL, + PRIMARY KEY ( interval_start, satellite_id, action ) +); + +-- table to hold expiration data (and only expirations. no other pieceinfo) +CREATE TABLE piece_expirations ( + satellite_id BLOB NOT NULL, + piece_id BLOB NOT NULL, + piece_expiration TIMESTAMP NOT NULL, -- date when it can be deleted + deletion_failed_at TIMESTAMP, + PRIMARY KEY ( satellite_id, piece_id ) +); +CREATE INDEX idx_piece_expirations_piece_expiration ON piece_expirations(piece_expiration); +CREATE INDEX idx_piece_expirations_deletion_failed_at ON piece_expirations(deletion_failed_at); + +-- tables to store nodestats cache +CREATE TABLE reputation ( + satellite_id BLOB NOT NULL, + uptime_success_count INTEGER NOT NULL, + uptime_total_count INTEGER NOT NULL, + uptime_reputation_alpha REAL NOT NULL, + uptime_reputation_beta REAL NOT NULL, + uptime_reputation_score REAL NOT NULL, + audit_success_count INTEGER NOT NULL, + audit_total_count INTEGER NOT NULL, + audit_reputation_alpha REAL NOT NULL, + audit_reputation_beta REAL NOT NULL, + audit_reputation_score REAL NOT NULL, + updated_at TIMESTAMP NOT NULL, + PRIMARY KEY (satellite_id) +); + +CREATE TABLE storage_usage ( + satellite_id BLOB NOT NULL, + at_rest_total REAL NOT NUll, + timestamp TIMESTAMP NOT NULL, + PRIMARY KEY (satellite_id, timestamp) +); + +INSERT INTO unsent_order VALUES(X'2b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf41000',X'1eddef484b4c03f01332279032796972',X'0a101eddef484b4c03f0133227903279697212202b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf410001a201968996e7ef170a402fdfd88b6753df792c063c07c555905ffac9cd3cbd1c00022200ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac30002a20d00cf14f3c68b56321ace04902dec0484eb6f9098b22b31c6b3f82db249f191630643802420c08dfeb88e50510a8c1a5b9034a0c08dfeb88e50510a8c1a5b9035246304402204df59dc6f5d1bb7217105efbc9b3604d19189af37a81efbf16258e5d7db5549e02203bb4ead16e6e7f10f658558c22b59c3339911841e8dbaae6e2dea821f7326894',X'0a101eddef484b4c03f0133227903279697210321a47304502206d4c106ddec88140414bac5979c95bdea7de2e0ecc5be766e08f7d5ea36641a7022100e932ff858f15885ffa52d07e260c2c25d3861810ea6157956c1793ad0c906284','2019-04-01 16:01:35.9254586+00:00',1); + +INSERT INTO bandwidth_usage VALUES(X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',0,0,'2019-04-01 18:51:24.1074772+00:00'); +INSERT INTO bandwidth_usage VALUES(X'2b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf41000',0,0,'2019-04-01 20:51:24.1074772+00:00'); +INSERT INTO bandwidth_usage VALUES(X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',1,1,'2019-04-01 18:51:24.1074772+00:00'); +INSERT INTO bandwidth_usage VALUES(X'2b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf41000',1,1,'2019-04-01 20:51:24.1074772+00:00'); +INSERT INTO bandwidth_usage VALUES(X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',2,2,'2019-04-01 18:51:24.1074772+00:00'); +INSERT INTO bandwidth_usage VALUES(X'2b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf41000',2,2,'2019-04-01 20:51:24.1074772+00:00'); +INSERT INTO bandwidth_usage VALUES(X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',3,3,'2019-04-01 18:51:24.1074772+00:00'); +INSERT INTO bandwidth_usage VALUES(X'2b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf41000',3,3,'2019-04-01 20:51:24.1074772+00:00'); +INSERT INTO bandwidth_usage VALUES(X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',4,4,'2019-04-01 18:51:24.1074772+00:00'); +INSERT INTO bandwidth_usage VALUES(X'2b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf41000',4,4,'2019-04-01 20:51:24.1074772+00:00'); +INSERT INTO bandwidth_usage VALUES(X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',5,5,'2019-04-01 18:51:24.1074772+00:00'); +INSERT INTO bandwidth_usage VALUES(X'2b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf41000',5,5,'2019-04-01 20:51:24.1074772+00:00'); +INSERT INTO bandwidth_usage VALUES(X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',6,6,'2019-04-01 18:51:24.1074772+00:00'); +INSERT INTO bandwidth_usage VALUES(X'2b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf41000',6,6,'2019-04-01 20:51:24.1074772+00:00'); +INSERT INTO bandwidth_usage VALUES(X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',1,1,'2019-04-01 18:51:24.1074772+00:00'); +INSERT INTO bandwidth_usage VALUES(X'2b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf41000',1,1,'2019-04-01 20:51:24.1074772+00:00'); +INSERT INTO bandwidth_usage VALUES(X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',2,2,'2019-04-01 18:51:24.1074772+00:00'); +INSERT INTO bandwidth_usage VALUES(X'2b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf41000',2,2,'2019-04-01 20:51:24.1074772+00:00'); +INSERT INTO bandwidth_usage VALUES(X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',3,3,'2019-04-01 18:51:24.1074772+00:00'); +INSERT INTO bandwidth_usage VALUES(X'2b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf41000',3,3,'2019-04-01 20:51:24.1074772+00:00'); +INSERT INTO bandwidth_usage VALUES(X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',4,4,'2019-04-01 18:51:24.1074772+00:00'); +INSERT INTO bandwidth_usage VALUES(X'2b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf41000',4,4,'2019-04-01 20:51:24.1074772+00:00'); +INSERT INTO bandwidth_usage VALUES(X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',5,5,'2019-04-01 18:51:24.1074772+00:00'); +INSERT INTO bandwidth_usage VALUES(X'2b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf41000',5,5,'2019-04-01 20:51:24.1074772+00:00'); +INSERT INTO bandwidth_usage VALUES(X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',6,6,'2019-04-01 18:51:24.1074772+00:00'); +INSERT INTO bandwidth_usage VALUES(X'2b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf41000',6,6,'2019-04-01 20:51:24.1074772+00:00'); + +INSERT INTO vouchers VALUES(X'2b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf41000', X'd5e757fd8d207d1c46583fb58330f803dc961b71147308ff75ff1e72a0df6b0b', '2019-07-04 00:00:00.000000+00:00'); + +INSERT INTO bandwidth_usage_rollups VALUES('2019-07-12 18:00:00+00:00',X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',0,0); +INSERT INTO bandwidth_usage_rollups VALUES('2019-07-12 20:00:00+00:00',X'2b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf41000',0,0); +INSERT INTO bandwidth_usage_rollups VALUES('2019-07-12 18:00:00+00:00',X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',1,1); +INSERT INTO bandwidth_usage_rollups VALUES('2019-07-12 20:00:00+00:00',X'2b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf41000',1,1); +INSERT INTO bandwidth_usage_rollups VALUES('2019-07-12 18:00:00+00:00',X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',2,2); +INSERT INTO bandwidth_usage_rollups VALUES('2019-07-12 20:00:00+00:00',X'2b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf41000',2,2); +INSERT INTO bandwidth_usage_rollups VALUES('2019-07-12 18:00:00+00:00',X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',3,3); +INSERT INTO bandwidth_usage_rollups VALUES('2019-07-12 20:00:00+00:00',X'2b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf41000',3,3); +INSERT INTO bandwidth_usage_rollups VALUES('2019-07-12 18:00:00+00:00',X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',4,4); +INSERT INTO bandwidth_usage_rollups VALUES('2019-07-12 20:00:00+00:00',X'2b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf41000',4,4); +INSERT INTO bandwidth_usage_rollups VALUES('2019-07-12 18:00:00+00:00',X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',5,5); +INSERT INTO bandwidth_usage_rollups VALUES('2019-07-12 20:00:00+00:00',X'2b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf41000',5,5); +INSERT INTO bandwidth_usage_rollups VALUES('2019-07-12 18:00:00+00:00',X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',6,6); +INSERT INTO bandwidth_usage_rollups VALUES('2019-07-12 20:00:00+00:00',X'2b3a5863a41f25408a8f5348839d7a1361dbd886d75786bb139a8ca0bdf41000',6,6); + +-- NEW DATA -- +insert INTO reputation VALUES(X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',1,1,1.0,1.0,1.0,1,1,1.0,1.0,1.0,'2019-07-19 20:00:00+00:00'); + +insert INTO storage_usage VALUES(X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',5.0,'2019-07-19 20:00:00+00:00'); diff --git a/storagenode/storageusage/storageusage.go b/storagenode/storageusage/storageusage.go new file mode 100644 index 000000000..d484747f6 --- /dev/null +++ b/storagenode/storageusage/storageusage.go @@ -0,0 +1,32 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package storageusage + +import ( + "context" + "time" + + "storj.io/storj/pkg/storj" +) + +// DB works with storage usage database +type DB interface { + // Store stores storage usage stamps to db replacing conflicting entries + Store(ctx context.Context, stamps []Stamp) error + // GetDaily returns daily storage usage stamps for particular satellite + // for provided time range + GetDaily(ctx context.Context, satelliteID storj.NodeID, from, to time.Time) ([]Stamp, error) + // GetDailyTotal returns daily storage usage stamps summed across all known satellites + // for provided time range + GetDailyTotal(ctx context.Context, from, to time.Time) ([]Stamp, error) +} + +// Stamp is storage usage stamp for satellite at some point in time +type Stamp struct { + SatelliteID storj.NodeID + + AtRestTotal float64 + + Timestamp time.Time +}