private/testplanet: expose storagenode and satellite Config
Change-Id: I80fe7ed8ef7356948879afcc6ecb984c5d1a6b9d
This commit is contained in:
parent
b47381138d
commit
e8f18a2cfe
@ -173,7 +173,7 @@ We should break out one process at a time from the Satellite. Here are the thin
|
||||
- repair workers
|
||||
- audit workers
|
||||
|
||||
We will need to add a `SatelliteSystem` in `testplanet` so that we can test continue using the same unit tests for the Satellite as we break it apart. See an example of that [here](https://github.com/storj/storj/pull/2836/files#diff-c2ce7a2c9b2f4920e2de524c0fffd2f1R70).
|
||||
We will need to add a `Satellite` in `testplanet` so that we can test continue using the same unit tests for the Satellite as we break it apart. See an example of that [here](https://github.com/storj/storj/pull/2836/files#diff-c2ce7a2c9b2f4920e2de524c0fffd2f1R70).
|
||||
|
||||
For each new satellite process we need to do the following steps:
|
||||
- create xProcess, where x is a Satellite service, e.g. RepairProcess, AccountingProcess, etc
|
||||
|
@ -26,7 +26,6 @@ import (
|
||||
"storj.io/storj/private/dbutil/pgutil"
|
||||
"storj.io/storj/satellite/overlay"
|
||||
"storj.io/storj/satellite/satellitedb/satellitedbtest"
|
||||
"storj.io/storj/storagenode"
|
||||
"storj.io/storj/versioncontrol"
|
||||
)
|
||||
|
||||
@ -71,8 +70,8 @@ type Planet struct {
|
||||
uplinks []*Uplink
|
||||
|
||||
VersionControl *versioncontrol.Peer
|
||||
Satellites []*SatelliteSystem
|
||||
StorageNodes []*storagenode.Peer
|
||||
Satellites []*Satellite
|
||||
StorageNodes []*StorageNode
|
||||
Uplinks []*Uplink
|
||||
|
||||
ReferralManager *server.Server
|
||||
@ -238,7 +237,7 @@ func (planet *Planet) StopPeer(peer Peer) error {
|
||||
func (planet *Planet) Size() int { return len(planet.uplinks) + len(planet.peers) }
|
||||
|
||||
// FindNode is a helper to retrieve a storage node record by its node ID.
|
||||
func (planet *Planet) FindNode(nodeID storj.NodeID) *storagenode.Peer {
|
||||
func (planet *Planet) FindNode(nodeID storj.NodeID) *StorageNode {
|
||||
for _, node := range planet.StorageNodes {
|
||||
if node.ID() == nodeID {
|
||||
return node
|
||||
|
@ -58,8 +58,10 @@ import (
|
||||
"storj.io/storj/storage/redis/redisserver"
|
||||
)
|
||||
|
||||
// SatelliteSystem contains all the processes needed to run a full Satellite setup
|
||||
type SatelliteSystem struct {
|
||||
// Satellite contains all the processes needed to run a full Satellite setup
|
||||
type Satellite struct {
|
||||
Config satellite.Config
|
||||
|
||||
Core *satellite.Core
|
||||
API *satellite.API
|
||||
Repairer *satellite.Repairer
|
||||
@ -177,21 +179,21 @@ type SatelliteSystem struct {
|
||||
}
|
||||
|
||||
// ID returns the ID of the Satellite system.
|
||||
func (system *SatelliteSystem) ID() storj.NodeID { return system.API.Identity.ID }
|
||||
func (system *Satellite) ID() storj.NodeID { return system.API.Identity.ID }
|
||||
|
||||
// Local returns the peer local node info from the Satellite system API.
|
||||
func (system *SatelliteSystem) Local() overlay.NodeDossier { return system.API.Contact.Service.Local() }
|
||||
func (system *Satellite) Local() overlay.NodeDossier { return system.API.Contact.Service.Local() }
|
||||
|
||||
// Addr returns the public address from the Satellite system API.
|
||||
func (system *SatelliteSystem) Addr() string { return system.API.Server.Addr().String() }
|
||||
func (system *Satellite) Addr() string { return system.API.Server.Addr().String() }
|
||||
|
||||
// URL returns the storj.NodeURL from the Satellite system API.
|
||||
func (system *SatelliteSystem) URL() storj.NodeURL {
|
||||
func (system *Satellite) URL() storj.NodeURL {
|
||||
return storj.NodeURL{ID: system.API.ID(), Address: system.API.Addr()}
|
||||
}
|
||||
|
||||
// Close closes all the subsystems in the Satellite system
|
||||
func (system *SatelliteSystem) Close() error {
|
||||
func (system *Satellite) Close() error {
|
||||
return errs.Combine(
|
||||
system.API.Close(),
|
||||
system.Core.Close(),
|
||||
@ -202,7 +204,7 @@ func (system *SatelliteSystem) Close() error {
|
||||
}
|
||||
|
||||
// Run runs all the subsystems in the Satellite system
|
||||
func (system *SatelliteSystem) Run(ctx context.Context) (err error) {
|
||||
func (system *Satellite) Run(ctx context.Context) (err error) {
|
||||
group, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
group.Go(func() error {
|
||||
@ -224,11 +226,11 @@ func (system *SatelliteSystem) Run(ctx context.Context) (err error) {
|
||||
}
|
||||
|
||||
// PrivateAddr returns the private address from the Satellite system API.
|
||||
func (system *SatelliteSystem) PrivateAddr() string { return system.API.Server.PrivateAddr().String() }
|
||||
func (system *Satellite) PrivateAddr() string { return system.API.Server.PrivateAddr().String() }
|
||||
|
||||
// newSatellites initializes satellites
|
||||
func (planet *Planet) newSatellites(count int) ([]*SatelliteSystem, error) {
|
||||
var xs []*SatelliteSystem
|
||||
func (planet *Planet) newSatellites(count int) ([]*Satellite, error) {
|
||||
var xs []*Satellite
|
||||
defer func() {
|
||||
for _, x := range xs {
|
||||
planet.peers = append(planet.peers, newClosablePeer(x))
|
||||
@ -518,7 +520,7 @@ func (planet *Planet) newSatellites(count int) ([]*SatelliteSystem, error) {
|
||||
|
||||
log.Debug("id=" + peer.ID().String() + " addr=" + api.Addr())
|
||||
|
||||
system := createNewSystem(log, peer, api, repairerPeer, adminPeer, gcPeer)
|
||||
system := createNewSystem(log, config, peer, api, repairerPeer, adminPeer, gcPeer)
|
||||
xs = append(xs, system)
|
||||
}
|
||||
return xs, nil
|
||||
@ -528,8 +530,9 @@ func (planet *Planet) newSatellites(count int) ([]*SatelliteSystem, error) {
|
||||
// before we split out the API. In the short term this will help keep all the tests passing
|
||||
// without much modification needed. However long term, we probably want to rework this
|
||||
// so it represents how the satellite will run when it is made up of many prrocesses.
|
||||
func createNewSystem(log *zap.Logger, peer *satellite.Core, api *satellite.API, repairerPeer *satellite.Repairer, adminPeer *satellite.Admin, gcPeer *satellite.GarbageCollection) *SatelliteSystem {
|
||||
system := &SatelliteSystem{
|
||||
func createNewSystem(log *zap.Logger, config satellite.Config, peer *satellite.Core, api *satellite.API, repairerPeer *satellite.Repairer, adminPeer *satellite.Admin, gcPeer *satellite.GarbageCollection) *Satellite {
|
||||
system := &Satellite{
|
||||
Config: config,
|
||||
Core: peer,
|
||||
API: api,
|
||||
Repairer: repairerPeer,
|
||||
|
@ -37,9 +37,23 @@ import (
|
||||
"storj.io/storj/storagenode/trust"
|
||||
)
|
||||
|
||||
// newStorageNodes initializes storage nodes
|
||||
func (planet *Planet) newStorageNodes(count int, whitelistedSatellites storj.NodeURLs) ([]*storagenode.Peer, error) {
|
||||
var xs []*storagenode.Peer
|
||||
// StorageNode contains all the processes needed to run a full StorageNode setup.
|
||||
type StorageNode struct {
|
||||
Config storagenode.Config
|
||||
*storagenode.Peer
|
||||
}
|
||||
|
||||
// URL returns the storj.NodeURL.
|
||||
func (system *StorageNode) URL() storj.NodeURL {
|
||||
return storj.NodeURL{
|
||||
ID: system.Peer.ID(),
|
||||
Address: system.Peer.Addr(),
|
||||
}
|
||||
}
|
||||
|
||||
// newStorageNodes initializes storage nodes.
|
||||
func (planet *Planet) newStorageNodes(count int, whitelistedSatellites storj.NodeURLs) ([]*StorageNode, error) {
|
||||
var xs []*StorageNode
|
||||
defer func() {
|
||||
for _, x := range xs {
|
||||
planet.peers = append(planet.peers, newClosablePeer(x))
|
||||
@ -208,7 +222,10 @@ func (planet *Planet) newStorageNodes(count int, whitelistedSatellites storj.Nod
|
||||
planet.databases = append(planet.databases, db)
|
||||
|
||||
log.Debug("id=" + peer.ID().String() + " addr=" + peer.Addr())
|
||||
xs = append(xs, peer)
|
||||
xs = append(xs, &StorageNode{
|
||||
Config: config,
|
||||
Peer: peer,
|
||||
})
|
||||
}
|
||||
return xs, nil
|
||||
}
|
||||
|
@ -158,12 +158,12 @@ func (client *Uplink) DialPiecestore(ctx context.Context, destination Peer) (*pi
|
||||
}
|
||||
|
||||
// Upload data to specific satellite
|
||||
func (client *Uplink) Upload(ctx context.Context, satellite *SatelliteSystem, bucket string, path storj.Path, data []byte) error {
|
||||
func (client *Uplink) Upload(ctx context.Context, satellite *Satellite, bucket string, path storj.Path, data []byte) error {
|
||||
return client.UploadWithExpiration(ctx, satellite, bucket, path, data, time.Time{})
|
||||
}
|
||||
|
||||
// UploadWithExpiration data to specific satellite and expiration time
|
||||
func (client *Uplink) UploadWithExpiration(ctx context.Context, satellite *SatelliteSystem, bucketName string, path storj.Path, data []byte, expiration time.Time) error {
|
||||
func (client *Uplink) UploadWithExpiration(ctx context.Context, satellite *Satellite, bucketName string, path storj.Path, data []byte, expiration time.Time) error {
|
||||
config := client.GetConfig(satellite)
|
||||
project, bucket, err := client.GetProjectAndBucket(ctx, satellite, bucketName, config)
|
||||
if err != nil {
|
||||
@ -185,7 +185,7 @@ func (client *Uplink) UploadWithExpiration(ctx context.Context, satellite *Satel
|
||||
}
|
||||
|
||||
// UploadWithClientConfig uploads data to specific satellite with custom client configuration
|
||||
func (client *Uplink) UploadWithClientConfig(ctx context.Context, satellite *SatelliteSystem, clientConfig UplinkConfig, bucketName string, path storj.Path, data []byte) (err error) {
|
||||
func (client *Uplink) UploadWithClientConfig(ctx context.Context, satellite *Satellite, clientConfig UplinkConfig, bucketName string, path storj.Path, data []byte) (err error) {
|
||||
project, bucket, err := client.GetProjectAndBucket(ctx, satellite, bucketName, clientConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -205,7 +205,7 @@ func (client *Uplink) UploadWithClientConfig(ctx context.Context, satellite *Sat
|
||||
}
|
||||
|
||||
// Download data from specific satellite
|
||||
func (client *Uplink) Download(ctx context.Context, satellite *SatelliteSystem, bucketName string, path storj.Path) ([]byte, error) {
|
||||
func (client *Uplink) Download(ctx context.Context, satellite *Satellite, bucketName string, path storj.Path) ([]byte, error) {
|
||||
project, bucket, err := client.GetProjectAndBucket(ctx, satellite, bucketName, client.GetConfig(satellite))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -231,7 +231,7 @@ func (client *Uplink) Download(ctx context.Context, satellite *SatelliteSystem,
|
||||
}
|
||||
|
||||
// DownloadStream returns stream for downloading data
|
||||
func (client *Uplink) DownloadStream(ctx context.Context, satellite *SatelliteSystem, bucketName string, path storj.Path) (_ io.ReadCloser, cleanup func() error, err error) {
|
||||
func (client *Uplink) DownloadStream(ctx context.Context, satellite *Satellite, bucketName string, path storj.Path) (_ io.ReadCloser, cleanup func() error, err error) {
|
||||
project, bucket, err := client.GetProjectAndBucket(ctx, satellite, bucketName, client.GetConfig(satellite))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
@ -250,7 +250,7 @@ func (client *Uplink) DownloadStream(ctx context.Context, satellite *SatelliteSy
|
||||
}
|
||||
|
||||
// DownloadStreamRange returns stream for downloading data
|
||||
func (client *Uplink) DownloadStreamRange(ctx context.Context, satellite *SatelliteSystem, bucketName string, path storj.Path, start, limit int64) (_ io.ReadCloser, cleanup func() error, err error) {
|
||||
func (client *Uplink) DownloadStreamRange(ctx context.Context, satellite *Satellite, bucketName string, path storj.Path, start, limit int64) (_ io.ReadCloser, cleanup func() error, err error) {
|
||||
project, bucket, err := client.GetProjectAndBucket(ctx, satellite, bucketName, client.GetConfig(satellite))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
@ -269,7 +269,7 @@ func (client *Uplink) DownloadStreamRange(ctx context.Context, satellite *Satell
|
||||
}
|
||||
|
||||
// DeleteObject deletes an object at the path in a bucket
|
||||
func (client *Uplink) DeleteObject(ctx context.Context, satellite *SatelliteSystem, bucketName string, path storj.Path) error {
|
||||
func (client *Uplink) DeleteObject(ctx context.Context, satellite *Satellite, bucketName string, path storj.Path) error {
|
||||
project, bucket, err := client.GetProjectAndBucket(ctx, satellite, bucketName, client.GetConfig(satellite))
|
||||
if err != nil {
|
||||
return err
|
||||
@ -284,7 +284,7 @@ func (client *Uplink) DeleteObject(ctx context.Context, satellite *SatelliteSyst
|
||||
}
|
||||
|
||||
// CreateBucket creates a new bucket
|
||||
func (client *Uplink) CreateBucket(ctx context.Context, satellite *SatelliteSystem, bucketName string) error {
|
||||
func (client *Uplink) CreateBucket(ctx context.Context, satellite *Satellite, bucketName string) error {
|
||||
project, err := client.GetProject(ctx, satellite)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -306,7 +306,7 @@ func (client *Uplink) CreateBucket(ctx context.Context, satellite *SatelliteSyst
|
||||
}
|
||||
|
||||
// DeleteBucket deletes a bucket.
|
||||
func (client *Uplink) DeleteBucket(ctx context.Context, satellite *SatelliteSystem, bucketName string) error {
|
||||
func (client *Uplink) DeleteBucket(ctx context.Context, satellite *Satellite, bucketName string) error {
|
||||
project, err := client.GetProject(ctx, satellite)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -321,7 +321,7 @@ func (client *Uplink) DeleteBucket(ctx context.Context, satellite *SatelliteSyst
|
||||
}
|
||||
|
||||
// GetConfig returns a default config for a given satellite.
|
||||
func (client *Uplink) GetConfig(satellite *SatelliteSystem) UplinkConfig {
|
||||
func (client *Uplink) GetConfig(satellite *Satellite) UplinkConfig {
|
||||
config := getDefaultConfig()
|
||||
|
||||
// client.APIKey[satellite.ID()] is a *macaroon.APIKey, but we want a
|
||||
@ -394,7 +394,7 @@ func (client *Uplink) NewLibuplink(ctx context.Context) (*libuplink.Uplink, erro
|
||||
}
|
||||
|
||||
// GetProject returns a libuplink.Project which allows interactions with a specific project
|
||||
func (client *Uplink) GetProject(ctx context.Context, satellite *SatelliteSystem) (*libuplink.Project, error) {
|
||||
func (client *Uplink) GetProject(ctx context.Context, satellite *Satellite) (*libuplink.Project, error) {
|
||||
testLibuplink, err := client.NewLibuplink(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -414,7 +414,7 @@ func (client *Uplink) GetProject(ctx context.Context, satellite *SatelliteSystem
|
||||
}
|
||||
|
||||
// GetProjectAndBucket returns a libuplink.Project and Bucket which allows interactions with a specific project and its buckets
|
||||
func (client *Uplink) GetProjectAndBucket(ctx context.Context, satellite *SatelliteSystem, bucketName string, clientCfg UplinkConfig) (_ *libuplink.Project, _ *libuplink.Bucket, err error) {
|
||||
func (client *Uplink) GetProjectAndBucket(ctx context.Context, satellite *Satellite, bucketName string, clientCfg UplinkConfig) (_ *libuplink.Project, _ *libuplink.Bucket, err error) {
|
||||
project, err := client.GetProject(ctx, satellite)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
|
@ -140,7 +140,6 @@ func TestDownloadWithSomeNodesOffline(t *testing.T) {
|
||||
err = satellite.Overlay.Service.UpdateCheckIn(ctx, info, time.Now().Add(-4*time.Hour))
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
}
|
||||
// confirm that we marked the correct number of storage nodes as offline
|
||||
nodes, err := satellite.Overlay.Service.Reliable(ctx)
|
||||
|
@ -23,7 +23,6 @@ import (
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/common/testrand"
|
||||
"storj.io/storj/private/testplanet"
|
||||
"storj.io/storj/storagenode"
|
||||
"storj.io/uplink/private/ecclient"
|
||||
"storj.io/uplink/private/eestream"
|
||||
)
|
||||
@ -145,7 +144,7 @@ func testDelete(ctx context.Context, t *testing.T, planet *testplanet.Planet, ec
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func newAddressedOrderLimit(ctx context.Context, action pb.PieceAction, satellite *testplanet.SatelliteSystem, piecePublicKey storj.PiecePublicKey, storageNode *storagenode.Peer, pieceID storj.PieceID) (*pb.AddressedOrderLimit, error) {
|
||||
func newAddressedOrderLimit(ctx context.Context, action pb.PieceAction, satellite *testplanet.Satellite, piecePublicKey storj.PiecePublicKey, storageNode *testplanet.StorageNode, pieceID storj.PieceID) (*pb.AddressedOrderLimit, error) {
|
||||
// TODO refactor to avoid OrderLimit duplication
|
||||
serialNumber := testrand.SerialNumber()
|
||||
|
||||
|
@ -22,7 +22,6 @@ import (
|
||||
"storj.io/storj/private/teststorj"
|
||||
"storj.io/storj/satellite/accounting"
|
||||
"storj.io/storj/satellite/accounting/tally"
|
||||
"storj.io/storj/storagenode"
|
||||
)
|
||||
|
||||
func TestDeleteTalliesBefore(t *testing.T) {
|
||||
@ -321,7 +320,7 @@ func addBucketTally(existingTally *accounting.BucketTally, inline, last bool) *a
|
||||
}
|
||||
|
||||
// makePointer creates a pointer
|
||||
func makePointer(storageNodes []*storagenode.Peer, rs storj.RedundancyScheme, segmentSize int64, inline bool) *pb.Pointer {
|
||||
func makePointer(storageNodes []*testplanet.StorageNode, rs storj.RedundancyScheme, segmentSize int64, inline bool) *pb.Pointer {
|
||||
if inline {
|
||||
inlinePointer := &pb.Pointer{
|
||||
CreationDate: time.Now(),
|
||||
|
@ -238,7 +238,7 @@ func TestDisqualifiedNodeRemainsDisqualified(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func isDisqualified(t *testing.T, ctx *testcontext.Context, satellite *testplanet.SatelliteSystem, nodeID storj.NodeID) bool {
|
||||
func isDisqualified(t *testing.T, ctx *testcontext.Context, satellite *testplanet.Satellite, nodeID storj.NodeID) bool {
|
||||
node, err := satellite.Overlay.Service.Get(ctx, nodeID)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -179,7 +179,7 @@ func TestReverifyFailMissingShare(t *testing.T) {
|
||||
// delete the piece from the first node
|
||||
piece := pointer.GetRemote().GetRemotePieces()[0]
|
||||
pieceID := pointer.GetRemote().RootPieceId.Derive(piece.NodeId, piece.PieceNum)
|
||||
node := getStorageNode(planet, piece.NodeId)
|
||||
node := planet.FindNode(piece.NodeId)
|
||||
err = node.Storage2.Store.Delete(ctx, satellite.ID(), pieceID)
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -271,7 +271,7 @@ func TestReverifyFailMissingShareNotVerified(t *testing.T) {
|
||||
// delete the piece from the first node
|
||||
piece := pieces[0]
|
||||
pieceID := pointer.GetRemote().RootPieceId.Derive(piece.NodeId, piece.PieceNum)
|
||||
node := getStorageNode(planet, piece.NodeId)
|
||||
node := planet.FindNode(piece.NodeId)
|
||||
err = node.Storage2.Store.Delete(ctx, satellite.ID(), pieceID)
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -875,7 +875,7 @@ func TestReverifyDifferentShare(t *testing.T) {
|
||||
|
||||
// delete the piece for pointer1 from the selected node
|
||||
pieceID := pointer1.GetRemote().RootPieceId.Derive(selectedNode, selectedPieceNum)
|
||||
node := getStorageNode(planet, selectedNode)
|
||||
node := planet.FindNode(selectedNode)
|
||||
err = node.Storage2.Store.Delete(ctx, satellite.ID(), pieceID)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -9,22 +9,12 @@ import (
|
||||
|
||||
"storj.io/common/storj"
|
||||
"storj.io/storj/private/testplanet"
|
||||
"storj.io/storj/storagenode"
|
||||
)
|
||||
|
||||
func getStorageNode(planet *testplanet.Planet, nodeID storj.NodeID) *storagenode.Peer {
|
||||
for _, node := range planet.StorageNodes {
|
||||
if node.ID() == nodeID {
|
||||
return node
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func stopStorageNode(ctx context.Context, planet *testplanet.Planet, nodeID storj.NodeID) error {
|
||||
node := getStorageNode(planet, nodeID)
|
||||
node := planet.FindNode(nodeID)
|
||||
if node == nil {
|
||||
return fmt.Errorf("no such node: %s", nodeID.String())
|
||||
return fmt.Errorf("no such node: %s", nodeID)
|
||||
}
|
||||
|
||||
err := planet.StopPeer(node)
|
||||
|
@ -501,7 +501,7 @@ func TestVerifierMissingPiece(t *testing.T) {
|
||||
origNumPieces := len(pointer.GetRemote().GetRemotePieces())
|
||||
piece := pointer.GetRemote().GetRemotePieces()[0]
|
||||
pieceID := pointer.GetRemote().RootPieceId.Derive(piece.NodeId, piece.PieceNum)
|
||||
node := getStorageNode(planet, piece.NodeId)
|
||||
node := planet.FindNode(piece.NodeId)
|
||||
err = node.Storage2.Store.Delete(ctx, satellite.ID(), pieceID)
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -552,7 +552,7 @@ func TestVerifierMissingPieceHashesNotVerified(t *testing.T) {
|
||||
origNumPieces := len(pointer.GetRemote().GetRemotePieces())
|
||||
piece := pointer.GetRemote().GetRemotePieces()[0]
|
||||
pieceID := pointer.GetRemote().RootPieceId.Derive(piece.NodeId, piece.PieceNum)
|
||||
node := getStorageNode(planet, piece.NodeId)
|
||||
node := planet.FindNode(piece.NodeId)
|
||||
err = node.Storage2.Store.Delete(ctx, satellite.ID(), pieceID)
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -748,7 +748,7 @@ func TestVerifierModifiedSegmentFailsOnce(t *testing.T) {
|
||||
origNumPieces := len(pointer.GetRemote().GetRemotePieces())
|
||||
piece := pointer.GetRemote().GetRemotePieces()[0]
|
||||
pieceID := pointer.GetRemote().RootPieceId.Derive(piece.NodeId, piece.PieceNum)
|
||||
node := getStorageNode(planet, piece.NodeId)
|
||||
node := planet.FindNode(piece.NodeId)
|
||||
err = node.Storage2.Store.Delete(ctx, satellite.ID(), pieceID)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -121,7 +121,7 @@ func TestGarbageCollection(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func getPointer(ctx *testcontext.Context, t *testing.T, satellite *testplanet.SatelliteSystem, upl *testplanet.Uplink, bucket, path string) (lastSegPath string, pointer *pb.Pointer) {
|
||||
func getPointer(ctx *testcontext.Context, t *testing.T, satellite *testplanet.Satellite, upl *testplanet.Uplink, bucket, path string) (lastSegPath string, pointer *pb.Pointer) {
|
||||
projects, err := satellite.DB.Console().Projects().GetAll(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, projects, 1)
|
||||
|
@ -46,7 +46,7 @@ type exitProcessClient interface {
|
||||
}
|
||||
|
||||
func TestSuccess(t *testing.T) {
|
||||
testTransfers(t, numObjects, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
|
||||
testTransfers(t, numObjects, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.Satellite, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
|
||||
var pieceID storj.PieceID
|
||||
failedCount := 0
|
||||
deletedCount := 0
|
||||
@ -313,7 +313,7 @@ func TestRecvTimeout(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestInvalidStorageNodeSignature(t *testing.T) {
|
||||
testTransfers(t, 1, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
|
||||
testTransfers(t, 1, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.Satellite, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
|
||||
response, err := processClient.Recv()
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -424,7 +424,7 @@ func TestExitDisqualifiedNodeFailOnStart(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestExitDisqualifiedNodeFailEventually(t *testing.T) {
|
||||
testTransfers(t, numObjects, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
|
||||
testTransfers(t, numObjects, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.Satellite, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
|
||||
var disqualifiedError error
|
||||
isDisqualified := false
|
||||
for {
|
||||
@ -512,7 +512,7 @@ func TestExitDisqualifiedNodeFailEventually(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestFailureHashMismatch(t *testing.T) {
|
||||
testTransfers(t, 1, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
|
||||
testTransfers(t, 1, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.Satellite, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
|
||||
response, err := processClient.Recv()
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -590,7 +590,7 @@ func TestFailureHashMismatch(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestFailureUnknownError(t *testing.T) {
|
||||
testTransfers(t, 1, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
|
||||
testTransfers(t, 1, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.Satellite, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
|
||||
response, err := processClient.Recv()
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -631,7 +631,7 @@ func TestFailureUnknownError(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestFailureUplinkSignature(t *testing.T) {
|
||||
testTransfers(t, 1, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
|
||||
testTransfers(t, 1, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.Satellite, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
|
||||
response, err := processClient.Recv()
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -711,7 +711,7 @@ func TestFailureUplinkSignature(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSuccessPointerUpdate(t *testing.T) {
|
||||
testTransfers(t, 1, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
|
||||
testTransfers(t, 1, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.Satellite, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
|
||||
var recNodeID storj.NodeID
|
||||
|
||||
response, err := processClient.Recv()
|
||||
@ -808,7 +808,7 @@ func TestSuccessPointerUpdate(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestUpdatePointerFailure_DuplicatedNodeID(t *testing.T) {
|
||||
testTransfers(t, 1, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
|
||||
testTransfers(t, 1, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.Satellite, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
|
||||
response, err := processClient.Recv()
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -1060,7 +1060,7 @@ func TestPointerChangedOrDeleted(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestFailureNotFoundPieceHashVerified(t *testing.T) {
|
||||
testTransfers(t, 1, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
|
||||
testTransfers(t, 1, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.Satellite, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
|
||||
response, err := processClient.Recv()
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -1128,7 +1128,7 @@ func TestFailureNotFoundPieceHashVerified(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestFailureNotFoundPieceHashUnverified(t *testing.T) {
|
||||
testTransfers(t, 1, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
|
||||
testTransfers(t, 1, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.Satellite, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
|
||||
// retrieve remote segment
|
||||
keys, err := satellite.Metainfo.Database.List(ctx, nil, -1)
|
||||
require.NoError(t, err)
|
||||
@ -1414,7 +1414,7 @@ func TestIneligibleNodeAge(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func testTransfers(t *testing.T, objects int, verifier func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int)) {
|
||||
func testTransfers(t *testing.T, objects int, verifier func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.Satellite, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int)) {
|
||||
const successThreshold = 4
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1,
|
||||
@ -1528,11 +1528,6 @@ func findNodeToExit(ctx context.Context, planet *testplanet.Planet, objects int)
|
||||
}
|
||||
}
|
||||
|
||||
for _, sn := range planet.StorageNodes {
|
||||
if sn.ID() == exitingNodeID {
|
||||
return sn, nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
node := planet.FindNode(exitingNodeID)
|
||||
return node.Peer, nil
|
||||
}
|
||||
|
@ -446,7 +446,7 @@ func TestEndpoint_DeleteObjectPieces_ObjectWithoutLastSegment(t *testing.T) {
|
||||
}
|
||||
|
||||
func getProjectIDAndEncPathFirstObject(
|
||||
ctx context.Context, t *testing.T, satellite *testplanet.SatelliteSystem,
|
||||
ctx context.Context, t *testing.T, satellite *testplanet.Satellite,
|
||||
) (projectID *uuid.UUID, encryptedPath []byte) {
|
||||
t.Helper()
|
||||
|
||||
@ -463,7 +463,7 @@ func getProjectIDAndEncPathFirstObject(
|
||||
|
||||
func uploadFirstObjectWithoutLastSegmentPointer(
|
||||
ctx context.Context, t *testing.T, uplnk *testplanet.Uplink,
|
||||
satelliteSys *testplanet.SatelliteSystem, segmentSize memory.Size,
|
||||
satelliteSys *testplanet.Satellite, segmentSize memory.Size,
|
||||
bucketName string, objectName string, objectData []byte,
|
||||
) (projectID *uuid.UUID, encryptedPath []byte) {
|
||||
t.Helper()
|
||||
@ -475,7 +475,7 @@ func uploadFirstObjectWithoutLastSegmentPointer(
|
||||
|
||||
func uploadFirstObjectWithoutSomeSegmentsPointers(
|
||||
ctx context.Context, t *testing.T, uplnk *testplanet.Uplink,
|
||||
satelliteSys *testplanet.SatelliteSystem, segmentSize memory.Size,
|
||||
satelliteSys *testplanet.Satellite, segmentSize memory.Size,
|
||||
bucketName string, objectName string, objectData []byte, noSegmentsIndexes []int64,
|
||||
) (projectID *uuid.UUID, encryptedPath []byte) {
|
||||
t.Helper()
|
||||
|
@ -22,7 +22,6 @@ import (
|
||||
"storj.io/storj/satellite"
|
||||
"storj.io/storj/satellite/overlay"
|
||||
"storj.io/storj/storage"
|
||||
"storj.io/storj/storagenode"
|
||||
)
|
||||
|
||||
// TestDataRepair does the following:
|
||||
@ -210,7 +209,6 @@ func TestCorruptDataRepair_Failed(t *testing.T) {
|
||||
nodesToKill := make(map[storj.NodeID]bool)
|
||||
originalNodes := make(map[storj.NodeID]bool)
|
||||
|
||||
var corruptedNode *storagenode.Peer
|
||||
var corruptedNodeID storj.NodeID
|
||||
var corruptedPieceID storj.PieceID
|
||||
|
||||
@ -230,13 +228,9 @@ func TestCorruptDataRepair_Failed(t *testing.T) {
|
||||
require.NotNil(t, corruptedNodeID)
|
||||
require.NotNil(t, corruptedPieceID)
|
||||
|
||||
for _, node := range planet.StorageNodes {
|
||||
if node.ID() == corruptedNodeID {
|
||||
corruptedNode = node
|
||||
}
|
||||
if nodesToKill[node.ID()] {
|
||||
stopNodeByID(t, ctx, planet, node.ID())
|
||||
}
|
||||
corruptedNode := planet.FindNode(corruptedNodeID)
|
||||
for nodeID := range nodesToKill {
|
||||
stopNodeByID(t, ctx, planet, nodeID)
|
||||
}
|
||||
require.NotNil(t, corruptedNode)
|
||||
|
||||
@ -325,7 +319,6 @@ func TestCorruptDataRepair_Succeed(t *testing.T) {
|
||||
nodesToKill := make(map[storj.NodeID]bool)
|
||||
originalNodes := make(map[storj.NodeID]bool)
|
||||
|
||||
var corruptedNode *storagenode.Peer
|
||||
var corruptedNodeID storj.NodeID
|
||||
var corruptedPieceID storj.PieceID
|
||||
var corruptedPiece *pb.RemotePiece
|
||||
@ -348,13 +341,9 @@ func TestCorruptDataRepair_Succeed(t *testing.T) {
|
||||
require.NotNil(t, corruptedPieceID)
|
||||
require.NotNil(t, corruptedPiece)
|
||||
|
||||
for _, node := range planet.StorageNodes {
|
||||
if node.ID() == corruptedNodeID {
|
||||
corruptedNode = node
|
||||
}
|
||||
if nodesToKill[node.ID()] {
|
||||
stopNodeByID(t, ctx, planet, node.ID())
|
||||
}
|
||||
corruptedNode := planet.FindNode(corruptedNodeID)
|
||||
for nodeID := range nodesToKill {
|
||||
stopNodeByID(t, ctx, planet, nodeID)
|
||||
}
|
||||
require.NotNil(t, corruptedNode)
|
||||
|
||||
@ -561,7 +550,7 @@ func TestIrreparableSegmentAccordingToOverlay(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func updateNodeCheckIn(ctx context.Context, overlayDB overlay.DB, node *storagenode.Peer, isUp bool, timestamp time.Time) error {
|
||||
func updateNodeCheckIn(ctx context.Context, overlayDB overlay.DB, node *testplanet.StorageNode, isUp bool, timestamp time.Time) error {
|
||||
local := node.Local()
|
||||
checkInInfo := overlay.NodeCheckInInfo{
|
||||
NodeID: node.ID(),
|
||||
@ -1102,7 +1091,7 @@ func TestDataRepairUploadLimit(t *testing.T) {
|
||||
// getRemoteSegment returns a remote pointer its path from satellite.
|
||||
// nolint:golint
|
||||
func getRemoteSegment(
|
||||
t *testing.T, ctx context.Context, satellite *testplanet.SatelliteSystem,
|
||||
t *testing.T, ctx context.Context, satellite *testplanet.Satellite,
|
||||
) (_ *pb.Pointer, path string) {
|
||||
t.Helper()
|
||||
|
||||
@ -1128,35 +1117,31 @@ func getRemoteSegment(
|
||||
func stopNodeByID(t *testing.T, ctx context.Context, planet *testplanet.Planet, nodeID storj.NodeID) {
|
||||
t.Helper()
|
||||
|
||||
for _, node := range planet.StorageNodes {
|
||||
if node.ID() == nodeID {
|
||||
err := planet.StopPeer(node)
|
||||
require.NoError(t, err)
|
||||
node := planet.FindNode(nodeID)
|
||||
require.NotNil(t, node)
|
||||
err := planet.StopPeer(node)
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, satellite := range planet.Satellites {
|
||||
err = satellite.Overlay.Service.UpdateCheckIn(ctx, overlay.NodeCheckInInfo{
|
||||
NodeID: node.ID(),
|
||||
Address: &pb.NodeAddress{
|
||||
Address: node.Addr(),
|
||||
},
|
||||
IsUp: true,
|
||||
Version: &pb.NodeVersion{
|
||||
Version: "v0.0.0",
|
||||
CommitHash: "",
|
||||
Timestamp: time.Time{},
|
||||
Release: false,
|
||||
},
|
||||
}, time.Now().Add(-4*time.Hour))
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
for _, satellite := range planet.Satellites {
|
||||
err = satellite.Overlay.Service.UpdateCheckIn(ctx, overlay.NodeCheckInInfo{
|
||||
NodeID: node.ID(),
|
||||
Address: &pb.NodeAddress{
|
||||
Address: node.Addr(),
|
||||
},
|
||||
IsUp: true,
|
||||
Version: &pb.NodeVersion{
|
||||
Version: "v0.0.0",
|
||||
CommitHash: "",
|
||||
Timestamp: time.Time{},
|
||||
Release: false,
|
||||
},
|
||||
}, time.Now().Add(-4*time.Hour))
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
// corruptPieceData manipulates piece data on a storage node.
|
||||
func corruptPieceData(ctx context.Context, t *testing.T, planet *testplanet.Planet, corruptedNode *storagenode.Peer, corruptedPieceID storj.PieceID) {
|
||||
func corruptPieceData(ctx context.Context, t *testing.T, planet *testplanet.Planet, corruptedNode *testplanet.StorageNode, corruptedPieceID storj.PieceID) {
|
||||
t.Helper()
|
||||
|
||||
blobRef := storage.BlobRef{
|
||||
|
@ -18,7 +18,6 @@ import (
|
||||
"storj.io/storj/private/testplanet"
|
||||
"storj.io/storj/satellite/overlay"
|
||||
"storj.io/storj/storage"
|
||||
"storj.io/storj/storagenode"
|
||||
)
|
||||
|
||||
func TestChore(t *testing.T) {
|
||||
@ -58,19 +57,14 @@ func TestChore(t *testing.T) {
|
||||
require.NotNil(t, newExitingNodeID)
|
||||
require.NotEqual(t, exitingNode.ID(), newExitingNodeID)
|
||||
|
||||
var newExitingNode *storagenode.Peer
|
||||
for _, node := range planet.StorageNodes {
|
||||
if node.ID() == newExitingNodeID {
|
||||
newExitingNode = node
|
||||
}
|
||||
}
|
||||
newExitingNode := planet.FindNode(newExitingNodeID)
|
||||
require.NotNil(t, newExitingNode)
|
||||
|
||||
exitSatellite(ctx, t, planet, newExitingNode)
|
||||
})
|
||||
}
|
||||
|
||||
func exitSatellite(ctx context.Context, t *testing.T, planet *testplanet.Planet, exitingNode *storagenode.Peer) {
|
||||
func exitSatellite(ctx context.Context, t *testing.T, planet *testplanet.Planet, exitingNode *testplanet.StorageNode) {
|
||||
satellite1 := planet.Satellites[0]
|
||||
exitingNode.GracefulExit.Chore.Loop.Pause()
|
||||
|
||||
@ -162,7 +156,7 @@ func getNodePieceCounts(ctx context.Context, planet *testplanet.Planet) (_ map[s
|
||||
}
|
||||
|
||||
// findNodeToExit selects the node storing the most pieces as the node to graceful exit.
|
||||
func findNodeToExit(ctx context.Context, planet *testplanet.Planet, objects int) (*storagenode.Peer, error) {
|
||||
func findNodeToExit(ctx context.Context, planet *testplanet.Planet, objects int) (*testplanet.StorageNode, error) {
|
||||
satellite := planet.Satellites[0]
|
||||
keys, err := satellite.Metainfo.Database.List(ctx, nil, objects)
|
||||
if err != nil {
|
||||
@ -199,11 +193,5 @@ func findNodeToExit(ctx context.Context, planet *testplanet.Planet, objects int)
|
||||
}
|
||||
}
|
||||
|
||||
for _, sn := range planet.StorageNodes {
|
||||
if sn.ID() == exitingNodeID {
|
||||
return sn, nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
return planet.FindNode(exitingNodeID), nil
|
||||
}
|
||||
|
@ -576,8 +576,8 @@ func GenerateOrderLimit(t *testing.T, satellite storj.NodeID, storageNode storj.
|
||||
|
||||
// uploadPiece uploads piece to storageNode.
|
||||
func uploadPiece(
|
||||
t *testing.T, ctx *testcontext.Context, piece storj.PieceID, storageNode *storagenode.Peer,
|
||||
uplink *testplanet.Uplink, satellite *testplanet.SatelliteSystem,
|
||||
t *testing.T, ctx *testcontext.Context, piece storj.PieceID, storageNode *testplanet.StorageNode,
|
||||
uplink *testplanet.Uplink, satellite *testplanet.Satellite,
|
||||
) (uploadedData []byte, _ *pb.OrderLimit, _ *pb.PieceHash) {
|
||||
t.Helper()
|
||||
|
||||
@ -618,7 +618,7 @@ func uploadPiece(
|
||||
// downloadPiece downlodads piece from storageNode.
|
||||
func downloadPiece(
|
||||
t *testing.T, ctx *testcontext.Context, piece storj.PieceID, limit int64,
|
||||
storageNode *storagenode.Peer, uplink *testplanet.Uplink, satellite *testplanet.SatelliteSystem,
|
||||
storageNode *testplanet.StorageNode, uplink *testplanet.Uplink, satellite *testplanet.Satellite,
|
||||
) (pieceData []byte, err error) {
|
||||
t.Helper()
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user