Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): add emergency contacts database functions #1081

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 55 additions & 28 deletions database/redis/contact.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,22 @@ import (
// GetContact returns contact data by given id, if no value, return database.ErrNil error.
func (connector *DbConnector) GetContact(id string) (moira.ContactData, error) {
c := *connector.client
ctx := connector.context

var contact moira.ContactData

result := c.Get(connector.context, contactKey(id))
result := c.Get(ctx, contactKey(id))
if errors.Is(result.Err(), redis.Nil) {
return contact, database.ErrNil
}

contact, err := reply.Contact(result)
if err != nil {
return contact, err
return contact, fmt.Errorf("failed to deserialize contact '%s': %w", id, err)
}

contact.ID = id

return contact, nil
}

Expand All @@ -37,26 +41,30 @@ func (connector *DbConnector) GetContacts(contactIDs []string) ([]*moira.Contact
results := make([]*redis.StringCmd, 0, len(contactIDs))

c := *connector.client
ctx := connector.context

pipe := c.TxPipeline()
for _, id := range contactIDs {
result := pipe.Get(connector.context, contactKey(id))
result := pipe.Get(ctx, contactKey(id))
kissken marked this conversation as resolved.
Show resolved Hide resolved
results = append(results, result)
}
_, err := pipe.Exec(connector.context)

_, err := pipe.Exec(ctx)
if err != nil && !errors.Is(err, redis.Nil) {
return nil, err
return nil, fmt.Errorf("failed to get contacts by id: %w", err)
kissken marked this conversation as resolved.
Show resolved Hide resolved
}

contacts, err := reply.Contacts(results)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to reply contacts: %w", err)
}

for i := range contacts {
if contacts[i] != nil {
contacts[i].ID = contactIDs[i]
}
}

return contacts, nil
}

Expand All @@ -79,6 +87,7 @@ func getContactsKeysOnRedisNode(ctx context.Context, client redis.UniversalClien
break
}
}

return keys, nil
}

Expand All @@ -91,6 +100,7 @@ func (connector *DbConnector) GetAllContacts() ([]*moira.ContactData, error) {
if err != nil {
return err
}

keys = append(keys, keysResult...)
return nil
})
Expand All @@ -102,81 +112,98 @@ func (connector *DbConnector) GetAllContacts() ([]*moira.ContactData, error) {
for _, key := range keys {
contactIDs = append(contactIDs, strings.TrimPrefix(key, contactKey("")))
}

return connector.GetContacts(contactIDs)
}

// SaveContact writes contact data and updates user contacts.
func (connector *DbConnector) SaveContact(contact *moira.ContactData) error {
existing, getContactErr := connector.GetContact(contact.ID)
if getContactErr != nil && !errors.Is(getContactErr, database.ErrNil) {
return getContactErr
return fmt.Errorf("failed to get contact '%s': %w", contact.ID, getContactErr)
}
contactString, err := json.Marshal(contact)

contactStr, err := json.Marshal(contact)
if err != nil {
return err
return fmt.Errorf("failed to marshal contact '%s': %w", contact.ID, err)
}

c := *connector.client
ctx := connector.context

pipe := c.TxPipeline()
pipe.Set(connector.context, contactKey(contact.ID), contactString, redis.KeepTTL)
pipe.Set(ctx, contactKey(contact.ID), contactStr, redis.KeepTTL)
if !errors.Is(getContactErr, database.ErrNil) && contact.User != existing.User {
pipe.SRem(connector.context, userContactsKey(existing.User), contact.ID)
pipe.SRem(ctx, userContactsKey(existing.User), contact.ID)
}

if !errors.Is(getContactErr, database.ErrNil) && contact.Team != existing.Team {
pipe.SRem(connector.context, teamContactsKey(existing.Team), contact.ID)
pipe.SRem(ctx, teamContactsKey(existing.Team), contact.ID)
}

if contact.User != "" {
pipe.SAdd(connector.context, userContactsKey(contact.User), contact.ID)
pipe.SAdd(ctx, userContactsKey(contact.User), contact.ID)
}

if contact.Team != "" {
pipe.SAdd(connector.context, teamContactsKey(contact.Team), contact.ID)
pipe.SAdd(ctx, teamContactsKey(contact.Team), contact.ID)
}
_, err = pipe.Exec(connector.context)

_, err = pipe.Exec(ctx)
if err != nil {
return fmt.Errorf("failed to EXEC: %s", err.Error())
return fmt.Errorf("failed to save contact '%s': %w", contact.ID, err)
}

return nil
}

