Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dump job and write service refactor #860

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 3 additions & 12 deletions app/controllers/oai_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -155,23 +155,14 @@ def normalized_dumps(set, from_date, until_date)
end

# get all dumps in this stream, optionally between two dates; error if none
dumps = filter_dumps(streams, from_date, until_date)
dumps = streams.map do |stream|
stream.current_dumps(from_date: from_date, until_date: until_date)
end
raise OaiConcern::NoRecordsMatch if dumps.empty?

dumps
end

def filter_dumps(streams, from_date, until_date)
# get candidate dumps (the current full dump and its deltas for each stream)
dumps = streams.flat_map(&:current_dumps).sort_by(&:created_at)

# filter candidate dumps (by from date and until date)
dumps = dumps.select { |dump| dump.created_at >= Time.zone.parse(from_date).beginning_of_day } if from_date.present?
dumps = dumps.select { |dump| dump.created_at <= Time.zone.parse(until_date).end_of_day } if until_date.present?

dumps
end

# Wrap the provided Nokogiri::XML::Builder block in an OAI-PMH response
# See http://www.openarchives.org/OAI/openarchivesprotocol.html#XMLResponse
def build_oai_response(xml, params)
Expand Down
96 changes: 17 additions & 79 deletions app/jobs/generate_delta_dump_job.rb
Original file line number Diff line number Diff line change
@@ -1,92 +1,30 @@
# frozen_string_literal: true

##
# Background job to create a delta dump download for a resource (organization)
class GenerateDeltaDumpJob < ApplicationJob
with_job_tracking

# Background job to create delta (changes/deletes) files for an organization
class GenerateDeltaDumpJob < GenerateDumpJob
# Enqueue a delta dump job for every organization.
# NOTE: ActiveRecord model classes do not respond to #each; use find_each
# to batch through all organizations without loading them all at once.
def self.enqueue_all
  Organization.find_each { |org| perform_later(org) }
end

# rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength, Metrics/PerceivedComplexity
def perform(organization)
now = Time.zone.now
full_dump = organization.default_stream.current_full_dump
return unless full_dump

from = full_dump.last_delta_dump_at

uploads = organization.default_stream.uploads.active.where(created_at: from...now)

return unless uploads.any?

uploads.where.not(status: 'processed').each do |upload|
ExtractMarcRecordMetadataJob.perform_now(upload)
end

progress.total = uploads.sum(&:marc_records_count)

delta_dump = full_dump.deltas.create(stream_id: full_dump.stream_id)
base_name = "#{organization.slug}-#{Time.zone.today}-delta"
writer = MarcRecordWriterService.new(base_name)
oai_file_counter = 0

begin
NormalizedMarcRecordReader.new(uploads).each_slice(Settings.oai_max_page_size) do |records|
oai_writer = OaiMarcRecordWriterService.new(base_name)
records.each do |record|
if record.status == 'delete'
writer.write_delete(record)
oai_writer.write_delete(record)
else
writer.write_marc_record(record)
oai_writer.write_marc_record(record)
end
end
oai_writer.finalize
delta_dump.public_send(:oai_xml).attach(io: File.open(oai_writer.oai_file),
filename: human_readable_filename(base_name, :oai_xml, oai_file_counter))

oai_file_counter += 1
progress.increment(records.length)
ensure
oai_writer.close
oai_writer.unlink
end

writer.finalize

writer.files.each do |as, file|
delta_dump.public_send(as).attach(io: File.open(file),
filename: human_readable_filename(base_name, as))
end
private

delta_dump.save!
full_dump.update(last_delta_dump_at: now)
ensure
writer.close
writer.unlink
end
# The full dump this delta is generated against.
# Memoized: Stream#current_full_dump falls back to creating a dump record
# when none exists, so repeated lookups should not re-run that
# find-or-create on every call.
def full_dump
  @full_dump ||= @organization.default_stream.current_full_dump
end
# rubocop:enable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength, Metrics/PerceivedComplexity

private
# The delta dump record being generated, created once under the same
# stream as its parent full dump.
def dump
  return @dump if @dump

  @dump = full_dump.deltas.create(stream_id: full_dump.stream_id)
end

def human_readable_filename(base_name, file_type, counter = nil)
as = case file_type
when :deletes
'deletes.del.txt'
when :marc21
'marc21.mrc.gz'
when :marcxml
'marcxml.xml.gz'
when :oai_xml
"oai-#{format('%010d', counter)}.xml.gz"
else
"#{file_type}.gz"
end
# Only process uploads added to the stream since the last delta dump.
# Memoized so every use within a run (the any? guard, the processed check,
# the record-count sum, and the record reader) sees the same time window;
# re-evaluating Time.zone.now on each call would shift the window's end
# mid-run and make those queries disagree.
def uploads
  @uploads ||= @organization.default_stream.uploads
                            .active
                            .where(created_at: full_dump.last_delta_dump_at...Time.zone.now)
