5132d285db
Change-Id: I6545915c5cb93f6349c7b9d90f39e7d67c29038c
163 lines
3.4 KiB
Go
163 lines
3.4 KiB
Go
// Copyright (C) 2019 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"log"
|
|
"net/http"
|
|
"net/url"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/zeebo/errs"
|
|
)
|
|
|
|
// InfluxDest is a MetricDest that sends data with the Influx TCP wire
|
|
// protocol.
|
|
type InfluxDest struct {
|
|
url string
|
|
token string
|
|
|
|
mu sync.Mutex
|
|
buf bytes.Buffer
|
|
stopped bool
|
|
}
|
|
|
|
// NewInfluxDest creates a InfluxDest with stats URL url. Because
|
|
// this function is called in a Lua pipeline domain-specific language, the DSL
|
|
// wants a Influx destination to be flushing every few seconds, so this
|
|
// constructor will start that process. Use Close to stop it.
|
|
func NewInfluxDest(writeURL string) *InfluxDest {
|
|
parsed, err := url.Parse(writeURL)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
token := parsed.Query().Get("authorization")
|
|
parsed.Query().Del("authorization")
|
|
|
|
rv := &InfluxDest{
|
|
url: parsed.String(),
|
|
token: token,
|
|
}
|
|
go rv.flush()
|
|
return rv
|
|
}
|
|
|
|
// Metric implements MetricDest
|
|
func (d *InfluxDest) Metric(application, instance string, key []byte, val float64, ts time.Time) error {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
|
|
// TODO(jeff): actual parsing of the key is very tricky in the presence of influx's busted
|
|
// escapes. If we could do that, we could more easily put the application tag in sorted order
|
|
// but since it begins with a, we'll do the easy thing and insert it first.
|
|
added := false
|
|
keyRange:
|
|
for i, val := range key {
|
|
switch {
|
|
case val != ' ' && val != ',':
|
|
continue
|
|
case i == 0:
|
|
break keyRange
|
|
case key[i-1] == '\\':
|
|
continue
|
|
}
|
|
|
|
var newKey []byte
|
|
newKey = append(newKey, key[:i]...)
|
|
newKey = append(newKey, ",application="...)
|
|
newKey = appendTag(newKey, application)
|
|
|
|
if strings.Contains(application, "satellite") {
|
|
newKey = append(newKey, ",instance="...)
|
|
newKey = appendTag(newKey, instance)
|
|
}
|
|
|
|
newKey = append(newKey, key[i:]...)
|
|
key = newKey
|
|
|
|
added = true
|
|
break
|
|
}
|
|
if !added {
|
|
log.Printf("metric dropped: %q", key)
|
|
return nil
|
|
}
|
|
|
|
_, err := fmt.Fprintf(&d.buf, "%s=%v %d\n", key, val, ts.Truncate(time.Second).UnixNano())
|
|
return err
|
|
}
|
|
|
|
// appendTag writes a tag key, value, or field key to the buffer.
|
|
func appendTag(buf []byte, tag string) []byte {
|
|
if !strings.ContainsAny(tag, ",= ") {
|
|
return append(buf, tag...)
|
|
}
|
|
|
|
for i := 0; i < len(tag); i++ {
|
|
if tag[i] == ',' ||
|
|
tag[i] == '=' ||
|
|
tag[i] == ' ' {
|
|
buf = append(buf, '\\')
|
|
}
|
|
buf = append(buf, tag[i])
|
|
}
|
|
|
|
return buf
|
|
}
|
|
|
|
// Close stops the flushing goroutine.
|
|
func (d *InfluxDest) Close() error {
|
|
d.mu.Lock()
|
|
d.stopped = true
|
|
d.mu.Unlock()
|
|
return nil
|
|
}
|
|
|
|
func (d *InfluxDest) flush() {
|
|
for {
|
|
time.Sleep(5 * time.Second)
|
|
|
|
d.mu.Lock()
|
|
if d.stopped {
|
|
d.mu.Unlock()
|
|
return
|
|
}
|
|
data := append([]byte{}, d.buf.Bytes()...)
|
|
d.buf.Reset()
|
|
d.mu.Unlock()
|
|
|
|
if len(data) == 0 {
|
|
continue
|
|
}
|
|
|
|
err := func() (err error) {
|
|
req, err := http.NewRequest("POST", d.url, bytes.NewReader(data))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if d.token != "" {
|
|
req.Header.Set("Authorization", "Token "+d.token)
|
|
}
|
|
|
|
resp, err := http.DefaultClient.Do(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() { err = errs.Combine(err, resp.Body.Close()) }()
|
|
|
|
if resp.StatusCode != http.StatusNoContent {
|
|
return errs.New("invalid status code: %d", resp.StatusCode)
|
|
}
|
|
return nil
|
|
}()
|
|
if err != nil {
|
|
log.Printf("failed flushing: %v", err)
|
|
}
|
|
}
|
|
}
|