// RemoveContact deletes contact data and contactID from user contacts.
func (connector *DbConnector) RemoveContact(contactID string) error {
existing, err := connector.GetContact(contactID)
if err != nil && !errors.Is(err, database.ErrNil) {
return err
return fmt.Errorf("failed to get contact '%s': %w", contactID, err)
}

c := *connector.client
ctx := connector.context

pipe := c.TxPipeline()
pipe.Del(connector.context, contactKey(contactID))
pipe.SRem(connector.context, userContactsKey(existing.User), contactID)
pipe.SRem(connector.context, teamContactsKey(existing.Team), contactID)
_, err = pipe.Exec(connector.context)
pipe.Del(ctx, contactKey(contactID))
pipe.SRem(ctx, userContactsKey(existing.User), contactID)
pipe.SRem(ctx, teamContactsKey(existing.Team), contactID)

_, err = pipe.Exec(ctx)
if err != nil {
return fmt.Errorf("failed to EXEC: %s", err.Error())
return fmt.Errorf("failed to remove contact '%s': %w", contactID, err)
}

return nil
}

// GetUserContactIDs returns contacts ids by given login.
func (connector *DbConnector) GetUserContactIDs(login string) ([]string, error) {
c := *connector.client
ctx := connector.context

contacts, err := c.SMembers(connector.context, userContactsKey(login)).Result()
contactIDs, err := c.SMembers(ctx, userContactsKey(login)).Result()
if err != nil {
return nil, fmt.Errorf("failed to get contacts for user login %s: %s", login, err.Error())
return nil, fmt.Errorf("failed to get contact IDs for user login '%s': %w", login, err)
}
return contacts, nil

return contactIDs, nil
}

// GetTeamContactIDs returns contacts ids by given team.
func (connector *DbConnector) GetTeamContactIDs(login string) ([]string, error) {
c := *connector.client
contacts, err := c.SMembers(connector.context, teamContactsKey(login)).Result()
ctx := connector.context

contactIDs, err := c.SMembers(ctx, teamContactsKey(login)).Result()
if err != nil {
return nil, fmt.Errorf("failed to get contacts for team login %s: %s", login, err.Error())
return nil, fmt.Errorf("failed to get contact IDs for team login '%s': %w", login, err)
}
return contacts, nil

return contactIDs, nil
}

