From 24b6f2e4a97496cf7d228b01deb20f4f3dfd7272 Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Wed, 21 Aug 2024 20:52:48 -0700 Subject: [PATCH] client: unexport the watcher type and update NewUpdater The watcher semantics are still useful for plumbing, but it's hard to use correctly, so unexport it. Now that watcher is unexported, move its tests from a separate package into an internal package test. It is now no longer necessary to have the "watcher" method, so remove it. An Updater now no longer accepts a watcher (which is unexported), but instead takes a *Store and the desired secret name directly. Rework the constructor to allow reporting an error. Add a StaticUpdater constructor to make an Updater that vends a static value, analogous to StaticSecret. --- client/setec/fields.go | 4 +- client/setec/fields_test.go | 24 ++--- client/setec/internal_test.go | 78 +++++++++++++++ client/setec/store.go | 177 ++++++++++++++-------------------- client/setec/store_test.go | 158 +++++++++++------------------- client/setec/watcher.go | 63 ++++++++++++ 6 files changed, 282 insertions(+), 222 deletions(-) create mode 100644 client/setec/internal_test.go create mode 100644 client/setec/watcher.go diff --git a/client/setec/fields.go b/client/setec/fields.go index e4edf8c..37d5632 100644 --- a/client/setec/fields.go +++ b/client/setec/fields.go @@ -166,7 +166,7 @@ func (f fieldInfo) apply(ctx context.Context, s *Store, fullName string) error { } if f.vtype == watcherType { - w, err := s.LookupWatcher(ctx, fullName) + w, err := s.lookupWatcher(ctx, fullName) if err != nil { return err } @@ -198,7 +198,7 @@ var ( bytesType = reflect.TypeOf([]byte(nil)) secretType = reflect.TypeOf(Secret(nil)) stringType = reflect.TypeOf(string("")) - watcherType = reflect.TypeOf(Watcher{}) + watcherType = reflect.TypeOf(watcher{}) binaryType = reflect.TypeOf((*encoding.BinaryUnmarshaler)(nil)).Elem() ) diff --git a/client/setec/fields_test.go b/client/setec/fields_test.go index 0a2b50c..7c931cd 100644 --- a/client/setec/fields_test.go +++ b/client/setec/fields_test.go @@ -66,15 +66,14 @@ func TestFields(t *testing.T) { // Verify that if we parse secrets with a store enabled, we correctly plumb // the values from the service into the tagged fields. type testTarget struct { - A string `setec:"apple"` - B binValue `setec:"bin-value"` - BP *binValue `setec:"bin-value-ptr"` - P []byte `setec:"pear"` - L setec.Secret `setec:"plum"` - C setec.Watcher `setec:"cherry"` - X string // untagged, not affected - J testObj `setec:"object-value,json"` - Z int `setec:"int-value,json"` + A string `setec:"apple"` + B binValue `setec:"bin-value"` + BP *binValue `setec:"bin-value-ptr"` + P []byte `setec:"pear"` + L setec.Secret `setec:"plum"` + X string // untagged, not affected + J testObj `setec:"object-value,json"` + Z int `setec:"int-value,json"` } var obj testTarget @@ -98,7 +97,7 @@ func TestFields(t *testing.T) { // Check that secret names respect prefixing. if diff := cmp.Diff(f.Secrets(), []string{ "test/apple", "test/bin-value", "test/bin-value-ptr", - "test/pear", "test/plum", "test/cherry", + "test/pear", "test/plum", "test/object-value", "test/int-value", }); diff != "" { t.Errorf("Prefixed secret names (-got, +want):\n%s", diff) @@ -110,7 +109,7 @@ func TestFields(t *testing.T) { } // Don't try to compare complex plumbing; see below. - opt := cmpopts.IgnoreFields(testTarget{}, "L", "C") + opt := cmpopts.IgnoreFields(testTarget{}, "L") if diff := cmp.Diff(obj, testTarget{ A: secrets["apple"], B: binValue{"kumquat", "quince"}, @@ -126,9 +125,6 @@ func TestFields(t *testing.T) { if got, want := string(obj.L.Get()), secrets["plum"]; got != want { t.Errorf("Secret field: got %q, want %q", got, want) } - if got, want := string(obj.C.Get()), secrets["cherry"]; got != want { - t.Errorf("Secret field: got %q, want %q", got, want) - } } func TestParseErrors(t *testing.T) { diff --git a/client/setec/internal_test.go b/client/setec/internal_test.go new file mode 100644 index 0000000..130ccd5 --- /dev/null +++ b/client/setec/internal_test.go @@ -0,0 +1,78 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package setec + +import ( + "context" + "net/http/httptest" + "testing" + "time" + + "github.com/tailscale/setec/setectest" +) + +func TestWatcher(t *testing.T) { + d := setectest.NewDB(t, nil) + d.MustPut(d.Superuser, "green", "eggs and ham") // active + v2 := d.MustPut(d.Superuser, "green", "grow the rushes oh") + + ts := setectest.NewServer(t, d, nil) + hs := httptest.NewServer(ts.Mux) + defer hs.Close() + + ctx := context.Background() + cli := Client{Server: hs.URL, DoHTTP: hs.Client().Do} + + pollTicker := setectest.NewFakeTicker() + st, err := NewStore(ctx, StoreConfig{ + Client: cli, + Secrets: []string{"green"}, + PollTicker: pollTicker, + }) + if err != nil { + t.Fatalf("NewStore: unexpected error: %v", err) + } + defer st.Close() + + // With lookups disabled, an unknown watcher reports an error. + if w, err := st.lookupWatcher(ctx, "nonesuch"); err == nil { + t.Errorf("Lookup: got %v, want error", w) + } + + // Observe the initial value of the secret. + w, err := st.lookupWatcher(ctx, "green") + if err != nil { + t.Errorf("Initial value: unexpected error: %v", err) + } else if got, want := string(w.Get()), "eggs and ham"; got != want { + t.Errorf("Initial value: got %q, want %q", got, want) + } + + // The secret gets updated... + if err := cli.Activate(ctx, "green", v2); err != nil { + t.Fatalf("Activate to %v: unexpected error: %v", v2, err) + } + + // The next poll occurs... + pollTicker.Poll() + + // The watcher should get notified in a timely manner. + select { + case <-w.Ready(): + t.Logf("✓ A new version of the secret is available") + case <-time.After(5 * time.Second): + t.Fatal("Timed out waiting for a watcher update") + } + + if got, want := string(w.Get()), "grow the rushes oh"; got != want { + t.Errorf("Updated value: got %q, want %q", got, want) + } + + // With no updates, the watchers should not appear ready. + select { + case <-w.Ready(): + t.Error("Watcher is unexpectedly ready after no update") + case <-time.After(100 * time.Millisecond): + // OK + } +} diff --git a/client/setec/store.go b/client/setec/store.go index f5a90bb..c7e5bee 100644 --- a/client/setec/store.go +++ b/client/setec/store.go @@ -10,6 +10,7 @@ import ( "errors" "expvar" "fmt" + "io" "log" "math/rand" "os" @@ -43,7 +44,7 @@ type Store struct { m map[string]*cachedSecret // :: secret name → active value f map[string]Secret // :: secret name → fetch function - w map[string][]Watcher // :: secret name → watchers + w map[string][]watcher // :: secret name → watchers } ctx context.Context // governs the polling task and lookups @@ -92,8 +93,7 @@ type StoreConfig struct { // AllowLookup instructs the store to allow the caller to look up secrets // not known to the store at the time of construction. If false, only // secrets pre-declared in the Secrets and Structs slices can be fetched, - // and the Lookup and LookupWatcher methods will report an error for all - // un-listed secrets. + // and the Lookup method will report an error for all un-listed secrets. AllowLookup bool // Cache, if non-nil, is a cache that persists secrets locally. @@ -198,7 +198,7 @@ func NewStore(ctx context.Context, cfg StoreConfig) (*Store, error) { // Initialize the active versions maps. s.active.m = make(map[string]*cachedSecret) s.active.f = make(map[string]Secret) - s.active.w = make(map[string][]Watcher) + s.active.w = make(map[string][]watcher) // If we have a cache, try to load data from there first. data, err := s.loadCache() @@ -335,7 +335,7 @@ func (s *Store) secretLocked(name string) Secret { // Since the caller is actively requesting the value of the secret, // update the last-accessed timestamp. This also applies to accesses - // via a Watcher, since the watchers wrap the underlying Secret. + // via a watcher, since the watchers wrap the underlying Secret. s.countSecretFetch.Add(1) cs := s.active.m[name] cs.LastAccess = s.timeNow().Unix() @@ -414,57 +414,6 @@ func (s *Store) lookupSecretInternal(ctx context.Context, name string) (Secret, } } -// Watcher returns a watcher for the named secret. -// -// If s has lookups enabled, Watcher returns a zero Watcher for an unknown name. -// Otherwise, Watcher panics for an unknown name. -func (s *Store) Watcher(name string) Watcher { - s.active.Lock() - defer s.active.Unlock() - secret := s.secretLocked(name) - if secret == nil { - if s.allowLookup { - return Watcher{} - } - panic(fmt.Sprintf("secret %q not found in StoreConfig with lookup disabled", name)) - } - w := Watcher{ready: make(chan struct{}, 1), secret: secret} - s.active.w[name] = append(s.active.w[name], w) - return w -} - -// LookupWatcher returns a watcher for the named secret. If name is already -// known by s, this is equivalent to Watcher; otherwise s attempts to fetch the -// latest active version of the secret from the service and either adds it to -// the collection or reports an error. -// LookupWatcher does not automatically retry in case of errors. -func (s *Store) LookupWatcher(ctx context.Context, name string) (Watcher, error) { - s.active.Lock() - defer s.active.Unlock() - var secret Secret - if _, ok := s.active.m[name]; ok { - secret = s.secretLocked(name) // OK, we already have it - } else if !s.allowLookup { - return Watcher{}, errors.New("lookup is not enabled") - } else { - // We must release the lock to fetch from the server; do this in a - // closure to ensure lock discipline is restored in case of a panic. - got, err := func() (Secret, error) { - s.active.Unlock() // NOTE: This order is intended. - defer s.active.Lock() - return s.lookupSecretInternal(ctx, name) - }() - if err != nil { - return Watcher{}, err - } - secret = got - } - - w := Watcher{ready: make(chan struct{}, 1), secret: secret} - s.active.w[name] = append(s.active.w[name], w) - return w, nil -} - // A Secret is a function that fetches the current active value of a secret. // The caller should not cache the value returned; the function does not block // and will always report a valid (if possibly stale) result. @@ -513,6 +462,17 @@ func StaticFile(path string) (Secret, error) { return func() []byte { return bs }, nil } +func panicOnUpdate[T any]([]byte) (T, error) { panic("unexpected value update") } + +// StaticUpdater returns an [Updater] that vends the specified fixed value. +// The value reported by the updater never changes. +func StaticUpdater[T any](fixedValue T) *Updater[T] { + return &Updater[T]{ + newValue: panicOnUpdate[T], + value: fixedValue, + } +} + // StaticTextFile returns a secret that vends the contents of path, which are // treated as text with leading and trailing whitespace trimmed. // @@ -767,78 +727,87 @@ type stdTicker struct{ *time.Ticker } func (s stdTicker) Chan() <-chan time.Time { return s.Ticker.C } func (stdTicker) Done() {} -// A Watcher monitors the current active value of a secret, and allows the user -// to be notified when the value of the secret changes. -type Watcher struct { - ready chan struct{} - secret Secret -} - -// Get returns the current active value of the secret. -// A zero-valued Watcher returns nil. -func (w Watcher) Get() []byte { return w.secret.Get() } - -// Ready returns a channel that delivers a value when the current active -// version of the secret has changed. The channel is never closed. +// NewUpdater creates a new Updater that maintains a value based on the +// specified secret in s. The newValue function constructs a value of type T +// from the bytes of a secret. // -// The ready channel is a level trigger. The Watcher does not queue multiple -// notifications, and if the caller does not drain the channel subsequent -// notifications will be dropped. -func (w Watcher) Ready() <-chan struct{} { return w.ready } - -func (w Watcher) notify() { - select { - case w.ready <- struct{}{}: - default: - } -} - -// IsValid reports whether w is valid, meaning that it has a secret available. -func (w Watcher) IsValid() bool { return w.secret != nil } - -// NewUpdater creates a new Updater that tracks updates to a value based on new -// secret versions delivered to w. The newValue function returns a new value -// of the type based on its argument, a secret value. +// The initial value is constructed using newValue on the current secret +// version when NewUpdater is called. If this initial call reports an error, +// NewUpdater returns nil and that error. Otherwise, the Updater begins with +// that value. // -// The initial value is constructed by calling newValue with the current secret -// version in w at the time NewUpdater is called. Calls to the Get method -// update the value as needed when w changes. +// Once constructed, call the Get method to fetch the current value. It is safe +// to call Get concurrently from multiple goroutines. See [Updater.Get] for +// details of how updates are handled. // -// The updater synchronizes calls to Get and newValue, so the callback can -// safely interact with shared state without additional locking. -func NewUpdater[T any](w Watcher, newValue func([]byte) T) *Updater[T] { +// If s has lookups enabled, NewWatcher will attempt to look up name if it is +// not already declared in s. If lookups are not enabled, or of the secret is +// not found, NewUpdater reports an error. It does not retry in case of lookup +// errors. +func NewUpdater[T any](ctx context.Context, s *Store, name string, newValue func([]byte) (T, error)) (*Updater[T], error) { + w, err := s.lookupWatcher(ctx, name) + if err != nil { + return nil, err + } + init, err := newValue(w.Get()) + if err != nil { + return nil, err + } return &Updater[T]{ newValue: newValue, w: w, - value: newValue(w.Get()), - } + value: init, + logf: s.logf, // same place as the underlying store + }, nil } -// An Updater tracks a value whose state depends on a secret, together with a -// watcher for updates to the secret. The caller provides a function to update -// the value when a new version of the secret is delivered, and the Updater -// manages access and updates to the value. +// An Updater tracks a value whose state depends on a secret. It watches for +// updates to the secret, and invokes a caller-provided function to update the +// value when a new version of the secret is delivered. type Updater[T any] struct { - newValue func([]byte) T - w Watcher + newValue func([]byte) (T, error) + w watcher mu sync.Mutex - value T + value T // the current value + err error // if non-nil, the error from the last update attempt + logf logger.Logf } // Get fetches the current value of u, first updating it if the secret has -// changed. It is safe to call Get concurrently from multiple goroutines. +// changed. It is safe to call Get concurrently from multiple goroutines. +// +// If Get receives an error while trying to update u, it returns the previous +// value. Use the Err method to check for an update error. If T implements the +// [io.Closer] interface, Get calls Close on the old value before updating. func (u *Updater[T]) Get() T { u.mu.Lock() defer u.mu.Unlock() select { case <-u.w.Ready(): - u.value = u.newValue(u.w.Get()) + nv, err := u.newValue(u.w.Get()) + if err != nil { + u.logf("WARNING: Error updating value: %v (keeping old value)", err) + } else { + if c, ok := any(u.value).(io.Closer); ok { + c.Close() + } + u.value = nv + } + u.err = err + return u.value default: // no change, use the existing value } return u.value } +// Err reports the error, if any, from the latest update to the value of u. +func (u *Updater[T]) Err() error { + u.mu.Lock() + defer u.mu.Unlock() + return u.err +} + type cachedSecret struct { Secret *api.SecretValue `json:"secret"` LastAccess int64 `json:"lastAccess,string"` diff --git a/client/setec/store_test.go b/client/setec/store_test.go index 198d4bc..bc76a14 100644 --- a/client/setec/store_test.go +++ b/client/setec/store_test.go @@ -7,7 +7,6 @@ import ( "bytes" "context" "errors" - "fmt" "net/http/httptest" "os" "path/filepath" @@ -115,9 +114,6 @@ func TestStore(t *testing.T) { if s, err := st.LookupSecret(ctx, "bravo"); err == nil { t.Errorf("Lookup(bravo): got %q, want error", s.Get()) } - if w, err := st.LookupWatcher(ctx, "bravo"); err == nil { - t.Errorf("Lookup(bravo): got %q, want error", w.Get()) - } }) } @@ -291,71 +287,6 @@ func TestSlowInit(t *testing.T) { checkSecretValue(t, st, "boo", "go for the eyes") } -func TestWatcher(t *testing.T) { - d := setectest.NewDB(t, nil) - d.MustPut(d.Superuser, "green", "eggs and ham") // active - v2 := d.MustPut(d.Superuser, "green", "grow the rushes oh") - - ts := setectest.NewServer(t, d, nil) - hs := httptest.NewServer(ts.Mux) - defer hs.Close() - - ctx := context.Background() - cli := setec.Client{Server: hs.URL, DoHTTP: hs.Client().Do} - - pollTicker := setectest.NewFakeTicker() - st, err := setec.NewStore(ctx, setec.StoreConfig{ - Client: cli, - Secrets: []string{"green"}, - PollTicker: pollTicker, - }) - if err != nil { - t.Fatalf("NewStore: unexpected error: %v", err) - } - defer st.Close() - - // With lookups disabled, an unknown watcher panics. - mtest.MustPanicf(t, func() { st.Watcher("nonesuch") }, - "Expected panic for an unknown watcher") - - // Observe the initial value of the secret. - w := st.Watcher("green") - if got, want := string(w.Get()), "eggs and ham"; got != want { - t.Errorf("Initial value: got %q, want %q", got, want) - } - if !w.IsValid() { - t.Error("Watcher should be valid, but is not") - } - - // The secret gets updated... - if err := cli.Activate(ctx, "green", v2); err != nil { - t.Fatalf("Activate to %v: unexpected error: %v", v2, err) - } - - // The next poll occurs... - pollTicker.Poll() - - // The watcher should get notified in a timely manner. - select { - case <-w.Ready(): - t.Logf("✓ A new version of the secret is available") - case <-time.After(5 * time.Second): - t.Fatal("Timed out waiting for a watcher update") - } - - if got, want := string(w.Get()), "grow the rushes oh"; got != want { - t.Errorf("Updated value: got %q, want %q", got, want) - } - - // With no updates, the watchers should not appear ready. - select { - case <-w.Ready(): - t.Error("Watcher is unexpectedly ready after no update") - case <-time.After(100 * time.Millisecond): - // OK - } -} - func TestUpdater(t *testing.T) { d := setectest.NewDB(t, nil) d.MustPut(d.Superuser, "label", "malarkey") // active @@ -380,30 +311,57 @@ func TestUpdater(t *testing.T) { defer st.Close() // Set up an updater that tracks a string against the secret named "label". - u := setec.NewUpdater(st.Watcher("label"), func(secret []byte) string { - return fmt.Sprintf("value: %q", secret) - }) - checkValue := func(label, want string) { - if got := u.Get(); got != want { - t.Errorf("%s: got %q, want %q", label, got, want) + t.Run("Dynamic", func(t *testing.T) { + u, err := setec.NewUpdater(ctx, st, "label", func(secret []byte) (*closeable[string], error) { + return &closeable[string]{Value: string(secret)}, nil + }) + if err != nil { + t.Fatalf("NewUpdater: unexpected error: %v", err) + } + checkValue := func(label, want string) { + if got := u.Get().Value; got != want { + t.Errorf("%s: got %q, want %q", label, got, want) + } } - } - checkValue("Initial value", `value: "malarkey"`) + checkValue("Initial value", `malarkey`) + last := u.Get() + if last.Closed { + t.Error("Initial value is closed early") + } - // The secret gets updated... - if err := cli.Activate(ctx, "label", v2); err != nil { - t.Fatalf("Activate to %v: unexpected error: %v", v2, err) - } - pollTicker.Poll() + // The secret gets updated... + if err := cli.Activate(ctx, "label", v2); err != nil { + t.Fatalf("Activate to %v: unexpected error: %v", v2, err) + } + pollTicker.Poll() - // The next get should see the updated value. - checkValue("Updated value", `value: "dog-faced pony soldier"`) + // The next get should see the updated value. + checkValue("Updated value", `dog-faced pony soldier`) - pollTicker.Poll() + // The previous value should have been closed. + if !last.Closed { + t.Errorf("Initial value was not closed: %v", last) + } + + last = u.Get() + pollTicker.Poll() + + // The next get should not see a change. + checkValue("Updated value", `dog-faced pony soldier`) + if last.Closed { + t.Errorf("Update value was closed: %v", last) + } + }) + + t.Run("Static", func(t *testing.T) { + const testValue = "I am the chosen one" + u := setec.StaticUpdater(testValue) - // The next get should not see a change. - checkValue("Updated value", `value: "dog-faced pony soldier"`) + if got := u.Get(); got != testValue { + t.Errorf("Get: got %q, want %q", got, testValue) + } + }) } func TestLookup(t *testing.T) { @@ -453,14 +411,7 @@ func TestLookup(t *testing.T) { t.Errorf("Lookup(green): got %q, want %q", got, want) } - // Case 4: We can look up a watcher for "blue". - if w, err := st.LookupWatcher(ctx, "blue"); err != nil { - t.Errorf("Lookup(blue): unexpected error: %v", err) - } else if got, want := string(w.Get()), "dolphins"; got != want { - t.Errorf("Lookup(blue): got %q, want %q", got, want) - } - - // Case 5: We still can't lookup a non-existent secret. + // Case 4: We still can't lookup a non-existent secret. if s, err := st.LookupSecret(ctx, "orange"); err == nil { t.Errorf("Lookup(orange): got %q, want error", s.Get()) } else { @@ -471,9 +422,6 @@ func TestLookup(t *testing.T) { if f := st.Secret("nonesuch"); f != nil { t.Errorf("Lookup(nonesuch): got %v, want nil", f) } - if w := st.Watcher("nonesuch"); w.IsValid() { - t.Errorf("Watcher(nonesuch): got %v, want invalid", w) - } } func TestCacheExpiry(t *testing.T) { @@ -625,7 +573,6 @@ func TestNewFileCache(t *testing.T) { func TestNilSecret(t *testing.T) { var s setec.Secret - var w setec.Watcher if got := s.Get(); got != nil { t.Errorf("(nil).Get: got %v, want nil", got) @@ -633,12 +580,19 @@ func TestNilSecret(t *testing.T) { if got := s.GetString(); got != "" { t.Errorf(`(nil).GetString: got %q, want ""`, got) } - if got := w.Get(); got != nil { - t.Errorf("(zero).Get: got %v, want nil", got) - } } type badCache struct{} func (badCache) Write([]byte) error { return errors.New("write failed") } func (badCache) Read() ([]byte, error) { return nil, errors.New("read failed") } + +type closeable[T any] struct { + Value T + Closed bool +} + +func (c *closeable[T]) Close() error { + c.Closed = true + return nil +} diff --git a/client/setec/watcher.go b/client/setec/watcher.go new file mode 100644 index 0000000..0b2600f --- /dev/null +++ b/client/setec/watcher.go @@ -0,0 +1,63 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package setec + +import ( + "context" + "errors" +) + +// A watcher monitors the current active value of a secret, and allows the user +// to be notified when the value of the secret changes. +type watcher struct { + ready chan struct{} + Secret +} + +// Ready returns a channel that delivers a value when the current active +// version of the secret has changed. The channel is never closed. +// +// The ready channel is a level trigger. The watcher does not queue multiple +// notifications, and if the caller does not drain the channel subsequent +// notifications will be dropped. +func (w watcher) Ready() <-chan struct{} { return w.ready } + +func (w watcher) notify() { + select { + case w.ready <- struct{}{}: + default: + } +} + +// lookupWatcher returns a watcher for the named secret. If name is already +// known by s, this is equivalent to watcher; otherwise s attempts to fetch the +// latest active version of the secret from the service and either adds it to +// the collection or reports an error. +// lookupWatcher does not automatically retry in case of errors. +func (s *Store) lookupWatcher(ctx context.Context, name string) (watcher, error) { + s.active.Lock() + defer s.active.Unlock() + var secret Secret + if _, ok := s.active.m[name]; ok { + secret = s.secretLocked(name) // OK, we already have it + } else if !s.allowLookup { + return watcher{}, errors.New("lookup is not enabled") + } else { + // We must release the lock to fetch from the server; do this in a + // closure to ensure lock discipline is restored in case of a panic. + got, err := func() (Secret, error) { + s.active.Unlock() // NOTE: This order is intended. + defer s.active.Lock() + return s.lookupSecretInternal(ctx, name) + }() + if err != nil { + return watcher{}, err + } + secret = got + } + + w := watcher{ready: make(chan struct{}, 1), Secret: secret} + s.active.w[name] = append(s.active.w[name], w) + return w, nil +}