removes bandwidth limiting
On satellite, remove all references to free_bandwidth column in nodes table. On storage node, remove references to AllocatedBandwidth and MinimumBandwidth and mark as deprecated. Protobuf message, NodeCapacity, is left intact for backwards compatibility. Once this is released to all satellites, we can drop the column from the DB. Change-Id: I2ff6c6537fc9008a0c5588e951afea58ede85838
This commit is contained in:
parent
5f2ca0338b
commit
1c1750e6be
@ -16,5 +16,4 @@ ENTRYPOINT ["/entrypoint"]
|
||||
ENV ADDRESS="" \
|
||||
EMAIL="" \
|
||||
WALLET="" \
|
||||
BANDWIDTH="2.0TB" \
|
||||
STORAGE="2.0TB"
|
||||
|
@ -112,13 +112,7 @@ func printDashboard(data *pb.DashboardResponse) error {
|
||||
|
||||
stats := data.GetStats()
|
||||
if stats != nil {
|
||||
availBW := memory.Size(stats.GetAvailableBandwidth())
|
||||
usedBandwidth := color.WhiteString(memory.Size(stats.GetUsedBandwidth()).Base10String())
|
||||
if availBW < 0 {
|
||||
warnFlag = true
|
||||
availBW = 0
|
||||
}
|
||||
availableBandwidth := color.WhiteString((availBW).Base10String())
|
||||
availableSpace := color.WhiteString(memory.Size(stats.GetAvailableSpace()).Base10String())
|
||||
usedSpace := color.WhiteString(memory.Size(stats.GetUsedSpace()).Base10String())
|
||||
usedEgress := color.WhiteString(memory.Size(stats.GetUsedEgress()).Base10String())
|
||||
@ -126,7 +120,7 @@ func printDashboard(data *pb.DashboardResponse) error {
|
||||
|
||||
w = tabwriter.NewWriter(color.Output, 0, 0, 5, ' ', tabwriter.AlignRight)
|
||||
fmt.Fprintf(w, "\n\t%s\t%s\t%s\t%s\t\n", color.GreenString("Available"), color.GreenString("Used"), color.GreenString("Egress"), color.GreenString("Ingress"))
|
||||
fmt.Fprintf(w, "Bandwidth\t%s\t%s\t%s\t%s\t (since %s 1)\n", availableBandwidth, usedBandwidth, usedEgress, usedIngress, time.Now().Format("Jan"))
|
||||
fmt.Fprintf(w, "Bandwidth\t%s\t%s\t%s\t (since %s 1)\n", usedBandwidth, usedEgress, usedIngress, time.Now().Format("Jan"))
|
||||
fmt.Fprintf(w, "Disk\t%s\t%s\t\n", availableSpace, usedSpace)
|
||||
if err = w.Flush(); err != nil {
|
||||
return err
|
||||
|
@ -14,7 +14,6 @@ RUN_PARAMS="${RUN_PARAMS:-} --contact.external-address=${ADDRESS}"
|
||||
RUN_PARAMS="${RUN_PARAMS:-} --operator.email=${EMAIL}"
|
||||
RUN_PARAMS="${RUN_PARAMS:-} --operator.wallet=${WALLET}"
|
||||
RUN_PARAMS="${RUN_PARAMS:-} --console.address=:14002"
|
||||
RUN_PARAMS="${RUN_PARAMS:-} --storage.allocated-bandwidth=${BANDWIDTH}"
|
||||
RUN_PARAMS="${RUN_PARAMS:-} --storage.allocated-disk-space=${STORAGE}"
|
||||
|
||||
exec ./storagenode run $RUN_PARAMS "$@"
|
||||
|
@ -568,7 +568,6 @@ func newNetwork(flags *Flags) (*Processes, error) {
|
||||
"--operator.wallet", "0x0123456789012345678901234567890123456789",
|
||||
|
||||
"--storage2.monitor.minimum-disk-space", "0",
|
||||
"--storage2.monitor.minimum-bandwidth", "0",
|
||||
|
||||
"--server.extensions.revocation=false",
|
||||
"--server.use-peer-ca-whitelist=false",
|
||||
|
@ -1,28 +0,0 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<Wix xmlns="http://schemas.microsoft.com/wix/2006/wi">
|
||||
<Fragment>
|
||||
<UI>
|
||||
<Dialog Id="AllocatedBandwidthConfigDlg" Width="370" Height="270" Title="[ProductName] Setup">
|
||||
<Control Id="Next" Type="PushButton" X="236" Y="243" Width="56" Height="17" Default="yes" Text="!(loc.WixUINext)" />
|
||||
<Control Id="Back" Type="PushButton" X="180" Y="243" Width="56" Height="17" Text="!(loc.WixUIBack)" />
|
||||
<Control Id="Cancel" Type="PushButton" X="304" Y="243" Width="56" Height="17" Cancel="yes" Text="!(loc.WixUICancel)">
|
||||
<Publish Event="SpawnDialog" Value="CancelDlg">1</Publish>
|
||||
</Control>
|
||||
|
||||
<Control Id="Description" Type="Text" X="25" Y="23" Width="280" Height="15" Transparent="yes" NoPrefix="yes" Text="Enter how much bandwidth is allocated for transferring data." />
|
||||
<Control Id="Title" Type="Text" X="15" Y="6" Width="200" Height="15" Transparent="yes" NoPrefix="yes" Text="{\WixUI_Font_Title}Bandwidth Configuration" />
|
||||
<Control Id="BannerBitmap" Type="Bitmap" X="0" Y="0" Width="370" Height="44" TabSkip="no" Text="!(loc.InstallDirDlgBannerBitmap)" />
|
||||
<Control Id="BannerLine" Type="Line" X="0" Y="44" Width="370" Height="0" />
|
||||
<Control Id="BottomLine" Type="Line" X="0" Y="234" Width="370" Height="0" />
|
||||
|
||||
<Control Id="AllocatedBandwidthLabel" Type="Text" X="20" Y="60" Width="320" Height="16" NoPrefix="yes" Text="Allocated bandwidth:" />
|
||||
<Control Id="AllocatedBandwidth" Type="Edit" RightAligned="yes" Property="STORJ_BANDWIDTH" X="20" Y="100" Width="60" Height="18" />
|
||||
<Control Id="AllocatedBandwidthTB" Type="Text" X="84" Y="103" Width="20" Height="16" NoPrefix="yes" Text="TB" />
|
||||
<Control Id="AllocatedBandwidthDesc" Type="Text" X="20" Y="150" Width="320" Height="48" NoPrefix="yes" Text="How much bandwidth you can allocate to the Storj network. Be sure to allow for other use cases you have for your internet connection, and do not allocate more than the total up and download bandwidth your ISP can physically supply. The minimum bandwidth requirement is 2 TB." />
|
||||
<Control Id="AllocatedBandwidthHowto" Type="Hyperlink" X="20" Y="200" Width="320" Height="16">
|
||||
<Text><![CDATA[<a href="https://support.storj.io/hc/en-us/articles/360026893111">Learn how to calculate the maximum monthly bandwidth you can enter here.</a>]]></Text>
|
||||
</Control>
|
||||
</Dialog>
|
||||
</UI>
|
||||
</Fragment>
|
||||
</Wix>
|
@ -12,7 +12,6 @@ First-time install dialog sequence:
|
||||
- PublicAddressConfigDlg
|
||||
- StorageDirConfigDlg
|
||||
- AllocatedStorageConfigDlg
|
||||
- AllocatedBandwidthConfigDlg
|
||||
- WixUI_VerifyReadyDlg
|
||||
|
||||
Maintenance dialog sequence:
|
||||
@ -31,7 +30,6 @@ Patch dialog sequence:
|
||||
<CustomAction Id="CA.ValidateWallet" BinaryKey="StorjDLL" DllEntry="ValidateWallet" Return="check" />
|
||||
<CustomAction Id="CA.ValidateStorageDir" BinaryKey="StorjDLL" DllEntry="ValidateStorageDir" Return="check" />
|
||||
<CustomAction Id="CA.ValidateStorage" BinaryKey="StorjDLL" DllEntry="ValidateStorage" Return="check" />
|
||||
<CustomAction Id="CA.ValidateBandwidth" BinaryKey="StorjDLL" DllEntry="ValidateBandwidth" Return="check" />
|
||||
|
||||
<UI Id="CustomInstallDir">
|
||||
<TextStyle Id="WixUI_Font_Normal" FaceName="Tahoma" Size="8" />
|
||||
@ -105,15 +103,9 @@ Patch dialog sequence:
|
||||
<Publish Dialog="AllocatedStorageConfigDlg" Control="Next" Event="DoAction" Value="CA.ValidateStorage" Order="1">1</Publish>
|
||||
<Publish Dialog="AllocatedStorageConfigDlg" Control="Next" Property="ErrorMsg" Value="[STORJ_STORAGE_VALID]" Order="2">1</Publish>
|
||||
<Publish Dialog="AllocatedStorageConfigDlg" Control="Next" Event="SpawnDialog" Value="ValidationErrorDlg" Order="3"><![CDATA[STORJ_STORAGE_VALID<>"1"]]></Publish>
|
||||
<Publish Dialog="AllocatedStorageConfigDlg" Control="Next" Event="NewDialog" Value="AllocatedBandwidthConfigDlg" Order="4">STORJ_STORAGE_VALID="1"</Publish>
|
||||
<Publish Dialog="AllocatedStorageConfigDlg" Control="Next" Event="NewDialog" Value="VerifyReadyDlg" Order="4">STORJ_STORAGE_VALID="1"</Publish>
|
||||
|
||||
<Publish Dialog="AllocatedBandwidthConfigDlg" Control="Back" Event="NewDialog" Value="AllocatedStorageConfigDlg" Order="1">1</Publish>
|
||||
<Publish Dialog="AllocatedBandwidthConfigDlg" Control="Next" Event="DoAction" Value="CA.ValidateBandwidth" Order="1">1</Publish>
|
||||
<Publish Dialog="AllocatedBandwidthConfigDlg" Control="Next" Property="ErrorMsg" Value="[STORJ_BANDWIDTH_VALID]" Order="2">1</Publish>
|
||||
<Publish Dialog="AllocatedBandwidthConfigDlg" Control="Next" Event="SpawnDialog" Value="ValidationErrorDlg" Order="3"><![CDATA[STORJ_BANDWIDTH_VALID<>"1"]]></Publish>
|
||||
<Publish Dialog="AllocatedBandwidthConfigDlg" Control="Next" Event="NewDialog" Value="VerifyReadyDlg" Order="4">STORJ_BANDWIDTH_VALID="1"</Publish>
|
||||
|
||||
<Publish Dialog="VerifyReadyDlg" Control="Back" Event="NewDialog" Value="AllocatedBandwidthConfigDlg" Order="1">NOT Installed AND NOT WIX_UPGRADE_DETECTED</Publish>
|
||||
<Publish Dialog="VerifyReadyDlg" Control="Back" Event="NewDialog" Value="AllocatedStorageConfigDlg" Order="1">NOT Installed AND NOT WIX_UPGRADE_DETECTED</Publish>
|
||||
<Publish Dialog="VerifyReadyDlg" Control="Back" Event="NewDialog" Value="MaintenanceTypeDlg" Order="2">Installed AND NOT PATCH AND NOT WIX_UPGRADE_DETECTED</Publish>
|
||||
<Publish Dialog="VerifyReadyDlg" Control="Back" Event="NewDialog" Value="StorjWelcomeDlg" Order="2">Installed AND PATCH AND NOT WIX_UPGRADE_DETECTED</Publish>
|
||||
|
||||
|
@ -93,7 +93,6 @@
|
||||
<Property Id="WIXUI_INSTALLDIR">INSTALLFOLDER</Property>
|
||||
<Property Id="STORJ_STORAGEDIR">STORAGEDIR</Property>
|
||||
<Property Id="STORJ_STORAGE">1.0</Property>
|
||||
<Property Id="STORJ_BANDWIDTH">2.0</Property>
|
||||
<Property Id="STORJ_IDENTITYDIR">IDENTITYDIR</Property>
|
||||
<Property Id="STORJ_DEFAULTIDENTITYDIR">
|
||||
<DirectorySearch Id='search1' Path='[PersonalFolder]' />
|
||||
@ -121,7 +120,7 @@
|
||||
<Binary Id="StorjDLL" SourceFile="$(var.Storj.TargetDir)Storj.CA.dll" />
|
||||
|
||||
<CustomAction Id='StoragenodeSetup' Directory='INSTALLFOLDER' Execute='deferred' Impersonate='no'
|
||||
ExeCommand=""[INSTALLFOLDER]storagenode.exe" setup --config-dir "[INSTALLFOLDER]\" --identity-dir "[IDENTITYDIR]\" --operator.email "[STORJ_EMAIL]" --operator.wallet "[STORJ_WALLET]" --contact.external-address "[STORJ_PUBLIC_ADDRESS]" --storage.path "[STORAGEDIR]\" --storage.allocated-bandwidth "[STORJ_BANDWIDTH] TB" --storage.allocated-disk-space "[STORJ_STORAGE] TB" --log.output "winfile:///[INSTALLFOLDER]\storagenode.log"" />
|
||||
ExeCommand=""[INSTALLFOLDER]storagenode.exe" setup --config-dir "[INSTALLFOLDER]\" --identity-dir "[IDENTITYDIR]\" --operator.email "[STORJ_EMAIL]" --operator.wallet "[STORJ_WALLET]" --contact.external-address "[STORJ_PUBLIC_ADDRESS]" --storage.path "[STORAGEDIR]\" --storage.allocated-disk-space "[STORJ_STORAGE] TB" --log.output "winfile:///[INSTALLFOLDER]\storagenode.log"" />
|
||||
|
||||
<CustomAction Id="DeleteConfigFile" Directory='INSTALLFOLDER'
|
||||
ExeCommand="cmd /C "del config.yaml"" Execute="deferred" Return="ignore" Impersonate="no" />
|
||||
|
@ -94,27 +94,6 @@ namespace Storj
|
||||
return ActionResult.Success;
|
||||
}
|
||||
|
||||
[CustomAction]
|
||||
public static ActionResult ValidateBandwidth(Session session)
|
||||
{
|
||||
string bandwidthStr = session["STORJ_BANDWIDTH"];
|
||||
|
||||
try
|
||||
{
|
||||
new CustomActionRunner().ValidateBandwidth(bandwidthStr);
|
||||
}
|
||||
catch (ArgumentException e)
|
||||
{
|
||||
// Allocated Bandwidth is invalid
|
||||
session["STORJ_BANDWIDTH_VALID"] = e.Message;
|
||||
return ActionResult.Success;
|
||||
}
|
||||
|
||||
// Allocated Bandwidth value is valid
|
||||
session["STORJ_BANDWIDTH_VALID"] = "1";
|
||||
return ActionResult.Success;
|
||||
}
|
||||
|
||||
[CustomAction]
|
||||
public static ActionResult ExtractInstallDir(Session session)
|
||||
{
|
||||
@ -263,24 +242,6 @@ namespace Storj
|
||||
}
|
||||
}
|
||||
|
||||
public void ValidateBandwidth(string bandwidthStr)
|
||||
{
|
||||
if (string.IsNullOrEmpty(bandwidthStr))
|
||||
{
|
||||
throw new ArgumentException("The value cannot be empty.");
|
||||
}
|
||||
|
||||
if (!double.TryParse(bandwidthStr, NumberStyles.Number, CultureInfo.CreateSpecificCulture("en-US"), out double bandwidth))
|
||||
{
|
||||
throw new ArgumentException(string.Format("'{0}' is not a valid number.", bandwidthStr));
|
||||
}
|
||||
|
||||
if (bandwidth < 2.0)
|
||||
{
|
||||
throw new ArgumentException("The allocated bandwidth cannot be less than 2 TB.");
|
||||
}
|
||||
}
|
||||
|
||||
public string ExtractInstallDir(string serviceCmd)
|
||||
{
|
||||
if (string.IsNullOrEmpty(serviceCmd))
|
||||
|
@ -84,7 +84,6 @@
|
||||
<Compile Include="ValidateStorageDirTests.cs" />
|
||||
<Compile Include="MockHelpers.cs" />
|
||||
<Compile Include="ValidateStorageTests.cs" />
|
||||
<Compile Include="ValidateBandwidthTests.cs" />
|
||||
<Compile Include="ValidateWalletTests.cs" />
|
||||
<Compile Include="Properties\AssemblyInfo.cs" />
|
||||
</ItemGroup>
|
||||
|
@ -1,44 +0,0 @@
|
||||
using System;
|
||||
using Microsoft.VisualStudio.TestTools.UnitTesting;
|
||||
using Storj;
|
||||
|
||||
namespace StorjTests
|
||||
{
|
||||
[TestClass]
|
||||
public class ValidateBandwidthTests
|
||||
{
|
||||
[TestMethod]
|
||||
[ExpectedExceptionWithMessage(typeof(ArgumentException), "The value cannot be empty.")]
|
||||
public void NullBandwidth()
|
||||
{
|
||||
new CustomActionRunner().ValidateBandwidth(null);
|
||||
}
|
||||
|
||||
[TestMethod]
|
||||
[ExpectedExceptionWithMessage(typeof(ArgumentException), "The value cannot be empty.")]
|
||||
public void EmptyBandwidth()
|
||||
{
|
||||
new CustomActionRunner().ValidateBandwidth("");
|
||||
}
|
||||
|
||||
[TestMethod]
|
||||
[ExpectedExceptionWithMessage(typeof(ArgumentException), "'some random text' is not a valid number.")]
|
||||
public void InvalidNumber()
|
||||
{
|
||||
new CustomActionRunner().ValidateBandwidth("some random text");
|
||||
}
|
||||
|
||||
[TestMethod]
|
||||
[ExpectedExceptionWithMessage(typeof(ArgumentException), "The allocated bandwidth cannot be less than 2 TB.")]
|
||||
public void TooSmall()
|
||||
{
|
||||
new CustomActionRunner().ValidateBandwidth("1.41");
|
||||
}
|
||||
|
||||
[TestMethod]
|
||||
public void ValidBandwidth()
|
||||
{
|
||||
new CustomActionRunner().ValidateBandwidth("3.14");
|
||||
}
|
||||
}
|
||||
}
|
@ -26,7 +26,6 @@
|
||||
<Compile Include="InstallDirConfig.wxs" />
|
||||
<Compile Include="CustomInstallDir.wxs" />
|
||||
<Compile Include="EmailConfig.wxs" />
|
||||
<Compile Include="AllocatedBandwidthConfig.wxs" />
|
||||
<Compile Include="PublicAddressConfig.wxs" />
|
||||
<Compile Include="StorageDirConfig.wxs" />
|
||||
<Compile Include="WalletConfig.wxs" />
|
||||
|
@ -90,5 +90,3 @@ storj.io/storj/satellite/satellitedb."audit_reputation_beta" FloatVal
|
||||
storj.io/storj/storage/filestore."open_file_in_trash" Meter
|
||||
storj.io/storj/storagenode/contact."satellite_contact_request" Meter
|
||||
storj.io/storj/storagenode/gracefulexit."satellite_gracefulexit_request" Meter
|
||||
storj.io/storj/storagenode/monitor."allocated_bandwidth" IntVal
|
||||
storj.io/storj/storagenode/monitor."used_bandwidth" IntVal
|
||||
|
@ -98,7 +98,6 @@ func (planet *Planet) newStorageNodes(count int, whitelistedSatellites storj.Nod
|
||||
Storage: piecestore.OldConfig{
|
||||
Path: filepath.Join(storageDir, "pieces/"),
|
||||
AllocatedDiskSpace: 1 * memory.GB,
|
||||
AllocatedBandwidth: memory.TB,
|
||||
KBucketRefreshInterval: defaultInterval,
|
||||
},
|
||||
Collector: collector.Config{
|
||||
@ -128,7 +127,6 @@ func (planet *Planet) newStorageNodes(count int, whitelistedSatellites storj.Nod
|
||||
MaxSleep: 0,
|
||||
},
|
||||
Monitor: monitor.Config{
|
||||
MinimumBandwidth: 100 * memory.MB,
|
||||
MinimumDiskSpace: 100 * memory.MB,
|
||||
NotifyLowDiskCooldown: defaultInterval,
|
||||
},
|
||||
|
@ -175,7 +175,6 @@ func TestDisqualifiedNodesGetNoUpload(t *testing.T) {
|
||||
request := overlay.FindStorageNodesRequest{
|
||||
MinimumRequiredNodes: 4,
|
||||
RequestedCount: 0,
|
||||
FreeBandwidth: 0,
|
||||
ExcludedNodes: nil,
|
||||
MinimumVersion: "", // semver or empty
|
||||
}
|
||||
|
@ -396,7 +396,6 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream processS
|
||||
// get replacement node
|
||||
request := &overlay.FindStorageNodesRequest{
|
||||
RequestedCount: 1,
|
||||
FreeBandwidth: pieceSize,
|
||||
ExcludedNodes: excludedNodeIDs,
|
||||
}
|
||||
|
||||
|
@ -201,7 +201,6 @@ func (endpoint *Endpoint) CreateSegmentOld(ctx context.Context, req *pb.SegmentW
|
||||
|
||||
request := overlay.FindStorageNodesRequest{
|
||||
RequestedCount: int(req.Redundancy.Total),
|
||||
FreeBandwidth: maxPieceSize,
|
||||
}
|
||||
nodes, err := endpoint.overlay.FindStorageNodes(ctx, request)
|
||||
if err != nil {
|
||||
@ -1483,7 +1482,6 @@ func (endpoint *Endpoint) BeginSegment(ctx context.Context, req *pb.SegmentBegin
|
||||
|
||||
request := overlay.FindStorageNodesRequest{
|
||||
RequestedCount: redundancy.TotalCount(),
|
||||
FreeBandwidth: maxPieceSize,
|
||||
}
|
||||
nodes, err := endpoint.overlay.FindStorageNodes(ctx, request)
|
||||
if err != nil {
|
||||
|
@ -108,8 +108,7 @@ func BenchmarkOverlay(b *testing.B) {
|
||||
Email: "a@mail.test",
|
||||
},
|
||||
Capacity: &pb.NodeCapacity{
|
||||
FreeBandwidth: 1000,
|
||||
FreeDisk: 1000,
|
||||
FreeDisk: 1000,
|
||||
},
|
||||
Version: &pb.NodeVersion{
|
||||
Version: "1.0.0",
|
||||
@ -141,8 +140,7 @@ func BenchmarkOverlay(b *testing.B) {
|
||||
},
|
||||
IsUp: true,
|
||||
Capacity: &pb.NodeCapacity{
|
||||
FreeBandwidth: int64(i),
|
||||
FreeDisk: int64(i),
|
||||
FreeDisk: int64(i),
|
||||
},
|
||||
Operator: &pb.NodeOperator{
|
||||
Email: "a@mail.test",
|
||||
|
@ -56,8 +56,7 @@ func TestMinimumDiskSpace(t *testing.T) {
|
||||
Address: nodeDossier.Address.GetAddress(),
|
||||
Version: &nodeDossier.Version,
|
||||
Capacity: &pb.NodeCapacity{
|
||||
FreeBandwidth: 100000,
|
||||
FreeDisk: 9 * memory.MB.Int64(),
|
||||
FreeDisk: 9 * memory.MB.Int64(),
|
||||
},
|
||||
Operator: &nodeDossier.Operator,
|
||||
})
|
||||
@ -76,8 +75,7 @@ func TestMinimumDiskSpace(t *testing.T) {
|
||||
Address: nodeDossier.Address.GetAddress(),
|
||||
Version: &nodeDossier.Version,
|
||||
Capacity: &pb.NodeCapacity{
|
||||
FreeBandwidth: 100000,
|
||||
FreeDisk: 11 * memory.MB.Int64(),
|
||||
FreeDisk: 11 * memory.MB.Int64(),
|
||||
},
|
||||
Operator: &nodeDossier.Operator,
|
||||
})
|
||||
@ -251,7 +249,6 @@ func testNodeSelection(t *testing.T, ctx *testcontext.Context, planet *testplane
|
||||
}
|
||||
|
||||
response, err := service.FindStorageNodesWithPreferences(ctx, overlay.FindStorageNodesRequest{
|
||||
FreeBandwidth: 0,
|
||||
RequestedCount: tt.RequestCount,
|
||||
ExcludedNodes: excludedNodes,
|
||||
}, &tt.Preferences)
|
||||
@ -348,7 +345,6 @@ func TestNodeSelectionGracefulExit(t *testing.T) {
|
||||
t.Logf("#%2d. %+v", i, tt)
|
||||
|
||||
response, err := satellite.Overlay.Service.FindStorageNodesWithPreferences(ctx, overlay.FindStorageNodesRequest{
|
||||
FreeBandwidth: 0,
|
||||
RequestedCount: tt.RequestCount,
|
||||
}, &tt.Preferences)
|
||||
|
||||
@ -556,7 +552,6 @@ func testDistinctIPs(t *testing.T, ctx *testcontext.Context, planet *testplanet.
|
||||
|
||||
for _, tt := range tests {
|
||||
response, err := service.FindStorageNodesWithPreferences(ctx, overlay.FindStorageNodesRequest{
|
||||
FreeBandwidth: 0,
|
||||
RequestedCount: tt.requestCount,
|
||||
}, &tt.preferences)
|
||||
if tt.shouldFailWith != nil {
|
||||
|
@ -111,14 +111,12 @@ type NodeCheckInInfo struct {
|
||||
type FindStorageNodesRequest struct {
|
||||
MinimumRequiredNodes int
|
||||
RequestedCount int
|
||||
FreeBandwidth int64
|
||||
ExcludedNodes []storj.NodeID
|
||||
MinimumVersion string // semver or empty
|
||||
}
|
||||
|
||||
// NodeCriteria are the requirements for selecting nodes
|
||||
type NodeCriteria struct {
|
||||
FreeBandwidth int64
|
||||
FreeDisk int64
|
||||
AuditCount int64
|
||||
UptimeCount int64
|
||||
@ -286,7 +284,6 @@ func (service *Service) FindStorageNodesWithPreferences(ctx context.Context, req
|
||||
var newNodes []*pb.Node
|
||||
if newNodeCount > 0 {
|
||||
newNodes, err = service.db.SelectNewStorageNodes(ctx, newNodeCount, &NodeCriteria{
|
||||
FreeBandwidth: req.FreeBandwidth,
|
||||
FreeDisk: preferences.MinimumDiskSpace.Int64(),
|
||||
AuditCount: preferences.AuditCount,
|
||||
ExcludedNodes: excludedNodes,
|
||||
@ -309,7 +306,6 @@ func (service *Service) FindStorageNodesWithPreferences(ctx context.Context, req
|
||||
}
|
||||
|
||||
criteria := NodeCriteria{
|
||||
FreeBandwidth: req.FreeBandwidth,
|
||||
FreeDisk: preferences.MinimumDiskSpace.Int64(),
|
||||
AuditCount: preferences.AuditCount,
|
||||
UptimeCount: preferences.UptimeCount,
|
||||
|
@ -271,7 +271,6 @@ func TestNodeInfo(t *testing.T) {
|
||||
assert.NotEmpty(t, node.Operator.Email)
|
||||
assert.NotEmpty(t, node.Operator.Wallet)
|
||||
assert.Equal(t, planet.StorageNodes[0].Local().Operator, node.Operator)
|
||||
assert.NotEmpty(t, node.Capacity.FreeBandwidth)
|
||||
assert.NotEmpty(t, node.Capacity.FreeDisk)
|
||||
assert.Equal(t, planet.StorageNodes[0].Local().Capacity, node.Capacity)
|
||||
assert.NotEmpty(t, node.Version.Version)
|
||||
@ -352,8 +351,7 @@ func TestUpdateCheckIn(t *testing.T) {
|
||||
},
|
||||
IsUp: true,
|
||||
Capacity: &pb.NodeCapacity{
|
||||
FreeBandwidth: int64(1234),
|
||||
FreeDisk: int64(5678),
|
||||
FreeDisk: int64(5678),
|
||||
},
|
||||
Operator: &pb.NodeOperator{
|
||||
Email: expectedEmail,
|
||||
@ -381,8 +379,7 @@ func TestUpdateCheckIn(t *testing.T) {
|
||||
Wallet: info.Operator.GetWallet(),
|
||||
},
|
||||
Capacity: pb.NodeCapacity{
|
||||
FreeBandwidth: info.Capacity.GetFreeBandwidth(),
|
||||
FreeDisk: info.Capacity.GetFreeDisk(),
|
||||
FreeDisk: info.Capacity.GetFreeDisk(),
|
||||
},
|
||||
Reputation: overlay.NodeStats{
|
||||
UptimeCount: 1,
|
||||
@ -434,9 +431,6 @@ func TestUpdateCheckIn(t *testing.T) {
|
||||
Address: expectedAddress,
|
||||
},
|
||||
IsUp: true,
|
||||
Capacity: &pb.NodeCapacity{
|
||||
FreeBandwidth: int64(12355),
|
||||
},
|
||||
Version: &pb.NodeVersion{
|
||||
Version: "v0.1.0",
|
||||
CommitHash: "abc123",
|
||||
@ -454,7 +448,6 @@ func TestUpdateCheckIn(t *testing.T) {
|
||||
require.True(t, updatedNode.Reputation.LastContactFailure.Equal(time.Time{}.UTC()))
|
||||
require.Equal(t, updatedNode.Address.GetAddress(), expectedAddress)
|
||||
require.Equal(t, updatedNode.Reputation.UptimeSuccessCount, actualNode.Reputation.UptimeSuccessCount+1)
|
||||
require.Equal(t, updatedNode.Capacity.GetFreeBandwidth(), int64(12355))
|
||||
require.Equal(t, updatedInfo.Version.GetVersion(), updatedNode.Version.GetVersion())
|
||||
require.Equal(t, updatedInfo.Version.GetCommitHash(), updatedNode.Version.GetCommitHash())
|
||||
require.Equal(t, updatedInfo.Version.GetRelease(), updatedNode.Version.GetRelease())
|
||||
@ -468,9 +461,6 @@ func TestUpdateCheckIn(t *testing.T) {
|
||||
Address: "9.8.7.6",
|
||||
},
|
||||
IsUp: false,
|
||||
Capacity: &pb.NodeCapacity{
|
||||
FreeBandwidth: int64(12355),
|
||||
},
|
||||
Version: &pb.NodeVersion{
|
||||
Version: "v0.0.0",
|
||||
CommitHash: "",
|
||||
|
@ -100,8 +100,6 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, path storj.Path) (s
|
||||
return true, Error.Wrap(err)
|
||||
}
|
||||
|
||||
pieceSize := eestream.CalcPieceSize(pointer.GetSegmentSize(), redundancy)
|
||||
|
||||
var excludeNodeIDs storj.NodeIDList
|
||||
var healthyPieces, unhealthyPieces []*pb.RemotePiece
|
||||
healthyMap := make(map[int32]bool)
|
||||
@ -171,7 +169,6 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, path storj.Path) (s
|
||||
// Request Overlay for n-h new storage nodes
|
||||
request := overlay.FindStorageNodesRequest{
|
||||
RequestedCount: requestCount,
|
||||
FreeBandwidth: pieceSize,
|
||||
ExcludedNodes: excludeNodeIDs,
|
||||
}
|
||||
newNodes, err := repairer.overlay.FindStorageNodes(ctx, request)
|
||||
|
@ -47,13 +47,12 @@ func (cache *overlaycache) SelectStorageNodes(ctx context.Context, count int, cr
|
||||
WHERE disqualified IS NULL
|
||||
AND exit_initiated_at IS NULL
|
||||
AND type = ?
|
||||
AND free_bandwidth >= ?
|
||||
AND free_disk >= ?
|
||||
AND total_audit_count >= ?
|
||||
AND total_uptime_count >= ?
|
||||
AND last_contact_success > ?`
|
||||
args := append(make([]interface{}, 0, 13),
|
||||
nodeType, criteria.FreeBandwidth, criteria.FreeDisk, criteria.AuditCount,
|
||||
nodeType, criteria.FreeDisk, criteria.AuditCount,
|
||||
criteria.UptimeCount, time.Now().Add(-criteria.OnlineWindow))
|
||||
|
||||
if criteria.MinimumVersion != "" {
|
||||
@ -103,12 +102,11 @@ func (cache *overlaycache) SelectNewStorageNodes(ctx context.Context, count int,
|
||||
WHERE disqualified IS NULL
|
||||
AND exit_initiated_at IS NULL
|
||||
AND type = ?
|
||||
AND free_bandwidth >= ?
|
||||
AND free_disk >= ?
|
||||
AND (total_audit_count < ? OR total_uptime_count < ?)
|
||||
AND last_contact_success > ?`
|
||||
args := append(make([]interface{}, 0, 10),
|
||||
nodeType, criteria.FreeBandwidth, criteria.FreeDisk, criteria.AuditCount, criteria.UptimeCount, time.Now().Add(-criteria.OnlineWindow))
|
||||
nodeType, criteria.FreeDisk, criteria.AuditCount, criteria.UptimeCount, time.Now().Add(-criteria.OnlineWindow))
|
||||
|
||||
if criteria.MinimumVersion != "" {
|
||||
v, err := version.NewSemVer(criteria.MinimumVersion)
|
||||
@ -193,7 +191,7 @@ func (cache *overlaycache) queryNodes(ctx context.Context, excludedNodes []storj
|
||||
|
||||
var rows *sql.Rows
|
||||
rows, err = cache.db.Query(ctx, cache.db.Rebind(`SELECT id, type, address, last_net,
|
||||
free_bandwidth, free_disk, total_audit_count, audit_success_count,
|
||||
free_disk, total_audit_count, audit_success_count,
|
||||
total_uptime_count, uptime_success_count, disqualified, audit_reputation_alpha,
|
||||
audit_reputation_beta
|
||||
FROM nodes
|
||||
@ -210,7 +208,7 @@ func (cache *overlaycache) queryNodes(ctx context.Context, excludedNodes []storj
|
||||
for rows.Next() {
|
||||
dbNode := &dbx.Node{}
|
||||
err = rows.Scan(&dbNode.Id, &dbNode.Type,
|
||||
&dbNode.Address, &dbNode.LastNet, &dbNode.FreeBandwidth, &dbNode.FreeDisk,
|
||||
&dbNode.Address, &dbNode.LastNet, &dbNode.FreeDisk,
|
||||
&dbNode.TotalAuditCount, &dbNode.AuditSuccessCount,
|
||||
&dbNode.TotalUptimeCount, &dbNode.UptimeSuccessCount, &dbNode.Disqualified,
|
||||
&dbNode.AuditReputationAlpha, &dbNode.AuditReputationBeta,
|
||||
@ -257,7 +255,7 @@ func (cache *overlaycache) queryNodesDistinct(ctx context.Context, excludedNodes
|
||||
SELECT *
|
||||
FROM (
|
||||
SELECT DISTINCT ON (last_net) last_net, -- choose at max 1 node from this IP or network
|
||||
id, type, address, free_bandwidth, free_disk, total_audit_count,
|
||||
id, type, address, free_disk, total_audit_count,
|
||||
audit_success_count, total_uptime_count, uptime_success_count,
|
||||
audit_reputation_alpha, audit_reputation_beta
|
||||
FROM nodes
|
||||
@ -276,7 +274,7 @@ func (cache *overlaycache) queryNodesDistinct(ctx context.Context, excludedNodes
|
||||
for rows.Next() {
|
||||
dbNode := &dbx.Node{}
|
||||
err = rows.Scan(&dbNode.LastNet, &dbNode.Id, &dbNode.Type,
|
||||
&dbNode.Address, &dbNode.FreeBandwidth, &dbNode.FreeDisk,
|
||||
&dbNode.Address, &dbNode.FreeDisk,
|
||||
&dbNode.TotalAuditCount, &dbNode.AuditSuccessCount,
|
||||
&dbNode.TotalUptimeCount, &dbNode.UptimeSuccessCount,
|
||||
&dbNode.AuditReputationAlpha, &dbNode.AuditReputationBeta,
|
||||
@ -517,66 +515,45 @@ func (cache *overlaycache) UpdateAddress(ctx context.Context, info *pb.Node, def
|
||||
return overlay.ErrEmptyNode
|
||||
}
|
||||
|
||||
err = cache.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) (err error) {
|
||||
// TODO: use upsert
|
||||
_, err = tx.Get_Node_By_Id(ctx, dbx.Node_Id(info.Id.Bytes()))
|
||||
|
||||
address := info.Address
|
||||
if address == nil {
|
||||
address = &pb.NodeAddress{}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
if err != sql.ErrNoRows {
|
||||
return err
|
||||
}
|
||||
// add the node to DB for first time
|
||||
err = tx.CreateNoReturn_Node(
|
||||
ctx,
|
||||
dbx.Node_Id(info.Id.Bytes()),
|
||||
dbx.Node_Address(address.Address),
|
||||
dbx.Node_LastNet(info.LastIp),
|
||||
dbx.Node_Protocol(int(address.Transport)),
|
||||
dbx.Node_Type(int(pb.NodeType_INVALID)),
|
||||
dbx.Node_Email(""),
|
||||
dbx.Node_Wallet(""),
|
||||
dbx.Node_FreeBandwidth(-1),
|
||||
dbx.Node_FreeDisk(-1),
|
||||
dbx.Node_Major(0),
|
||||
dbx.Node_Minor(0),
|
||||
dbx.Node_Patch(0),
|
||||
dbx.Node_Hash(""),
|
||||
dbx.Node_Timestamp(time.Time{}),
|
||||
dbx.Node_Release(false),
|
||||
dbx.Node_Latency90(0),
|
||||
dbx.Node_AuditSuccessCount(0),
|
||||
dbx.Node_TotalAuditCount(0),
|
||||
dbx.Node_UptimeSuccessCount(0),
|
||||
dbx.Node_TotalUptimeCount(0),
|
||||
dbx.Node_LastContactSuccess(time.Now()),
|
||||
dbx.Node_LastContactFailure(time.Time{}),
|
||||
dbx.Node_Contained(false),
|
||||
dbx.Node_AuditReputationAlpha(defaults.AuditReputationAlpha0),
|
||||
dbx.Node_AuditReputationBeta(defaults.AuditReputationBeta0),
|
||||
//TODO: remove uptime reputation after finishing db migration
|
||||
dbx.Node_UptimeReputationAlpha(0),
|
||||
dbx.Node_UptimeReputationBeta(0),
|
||||
dbx.Node_ExitSuccess(false),
|
||||
dbx.Node_Create_Fields{
|
||||
Disqualified: dbx.Node_Disqualified_Null(),
|
||||
},
|
||||
address := info.Address
|
||||
if address == nil {
|
||||
address = &pb.NodeAddress{}
|
||||
}
|
||||
query := `
|
||||
INSERT INTO nodes
|
||||
(
|
||||
id, address, last_net, protocol, type,
|
||||
email, wallet, free_disk,
|
||||
uptime_success_count, total_uptime_count,
|
||||
last_contact_success,
|
||||
last_contact_failure,
|
||||
audit_reputation_alpha, audit_reputation_beta,
|
||||
major, minor, patch, hash, timestamp, release
|
||||
)
|
||||
} else {
|
||||
err = tx.UpdateNoReturn_Node_By_Id(ctx, dbx.Node_Id(info.Id.Bytes()),
|
||||
dbx.Node_Update_Fields{
|
||||
Address: dbx.Node_Address(address.Address),
|
||||
LastNet: dbx.Node_LastNet(info.LastIp),
|
||||
Protocol: dbx.Node_Protocol(int(address.Transport)),
|
||||
})
|
||||
}
|
||||
|
||||
return err
|
||||
})
|
||||
VALUES (
|
||||
$1, $2, $3, $4, $5,
|
||||
'', '', -1,
|
||||
0, 0,
|
||||
$8::timestamptz,
|
||||
'0001-01-01 00:00:00+00'::timestamptz,
|
||||
$6, $7,
|
||||
0, 0, 0, '', '0001-01-01 00:00:00+00'::timestamptz, false
|
||||
)
|
||||
ON CONFLICT (id)
|
||||
DO UPDATE
|
||||
SET
|
||||
address=$2,
|
||||
last_net=$3,
|
||||
protocol=$4
|
||||
`
|
||||
_, err = cache.db.ExecContext(ctx, query,
|
||||
// args $1 - $5
|
||||
info.Id.Bytes(), address.Address, info.LastIp, int(address.Transport), int(pb.NodeType_INVALID),
|
||||
// args $6 - $7
|
||||
defaults.AuditReputationAlpha0, defaults.AuditReputationBeta0,
|
||||
// args $8
|
||||
time.Now(),
|
||||
)
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
@ -702,7 +679,7 @@ func (cache *overlaycache) UpdateStats(ctx context.Context, updateReq *overlay.U
|
||||
}
|
||||
|
||||
// UpdateNodeInfo updates the following fields for a given node ID:
|
||||
// wallet, email for node operator, free disk and bandwidth capacity, and version
|
||||
// wallet, email for node operator, free disk, and version
|
||||
func (cache *overlaycache) UpdateNodeInfo(ctx context.Context, nodeID storj.NodeID, nodeInfo *pb.InfoResponse) (stats *overlay.NodeDossier, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
@ -717,7 +694,6 @@ func (cache *overlaycache) UpdateNodeInfo(ctx context.Context, nodeID storj.Node
|
||||
}
|
||||
if nodeInfo.GetCapacity() != nil {
|
||||
updateFields.FreeDisk = dbx.Node_FreeDisk(nodeInfo.GetCapacity().GetFreeDisk())
|
||||
updateFields.FreeBandwidth = dbx.Node_FreeBandwidth(nodeInfo.GetCapacity().GetFreeBandwidth())
|
||||
}
|
||||
if nodeInfo.GetVersion() != nil {
|
||||
semVer, err := version.NewSemVer(nodeInfo.GetVersion().GetVersion())
|
||||
@ -1132,8 +1108,7 @@ func convertDBNode(ctx context.Context, info *dbx.Node) (_ *overlay.NodeDossier,
|
||||
Wallet: info.Wallet,
|
||||
},
|
||||
Capacity: pb.NodeCapacity{
|
||||
FreeBandwidth: info.FreeBandwidth,
|
||||
FreeDisk: info.FreeDisk,
|
||||
FreeDisk: info.FreeDisk,
|
||||
},
|
||||
Reputation: *getNodeStats(info),
|
||||
Version: pb.NodeVersion{
|
||||
@ -1426,7 +1401,7 @@ func (cache *overlaycache) UpdateCheckIn(ctx context.Context, node overlay.NodeC
|
||||
INSERT INTO nodes
|
||||
(
|
||||
id, address, last_net, protocol, type,
|
||||
email, wallet, free_bandwidth, free_disk,
|
||||
email, wallet, free_disk,
|
||||
uptime_success_count, total_uptime_count,
|
||||
last_contact_success,
|
||||
last_contact_failure,
|
||||
@ -1436,17 +1411,17 @@ func (cache *overlaycache) UpdateCheckIn(ctx context.Context, node overlay.NodeC
|
||||
)
|
||||
VALUES (
|
||||
$1, $2, $3, $4, $5,
|
||||
$6, $7, $8, $9,
|
||||
$10::bool::int, 1,
|
||||
CASE WHEN $10::bool IS TRUE THEN $19::timestamptz
|
||||
$6, $7, $8,
|
||||
$9::bool::int, 1,
|
||||
CASE WHEN $9::bool IS TRUE THEN $18::timestamptz
|
||||
ELSE '0001-01-01 00:00:00+00'::timestamptz
|
||||
END,
|
||||
CASE WHEN $10::bool IS FALSE THEN $19::timestamptz
|
||||
CASE WHEN $9::bool IS FALSE THEN $18::timestamptz
|
||||
ELSE '0001-01-01 00:00:00+00'::timestamptz
|
||||
END,
|
||||
$11, $12,
|
||||
$11, $12,
|
||||
$13, $14, $15, $16, $17, $18
|
||||
$10, $11,
|
||||
$10, $11,
|
||||
$12, $13, $14, $15, $16, $17
|
||||
)
|
||||
ON CONFLICT (id)
|
||||
DO UPDATE
|
||||
@ -1456,32 +1431,32 @@ func (cache *overlaycache) UpdateCheckIn(ctx context.Context, node overlay.NodeC
|
||||
protocol=$4,
|
||||
email=$6,
|
||||
wallet=$7,
|
||||
free_bandwidth=$8,
|
||||
free_disk=$9,
|
||||
major=$13, minor=$14, patch=$15, hash=$16, timestamp=$17, release=$18,
|
||||
|
||||
free_disk=$8,
|
||||
major=$12, minor=$13, patch=$14, hash=$15, timestamp=$16, release=$17,
|
||||
total_uptime_count=nodes.total_uptime_count+1,
|
||||
uptime_success_count = nodes.uptime_success_count + $10::bool::int,
|
||||
last_contact_success = CASE WHEN $10::bool IS TRUE
|
||||
THEN $19::timestamptz
|
||||
uptime_success_count = nodes.uptime_success_count + $9::bool::int,
|
||||
last_contact_success = CASE WHEN $9::bool IS TRUE
|
||||
THEN $18::timestamptz
|
||||
ELSE nodes.last_contact_success
|
||||
END,
|
||||
last_contact_failure = CASE WHEN $10::bool IS FALSE
|
||||
THEN $19::timestamptz
|
||||
last_contact_failure = CASE WHEN $9::bool IS FALSE
|
||||
THEN $18::timestamptz
|
||||
ELSE nodes.last_contact_failure
|
||||
END;
|
||||
`
|
||||
_, err = cache.db.ExecContext(ctx, query,
|
||||
// args $1 - $5
|
||||
node.NodeID.Bytes(), node.Address.GetAddress(), node.LastIP, node.Address.GetTransport(), int(pb.NodeType_STORAGE),
|
||||
// args $6 - $9
|
||||
node.Operator.GetEmail(), node.Operator.GetWallet(), node.Capacity.GetFreeBandwidth(), node.Capacity.GetFreeDisk(),
|
||||
// args $10
|
||||
// args $6 - $8
|
||||
node.Operator.GetEmail(), node.Operator.GetWallet(), node.Capacity.GetFreeDisk(),
|
||||
// args $9
|
||||
node.IsUp,
|
||||
// args $11 - $12
|
||||
// args $10 - $11
|
||||
config.AuditReputationAlpha0, config.AuditReputationBeta0,
|
||||
// args $13 - $18
|
||||
// args $12 - $17
|
||||
semVer.Major, semVer.Minor, semVer.Patch, node.Version.GetCommitHash(), node.Version.Timestamp, node.Version.GetRelease(),
|
||||
// args $19
|
||||
// args $18
|
||||
timestamp,
|
||||
)
|
||||
if err != nil {
|
||||
|
@ -46,7 +46,6 @@ type Service struct {
|
||||
version *checker.Service
|
||||
pingStats *contact.PingStats
|
||||
|
||||
allocatedBandwidth memory.Size
|
||||
allocatedDiskSpace memory.Size
|
||||
|
||||
walletAddress string
|
||||
@ -56,7 +55,7 @@ type Service struct {
|
||||
|
||||
// NewService returns new instance of Service.
|
||||
func NewService(log *zap.Logger, bandwidth bandwidth.DB, pieceStore *pieces.Store, version *checker.Service,
|
||||
allocatedBandwidth, allocatedDiskSpace memory.Size, walletAddress string, versionInfo version.Info, trust *trust.Pool,
|
||||
allocatedDiskSpace memory.Size, walletAddress string, versionInfo version.Info, trust *trust.Pool,
|
||||
reputationDB reputation.DB, storageUsageDB storageusage.DB, pingStats *contact.PingStats, contact *contact.Service) (*Service, error) {
|
||||
if log == nil {
|
||||
return nil, errs.New("log can't be nil")
|
||||
@ -91,7 +90,6 @@ func NewService(log *zap.Logger, bandwidth bandwidth.DB, pieceStore *pieces.Stor
|
||||
pieceStore: pieceStore,
|
||||
version: version,
|
||||
pingStats: pingStats,
|
||||
allocatedBandwidth: allocatedBandwidth,
|
||||
allocatedDiskSpace: allocatedDiskSpace,
|
||||
contact: contact,
|
||||
walletAddress: walletAddress,
|
||||
@ -177,8 +175,7 @@ func (s *Service) GetDashboardData(ctx context.Context) (_ *Dashboard, err error
|
||||
}
|
||||
|
||||
data.Bandwidth = BandwidthInfo{
|
||||
Used: bandwidthUsage,
|
||||
Available: s.allocatedBandwidth.Int64(),
|
||||
Used: bandwidthUsage,
|
||||
}
|
||||
|
||||
return data, nil
|
||||
|
@ -57,8 +57,7 @@ func TestNodeInfoUpdated(t *testing.T) {
|
||||
oldCapacity := oldInfo.Capacity
|
||||
|
||||
newCapacity := pb.NodeCapacity{
|
||||
FreeBandwidth: 0,
|
||||
FreeDisk: 0,
|
||||
FreeDisk: 0,
|
||||
}
|
||||
require.NotEqual(t, oldCapacity, newCapacity)
|
||||
node.Contact.Service.UpdateSelf(&newCapacity)
|
||||
|
@ -92,12 +92,11 @@ func (inspector *Endpoint) retrieveStats(ctx context.Context) (_ *pb.StatSummary
|
||||
totalUsedBandwidth := usage.Total()
|
||||
|
||||
return &pb.StatSummaryResponse{
|
||||
UsedSpace: piecesContentSize,
|
||||
AvailableSpace: inspector.pieceStoreConfig.AllocatedDiskSpace.Int64() - piecesContentSize,
|
||||
UsedIngress: ingress,
|
||||
UsedEgress: egress,
|
||||
UsedBandwidth: totalUsedBandwidth,
|
||||
AvailableBandwidth: inspector.pieceStoreConfig.AllocatedBandwidth.Int64() - totalUsedBandwidth,
|
||||
UsedSpace: piecesContentSize,
|
||||
AvailableSpace: inspector.pieceStoreConfig.AllocatedDiskSpace.Int64() - piecesContentSize,
|
||||
UsedIngress: ingress,
|
||||
UsedEgress: egress,
|
||||
UsedBandwidth: totalUsedBandwidth,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -26,7 +26,6 @@ func TestInspectorStats(t *testing.T) {
|
||||
Satellite: testplanet.ReconfigureRS(requiredShares, 3, 4, 5),
|
||||
},
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
var availableBandwidth int64
|
||||
var availableSpace int64
|
||||
for _, storageNode := range planet.StorageNodes {
|
||||
response, err := storageNode.Storage2.Inspector.Stats(ctx, &pb.StatsRequest{})
|
||||
@ -36,11 +35,9 @@ func TestInspectorStats(t *testing.T) {
|
||||
assert.Zero(t, response.UsedSpace)
|
||||
assert.Zero(t, response.UsedEgress)
|
||||
assert.Zero(t, response.UsedIngress)
|
||||
assert.True(t, response.AvailableBandwidth > 0)
|
||||
assert.True(t, response.AvailableSpace > 0)
|
||||
|
||||
// assume that all storage node should have the same initial values
|
||||
availableBandwidth = response.AvailableBandwidth
|
||||
availableSpace = response.AvailableSpace
|
||||
}
|
||||
|
||||
@ -74,7 +71,6 @@ func TestInspectorStats(t *testing.T) {
|
||||
if response.UsedSpace > 0 {
|
||||
assert.NotZero(t, response.UsedBandwidth)
|
||||
assert.Equal(t, response.UsedBandwidth, response.UsedIngress+response.UsedEgress)
|
||||
assert.Equal(t, availableBandwidth-response.UsedBandwidth, response.AvailableBandwidth)
|
||||
assert.Equal(t, availableSpace-response.UsedSpace, response.AvailableSpace)
|
||||
|
||||
assert.Equal(t, response.UsedSpace, response.UsedBandwidth-response.UsedEgress)
|
||||
@ -84,8 +80,6 @@ func TestInspectorStats(t *testing.T) {
|
||||
}
|
||||
} else {
|
||||
assert.Zero(t, response.UsedSpace)
|
||||
// TODO track why this is failing
|
||||
//assert.Equal(t, availableBandwidth, response.AvailableBandwidth)
|
||||
assert.Equal(t, availableSpace, response.AvailableSpace)
|
||||
}
|
||||
}
|
||||
|
@ -31,7 +31,7 @@ var (
|
||||
type Config struct {
|
||||
Interval time.Duration `help:"how frequently Kademlia bucket should be refreshed with node stats" default:"1h0m0s"`
|
||||
MinimumDiskSpace memory.Size `help:"how much disk space a node at minimum has to advertise" default:"500GB"`
|
||||
MinimumBandwidth memory.Size `help:"how much bandwidth a node at minimum has to advertise" default:"500GB"`
|
||||
MinimumBandwidth memory.Size `help:"how much bandwidth a node at minimum has to advertise (deprecated)" default:"0TB"`
|
||||
NotifyLowDiskCooldown time.Duration `help:"minimum length of time between capacity reports" default:"10m" hidden:"true"`
|
||||
}
|
||||
|
||||
@ -44,23 +44,19 @@ type Service struct {
|
||||
contact *contact.Service
|
||||
usageDB bandwidth.DB
|
||||
allocatedDiskSpace int64
|
||||
allocatedBandwidth int64
|
||||
cooldown *sync2.Cooldown
|
||||
Loop *sync2.Cycle
|
||||
Config Config
|
||||
}
|
||||
|
||||
// TODO: should it be responsible for monitoring actual bandwidth as well?
|
||||
|
||||
// NewService creates a new storage node monitoring service.
|
||||
func NewService(log *zap.Logger, store *pieces.Store, contact *contact.Service, usageDB bandwidth.DB, allocatedDiskSpace, allocatedBandwidth int64, interval time.Duration, reportCapacity func(context.Context), config Config) *Service {
|
||||
func NewService(log *zap.Logger, store *pieces.Store, contact *contact.Service, usageDB bandwidth.DB, allocatedDiskSpace int64, interval time.Duration, reportCapacity func(context.Context), config Config) *Service {
|
||||
return &Service{
|
||||
log: log,
|
||||
store: store,
|
||||
contact: contact,
|
||||
usageDB: usageDB,
|
||||
allocatedDiskSpace: allocatedDiskSpace,
|
||||
allocatedBandwidth: allocatedBandwidth,
|
||||
cooldown: sync2.NewCooldown(config.NotifyLowDiskCooldown),
|
||||
Loop: sync2.NewCycle(interval),
|
||||
Config: config,
|
||||
@ -84,17 +80,6 @@ func (service *Service) Run(ctx context.Context) (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
usedBandwidth, err := service.usedBandwidth(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if usedBandwidth > service.allocatedBandwidth {
|
||||
service.log.Warn("Exceed the allowed Bandwidth setting")
|
||||
} else {
|
||||
service.log.Info("Remaining Bandwidth", zap.Int64("bytes", service.allocatedBandwidth-usedBandwidth))
|
||||
}
|
||||
|
||||
// check your hard drive is big enough
|
||||
// first time setup as a piece node server
|
||||
if totalUsed == 0 && freeDiskSpace < service.allocatedDiskSpace {
|
||||
@ -121,13 +106,6 @@ func (service *Service) Run(ctx context.Context) (err error) {
|
||||
service.log.Error("Total disk space less than required minimum", zap.Int64("bytes", service.Config.MinimumDiskSpace.Int64()))
|
||||
return Error.New("disk space requirement not met")
|
||||
}
|
||||
|
||||
// Ensure the bandwidth is at least 500GB
|
||||
if service.allocatedBandwidth < service.Config.MinimumBandwidth.Int64() {
|
||||
service.log.Error("Total Bandwidth available less than required minimum", zap.Int64("bytes", service.Config.MinimumBandwidth.Int64()))
|
||||
return Error.New("bandwidth requirement not met")
|
||||
}
|
||||
|
||||
var group errgroup.Group
|
||||
group.Go(func() error {
|
||||
return service.Loop.Run(ctx, func(ctx context.Context) error {
|
||||
@ -175,14 +153,8 @@ func (service *Service) updateNodeInformation(ctx context.Context) (err error) {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
usedBandwidth, err := service.usedBandwidth(ctx)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
service.contact.UpdateSelf(&pb.NodeCapacity{
|
||||
FreeBandwidth: service.allocatedBandwidth - usedBandwidth,
|
||||
FreeDisk: service.allocatedDiskSpace - usedSpace,
|
||||
FreeDisk: service.allocatedDiskSpace - usedSpace,
|
||||
})
|
||||
|
||||
return nil
|
||||
@ -197,15 +169,6 @@ func (service *Service) usedSpace(ctx context.Context) (_ int64, err error) {
|
||||
return usedSpace, nil
|
||||
}
|
||||
|
||||
func (service *Service) usedBandwidth(ctx context.Context) (_ int64, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
usage, err := service.usageDB.MonthSummary(ctx, time.Now())
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return usage, nil
|
||||
}
|
||||
|
||||
// AvailableSpace returns available disk space for upload
|
||||
func (service *Service) AvailableSpace(ctx context.Context) (_ int64, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
@ -221,19 +184,3 @@ func (service *Service) AvailableSpace(ctx context.Context) (_ int64, err error)
|
||||
|
||||
return allocatedSpace - usedSpace, nil
|
||||
}
|
||||
|
||||
// AvailableBandwidth returns available bandwidth for upload/download
|
||||
func (service *Service) AvailableBandwidth(ctx context.Context) (_ int64, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
usage, err := service.usageDB.MonthSummary(ctx, time.Now())
|
||||
if err != nil {
|
||||
return 0, Error.Wrap(err)
|
||||
}
|
||||
allocatedBandwidth := service.allocatedBandwidth
|
||||
|
||||
mon.IntVal("allocated_bandwidth").Observe(allocatedBandwidth) //locked
|
||||
mon.IntVal("used_bandwidth").Observe(usage) //locked
|
||||
mon.IntVal("available_bandwidth").Observe(allocatedBandwidth - usage)
|
||||
|
||||
return allocatedBandwidth - usage, nil
|
||||
}
|
||||
|
@ -20,12 +20,8 @@ func TestMonitor(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
var freeBandwidthInit int64
|
||||
for _, storageNode := range planet.StorageNodes {
|
||||
storageNode.Storage2.Monitor.Loop.Pause()
|
||||
|
||||
// assume that all storage nodes have the same initial values
|
||||
freeBandwidthInit = storageNode.Local().Capacity.FreeBandwidth
|
||||
}
|
||||
|
||||
expectedData := testrand.Bytes(100 * memory.KiB)
|
||||
@ -36,12 +32,9 @@ func TestMonitor(t *testing.T) {
|
||||
nodeAssertions := 0
|
||||
for _, storageNode := range planet.StorageNodes {
|
||||
storageNode.Storage2.Monitor.Loop.TriggerWait()
|
||||
|
||||
freeBandwidthNew := storageNode.Local().Capacity.FreeBandwidth
|
||||
stats, err := storageNode.Storage2.Inspector.Stats(ctx, &pb.StatsRequest{})
|
||||
require.NoError(t, err)
|
||||
if stats.UsedSpace > 0 {
|
||||
assert.Equal(t, freeBandwidthInit-stats.UsedBandwidth, freeBandwidthNew)
|
||||
nodeAssertions++
|
||||
}
|
||||
}
|
||||
|
@ -407,7 +407,6 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
|
||||
peer.Contact.Service,
|
||||
peer.DB.Bandwidth(),
|
||||
config.Storage.AllocatedDiskSpace.Int64(),
|
||||
config.Storage.AllocatedBandwidth.Int64(),
|
||||
//TODO use config.Storage.Monitor.Interval, but for some reason is not set
|
||||
config.Storage.KBucketRefreshInterval,
|
||||
peer.Contact.Chore.Trigger,
|
||||
@ -515,7 +514,6 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
|
||||
peer.DB.Bandwidth(),
|
||||
peer.Storage2.Store,
|
||||
peer.Version.Service,
|
||||
config.Storage.AllocatedBandwidth,
|
||||
config.Storage.AllocatedDiskSpace,
|
||||
config.Operator.Wallet,
|
||||
versionInfo,
|
||||
|
@ -46,7 +46,7 @@ type OldConfig struct {
|
||||
Path string `help:"path to store data in" default:"$CONFDIR/storage"`
|
||||
WhitelistedSatellites storj.NodeURLs `help:"a comma-separated list of approved satellite node urls (unused)" devDefault:"" releaseDefault:""`
|
||||
AllocatedDiskSpace memory.Size `user:"true" help:"total allocated disk space in bytes" default:"1TB"`
|
||||
AllocatedBandwidth memory.Size `user:"true" help:"total allocated bandwidth in bytes" default:"2TB"`
|
||||
AllocatedBandwidth memory.Size `user:"true" help:"total allocated bandwidth in bytes (deprecated)" default:"0B"`
|
||||
KBucketRefreshInterval time.Duration `help:"how frequently Kademlia bucket should be refreshed with node stats" default:"1h0m0s"`
|
||||
}
|
||||
|
||||
@ -269,11 +269,6 @@ func (endpoint *Endpoint) doUpload(stream uploadStream, requestLimit int) (err e
|
||||
return err
|
||||
}
|
||||
|
||||
availableBandwidth, err := endpoint.monitor.AvailableBandwidth(ctx)
|
||||
if err != nil {
|
||||
return rpcstatus.Wrap(rpcstatus.Internal, err)
|
||||
}
|
||||
|
||||
availableSpace, err := endpoint.monitor.AvailableSpace(ctx)
|
||||
if err != nil {
|
||||
return rpcstatus.Wrap(rpcstatus.Internal, err)
|
||||
@ -325,7 +320,6 @@ func (endpoint *Endpoint) doUpload(stream uploadStream, requestLimit int) (err e
|
||||
zap.Stringer("Piece ID", limit.PieceId),
|
||||
zap.Stringer("Satellite ID", limit.SatelliteId),
|
||||
zap.Stringer("Action", limit.Action),
|
||||
zap.Int64("Available Bandwidth", availableBandwidth),
|
||||
zap.Int64("Available Space", availableSpace))
|
||||
|
||||
pieceWriter, err = endpoint.store.Writer(ctx, limit.SatelliteId, limit.PieceId)
|
||||
@ -395,10 +389,6 @@ func (endpoint *Endpoint) doUpload(stream uploadStream, requestLimit int) (err e
|
||||
largestOrder.Amount, pieceWriter.Size()+int64(len(message.Chunk.Data)))
|
||||
}
|
||||
|
||||
availableBandwidth -= chunkSize
|
||||
if availableBandwidth < 0 {
|
||||
return rpcstatus.Error(rpcstatus.Internal, "out of bandwidth")
|
||||
}
|
||||
availableSpace -= chunkSize
|
||||
if availableSpace < 0 {
|
||||
return rpcstatus.Error(rpcstatus.Internal, "out of space")
|
||||
@ -606,12 +596,6 @@ func (endpoint *Endpoint) doDownload(stream downloadStream) (err error) {
|
||||
chunk.Offset+chunk.ChunkSize, pieceReader.Size())
|
||||
}
|
||||
|
||||
availableBandwidth, err := endpoint.monitor.AvailableBandwidth(ctx)
|
||||
if err != nil {
|
||||
endpoint.log.Error("error getting available bandwidth", zap.Error(err))
|
||||
return rpcstatus.Wrap(rpcstatus.Internal, err)
|
||||
}
|
||||
|
||||
throttle := sync2.NewThrottle()
|
||||
// TODO: see whether this can be implemented without a goroutine
|
||||
|
||||
@ -702,11 +686,6 @@ func (endpoint *Endpoint) doDownload(stream downloadStream) (err error) {
|
||||
}
|
||||
|
||||
chunkSize := message.Order.Amount - largestOrder.Amount
|
||||
availableBandwidth -= chunkSize
|
||||
if availableBandwidth < 0 {
|
||||
return rpcstatus.Error(rpcstatus.ResourceExhausted, "out of bandwidth")
|
||||
}
|
||||
|
||||
if err := throttle.Produce(chunkSize); err != nil {
|
||||
// shouldn't happen since only receiving side is calling Fail
|
||||
return rpcstatus.Wrap(rpcstatus.Internal, err)
|
||||
|
@ -35,7 +35,6 @@ func TestOrderLimitPutValidation(t *testing.T) {
|
||||
pieceExpiration time.Duration
|
||||
orderExpiration time.Duration
|
||||
limit int64
|
||||
availableBandwidth int64
|
||||
availableSpace int64
|
||||
err string
|
||||
}{
|
||||
@ -99,17 +98,6 @@ func TestOrderLimitPutValidation(t *testing.T) {
|
||||
limit: memory.KiB.Int64(),
|
||||
err: "order expired:",
|
||||
},
|
||||
{
|
||||
testName: "allocated bandwidth limit",
|
||||
pieceID: storj.PieceID{7},
|
||||
action: pb.PieceAction_PUT,
|
||||
serialNumber: storj.SerialNumber{7},
|
||||
pieceExpiration: oneWeek,
|
||||
orderExpiration: oneWeek,
|
||||
limit: 10 * memory.KiB.Int64(),
|
||||
availableBandwidth: 5 * memory.KiB.Int64(),
|
||||
err: "out of bandwidth",
|
||||
},
|
||||
{
|
||||
testName: "allocated space limit",
|
||||
pieceID: storj.PieceID{8},
|
||||
@ -128,8 +116,6 @@ func TestOrderLimitPutValidation(t *testing.T) {
|
||||
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
|
||||
// set desirable bandwidth
|
||||
setBandwidth(ctx, t, planet, tt.availableBandwidth)
|
||||
// set desirable space
|
||||
setSpace(ctx, t, planet, tt.availableSpace)
|
||||
|
||||
@ -246,14 +232,14 @@ func TestOrderLimitGetValidation(t *testing.T) {
|
||||
limit int64
|
||||
err string
|
||||
}{
|
||||
{ // allocated bandwidth limit
|
||||
{ // incorrect action - PUT rather than GET
|
||||
pieceID: storj.PieceID{1},
|
||||
action: pb.PieceAction_GET,
|
||||
action: pb.PieceAction_PUT,
|
||||
serialNumber: storj.SerialNumber{1},
|
||||
pieceExpiration: oneWeek,
|
||||
orderExpiration: oneWeek,
|
||||
limit: 10 * memory.KiB.Int64(),
|
||||
err: "out of bandwidth",
|
||||
err: "expected get or get repair or audit action got PUT",
|
||||
},
|
||||
} {
|
||||
client, err := planet.Uplinks[0].DialPiecestore(ctx, planet.StorageNodes[0])
|
||||
@ -289,8 +275,8 @@ func TestOrderLimitGetValidation(t *testing.T) {
|
||||
closeErr := downloader.Close()
|
||||
err = errs.Combine(readErr, closeErr)
|
||||
if tt.err != "" {
|
||||
assert.Equal(t, 0, len(buffer))
|
||||
require.Error(t, err)
|
||||
assert.Equal(t, 0, len(buffer)) //errors 10240
|
||||
require.Error(t, err) //nil
|
||||
require.Contains(t, err.Error(), tt.err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
@ -299,19 +285,6 @@ func TestOrderLimitGetValidation(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func setBandwidth(ctx context.Context, t *testing.T, planet *testplanet.Planet, bandwidth int64) {
|
||||
if bandwidth == 0 {
|
||||
return
|
||||
}
|
||||
for _, storageNode := range planet.StorageNodes {
|
||||
availableBandwidth, err := storageNode.Storage2.Monitor.AvailableBandwidth(ctx)
|
||||
require.NoError(t, err)
|
||||
diff := (bandwidth - availableBandwidth) * -1
|
||||
err = storageNode.DB.Bandwidth().Add(ctx, planet.Satellites[0].ID(), pb.PieceAction_GET, diff, time.Now())
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
func setSpace(ctx context.Context, t *testing.T, planet *testplanet.Planet, space int64) {
|
||||
if space == 0 {
|
||||
return
|
||||
|
Loading…
Reference in New Issue
Block a user