func contactKey(id string) string {
Expand Down
187 changes: 187 additions & 0 deletions database/redis/emergency_contact.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
package redis

import (
"context"
"errors"
"fmt"
"strings"

"github.com/go-redis/redis/v8"
"github.com/moira-alert/moira/database"
"github.com/moira-alert/moira/database/redis/reply"
"github.com/moira-alert/moira/datatypes"
)

// GetEmergencyContact method to retrieve an emergency contact from the database.
func (connector *DbConnector) GetEmergencyContact(contactID string) (datatypes.EmergencyContact, error) {
c := *connector.client
ctx := connector.context

cmd := c.Get(ctx, emergencyContactsKey(contactID))

if errors.Is(cmd.Err(), redis.Nil) {
return datatypes.EmergencyContact{}, database.ErrNil
}

return reply.EmergencyContact(cmd)
}

// GetEmergencyContacts method to retrieve all emergency contacts from the database.
func (connector *DbConnector) GetEmergencyContacts() ([]*datatypes.EmergencyContact, error) {
emergencyContactIDs, err := connector.getEmergencyContactIDs()
if err != nil {
return nil, fmt.Errorf("failed to get emergency contact IDs: %w", err)
}

return connector.GetEmergencyContactsByIDs(emergencyContactIDs)
}

// GetEmergencyContactsByIDs method to retrieve all emergency contacts from the database by their identifiers.
func (connector *DbConnector) GetEmergencyContactsByIDs(contactIDs []string) ([]*datatypes.EmergencyContact, error) {
c := *connector.client
ctx := connector.context

pipe := c.TxPipeline()
for _, contactID := range contactIDs {
pipe.Get(ctx, emergencyContactsKey(contactID))
}

cmds, err := pipe.Exec(ctx)
if err != nil && !errors.Is(err, redis.Nil) {
return nil, fmt.Errorf("failed to get emergency contacts by IDs: %w", err)
}

emergencyContactCmds := make([]*redis.StringCmd, 0, len(cmds))
for _, cmd := range cmds {
emergencyContactCmd, ok := cmd.(*redis.StringCmd)
if !ok {
return nil, fmt.Errorf("failed to convert cmd to emergency contact cmd")
}

emergencyContactCmds = append(emergencyContactCmds, emergencyContactCmd)
}

return reply.EmergencyContacts(emergencyContactCmds)
}

func (connector *DbConnector) getEmergencyContactIDs() ([]string, error) {
c := *connector.client
ctx := connector.context

var emergencyContactIDs []string

iter := c.Scan(ctx, 0, emergencyContactsKey("*"), 0).Iterator()
for iter.Next(ctx) {
key := iter.Val()

emergencyContactID := strings.TrimPrefix(key, emergencyContactsKey(""))
emergencyContactIDs = append(emergencyContactIDs, emergencyContactID)
}

if err := iter.Err(); err != nil {
return nil, fmt.Errorf("failed to scan emergency contacts: %w", err)
}

return emergencyContactIDs, nil
}

// GetHeartbeatTypeContactIDs a method for obtaining contact IDs by specific emergency type.
func (connector *DbConnector) GetHeartbeatTypeContactIDs(heartbeatType datatypes.HeartbeatType) ([]string, error) {
c := *connector.client
ctx := connector.context

contactIDs, err := c.SMembers(ctx, heartbeatTypeContactsKey(heartbeatType)).Result()
if err != nil {
return nil, fmt.Errorf("failed to get heartbeat type contact IDs '%s': %w", heartbeatType, err)
}

return contactIDs, nil
}

func (connector *DbConnector) saveEmergencyContacts(emergencyContacts []datatypes.EmergencyContact) error {
c := *connector.client
ctx := connector.context

pipe := c.TxPipeline()
for _, emergencyContact := range emergencyContacts {
if err := addSaveEmergencyContactToPipe(ctx, pipe, emergencyContact); err != nil {
return fmt.Errorf("failed to add save emergency contact '%s' to pipe: %w", emergencyContact.ContactID, err)
}
}

if _, err := pipe.Exec(ctx); err != nil {
return fmt.Errorf("failed to save emergency contacts: %w", err)
}

return nil
}

// SaveEmergencyContact a method for saving emergency contact.
func (connector *DbConnector) SaveEmergencyContact(emergencyContact datatypes.EmergencyContact) error {
c := *connector.client
ctx := connector.context

pipe := c.TxPipeline()

if err := addSaveEmergencyContactToPipe(ctx, pipe, emergencyContact); err != nil {
return err
}

if _, err := pipe.Exec(ctx); err != nil {
return fmt.Errorf("failed to save emergency contact '%s': %w", emergencyContact.ContactID, err)
}

return nil
}

// RemoveEmergencyContact method for removing emergency contact.
func (connector *DbConnector) RemoveEmergencyContact(contactID string) error {
c := *connector.client
ctx := connector.context

emergencyContact, err := connector.GetEmergencyContact(contactID)
if err != nil {
return fmt.Errorf("failed to get emergency contact '%s': %w", contactID, err)
}

pipe := c.TxPipeline()

addRemoveEmergencyContactToPipe(ctx, pipe, emergencyContact)

if _, err := pipe.Exec(ctx); err != nil {
return fmt.Errorf("failed to remove emergency contact '%s': %w", contactID, err)
}

return nil
}

func addSaveEmergencyContactToPipe(ctx context.Context, pipe redis.Pipeliner, emergencyContact datatypes.EmergencyContact) error {
emergencyContactBytes, err := reply.GetEmergencyContactBytes(emergencyContact)
if err != nil {
return fmt.Errorf("failed to get emergency contact '%s' bytes: %w", emergencyContact.ContactID, err)
}

pipe.Set(ctx, emergencyContactsKey(emergencyContact.ContactID), emergencyContactBytes, redis.KeepTTL)

for _, heartbeatType := range emergencyContact.HeartbeatTypes {
pipe.SAdd(ctx, heartbeatTypeContactsKey(heartbeatType), emergencyContact.ContactID)
}

return nil
}

func addRemoveEmergencyContactToPipe(ctx context.Context, pipe redis.Pipeliner, emergencyContact datatypes.EmergencyContact) {
pipe.Del(ctx, emergencyContactsKey(emergencyContact.ContactID))

for _, heartbeatType := range emergencyContact.HeartbeatTypes {
pipe.SRem(ctx, heartbeatTypeContactsKey(heartbeatType), emergencyContact.ContactID)
}
}

func emergencyContactsKey(contactID string) string {
return "moira-emergency-contacts:" + contactID
}

func heartbeatTypeContactsKey(heartbeatType datatypes.HeartbeatType) string {
return "moira-heartbeat-type-contacts:" + string(heartbeatType)
}
Loading
Loading