Skip to content

Commit

Permalink
feat: add sharding support to the collection
Browse files Browse the repository at this point in the history
  • Loading branch information
marcosgz committed Aug 5, 2024
1 parent 6646e96 commit 30e7f79
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 12 deletions.
43 changes: 35 additions & 8 deletions lib/esse/active_record/collection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ class Collection
class_attribute :batch_contexts
self.batch_contexts = {}

# Connects to a database or role (ex writing, reading, or another custom role) for the collection query
# @param [Symbol] role The role to connect to
# @param [Symbol] shard The shard to connect to
class_attribute :connect_with
self.connect_with = nil

class << self
def inspect
return super unless self < Esse::ActiveRecord::Collection
Expand All @@ -40,6 +46,7 @@ def inherited(subclass)

subclass.scopes = scopes.dup
subclass.batch_contexts = batch_contexts.dup
subclass.connect_with = connect_with&.dup
end

def scope(name, proc = nil, override: false, &block)
Expand All @@ -57,6 +64,10 @@ def batch_context(name, proc = nil, override: false, &block)

batch_contexts[name.to_sym] = proc
end

def connected_to(**kwargs)
self.connect_with = kwargs
end
end

attr_reader :start, :finish, :batch_size, :params
Expand All @@ -74,23 +85,29 @@ def initialize(start: nil, finish: nil, batch_size: nil, **params)
end

def each
dataset.find_in_batches(**batch_options) do |rows|
kwargs = params.dup
self.class.batch_contexts.each do |name, proc|
kwargs[name] = proc.call(rows, **params)
with_connection do
dataset.find_in_batches(**batch_options) do |rows|
kwargs = params.dup
self.class.batch_contexts.each do |name, proc|
kwargs[name] = proc.call(rows, **params)
end
yield(rows, **kwargs)
end
yield(rows, **kwargs)
end
end

def each_batch_ids
dataset.select(:id).except(:includes, :preload, :eager_load).find_in_batches(**batch_options) do |rows|
yield(rows.map(&:id))
with_connection do
dataset.select(:id).except(:includes, :preload, :eager_load).find_in_batches(**batch_options) do |rows|
yield(rows.map(&:id))
end
end
end

def count
dataset.except(:includes, :preload, :eager_load, :group, :order, :limit, :offset).count
with_connection do
dataset.except(:includes, :preload, :eager_load, :group, :order, :limit, :offset).count
end
end
alias_method :size, :count

Expand Down Expand Up @@ -127,6 +144,16 @@ def inspect

protected

def with_connection
if self.class.connect_with&.any?
::ActiveRecord::Base.connected_to(**self.class.connect_with) do
yield
end
else
yield
end
end

def batch_options
{
batch_size: batch_size
Expand Down
29 changes: 29 additions & 0 deletions spec/esse/active_record/collection_connected_to_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
require 'spec_helper'

RSpec.describe Esse::ActiveRecord::Collection, '.connected_to' do
let(:collection_class) { Class.new(described_class) }

describe '.connected_to' do
it 'sets the connect_with' do
collection_class.connected_to(role: :reading, shard: :default)
expect(collection_class.connect_with).to eq(role: :reading, shard: :default)
end
end

describe '#each using custom connection', sharding: true do
let(:collection_class) do
klass = Class.new(described_class)
klass.base_scope = -> { State }
klass.connected_to(role: :reading, shard: :replica)
klass
end

it 'uses the custom connection' do
expect(ActiveRecord::Base).to receive(:connected_to).with(role: :reading, shard: :replica).and_call_original
il_state = State.create!(name: 'Illinois', abbr_name: 'IL')

instance = collection_class.new
expect { |b| instance.each(&b) }.to yield_successive_args([[il_state]])
end
end
end
1 change: 1 addition & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
require 'esse/rspec'

require 'support/config_helpers'
require 'support/sharding_hook'
require 'support/webmock'
require 'support/models'
require 'pry'
Expand Down
29 changes: 25 additions & 4 deletions spec/support/models.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,20 @@
if ENV['VERBOSE']
ActiveRecord::Base.logger = Logger.new($stdout)
end
ActiveRecord::Base.establish_connection(adapter: 'sqlite3', database: ':memory:')

ACTIVE_RECORD_DEFAULT_ENV = ActiveRecord::ConnectionHandling::DEFAULT_ENV.call.to_sym

ActiveRecord::Base.configurations = {
ACTIVE_RECORD_DEFAULT_ENV.to_s => {
'adapter' => 'sqlite3',
'database' => ':memory:',
},
'replica' => {
'adapter' => 'sqlite3',
'database' => ':memory:',
},
}
ActiveRecord::Base.establish_connection(ACTIVE_RECORD_DEFAULT_ENV)

ActiveRecord::Schema.define do
self.verbose = false
Expand All @@ -27,7 +40,15 @@
end
end

class Animal < ActiveRecord::Base
class ApplicationRecord < ActiveRecord::Base
self.abstract_class = true

if ::ActiveRecord.gem_version >= Gem::Version.new('6.0.0')
connects_to database: { writing: ACTIVE_RECORD_DEFAULT_ENV, reading: :replica }
end
end

class Animal < ApplicationRecord
end

class Dog < Animal
Expand All @@ -36,11 +57,11 @@ class Dog < Animal
class Cat < Animal
end

class State < ActiveRecord::Base
class State < ApplicationRecord
has_many :counties
end

class County < ActiveRecord::Base
class County < ApplicationRecord
belongs_to :state
end

Expand Down
21 changes: 21 additions & 0 deletions spec/support/sharding_hook.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# frozen_string_literal: true

module ShardingHook
extend ActiveSupport::Concern

included do
around do |example|
if example.metadata[:sharding] &&
!ActiveRecord::Base.respond_to?(:connected_to)
skip 'ActiveRecord::Base.connected_to is not available in this version of Rails'
return
end

example.run
end
end
end

RSpec.configure do |config|
config.include ShardingHook
end

0 comments on commit 30e7f79

Please sign in to comment.