satellite/analytics: replace hubspot api with oauth access token
Hubspot is migrating from using API keys for authentication to OAuth. This change migrates our Hubspot integration to use OAuth tokens. It modifies the EnqueueCreateUser code to not send empty HubspotUTK to hubspot, and to return error for failed requests. see: https://developers.hubspot.com/changelog/upcoming-api-key-sunset Change-Id: I422f00e3e3caeff3ff3d08ddec059502b9addaee
This commit is contained in:
parent
ec6a79af85
commit
9a303b00bf
@ -11,9 +11,11 @@ import (
|
|||||||
"net/url"
|
"net/url"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/spacemonkeygo/monkit/v3"
|
"github.com/spacemonkeygo/monkit/v3"
|
||||||
|
"github.com/zeebo/errs"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
"storj.io/common/sync2"
|
"storj.io/common/sync2"
|
||||||
@ -22,12 +24,16 @@ import (
|
|||||||
var mon = monkit.Package()
|
var mon = monkit.Package()
|
||||||
|
|
||||||
const (
|
const (
|
||||||
eventPrefix = "pe20293085"
|
eventPrefix = "pe20293085"
|
||||||
|
expiryBufferTime = 5 * time.Minute
|
||||||
)
|
)
|
||||||
|
|
||||||
// HubSpotConfig is a configuration struct for Concurrent Sending of Events.
|
// HubSpotConfig is a configuration struct for Concurrent Sending of Events.
|
||||||
type HubSpotConfig struct {
|
type HubSpotConfig struct {
|
||||||
APIKey string `help:"hubspot api key" default:""`
|
RefreshToken string `help:"hubspot refresh token" default:""`
|
||||||
|
TokenAPI string `help:"hubspot token refresh API" default:"https://api.hubapi.com/oauth/v1/token"`
|
||||||
|
ClientID string `help:"hubspot client ID" default:""`
|
||||||
|
ClientSecret string `help:"hubspot client secret" default:""`
|
||||||
ChannelSize int `help:"the number of events that can be in the queue before dropping" default:"1000"`
|
ChannelSize int `help:"the number of events that can be in the queue before dropping" default:"1000"`
|
||||||
ConcurrentSends int `help:"the number of concurrent api requests that can be made" default:"4"`
|
ConcurrentSends int `help:"the number of concurrent api requests that can be made" default:"4"`
|
||||||
DefaultTimeout time.Duration `help:"the default timeout for the hubspot http client" default:"10s"`
|
DefaultTimeout time.Duration `help:"the default timeout for the hubspot http client" default:"10s"`
|
||||||
@ -41,13 +47,24 @@ type HubSpotEvent struct {
|
|||||||
|
|
||||||
// HubSpotEvents is a configuration struct for sending Events data to HubSpot.
|
// HubSpotEvents is a configuration struct for sending Events data to HubSpot.
|
||||||
type HubSpotEvents struct {
|
type HubSpotEvents struct {
|
||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
config HubSpotConfig
|
config HubSpotConfig
|
||||||
events chan []HubSpotEvent
|
events chan []HubSpotEvent
|
||||||
escapedAPIKey string
|
refreshToken string
|
||||||
satelliteName string
|
tokenAPI string
|
||||||
worker sync2.Limiter
|
satelliteName string
|
||||||
httpClient *http.Client
|
worker sync2.Limiter
|
||||||
|
httpClient *http.Client
|
||||||
|
clientID string
|
||||||
|
clientSecret string
|
||||||
|
accessTokenData *TokenData
|
||||||
|
mutex sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// TokenData contains data related to the Hubspot access token.
|
||||||
|
type TokenData struct {
|
||||||
|
AccessToken string
|
||||||
|
ExpiresAt time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewHubSpotEvents for sending user events to HubSpot.
|
// NewHubSpotEvents for sending user events to HubSpot.
|
||||||
@ -56,7 +73,10 @@ func NewHubSpotEvents(log *zap.Logger, config HubSpotConfig, satelliteName strin
|
|||||||
log: log,
|
log: log,
|
||||||
config: config,
|
config: config,
|
||||||
events: make(chan []HubSpotEvent, config.ChannelSize),
|
events: make(chan []HubSpotEvent, config.ChannelSize),
|
||||||
escapedAPIKey: url.QueryEscape(config.APIKey),
|
refreshToken: config.RefreshToken,
|
||||||
|
tokenAPI: config.TokenAPI,
|
||||||
|
clientID: config.ClientID,
|
||||||
|
clientSecret: config.ClientSecret,
|
||||||
satelliteName: satelliteName,
|
satelliteName: satelliteName,
|
||||||
worker: *sync2.NewLimiter(config.ConcurrentSends),
|
worker: *sync2.NewLimiter(config.ConcurrentSends),
|
||||||
httpClient: &http.Client{
|
httpClient: &http.Client{
|
||||||
@ -108,28 +128,33 @@ func (q *HubSpotEvents) EnqueueCreateUser(fields TrackCreateUserFields) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
createUser := HubSpotEvent{
|
data := map[string]interface{}{
|
||||||
Endpoint: "https://api.hsforms.com/submissions/v3/integration/submit/20293085/77cfa709-f533-44b8-bf3a-ed1278ca3202?hapikey=" + q.escapedAPIKey,
|
"fields": []map[string]interface{}{
|
||||||
Data: map[string]interface{}{
|
newField("email", fields.Email),
|
||||||
"context": map[string]interface{}{
|
newField("firstname", firstName),
|
||||||
"hutk": fields.HubspotUTK,
|
newField("lastname", lastName),
|
||||||
},
|
newField("lifecyclestage", "other"),
|
||||||
"fields": []map[string]interface{}{
|
newField("origin_header", fields.OriginHeader),
|
||||||
newField("email", fields.Email),
|
newField("signup_referrer", fields.Referrer),
|
||||||
newField("firstname", firstName),
|
newField("account_created", "true"),
|
||||||
newField("lastname", lastName),
|
newField("have_sales_contact", strconv.FormatBool(fields.HaveSalesContact)),
|
||||||
newField("lifecyclestage", "other"),
|
newField("signup_partner", fields.UserAgent),
|
||||||
newField("origin_header", fields.OriginHeader),
|
|
||||||
newField("signup_referrer", fields.Referrer),
|
|
||||||
newField("account_created", "true"),
|
|
||||||
newField("have_sales_contact", strconv.FormatBool(fields.HaveSalesContact)),
|
|
||||||
newField("signup_partner", fields.UserAgent),
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if fields.HubspotUTK != "" {
|
||||||
|
data["context"] = map[string]interface{}{
|
||||||
|
"hutk": fields.HubspotUTK,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
createUser := HubSpotEvent{
|
||||||
|
Endpoint: "https://api.hsforms.com/submissions/v3/integration/submit/20293085/77cfa709-f533-44b8-bf3a-ed1278ca3202",
|
||||||
|
Data: data,
|
||||||
|
}
|
||||||
|
|
||||||
sendUserEvent := HubSpotEvent{
|
sendUserEvent := HubSpotEvent{
|
||||||
Endpoint: "https://api.hubapi.com/events/v3/send?hapikey=" + q.escapedAPIKey,
|
Endpoint: "https://api.hubapi.com/events/v3/send",
|
||||||
Data: map[string]interface{}{
|
Data: map[string]interface{}{
|
||||||
"email": fields.Email,
|
"email": fields.Email,
|
||||||
"eventName": eventPrefix + "_" + strings.ToLower(q.satelliteName) + "_" + "account_created",
|
"eventName": eventPrefix + "_" + strings.ToLower(q.satelliteName) + "_" + "account_created",
|
||||||
@ -159,7 +184,7 @@ func (q *HubSpotEvents) EnqueueEvent(email, eventName string, properties map[str
|
|||||||
eventName = eventPrefix + "_" + eventName
|
eventName = eventPrefix + "_" + eventName
|
||||||
|
|
||||||
newEvent := HubSpotEvent{
|
newEvent := HubSpotEvent{
|
||||||
Endpoint: "https://api.hubapi.com/events/v3/send?hapikey=" + q.escapedAPIKey,
|
Endpoint: "https://api.hubapi.com/events/v3/send",
|
||||||
Data: map[string]interface{}{
|
Data: map[string]interface{}{
|
||||||
"email": email,
|
"email": email,
|
||||||
"eventName": eventName,
|
"eventName": eventName,
|
||||||
@ -185,15 +210,31 @@ func (q *HubSpotEvents) handleSingleEvent(ctx context.Context, ev HubSpotEvent)
|
|||||||
return Error.New("new request failed: %w", err)
|
return Error.New("new request failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
token, err := q.getAccessToken(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return Error.New("token request failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
req.Header.Set("Content-Type", "application/json")
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
req.Header.Set("Authorization", "Bearer "+token)
|
||||||
resp, err := q.httpClient.Do(req)
|
resp, err := q.httpClient.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Error.New("send request failed: %w", err)
|
return Error.New("send request failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = resp.Body.Close()
|
defer func() {
|
||||||
if err != nil {
|
err = errs.Combine(err, resp.Body.Close())
|
||||||
err = Error.New("closing resp body failed: %w", err)
|
}()
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent {
|
||||||
|
var data struct {
|
||||||
|
Message string `json:"message"`
|
||||||
|
}
|
||||||
|
err = json.NewDecoder(resp.Body).Decode(&data)
|
||||||
|
if err != nil {
|
||||||
|
return Error.New("decoding response failed: %w", err)
|
||||||
|
}
|
||||||
|
return Error.New("sending event failed: %s", data.Message)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -209,3 +250,69 @@ func (q *HubSpotEvents) Handle(ctx context.Context, events []HubSpotEvent) (err
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getAccessToken returns an access token for hubspot.
|
||||||
|
// It fetches a new token if there isn't one already or the old one is about to expire in expiryBufferTime.
|
||||||
|
// It locks q.mutex to ensure only one goroutine is able to request for a token.
|
||||||
|
func (q *HubSpotEvents) getAccessToken(ctx context.Context) (token string, err error) {
|
||||||
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
|
q.mutex.Lock()
|
||||||
|
defer q.mutex.Unlock()
|
||||||
|
|
||||||
|
if q.accessTokenData == nil || q.accessTokenData.ExpiresAt.Add(-expiryBufferTime).Before(time.Now()) {
|
||||||
|
q.accessTokenData, err = q.getAccessTokenFromHubspot(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return q.accessTokenData.AccessToken, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// getAccessTokenFromHubspot gets a new access token from hubspot.
|
||||||
|
// Expects q.mutex to be locked.
|
||||||
|
func (q *HubSpotEvents) getAccessTokenFromHubspot(ctx context.Context) (_ *TokenData, err error) {
|
||||||
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
|
values := make(url.Values)
|
||||||
|
values.Set("grant_type", "refresh_token")
|
||||||
|
values.Set("client_id", q.clientID)
|
||||||
|
values.Set("client_secret", q.clientSecret)
|
||||||
|
values.Set("refresh_token", q.refreshToken)
|
||||||
|
|
||||||
|
encoded := values.Encode()
|
||||||
|
|
||||||
|
buff := bytes.NewBufferString(encoded)
|
||||||
|
|
||||||
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, q.tokenAPI, buff)
|
||||||
|
if err != nil {
|
||||||
|
return nil, Error.New("new request failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
|
||||||
|
resp, err := q.httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, Error.New("send request failed: %w", err)
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
err = errs.Combine(err, resp.Body.Close())
|
||||||
|
}()
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
return nil, Error.New("send request failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var tokenData struct {
|
||||||
|
ExpiresIn int `json:"expires_in"`
|
||||||
|
AccessToken string `json:"access_token"`
|
||||||
|
}
|
||||||
|
err = json.NewDecoder(resp.Body).Decode(&tokenData)
|
||||||
|
if err != nil {
|
||||||
|
return nil, Error.New("decode response failed: %w", err)
|
||||||
|
}
|
||||||
|
return &TokenData{
|
||||||
|
AccessToken: tokenData.AccessToken,
|
||||||
|
ExpiresAt: time.Now().Add(time.Duration(tokenData.ExpiresIn * 1000)),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
15
scripts/testdata/satellite-config.yaml.lock
vendored
15
scripts/testdata/satellite-config.yaml.lock
vendored
@ -7,18 +7,27 @@
|
|||||||
# enable analytics reporting
|
# enable analytics reporting
|
||||||
# analytics.enabled: false
|
# analytics.enabled: false
|
||||||
|
|
||||||
# hubspot api key
|
|
||||||
# analytics.hub-spot.api-key: ""
|
|
||||||
|
|
||||||
# the number of events that can be in the queue before dropping
|
# the number of events that can be in the queue before dropping
|
||||||
# analytics.hub-spot.channel-size: 1000
|
# analytics.hub-spot.channel-size: 1000
|
||||||
|
|
||||||
|
# hubspot client ID
|
||||||
|
# analytics.hub-spot.client-id: ""
|
||||||
|
|
||||||
|
# hubspot client secret
|
||||||
|
# analytics.hub-spot.client-secret: ""
|
||||||
|
|
||||||
# the number of concurrent api requests that can be made
|
# the number of concurrent api requests that can be made
|
||||||
# analytics.hub-spot.concurrent-sends: 4
|
# analytics.hub-spot.concurrent-sends: 4
|
||||||
|
|
||||||
# the default timeout for the hubspot http client
|
# the default timeout for the hubspot http client
|
||||||
# analytics.hub-spot.default-timeout: 10s
|
# analytics.hub-spot.default-timeout: 10s
|
||||||
|
|
||||||
|
# hubspot refresh token
|
||||||
|
# analytics.hub-spot.refresh-token: ""
|
||||||
|
|
||||||
|
# hubspot token refresh API
|
||||||
|
# analytics.hub-spot.token-api: https://api.hubapi.com/oauth/v1/token
|
||||||
|
|
||||||
# segment write key
|
# segment write key
|
||||||
# analytics.segment-write-key: ""
|
# analytics.segment-write-key: ""
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user