diff --git a/pkg/audit/reporter.go b/pkg/audit/reporter.go index 3d69d2562..8a9355581 100644 --- a/pkg/audit/reporter.go +++ b/pkg/audit/reporter.go @@ -28,8 +28,8 @@ type RecordAuditsInfo struct { } // NewReporter instantiates a reporter -func NewReporter(sdb statdb.DB, maxRetries int) (reporter *Reporter, err error) { - return &Reporter{statdb: sdb, maxRetries: maxRetries}, nil +func NewReporter(sdb statdb.DB, maxRetries int) *Reporter { + return &Reporter{statdb: sdb, maxRetries: maxRetries} } // RecordAudits saves failed audit details to statdb diff --git a/pkg/audit/service.go b/pkg/audit/service.go index f69dc693f..ba43cbe44 100644 --- a/pkg/audit/service.go +++ b/pkg/audit/service.go @@ -24,29 +24,25 @@ type Config struct { // Service helps coordinate Cursor and Verifier to run the audit process continuously type Service struct { - log *zap.Logger + log *zap.Logger + Cursor *Cursor Verifier *Verifier Reporter reporter - ticker *time.Ticker + + ticker *time.Ticker } // NewService instantiates a Service with access to a Cursor and Verifier func NewService(log *zap.Logger, sdb statdb.DB, interval time.Duration, maxRetries int, pointers *pointerdb.Service, allocation *pointerdb.AllocationSigner, transport transport.Client, overlay *overlay.Cache, identity *identity.FullIdentity) (service *Service, err error) { - // TODO: instead of overlay.Client use overlay.Service - cursor := NewCursor(pointers, allocation, identity) - verifier := NewVerifier(transport, overlay, identity) - reporter, err := NewReporter(sdb, maxRetries) - if err != nil { - return nil, err - } - return &Service{ - log: log, - Cursor: cursor, - Verifier: verifier, - Reporter: reporter, - ticker: time.NewTicker(interval), + log: log, + // TODO: instead of overlay.Client use overlay.Service + Cursor: NewCursor(pointers, allocation, identity), + Verifier: NewVerifier(transport, overlay, identity), + Reporter: NewReporter(sdb, maxRetries), + + ticker: time.NewTicker(interval), }, nil } @@ -54,7 +50,6 @@ func NewService(log *zap.Logger, sdb statdb.DB, interval time.Duration, maxRetri func (service *Service) Run(ctx context.Context) (err error) { defer mon.Task()(&ctx)(&err) service.log.Info("Audit cron is starting up") - for { err := service.process(ctx) if err != nil { diff --git a/pkg/certificates/certificates.go b/pkg/certificates/certificates.go index 4d8ad13f1..4c38590bf 100644 --- a/pkg/certificates/certificates.go +++ b/pkg/certificates/certificates.go @@ -18,6 +18,7 @@ import ( "github.com/btcsuite/btcutil/base58" "github.com/zeebo/errs" "go.uber.org/zap" + "google.golang.org/grpc" "google.golang.org/grpc/peer" monkit "gopkg.in/spacemonkeygo/monkit.v2" @@ -102,6 +103,7 @@ type Claim struct { // Client implements pb.CertificateClient type Client struct { + conn *grpc.ClientConn client pb.CertificatesClient } @@ -129,6 +131,7 @@ func NewClient(ctx context.Context, ident *identity.FullIdentity, address string } return &Client{ + conn: conn, client: pb.NewCertificatesClient(conn), }, nil } @@ -182,9 +185,17 @@ func ParseToken(tokenString string) (*Token, error) { return t, nil } +// Close closes the client +func (c *Client) Close() error { + if c.conn != nil { + return c.conn.Close() + } + return nil +} + // Sign claims an authorization using the token string and returns a signed // copy of the client's CA certificate -func (c Client) Sign(ctx context.Context, tokenStr string) ([][]byte, error) { +func (c *Client) Sign(ctx context.Context, tokenStr string) ([][]byte, error) { res, err := c.client.Sign(ctx, &pb.SigningRequest{ AuthToken: tokenStr, Timestamp: time.Now().Unix(), diff --git a/pkg/certificates/certificates_test.go b/pkg/certificates/certificates_test.go index 9255886a8..788061637 100644 --- a/pkg/certificates/certificates_test.go +++ b/pkg/certificates/certificates_test.go @@ -10,13 +10,12 @@ import ( "encoding/gob" "fmt" "net" - "path/filepath" - "strings" "testing" "time" "github.com/btcsuite/btcutil/base58" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/zeebo/errs" "go.uber.org/zap" "google.golang.org/grpc/credentials" @@ -48,10 +47,9 @@ var ( func TestCertSignerConfig_NewAuthDB(t *testing.T) { ctx := testcontext.New(t) defer ctx.Cleanup() + authDB, err := newTestAuthDB(ctx) - if !assert.NoError(t, err) { - t.Fatal(err) - } + require.NoError(t, err) defer ctx.Check(authDB.Close) assert.NotNil(t, authDB) @@ -61,10 +59,9 @@ func TestCertSignerConfig_NewAuthDB(t *testing.T) { func TestAuthorizationDB_Create(t *testing.T) { ctx := testcontext.New(t) defer ctx.Cleanup() + authDB, err := newTestAuthDB(ctx) - if !assert.NoError(t, err) { - t.Fatal(err) - } + require.NoError(t, err) defer ctx.Check(authDB.Close) cases := []struct { @@ -118,9 +115,7 @@ func TestAuthorizationDB_Create(t *testing.T) { var existingAuths Authorizations err = existingAuths.Unmarshal(v) assert.NoError(t, err) - if !assert.Len(t, existingAuths, c.startCount) { - t.FailNow() - } + require.Len(t, existingAuths, c.startCount) } expectedAuths, err := authDB.Create(c.email, c.incCount) @@ -149,15 +144,11 @@ func TestAuthorizationDB_Create(t *testing.T) { func TestAuthorizationDB_Get(t *testing.T) { ctx := testcontext.New(t) + defer ctx.Cleanup() + authDB, err := newTestAuthDB(ctx) - if !assert.NoError(t, err) { - t.Fatal(err) - } - defer func() { - err := authDB.Close() - assert.NoError(t, err) - ctx.Cleanup() - }() + require.NoError(t, err) + defer ctx.Check(authDB.Close) var expectedAuths Authorizations for i := 0; i < 5; i++ { @@ -165,14 +156,12 @@ func TestAuthorizationDB_Get(t *testing.T) { Token: t1, }) } + authsBytes, err := expectedAuths.Marshal() - if !assert.NoError(t, err) { - t.Fatal(err) - } + require.NoError(t, err) + err = authDB.DB.Put(storage.Key("user@example.com"), authsBytes) - if !assert.NoError(t, err) { - t.Fatal(err) - } + require.NoError(t, err) cases := []struct { testID, @@ -207,26 +196,21 @@ func TestAuthorizationDB_Get(t *testing.T) { func TestAuthorizationDB_Claim_Valid(t *testing.T) { ctx := testcontext.New(t) - userID := "user@example.com" + defer ctx.Cleanup() + authDB, err := newTestAuthDB(ctx) - if !assert.NoError(t, err) || !assert.NotNil(t, authDB) { - t.Fatal(err) - } - defer func() { - err := authDB.Close() - assert.NoError(t, err) - ctx.Cleanup() - }() + require.NoError(t, err) + defer ctx.Check(authDB.Close) + + userID := "user@example.com" auths, err := authDB.Create(userID, 1) - if !assert.NoError(t, err) || !assert.NotEmpty(t, auths) { - t.Fatal(err) - } + require.NoError(t, err) + require.NotEmpty(t, auths) ident, err := testidentity.NewTestIdentity(ctx) - if !assert.NoError(t, err) || !assert.NotNil(t, ident) { - t.Fatal(err) - } + require.NoError(t, err) + require.NotNil(t, ident) addr := &net.TCPAddr{ IP: net.ParseIP("1.2.3.4"), @@ -247,9 +231,7 @@ func TestAuthorizationDB_Claim_Valid(t *testing.T) { Timestamp: now, } difficulty, err := ident.ID.Difficulty() - if !assert.NoError(t, err) { - t.Fatal(err) - } + require.NoError(t, err) err = authDB.Claim(&ClaimOpts{ Req: req, @@ -257,19 +239,15 @@ func TestAuthorizationDB_Claim_Valid(t *testing.T) { ChainBytes: [][]byte{ident.CA.Raw}, MinDifficulty: difficulty, }) - if !assert.NoError(t, err) { - t.Fatal(err) - } + require.NoError(t, err) updatedAuths, err := authDB.Get(userID) - if !assert.NoError(t, err) || !assert.NotEmpty(t, updatedAuths) { - t.Fatal(err) - } + require.NoError(t, err) + require.NotEmpty(t, updatedAuths) assert.Equal(t, auths[0].Token, updatedAuths[0].Token) - if !assert.NotNil(t, updatedAuths[0].Claim) { - t.FailNow() - } + require.NotNil(t, updatedAuths[0].Claim) + claim := updatedAuths[0].Claim assert.Equal(t, grpcPeer.Addr.String(), claim.Addr) assert.Equal(t, [][]byte{ident.CA.Raw}, claim.SignedChainBytes) @@ -281,32 +259,28 @@ func TestAuthorizationDB_Claim_Valid(t *testing.T) { func TestAuthorizationDB_Claim_Invalid(t *testing.T) { ctx := testcontext.New(t) + defer ctx.Cleanup() + + authDB, err := newTestAuthDB(ctx) + require.NoError(t, err) + defer ctx.Check(authDB.Close) + userID := "user@example.com" claimedTime := int64(1000000) claimedAddr := "6.7.8.9:0" + ident1, err := testidentity.NewTestIdentity(ctx) - if !assert.NoError(t, err) || !assert.NotNil(t, ident1) { - t.Fatal(err) - } + require.NoError(t, err) + require.NotNil(t, ident1) + claimedIdent := &identity.PeerIdentity{ CA: ident1.CA, Leaf: ident1.Leaf, } - authDB, err := newTestAuthDB(ctx) - if !assert.NoError(t, err) || !assert.NotNil(t, authDB) { - t.Fatal(err) - } - defer func() { - err := authDB.Close() - assert.NoError(t, err) - ctx.Cleanup() - }() - auths, err := authDB.Create(userID, 2) - if !assert.NoError(t, err) || !assert.NotEmpty(t, auths) { - t.Fatal(err) - } + require.NoError(t, err) + require.NotEmpty(t, auths) claimedIndex, unclaimedIndex := 0, 1 @@ -317,14 +291,11 @@ func TestAuthorizationDB_Claim_Invalid(t *testing.T) { SignedChainBytes: [][]byte{claimedIdent.CA.Raw}, } err = authDB.put(userID, auths) - if !assert.NoError(t, err) { - t.Fatal(err) - } + require.NoError(t, err) ident2, err := testidentity.NewTestIdentity(ctx) - if !assert.NoError(t, err) || !assert.NotNil(t, ident2) { - t.Fatal(err) - } + require.NoError(t, err) + require.NotNil(t, ident2) addr := &net.TCPAddr{ IP: net.ParseIP("1.2.3.4"), @@ -340,9 +311,7 @@ func TestAuthorizationDB_Claim_Invalid(t *testing.T) { } difficulty2, err := ident2.ID.Difficulty() - if !assert.NoError(t, err) { - t.Fatal(err) - } + require.NoError(t, err) t.Run("double claim", func(t *testing.T) { err = authDB.Claim(&ClaimOpts{ @@ -361,9 +330,9 @@ func TestAuthorizationDB_Claim_Invalid(t *testing.T) { } updatedAuths, err := authDB.Get(userID) - if !assert.NoError(t, err) || !assert.NotEmpty(t, updatedAuths) { - t.Fatal(err) - } + require.NoError(t, err) + require.NotEmpty(t, updatedAuths) + assert.Equal(t, auths[claimedIndex].Token, updatedAuths[claimedIndex].Token) claim := updatedAuths[claimedIndex].Claim @@ -390,9 +359,8 @@ func TestAuthorizationDB_Claim_Invalid(t *testing.T) { } updatedAuths, err := authDB.Get(userID) - if !assert.NoError(t, err) || !assert.NotEmpty(t, updatedAuths) { - t.Fatal(err) - } + require.NoError(t, err) + require.NotEmpty(t, updatedAuths) assert.Equal(t, auths[unclaimedIndex].Token, updatedAuths[unclaimedIndex].Token) assert.Nil(t, updatedAuths[unclaimedIndex].Claim) @@ -415,9 +383,8 @@ func TestAuthorizationDB_Claim_Invalid(t *testing.T) { } updatedAuths, err := authDB.Get(userID) - if !assert.NoError(t, err) || !assert.NotEmpty(t, updatedAuths) { - t.Fatal(err) - } + require.NoError(t, err) + require.NotEmpty(t, updatedAuths) assert.Equal(t, auths[unclaimedIndex].Token, updatedAuths[unclaimedIndex].Token) assert.Nil(t, updatedAuths[unclaimedIndex].Claim) @@ -427,10 +394,9 @@ func TestAuthorizationDB_Claim_Invalid(t *testing.T) { func TestNewAuthorization(t *testing.T) { userID := "user@example.com" auth, err := NewAuthorization(userID) - assert.NoError(t, err) - if !assert.NotNil(t, auth) { - t.FailNow() - } + require.NoError(t, err) + require.NotNil(t, auth) + assert.NotZero(t, auth.Token) assert.Equal(t, userID, auth.Token.UserID) assert.NotEmpty(t, auth.Token.Data) @@ -499,15 +465,11 @@ func TestAuthorizations_Group(t *testing.T) { func TestAuthorizationDB_Emails(t *testing.T) { ctx := testcontext.New(t) + defer ctx.Cleanup() + authDB, err := newTestAuthDB(ctx) - if !assert.NoError(t, err) { - t.Fatal(err) - } - defer func() { - err = authDB.Close() - assert.NoError(t, err) - ctx.Cleanup() - }() + require.NoError(t, err) + defer ctx.Check(authDB.Close) var authErrs utils.ErrorGroup for i := 0; i < 5; i++ { @@ -517,9 +479,7 @@ func TestAuthorizationDB_Emails(t *testing.T) { } } err = authErrs.Finish() - if !assert.NoError(t, err) { - t.Fatal(err) - } + require.NoError(t, err) userIDs, err := authDB.UserIDs() assert.NoError(t, err) @@ -549,11 +509,9 @@ func TestParseToken_Valid(t *testing.T) { b58Data := base58.CheckEncode(data[:], tokenVersion) tokenString := c.userID + tokenDelimiter + b58Data token, err := ParseToken(tokenString) + require.NoError(t, err) + require.NotNil(t, token) - assert.NoError(t, err) - if !assert.NotNil(t, token) { - t.FailNow() - } assert.Equal(t, c.userID, token.UserID) assert.Equal(t, data[:], token.Data[:]) }) @@ -607,10 +565,10 @@ func TestToken_Equal(t *testing.T) { // TODO: test sad path func TestCertificateSigner_Sign_E2E(t *testing.T) { ctx := testcontext.New(t) - tmp := ctx.Dir() defer ctx.Cleanup() - caCert := filepath.Join(tmp, "ca.cert") - caKey := filepath.Join(tmp, "ca.key") + + caCert := ctx.File("ca.cert") + caKey := ctx.File("ca.key") userID := "user@example.com" caSetupConfig := identity.CASetupConfig{ CertPath: caCert, @@ -621,27 +579,21 @@ func TestCertificateSigner_Sign_E2E(t *testing.T) { KeyPath: caKey, } config := CertServerConfig{ - AuthorizationDBURL: "bolt://" + filepath.Join(tmp, "authorizations.db"), + AuthorizationDBURL: "bolt://" + ctx.File("authorizations.db"), CA: caConfig, } signingCA, err := caSetupConfig.Create(ctx, nil) - if !assert.NoError(t, err) { - t.Fatal(err) - } + require.NoError(t, err) authDB, err := config.NewAuthDB() - if !assert.NoError(t, err) || !assert.NotNil(t, authDB) { - t.Fatal(err) - } + require.NoError(t, err) auths, err := authDB.Create("user@example.com", 1) - if !assert.NoError(t, err) || !assert.NotEmpty(t, auths) { - t.Fatal(err) - } + require.NoError(t, err) + require.NotEmpty(t, auths) + err = authDB.Close() - if !assert.NoError(t, err) { - t.Fatal(err) - } + require.NoError(t, err) // TODO(bryanchriswhite): figure out why pregenerated // identities change issuers when signed @@ -654,30 +606,26 @@ func TestCertificateSigner_Sign_E2E(t *testing.T) { //serverIdent, err := idents.NewIdentity() //------ serverCA, err := testidentity.NewTestCA(ctx) - if !assert.NoError(t, err) || !assert.NotNil(t, serverCA) { - t.Fatal(err) - } + require.NoError(t, err) + require.NotNil(t, serverCA) + serverIdent, err := serverCA.NewIdentity() //------ - if !assert.NoError(t, err) || !assert.NotNil(t, serverIdent) { - t.Fatal(err) - } + require.NoError(t, err) + require.NotNil(t, serverIdent) listener, err := net.Listen("tcp", "127.0.0.1:0") - if !assert.NoError(t, err) || !assert.NotNil(t, listener) { - t.Fatal(err) - } + require.NoError(t, err) + require.NotNil(t, listener) serverConfig := server.Config{Address: listener.Addr().String()} opts, err := server.NewOptions(serverIdent, serverConfig) - if !assert.NoError(t, err) || !assert.NotNil(t, opts) { - t.Fatal(err) - } + require.NoError(t, err) + require.NotNil(t, opts) service, err := server.New(opts, listener, nil, config) - if !assert.NoError(t, err) || !assert.NotNil(t, service) { - t.Fatal(err) - } + require.NoError(t, err) + require.NotNil(t, service) ctx.Go(func() error { err := service.Run(ctx) @@ -700,29 +648,23 @@ func TestCertificateSigner_Sign_E2E(t *testing.T) { //clientIdent, err := idents.NewIdentity() //------ clientCA, err := testidentity.NewTestCA(ctx) - if !assert.NoError(t, err) || !assert.NotNil(t, clientCA) { - t.Fatal(err) - } + require.NoError(t, err) + require.NotNil(t, clientCA) clientIdent, err := clientCA.NewIdentity() //------ - if !assert.NoError(t, err) || !assert.NotNil(t, clientIdent) { - t.Fatal(err) - } + require.NoError(t, err) + require.NotNil(t, clientIdent) client, err := NewClient(ctx, clientIdent, listener.Addr().String()) - if !assert.NoError(t, err) || !assert.NotNil(t, client) { - t.Fatal(err) - } + require.NoError(t, err) + require.NotNil(t, client) signedChainBytes, err := client.Sign(ctx, auths[0].Token.String()) - if !assert.NoError(t, err) || !assert.NotEmpty(t, signedChainBytes) { - t.Fatal(err) - } + require.NoError(t, err) + require.NotEmpty(t, signedChainBytes) signedChain, err := identity.ParseCertChain(signedChainBytes) - if !assert.NoError(t, err) { - t.Fatal(err) - } + require.NoError(t, err) assert.Equal(t, clientIdent.CA.RawTBSCertificate, signedChain[0].RawTBSCertificate) assert.Equal(t, signingCA.Cert.Raw, signedChainBytes[1]) @@ -737,26 +679,25 @@ func TestCertificateSigner_Sign_E2E(t *testing.T) { // NB: re-open after closing for server authDB, err = config.NewAuthDB() - if !assert.NoError(t, err) || !assert.NotNil(t, authDB) { - t.Fatal(err) - } - defer func() { - err := authDB.Close() - assert.NoError(t, err) - }() + require.NoError(t, err) + require.NotNil(t, authDB) + + defer ctx.Check(authDB.Close) updatedAuths, err := authDB.Get(userID) - if !assert.NoError(t, err) || - !assert.NotEmpty(t, updatedAuths) || - !assert.NotNil(t, updatedAuths[0].Claim) { - t.Fatal(err) - } + require.NoError(t, err) + require.NotEmpty(t, updatedAuths) + require.NotNil(t, updatedAuths[0].Claim) now := time.Now().Unix() claim := updatedAuths[0].Claim - assert.Equal(t, - strings.Split(listener.Addr().String(), ":")[0], - strings.Split(claim.Addr, ":")[0]) + + listenerHost, _, err := net.SplitHostPort(listener.Addr().String()) + require.NoError(t, err) + claimHost, _, err := net.SplitHostPort(claim.Addr) + require.NoError(t, err) + + assert.Equal(t, listenerHost, claimHost) assert.Equal(t, signedChainBytes, claim.SignedChainBytes) assert.Condition(t, func() bool { return now-10 < claim.Timestamp && @@ -765,77 +706,85 @@ func TestCertificateSigner_Sign_E2E(t *testing.T) { } func TestNewClient(t *testing.T) { - ctx := testcontext.New(t) - ident, err := idents.NewIdentity() - if !assert.NoError(t, err) || !assert.NotNil(t, ident) { - t.Fatal(err) - } + t.Skip("needs proper grpc listener to work") - client, err := NewClient(ctx, ident, "") - assert.NoError(t, err) - assert.NotNil(t, client) -} - -func TestNewClientFrom(t *testing.T) { ctx := testcontext.New(t) + defer ctx.Cleanup() + ident, err := idents.NewIdentity() - if !assert.NoError(t, err) || !assert.NotNil(t, ident) { - t.Fatal(err) - } + require.NoError(t, err) + require.NotNil(t, ident) listener, err := net.Listen("tcp", "127.0.0.1:0") - if !assert.NoError(t, err) || !assert.NotNil(t, listener) { - t.Fatal(err) - } + require.NoError(t, err) + require.NotNil(t, listener) - tc := transport.NewClient(ident) - conn, err := tc.DialAddress(ctx, listener.Addr().String()) - if !assert.NoError(t, err) || !assert.NotNil(t, conn) { - t.Fatal(err) - } + defer ctx.Check(listener.Close) + ctx.Go(func() error { + for { + conn, err := listener.Accept() + if err != nil { + return nil + } + if err := conn.Close(); err != nil { + return err + } + } + }) - pbClient := pb.NewCertificatesClient(conn) - if !assert.NotNil(t, pbClient) { - t.FailNow() - } + t.Run("Basic", func(t *testing.T) { + client, err := NewClient(ctx, ident, listener.Addr().String()) + assert.NoError(t, err) + assert.NotNil(t, client) - client, err := NewClientFrom(pbClient) - assert.NoError(t, err) - assert.NotNil(t, client) + defer ctx.Check(client.Close) + }) + + t.Run("ClientFrom", func(t *testing.T) { + tc := transport.NewClient(ident) + conn, err := tc.DialAddress(ctx, listener.Addr().String()) + require.NoError(t, err) + require.NotNil(t, conn) + + defer ctx.Check(conn.Close) + + pbClient := pb.NewCertificatesClient(conn) + require.NotNil(t, pbClient) + + client, err := NewClientFrom(pbClient) + assert.NoError(t, err) + assert.NotNil(t, client) + + defer ctx.Check(client.Close) + }) } func TestCertificateSigner_Sign(t *testing.T) { ctx := testcontext.New(t) - tmp := ctx.Dir() defer ctx.Cleanup() - caCert := filepath.Join(tmp, "ca.cert") - caKey := filepath.Join(tmp, "ca.key") + + caCert := ctx.File("ca.cert") + caKey := ctx.File("ca.key") userID := "user@example.com" caSetupConfig := identity.CASetupConfig{ CertPath: caCert, KeyPath: caKey, } config := CertServerConfig{ - AuthorizationDBURL: "bolt://" + filepath.Join(tmp, "authorizations.db"), + AuthorizationDBURL: "bolt://" + ctx.File("authorizations.db"), } signingCA, err := caSetupConfig.Create(ctx, nil) - if !assert.NoError(t, err) { - t.Fatal(err) - } + require.NoError(t, err) authDB, err := config.NewAuthDB() - if !assert.NoError(t, err) || !assert.NotNil(t, authDB) { - t.Fatal(err) - } - defer func() { - err := authDB.Close() - assert.NoError(t, err) - }() + require.NoError(t, err) + require.NotNil(t, authDB) + + defer ctx.Check(authDB.Close) auths, err := authDB.Create(userID, 1) - if !assert.NoError(t, err) || !assert.NotEmpty(t, auths) { - t.Fatal(err) - } + require.NoError(t, err) + require.NotEmpty(t, auths) // TODO(bryanchriswhite): figure out why pregenerated // identities change issuers when signed @@ -848,14 +797,13 @@ func TestCertificateSigner_Sign(t *testing.T) { //clientIdent, err := idents.NewIdentity() //------ clientCA, err := testidentity.NewTestCA(ctx) - if !assert.NoError(t, err) || !assert.NotNil(t, clientCA) { - t.Fatal(err) - } + require.NoError(t, err) + require.NotNil(t, clientCA) + clientIdent, err := clientCA.NewIdentity() + require.NoError(t, err) + require.NotNil(t, clientIdent) //------ - if !assert.NoError(t, err) || !assert.NotNil(t, clientIdent) { - t.Fatal(err) - } expectedAddr := &net.TCPAddr{ IP: net.ParseIP("1.2.3.4"), @@ -871,25 +819,18 @@ func TestCertificateSigner_Sign(t *testing.T) { } peerCtx := peer.NewContext(ctx, grpcPeer) - certSigner := NewServer( - zap.L(), - signingCA, - authDB, - 0, - ) + certSigner := NewServer(zap.L(), signingCA, authDB, 0) req := pb.SigningRequest{ Timestamp: time.Now().Unix(), AuthToken: auths[0].Token.String(), } res, err := certSigner.Sign(peerCtx, &req) - if !assert.NoError(t, err) || !assert.NotNil(t, res) || !assert.NotEmpty(t, res.Chain) { - t.Fatal(err) - } + require.NoError(t, err) + require.NotNil(t, res) + require.NotEmpty(t, res.Chain) signedChain, err := identity.ParseCertChain(res.Chain) - if !assert.NoError(t, err) { - t.Fatal(err) - } + require.NoError(t, err) assert.Equal(t, clientIdent.CA.RawTBSCertificate, signedChain[0].RawTBSCertificate) assert.Equal(t, signingCA.Cert.Raw, signedChain[1].Raw) @@ -900,11 +841,9 @@ func TestCertificateSigner_Sign(t *testing.T) { assert.NoError(t, err) updatedAuths, err := authDB.Get(userID) - if !assert.NoError(t, err) || - !assert.NotEmpty(t, updatedAuths) || - !assert.NotNil(t, updatedAuths[0].Claim) { - t.Fatal(err) - } + require.NoError(t, err) + require.NotEmpty(t, updatedAuths) + require.NotNil(t, updatedAuths[0].Claim) now := time.Now().Unix() claim := updatedAuths[0].Claim @@ -917,7 +856,7 @@ func TestCertificateSigner_Sign(t *testing.T) { } func newTestAuthDB(ctx *testcontext.Context) (*AuthorizationDB, error) { - dbPath := "bolt://" + filepath.Join(ctx.Dir(), "authorizations.db") + dbPath := "bolt://" + ctx.File("authorizations.db") config := CertServerConfig{ AuthorizationDBURL: dbPath, } diff --git a/pkg/datarepair/repairer/repairer.go b/pkg/datarepair/repairer/repairer.go index ba5f83446..61a82c509 100644 --- a/pkg/datarepair/repairer/repairer.go +++ b/pkg/datarepair/repairer/repairer.go @@ -11,6 +11,7 @@ import ( "storj.io/storj/internal/sync2" "storj.io/storj/pkg/datarepair/queue" + "storj.io/storj/pkg/identity" "storj.io/storj/pkg/storj" "storj.io/storj/storage" ) @@ -23,16 +24,19 @@ type SegmentRepairer interface { // Service contains the information needed to run the repair service type Service struct { queue queue.RepairQueue + config *Config + identity *identity.FullIdentity repairer SegmentRepairer limiter *sync2.Limiter ticker *time.Ticker } // NewService creates repairing service -func NewService(queue queue.RepairQueue, repairer SegmentRepairer, interval time.Duration, concurrency int) *Service { +func NewService(queue queue.RepairQueue, config *Config, identity *identity.FullIdentity, interval time.Duration, concurrency int) *Service { return &Service{ queue: queue, - repairer: repairer, + config: config, + identity: identity, limiter: sync2.NewLimiter(concurrency), ticker: time.NewTicker(interval), } @@ -45,6 +49,12 @@ func (service *Service) Close() error { return nil } func (service *Service) Run(ctx context.Context) (err error) { defer mon.Task()(&ctx)(&err) + // TODO: close segment repairer, currently this leaks connections + service.repairer, err = service.config.GetSegmentRepairer(ctx, service.identity) + if err != nil { + return err + } + // wait for all repairs to complete defer service.limiter.Wait() diff --git a/pkg/overlay/client_test.go b/pkg/overlay/client_test.go index 3bc3d3982..3ef6f9a7d 100644 --- a/pkg/overlay/client_test.go +++ b/pkg/overlay/client_test.go @@ -11,49 +11,23 @@ import ( "github.com/stretchr/testify/require" "storj.io/storj/internal/testcontext" - "storj.io/storj/internal/testidentity" "storj.io/storj/internal/testplanet" "storj.io/storj/pkg/overlay" "storj.io/storj/pkg/pb" "storj.io/storj/pkg/storj" ) -func TestNewClient(t *testing.T) { - ctx := testcontext.New(t) - defer ctx.Cleanup() - - cases := []struct { - address string - }{ - { - address: "127.0.0.1:8080", - }, - } - - for _, v := range cases { - ca, err := testidentity.NewTestCA(ctx) - assert.NoError(t, err) - identity, err := ca.NewIdentity() - assert.NoError(t, err) - - oc, err := overlay.NewClient(identity, v.address) - assert.NoError(t, err) - - assert.NotNil(t, oc) - } -} - func TestChoose(t *testing.T) { ctx := testcontext.New(t) defer ctx.Cleanup() planet, err := testplanet.New(t, 1, 8, 1) require.NoError(t, err) + defer ctx.Check(planet.Shutdown) planet.Start(ctx) // we wait a second for all the nodes to complete bootstrapping off the satellite time.Sleep(2 * time.Second) - defer ctx.Check(planet.Shutdown) oc, err := planet.Uplinks[0].DialOverlay(planet.Satellites[0]) if err != nil { @@ -93,11 +67,11 @@ func TestLookup(t *testing.T) { planet, err := testplanet.New(t, 1, 4, 1) require.NoError(t, err) + defer ctx.Check(planet.Shutdown) planet.Start(ctx) // we wait a second for all the nodes to complete bootstrapping off the satellite time.Sleep(2 * time.Second) - defer ctx.Check(planet.Shutdown) oc, err := planet.Uplinks[0].DialOverlay(planet.Satellites[0]) if err != nil { diff --git a/satellite/peer.go b/satellite/peer.go index 7d180f9fe..0c71111dd 100644 --- a/satellite/peer.go +++ b/satellite/peer.go @@ -305,13 +305,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config *Config) (* 0, peer.Log.Named("checker"), config.Checker.Interval) - // TODO: close segment repairer, currently this leaks connections - segmentRepairer, err := config.Repairer.GetSegmentRepairer(context.TODO(), peer.Identity) - if err != nil { - return nil, errs.Combine(err, peer.Close()) - } - - peer.Repair.Repairer = repairer.NewService(peer.DB.RepairQueue(), segmentRepairer, config.Repairer.Interval, config.Repairer.MaxRepair) + peer.Repair.Repairer = repairer.NewService(peer.DB.RepairQueue(), &config.Repairer, peer.Identity, config.Repairer.Interval, config.Repairer.MaxRepair) } { // setup audit