From 24787adb5bd088f8cb00dff9271003f21b334fb2 Mon Sep 17 00:00:00 2001 From: JT Olio Date: Wed, 22 May 2019 16:19:32 -0600 Subject: [PATCH] cmd/statreceiver: allow for packet filtering to packet destinations (#2019) What: allow packetfilter to work on packet destinations instead of only on metric destinations. this will allow us to filter what applications get sent to rothko. Why: currently rothko is drowning in storj-sim data and it'd be nice to filter out. --- cmd/statreceiver/example.lua | 12 +++------ cmd/statreceiver/filter.go | 47 +++++++++++++++++++++++++++++------- cmd/statreceiver/parser.go | 23 +++++------------- 3 files changed, 48 insertions(+), 34 deletions(-) diff --git a/cmd/statreceiver/example.lua b/cmd/statreceiver/example.lua index 611622d8b..88f186f1b 100644 --- a/cmd/statreceiver/example.lua +++ b/cmd/statreceiver/example.lua @@ -33,15 +33,10 @@ metric_handlers = mcopy( -- create a metric parser. metric_parser = - parse( -- parse takes one or two arguments. the first argument is - -- a metric handler, the remaining one is a per-packet application or - -- instance filter. each filter is a regex. all packets must - -- match all packet filters. - sanitize(metric_handlers), -- sanitize converts weird chars to underscores - packetfilter("storagenode-prod|satellite-prod|uplink-prod", "")) + parse(sanitize(metric_handlers)) -- sanitize converts weird chars to underscores -- pcopy forks data to multiple outputs --- output types include parse, fileout, and udpout +-- output types include parse, fileout, packetfilter, and udpout destination = pcopy( fileout("dump.out"), metric_parser, @@ -50,7 +45,8 @@ destination = pcopy( udpout("localhost:9001"), -- rothko - udpout("localhost:9002")) + packetfilter("storagenode-prod|satellite-prod|uplink-prod", "", + udpout("localhost:9002"))) -- tie the source to the destination deliver(source, destination) diff --git a/cmd/statreceiver/filter.go b/cmd/statreceiver/filter.go index d7ed294d5..1e59b466d 100644 --- a/cmd/statreceiver/filter.go +++ b/cmd/statreceiver/filter.go @@ -5,29 +5,58 @@ package main import ( "regexp" + "sync" "time" + + "github.com/zeebo/admission/admproto" + + "storj.io/storj/internal/memory" ) -// PacketFilter is used during Packet parsing to determine if the Packet should -// continue to be parsed. +// PacketFilter inspects a packet header to determine if it should be passed +// through type PacketFilter struct { application *regexp.Regexp instance *regexp.Regexp + dest PacketDest + scratch sync.Pool } -// NewPacketFilter creates a PacketFilter. It takes an application regular -// expression and an instance regular expression. If the regular expression -// is matched, the packet will be parsed. -func NewPacketFilter(applicationRegex, instanceRegex string) *PacketFilter { +// NewPacketFilter creates a PacketFilter. It takes a packet destination, +// an application regular expression, and an instance regular expression. +// If the regular expression is matched, the packet will be passed through. +func NewPacketFilter(applicationRegex, instanceRegex string, dest PacketDest) *PacketFilter { return &PacketFilter{ application: regexp.MustCompile(applicationRegex), instance: regexp.MustCompile(instanceRegex), + dest: dest, + scratch: sync.Pool{ + New: func() interface{} { + var x [10 * memory.KB]byte + return &x + }, + }, } } -// Filter returns true if the application and instance match the filter. -func (a *PacketFilter) Filter(application, instance string) bool { - return a.application.MatchString(application) && a.instance.MatchString(instance) +// Packet passes the packet along to the given destination if the regexes pass +func (a *PacketFilter) Packet(data []byte, ts time.Time) error { + cdata, err := admproto.CheckChecksum(data) + if err != nil { + return err + } + scratch := a.scratch.Get().(*[10 * memory.KB]byte) + defer a.scratch.Put(scratch) + + r := admproto.NewReaderWith((*scratch)[:]) + _, application, instance, err := r.Begin(cdata) + if err != nil { + return err + } + if a.application.Match(application) && a.instance.Match(instance) { + return a.dest.Packet(data, ts) + } + return nil } // KeyFilter is a MetricDest that only passes along metrics that pass the key diff --git a/cmd/statreceiver/parser.go b/cmd/statreceiver/parser.go index 0187d443e..6eae384db 100644 --- a/cmd/statreceiver/parser.go +++ b/cmd/statreceiver/parser.go @@ -9,28 +9,23 @@ import ( "time" "github.com/zeebo/admission/admproto" -) -const ( - kb = 1024 + "storj.io/storj/internal/memory" ) // Parser is a PacketDest that sends data to a MetricDest type Parser struct { dest MetricDest - filters []*PacketFilter scratch sync.Pool } -// NewParser creates a Parser. It sends metrics to dest, provided they pass all -// of the provided PacketFilters -func NewParser(dest MetricDest, filters ...*PacketFilter) *Parser { +// NewParser creates a Parser. It sends metrics to dest. +func NewParser(dest MetricDest) *Parser { return &Parser{ - dest: dest, - filters: filters, + dest: dest, scratch: sync.Pool{ New: func() interface{} { - var x [10 * kb]byte + var x [10 * memory.KB]byte return &x }, }, @@ -44,7 +39,7 @@ func (p *Parser) Packet(data []byte, ts time.Time) (err error) { return err } - scratch := p.scratch.Get().(*[10 * kb]byte) + scratch := p.scratch.Get().(*[10 * memory.KB]byte) defer p.scratch.Put(scratch) r := admproto.NewReaderWith((*scratch)[:]) @@ -54,12 +49,6 @@ func (p *Parser) Packet(data []byte, ts time.Time) (err error) { } app, inst := string(appb), string(instb) - for _, filter := range p.filters { - if !filter.Filter(app, inst) { - return nil - } - } - var key []byte var value float64 for len(data) > 0 {