diff --git a/satellite/analytics/hubspot.go b/satellite/analytics/hubspot.go index cf7ceb598..74663e542 100644 --- a/satellite/analytics/hubspot.go +++ b/satellite/analytics/hubspot.go @@ -11,9 +11,11 @@ import ( "net/url" "strconv" "strings" + "sync" "time" "github.com/spacemonkeygo/monkit/v3" + "github.com/zeebo/errs" "go.uber.org/zap" "storj.io/common/sync2" @@ -22,12 +24,16 @@ import ( var mon = monkit.Package() const ( - eventPrefix = "pe20293085" + eventPrefix = "pe20293085" + expiryBufferTime = 5 * time.Minute ) // HubSpotConfig is a configuration struct for Concurrent Sending of Events. type HubSpotConfig struct { - APIKey string `help:"hubspot api key" default:""` + RefreshToken string `help:"hubspot refresh token" default:""` + TokenAPI string `help:"hubspot token refresh API" default:"https://api.hubapi.com/oauth/v1/token"` + ClientID string `help:"hubspot client ID" default:""` + ClientSecret string `help:"hubspot client secret" default:""` ChannelSize int `help:"the number of events that can be in the queue before dropping" default:"1000"` ConcurrentSends int `help:"the number of concurrent api requests that can be made" default:"4"` DefaultTimeout time.Duration `help:"the default timeout for the hubspot http client" default:"10s"` @@ -41,13 +47,24 @@ type HubSpotEvent struct { // HubSpotEvents is a configuration struct for sending Events data to HubSpot. type HubSpotEvents struct { - log *zap.Logger - config HubSpotConfig - events chan []HubSpotEvent - escapedAPIKey string - satelliteName string - worker sync2.Limiter - httpClient *http.Client + log *zap.Logger + config HubSpotConfig + events chan []HubSpotEvent + refreshToken string + tokenAPI string + satelliteName string + worker sync2.Limiter + httpClient *http.Client + clientID string + clientSecret string + accessTokenData *TokenData + mutex sync.Mutex +} + +// TokenData contains data related to the Hubspot access token. +type TokenData struct { + AccessToken string + ExpiresAt time.Time } // NewHubSpotEvents for sending user events to HubSpot. @@ -56,7 +73,10 @@ func NewHubSpotEvents(log *zap.Logger, config HubSpotConfig, satelliteName strin log: log, config: config, events: make(chan []HubSpotEvent, config.ChannelSize), - escapedAPIKey: url.QueryEscape(config.APIKey), + refreshToken: config.RefreshToken, + tokenAPI: config.TokenAPI, + clientID: config.ClientID, + clientSecret: config.ClientSecret, satelliteName: satelliteName, worker: *sync2.NewLimiter(config.ConcurrentSends), httpClient: &http.Client{ @@ -108,28 +128,33 @@ func (q *HubSpotEvents) EnqueueCreateUser(fields TrackCreateUserFields) { } } - createUser := HubSpotEvent{ - Endpoint: "https://api.hsforms.com/submissions/v3/integration/submit/20293085/77cfa709-f533-44b8-bf3a-ed1278ca3202?hapikey=" + q.escapedAPIKey, - Data: map[string]interface{}{ - "context": map[string]interface{}{ - "hutk": fields.HubspotUTK, - }, - "fields": []map[string]interface{}{ - newField("email", fields.Email), - newField("firstname", firstName), - newField("lastname", lastName), - newField("lifecyclestage", "other"), - newField("origin_header", fields.OriginHeader), - newField("signup_referrer", fields.Referrer), - newField("account_created", "true"), - newField("have_sales_contact", strconv.FormatBool(fields.HaveSalesContact)), - newField("signup_partner", fields.UserAgent), - }, + data := map[string]interface{}{ + "fields": []map[string]interface{}{ + newField("email", fields.Email), + newField("firstname", firstName), + newField("lastname", lastName), + newField("lifecyclestage", "other"), + newField("origin_header", fields.OriginHeader), + newField("signup_referrer", fields.Referrer), + newField("account_created", "true"), + newField("have_sales_contact", strconv.FormatBool(fields.HaveSalesContact)), + newField("signup_partner", fields.UserAgent), }, } + if fields.HubspotUTK != "" { + data["context"] = map[string]interface{}{ + "hutk": fields.HubspotUTK, + } + } + + createUser := HubSpotEvent{ + Endpoint: "https://api.hsforms.com/submissions/v3/integration/submit/20293085/77cfa709-f533-44b8-bf3a-ed1278ca3202", + Data: data, + } + sendUserEvent := HubSpotEvent{ - Endpoint: "https://api.hubapi.com/events/v3/send?hapikey=" + q.escapedAPIKey, + Endpoint: "https://api.hubapi.com/events/v3/send", Data: map[string]interface{}{ "email": fields.Email, "eventName": eventPrefix + "_" + strings.ToLower(q.satelliteName) + "_" + "account_created", @@ -159,7 +184,7 @@ func (q *HubSpotEvents) EnqueueEvent(email, eventName string, properties map[str eventName = eventPrefix + "_" + eventName newEvent := HubSpotEvent{ - Endpoint: "https://api.hubapi.com/events/v3/send?hapikey=" + q.escapedAPIKey, + Endpoint: "https://api.hubapi.com/events/v3/send", Data: map[string]interface{}{ "email": email, "eventName": eventName, @@ -185,15 +210,31 @@ func (q *HubSpotEvents) handleSingleEvent(ctx context.Context, ev HubSpotEvent) return Error.New("new request failed: %w", err) } + token, err := q.getAccessToken(ctx) + if err != nil { + return Error.New("token request failed: %w", err) + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+token) resp, err := q.httpClient.Do(req) if err != nil { return Error.New("send request failed: %w", err) } - err = resp.Body.Close() - if err != nil { - err = Error.New("closing resp body failed: %w", err) + defer func() { + err = errs.Combine(err, resp.Body.Close()) + }() + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent { + var data struct { + Message string `json:"message"` + } + err = json.NewDecoder(resp.Body).Decode(&data) + if err != nil { + return Error.New("decoding response failed: %w", err) + } + return Error.New("sending event failed: %s", data.Message) } return err } @@ -209,3 +250,69 @@ func (q *HubSpotEvents) Handle(ctx context.Context, events []HubSpotEvent) (err } return nil } + +// getAccessToken returns an access token for hubspot. +// It fetches a new token if there isn't one already or the old one is about to expire in expiryBufferTime. +// It locks q.mutex to ensure only one goroutine is able to request for a token. +func (q *HubSpotEvents) getAccessToken(ctx context.Context) (token string, err error) { + defer mon.Task()(&ctx)(&err) + + q.mutex.Lock() + defer q.mutex.Unlock() + + if q.accessTokenData == nil || q.accessTokenData.ExpiresAt.Add(-expiryBufferTime).Before(time.Now()) { + q.accessTokenData, err = q.getAccessTokenFromHubspot(ctx) + if err != nil { + return "", err + } + } + + return q.accessTokenData.AccessToken, nil +} + +// getAccessTokenFromHubspot gets a new access token from hubspot. +// Expects q.mutex to be locked. +func (q *HubSpotEvents) getAccessTokenFromHubspot(ctx context.Context) (_ *TokenData, err error) { + defer mon.Task()(&ctx)(&err) + + values := make(url.Values) + values.Set("grant_type", "refresh_token") + values.Set("client_id", q.clientID) + values.Set("client_secret", q.clientSecret) + values.Set("refresh_token", q.refreshToken) + + encoded := values.Encode() + + buff := bytes.NewBufferString(encoded) + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, q.tokenAPI, buff) + if err != nil { + return nil, Error.New("new request failed: %w", err) + } + + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + resp, err := q.httpClient.Do(req) + if err != nil { + return nil, Error.New("send request failed: %w", err) + } + defer func() { + err = errs.Combine(err, resp.Body.Close()) + }() + + if resp.StatusCode != http.StatusOK { + return nil, Error.New("send request failed: %w", err) + } + + var tokenData struct { + ExpiresIn int `json:"expires_in"` + AccessToken string `json:"access_token"` + } + err = json.NewDecoder(resp.Body).Decode(&tokenData) + if err != nil { + return nil, Error.New("decode response failed: %w", err) + } + return &TokenData{ + AccessToken: tokenData.AccessToken, + ExpiresAt: time.Now().Add(time.Duration(tokenData.ExpiresIn * 1000)), + }, nil +} diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock index 939ece2c8..9dfc7f71f 100755 --- a/scripts/testdata/satellite-config.yaml.lock +++ b/scripts/testdata/satellite-config.yaml.lock @@ -7,18 +7,27 @@ # enable analytics reporting # analytics.enabled: false -# hubspot api key -# analytics.hub-spot.api-key: "" - # the number of events that can be in the queue before dropping # analytics.hub-spot.channel-size: 1000 +# hubspot client ID +# analytics.hub-spot.client-id: "" + +# hubspot client secret +# analytics.hub-spot.client-secret: "" + # the number of concurrent api requests that can be made # analytics.hub-spot.concurrent-sends: 4 # the default timeout for the hubspot http client # analytics.hub-spot.default-timeout: 10s +# hubspot refresh token +# analytics.hub-spot.refresh-token: "" + +# hubspot token refresh API +# analytics.hub-spot.token-api: https://api.hubapi.com/oauth/v1/token + # segment write key # analytics.segment-write-key: ""