Skip to content

Commit 1a4e43d

Browse files
pimluburaizu
andauthored
[NPM-4291] Add config to exclude subnets from dynamic path (#35035)
Co-authored-by: Bryce Eadie <[email protected]>
1 parent 18bdd73 commit 1a4e43d

File tree

12 files changed

+372
-199
lines changed

12 files changed

+372
-199
lines changed

comp/networkpath/npcollector/npcollectorimpl/config.go

+4
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ type collectorConfigs struct {
2525
reverseDNSTimeout time.Duration
2626
disableIntraVPCCollection bool
2727
networkDevicesNamespace string
28+
sourceExcludedConns map[string][]string
29+
destExcludedConns map[string][]string
2830
}
2931

3032
func newConfig(agentConfig config.Component) *collectorConfigs {
@@ -46,6 +48,8 @@ func newConfig(agentConfig config.Component) *collectorConfigs {
4648
reverseDNSEnabled: agentConfig.GetBool("network_path.collector.reverse_dns_enrichment.enabled"),
4749
reverseDNSTimeout: agentConfig.GetDuration("network_path.collector.reverse_dns_enrichment.timeout") * time.Millisecond,
4850
disableIntraVPCCollection: agentConfig.GetBool("network_path.collector.disable_intra_vpc_collection"),
51+
sourceExcludedConns: agentConfig.GetStringMapStringSlice("network_path.collector.source_excludes"),
52+
destExcludedConns: agentConfig.GetStringMapStringSlice("network_path.collector.dest_excludes"),
4953
networkDevicesNamespace: agentConfig.GetString("network_devices.namespace"),
5054
}
5155
}

comp/networkpath/npcollector/npcollectorimpl/npcollector.go

+59-36
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"errors"
1313
"fmt"
1414
"net"
15+
"net/netip"
1516
"time"
1617

1718
model "github.com/DataDog/agent-payload/v5/process"
@@ -25,6 +26,7 @@ import (
2526
"github.com/DataDog/datadog-agent/comp/networkpath/npcollector/npcollectorimpl/pathteststore"
2627
rdnsquerier "github.com/DataDog/datadog-agent/comp/rdnsquerier/def"
2728
"github.com/DataDog/datadog-agent/pkg/logs/message"
29+
filter "github.com/DataDog/datadog-agent/pkg/network/tracer/networkfilter"
2830
"github.com/DataDog/datadog-agent/pkg/networkpath/payload"
2931
"github.com/DataDog/datadog-agent/pkg/networkpath/traceroute"
3032
"github.com/DataDog/datadog-agent/pkg/networkpath/traceroute/config"
@@ -40,7 +42,10 @@ const (
4042
)
4143

4244
type npCollectorImpl struct {
45+
// config related
4346
collectorConfigs *collectorConfigs
47+
sourceExcludes []*filter.ConnectionFilter
48+
destExcludes []*filter.ConnectionFilter
4449

4550
// Deps
4651
epForwarder eventplatform.Forwarder
@@ -84,26 +89,17 @@ func newNoopNpCollectorImpl() *npCollectorImpl {
8489
}
8590

8691
func newNpCollectorImpl(epForwarder eventplatform.Forwarder, collectorConfigs *collectorConfigs, logger log.Component, telemetrycomp telemetryComp.Component, rdnsquerier rdnsquerier.Component, statsd ddgostatsd.ClientInterface) *npCollectorImpl {
87-
logger.Infof("New NpCollector (workers=%d timeout=%d max_ttl=%d input_chan_size=%d processing_chan_size=%d pathtest_contexts_limit=%d pathtest_ttl=%s pathtest_interval=%s max_per_minute=%d flush_interval=%s reverse_dns_enabled=%t reverse_dns_timeout=%d)",
88-
collectorConfigs.workers,
89-
collectorConfigs.timeout,
90-
collectorConfigs.maxTTL,
91-
collectorConfigs.pathtestInputChanSize,
92-
collectorConfigs.pathtestProcessingChanSize,
93-
collectorConfigs.storeConfig.ContextsLimit,
94-
collectorConfigs.storeConfig.TTL,
95-
collectorConfigs.storeConfig.Interval,
96-
collectorConfigs.storeConfig.MaxPerMinute,
97-
collectorConfigs.flushInterval,
98-
collectorConfigs.reverseDNSEnabled,
99-
collectorConfigs.reverseDNSTimeout,
100-
)
92+
logger.Infof("New NpCollector %+v", collectorConfigs)
10193

10294
return &npCollectorImpl{
103-
epForwarder: epForwarder,
10495
collectorConfigs: collectorConfigs,
105-
rdnsquerier: rdnsquerier,
106-
logger: logger,
96+
sourceExcludes: filter.ParseConnectionFilters(collectorConfigs.sourceExcludedConns),
97+
destExcludes: filter.ParseConnectionFilters(collectorConfigs.destExcludedConns),
98+
99+
epForwarder: epForwarder,
100+
logger: logger,
101+
statsdClient: statsd,
102+
rdnsquerier: rdnsquerier,
107103

108104
pathtestStore: pathteststore.NewPathtestStore(collectorConfigs.storeConfig, logger, statsd, time.Now),
109105
pathtestInputChan: make(chan *common.Pathtest, collectorConfigs.pathtestInputChanSize),
@@ -124,7 +120,6 @@ func newNpCollectorImpl(epForwarder eventplatform.Forwarder, collectorConfigs *c
124120
flushLoopDone: make(chan struct{}),
125121

126122
runTraceroute: runTraceroute,
127-
statsdClient: statsd,
128123
}
129124
}
130125

@@ -157,15 +152,57 @@ func makePathtest(conn *model.Connection, dns map[string]*model.DNSEntry) common
157152
}
158153
}
159154

160-
func doSubnetsContainIP(subnets []*net.IPNet, ip net.IP) bool {
155+
func doSubnetsContainIP(subnets []*net.IPNet, ip netip.Addr) bool {
161156
for _, subnet := range subnets {
162-
if subnet.Contains(ip) {
157+
if subnet.Contains(net.IP(ip.AsSlice())) {
163158
return true
164159
}
165160
}
166161
return false
167162
}
168163

164+
func (s *npCollectorImpl) checkPassesConnCIDRFilters(conn *model.Connection, vpcSubnets []*net.IPNet) bool {
165+
if len(vpcSubnets) == 0 && len(s.sourceExcludes) == 0 && len(s.destExcludes) == 0 {
166+
// this should be most customers - parsing IPs is not necessary
167+
return true
168+
}
169+
170+
sourceAddr, err := netip.ParseAddr(conn.Laddr.Ip)
171+
if err != nil {
172+
s.statsdClient.Incr(netpathConnsSkippedMetricName, []string{"reason:failed_parse_source_ip"}, 1) //nolint:errcheck
173+
return false
174+
}
175+
source := netip.AddrPortFrom(sourceAddr, uint16(conn.Laddr.Port))
176+
177+
translatedDest := conn.Raddr.Ip
178+
// prefer IP translation if it's available
179+
if conn.IpTranslation != nil && conn.IpTranslation.ReplDstIP != "" {
180+
translatedDest = conn.IpTranslation.ReplDstIP
181+
}
182+
destAddr, err := netip.ParseAddr(translatedDest)
183+
if err != nil {
184+
s.statsdClient.Incr(netpathConnsSkippedMetricName, []string{"reason:failed_parse_dest_ip"}, 1) //nolint:errcheck
185+
return false
186+
}
187+
dest := netip.AddrPortFrom(destAddr, uint16(conn.Raddr.Port))
188+
189+
if doSubnetsContainIP(vpcSubnets, dest.Addr()) {
190+
s.statsdClient.Incr(netpathConnsSkippedMetricName, []string{"reason:skip_intra_vpc"}, 1) //nolint:errcheck
191+
return false
192+
}
193+
194+
filterable := filter.FilterableConnection{
195+
Type: conn.Type,
196+
Source: source,
197+
Dest: dest,
198+
}
199+
if filter.IsExcludedConnection(s.sourceExcludes, s.destExcludes, filterable) {
200+
s.statsdClient.Incr(netpathConnsSkippedMetricName, []string{"reason:skip_cidr_excluded"}, 1) //nolint:errcheck
201+
return false
202+
}
203+
return true
204+
205+
}
169206
func (s *npCollectorImpl) shouldScheduleNetworkPathForConn(conn *model.Connection, vpcSubnets []*net.IPNet) bool {
170207
if conn == nil {
171208
return false
@@ -183,22 +220,8 @@ func (s *npCollectorImpl) shouldScheduleNetworkPathForConn(conn *model.Connectio
183220
s.statsdClient.Incr(netpathConnsSkippedMetricName, []string{"reason:skip_ipv6"}, 1) //nolint:errcheck
184221
return false
185222
}
186-
translatedDest := conn.Raddr.Ip
187-
// prefer IP translation if it's available
188-
if conn.IpTranslation != nil && conn.IpTranslation.ReplDstIP != "" {
189-
translatedDest = conn.IpTranslation.ReplDstIP
190-
}
191-
remoteIP := net.ParseIP(translatedDest)
192-
if remoteIP.IsLoopback() {
193-
// is this case possible, given that we already filter out IntraHost?
194-
s.statsdClient.Incr(netpathConnsSkippedMetricName, []string{"reason:skip_loopback"}, 1) //nolint:errcheck
195-
return false
196-
}
197-
if doSubnetsContainIP(vpcSubnets, remoteIP) {
198-
s.statsdClient.Incr(netpathConnsSkippedMetricName, []string{"reason:skip_intra_vpc"}, 1) //nolint:errcheck
199-
return false
200-
}
201-
return true
223+
224+
return s.checkPassesConnCIDRFilters(conn, vpcSubnets)
202225
}
203226

204227
func (s *npCollectorImpl) getVPCSubnets() ([]*net.IPNet, error) {

comp/networkpath/npcollector/npcollectorimpl/npcollector_test.go

+112-5
Original file line numberDiff line numberDiff line change
@@ -891,14 +891,18 @@ func Test_npCollectorImpl_getReverseDNSResult(t *testing.T) {
891891
}
892892

893893
var subnetSkippedStat = teststatsd.MetricsArgs{Name: netpathConnsSkippedMetricName, Value: 1, Tags: []string{"reason:skip_intra_vpc"}, Rate: 1}
894+
var cidrExcludedStat = teststatsd.MetricsArgs{Name: netpathConnsSkippedMetricName, Value: 1, Tags: []string{"reason:skip_cidr_excluded"}, Rate: 1}
894895

895896
func Test_npCollectorImpl_shouldScheduleNetworkPathForConn(t *testing.T) {
896897
tests := []struct {
897-
name string
898-
conn *model.Connection
899-
vpcSubnets []*net.IPNet
900-
shouldSchedule bool
901-
subnetSkipped bool
898+
name string
899+
conn *model.Connection
900+
vpcSubnets []*net.IPNet
901+
shouldSchedule bool
902+
subnetSkipped bool
903+
sourceExcludes map[string][]string
904+
destExcludes map[string][]string
905+
connectionExcluded bool
902906
}{
903907
{
904908
name: "should schedule",
@@ -946,6 +950,7 @@ func Test_npCollectorImpl_shouldScheduleNetworkPathForConn(t *testing.T) {
946950
Raddr: &model.Addr{Ip: "127.0.0.2", Port: int32(80)},
947951
Direction: model.ConnectionDirection_outgoing,
948952
Family: model.ConnectionFamily_v4,
953+
IntraHost: true, // loopback is always IntraHost
949954
},
950955
shouldSchedule: false,
951956
},
@@ -1025,13 +1030,110 @@ func Test_npCollectorImpl_shouldScheduleNetworkPathForConn(t *testing.T) {
10251030
shouldSchedule: false,
10261031
subnetSkipped: true,
10271032
},
1033+
// connection exclusion tests
1034+
{
1035+
name: "exclusion: block dest exactly",
1036+
conn: &model.Connection{
1037+
Laddr: &model.Addr{Ip: "10.0.0.1", Port: int32(30000)},
1038+
Raddr: &model.Addr{Ip: "10.0.0.2", Port: int32(80)},
1039+
Direction: model.ConnectionDirection_outgoing,
1040+
},
1041+
destExcludes: map[string][]string{
1042+
"10.0.0.2": {"80"},
1043+
},
1044+
shouldSchedule: false,
1045+
connectionExcluded: true,
1046+
},
1047+
{
1048+
name: "exclusion: block dest but different port",
1049+
conn: &model.Connection{
1050+
Laddr: &model.Addr{Ip: "10.0.0.1", Port: int32(30000)},
1051+
Raddr: &model.Addr{Ip: "10.0.0.2", Port: int32(80)},
1052+
Direction: model.ConnectionDirection_outgoing,
1053+
},
1054+
destExcludes: map[string][]string{
1055+
"10.0.0.2": {"42"},
1056+
},
1057+
shouldSchedule: true,
1058+
connectionExcluded: false,
1059+
},
1060+
{
1061+
name: "exclusion: block source with port range",
1062+
conn: &model.Connection{
1063+
Laddr: &model.Addr{Ip: "10.0.0.1", Port: int32(30000)},
1064+
Raddr: &model.Addr{Ip: "10.0.0.2", Port: int32(80)},
1065+
Direction: model.ConnectionDirection_outgoing,
1066+
},
1067+
sourceExcludes: map[string][]string{
1068+
"10.0.0.1": {"30000-30005"},
1069+
},
1070+
shouldSchedule: false,
1071+
connectionExcluded: true,
1072+
},
1073+
{
1074+
name: "exclusion: block dest subnet",
1075+
conn: &model.Connection{
1076+
Laddr: &model.Addr{Ip: "10.0.0.1", Port: int32(30000)},
1077+
Raddr: &model.Addr{Ip: "10.0.0.2", Port: int32(80)},
1078+
Direction: model.ConnectionDirection_outgoing,
1079+
},
1080+
destExcludes: map[string][]string{
1081+
"10.0.0.0/8": {"*"},
1082+
},
1083+
shouldSchedule: false,
1084+
connectionExcluded: true,
1085+
},
1086+
{
1087+
name: "exclusion: block dest subnet, no match",
1088+
conn: &model.Connection{
1089+
Laddr: &model.Addr{Ip: "10.0.0.1", Port: int32(30000)},
1090+
Raddr: &model.Addr{Ip: "192.168.1.1", Port: int32(80)},
1091+
Direction: model.ConnectionDirection_outgoing,
1092+
},
1093+
destExcludes: map[string][]string{
1094+
"10.0.0.0/8": {"*"},
1095+
},
1096+
shouldSchedule: true,
1097+
connectionExcluded: false,
1098+
},
1099+
{
1100+
name: "exclusion: only UDP, matching case",
1101+
conn: &model.Connection{
1102+
Type: model.ConnectionType_udp,
1103+
Laddr: &model.Addr{Ip: "10.0.0.1", Port: int32(30000)},
1104+
Raddr: &model.Addr{Ip: "10.0.0.2", Port: int32(123)},
1105+
Direction: model.ConnectionDirection_outgoing,
1106+
},
1107+
sourceExcludes: map[string][]string{
1108+
"10.0.0.0/8": {"udp *"},
1109+
},
1110+
shouldSchedule: false,
1111+
connectionExcluded: true,
1112+
},
1113+
{
1114+
name: "exclusion: only UDP, non-matching case",
1115+
conn: &model.Connection{
1116+
// (tcp is 0 so this doesn't actually do anything)
1117+
Type: model.ConnectionType_tcp,
1118+
Laddr: &model.Addr{Ip: "10.0.0.1", Port: int32(30000)},
1119+
Raddr: &model.Addr{Ip: "10.0.0.2", Port: int32(123)},
1120+
Direction: model.ConnectionDirection_outgoing,
1121+
},
1122+
sourceExcludes: map[string][]string{
1123+
"10.0.0.0/8": {"udp *"},
1124+
},
1125+
shouldSchedule: true,
1126+
connectionExcluded: false,
1127+
},
10281128
}
10291129

10301130
for _, tt := range tests {
10311131
t.Run(tt.name, func(t *testing.T) {
10321132
agentConfigs := map[string]any{
10331133
"network_path.connections_monitoring.enabled": true,
10341134
"network_path.collector.disable_intra_vpc_collection": true,
1135+
"network_path.collector.source_excludes": tt.sourceExcludes,
1136+
"network_path.collector.dest_excludes": tt.destExcludes,
10351137
}
10361138
stats := &teststatsd.Client{}
10371139
_, npCollector := newTestNpCollector(t, agentConfigs, stats)
@@ -1043,6 +1145,11 @@ func Test_npCollectorImpl_shouldScheduleNetworkPathForConn(t *testing.T) {
10431145
} else {
10441146
require.NotContains(t, stats.CountCalls, subnetSkippedStat)
10451147
}
1148+
if tt.connectionExcluded {
1149+
require.Contains(t, stats.CountCalls, cidrExcludedStat)
1150+
} else {
1151+
require.NotContains(t, stats.CountCalls, cidrExcludedStat)
1152+
}
10461153
})
10471154
}
10481155
}

pkg/config/setup/config.go

+2
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,8 @@ func InitConfig(config pkgconfigmodel.Setup) {
492492
config.BindEnvAndSetDefault("network_path.collector.reverse_dns_enrichment.enabled", true)
493493
config.BindEnvAndSetDefault("network_path.collector.reverse_dns_enrichment.timeout", 5000)
494494
config.BindEnvAndSetDefault("network_path.collector.disable_intra_vpc_collection", false)
495+
config.BindEnvAndSetDefault("network_path.collector.source_excludes", map[string][]string{})
496+
config.BindEnvAndSetDefault("network_path.collector.dest_excludes", map[string][]string{})
495497
bindEnvAndSetLogsConfigKeys(config, "network_path.forwarder.")
496498

497499
// HA Agent

pkg/network/encoding/marshal/format.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func FormatConnection(builder *model.ConnectionBuilder, conn network.ConnectionS
7676
w.SetContainerId(containerID)
7777
})
7878
builder.SetFamily(uint64(formatFamily(conn.Family)))
79-
builder.SetType(uint64(formatType(conn.Type)))
79+
builder.SetType(uint64(FormatType(conn.Type)))
8080
builder.SetIsLocalPortEphemeral(uint64(formatEphemeralType(conn.SPortIsEphemeral)))
8181
builder.SetLastBytesSent(conn.Last.SentBytes)
8282
builder.SetLastBytesReceived(conn.Last.RecvBytes)
@@ -191,7 +191,8 @@ func formatFamily(f network.ConnectionFamily) model.ConnectionFamily {
191191
}
192192
}
193193

194-
func formatType(f network.ConnectionType) model.ConnectionType {
194+
// FormatType converts a network.ConnectionType to a protobuf model.ConnectionType
195+
func FormatType(f network.ConnectionType) model.ConnectionType {
195196
switch f {
196197
case network.TCP:
197198
return model.ConnectionType_tcp

0 commit comments

Comments
 (0)