Skip to content

Commit

Permalink
fix: adjust mutex handling around ddwaf_context (#52)
Browse files Browse the repository at this point in the history
- Removes a leftover mutex on `Handle` that no longer served any purpose
- Acquiring the mutex on `Context` prior to calling
`ddwaf_context_destroy` to avoid concurrent calls to `ddwaf_run`, as
these are not thread-safe.
  • Loading branch information
RomainMuller authored Dec 7, 2023
1 parent 10615b9 commit af37d14
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 61 deletions.
54 changes: 27 additions & 27 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,18 @@ import (
// when calling it multiple times to run its rules every time new addresses
// become available. Each request must have its own Context.
type Context struct {
// Instance of the WAF
handle *Handle
cContext wafContext
// cgoRefs is used to retain go references to WafObjects until the context is destroyed.
// As per libddwaf documentation, WAF Objects must be alive during all the context lifetime
cgoRefs cgoRefPool
// Mutex protecting the use of cContext which is not thread-safe and cgoRefs.
mutex sync.Mutex
handle *Handle // Instance of the WAF

cgoRefs cgoRefPool // Used to retain go data referenced by WAF Objects the context holds
cContext wafContext // The C ddwaf_context pointer

// Stats
// Cumulated internal WAF run time - in nanoseconds - for this context.
totalRuntimeNs atomic.Uint64
// Cumulated overall run time - in nanoseconds - for this context.
totalOverallRuntimeNs atomic.Uint64
// Cumulated timeout count for this context.
timeoutCount atomic.Uint64
totalRuntimeNs atomic.Uint64 // Cumulative internal WAF run time - in nanoseconds - for this context.
totalOverallRuntimeNs atomic.Uint64 // Cumulative overall run time - in nanoseconds - for this context.
timeoutCount atomic.Uint64 // Cumulative timeout count for this context.

// Mutex protecting the use of cContext which is not thread-safe and cgoRefs.
mutex sync.Mutex
}

// NewContext returns a new WAF context of to the given WAF handle.
Expand All @@ -41,13 +37,13 @@ type Context struct {
// or the WAF context couldn't be created.
func NewContext(handle *Handle) *Context {
// Handle has been released
if handle.addRefCounter(1) == 0 {
if !handle.retain() {
return nil
}

cContext := wafLib.wafContextInit(handle.cHandle)
if cContext == 0 {
handle.addRefCounter(-1)
handle.release() // We couldn't get a context, so we no longer have an implicit reference to the Handle in it...
return nil
}

Expand Down Expand Up @@ -120,12 +116,9 @@ func (context *Context) Run(addressData RunAddressData, timeout time.Duration) (
return
}

// run executes the ddwaf_run call with the provided data on this context. The caller is responsible for locking the
// context appropriately around this call.
func (context *Context) run(persistentData, ephemeralData *wafObject, timeout time.Duration, cgoRefs *cgoRefPool) (Result, error) {
// RLock the handle to safely get read access to the WAF handle and prevent concurrent changes of it
// such as a rules-data update.
context.handle.mutex.RLock()
defer context.handle.mutex.RUnlock()

result := new(wafResult)
defer wafLib.wafResultFree(result)

Expand Down Expand Up @@ -173,13 +166,20 @@ func unwrapWafResult(ret wafReturnCode, result *wafResult) (res Result, err erro
return res, err
}

// Close calls handle.closeContext which calls ddwaf_context_destroy and maybe also close the handle if it in termination state.
// Close the underlying `ddwaf_context` and releases the associated internal
// data. Also decreases the reference count of the `ddwaf_hadnle` which created
// this context, possibly releasing it completely (if this was the last context
// created from this handle & it was released by its creator).
func (context *Context) Close() {
defer context.handle.closeContext(context)
// Keep the Go pointer references until the end of the context
keepAlive(context.cgoRefs)
// The context is no longer used so we can try releasing the Go pointer references asap by nulling them
context.cgoRefs = cgoRefPool{}
context.mutex.Lock()
defer context.mutex.Unlock()

wafLib.wafContextDestroy(context.cContext)
keepAlive(context.cgoRefs) // Keep the Go pointer references until the end of the context
defer context.handle.release() // Reduce the reference counter of the Handle.

context.cgoRefs = cgoRefPool{} // The data in context.cgoRefs is no longer needed, explicitly release
context.cContext = 0 // Makes it easy to spot use-after-free/double-free issues
}

// TotalRuntime returns the cumulated WAF runtime across various run calls within the same WAF context.
Expand Down
2 changes: 1 addition & 1 deletion encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import (
// reference now or in the future, are stored and referenced in the `cgoRefs` field. The user MUST leverage
// `keepAlive()` with it according to its ddwaf use-case.
type encoder struct {
cgoRefs cgoRefPool
containerMaxSize int
stringMaxSize int
objectMaxDepth int
cgoRefs cgoRefPool
}

type native interface {
Expand Down
70 changes: 42 additions & 28 deletions handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,15 @@ package waf
import (
"errors"
"fmt"
"sync"

"github.com/DataDog/go-libddwaf/v2/internal/noopfree"
"go.uber.org/atomic"
)

// Handle represents an instance of the WAF for a given ruleset.
type Handle struct {
// Instance of the WAF
cHandle wafHandle
// diagnostics holds information about rules initialization
diagnostics Diagnostics

// Lock-less reference counter avoiding blocking calls to the Close() method
// while WAF contexts are still using the WAF handle. Instead, we let the
Expand All @@ -31,12 +30,8 @@ type Handle struct {
// block the request handlers for the time of the security rules update.
refCounter *atomic.Int32

// RWMutex protecting the R/W accesses to the internal rules data (stored
// in the handle).
mutex sync.RWMutex

// diagnostics holds information about rules initialization
diagnostics Diagnostics
// Instance of the WAF
cHandle wafHandle
}

// NewHandle creates and returns a new instance of the WAF with the given security rules and configuration
Expand Down Expand Up @@ -115,7 +110,6 @@ func (handle *Handle) Addresses() []string {
// Update the ruleset of a WAF instance into a new handle on its own
// the previous handle still needs to be closed manually
func (handle *Handle) Update(newRules any) (*Handle, error) {

encoder := newMaxEncoder()
obj, err := encoder.Encode(newRules)
if err != nil {
Expand All @@ -142,36 +136,56 @@ func (handle *Handle) Update(newRules any) (*Handle, error) {
}, nil
}

// closeContext calls ddwaf_context_destroy and eventually ddwaf_destroy on the handle
func (handle *Handle) closeContext(context *Context) {
wafLib.wafContextDestroy(context.cContext)
if handle.addRefCounter(-1) == 0 {
wafLib.wafDestroy(handle.cHandle)
}
}

// Close puts the handle in termination state, when all the contexts are closed the handle will be destroyed
func (handle *Handle) Close() {
if handle.addRefCounter(-1) > 0 {
// There are still Contexts that are not closed
if handle.addRefCounter(-1) != 0 {
// Either the counter is still positive (this Handle is still referenced), or it had previously
// reached 0 and some other call has done the cleanup already.
return
}

wafLib.wafDestroy(handle.cHandle)
handle.diagnostics = Diagnostics{} // Data in diagnostics may no longer be valid (e.g: strings from libddwaf)
handle.cHandle = 0 // Makes it easy to spot use-after-free/double-free issues
}

// retain increments the reference counter of this Handle. Returns true if the
// Handle is still valid, false if it is no longer usable. Calls to retain()
// must be balanced with calls to release() in order to avoid leaking Handles.
func (handle *Handle) retain() bool {
return handle.addRefCounter(1) > 0
}

// addRefCounter add x to Handle.refCounter.
// It relies on a CAS spin-loop implementation in order to avoid changing the
// counter when 0 has been reached.
// release decrements the reference counter of this Handle, possibly causing it
// to be completely closed if no other reference to it exist.
func (handle *Handle) release() {
handle.Close()
}

// addRefCounter adds x to Handle.refCounter. The return valid indicates whether the refCounter reached 0 as part of
// this call or not, which can be used to perform "only-once" activities:
// - result > 0 => the Handle is still usable
// - result == 0 => the handle is no longer usable, ref counter reached 0 as part of this call
// - result == -1 => the handle is no longer usable, ref counter was already 0 previously
func (handle *Handle) addRefCounter(x int32) int32 {
// We use a CAS loop to avoid setting the refCounter to a negative value.
for {
current := handle.refCounter.Load()
if current == 0 {
// The object was released
return 0
if current <= 0 {
// The object had already been released
return -1
}
if swapped := handle.refCounter.CompareAndSwap(current, current+x); swapped {
return current + x

next := current + x
if swapped := handle.refCounter.CompareAndSwap(current, next); swapped {
if next < 0 {
// TODO(romain.marcadier): somehow signal unexpected behavior to the
// caller (panic? error?). We currently clamp to 0 in order to avoid
// causing a customer program crash, but this is the symptom of a bug
// and should be investigated (however this clamping hides the issue).
return 0
}
return next
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions safe.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ import (
// error is unreliable and the caller must rather stop using the library.
// Examples include safety checks errors.
type PanicError struct {
// The function symbol name that was given to `tryCall()`.
in string
// The recovered panic error while executing the function `in`.
Err error
// The function symbol name that was given to `tryCall()`.
in string
}

func newPanicError(in func() error, err error) *PanicError {
Expand Down
6 changes: 3 additions & 3 deletions waf.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ import (

// Diagnostics stores the information - provided by the WAF - about WAF rules initialization.
type Diagnostics struct {
Version string
Rules *DiagnosticEntry
CustomRules *DiagnosticEntry
Exclusions *DiagnosticEntry
RulesOverrides *DiagnosticEntry
RulesData *DiagnosticEntry
Processors *DiagnosticEntry
Scanners *DiagnosticEntry
Version string
}

// TopLevelErrors returns the list of top-level errors reported by the WAF on any of the Diagnostics
Expand Down Expand Up @@ -56,10 +56,10 @@ func (d *Diagnostics) TopLevelError() error {
// for a specific entry in the WAF ruleset
type DiagnosticEntry struct {
Addresses *DiagnosticAddresses
Errors map[string][]string // Item-level errors (map of error message to entity identifiers or index:#)
Error string // If the entire entry was in error (e.g: invalid format)
Loaded []string // Successfully loaded entity identifiers (or index:#)
Failed []string // Failed entity identifiers (or index:#)
Error string // If the entire entry was in error (e.g: invalid format)
Errors map[string][]string // Item-level errors (map of error message to entity identifiers or index:#)
}

// DiagnosticAddresses stores the information - provided by the WAF - about the known addresses and
Expand Down
61 changes: 61 additions & 0 deletions waf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,67 @@ func TestConcurrency(t *testing.T) {
// The test mustn't crash and ref-counter must be 0
require.Zero(t, waf.refCounter.Load())
})

t.Run("concurrent-context-use-destroy", func(t *testing.T) {
// This test validates that the WAF Context can be used from multiple
// threads, with mixed calls to `ddwaf_run` and `ddwaf_context_destroy`,
// which are not thread-safe.

waf, err := newDefaultHandle(testArachniRule)
require.NoError(t, err)

wafCtx := NewContext(waf)
require.NotNil(t, wafCtx)

var startBarrier, stopBarrier sync.WaitGroup
startBarrier.Add(1)
stopBarrier.Add(nbUsers + 1)

data := map[string]any{
"server.request.headers.no_cookies": map[string][]string{
"user-agent": {"Arachni/test"},
},
}

for n := 0; n < nbUsers; n++ {
n := n
go func() {
startBarrier.Wait()
defer stopBarrier.Done()

// A microsecond sleep gives us some scheduler-backed order of execution randomization
time.Sleep(time.Microsecond)

// Half of these goroutines will try to use wafCtx.Run(...), while the other half will try
// to use wafCtx.Close(). The expected outcome is that exactly one call to wafCtx.Close()
// effectively releases the WAF context, and between 0 and N calls to wafCtx.Run(...) are
// done (those that land after `wafCtx.Close()` happened will be silent no-ops).
if n%2 == 0 {
wafCtx.Run(RunAddressData{Ephemeral: data}, time.Second)
} else {
wafCtx.Close()
}
}()
}

go func() {
startBarrier.Wait()
defer stopBarrier.Done()

// A microsecond sleep gives us some scheduler-backed order of execution randomization
time.Sleep(time.Microsecond)

// We also asynchronously release the WAF handle, which is fine to do as the WAF context is
// still in use, as the WAF handle has a reference counter guarding it's destruction.
waf.Close()
}()

startBarrier.Done()
stopBarrier.Wait()

// Verify the WAF Handle was properly released.
require.Zero(t, waf.refCounter.Load())
})
}

func TestRunError(t *testing.T) {
Expand Down

0 comments on commit af37d14

Please sign in to comment.