adds audit job (#521)
This commit is contained in:
parent
ee62e2a9d8
commit
3b572264ca
@ -12,6 +12,7 @@ import (
|
||||
"github.com/alicebob/miniredis"
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
"storj.io/storj/pkg/audit"
|
||||
"storj.io/storj/pkg/auth/grpcauth"
|
||||
"storj.io/storj/pkg/cfgstruct"
|
||||
"storj.io/storj/pkg/datarepair/checker"
|
||||
@ -24,6 +25,7 @@ import (
|
||||
"storj.io/storj/pkg/pointerdb"
|
||||
"storj.io/storj/pkg/process"
|
||||
"storj.io/storj/pkg/provider"
|
||||
"storj.io/storj/pkg/statdb"
|
||||
"storj.io/storj/pkg/utils"
|
||||
)
|
||||
|
||||
@ -39,6 +41,8 @@ type Satellite struct {
|
||||
Overlay overlay.Config
|
||||
Checker checker.Config
|
||||
Repairer repairer.Config
|
||||
Audit audit.Config
|
||||
StatDB statdb.Config
|
||||
MockOverlay struct {
|
||||
Enabled bool `default:"true" help:"if false, use real overlay"`
|
||||
Host string `default:"" help:"if set, the mock overlay will return storage nodes with this host"`
|
||||
@ -124,10 +128,15 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
|
||||
o = mock.Config{Nodes: strings.Join(storagenodes, ",")}
|
||||
}
|
||||
|
||||
if runCfg.Satellite.Audit.SatelliteAddr == "" {
|
||||
runCfg.Satellite.Audit.SatelliteAddr = runCfg.Satellite.Identity.Address
|
||||
}
|
||||
errch <- runCfg.Satellite.Identity.Run(ctx,
|
||||
grpcauth.NewAPIKeyInterceptor(),
|
||||
runCfg.Satellite.PointerDB,
|
||||
runCfg.Satellite.Kademlia,
|
||||
runCfg.Satellite.Audit,
|
||||
runCfg.Satellite.StatDB,
|
||||
o,
|
||||
// TODO(coyle): re-enable the checker after we determine why it is panicing
|
||||
// runCfg.Satellite.Checker,
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
// "storj.io/storj/pkg/audit"
|
||||
"storj.io/storj/pkg/auth/grpcauth"
|
||||
"storj.io/storj/pkg/cfgstruct"
|
||||
"storj.io/storj/pkg/kademlia"
|
||||
@ -49,6 +50,7 @@ var (
|
||||
// RepairQueue queue.Config
|
||||
// RepairChecker checker.Config
|
||||
// Repairer repairer.Config
|
||||
// Audit audit.Config
|
||||
}
|
||||
setupCfg struct {
|
||||
BasePath string `default:"$CONFDIR" help:"base path for setup"`
|
||||
@ -79,6 +81,7 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
|
||||
runCfg.PointerDB,
|
||||
o,
|
||||
runCfg.StatDB,
|
||||
// runCfg.Audit,
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -20,8 +20,9 @@ import (
|
||||
|
||||
// Stripe keeps track of a stripe's index and its parent segment
|
||||
type Stripe struct {
|
||||
Index int
|
||||
Segment *pb.Pointer
|
||||
Index int
|
||||
Segment *pb.Pointer
|
||||
Authorization *pb.SignedMessage
|
||||
}
|
||||
|
||||
// Cursor keeps track of audit location in pointer db
|
||||
@ -50,18 +51,19 @@ func (cursor *Cursor) NextStripe(ctx context.Context) (stripe *Stripe, err error
|
||||
} else {
|
||||
pointerItems, more, err = cursor.pointers.List(ctx, "", cursor.lastPath, "", true, 0, meta.None)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// get random pointer
|
||||
if len(pointerItems) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
pointerItem, err := getRandomPointer(pointerItems)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// get path
|
||||
path = pointerItem.Path
|
||||
|
||||
// keep track of last path listed
|
||||
@ -77,24 +79,31 @@ func (cursor *Cursor) NextStripe(ctx context.Context) (stripe *Stripe, err error
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if pointer.GetType() != pb.Pointer_REMOTE {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// create the erasure scheme so we can get the stripe size
|
||||
es, err := makeErasureScheme(pointer.GetRemote().GetRedundancy())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
//get random stripe
|
||||
index, err := getRandomStripe(es, pointer)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Stripe{Index: index, Segment: pointer}, nil
|
||||
authorization := cursor.pointers.SignedMessage()
|
||||
|
||||
return &Stripe{Index: index, Segment: pointer, Authorization: authorization}, nil
|
||||
}
|
||||
|
||||
// create the erasure scheme
|
||||
func makeErasureScheme(rs *pb.RedundancyScheme) (eestream.ErasureScheme, error) {
|
||||
fc, err := infectious.NewFEC(int(rs.GetMinReq()), int(rs.GetTotal()))
|
||||
required := int(rs.GetMinReq())
|
||||
total := int(rs.GetTotal())
|
||||
|
||||
fc, err := infectious.NewFEC(required, total)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -6,7 +6,6 @@ package audit
|
||||
import (
|
||||
"context"
|
||||
|
||||
"storj.io/storj/pkg/auth"
|
||||
"storj.io/storj/pkg/provider"
|
||||
proto "storj.io/storj/pkg/statdb/proto"
|
||||
"storj.io/storj/pkg/statdb/sdbclient"
|
||||
@ -23,7 +22,7 @@ type Reporter struct {
|
||||
}
|
||||
|
||||
// NewReporter instantiates a reporter
|
||||
func NewReporter(ctx context.Context, statDBPort string, maxRetries int) (reporter *Reporter, err error) {
|
||||
func NewReporter(ctx context.Context, statDBPort string, maxRetries int, apiKey string) (reporter *Reporter, err error) {
|
||||
ca, err := provider.NewTestCA(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -32,12 +31,8 @@ func NewReporter(ctx context.Context, statDBPort string, maxRetries int) (report
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
apiKey, ok := auth.GetAPIKey(ctx)
|
||||
if !ok {
|
||||
return nil, Error.New("invalid API credentials")
|
||||
}
|
||||
|
||||
client, err := sdbclient.NewClient(identity, statDBPort, apiKey)
|
||||
client, err := sdbclient.NewClient(identity, statDBPort, []byte(apiKey))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -25,30 +25,41 @@ type Service struct {
|
||||
|
||||
// Config contains configurable values for audit service
|
||||
type Config struct {
|
||||
StatDBPort string `help:"port to contact statDB client" default:":9090"`
|
||||
MaxRetriesStatDB int `help:"max number of times to attempt updating a statdb batch" default:"3"`
|
||||
Pointers pdbclient.Client `help:"Pointers for a instantiation of a new service"`
|
||||
Transport transport.Client `help:"Transport for a instantiation of a new service"`
|
||||
Overlay overlay.Client `help:"Overlay for a instantiation of a new service"`
|
||||
ID provider.FullIdentity `help:"ID for a instantiation of a new service"`
|
||||
Interval time.Duration `help:"how frequently segements should audited" default:"30s"`
|
||||
APIKey string `help:"APIKey to access the statdb" default:"abc123"`
|
||||
SatelliteAddr string `help:"address to contact services on the satellite"`
|
||||
MaxRetriesStatDB int `help:"max number of times to attempt updating a statdb batch" default:"3"`
|
||||
Interval time.Duration `help:"how frequently segments are audited" default:"30s"`
|
||||
}
|
||||
|
||||
// Run runs the repairer with the configured values
|
||||
func (c Config) Run(ctx context.Context, server *provider.Provider) (err error) {
|
||||
service, err := NewService(ctx, c.StatDBPort, c.Interval, c.MaxRetriesStatDB, c.Pointers, c.Transport, c.Overlay, c.ID)
|
||||
identity := server.Identity()
|
||||
pointers, err := pdbclient.NewClient(identity, c.SatelliteAddr, c.APIKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return service.Run(ctx)
|
||||
overlay, err := overlay.NewOverlayClient(identity, c.SatelliteAddr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
transport := transport.NewClient(identity)
|
||||
service, err := NewService(ctx, c.SatelliteAddr, c.Interval, c.MaxRetriesStatDB, pointers, transport, overlay, *identity, c.APIKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go func() {
|
||||
err := service.Run(ctx)
|
||||
zap.S().Error("audit service failed to run:", zap.Error(err))
|
||||
}()
|
||||
return server.Run(ctx)
|
||||
}
|
||||
|
||||
// NewService instantiates a Service with access to a Cursor and Verifier
|
||||
func NewService(ctx context.Context, statDBPort string, interval time.Duration, maxRetries int, pointers pdbclient.Client, transport transport.Client, overlay overlay.Client,
|
||||
id provider.FullIdentity) (service *Service, err error) {
|
||||
identity provider.FullIdentity, apiKey string) (service *Service, err error) {
|
||||
cursor := NewCursor(pointers)
|
||||
verifier := NewVerifier(transport, overlay, id)
|
||||
reporter, err := NewReporter(ctx, statDBPort, maxRetries)
|
||||
verifier := NewVerifier(transport, overlay, identity)
|
||||
reporter, err := NewReporter(ctx, statDBPort, maxRetries, apiKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -64,7 +75,6 @@ func NewService(ctx context.Context, statDBPort string, interval time.Duration,
|
||||
// Run runs auditing service
|
||||
func (service *Service) Run(ctx context.Context) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
zap.S().Info("Audit cron is starting up")
|
||||
|
||||
for {
|
||||
@ -87,6 +97,9 @@ func (service *Service) process(ctx context.Context) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if stripe == nil {
|
||||
return Error.New("stripe was nil")
|
||||
}
|
||||
|
||||
authorization := service.Cursor.pointers.SignedMessage()
|
||||
verifiedNodes, err := service.Verifier.verify(ctx, stripe.Index, stripe.Segment, authorization)
|
||||
@ -95,7 +108,6 @@ func (service *Service) process(ctx context.Context) error {
|
||||
}
|
||||
|
||||
err = service.Reporter.RecordAudits(ctx, verifiedNodes)
|
||||
// TODO: if Error.Has(err) then log the error because it means not all node stats updated
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"storj.io/storj/pkg/provider"
|
||||
sdbproto "storj.io/storj/pkg/statdb/proto"
|
||||
"storj.io/storj/pkg/transport"
|
||||
"storj.io/storj/pkg/utils"
|
||||
)
|
||||
|
||||
var mon = monkit.Package()
|
||||
@ -97,6 +98,7 @@ func (d *defaultDownloader) getShare(ctx context.Context, stripeIndex, shareSize
|
||||
if err != nil {
|
||||
return s, err
|
||||
}
|
||||
defer utils.LogClose(rc)
|
||||
|
||||
buf := make([]byte, shareSize)
|
||||
_, err = io.ReadFull(rc, buf)
|
||||
@ -135,11 +137,11 @@ func (d *defaultDownloader) DownloadShares(ctx context.Context, pointer *pb.Poin
|
||||
paddedSize := calcPadded(pointer.GetSize(), shareSize)
|
||||
pieceSize := paddedSize / int64(pointer.Remote.Redundancy.GetMinReq())
|
||||
|
||||
s, err := d.getShare(ctx, stripeIndex, shareSize, i, pieceID, pieceSize, node, authorization)
|
||||
s, err := d.getShare(ctx, stripeIndex, shareSize, int(pieces[i].PieceNum), pieceID, pieceSize, node, authorization)
|
||||
if err != nil {
|
||||
s = share{
|
||||
Error: err,
|
||||
PieceNumber: i,
|
||||
PieceNumber: int(pieces[i].PieceNum),
|
||||
Data: nil,
|
||||
}
|
||||
}
|
||||
@ -151,20 +153,14 @@ func (d *defaultDownloader) DownloadShares(ctx context.Context, pointer *pb.Poin
|
||||
|
||||
func makeCopies(ctx context.Context, originals []share) (copies []infectious.Share, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
// Have to use []infectious.Share instead of []audit.Share
|
||||
// in order to run the infectious Correct function.
|
||||
copies = make([]infectious.Share, len(originals))
|
||||
for i, original := range originals {
|
||||
|
||||
// If there was an error downloading a share before,
|
||||
// this line makes it so that there will be an empty
|
||||
// infectious.Share at the copies' index (same index
|
||||
// as in the original slice).
|
||||
copies = make([]infectious.Share, 0, len(originals))
|
||||
for _, original := range originals {
|
||||
if original.Error != nil {
|
||||
continue
|
||||
}
|
||||
copies[i].Data = append([]byte(nil), original.Data...)
|
||||
copies[i].Number = original.PieceNumber
|
||||
copies = append(copies, infectious.Share{
|
||||
Data: append([]byte{}, original.Data...),
|
||||
Number: original.PieceNumber})
|
||||
}
|
||||
return copies, nil
|
||||
}
|
||||
@ -177,6 +173,7 @@ func auditShares(ctx context.Context, required, total int, originals []share) (p
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
copies, err := makeCopies(ctx, originals)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -79,6 +79,9 @@ func (sdb *StatDB) Get(ctx context.Context, nodeID []byte) (stats *pb.NodeStats,
|
||||
APIKey: sdb.APIKey,
|
||||
}
|
||||
res, err := sdb.client.Get(ctx, getReq)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return res.Stats, err
|
||||
}
|
||||
@ -126,6 +129,9 @@ func (sdb *StatDB) Update(ctx context.Context, nodeID []byte, auditSuccess, isUp
|
||||
}
|
||||
|
||||
res, err := sdb.client.Update(ctx, updateReq)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return res.Stats, err
|
||||
}
|
||||
@ -140,6 +146,9 @@ func (sdb *StatDB) UpdateBatch(ctx context.Context, nodes []*pb.Node) (statsList
|
||||
}
|
||||
|
||||
res, err := sdb.client.UpdateBatch(ctx, updateBatchReq)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return res.StatsList, res.FailedNodes, err
|
||||
}
|
||||
@ -154,6 +163,9 @@ func (sdb *StatDB) CreateEntryIfNotExists(ctx context.Context, node *pb.Node) (s
|
||||
}
|
||||
|
||||
res, err := sdb.client.CreateEntryIfNotExists(ctx, createReq)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return res.Stats, err
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user