Track user events through HubSpot events API (#4300)
Track user events through HubSpot events API
This commit is contained in:
parent
27c6c6aeae
commit
a41758bba5
197
satellite/analytics/hubspot.go
Normal file
197
satellite/analytics/hubspot.go
Normal file
@ -0,0 +1,197 @@
|
||||
// Copyright (C) 2021 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package analytics
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/spacemonkeygo/monkit/v3"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/common/sync2"
|
||||
)
|
||||
|
||||
var mon = monkit.Package()
|
||||
|
||||
const (
|
||||
eventPrefix = "pe20293085"
|
||||
)
|
||||
|
||||
// HubSpotConfig is a configuration struct for Concurrent Sending of Events.
|
||||
type HubSpotConfig struct {
|
||||
APIKey string `help:"hubspot api key" default:""`
|
||||
ChannelSize int `help:"the number of events that can be in the queue before dropping" default:"1000"`
|
||||
ConcurrentSends int `help:"the number of concurrent api requests that can be made" default:"4"`
|
||||
DefaultTimeout time.Duration `help:"the default timeout for the hubspot http client" default:"10s"`
|
||||
}
|
||||
|
||||
// HubSpotEvent is a configuration struct for sending API request to HubSpot.
|
||||
type HubSpotEvent struct {
|
||||
Data map[string]interface{}
|
||||
Endpoint string
|
||||
}
|
||||
|
||||
// HubSpotEvents is a configuration struct for sending Events data to HubSpot.
|
||||
type HubSpotEvents struct {
|
||||
log *zap.Logger
|
||||
config HubSpotConfig
|
||||
events chan []HubSpotEvent
|
||||
escapedAPIKey string
|
||||
satelliteName string
|
||||
worker sync2.Limiter
|
||||
httpClient *http.Client
|
||||
}
|
||||
|
||||
// NewHubSpotEvents for sending user events to HubSpot.
|
||||
func NewHubSpotEvents(log *zap.Logger, config HubSpotConfig, satelliteName string) *HubSpotEvents {
|
||||
return &HubSpotEvents{
|
||||
log: log,
|
||||
config: config,
|
||||
events: make(chan []HubSpotEvent, config.ChannelSize),
|
||||
escapedAPIKey: url.QueryEscape(config.APIKey),
|
||||
satelliteName: satelliteName,
|
||||
worker: *sync2.NewLimiter(config.ConcurrentSends),
|
||||
httpClient: &http.Client{
|
||||
Timeout: config.DefaultTimeout,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Run for concurrent API requests.
|
||||
func (q *HubSpotEvents) Run(ctx context.Context) error {
|
||||
defer q.worker.Wait()
|
||||
for {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case ev := <-q.events:
|
||||
q.worker.Go(ctx, func() {
|
||||
err := q.Handle(ctx, ev)
|
||||
if err != nil {
|
||||
q.log.Error("Sending hubspot event", zap.Error(err))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// EnqueueCreateUser for creating user in HubSpot.
|
||||
func (q *HubSpotEvents) EnqueueCreateUser(fields TrackCreateUserFields) {
|
||||
fullName := fields.FullName
|
||||
names := strings.SplitN(fullName, " ", 2)
|
||||
|
||||
var firstName string
|
||||
var lastName string
|
||||
|
||||
if len(names) > 1 {
|
||||
firstName = names[0]
|
||||
lastName = names[1]
|
||||
} else {
|
||||
firstName = fullName
|
||||
}
|
||||
|
||||
createUser := HubSpotEvent{
|
||||
Endpoint: "https://api.hubapi.com/crm/v3/objects/contacts?hapikey=" + q.escapedAPIKey,
|
||||
Data: map[string]interface{}{
|
||||
"email": fields.Email,
|
||||
"properties": map[string]interface{}{
|
||||
"email": fields.Email,
|
||||
"firstname": firstName,
|
||||
"lastname": lastName,
|
||||
"lifecyclestage": "customer",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
sendUserEvent := HubSpotEvent{
|
||||
Endpoint: "https://api.hubapi.com/events/v3/send?hapikey=" + q.escapedAPIKey,
|
||||
Data: map[string]interface{}{
|
||||
"email": fields.Email,
|
||||
"eventName": eventPrefix + "_" + "account_created_new",
|
||||
"properties": map[string]interface{}{
|
||||
"userid": fields.ID.String(),
|
||||
"email": fields.Email,
|
||||
"name": fields.FullName,
|
||||
"satellite_selected": q.satelliteName,
|
||||
"account_type": string(fields.Type),
|
||||
"company_size": fields.EmployeeCount,
|
||||
"company_name": fields.CompanyName,
|
||||
"job_title": fields.JobTitle,
|
||||
"have_sales_contact": fields.HaveSalesContact,
|
||||
},
|
||||
},
|
||||
}
|
||||
select {
|
||||
case q.events <- []HubSpotEvent{createUser, sendUserEvent}:
|
||||
default:
|
||||
q.log.Error("create user hubspot event failed, event channel is full")
|
||||
}
|
||||
}
|
||||
|
||||
// EnqueueEvent for sending user behavioral event to HubSpot.
|
||||
func (q *HubSpotEvents) EnqueueEvent(email, eventName string, properties map[string]interface{}) {
|
||||
eventName = strings.ReplaceAll(eventName, " ", "_")
|
||||
eventName = strings.ToLower(eventName)
|
||||
eventName = eventPrefix + "_" + eventName
|
||||
|
||||
newEvent := HubSpotEvent{
|
||||
Endpoint: "https://api.hubapi.com/events/v3/send?hapikey=" + q.escapedAPIKey,
|
||||
Data: map[string]interface{}{
|
||||
"email": email,
|
||||
"eventName": eventName,
|
||||
"properties": properties,
|
||||
},
|
||||
}
|
||||
select {
|
||||
case q.events <- []HubSpotEvent{newEvent}:
|
||||
default:
|
||||
q.log.Error("sending hubspot event failed, event channel is full")
|
||||
}
|
||||
}
|
||||
|
||||
// handleSingleEvent for handle the single HubSpot API request.
|
||||
func (q *HubSpotEvents) handleSingleEvent(ctx context.Context, ev HubSpotEvent) (err error) {
|
||||
payloadBytes, err := json.Marshal(ev.Data)
|
||||
if err != nil {
|
||||
return Error.New("json marshal failed: %w", err)
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, ev.Endpoint, bytes.NewReader(payloadBytes))
|
||||
if err != nil {
|
||||
return Error.New("new request failed: %w", err)
|
||||
}
|
||||
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
resp, err := q.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return Error.New("send request failed: %w", err)
|
||||
}
|
||||
|
||||
err = resp.Body.Close()
|
||||
if err != nil {
|
||||
err = Error.New("closing resp body failed: %w", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Handle for handle the HubSpot API requests.
|
||||
func (q *HubSpotEvents) Handle(ctx context.Context, events []HubSpotEvent) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
for _, ev := range events {
|
||||
err := q.handleSingleEvent(ctx, ev)
|
||||
if err != nil {
|
||||
return Error.New("handle event: %w", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
@ -4,6 +4,9 @@
|
||||
package analytics
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
segment "gopkg.in/segmentio/analytics-go.v3"
|
||||
|
||||
@ -23,10 +26,16 @@ const (
|
||||
eventLinkShared = "Link Shared"
|
||||
)
|
||||
|
||||
var (
|
||||
// Error is the default error class the analytics package.
|
||||
Error = errs.Class("analytics service")
|
||||
)
|
||||
|
||||
// Config is a configuration struct for analytics Service.
|
||||
type Config struct {
|
||||
SegmentWriteKey string `help:"segment write key" default:""`
|
||||
Enabled bool `help:"enable analytics reporting" default:"false"`
|
||||
HubSpot HubSpotConfig
|
||||
}
|
||||
|
||||
// Service for sending analytics.
|
||||
@ -39,6 +48,7 @@ type Service struct {
|
||||
clientEvents map[string]bool
|
||||
|
||||
segment segment.Client
|
||||
hubspot *HubSpotEvents
|
||||
}
|
||||
|
||||
// NewService creates new service for creating sending analytics.
|
||||
@ -48,6 +58,7 @@ func NewService(log *zap.Logger, config Config, satelliteName string) *Service {
|
||||
config: config,
|
||||
satelliteName: satelliteName,
|
||||
clientEvents: make(map[string]bool),
|
||||
hubspot: NewHubSpotEvents(log.Named("hubspotclient"), config.HubSpot, satelliteName),
|
||||
}
|
||||
if config.Enabled {
|
||||
service.segment = segment.New(config.SegmentWriteKey)
|
||||
@ -55,15 +66,23 @@ func NewService(log *zap.Logger, config Config, satelliteName string) *Service {
|
||||
for _, name := range []string{eventGatewayCredentialsCreated, eventPassphraseCreated, eventExternalLinkClicked, eventPathSelected, eventLinkShared} {
|
||||
service.clientEvents[name] = true
|
||||
}
|
||||
|
||||
return service
|
||||
}
|
||||
|
||||
// Run runs the service and use the context in new requests.
|
||||
func (service *Service) Run(ctx context.Context) error {
|
||||
if !service.config.Enabled {
|
||||
return nil
|
||||
}
|
||||
return service.hubspot.Run(ctx)
|
||||
}
|
||||
|
||||
// Close closes the Segment client.
|
||||
func (service *Service) Close() error {
|
||||
if !service.config.Enabled {
|
||||
return nil
|
||||
}
|
||||
|
||||
return service.segment.Close()
|
||||
}
|
||||
|
||||
@ -91,10 +110,6 @@ type TrackCreateUserFields struct {
|
||||
}
|
||||
|
||||
func (service *Service) enqueueMessage(message segment.Message) {
|
||||
if !service.config.Enabled {
|
||||
return
|
||||
}
|
||||
|
||||
err := service.segment.Enqueue(message)
|
||||
if err != nil {
|
||||
service.log.Error("Error enqueueing message", zap.Error(err))
|
||||
@ -103,6 +118,10 @@ func (service *Service) enqueueMessage(message segment.Message) {
|
||||
|
||||
// TrackCreateUser sends an "Account Created" event to Segment.
|
||||
func (service *Service) TrackCreateUser(fields TrackCreateUserFields) {
|
||||
if !service.config.Enabled {
|
||||
return
|
||||
}
|
||||
|
||||
traits := segment.NewTraits()
|
||||
traits.SetName(fields.FullName)
|
||||
traits.SetEmail(fields.Email)
|
||||
@ -132,10 +151,16 @@ func (service *Service) TrackCreateUser(fields TrackCreateUserFields) {
|
||||
Event: eventAccountCreated,
|
||||
Properties: props,
|
||||
})
|
||||
|
||||
service.hubspot.EnqueueCreateUser(fields)
|
||||
}
|
||||
|
||||
// TrackSignedIn sends an "Signed In" event to Segment.
|
||||
func (service *Service) TrackSignedIn(userID uuid.UUID, email string) {
|
||||
if !service.config.Enabled {
|
||||
return
|
||||
}
|
||||
|
||||
traits := segment.NewTraits()
|
||||
traits.SetEmail(email)
|
||||
|
||||
@ -152,10 +177,17 @@ func (service *Service) TrackSignedIn(userID uuid.UUID, email string) {
|
||||
Event: eventSignedIn,
|
||||
Properties: props,
|
||||
})
|
||||
|
||||
service.hubspot.EnqueueEvent(email, eventSignedIn, map[string]interface{}{
|
||||
"userid": userID.String(),
|
||||
})
|
||||
}
|
||||
|
||||
// TrackProjectCreated sends an "Project Created" event to Segment.
|
||||
func (service *Service) TrackProjectCreated(userID, projectID uuid.UUID, currentProjectCount int) {
|
||||
func (service *Service) TrackProjectCreated(userID uuid.UUID, email string, projectID uuid.UUID, currentProjectCount int) {
|
||||
if !service.config.Enabled {
|
||||
return
|
||||
}
|
||||
|
||||
props := segment.NewProperties()
|
||||
props.Set("project_count", currentProjectCount)
|
||||
@ -166,19 +198,36 @@ func (service *Service) TrackProjectCreated(userID, projectID uuid.UUID, current
|
||||
Event: eventProjectCreated,
|
||||
Properties: props,
|
||||
})
|
||||
|
||||
service.hubspot.EnqueueEvent(email, eventProjectCreated, map[string]interface{}{
|
||||
"userid": userID.String(),
|
||||
"project_count": currentProjectCount,
|
||||
"project_id": projectID.String(),
|
||||
})
|
||||
}
|
||||
|
||||
// TrackAccessGrantCreated sends an "Access Grant Created" event to Segment.
|
||||
func (service *Service) TrackAccessGrantCreated(userID uuid.UUID) {
|
||||
func (service *Service) TrackAccessGrantCreated(userID uuid.UUID, email string) {
|
||||
if !service.config.Enabled {
|
||||
return
|
||||
}
|
||||
|
||||
service.enqueueMessage(segment.Track{
|
||||
UserId: userID.String(),
|
||||
Event: eventAccessGrantCreated,
|
||||
})
|
||||
|
||||
service.hubspot.EnqueueEvent(email, eventAccessGrantCreated, map[string]interface{}{
|
||||
"userid": userID.String(),
|
||||
})
|
||||
}
|
||||
|
||||
// TrackAccountVerified sends an "Account Verified" event to Segment.
|
||||
func (service *Service) TrackAccountVerified(userID uuid.UUID, email string) {
|
||||
if !service.config.Enabled {
|
||||
return
|
||||
}
|
||||
|
||||
traits := segment.NewTraits()
|
||||
traits.SetEmail(email)
|
||||
|
||||
@ -195,11 +244,19 @@ func (service *Service) TrackAccountVerified(userID uuid.UUID, email string) {
|
||||
Event: eventAccountVerified,
|
||||
Properties: props,
|
||||
})
|
||||
|
||||
service.hubspot.EnqueueEvent(email, eventAccountVerified, map[string]interface{}{
|
||||
"userid": userID.String(),
|
||||
})
|
||||
}
|
||||
|
||||
// TrackEvent sends an arbitrary event associated with user ID to Segment.
|
||||
// It is used for tracking occurrences of client-side events.
|
||||
func (service *Service) TrackEvent(eventName string, userID uuid.UUID) {
|
||||
func (service *Service) TrackEvent(eventName string, userID uuid.UUID, email string) {
|
||||
if !service.config.Enabled {
|
||||
return
|
||||
}
|
||||
|
||||
// do not track if the event name is an invalid client-side event
|
||||
if !service.clientEvents[eventName] {
|
||||
service.log.Error("Invalid client-triggered event", zap.String("eventName", eventName))
|
||||
@ -209,11 +266,18 @@ func (service *Service) TrackEvent(eventName string, userID uuid.UUID) {
|
||||
UserId: userID.String(),
|
||||
Event: eventName,
|
||||
})
|
||||
|
||||
service.hubspot.EnqueueEvent(email, eventName, map[string]interface{}{
|
||||
"userid": userID.String(),
|
||||
})
|
||||
}
|
||||
|
||||
// TrackLinkEvent sends an arbitrary event and link associated with user ID to Segment.
|
||||
// It is used for tracking occurrences of client-side events.
|
||||
func (service *Service) TrackLinkEvent(eventName string, userID uuid.UUID, link string) {
|
||||
func (service *Service) TrackLinkEvent(eventName string, userID uuid.UUID, email, link string) {
|
||||
if !service.config.Enabled {
|
||||
return
|
||||
}
|
||||
|
||||
// do not track if the event name is an invalid client-side event
|
||||
if !service.clientEvents[eventName] {
|
||||
@ -229,4 +293,9 @@ func (service *Service) TrackLinkEvent(eventName string, userID uuid.UUID, link
|
||||
Event: eventName,
|
||||
Properties: props,
|
||||
})
|
||||
|
||||
service.hubspot.EnqueueEvent(email, eventName, map[string]interface{}{
|
||||
"userid": userID.String(),
|
||||
"link": link,
|
||||
})
|
||||
}
|
||||
|
@ -381,6 +381,7 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
|
||||
|
||||
peer.Services.Add(lifecycle.Item{
|
||||
Name: "analytics:service",
|
||||
Run: peer.Analytics.Service.Run,
|
||||
Close: peer.Analytics.Service.Close,
|
||||
})
|
||||
}
|
||||
|
@ -56,16 +56,16 @@ func (a *Analytics) EventTriggered(w http.ResponseWriter, r *http.Request) {
|
||||
if err != nil {
|
||||
a.serveJSONError(w, http.StatusInternalServerError, err)
|
||||
}
|
||||
userID, err := a.service.GetUserID(ctx)
|
||||
|
||||
auth, err := console.GetAuth(ctx)
|
||||
if err != nil {
|
||||
a.serveJSONError(w, http.StatusInternalServerError, err)
|
||||
return
|
||||
}
|
||||
|
||||
if et.Link != "" {
|
||||
a.analytics.TrackLinkEvent(et.EventName, userID, et.Link)
|
||||
a.analytics.TrackLinkEvent(et.EventName, auth.User.ID, auth.User.Email, et.Link)
|
||||
} else {
|
||||
a.analytics.TrackEvent(et.EventName, userID)
|
||||
a.analytics.TrackEvent(et.EventName, auth.User.ID, auth.User.Email)
|
||||
}
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
@ -1136,7 +1136,7 @@ func (s *Service) CreateProject(ctx context.Context, projectInfo ProjectInfo) (p
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
s.analytics.TrackProjectCreated(auth.User.ID, projectID, currentProjectCount+1)
|
||||
s.analytics.TrackProjectCreated(auth.User.ID, auth.User.Email, projectID, currentProjectCount+1)
|
||||
|
||||
return p, nil
|
||||
}
|
||||
@ -1403,7 +1403,7 @@ func (s *Service) CreateAPIKey(ctx context.Context, projectID uuid.UUID, name st
|
||||
return nil, nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
s.analytics.TrackAccessGrantCreated(auth.User.ID)
|
||||
s.analytics.TrackAccessGrantCreated(auth.User.ID, auth.User.Email)
|
||||
|
||||
return info, key, nil
|
||||
}
|
||||
|
12
scripts/testdata/satellite-config.yaml.lock
vendored
12
scripts/testdata/satellite-config.yaml.lock
vendored
@ -37,6 +37,18 @@
|
||||
# enable analytics reporting
|
||||
# analytics.enabled: false
|
||||
|
||||
# hubspot api key
|
||||
# analytics.hub-spot.api-key: ""
|
||||
|
||||
# the number of events that can be in the queue before dropping
|
||||
# analytics.hub-spot.channel-size: 1000
|
||||
|
||||
# the number of concurrent api requests that can be made
|
||||
# analytics.hub-spot.concurrent-sends: 4
|
||||
|
||||
# the default timeout for the hubspot http client
|
||||
# analytics.hub-spot.default-timeout: 10s
|
||||
|
||||
# segment write key
|
||||
# analytics.segment-write-key: ""
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user