Skip to content

Commit

Permalink
Merge branch 'fix/delete-stale-items' into 'master'
Browse files Browse the repository at this point in the history
[DEX-2706] fix: fix deleting stale items from MySQL and PostgreSQL

See merge request nstmrt/rubygems/outbox!105
  • Loading branch information
Меркушин Михаил Сергеевич committed Oct 22, 2024
2 parents d3ff6de + f21521b commit 91ab1ae
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 21 deletions.
2 changes: 2 additions & 0 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ tests:
DATABASE_URL: postgres://postgres:secret@postgres:5432
REDIS_URL: redis://redis:6379/0
before_script:
- gem sources --remove https://rubygems.org/
- gem sources --add https://nexus.sbmt.io/repository/rubygems/
- gem update --system 3.4.22
- bin/setup
script:
Expand Down
14 changes: 10 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,33 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

### Fixed

## [6.10.3] - 2024-10-22

### Fixed

- fix deleting stale items from MySQL and PostgreSQL

## [6.10.2] - 2024-09-30

### Fixed

- change `DEFAULT_PARTITION_STRATEGY` to string

## [6.10.1] - 2024-09-23

### Fixed

- log OTEL `trace_id`

## [6.10.0] - 2024-09-19

### Changed

- Renamed `backtrace` log tag to `stacktrace`
- Renamed `backtrace` log tag to `stacktrace`

### Fixed

- Fixed handling of errors if database is not available
- Fixed handling of errors if database is not available

## [6.9.0] - 2024-09-13

Expand Down
2 changes: 1 addition & 1 deletion Gemfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# frozen_string_literal: true

source "https://rubygems.org"
source ENV.fetch("NEXUS_PUBLIC_SOURCE_URL", "https://rubygems.org")

gemspec
100 changes: 90 additions & 10 deletions app/jobs/sbmt/outbox/base_delete_stale_items_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ module Outbox
class BaseDeleteStaleItemsJob < Outbox.active_job_base_class
MIN_RETENTION_PERIOD = 1.day
LOCK_TTL = 10_800_000
BATCH_SIZE = 1000
SLEEP_TIME = 1
BATCH_SIZE = 1_000
SLEEP_TIME = 0.5

class << self
def enqueue
Expand All @@ -25,7 +25,7 @@ def item_classes
delegate :config, :logger, to: "Sbmt::Outbox"
delegate :box_type, :box_name, to: :item_class

attr_accessor :item_class
attr_accessor :item_class, :lock_timer

def perform(item_class_name)
self.item_class = item_class_name.constantize
Expand All @@ -36,6 +36,7 @@ def perform(item_class_name)
Redis.new(config.redis)
end

self.lock_timer = Cutoff.new(LOCK_TTL / 1000)
lock_manager = Redlock::Client.new([client], retry_count: 0)

lock_manager.lock("#{self.class.name}:#{item_class_name}:lock", LOCK_TTL) do |locked|
Expand All @@ -51,6 +52,8 @@ def perform(item_class_name)
logger.log_info("Failed to acquire lock #{self.class.name}:#{item_class_name}")
end
end
rescue Cutoff::CutoffExceededError
logger.log_info("Lock timeout while processing #{item_class_name}")
end

private
Expand All @@ -64,17 +67,94 @@ def validate_retention!(duration)
def delete_stale_items(waterline)
logger.log_info("Start deleting #{box_type} items for #{box_name} older than #{waterline}")

case database_type
when :postgresql
postgres_delete_in_batches(waterline)
when :mysql
mysql_delete_in_batches(waterline)
else
raise "Unsupported database type"
end

logger.log_info("Successfully deleted #{box_type} items for #{box_name} older than #{waterline}")
end

# Deletes stale items from PostgreSQL database in batches
#
# This method efficiently deletes items older than the given waterline
# using a subquery approach to avoid locking large portions of the table.
#
#
# Example SQL generated for deletion:
# DELETE FROM "items"
# WHERE "items"."id" IN (
# SELECT "items"."id"
# FROM "items"
# WHERE "items"."created_at" < '2023-05-01 00:00:00'
# LIMIT 1000
# )
def postgres_delete_in_batches(waterline)
table = item_class.arel_table
condition = table[:created_at].lt(waterline)
subquery = table
.project(table[:id])
.where(condition)
.take(BATCH_SIZE)

