Skip to content

Commit

Permalink
Fix a problem with overwriting messages with the same content (issue u…
Browse files Browse the repository at this point in the history
…mputun#107 )

- change primary key for message table
- add "deleted" field for message table to avoid retrying the processing
- add migration for message table
- add DeleteMessage function to mark processed messages as deleted
- replace Message function with Messages, which returns all messages matching the given text
- fix and add unit tests
  • Loading branch information
coperius committed Aug 6, 2024
1 parent 68dcc02 commit 47ac478
Show file tree
Hide file tree
Showing 4 changed files with 249 additions and 59 deletions.
102 changes: 61 additions & 41 deletions app/events/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,62 +76,82 @@ func (a *admin) MsgHandler(update tbapi.Update) error {

// it would be nice to ban this user right away, but we don't have forwarded user ID here due to tg privacy limitation.
// it is empty in update.Message. to ban this user, we need to get the match on the message from the locator and ban from there.
info, ok := a.locator.Message(msgTxt)
infos, ok := a.locator.Messages(msgTxt)
if !ok {
return fmt.Errorf("not found %q in locator", shrink(msgTxt, 50))
}

log.Printf("[DEBUG] locator found message %s", info)
log.Printf("[DEBUG] locator found messages %s", infos)
errs := new(multierror.Error)

// check if the forwarded message will ban a super-user and ignore it
if info.UserName != "" && a.superUsers.IsSuper(info.UserName) {
return fmt.Errorf("forwarded message is about super-user %s (%d), ignored", info.UserName, info.UserID)
}
users := make(map[int64]string)

// remove user from the approved list and from storage
if err := a.bot.RemoveApprovedUser(info.UserID); err != nil {
errs = multierror.Append(errs, fmt.Errorf("failed to remove user %d from approved list: %w", info.UserID, err))
}
for _, info := range infos {
// check if the forwarded message will ban a super-user and ignore it
if info.UserName != "" && a.superUsers.IsSuper(info.UserName) {
errs = multierror.Append(errs, fmt.Errorf("forwarded message is about super-user %s (%d), ignored", info.UserName, info.UserID))
continue
}

// make a message with spam info and send to admin chat
spamInfo := []string{}
resp := a.bot.OnMessage(bot.Message{Text: update.Message.Text, From: bot.User{ID: info.UserID}})
spamInfoText := "**can't get spam info**"
for _, check := range resp.CheckResults {
spamInfo = append(spamInfo, "- "+escapeMarkDownV1Text(check.String()))
}
if len(spamInfo) > 0 {
spamInfoText = strings.Join(spamInfo, "\n")
}
newMsgText := fmt.Sprintf("**original detection results for %q (%d)**\n\n%s\n\n\n*the user banned and message deleted*",
escapeMarkDownV1Text(info.UserName), info.UserID, spamInfoText)
if err := send(tbapi.NewMessage(a.adminChatID, newMsgText), a.tbAPI); err != nil {
errs = multierror.Append(errs, fmt.Errorf("failed to send spap detection results to admin chat: %w", err))
}
users[info.UserID] = info.UserName

if a.dry {
return errs.ErrorOrNil()
}
if a.dry {
continue
}

// update spam samples
if err := a.bot.UpdateSpam(msgTxt); err != nil {
return fmt.Errorf("failed to update spam for %q: %w", msgTxt, err)
// delete message
if _, err := a.tbAPI.Request(tbapi.DeleteMessageConfig{ChatID: a.primChatID, MessageID: info.MsgID}); err != nil {
errs = multierror.Append(errs, fmt.Errorf("failed to delete message %d: %w", info.MsgID, err))
} else {
log.Printf("[INFO] message %d deleted", info.MsgID)
// delete from locator
if err := a.locator.DeleteMessage(a.primChatID, info.MsgID); err != nil {
errs = multierror.Append(errs, fmt.Errorf("failed to delete message %d from locator: %w", info.MsgID, err))
}
}
}

// delete message
if _, err := a.tbAPI.Request(tbapi.DeleteMessageConfig{ChatID: a.primChatID, MessageID: info.MsgID}); err != nil {
errs = multierror.Append(errs, fmt.Errorf("failed to delete message %d: %w", info.MsgID, err))
} else {
log.Printf("[INFO] message %d deleted", info.MsgID)
if !a.dry {
// update spam samples
if err := a.bot.UpdateSpam(msgTxt); err != nil {
errs = multierror.Append(errs, fmt.Errorf("failed to update spam for %q: %w", msgTxt, err))
}
}

// ban user
banReq := banRequest{duration: bot.PermanentBanDuration, userID: info.UserID, chatID: a.primChatID,
tbAPI: a.tbAPI, dry: a.dry, training: a.trainingMode, userName: update.Message.ForwardSenderName}
for userID, userName := range users {
// remove user from the approved list and from storage
if err := a.bot.RemoveApprovedUser(userID); err != nil {
errs = multierror.Append(errs, fmt.Errorf("failed to remove user %d from approved list: %w", userID, err))
}

// make a message with spam info and send to admin chat
spamInfo := []string{}
resp := a.bot.OnMessage(bot.Message{Text: update.Message.Text, From: bot.User{ID: userID}})
spamInfoText := "**can't get spam info**"
for _, check := range resp.CheckResults {
spamInfo = append(spamInfo, "- "+escapeMarkDownV1Text(check.String()))
}
if len(spamInfo) > 0 {
spamInfoText = strings.Join(spamInfo, "\n")
}
newMsgText := fmt.Sprintf("**original detection results for %q (%d)**\n\n%s\n\n\n*the user banned and message deleted*",
escapeMarkDownV1Text(userName), userID, spamInfoText)
if err := send(tbapi.NewMessage(a.adminChatID, newMsgText), a.tbAPI); err != nil {
errs = multierror.Append(errs, fmt.Errorf("failed to send spap detection results to admin chat: %w", err))
}

if a.dry {
continue
}

// ban user
banReq := banRequest{duration: bot.PermanentBanDuration, userID: userID, chatID: a.primChatID,
tbAPI: a.tbAPI, dry: a.dry, training: a.trainingMode, userName: update.Message.ForwardSenderName}

if err := banUserOrChannel(banReq); err != nil {
errs = multierror.Append(errs, fmt.Errorf("failed to ban user %d: %w", userID, err))
}

if err := banUserOrChannel(banReq); err != nil {
errs = multierror.Append(errs, fmt.Errorf("failed to ban user %d: %w", info.UserID, err))
}

return errs.ErrorOrNil()
Expand Down
3 changes: 2 additions & 1 deletion app/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ func (f SpamLoggerFunc) Save(msg *bot.Message, response *bot.Response) {
type Locator interface {
// AddMessage records a message text together with the chat, user and message ids it was seen with.
// NOTE(review): storage details are not visible from here — confirm against the implementation.
AddMessage(msg string, chatID, userID int64, userName string, msgID int) error
// AddSpam records spam check results for the given user id.
AddSpam(userID int64, checks []spamcheck.Response) error
// Message returns the metadata of a single message matching msg; the bool reports whether it was found.
Message(msg string) (storage.MsgMeta, bool)
// Messages returns the metadata of all messages matching msg; the bool reports whether any were found.
// This allows matching a text-only message from the admin chat to the original messages.
Messages(msg string) ([]storage.MsgMeta, bool)
// Spam returns stored spam data for the given user id; the bool presumably reports whether it was found.
Spam(userID int64) (storage.SpamData, bool)
// MsgHash returns the hash of msg that is used as the lookup key for stored messages.
MsgHash(msg string) string
// UserNameByID returns the username for the given user id, or an empty string if not found.
UserNameByID(userID int64) string
// DeleteMessage marks the message identified by chatID and msgID as deleted so it is not processed again.
DeleteMessage(chatID int64, msgID int) error
}

// Bot is an interface for bot events.
Expand Down
110 changes: 99 additions & 11 deletions app/storage/locator.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package storage

import (
"context"
"crypto/sha256"
"encoding/json"
"fmt"
"log"
"strings"
"time"

"github.com/jmoiron/sqlx"
Expand Down Expand Up @@ -39,13 +41,21 @@ type SpamData struct {

// NewLocator creates new Locator. ttl defines how long to keep messages in db, minSize defines the minimum number of messages to keep
func NewLocator(ttl time.Duration, minSize int, db *sqlx.DB) (*Locator, error) {
_, err := db.Exec(`CREATE TABLE IF NOT EXISTS messages (
hash TEXT PRIMARY KEY,
// migrate message table if necessary
err := migrateMessageTable(db)
if err != nil {
return nil, err
}

_, err = db.Exec(`CREATE TABLE IF NOT EXISTS messages (
hash TEXT,
time TIMESTAMP,
chat_id INTEGER,
user_id INTEGER,
user_name TEXT,
msg_id INTEGER
msg_id INTEGER,
deleted INT default 0,
PRIMARY KEY (chat_id, msg_id)
)`)
if err != nil {
return nil, fmt.Errorf("failed to create messages table: %w", err)
Expand Down Expand Up @@ -127,17 +137,26 @@ func (l *Locator) AddSpam(userID int64, checks []spamcheck.Response) error {
return l.cleanupSpam()
}

// Message returns message MsgMeta for given msg
// this allows to match messages from admin chat (only text available) to the original message
func (l *Locator) Message(msg string) (MsgMeta, bool) {
var meta MsgMeta
// Messages returns messages MsgMeta for given msg
// this allows to match messages from admin chat (only text available) to the original messages
func (l *Locator) Messages(msg string) ([]MsgMeta, bool) {
var meta []MsgMeta
hash := l.MsgHash(msg)
err := l.db.Get(&meta, `SELECT time, chat_id, user_id, user_name, msg_id FROM messages WHERE hash = ?`, hash)
err := l.db.Select(&meta, `SELECT time, chat_id, user_id, user_name, msg_id FROM messages WHERE hash = ? and deleted == 0`, hash)
if err != nil {
log.Printf("[DEBUG] failed to find message by hash %q: %v", hash, err)
return MsgMeta{}, false
log.Printf("[DEBUG] failed to find messages by hash %q: %v", hash, err)
return []MsgMeta{}, false
}
return meta, true
return meta, len(meta) > 0
}

// DeleteMessage flags the message identified by chatID and msgID as deleted.
// The row stays in the messages table; only its deleted column is set to 1.
func (l *Locator) DeleteMessage(chatID int64, msgID int) error {
	const query = `UPDATE messages SET deleted = 1 WHERE chat_id = ? and msg_id = ?`
	if _, err := l.db.Exec(query, chatID, msgID); err != nil {
		return fmt.Errorf("failed to set message as deleted: %w", err)
	}
	return nil
}

// UserNameByID returns username by user id. Returns empty string if not found
Expand All @@ -153,6 +172,11 @@ func (l *Locator) UserNameByID(userID int64) string {

// UserIDByName returns user id by username. Returns 0 if not found
func (l *Locator) UserIDByName(userName string) int64 {
// many users have empty usernames, so we need to ignore them
if strings.TrimSpace(userName) == "" {
log.Printf("[DEBUG] failed to find user id by empty name")
return 0
}
var userID int64
err := l.db.Get(&userID, `SELECT user_id FROM messages WHERE user_name = ? LIMIT 1`, userName)
if err != nil {
Expand Down Expand Up @@ -212,3 +236,67 @@ func (m MsgMeta) String() string {
// String renders SpamData as "{time: <RFC3339 timestamp>, checks: <checks>}".
func (s SpamData) String() string {
	ts := s.Time.Format(time.RFC3339)
	return fmt.Sprintf("{time: %s, checks: %+v}", ts, s.Checks)
}

// migrateMessageTable upgrades a legacy messages table, where hash was the primary key,
// to the current schema keyed by (chat_id, msg_id) with an additional "deleted" column.
// It does nothing when the messages table does not exist or hash is not part of its primary key.
//
// The rebuild runs in a single transaction: a temporary table with the new schema is created,
// rows are copied over, the old table is dropped and the temporary one is renamed to messages.
// Returns a wrapped error if any step fails; in that case the transaction is rolled back.
func migrateMessageTable(db *sqlx.DB) error {
	// count "hash" columns that take part in the primary key; the result is 0 both when
	// the table is missing and when it already uses the new key
	var pk int
	err := db.Get(&pk, `SELECT COUNT(*) FROM pragma_table_info('messages') WHERE name = 'hash' and pk != 0`)
	if err != nil {
		return fmt.Errorf("failed to check for hash primary key: %w", err)
	}
	if pk == 0 {
		// no need to migrate
		return nil
	}

	ctx := context.Background()
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("failed to start transaction: %w", err)
	}
	// Rollback after a successful Commit only reports that the transaction is already done,
	// so its error is intentionally ignored
	defer func() { _ = tx.Rollback() }()

	// drop a temporary table possibly left over from an earlier attempt
	if _, err = tx.Exec(`DROP TABLE IF EXISTS messages_dg_tmp`); err != nil {
		return fmt.Errorf("failed to drop messages_dg_tmp table: %w", err)
	}

	// create new table
	_, err = tx.Exec(`CREATE TABLE messages_dg_tmp (
		hash TEXT,
		time TIMESTAMP,
		chat_id INTEGER,
		user_id INTEGER,
		user_name TEXT,
		msg_id INTEGER,
		deleted INT default 0,
		PRIMARY KEY (chat_id, msg_id)
	)`)
	if err != nil {
		return fmt.Errorf("failed to create messages_dg_tmp table: %w", err)
	}

	// copy data. The legacy table was keyed by hash, so it may hold several rows with the same
	// (chat_id, msg_id); a plain INSERT would then fail on the new primary key and abort the
	// migration. Rows are copied oldest first with OR REPLACE, so the most recent row wins.
	_, err = tx.Exec(`INSERT OR REPLACE INTO messages_dg_tmp (hash, time, chat_id, user_id, user_name, msg_id)
		SELECT hash, time, chat_id, user_id, user_name, msg_id FROM messages ORDER BY time`)
	if err != nil {
		return fmt.Errorf("failed to copy data to messages_dg_tmp: %w", err)
	}

	// drop old table
	if _, err = tx.Exec(`DROP TABLE messages`); err != nil {
		return fmt.Errorf("failed to drop messages table: %w", err)
	}

	// rename new table
	if _, err = tx.Exec(`ALTER TABLE messages_dg_tmp RENAME TO messages`); err != nil {
		return fmt.Errorf("failed to rename messages_dg_tmp to messages: %w", err)
	}

	if err = tx.Commit(); err != nil {
		return fmt.Errorf("failed to commit transaction: %w", err)
	}

	return nil
}
Loading

0 comments on commit 47ac478

Please sign in to comment.