all: remove grpc

It seems everyone has migrated to drpc.

Change-Id: Ica6b2d0bdef68c6603083f2963458843eca71e9e
This commit is contained in:
Egon Elbre 2020-05-06 13:54:14 +03:00
parent 4b612a7906
commit e6d5ce6b77
21 changed files with 22 additions and 548 deletions

View File

@ -45,8 +45,8 @@ func New(ctx context.Context, dialer rpc.Dialer, address string) (_ *Client, err
}, nil
}
// NewClientFrom creates a new certificate signing gRPC client from an existing
// grpc cert signing client.
// NewClientFrom creates a new certificate signing client from an existing
// cert signing client.
func NewClientFrom(client pb.DRPCCertificatesClient) *Client {
return &Client{
client: client,

View File

@ -25,7 +25,7 @@ type Endpoint struct {
minDifficulty uint16
}
// NewEndpoint creates a new certificate signing gRPC server.
// NewEndpoint creates a new certificate signing server.
func NewEndpoint(log *zap.Logger, ca *identity.FullCertificateAuthority, authorizationDB *authorization.DB, minDifficulty uint16) *Endpoint {
codeMap := errs2.CodeMap{
&authorization.ErrNotFound: rpcstatus.Unauthenticated,

View File

@ -15,7 +15,6 @@ import (
"storj.io/common/errs2"
"storj.io/common/identity"
"storj.io/common/pb"
"storj.io/common/pb/pbgrpc"
"storj.io/common/peertls/tlsopts"
"storj.io/storj/certificate/authorization"
"storj.io/storj/pkg/revocation"
@ -77,7 +76,7 @@ func New(log *zap.Logger, ident *identity.FullIdentity, ca *identity.FullCertifi
return nil, Error.Wrap(errs.Combine(err, peer.Close()))
}
peer.Server, err = server.New(log.Named("server"), tlsOptions, sc.Address, sc.PrivateAddress, nil)
peer.Server, err = server.New(log.Named("server"), tlsOptions, sc.Address, sc.PrivateAddress)
if err != nil {
return nil, Error.Wrap(err)
}
@ -86,7 +85,6 @@ func New(log *zap.Logger, ident *identity.FullIdentity, ca *identity.FullCertifi
peer.AuthorizationDB = authorizationDB
peer.Certificate.Endpoint = NewEndpoint(log.Named("certificate"), ca, authorizationDB, uint16(config.MinDifficulty))
pbgrpc.RegisterCertificatesServer(peer.Server.GRPC(), peer.Certificate.Endpoint)
if err := pb.DRPCRegisterCertificates(peer.Server.DRPC(), peer.Certificate.Endpoint); err != nil {
return nil, Error.Wrap(errs.Combine(err, peer.Close()))
}

View File

@ -41,7 +41,7 @@ var (
// ErrInspectorDial throws when there are errors dialing the inspector server
ErrInspectorDial = errs.Class("error dialing inspector server:")
// ErrRequest is for gRPC request errors after dialing
// ErrRequest is for request errors after dialing
ErrRequest = errs.Class("error processing request:")
// ErrIdentity is for errors during identity creation for this CLI
@ -124,7 +124,7 @@ type Inspector struct {
paymentsClient pb.DRPCPaymentsClient
}
// NewInspector creates a new gRPC inspector client for access to overlay.
// NewInspector creates a new inspector client for access to overlay.
func NewInspector(address, path string) (*Inspector, error) {
ctx := context.Background()

View File

@ -1,57 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"context"
"fmt"
"github.com/spf13/pflag"
"google.golang.org/grpc"
"storj.io/common/identity"
"storj.io/common/peertls/tlsopts"
"storj.io/private/cfgstruct"
"storj.io/storj/private/grpctlsopts"
)
var (
targetAddr = pflag.String("target", "satellite.staging.storj.io:7777", "address of target")
identityConfig identity.Config
)
func init() {
cfgstruct.Bind(pflag.CommandLine, &identityConfig, cfgstruct.UseDevDefaults(), cfgstruct.ConfDir("$HOME/.storj/gw"))
}
func main() {
ctx := context.Background()
pflag.Parse()
identity, err := identityConfig.Load()
if err != nil {
panic(err)
}
clientOptions, err := tlsopts.NewOptions(identity, tlsopts.Config{}, nil)
if err != nil {
panic(err)
}
dialOption := grpctlsopts.DialUnverifiedIDOption(clientOptions)
conn, err := grpc.Dial(*targetAddr, dialOption, grpc.WithInsecure())
if err != nil {
panic(err)
}
fmt.Println(conn.GetState())
err = conn.Invoke(ctx, "NonExistentMethod", nil, nil)
if err != nil && err.Error() != `rpc error: code = ResourceExhausted desc = malformed method name: "NonExistentMethod"` {
fmt.Println(err)
}
fmt.Println(conn.GetState())
err = conn.Close()
if err != nil {
fmt.Println(err)
}
}

4
go.mod
View File

@ -11,19 +11,16 @@ require (
github.com/fatih/color v1.7.0
github.com/go-redis/redis v6.14.1+incompatible
github.com/golang-migrate/migrate/v4 v4.7.0
github.com/gomodule/redigo v2.0.0+incompatible // indirect
github.com/google/go-cmp v0.4.0
github.com/gorilla/mux v1.7.1
github.com/gorilla/schema v1.1.0
github.com/graphql-go/graphql v0.7.9
github.com/lib/pq v1.3.0
github.com/mattn/go-isatty v0.0.9 // indirect
github.com/mattn/go-sqlite3 v2.0.3+incompatible
github.com/minio/sha256-simd v0.1.1 // indirect
github.com/nsf/jsondiff v0.0.0-20160203110537-7de28ed2b6e3
github.com/nsf/termbox-go v0.0.0-20190121233118-02980233997d
github.com/shopspring/decimal v0.0.0-20200105231215-408a2507e114
github.com/sirupsen/logrus v1.4.2 // indirect
github.com/spacemonkeygo/monkit/v3 v3.0.6
github.com/spf13/cast v1.3.0
github.com/spf13/cobra v0.0.6
@ -39,7 +36,6 @@ require (
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
google.golang.org/grpc v1.28.0
storj.io/common v0.0.0-20200508071628-78c867902ac1
storj.io/drpc v0.0.12
storj.io/monkit-jaeger v0.0.0-20200424180155-d5f5530ea079

18
go.sum
View File

@ -72,7 +72,6 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudfoundry/gosigar v1.1.0 h1:V/dVCzhKOdIU3WRB5inQU20s4yIgL9Dxx/Mhi0SF8eM=
github.com/cloudfoundry/gosigar v1.1.0/go.mod h1:3qLfc2GlfmwOx2+ZDaRGH3Y9fwQ0sQeaAleo2GV5pH0=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I=
github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
github.com/cockroachdb/cockroach-go v0.0.0-20181001143604-e0a95dfd547c/go.mod h1:XGLbWH/ujMcbPbhZq52Nv6UrCghb1yGn//133kEsvDk=
@ -119,9 +118,7 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/edsrzf/mmap-go v0.0.0-20170320065105-0bce6a688712/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
@ -159,15 +156,12 @@ github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.3 h1:gyjaxf+svBWX08ZjK86iN9geUJF0H6gp2IRKX6Nf6/I=
github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
github.com/golang/snappy v0.0.0-20170215233205-553a64147049/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gomodule/redigo v1.7.1-0.20190322064113-39e2c31b7ca3 h1:6amM4HsNPOvMLVc2ZnyqrjeQ92YAVWn7T4WBKK87inY=
github.com/gomodule/redigo v1.7.1-0.20190322064113-39e2c31b7ca3/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4=
github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0=
github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
@ -251,9 +245,8 @@ github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDe
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mattn/go-colorable v0.1.2 h1:/bC9yWikZXAL9uJdulbSfyVNIR3n3trXl+v8+1sx8mU=
github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-isatty v0.0.8 h1:HLtExJ+uU2HOZ+wI0Tt5DtUDrx8yhUqDcp7fYERX4CE=
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.9 h1:d5US/mDsogSGW37IV293h//ZFaeajb69h+EHFsv2xGg=
github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ=
github.com/mattn/go-runewidth v0.0.4 h1:2BvfKmzob6Bmd4YsL0zygOqfdFnK7GR4QL06Do4/p7Y=
github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
@ -330,8 +323,6 @@ github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeV
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.1 h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k=
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
github.com/spacemonkeygo/errors v0.0.0-20171212215202-9064522e9fd1 h1:xHQewZjohU9/wUsyC99navCjQDNHtTgUOM/J1jAbzfw=
github.com/spacemonkeygo/errors v0.0.0-20171212215202-9064522e9fd1/go.mod h1:7NL9UAYQnRM5iKHUCld3tf02fKb5Dft+41+VckASUy0=
@ -510,13 +501,11 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190426135247-a129542de9ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191210023423-ac6580df4449/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200107144601-ef85f5a75ddf/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -593,13 +582,10 @@ google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiq
google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.27.1 h1:zvIju4sqAGvwKspUQOhwnpcqSbzi7/H6QomNNjTL4sk=
google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.28.0 h1:bO/TA4OxCOummhSf10siHuG7vJOiwh7SpRpFZDkOgl4=
google.golang.org/grpc v1.28.0/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKal+60=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=

View File

@ -1,56 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package grpcauth
import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"storj.io/storj/pkg/auth"
)
// NewAPIKeyInterceptor creates instance of apikey interceptor
func NewAPIKeyInterceptor() grpc.UnaryServerInterceptor {
return InterceptAPIKey
}
// InterceptAPIKey reads apikey from requests and puts the value into the context.
func InterceptAPIKey(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return handler(ctx, req)
}
apikeys, ok := md["apikey"]
if !ok || len(apikeys) == 0 {
return handler(ctx, req)
}
return handler(auth.WithAPIKey(ctx, []byte(apikeys[0])), req)
}
// DeprecatedAPIKeyCredentials implements grpc/credentials.PerRPCCredentials
// for authenticating with the grpc server. This does not work with drpc.
type DeprecatedAPIKeyCredentials struct {
value string
}
// NewDeprecatedAPIKeyCredentials returns a new DeprecatedAPIKeyCredentials
func NewDeprecatedAPIKeyCredentials(apikey string) *DeprecatedAPIKeyCredentials {
return &DeprecatedAPIKeyCredentials{apikey}
}
// GetRequestMetadata gets the current request metadata, refreshing tokens if required.
func (creds *DeprecatedAPIKeyCredentials) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
return map[string]string{
"apikey": creds.value,
}, nil
}
// RequireTransportSecurity indicates whether the credentials requires transport security.
func (creds *DeprecatedAPIKeyCredentials) RequireTransportSecurity() bool {
return false // Deprecated anyway, but how was this the right choice?
}

View File

@ -1,89 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package grpcauth_test
import (
"context"
"net"
"testing"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
pb "google.golang.org/grpc/examples/helloworld/helloworld"
"google.golang.org/grpc/status"
"storj.io/common/errs2"
"storj.io/common/testcontext"
"storj.io/storj/pkg/auth"
"storj.io/storj/pkg/auth/grpcauth"
)
func TestAPIKey(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
// listener is closed in server.Stop() internally
server := grpc.NewServer(
grpc.UnaryInterceptor(grpcauth.InterceptAPIKey),
)
defer server.Stop()
pb.RegisterGreeterServer(server, &helloServer{})
ctx.Go(func() error {
err := errs2.IgnoreCanceled(server.Serve(listener))
t.Log(err)
return err
})
type testcase struct {
apikey string
expected codes.Code
}
for _, test := range []testcase{
{"", codes.Unauthenticated},
{"wrong key", codes.Unauthenticated},
{"good key", codes.OK},
} {
conn, err := grpc.DialContext(ctx, listener.Addr().String(),
grpc.WithPerRPCCredentials(grpcauth.NewDeprecatedAPIKeyCredentials(test.apikey)),
grpc.WithBlock(),
grpc.WithInsecure(),
)
require.NoError(t, err)
client := pb.NewGreeterClient(conn)
response, err := client.SayHello(ctx, &pb.HelloRequest{Name: "Me"})
if test.expected == codes.OK {
require.NoError(t, err)
require.Equal(t, "Hello Me", response.Message)
} else {
require.Error(t, err)
require.Equal(t, test.expected, status.Code(err))
}
require.NoError(t, conn.Close())
}
}
type helloServer struct{}
// SayHello implements helloworld.GreeterServer
func (s *helloServer) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
key, ok := auth.GetAPIKey(ctx)
if !ok {
return nil, status.Errorf(codes.Unauthenticated, "Invalid API credentials")
}
if string(key) != "good key" {
return nil, status.Errorf(codes.Unauthenticated, "Invalid API credentials")
}
return &pb.HelloReply{Message: "Hello " + in.Name}, nil
}

View File

@ -1,126 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package server
import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"storj.io/common/identity"
"storj.io/common/rpc/rpcpeer"
"storj.io/storj/pkg/auth"
"storj.io/storj/pkg/macaroon"
"storj.io/storj/storage"
)
func (server *Server) monkitStreamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {
mon.IntVal("grpc_stream").Observe(1)
return handler(srv, ss)
}
func (server *Server) monkitUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
mon.IntVal("grpc_call").Observe(1)
return handler(ctx, req)
}
func (server *Server) logOnErrorStreamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {
err = handler(srv, ss)
if err != nil {
// no zap errors for canceled or wrong file downloads
if storage.ErrKeyNotFound.Has(err) ||
status.Code(err) == codes.Canceled ||
status.Code(err) == codes.Unavailable ||
err == io.EOF {
return err
}
server.log.Error("gRPC stream error response", zap.Error(err))
}
return err
}
func (server *Server) logOnErrorUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
resp, err = handler(ctx, req)
if err != nil {
// no zap errors for wrong file downloads
if status.Code(err) == codes.NotFound {
return resp, err
}
server.log.Error("gRPC unary error response", zap.Error(err))
}
return resp, err
}
type nodeRequestLog struct {
GRPCService string `json:"grpc_service"`
GRPCMethod string `json:"grpc_method"`
PeerAddress string `json:"peer_address"`
PeerNodeID string `json:"peer_node_id"`
APIHead string `json:"api_head,omitempty"`
Msg interface{} `json:"msg"`
}
func prepareRequestLog(ctx context.Context, req, server interface{}, methodName string) ([]byte, error) {
reqLog := nodeRequestLog{
GRPCService: fmt.Sprintf("%T", server),
GRPCMethod: methodName,
PeerAddress: "<no peer???>",
APIHead: "",
Msg: req,
}
if peer, err := rpcpeer.FromContext(ctx); err == nil {
reqLog.PeerAddress = peer.Addr.String()
if peerIdentity, err := identity.PeerIdentityFromPeer(peer); err == nil {
reqLog.PeerNodeID = peerIdentity.ID.String()
} else {
reqLog.PeerNodeID = fmt.Sprintf("<no peer id: %v>", err)
}
}
if apikey, ok := auth.GetAPIKey(ctx); ok {
key, err := macaroon.ParseAPIKey(string(apikey))
if err == nil {
reqLog.APIHead = hex.EncodeToString(key.Head())
}
}
return json.Marshal(reqLog)
}
// UnaryMessageLoggingInterceptor creates a UnaryServerInterceptor which
// logs the full contents of incoming unary requests.
func UnaryMessageLoggingInterceptor(log *zap.Logger) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
if jsonReq, err := prepareRequestLog(ctx, req, info.Server, info.FullMethod); err == nil {
log.Info(string(jsonReq))
} else {
log.Error("Failed to marshal request to JSON.",
zap.String("method", info.FullMethod), zap.Error(err),
)
}
return handler(ctx, req)
}
}
// StreamMessageLoggingInterceptor creates a StreamServerInterceptor which
// logs the full contents of incoming streaming requests.
func StreamMessageLoggingInterceptor(log *zap.Logger) grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
// Are we even using any of these yet? I'm only guessing at how best to pass in things
// so that they make sense.
if jsonReq, err := prepareRequestLog(ss.Context(), srv, nil, info.FullMethod); err == nil {
log.Info(string(jsonReq))
} else {
log.Error("Failed to marshal request to JSON.",
zap.String("method", info.FullMethod), zap.Error(err),
)
}
return handler(srv, ss)
}
}

