Fix issues with blocking during startup (#1212)

This commit is contained in:
Egon Elbre 2019-02-01 19:28:40 +02:00 committed by GitHub
parent 3e3088e37c
commit 5a63c00442
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 215 additions and 292 deletions

View File

@ -28,8 +28,8 @@ type RecordAuditsInfo struct {
}
// NewReporter instantiates a reporter
func NewReporter(sdb statdb.DB, maxRetries int) (reporter *Reporter, err error) {
return &Reporter{statdb: sdb, maxRetries: maxRetries}, nil
func NewReporter(sdb statdb.DB, maxRetries int) *Reporter {
return &Reporter{statdb: sdb, maxRetries: maxRetries}
}
// RecordAudits saves failed audit details to statdb

View File

@ -24,29 +24,25 @@ type Config struct {
// Service helps coordinate Cursor and Verifier to run the audit process continuously
type Service struct {
log *zap.Logger
log *zap.Logger
Cursor *Cursor
Verifier *Verifier
Reporter reporter
ticker *time.Ticker
ticker *time.Ticker
}
// NewService instantiates a Service with access to a Cursor and Verifier
func NewService(log *zap.Logger, sdb statdb.DB, interval time.Duration, maxRetries int, pointers *pointerdb.Service, allocation *pointerdb.AllocationSigner, transport transport.Client, overlay *overlay.Cache, identity *identity.FullIdentity) (service *Service, err error) {
// TODO: instead of overlay.Client use overlay.Service
cursor := NewCursor(pointers, allocation, identity)
verifier := NewVerifier(transport, overlay, identity)
reporter, err := NewReporter(sdb, maxRetries)
if err != nil {
return nil, err
}
return &Service{
log: log,
Cursor: cursor,
Verifier: verifier,
Reporter: reporter,
ticker: time.NewTicker(interval),
log: log,
// TODO: instead of overlay.Client use overlay.Service
Cursor: NewCursor(pointers, allocation, identity),
Verifier: NewVerifier(transport, overlay, identity),
Reporter: NewReporter(sdb, maxRetries),
ticker: time.NewTicker(interval),
}, nil
}
@ -54,7 +50,6 @@ func NewService(log *zap.Logger, sdb statdb.DB, interval time.Duration, maxRetri
func (service *Service) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
service.log.Info("Audit cron is starting up")
for {
err := service.process(ctx)
if err != nil {

View File

@ -18,6 +18,7 @@ import (
"github.com/btcsuite/btcutil/base58"
"github.com/zeebo/errs"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/peer"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
@ -102,6 +103,7 @@ type Claim struct {
// Client implements pb.CertificateClient
type Client struct {
conn *grpc.ClientConn
client pb.CertificatesClient
}
@ -129,6 +131,7 @@ func NewClient(ctx context.Context, ident *identity.FullIdentity, address string
}
return &Client{
conn: conn,
client: pb.NewCertificatesClient(conn),
}, nil
}
@ -182,9 +185,17 @@ func ParseToken(tokenString string) (*Token, error) {
return t, nil
}
// Close closes the client
func (c *Client) Close() error {
if c.conn != nil {
return c.conn.Close()
}
return nil
}
// Sign claims an authorization using the token string and returns a signed
// copy of the client's CA certificate
func (c Client) Sign(ctx context.Context, tokenStr string) ([][]byte, error) {
func (c *Client) Sign(ctx context.Context, tokenStr string) ([][]byte, error) {
res, err := c.client.Sign(ctx, &pb.SigningRequest{
AuthToken: tokenStr,
Timestamp: time.Now().Unix(),

View File

@ -10,13 +10,12 @@ import (
"encoding/gob"
"fmt"
"net"
"path/filepath"
"strings"
"testing"
"time"
"github.com/btcsuite/btcutil/base58"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/zeebo/errs"
"go.uber.org/zap"
"google.golang.org/grpc/credentials"
@ -48,10 +47,9 @@ var (
func TestCertSignerConfig_NewAuthDB(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
authDB, err := newTestAuthDB(ctx)
if !assert.NoError(t, err) {
t.Fatal(err)
}
require.NoError(t, err)
defer ctx.Check(authDB.Close)
assert.NotNil(t, authDB)
@ -61,10 +59,9 @@ func TestCertSignerConfig_NewAuthDB(t *testing.T) {
func TestAuthorizationDB_Create(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
authDB, err := newTestAuthDB(ctx)
if !assert.NoError(t, err) {
t.Fatal(err)
}
require.NoError(t, err)
defer ctx.Check(authDB.Close)
cases := []struct {
@ -118,9 +115,7 @@ func TestAuthorizationDB_Create(t *testing.T) {
var existingAuths Authorizations
err = existingAuths.Unmarshal(v)
assert.NoError(t, err)
if !assert.Len(t, existingAuths, c.startCount) {
t.FailNow()
}
require.Len(t, existingAuths, c.startCount)
}
expectedAuths, err := authDB.Create(c.email, c.incCount)
@ -149,15 +144,11 @@ func TestAuthorizationDB_Create(t *testing.T) {
func TestAuthorizationDB_Get(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
authDB, err := newTestAuthDB(ctx)
if !assert.NoError(t, err) {
t.Fatal(err)
}
defer func() {
err := authDB.Close()
assert.NoError(t, err)
ctx.Cleanup()
}()
require.NoError(t, err)
defer ctx.Check(authDB.Close)
var expectedAuths Authorizations
for i := 0; i < 5; i++ {
@ -165,14 +156,12 @@ func TestAuthorizationDB_Get(t *testing.T) {
Token: t1,
})
}
authsBytes, err := expectedAuths.Marshal()
if !assert.NoError(t, err) {
t.Fatal(err)
}
require.NoError(t, err)
err = authDB.DB.Put(storage.Key("user@example.com"), authsBytes)
if !assert.NoError(t, err) {
t.Fatal(err)
}
require.NoError(t, err)
cases := []struct {
testID,
@ -207,26 +196,21 @@ func TestAuthorizationDB_Get(t *testing.T) {
func TestAuthorizationDB_Claim_Valid(t *testing.T) {
ctx := testcontext.New(t)
userID := "user@example.com"
defer ctx.Cleanup()
authDB, err := newTestAuthDB(ctx)
if !assert.NoError(t, err) || !assert.NotNil(t, authDB) {
t.Fatal(err)
}
defer func() {
err := authDB.Close()
assert.NoError(t, err)
ctx.Cleanup()
}()
require.NoError(t, err)
defer ctx.Check(authDB.Close)
userID := "user@example.com"
auths, err := authDB.Create(userID, 1)
if !assert.NoError(t, err) || !assert.NotEmpty(t, auths) {
t.Fatal(err)
}
require.NoError(t, err)
require.NotEmpty(t, auths)
ident, err := testidentity.NewTestIdentity(ctx)
if !assert.NoError(t, err) || !assert.NotNil(t, ident) {
t.Fatal(err)
}
require.NoError(t, err)
require.NotNil(t, ident)
addr := &net.TCPAddr{
IP: net.ParseIP("1.2.3.4"),
@ -247,9 +231,7 @@ func TestAuthorizationDB_Claim_Valid(t *testing.T) {
Timestamp: now,
}
difficulty, err := ident.ID.Difficulty()
if !assert.NoError(t, err) {
t.Fatal(err)
}
require.NoError(t, err)
err = authDB.Claim(&ClaimOpts{
Req: req,
@ -257,19 +239,15 @@ func TestAuthorizationDB_Claim_Valid(t *testing.T) {
ChainBytes: [][]byte{ident.CA.Raw},
MinDifficulty: difficulty,
})
if !assert.NoError(t, err) {
t.Fatal(err)
}
require.NoError(t, err)
updatedAuths, err := authDB.Get(userID)
if !assert.NoError(t, err) || !assert.NotEmpty(t, updatedAuths) {
t.Fatal(err)
}
require.NoError(t, err)
require.NotEmpty(t, updatedAuths)
assert.Equal(t, auths[0].Token, updatedAuths[0].Token)
if !assert.NotNil(t, updatedAuths[0].Claim) {
t.FailNow()
}
require.NotNil(t, updatedAuths[0].Claim)
claim := updatedAuths[0].Claim
assert.Equal(t, grpcPeer.Addr.String(), claim.Addr)
assert.Equal(t, [][]byte{ident.CA.Raw}, claim.SignedChainBytes)
@ -281,32 +259,28 @@ func TestAuthorizationDB_Claim_Valid(t *testing.T) {
func TestAuthorizationDB_Claim_Invalid(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
authDB, err := newTestAuthDB(ctx)
require.NoError(t, err)
defer ctx.Check(authDB.Close)
userID := "user@example.com"
claimedTime := int64(1000000)
claimedAddr := "6.7.8.9:0"
ident1, err := testidentity.NewTestIdentity(ctx)
if !assert.NoError(t, err) || !assert.NotNil(t, ident1) {
t.Fatal(err)
}
require.NoError(t, err)
require.NotNil(t, ident1)
claimedIdent := &identity.PeerIdentity{
CA: ident1.CA,
Leaf: ident1.Leaf,
}
authDB, err := newTestAuthDB(ctx)
if !assert.NoError(t, err) || !assert.NotNil(t, authDB) {
t.Fatal(err)
}
defer func() {
err := authDB.Close()
assert.NoError(t, err)
ctx.Cleanup()
}()
auths, err := authDB.Create(userID, 2)
if !assert.NoError(t, err) || !assert.NotEmpty(t, auths) {
t.Fatal(err)
}
require.NoError(t, err)
require.NotEmpty(t, auths)
claimedIndex, unclaimedIndex := 0, 1
@ -317,14 +291,11 @@ func TestAuthorizationDB_Claim_Invalid(t *testing.T) {
SignedChainBytes: [][]byte{claimedIdent.CA.Raw},
}
err = authDB.put(userID, auths)
if !assert.NoError(t, err) {
t.Fatal(err)
}
require.NoError(t, err)
ident2, err := testidentity.NewTestIdentity(ctx)
if !assert.NoError(t, err) || !assert.NotNil(t, ident2) {
t.Fatal(err)
}
require.NoError(t, err)
require.NotNil(t, ident2)
addr := &net.TCPAddr{
IP: net.ParseIP("1.2.3.4"),
@ -340,9 +311,7 @@ func TestAuthorizationDB_Claim_Invalid(t *testing.T) {
}
difficulty2, err := ident2.ID.Difficulty()
if !assert.NoError(t, err) {
t.Fatal(err)
}
require.NoError(t, err)
t.Run("double claim", func(t *testing.T) {
err = authDB.Claim(&ClaimOpts{
@ -361,9 +330,9 @@ func TestAuthorizationDB_Claim_Invalid(t *testing.T) {
}
updatedAuths, err := authDB.Get(userID)
if !assert.NoError(t, err) || !assert.NotEmpty(t, updatedAuths) {
t.Fatal(err)
}
require.NoError(t, err)
require.NotEmpty(t, updatedAuths)
assert.Equal(t, auths[claimedIndex].Token, updatedAuths[claimedIndex].Token)
claim := updatedAuths[claimedIndex].Claim
@ -390,9 +359,8 @@ func TestAuthorizationDB_Claim_Invalid(t *testing.T) {
}
updatedAuths, err := authDB.Get(userID)
if !assert.NoError(t, err) || !assert.NotEmpty(t, updatedAuths) {
t.Fatal(err)
}
require.NoError(t, err)
require.NotEmpty(t, updatedAuths)
assert.Equal(t, auths[unclaimedIndex].Token, updatedAuths[unclaimedIndex].Token)
assert.Nil(t, updatedAuths[unclaimedIndex].Claim)
@ -415,9 +383,8 @@ func TestAuthorizationDB_Claim_Invalid(t *testing.T) {
}
updatedAuths, err := authDB.Get(userID)
if !assert.NoError(t, err) || !assert.NotEmpty(t, updatedAuths) {
t.Fatal(err)
}
require.NoError(t, err)
require.NotEmpty(t, updatedAuths)
assert.Equal(t, auths[unclaimedIndex].Token, updatedAuths[unclaimedIndex].Token)
assert.Nil(t, updatedAuths[unclaimedIndex].Claim)
@ -427,10 +394,9 @@ func TestAuthorizationDB_Claim_Invalid(t *testing.T) {
func TestNewAuthorization(t *testing.T) {
userID := "user@example.com"
auth, err := NewAuthorization(userID)
assert.NoError(t, err)
if !assert.NotNil(t, auth) {
t.FailNow()
}
require.NoError(t, err)
require.NotNil(t, auth)
assert.NotZero(t, auth.Token)
assert.Equal(t, userID, auth.Token.UserID)
assert.NotEmpty(t, auth.Token.Data)
@ -499,15 +465,11 @@ func TestAuthorizations_Group(t *testing.T) {
func TestAuthorizationDB_Emails(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
authDB, err := newTestAuthDB(ctx)
if !assert.NoError(t, err) {
t.Fatal(err)
}
defer func() {
err = authDB.Close()
assert.NoError(t, err)
ctx.Cleanup()
}()
require.NoError(t, err)
defer ctx.Check(authDB.Close)
var authErrs utils.ErrorGroup
for i := 0; i < 5; i++ {
@ -517,9 +479,7 @@ func TestAuthorizationDB_Emails(t *testing.T) {
}
}
err = authErrs.Finish()
if !assert.NoError(t, err) {
t.Fatal(err)
}
require.NoError(t, err)
userIDs, err := authDB.UserIDs()
assert.NoError(t, err)
@ -549,11 +509,9 @@ func TestParseToken_Valid(t *testing.T) {
b58Data := base58.CheckEncode(data[:], tokenVersion)
tokenString := c.userID + tokenDelimiter + b58Data
token, err := ParseToken(tokenString)
require.NoError(t, err)
require.NotNil(t, token)
assert.NoError(t, err)
if !assert.NotNil(t, token) {
t.FailNow()
}
assert.Equal(t, c.userID, token.UserID)
assert.Equal(t, data[:], token.Data[:])
})
@ -607,10 +565,10 @@ func TestToken_Equal(t *testing.T) {
// TODO: test sad path
func TestCertificateSigner_Sign_E2E(t *testing.T) {
ctx := testcontext.New(t)
tmp := ctx.Dir()
defer ctx.Cleanup()
caCert := filepath.Join(tmp, "ca.cert")
caKey := filepath.Join(tmp, "ca.key")
caCert := ctx.File("ca.cert")
caKey := ctx.File("ca.key")
userID := "user@example.com"
caSetupConfig := identity.CASetupConfig{
CertPath: caCert,
@ -621,27 +579,21 @@ func TestCertificateSigner_Sign_E2E(t *testing.T) {
KeyPath: caKey,
}
config := CertServerConfig{
AuthorizationDBURL: "bolt://" + filepath.Join(tmp, "authorizations.db"),
AuthorizationDBURL: "bolt://" + ctx.File("authorizations.db"),
CA: caConfig,
}
signingCA, err := caSetupConfig.Create(ctx, nil)
if !assert.NoError(t, err) {
t.Fatal(err)
}
require.NoError(t, err)
authDB, err := config.NewAuthDB()
if !assert.NoError(t, err) || !assert.NotNil(t, authDB) {
t.Fatal(err)
}
require.NoError(t, err)
auths, err := authDB.Create("user@example.com", 1)
if !assert.NoError(t, err) || !assert.NotEmpty(t, auths) {
t.Fatal(err)
}
require.NoError(t, err)
require.NotEmpty(t, auths)
err = authDB.Close()
if !assert.NoError(t, err) {
t.Fatal(err)
}
require.NoError(t, err)
// TODO(bryanchriswhite): figure out why pregenerated
// identities change issuers when signed
@ -654,30 +606,26 @@ func TestCertificateSigner_Sign_E2E(t *testing.T) {
//serverIdent, err := idents.NewIdentity()
//------
serverCA, err := testidentity.NewTestCA(ctx)
if !assert.NoError(t, err) || !assert.NotNil(t, serverCA) {
t.Fatal(err)
}
require.NoError(t, err)
require.NotNil(t, serverCA)
serverIdent, err := serverCA.NewIdentity()
//------
if !assert.NoError(t, err) || !assert.NotNil(t, serverIdent) {
t.Fatal(err)
}
require.NoError(t, err)
require.NotNil(t, serverIdent)
listener, err := net.Listen("tcp", "127.0.0.1:0")
if !assert.NoError(t, err) || !assert.NotNil(t, listener) {
t.Fatal(err)
}
require.NoError(t, err)
require.NotNil(t, listener)
serverConfig := server.Config{Address: listener.Addr().String()}
opts, err := server.NewOptions(serverIdent, serverConfig)
if !assert.NoError(t, err) || !assert.NotNil(t, opts) {
t.Fatal(err)
}
require.NoError(t, err)
require.NotNil(t, opts)
service, err := server.New(opts, listener, nil, config)
if !assert.NoError(t, err) || !assert.NotNil(t, service) {
t.Fatal(err)
}
require.NoError(t, err)
require.NotNil(t, service)
ctx.Go(func() error {
err := service.Run(ctx)
@ -700,29 +648,23 @@ func TestCertificateSigner_Sign_E2E(t *testing.T) {
//clientIdent, err := idents.NewIdentity()
//------
clientCA, err := testidentity.NewTestCA(ctx)
if !assert.NoError(t, err) || !assert.NotNil(t, clientCA) {
t.Fatal(err)
}
require.NoError(t, err)
require.NotNil(t, clientCA)
clientIdent, err := clientCA.NewIdentity()
//------
if !assert.NoError(t, err) || !assert.NotNil(t, clientIdent) {
t.Fatal(err)
}
require.NoError(t, err)
require.NotNil(t, clientIdent)
client, err := NewClient(ctx, clientIdent, listener.Addr().String())
if !assert.NoError(t, err) || !assert.NotNil(t, client) {
t.Fatal(err)
}
require.NoError(t, err)
require.NotNil(t, client)
signedChainBytes, err := client.Sign(ctx, auths[0].Token.String())
if !assert.NoError(t, err) || !assert.NotEmpty(t, signedChainBytes) {
t.Fatal(err)
}
require.NoError(t, err)
require.NotEmpty(t, signedChainBytes)
signedChain, err := identity.ParseCertChain(signedChainBytes)
if !assert.NoError(t, err) {
t.Fatal(err)
}
require.NoError(t, err)
assert.Equal(t, clientIdent.CA.RawTBSCertificate, signedChain[0].RawTBSCertificate)
assert.Equal(t, signingCA.Cert.Raw, signedChainBytes[1])
@ -737,26 +679,25 @@ func TestCertificateSigner_Sign_E2E(t *testing.T) {
// NB: re-open after closing for server
authDB, err = config.NewAuthDB()
if !assert.NoError(t, err) || !assert.NotNil(t, authDB) {
t.Fatal(err)
}
defer func() {
err := authDB.Close()
assert.NoError(t, err)
}()
require.NoError(t, err)
require.NotNil(t, authDB)
defer ctx.Check(authDB.Close)
updatedAuths, err := authDB.Get(userID)
if !assert.NoError(t, err) ||
!assert.NotEmpty(t, updatedAuths) ||
!assert.NotNil(t, updatedAuths[0].Claim) {
t.Fatal(err)
}
require.NoError(t, err)
require.NotEmpty(t, updatedAuths)
require.NotNil(t, updatedAuths[0].Claim)
now := time.Now().Unix()
claim := updatedAuths[0].Claim
assert.Equal(t,
strings.Split(listener.Addr().String(), ":")[0],
strings.Split(claim.Addr, ":")[0])
listenerHost, _, err := net.SplitHostPort(listener.Addr().String())
require.NoError(t, err)
claimHost, _, err := net.SplitHostPort(claim.Addr)
require.NoError(t, err)
assert.Equal(t, listenerHost, claimHost)
assert.Equal(t, signedChainBytes, claim.SignedChainBytes)
assert.Condition(t, func() bool {
return now-10 < claim.Timestamp &&
@ -765,77 +706,85 @@ func TestCertificateSigner_Sign_E2E(t *testing.T) {
}
func TestNewClient(t *testing.T) {
ctx := testcontext.New(t)
ident, err := idents.NewIdentity()
if !assert.NoError(t, err) || !assert.NotNil(t, ident) {
t.Fatal(err)
}
t.Skip("needs proper grpc listener to work")
client, err := NewClient(ctx, ident, "")
assert.NoError(t, err)
assert.NotNil(t, client)
}
func TestNewClientFrom(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
ident, err := idents.NewIdentity()
if !assert.NoError(t, err) || !assert.NotNil(t, ident) {
t.Fatal(err)
}
require.NoError(t, err)
require.NotNil(t, ident)
listener, err := net.Listen("tcp", "127.0.0.1:0")
if !assert.NoError(t, err) || !assert.NotNil(t, listener) {
t.Fatal(err)
}
require.NoError(t, err)
require.NotNil(t, listener)
tc := transport.NewClient(ident)
conn, err := tc.DialAddress(ctx, listener.Addr().String())
if !assert.NoError(t, err) || !assert.NotNil(t, conn) {
t.Fatal(err)
}
defer ctx.Check(listener.Close)
ctx.Go(func() error {
for {
conn, err := listener.Accept()
if err != nil {
return nil
}
if err := conn.Close(); err != nil {
return err
}
}
})
pbClient := pb.NewCertificatesClient(conn)
if !assert.NotNil(t, pbClient) {
t.FailNow()
}
t.Run("Basic", func(t *testing.T) {
client, err := NewClient(ctx, ident, listener.Addr().String())
assert.NoError(t, err)
assert.NotNil(t, client)
client, err := NewClientFrom(pbClient)
assert.NoError(t, err)
assert.NotNil(t, client)
defer ctx.Check(client.Close)
})
t.Run("ClientFrom", func(t *testing.T) {
tc := transport.NewClient(ident)
conn, err := tc.DialAddress(ctx, listener.Addr().String())
require.NoError(t, err)
require.NotNil(t, conn)
defer ctx.Check(conn.Close)
pbClient := pb.NewCertificatesClient(conn)
require.NotNil(t, pbClient)
client, err := NewClientFrom(pbClient)
assert.NoError(t, err)
assert.NotNil(t, client)
defer ctx.Check(client.Close)
})
}
func TestCertificateSigner_Sign(t *testing.T) {
ctx := testcontext.New(t)
tmp := ctx.Dir()
defer ctx.Cleanup()
caCert := filepath.Join(tmp, "ca.cert")
caKey := filepath.Join(tmp, "ca.key")
caCert := ctx.File("ca.cert")
caKey := ctx.File("ca.key")
userID := "user@example.com"
caSetupConfig := identity.CASetupConfig{
CertPath: caCert,
KeyPath: caKey,
}
config := CertServerConfig{
AuthorizationDBURL: "bolt://" + filepath.Join(tmp, "authorizations.db"),
AuthorizationDBURL: "bolt://" + ctx.File("authorizations.db"),
}
signingCA, err := caSetupConfig.Create(ctx, nil)
if !assert.NoError(t, err) {
t.Fatal(err)
}
require.NoError(t, err)
authDB, err := config.NewAuthDB()
if !assert.NoError(t, err) || !assert.NotNil(t, authDB) {
t.Fatal(err)
}
defer func() {
err := authDB.Close()
assert.NoError(t, err)
}()
require.NoError(t, err)
require.NotNil(t, authDB)
defer ctx.Check(authDB.Close)
auths, err := authDB.Create(userID, 1)
if !assert.NoError(t, err) || !assert.NotEmpty(t, auths) {
t.Fatal(err)
}
require.NoError(t, err)
require.NotEmpty(t, auths)
// TODO(bryanchriswhite): figure out why pregenerated
// identities change issuers when signed
@ -848,14 +797,13 @@ func TestCertificateSigner_Sign(t *testing.T) {
//clientIdent, err := idents.NewIdentity()
//------
clientCA, err := testidentity.NewTestCA(ctx)
if !assert.NoError(t, err) || !assert.NotNil(t, clientCA) {
t.Fatal(err)
}
require.NoError(t, err)
require.NotNil(t, clientCA)
clientIdent, err := clientCA.NewIdentity()
require.NoError(t, err)
require.NotNil(t, clientIdent)
//------
if !assert.NoError(t, err) || !assert.NotNil(t, clientIdent) {
t.Fatal(err)
}
expectedAddr := &net.TCPAddr{
IP: net.ParseIP("1.2.3.4"),
@ -871,25 +819,18 @@ func TestCertificateSigner_Sign(t *testing.T) {
}
peerCtx := peer.NewContext(ctx, grpcPeer)
certSigner := NewServer(
zap.L(),
signingCA,
authDB,
0,
)
certSigner := NewServer(zap.L(), signingCA, authDB, 0)
req := pb.SigningRequest{
Timestamp: time.Now().Unix(),
AuthToken: auths[0].Token.String(),
}
res, err := certSigner.Sign(peerCtx, &req)
if !assert.NoError(t, err) || !assert.NotNil(t, res) || !assert.NotEmpty(t, res.Chain) {
t.Fatal(err)
}
require.NoError(t, err)
require.NotNil(t, res)
require.NotEmpty(t, res.Chain)
signedChain, err := identity.ParseCertChain(res.Chain)
if !assert.NoError(t, err) {
t.Fatal(err)
}
require.NoError(t, err)
assert.Equal(t, clientIdent.CA.RawTBSCertificate, signedChain[0].RawTBSCertificate)
assert.Equal(t, signingCA.Cert.Raw, signedChain[1].Raw)
@ -900,11 +841,9 @@ func TestCertificateSigner_Sign(t *testing.T) {
assert.NoError(t, err)
updatedAuths, err := authDB.Get(userID)
if !assert.NoError(t, err) ||
!assert.NotEmpty(t, updatedAuths) ||
!assert.NotNil(t, updatedAuths[0].Claim) {
t.Fatal(err)
}
require.NoError(t, err)
require.NotEmpty(t, updatedAuths)
require.NotNil(t, updatedAuths[0].Claim)
now := time.Now().Unix()
claim := updatedAuths[0].Claim
@ -917,7 +856,7 @@ func TestCertificateSigner_Sign(t *testing.T) {
}
func newTestAuthDB(ctx *testcontext.Context) (*AuthorizationDB, error) {
dbPath := "bolt://" + filepath.Join(ctx.Dir(), "authorizations.db")
dbPath := "bolt://" + ctx.File("authorizations.db")
config := CertServerConfig{
AuthorizationDBURL: dbPath,
}

View File

@ -11,6 +11,7 @@ import (
"storj.io/storj/internal/sync2"
"storj.io/storj/pkg/datarepair/queue"
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/storj"
"storj.io/storj/storage"
)
@ -23,16 +24,19 @@ type SegmentRepairer interface {
// Service contains the information needed to run the repair service
type Service struct {
queue queue.RepairQueue
config *Config
identity *identity.FullIdentity
repairer SegmentRepairer
limiter *sync2.Limiter
ticker *time.Ticker
}
// NewService creates repairing service
func NewService(queue queue.RepairQueue, repairer SegmentRepairer, interval time.Duration, concurrency int) *Service {
func NewService(queue queue.RepairQueue, config *Config, identity *identity.FullIdentity, interval time.Duration, concurrency int) *Service {
return &Service{
queue: queue,
repairer: repairer,
config: config,
identity: identity,
limiter: sync2.NewLimiter(concurrency),
ticker: time.NewTicker(interval),
}
@ -45,6 +49,12 @@ func (service *Service) Close() error { return nil }
func (service *Service) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
// TODO: close segment repairer, currently this leaks connections
service.repairer, err = service.config.GetSegmentRepairer(ctx, service.identity)
if err != nil {
return err
}
// wait for all repairs to complete
defer service.limiter.Wait()

View File

@ -11,49 +11,23 @@ import (
"github.com/stretchr/testify/require"
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/testidentity"
"storj.io/storj/internal/testplanet"
"storj.io/storj/pkg/overlay"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
)
func TestNewClient(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
cases := []struct {
address string
}{
{
address: "127.0.0.1:8080",
},
}
for _, v := range cases {
ca, err := testidentity.NewTestCA(ctx)
assert.NoError(t, err)
identity, err := ca.NewIdentity()
assert.NoError(t, err)
oc, err := overlay.NewClient(identity, v.address)
assert.NoError(t, err)
assert.NotNil(t, oc)
}
}
func TestChoose(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
planet, err := testplanet.New(t, 1, 8, 1)
require.NoError(t, err)
defer ctx.Check(planet.Shutdown)
planet.Start(ctx)
// we wait a second for all the nodes to complete bootstrapping off the satellite
time.Sleep(2 * time.Second)
defer ctx.Check(planet.Shutdown)
oc, err := planet.Uplinks[0].DialOverlay(planet.Satellites[0])
if err != nil {
@ -93,11 +67,11 @@ func TestLookup(t *testing.T) {
planet, err := testplanet.New(t, 1, 4, 1)
require.NoError(t, err)
defer ctx.Check(planet.Shutdown)
planet.Start(ctx)
// we wait a second for all the nodes to complete bootstrapping off the satellite
time.Sleep(2 * time.Second)
defer ctx.Check(planet.Shutdown)
oc, err := planet.Uplinks[0].DialOverlay(planet.Satellites[0])
if err != nil {

View File

@ -305,13 +305,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config *Config) (*
0, peer.Log.Named("checker"),
config.Checker.Interval)
// TODO: close segment repairer, currently this leaks connections
segmentRepairer, err := config.Repairer.GetSegmentRepairer(context.TODO(), peer.Identity)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
peer.Repair.Repairer = repairer.NewService(peer.DB.RepairQueue(), segmentRepairer, config.Repairer.Interval, config.Repairer.MaxRepair)
peer.Repair.Repairer = repairer.NewService(peer.DB.RepairQueue(), &config.Repairer, peer.Identity, config.Repairer.Interval, config.Repairer.MaxRepair)
}
{ // setup audit