end

"#{base_name}-#{as}"
# Filename prefix shared by all generated delta files,
# e.g. "myorg-2024-01-31-delta".
def base_name
  [@organization.slug, Time.zone.today, 'delta'].join('-')
end
end
61 changes: 61 additions & 0 deletions app/jobs/generate_dump_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# frozen_string_literal: true

##
# Background job to create downloadable files representing an organization's stream.
#
# Subclasses (full/delta) supply #full_dump, #dump, #uploads and #base_name;
# this class drives the shared read-records / write-files / attach pipeline.
class GenerateDumpJob < ApplicationJob
  with_job_tracking

  def perform(organization)
    @organization = organization

    # Capture the run start BEFORE querying uploads so the timestamp recorded
    # below can never be later than the upload window actually processed.
    # Stamping Time.zone.now after processing would leave a gap: uploads
    # created mid-run would predate the recorded cutoff and never be dumped.
    started_at = Time.zone.now

    return unless full_dump && uploads.any?

    # ensure all uploads have been processed before starting
    uploads.where.not(status: 'processed').each do |upload|
      ExtractMarcRecordMetadataJob.perform_now(upload)
    end

    write_files

    dump.save!
    full_dump.update(last_delta_dump_at: started_at)
  end

  private

  # Write all MARC records to tempfiles using the configured writers, then
  # attach the finished files to the dump. Tempfiles are cleaned up even if
  # writing fails.
  # rubocop:disable Metrics/AbcSize
  # rubocop:disable Metrics/CyclomaticComplexity
  def write_files
    progress.total = uploads.sum(&:marc_records_count)

    # 100 is only the progress-reporting batch size; each writer handles its
    # own per-file paging internally.
    NormalizedMarcRecordReader.new(uploads).each_slice(100) do |records|
      records.each { |record| write_record(record) }
      progress.increment(records.length)
    end

    writers.each(&:finalize)
    writers.each { |writer| writer.attach_files_to_dump(dump, base_name) }
  ensure
    writers.each(&:close)
    writers.each(&:unlink)
  end
  # rubocop:enable Metrics/AbcSize
  # rubocop:enable Metrics/CyclomaticComplexity

  # Write a single MARC record/delete using each writer
  def write_record(record)
    if record.status == 'delete'
      writers.each { |writer| writer.write_delete(record) }
    else
      writers.each { |writer| writer.write_marc_record(record) }
    end
  end

  # Services that support #write_marc_record and #write_delete
  def writers
    @writers ||= [
      MarcRecordWriterService.new(base_name),
      OaiMarcRecordWriterService.new(base_name)
    ]
  end
end
101 changes: 27 additions & 74 deletions app/jobs/generate_full_dump_job.rb
Original file line number Diff line number Diff line change
@@ -1,93 +1,46 @@
# frozen_string_literal: true

##
# Background job to create a full dump download for a resource (organization)
class GenerateFullDumpJob < ApplicationJob
with_job_tracking
# Background job to create full stream files for an organization
class GenerateFullDumpJob < GenerateDumpJob
after_perform do |job|
GenerateDeltaDumpJob.perform_later(*job.arguments)
end

# Only process organizations with changes since last full dump
def self.enqueue_all
Organization.find_each do |org|
organizations = Organization.select do |org|
full_dump = org.default_stream.normalized_dumps.last
next if full_dump && org.default_stream.uploads.where(updated_at: full_dump.last_full_dump_at...Time.zone.now).none?

GenerateFullDumpJob.perform_later(org)
end
end

# rubocop:disable Metrics/AbcSize, Metrics/MethodLength, Metrics/CyclomaticComplexity
def perform(organization)
now = Time.zone.now
uploads = Upload.active.where(stream: organization.default_stream)

uploads.where.not(status: 'processed').each do |upload|
ExtractMarcRecordMetadataJob.perform_now(upload)
end

progress.total = uploads.sum(&:marc_records_count)

full_dump = organization.default_stream.normalized_dumps.build(last_full_dump_at: now, last_delta_dump_at: now)

base_name = "#{organization.slug}-#{Time.zone.today}-full"
writer = MarcRecordWriterService.new(base_name)
oai_file_counter = 0

begin
NormalizedMarcRecordReader.new(uploads).each_slice(Settings.oai_max_page_size) do |records|
oai_writer = OaiMarcRecordWriterService.new(base_name)
records.each do |record|
# In a full dump, we can omit the deletes
next if record.status == 'delete'

writer.write_marc_record(record)
oai_writer.write_marc_record(record)
end

if oai_writer.bytes_written?
oai_writer.finalize
full_dump.public_send(:oai_xml).attach(io: File.open(oai_writer.oai_file),
filename: human_readable_filename(
base_name, :oai_xml, oai_file_counter
))
end
organizations.each { |org| perform_later(org) }
end