delete_statement = Arel::Nodes::DeleteStatement.new
delete_statement.relation = table
delete_statement.wheres = [table[:id].in(subquery)]

loop do
ids = Outbox.database_switcher.use_slave do
item_class.where("created_at < ?", waterline).limit(BATCH_SIZE).ids
end
break if ids.empty?
deleted_count = item_class
.connection
.execute(delete_statement.to_sql)
.cmd_tuples

logger.log_info("Deleted #{deleted_count} #{box_type} items for #{box_name} items")
break if deleted_count == 0
lock_timer.checkpoint!
sleep(SLEEP_TIME)
end
end

item_class.where(id: ids).delete_all
sleep SLEEP_TIME
# Deletes stale items from MySQL database in batches
#
# This method efficiently deletes items older than the given waterline
# using MySQL's built-in LIMIT clause for DELETE statements.
#
# The main difference from the PostgreSQL method is that MySQL allows
# direct use of LIMIT in DELETE statements, simplifying the query.
# This approach doesn't require a subquery, making it more straightforward.
#
# Example SQL generated for deletion:
# DELETE FROM `items`
# WHERE `items`.`created_at` < '2023-05-01 00:00:00'
# LIMIT 1000
def mysql_delete_in_batches(waterline)
loop do
deleted_count = item_class
.where("created_at < ?", waterline)
.limit(BATCH_SIZE)
.delete_all

logger.log_info("Deleted #{deleted_count} #{box_type} items for #{box_name} items")
break if deleted_count == 0
lock_timer.checkpoint!
sleep(SLEEP_TIME)
end
end

logger.log_info("Successfully deleted #{box_type} items for #{box_name} older than #{waterline}")
def database_type
adapter_name = item_class.connection.adapter_name.downcase
case adapter_name
when "postgresql"
:postgresql
when "mysql2"
:mysql
else
:unknown
end
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/sbmt/outbox/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

module Sbmt
module Outbox
VERSION = "6.10.2"
VERSION = "6.10.3"
end
end
7 changes: 3 additions & 4 deletions sbmt-outbox.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,9 @@ Gem::Specification.new do |s|
s.add_development_dependency "rspec-rails"
s.add_development_dependency "rspec_junit_formatter"
s.add_development_dependency "rubocop"
s.add_development_dependency "rubocop-rails"
s.add_development_dependency "rubocop-rspec"
s.add_development_dependency "rubocop-performance"
s.add_development_dependency "standard", ">= 1.7"
s.add_development_dependency "rubocop-rails", ">= 2.5"
s.add_development_dependency "rubocop-rspec", ">= 2.11"
s.add_development_dependency "standard", ">= 1.12"
s.add_development_dependency "schked", ">= 0.3", "< 2"
s.add_development_dependency "zeitwerk"
s.add_development_dependency "sentry-rails", "> 5.2.0"
Expand Down
7 changes: 6 additions & 1 deletion spec/jobs/sbmt/outbox/base_delete_stale_items_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,13 @@ def item_classes
end

let!(:item) { create(:outbox_item, created_at: created_at) }
let!(:item_2) { create(:outbox_item, created_at: created_at) }
let(:created_at) { 1.month.ago }

before do
stub_const("Sbmt::Outbox::BaseDeleteStaleItemsJob::BATCH_SIZE", 1)
end

describe ".enqueue" do
it "enqueue all item classes" do
expect { job_class.enqueue }.to have_enqueued_job(job_class).with("OutboxItem")
Expand All @@ -22,7 +27,7 @@ def item_classes

it "deletes item" do
expect { job_class.perform_now("OutboxItem") }
.to change(OutboxItem, :count).by(-1)
.to change(OutboxItem, :count).by(-2)
end

context "when item is too young" do
Expand Down

0 comments on commit 91ab1ae

Please sign in to comment.