cmd/statreceiver: allow for packet filtering to packet destinations (#2019)

What: allow packetfilter to work on packet destinations instead of only on metric destinations. this will allow us to filter what applications get sent to rothko.

Why: currently rothko is drowning in storj-sim data and it'd be nice to filter out.
This commit is contained in:
JT Olio 2019-05-22 16:19:32 -06:00 committed by GitHub
parent f9045f8385
commit 24787adb5b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 48 additions and 34 deletions

View File

@ -33,15 +33,10 @@ metric_handlers = mcopy(
-- create a metric parser.
metric_parser =
parse( -- parse takes one or two arguments. the first argument is
-- a metric handler, the remaining one is a per-packet application or
-- instance filter. each filter is a regex. all packets must
-- match all packet filters.
sanitize(metric_handlers), -- sanitize converts weird chars to underscores
packetfilter("storagenode-prod|satellite-prod|uplink-prod", ""))
parse(sanitize(metric_handlers)) -- sanitize converts weird chars to underscores
-- pcopy forks data to multiple outputs
-- output types include parse, fileout, and udpout
-- output types include parse, fileout, packetfilter, and udpout
destination = pcopy(
fileout("dump.out"),
metric_parser,
@ -50,7 +45,8 @@ destination = pcopy(
udpout("localhost:9001"),
-- rothko
udpout("localhost:9002"))
packetfilter("storagenode-prod|satellite-prod|uplink-prod", "",
udpout("localhost:9002")))
-- tie the source to the destination
deliver(source, destination)

View File

@ -5,29 +5,58 @@ package main
import (
"regexp"
"sync"
"time"
"github.com/zeebo/admission/admproto"
"storj.io/storj/internal/memory"
)
// PacketFilter is used during Packet parsing to determine if the Packet should
// continue to be parsed.
// PacketFilter inspects a packet header to determine if it should be passed
// through
type PacketFilter struct {
application *regexp.Regexp
instance *regexp.Regexp
dest PacketDest
scratch sync.Pool
}
// NewPacketFilter creates a PacketFilter. It takes an application regular
// expression and an instance regular expression. If the regular expression
// is matched, the packet will be parsed.
func NewPacketFilter(applicationRegex, instanceRegex string) *PacketFilter {
// NewPacketFilter creates a PacketFilter. It takes a packet destination,
// an application regular expression, and an instance regular expression.
// If the regular expression is matched, the packet will be passed through.
func NewPacketFilter(applicationRegex, instanceRegex string, dest PacketDest) *PacketFilter {
return &PacketFilter{
application: regexp.MustCompile(applicationRegex),
instance: regexp.MustCompile(instanceRegex),
dest: dest,
scratch: sync.Pool{
New: func() interface{} {
var x [10 * memory.KB]byte
return &x
},
},
}
}
// Filter returns true if the application and instance match the filter.
func (a *PacketFilter) Filter(application, instance string) bool {
return a.application.MatchString(application) && a.instance.MatchString(instance)
// Packet passes the packet along to the given destination if the regexes pass
func (a *PacketFilter) Packet(data []byte, ts time.Time) error {
cdata, err := admproto.CheckChecksum(data)
if err != nil {
return err
}
scratch := a.scratch.Get().(*[10 * memory.KB]byte)
defer a.scratch.Put(scratch)
r := admproto.NewReaderWith((*scratch)[:])
_, application, instance, err := r.Begin(cdata)
if err != nil {
return err
}
if a.application.Match(application) && a.instance.Match(instance) {
return a.dest.Packet(data, ts)
}
return nil
}
// KeyFilter is a MetricDest that only passes along metrics that pass the key

View File

@ -9,28 +9,23 @@ import (
"time"
"github.com/zeebo/admission/admproto"
)
const (
kb = 1024
"storj.io/storj/internal/memory"
)
// Parser is a PacketDest that sends data to a MetricDest
type Parser struct {
dest MetricDest
filters []*PacketFilter
scratch sync.Pool
}
// NewParser creates a Parser. It sends metrics to dest, provided they pass all
// of the provided PacketFilters
func NewParser(dest MetricDest, filters ...*PacketFilter) *Parser {
// NewParser creates a Parser. It sends metrics to dest.
func NewParser(dest MetricDest) *Parser {
return &Parser{
dest: dest,
filters: filters,
dest: dest,
scratch: sync.Pool{
New: func() interface{} {
var x [10 * kb]byte
var x [10 * memory.KB]byte
return &x
},
},
@ -44,7 +39,7 @@ func (p *Parser) Packet(data []byte, ts time.Time) (err error) {
return err
}
scratch := p.scratch.Get().(*[10 * kb]byte)
scratch := p.scratch.Get().(*[10 * memory.KB]byte)
defer p.scratch.Put(scratch)
r := admproto.NewReaderWith((*scratch)[:])
@ -54,12 +49,6 @@ func (p *Parser) Packet(data []byte, ts time.Time) (err error) {
}
app, inst := string(appb), string(instb)
for _, filter := range p.filters {
if !filter.Filter(app, inst) {
return nil
}
}
var key []byte
var value float64
for len(data) > 0 {