oai_file_counter += 1
progress.increment(records.length)
ensure
oai_writer.finalize
oai_writer.close
oai_writer.unlink
end
private

writer.finalize
# Skip deletes when writing a full dump
def write_record(record)
return if record.status == 'delete'

writer.files.each do |as, file|
full_dump.public_send(as).attach(io: File.open(file), filename: human_readable_filename(base_name, as))
end
writers.each { |writer| writer.write_marc_record(record) }
end

full_dump.save!
# In a full dump run, the dump being generated is itself the "full dump"
# the base job consults for guard checks and bookkeeping.
def full_dump
dump
end

GenerateDeltaDumpJob.perform_later(organization)
ensure
writer.close
writer.unlink
end
# The dump record being generated. Built (not created) so that a run which
# bails out early -- the base job's guard calls #full_dump, which resolves
# to this -- leaves no empty dump row behind; the base job persists it via
# dump.save! once files are written. A single timestamp is captured so
# last_full_dump_at and last_delta_dump_at agree exactly.
def dump
  @dump ||= begin
    now = Time.zone.now
    @organization.default_stream.normalized_dumps
                 .build(last_full_dump_at: now, last_delta_dump_at: now)
  end
end
# rubocop:enable Metrics/AbcSize, Metrics/MethodLength, Metrics/CyclomaticComplexity

def human_readable_filename(base_name, file_type, counter = nil)
as = case file_type
when :deletes
'deletes.del.txt'
when :marc21
'marc21.mrc.gz'
when :marcxml
'marcxml.xml.gz'
when :oai_xml
"oai-#{format('%010d', counter)}.xml.gz"
else
"#{file_type}.gz"
end
# All active uploads in the organization's default stream; a full dump
# always covers the entire stream.
def uploads
  stream = @organization.default_stream
  stream.uploads.active
end

"#{base_name}-#{as}"
# Filename prefix shared by all generated full-dump files,
# e.g. "myorg-2024-01-31-full".
def base_name
  [@organization.slug, Time.zone.today, 'full'].join('-')
end
end
12 changes: 9 additions & 3 deletions app/models/stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,15 @@ def current_full_dump
normalized_dumps.full_dumps.create(last_delta_dump_at: Time.zone.at(0))
end

# the current full dump and its associated deltas
def current_dumps
[current_full_dump, *current_full_dump.deltas]
# the current full dump and its associated deltas, optionally filtered using
# a start and end date range query.
#
# Dates may be Date/Time objects or parseable strings (as passed from the
# OAI controller). The bounds are widened to whole days so an until_date of
# "2021-01-01" includes dumps created any time that day -- a raw
# `created_at <= until_date` would stop at midnight and drop the day's
# dumps, unlike the pre-refactor controller filtering.
def current_dumps(from_date: nil, until_date: nil)
  full = current_full_dump
  dumps = normalized_dumps.where(id: full.id)
                          .or(normalized_dumps.where(full_dump_id: full.id))
                          .order(created_at: :asc)
  dumps = dumps.where('created_at >= ?', Time.zone.parse(from_date.to_s).beginning_of_day) if from_date
  dumps = dumps.where('created_at <= ?', Time.zone.parse(until_date.to_s).end_of_day) if until_date
  dumps
end

# If no datetime is provided then assume we want the previous DefaultStreamHistory
Expand Down
21 changes: 21 additions & 0 deletions app/services/marc_record_writer_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ def unlink
@opened_files.each(&:unlink)
end

# Attach each finished tempfile to its matching attachment on the dump
# (e.g. the :marc21 file to dump.marc21) with a human-readable filename.
# File.open is used in block form so the handle is closed deterministically
# instead of leaking until GC; ActiveStorage uploads the io when #attach is
# called, so the handle is no longer needed after the block.
def attach_files_to_dump(dump, base_name)
  files.each do |file_type, file|
    File.open(file) do |io|
      dump.public_send(file_type).attach(io: io, filename: human_readable_filename(base_name, file_type))
    end
  end
end

private

def write_marc21_record(record)
Expand Down Expand Up @@ -83,4 +89,19 @@ def split_marc(marc)

raise e
end

# Map a writer file-type key to a download filename prefixed with base_name,
# e.g. ("org-2024-01-31-full", :marc21) => "org-2024-01-31-full-marc21.mrc.gz".
# Unknown file types fall back to "<type>.gz".
def human_readable_filename(base_name, file_type)
  suffix = {
    deletes: 'deletes.del.txt',
    marc21: 'marc21.mrc.gz',
    marcxml: 'marcxml.xml.gz'
  }.fetch(file_type) { "#{file_type}.gz" }

  "#{base_name}-#{suffix}"
end
end
Loading