View File

@ -12,7 +12,6 @@ import (
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"storj.io/common/identity"
"storj.io/common/peertls/tlsopts"
@ -22,7 +21,6 @@ import (
"storj.io/drpc/drpcserver"
jaeger "storj.io/monkit-jaeger"
"storj.io/storj/pkg/listenmux"
"storj.io/storj/private/grpctlsopts"
)
// Config holds server specific configuration parameters
@ -36,14 +34,12 @@ type Config struct {
type public struct {
listener net.Listener
drpc *drpcserver.Server
grpc *grpc.Server
mux *drpcmux.Mux
}
type private struct {
listener net.Listener
drpc *drpcserver.Server
grpc *grpc.Server
mux *drpcmux.Mux
}
@ -63,7 +59,7 @@ type Server struct {
// New creates a Server out of an Identity, a net.Listener,
// and interceptors.
func New(log *zap.Logger, tlsOptions *tlsopts.Options, publicAddr, privateAddr string, interceptors ...grpc.UnaryServerInterceptor) (*Server, error) {
func New(log *zap.Logger, tlsOptions *tlsopts.Options, publicAddr, privateAddr string) (*Server, error) {
server := &Server{
log: log,
tlsOptions: tlsOptions,
@ -74,17 +70,6 @@ func New(log *zap.Logger, tlsOptions *tlsopts.Options, publicAddr, privateAddr s
Manager: rpc.NewDefaultManagerOptions(),
}
unaryInterceptors := []grpc.UnaryServerInterceptor{
server.monkitUnaryInterceptor,
server.logOnErrorUnaryInterceptor,
}
for _, interceptor := range interceptors {
if interceptor == nil {
continue
}
unaryInterceptors = append(unaryInterceptors, interceptor)
}
publicListener, err := net.Listen("tcp", publicAddr)
if err != nil {
return nil, err
@ -95,15 +80,7 @@ func New(log *zap.Logger, tlsOptions *tlsopts.Options, publicAddr, privateAddr s
server.public = public{
listener: wrapListener(publicListener),
drpc: drpcserver.NewWithOptions(publicTracingHandler, serverOptions),
grpc: grpc.NewServer(
grpc.ChainStreamInterceptor(
server.logOnErrorStreamInterceptor,
server.monkitStreamInterceptor,
),
grpc.ChainUnaryInterceptor(unaryInterceptors...),
grpctlsopts.ServerOption(tlsOptions),
),
mux: publicMux,
mux: publicMux,
}
privateListener, err := net.Listen("tcp", privateAddr)
@ -115,7 +92,6 @@ func New(log *zap.Logger, tlsOptions *tlsopts.Options, publicAddr, privateAddr s
server.private = private{
listener: wrapListener(privateListener),
drpc: drpcserver.NewWithOptions(privateTracingHandler, serverOptions),
grpc: grpc.NewServer(),
mux: privateMux,
}
@ -131,15 +107,9 @@ func (p *Server) Addr() net.Addr { return p.public.listener.Addr() }
// PrivateAddr returns the server's private listener address
func (p *Server) PrivateAddr() net.Addr { return p.private.listener.Addr() }
// GRPC returns the server's gRPC handle for registration purposes
func (p *Server) GRPC() *grpc.Server { return p.public.grpc }
// DRPC returns the server's dRPC mux for registration purposes
func (p *Server) DRPC() *drpcmux.Mux { return p.public.mux }
// PrivateGRPC returns the server's gRPC handle for registration purposes
func (p *Server) PrivateGRPC() *grpc.Server { return p.private.grpc }
// PrivateDRPC returns the server's dRPC mux for registration purposes
func (p *Server) PrivateDRPC() *drpcmux.Mux { return p.private.mux }
@ -192,7 +162,7 @@ func (p *Server) Run(ctx context.Context) (err error) {
privateDRPCListener := privateMux.Route(drpcHeader)
// We need a new context chain because we require this context to be
// canceled only after all of the upcoming grpc/drpc servers have
// canceled only after all of the upcoming drpc servers have
// fully exited. The reason why is because Run closes listener for
// the mux when it exits, and we can only do that after all of the
// Servers are no longer accepting.
@ -219,24 +189,13 @@ func (p *Server) Run(ctx context.Context) (err error) {
case <-ctx.Done():
}
p.public.grpc.GracefulStop()
p.private.grpc.GracefulStop()
return nil
})
group.Go(func() error {
defer cancel()
return p.public.grpc.Serve(publicMux.Default())
})
group.Go(func() error {
defer cancel()
return p.public.drpc.Serve(ctx, publicDRPCListener)
})
group.Go(func() error {
defer cancel()
return p.private.grpc.Serve(privateMux.Default())
})
group.Go(func() error {
defer cancel()
return p.private.drpc.Serve(ctx, privateDRPCListener)

View File

@ -1,35 +0,0 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package grpctlsopts
import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"storj.io/common/peertls/tlsopts"
"storj.io/common/storj"
)
// ServerOption returns a grpc `ServerOption` for incoming connections
// to the node with this full identity.
func ServerOption(opts *tlsopts.Options) grpc.ServerOption {
tlsConfig := opts.ServerTLSConfig()
return grpc.Creds(credentials.NewTLS(tlsConfig))
}
// DialOption returns a grpc `DialOption` for making outgoing connections
// to the node with this peer identity.
func DialOption(opts *tlsopts.Options, id storj.NodeID) (grpc.DialOption, error) {
if id.IsZero() {
return nil, tlsopts.Error.New("no ID specified for DialOption")
}
tlsConfig := opts.ClientTLSConfig(id)
return grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)), nil
}
// DialUnverifiedIDOption returns a grpc `DialUnverifiedIDOption`
func DialUnverifiedIDOption(opts *tlsopts.Options) grpc.DialOption {
tlsConfig := opts.UnverifiedClientTLSConfig()
return grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))
}

View File

@ -1,42 +0,0 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package grpctlsopts_test
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"storj.io/common/identity"
"storj.io/common/identity/testidentity"
"storj.io/common/peertls/tlsopts"
"storj.io/common/storj"
"storj.io/storj/private/grpctlsopts"
)
func TestOptions_DialOption_error_on_empty_ID(t *testing.T) {
testidentity.CompleteIdentityVersionsTest(t, func(t *testing.T, version storj.IDVersion, ident *identity.FullIdentity) {
tlsOptions, err := tlsopts.NewOptions(ident, tlsopts.Config{
PeerIDVersions: "*",
}, nil)
require.NoError(t, err)
dialOption, err := grpctlsopts.DialOption(tlsOptions, storj.NodeID{})
assert.Nil(t, dialOption)
assert.Error(t, err)
})
}
func TestOptions_DialUnverifiedIDOption(t *testing.T) {
testidentity.CompleteIdentityVersionsTest(t, func(t *testing.T, version storj.IDVersion, ident *identity.FullIdentity) {
tlsOptions, err := tlsopts.NewOptions(ident, tlsopts.Config{
PeerIDVersions: "*",
}, nil)
require.NoError(t, err)
dialOption := grpctlsopts.DialUnverifiedIDOption(tlsOptions)
assert.NotNil(t, dialOption)
})
}

View File

@ -62,7 +62,7 @@ func (planet *Planet) newReferralManager() (*server.Server, error) {
return nil, err
}
referralmanager, err := server.New(log, tlsOptions, config.Address, config.PrivateAddress, nil)
referralmanager, err := server.New(log, tlsOptions, config.Address, config.PrivateAddress)
if err != nil {
return nil, err
}

View File

@ -214,7 +214,7 @@ func TestDownloadFromUnresponsiveNode(t *testing.T) {
tlsOptions, err := tlsopts.NewOptions(storageNode.Identity, tlscfg, revocationDB)
require.NoError(t, err)
server, err := server.New(storageNode.Log.Named("mock-server"), tlsOptions, storageNode.Addr(), storageNode.PrivateAddr(), nil)
server, err := server.New(storageNode.Log.Named("mock-server"), tlsOptions, storageNode.Addr(), storageNode.PrivateAddr())
require.NoError(t, err)
err = pb.DRPCRegisterPiecestore(server.DRPC(), &piecestoreMock{})

View File

@ -15,11 +15,9 @@ import (
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"storj.io/common/identity"
"storj.io/common/pb"
"storj.io/common/pb/pbgrpc"
"storj.io/common/peertls/extensions"
"storj.io/common/peertls/tlsopts"
"storj.io/common/rpc"
@ -27,7 +25,6 @@ import (
"storj.io/common/storj"
"storj.io/private/debug"
"storj.io/private/version"
"storj.io/storj/pkg/auth/grpcauth"
"storj.io/storj/pkg/server"
"storj.io/storj/private/lifecycle"
"storj.io/storj/private/post"
@ -231,13 +228,7 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
peer.Dialer = rpc.NewDefaultDialer(tlsOptions)
apiKeyInterceptor := grpcauth.NewAPIKeyInterceptor()
var loggingInterceptor grpc.UnaryServerInterceptor
if sc.DebugLogTraffic {
loggingInterceptor = server.UnaryMessageLoggingInterceptor(log)
}
peer.Server, err = server.New(log.Named("server"), tlsOptions, sc.Address, sc.PrivateAddress, apiKeyInterceptor, loggingInterceptor)
peer.Server, err = server.New(log.Named("server"), tlsOptions, sc.Address, sc.PrivateAddress)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
@ -265,7 +256,6 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
})
peer.Overlay.Inspector = overlay.NewInspector(peer.Overlay.Service)
pbgrpc.RegisterOverlayInspectorServer(peer.Server.PrivateGRPC(), peer.Overlay.Inspector)
if err := pb.DRPCRegisterOverlayInspector(peer.Server.PrivateDRPC(), peer.Overlay.Inspector); err != nil {
return nil, errs.Combine(err, peer.Close())
}
@ -294,7 +284,6 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
}
peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), self, peer.Overlay.Service, peer.DB.PeerIdentities(), peer.Dialer, config.Contact.Timeout)
peer.Contact.Endpoint = contact.NewEndpoint(peer.Log.Named("contact:endpoint"), peer.Contact.Service)
pbgrpc.RegisterNodeServer(peer.Server.GRPC(), peer.Contact.Endpoint)
if err := pb.DRPCRegisterNode(peer.Server.DRPC(), peer.Contact.Endpoint); err != nil {
return nil, errs.Combine(err, peer.Close())
}
@ -306,7 +295,6 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
}
{ // setup vouchers
pbgrpc.RegisterVouchersServer(peer.Server.GRPC(), peer.Vouchers.Endpoint)
if err := pb.DRPCRegisterVouchers(peer.Server.DRPC(), peer.Vouchers.Endpoint); err != nil {
return nil, errs.Combine(err, peer.Close())
}
@ -355,7 +343,6 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
config.Repairer.MaxExcessRateOptimalThreshold,
config.Orders.NodeStatusLogging,
)
pbgrpc.RegisterOrdersServer(peer.Server.GRPC(), peer.Orders.Endpoint)
if err := pb.DRPCRegisterOrders(peer.Server.DRPC(), peer.Orders.Endpoint.DRPC()); err != nil {
return nil, errs.Combine(err, peer.Close())
}
@ -435,7 +422,6 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
return nil, errs.Combine(err, peer.Close())
}
pbgrpc.RegisterMetainfoServer(peer.Server.GRPC(), peer.Metainfo.Endpoint2)
if err := pb.DRPCRegisterMetainfo(peer.Server.DRPC(), peer.Metainfo.Endpoint2); err != nil {
return nil, errs.Combine(err, peer.Close())
}
@ -448,7 +434,6 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
{ // setup datarepair
peer.Repair.Inspector = irreparable.NewInspector(peer.DB.Irreparable())
pbgrpc.RegisterIrreparableInspectorServer(peer.Server.PrivateGRPC(), peer.Repair.Inspector)
if err := pb.DRPCRegisterIrreparableInspector(peer.Server.PrivateDRPC(), peer.Repair.Inspector); err != nil {
return nil, errs.Combine(err, peer.Close())
}
@ -460,7 +445,6 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
peer.Overlay.Service,
peer.Metainfo.Service,
)
pbgrpc.RegisterHealthInspectorServer(peer.Server.PrivateGRPC(), peer.Inspector.Endpoint)
if err := pb.DRPCRegisterHealthInspector(peer.Server.PrivateDRPC(), peer.Inspector.Endpoint); err != nil {
return nil, errs.Combine(err, peer.Close())
}
@ -571,7 +555,6 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
service,
pc.StripeCoinPayments.ConversionRatesCycleInterval)
pbgrpc.RegisterPaymentsServer(peer.Server.PrivateGRPC(), peer.Payments.Inspector)
if err := pb.DRPCRegisterPayments(peer.Server.PrivateDRPC(), peer.Payments.Inspector); err != nil {
return nil, errs.Combine(err, peer.Close())
}
@ -643,7 +626,6 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
peer.DB.StoragenodeAccounting(),
config.Payments,
)
pbgrpc.RegisterNodeStatsServer(peer.Server.GRPC(), peer.NodeStats.Endpoint)
if err := pb.DRPCRegisterNodeStats(peer.Server.DRPC(), peer.NodeStats.Endpoint); err != nil {
return nil, errs.Combine(err, peer.Close())
}
@ -659,7 +641,6 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
peer.DB.StoragenodeAccounting(),
peer.Overlay.DB,
peer.HeldAmount.Service)
pbgrpc.RegisterHeldAmountServer(peer.Server.GRPC(), peer.HeldAmount.Endpoint)
if err := pb.DRPCRegisterHeldAmount(peer.Server.DRPC(), peer.HeldAmount.Endpoint); err != nil {
return nil, errs.Combine(err, peer.Close())
}
@ -678,7 +659,6 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
peer.DB.PeerIdentities(),
config.GracefulExit)
pbgrpc.RegisterSatelliteGracefulExitServer(peer.Server.GRPC(), peer.GracefulExit.Endpoint)
if err := pb.DRPCRegisterSatelliteGracefulExit(peer.Server.DRPC(), peer.GracefulExit.Endpoint.DRPC()); err != nil {
return nil, errs.Combine(err, peer.Close())
}

