Skip to content

Commit

Permalink
codec: fix Avro schema cache data race (#1407) (#1411)
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <[email protected]>
  • Loading branch information
ti-srebot authored Feb 5, 2021
1 parent a89d6ca commit 6eb2159
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 4 deletions.
21 changes: 19 additions & 2 deletions cdc/sink/codec/schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"net/url"
"regexp"
"strings"
"sync"
"time"

"github.com/cenkalti/backoff"
Expand All @@ -40,10 +41,12 @@ import (
// in cache the local cache entry is missing.
type AvroSchemaManager struct {
registryURL string
cache map[string]*schemaCacheEntry
subjectSuffix string

credential *security.Credential

cacheRWLock sync.RWMutex
cache map[string]*schemaCacheEntry
}

type schemaCacheEntry struct {
Expand Down Expand Up @@ -177,15 +180,19 @@ func (m *AvroSchemaManager) Register(ctx context.Context, tableName model.TableN
// TiSchemaId is only used to trigger fetching from the Registry server.
// Calling this method with a tiSchemaID other than that used last time will invariably trigger a RESTful request to the Registry.
// Returns (codec, registry schema ID, error)
// NOT USED for now, reserved for future use.
func (m *AvroSchemaManager) Lookup(ctx context.Context, tableName model.TableName, tiSchemaID uint64) (*goavro.Codec, int, error) {
key := m.tableNameToSchemaSubject(tableName)
m.cacheRWLock.RLock()
if entry, exists := m.cache[key]; exists && entry.tiSchemaID == tiSchemaID {
log.Info("Avro schema lookup cache hit",
zap.String("key", key),
zap.Uint64("tiSchemaID", tiSchemaID),
zap.Int("registryID", entry.registryID))
m.cacheRWLock.RUnlock()
return entry.codec, entry.registryID, nil
}
m.cacheRWLock.RUnlock()

log.Info("Avro schema lookup cache miss",
zap.String("key", key),
Expand Down Expand Up @@ -245,7 +252,10 @@ func (m *AvroSchemaManager) Lookup(ctx context.Context, tableName model.TableNam
}
cacheEntry.registryID = jsonResp.RegistryID
cacheEntry.tiSchemaID = tiSchemaID

m.cacheRWLock.Lock()
m.cache[m.tableNameToSchemaSubject(tableName)] = cacheEntry
m.cacheRWLock.Unlock()

log.Info("Avro schema lookup successful with cache miss",
zap.Uint64("tiSchemaID", cacheEntry.tiSchemaID),
Expand All @@ -263,13 +273,16 @@ type SchemaGenerator func() (string, error)
// If not, a new schema is generated, registered and cached.
func (m *AvroSchemaManager) GetCachedOrRegister(ctx context.Context, tableName model.TableName, tiSchemaID uint64, schemaGen SchemaGenerator) (*goavro.Codec, int, error) {
key := m.tableNameToSchemaSubject(tableName)
m.cacheRWLock.RLock()
if entry, exists := m.cache[key]; exists && entry.tiSchemaID == tiSchemaID {
log.Info("Avro schema GetCachedOrRegister cache hit",
log.Debug("Avro schema GetCachedOrRegister cache hit",
zap.String("key", key),
zap.Uint64("tiSchemaID", tiSchemaID),
zap.Int("registryID", entry.registryID))
m.cacheRWLock.RUnlock()
return entry.codec, entry.registryID, nil
}
m.cacheRWLock.RUnlock()

log.Info("Avro schema lookup cache miss",
zap.String("key", key),
Expand All @@ -296,7 +309,10 @@ func (m *AvroSchemaManager) GetCachedOrRegister(ctx context.Context, tableName m
cacheEntry.codec = codec
cacheEntry.registryID = id
cacheEntry.tiSchemaID = tiSchemaID

m.cacheRWLock.Lock()
m.cache[m.tableNameToSchemaSubject(tableName)] = cacheEntry
m.cacheRWLock.Unlock()

log.Info("Avro schema GetCachedOrRegister successful with cache miss",
zap.Uint64("tiSchemaID", cacheEntry.tiSchemaID),
Expand All @@ -308,6 +324,7 @@ func (m *AvroSchemaManager) GetCachedOrRegister(ctx context.Context, tableName m

// ClearRegistry clears the Registry subject for the given table. Should be idempotent.
// Exported for testing.
// NOT USED for now, reserved for future use.
func (m *AvroSchemaManager) ClearRegistry(ctx context.Context, tableName model.TableName) error {
uri := m.registryURL + "/subjects/" + url.QueryEscape(m.tableNameToSchemaSubject(tableName))
req, err := http.NewRequestWithContext(ctx, "DELETE", uri, nil)
Expand Down
93 changes: 91 additions & 2 deletions cdc/sink/codec/schema_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ type AvroSchemaRegistrySuite struct {
var _ = check.Suite(&AvroSchemaRegistrySuite{})

type mockRegistry struct {
mu sync.Mutex
subjects map[string]*mockRegistrySchema
newID int
mu sync.Mutex
}

type mockRegistrySchema struct {
Expand Down Expand Up @@ -96,8 +96,8 @@ func startHTTPInterceptForTestingRegistry(c *check.C) {
respData.ID = registry.newID
}
}
registry.mu.Unlock()
registry.newID++
registry.mu.Unlock()
return httpmock.NewJsonResponse(200, &respData)
})

Expand Down Expand Up @@ -293,6 +293,95 @@ func (s *AvroSchemaRegistrySuite) TestSchemaRegistryIdempotent(c *check.C) {
}
}

func (s *AvroSchemaRegistrySuite) TestGetCachedOrRegister(c *check.C) {
defer testleak.AfterTest(c)()
table := model.TableName{
Schema: "testdb",
Table: "test1",
}

manager, err := NewAvroSchemaManager(getTestingContext(), &security.Credential{}, "http://127.0.0.1:8081", "-value")
c.Assert(err, check.IsNil)

called := 0
schemaGen := func() (string, error) {
called++
return `{
"type": "record",
"name": "test",
"fields":
[
{
"type": "string",
"name": "field1"
},
{
"type": [
"null",
"string"
],
"default": null,
"name": "field2"
}
]
}`, nil
}

codec, id, err := manager.GetCachedOrRegister(getTestingContext(), table, 1, schemaGen)
c.Assert(err, check.IsNil)
c.Assert(id, check.Greater, 0)
c.Assert(codec, check.NotNil)
c.Assert(called, check.Equals, 1)

codec1, _, err := manager.GetCachedOrRegister(getTestingContext(), table, 1, schemaGen)
c.Assert(err, check.IsNil)
c.Assert(codec1, check.Equals, codec)
c.Assert(called, check.Equals, 1)

codec2, _, err := manager.GetCachedOrRegister(getTestingContext(), table, 2, schemaGen)
c.Assert(err, check.IsNil)
c.Assert(codec2, check.Not(check.Equals), codec)
c.Assert(called, check.Equals, 2)

schemaGen = func() (string, error) {
return `{
"type": "record",
"name": "test",
"fields":
[
{
"type": "string",
"name": "field1"
},
{
"type": [
"null",
"string"
],
"default": null,
"name": "field2"
}
]
}`, nil
}

var wg sync.WaitGroup
for i := 0; i < 20; i++ {
finalI := i
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 100; j++ {
codec, id, err := manager.GetCachedOrRegister(getTestingContext(), table, uint64(finalI), schemaGen)
c.Assert(err, check.IsNil)
c.Assert(id, check.Greater, 0)
c.Assert(codec, check.NotNil)
}
}()
}
wg.Wait()
}

func (s *AvroSchemaRegistrySuite) TestHTTPRetry(c *check.C) {
defer testleak.AfterTest(c)()
payload := []byte("test")
Expand Down

0 comments on commit 6eb2159

Please sign in to comment.