View File

@ -16,7 +16,6 @@ import (
"storj.io/common/errs2"
"storj.io/common/identity"
"storj.io/common/pb"
"storj.io/common/pb/pbgrpc"
"storj.io/common/rpc/rpcstatus"
"storj.io/common/signing"
"storj.io/common/storj"
@ -121,11 +120,6 @@ func NewEndpoint(log *zap.Logger, signer signing.Signer, db DB, overlaydb overla
}
}
// Process is called by storage nodes to receive pieces to transfer to new nodes and get exit status.
func (endpoint *Endpoint) Process(stream pbgrpc.SatelliteGracefulExit_ProcessServer) (err error) {
return endpoint.doProcess(stream)
}
// Process is called by storage nodes to receive pieces to transfer to new nodes and get exit status.
func (endpoint *drpcEndpoint) Process(stream pb.DRPCSatelliteGracefulExit_ProcessStream) error {
return endpoint.doProcess(stream)

View File

@ -16,7 +16,6 @@ import (
"storj.io/common/identity"
"storj.io/common/pb"
"storj.io/common/pb/pbgrpc"
"storj.io/common/rpc/rpcstatus"
"storj.io/common/signing"
"storj.io/common/storj"
@ -232,11 +231,6 @@ func monitoredSettlementStreamSend(ctx context.Context, stream settlementStream,
return stream.Send(resp)
}
// Settlement receives orders and handles them in batches
func (endpoint *Endpoint) Settlement(stream pbgrpc.Orders_SettlementServer) (err error) {
return endpoint.doSettlement(stream)
}
// Settlement receives orders and handles them in batches
func (endpoint *drpcEndpoint) Settlement(stream pb.DRPCOrders_SettlementStream) (err error) {
return endpoint.doSettlement(stream)

View File

@ -19,7 +19,6 @@ import (
"storj.io/common/identity"
"storj.io/common/pb"
"storj.io/common/pb/pbgrpc"
"storj.io/common/peertls/extensions"
"storj.io/common/peertls/tlsopts"
"storj.io/common/rpc"
@ -329,7 +328,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
peer.Dialer = rpc.NewDefaultDialer(tlsOptions)
peer.Server, err = server.New(log.Named("server"), tlsOptions, sc.Address, sc.PrivateAddress, nil)
peer.Server, err = server.New(log.Named("server"), tlsOptions, sc.Address, sc.PrivateAddress)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
@ -398,7 +397,6 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
})
peer.Contact.Endpoint = contact.NewEndpoint(peer.Log.Named("contact:endpoint"), peer.Contact.PingStats)
pbgrpc.RegisterContactServer(peer.Server.GRPC(), peer.Contact.Endpoint)
if err := pb.DRPCRegisterContact(peer.Server.DRPC(), peer.Contact.Endpoint); err != nil {
return nil, errs.Combine(err, peer.Close())
}
@ -498,7 +496,6 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
return nil, errs.Combine(err, peer.Close())
}
pbgrpc.RegisterPiecestoreServer(peer.Server.GRPC(), peer.Storage2.Endpoint)
if err := pb.DRPCRegisterPiecestore(peer.Server.DRPC(), peer.Storage2.Endpoint.DRPC()); err != nil {
return nil, errs.Combine(err, peer.Close())
}
@ -631,7 +628,6 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
peer.Console.Listener.Addr(),
config.Contact.ExternalAddress,
)
pbgrpc.RegisterPieceStoreInspectorServer(peer.Server.PrivateGRPC(), peer.Storage2.Inspector)
if err := pb.DRPCRegisterPieceStoreInspector(peer.Server.PrivateDRPC(), peer.Storage2.Inspector); err != nil {
return nil, errs.Combine(err, peer.Close())
}
@ -644,7 +640,6 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
peer.DB.Satellites(),
peer.Storage2.BlobsCache,
)
pbgrpc.RegisterNodeGracefulExitServer(peer.Server.PrivateGRPC(), peer.GracefulExit.Endpoint)
if err := pb.DRPCRegisterNodeGracefulExit(peer.Server.PrivateDRPC(), peer.GracefulExit.Endpoint); err != nil {
return nil, errs.Combine(err, peer.Close())
}

View File

@ -22,7 +22,6 @@ import (
"storj.io/common/identity"
"storj.io/common/memory"
"storj.io/common/pb"
"storj.io/common/pb/pbgrpc"
"storj.io/common/rpc/rpcstatus"
"storj.io/common/rpc/rpctimeout"
"storj.io/common/signing"
@ -40,8 +39,6 @@ var (
mon = monkit.Package()
)
var _ pbgrpc.PiecestoreServer = (*Endpoint)(nil)
// OldConfig contains everything necessary for a server
type OldConfig struct {
Path string `help:"path to store data in" default:"$CONFDIR/storage"`
@ -78,9 +75,8 @@ type pingStatsSource interface {
//
// architecture: Endpoint
type Endpoint struct {
log *zap.Logger
config Config
grpcReqLimit int
log *zap.Logger
config Config
signer signing.Signer
trust *trust.Pool
@ -109,17 +105,9 @@ func (endpoint *Endpoint) DRPC() pb.DRPCPiecestoreServer { return &drpcEndpoint{
// NewEndpoint creates a new piecestore endpoint.
func NewEndpoint(log *zap.Logger, signer signing.Signer, trust *trust.Pool, monitor *monitor.Service, retain *retain.Service, pingStats pingStatsSource, store *pieces.Store, pieceDeleter *pieces.Deleter, orders orders.DB, usage bandwidth.DB, usedSerials UsedSerials, config Config) (*Endpoint, error) {
// If config.MaxConcurrentRequests is set we want to repsect it for grpc.
// However, if it is 0 (unlimited) we force a limit.
grpcReqLimit := config.MaxConcurrentRequests
if grpcReqLimit <= 0 {
grpcReqLimit = 7
}
return &Endpoint{
log: log,
config: config,
grpcReqLimit: grpcReqLimit,
log: log,
config: config,
signer: signer,
trust: trust,
@ -198,11 +186,6 @@ func (endpoint *Endpoint) DeletePieces(
}, nil
}
// Upload handles uploading a piece on piece store.
func (endpoint *Endpoint) Upload(stream pbgrpc.Piecestore_UploadServer) (err error) {
return endpoint.doUpload(stream, endpoint.grpcReqLimit)
}
// Upload handles uploading a piece on piece store.
func (endpoint *drpcEndpoint) Upload(stream pb.DRPCPiecestore_UploadStream) (err error) {
return endpoint.doUpload(stream, endpoint.config.MaxConcurrentRequests)
@ -453,11 +436,6 @@ func (endpoint *Endpoint) doUpload(stream uploadStream, requestLimit int) (err e
}
}
// Download handles Downloading a piece on piece store.
func (endpoint *Endpoint) Download(stream pbgrpc.Piecestore_DownloadServer) (err error) {
return endpoint.doDownload(stream)
}
// Download handles Downloading a piece on piece store.
func (endpoint *drpcEndpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err error) {
return endpoint.doDownload(stream)

View File

@ -16,7 +16,6 @@ import (
"storj.io/common/identity/testidentity"
"storj.io/common/pb"
"storj.io/common/pb/pbgrpc"
"storj.io/common/peertls/extensions"
"storj.io/common/peertls/tlsopts"
"storj.io/common/rpc"
@ -30,7 +29,7 @@ import (
type mockServer struct {
localTime time.Time
pbgrpc.NodeServer
pb.DRPCNodeServer
}
func TestLocalTime_InSync(t *testing.T) {
@ -77,7 +76,7 @@ func TestLocalTime_OutOfSync(t *testing.T) {
var group errgroup.Group
defer ctx.Check(group.Wait)
contactServer, err := server.New(log, mockSatTLSOptions, config.Address, config.PrivateAddress, nil)
contactServer, err := server.New(log, mockSatTLSOptions, config.Address, config.PrivateAddress)
require.NoError(t, err)
defer ctx.Check(contactServer.Close)
@ -133,7 +132,7 @@ func TestLocalTime_OutOfSync(t *testing.T) {
var group errgroup.Group
defer ctx.Check(group.Wait)
contactServer, err := server.New(log, mockSatTLSOptions, config.Address, config.PrivateAddress, nil)
contactServer, err := server.New(log, mockSatTLSOptions, config.Address, config.PrivateAddress)
require.NoError(t, err)
defer ctx.Check(contactServer.Close)