Commit
wraps add_counts_to_db_and_dump*.rb and tests it
niquerio committed Sep 15, 2023
1 parent b2b2bb6 commit 9f7475d
Showing 7 changed files with 138 additions and 78 deletions.
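
Both bin scripts get the same treatment: the top-level script body moves into a class, and the CLI entry point is guarded by an APP_ENV check between SimpleCov :nocov: markers so a spec can require the file without executing it. A minimal sketch of that pattern (the class and argument names here are placeholders, not code from this commit):

class ScriptWrapper                      # placeholder name, for illustration only
  def initialize(input_file)
    @input_file = input_file
  end

  def run
    # work that previously ran at the top level of the script
  end
end

# :nocov:
if ENV["APP_ENV"] != "test"              # skip CLI execution when required from a spec
  ScriptWrapper.new(ARGV.shift).run
end
# :nocov: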
2 changes: 2 additions & 0 deletions .gitignore
@@ -7,3 +7,5 @@ data
.rspec_status
.bash_history
coverage/
.byebug_history
.irb_history
2 changes: 2 additions & 0 deletions bin/dump_terms_and_counts.rb
@@ -24,6 +24,7 @@ def self.run(url, field, output_file, limit)
output_file = ARGV.shift
limit = (ARGV.shift || -1).to_i

# :nocov:
if ENV["APP_ENV"] != "test"
unless url and field and url =~ /\Ahttp/
puts "\n#{$0} -- print a list of term/number-of-documents pairs from solr"
@@ -35,3 +36,4 @@ def self.run(url, field, output_file, limit)
TermFetcherWrapper.run(url, field, output_file, limit)
end
end
# :nocov:
163 changes: 88 additions & 75 deletions bin/names/add_counts_to_db_and_dump_unmatched_as_solr.rb
@@ -8,88 +8,101 @@
require "milemarker"
require "logger"

LOGGER = Logger.new($stderr)

solr_extract = ARGV.shift
db_name = ARGV.shift
unmatched_file = ARGV.shift

DB = AuthorityBrowse.db(db_name)
names = DB[:names]

unless defined? JRUBY_VERSION
warn "Multithreaded and needs to run under JRuby."
exit 1
end

# Need some sort of placeholder for stuff from the dump that doesn't match any LoC
class DumpCountsWrapper
def initialize(solr_extract, db_name, unmatched_file, logger = Logger.new($stderr))
@db = AuthorityBrowse.db(db_name)
@names = @db[:names]
@logger = logger
@unmatched_file = unmatched_file
@solr_extract = solr_extract

@match_text_match = @names.select(:id).where(match_text: :$match_text, deprecated: false).prepare(:select, :match_text_match)
@deprecated_match = @names.select(:id).where(match_text: :$match_text, deprecated: true).prepare(:select, :match_text_dep_match)
@increase_count = @names.where(id: :$id).prepare(:update, :increase_count, count: (Sequel[:count] + :$count))
end

milemarker = Milemarker.new(name: "Match and add counts to db", logger: LOGGER, batch_size: 50_000)
milemarker.log "Zeroing out all the counts from the last run. Can take 5mn."
names.db.transaction { names.update(count: 0) }
milemarker.log "...done"

@match_text_match = names.select(:id).where(match_text: :$match_text, deprecated: false).prepare(:select, :match_text_match)
@deprecated_match = names.select(:id).where(match_text: :$match_text, deprecated: true).prepare(:select, :match_text_dep_match)
@increase_count = names.where(id: :$id).prepare(:update, :increase_count, count: (Sequel[:count] + :$count))

# First try to match against a non-deprecated entry. Fall back to deprecated if we can't find one.
# @param [AuthorityBrowse::GenericXRef] unmatched
def best_match(unmatched)
resp = @match_text_match.call(match_text: unmatched.match_text)
if resp.count > 0
resp
else
@deprecated_match.call(match_text: unmatched.match_text)
# First try to match against a non-deprecated entry. Fall back to deprecated if we can't find one.
# @param [AuthorityBrowse::GenericXRef] unmatched
def best_match(unmatched)
resp = @match_text_match.call(match_text: unmatched.match_text)
if resp.count > 0
resp
else
@deprecated_match.call(match_text: unmatched.match_text)
end
end
end

require "concurrent"
lock = Concurrent::ReadWriteLock.new

pool = Concurrent::ThreadPoolExecutor.new(
min_threads: 8,
max_threads: 8,
max_queue: 200,
fallback_policy: :caller_runs
)

milemarker.log "Reading the solr extract (~8M recs). Matches update the db"
milemarker.log "Non-matches are turned into solr docs and written out"
milemarker.log "Matched records exported with unify_counts_with_xrefs_and_dump_matches_as_solr.rb"
milemarker.log "Takes about an hour."

milemarker.threadsafify!
records_read = 0

Zinzout.zout(unmatched_file) do |out|
DB.transaction do
Zinzout.zin(solr_extract).each_with_index do |line, i|
records_read += 1
pool.post(line, i) do |ln, i|
ln.chomp!
components = ln.split("\t")
count = components.pop.to_i
term = components.join(" ")
unmatched = AuthorityBrowse::LocSKOSRDF::UnmatchedEntry.new(label: term, count: count, id: AuthorityBrowse::Normalize.match_text(term))
resp = best_match(unmatched)
case resp.count
when 0
lock.with_write_lock { out.puts unmatched.to_solr }
else
rec = resp.first
@increase_count.call(id: rec[:id], count: count)
def run(threads = 8)
milemarker = Milemarker.new(name: "Match and add counts to db", logger: @logger, batch_size: 50_000)
milemarker.log "Zeroing out all the counts from the last run. Can take 5mn."
# names.db.transaction { names.update(count: 0) }
milemarker.log "...done"

require "concurrent"
lock = Concurrent::ReadWriteLock.new

pool = Concurrent::ThreadPoolExecutor.new(
min_threads: threads,
max_threads: threads,
max_queue: 200,
fallback_policy: :caller_runs
)

milemarker.log "Reading the solr extract (~8M recs). Matches update the db"
milemarker.log "Non-matches are turned into solr docs and written out"
milemarker.log "Matched records exported with unify_counts_with_xrefs_and_dump_matches_as_solr.rb"
milemarker.log "Takes about an hour."

milemarker.threadsafify!
records_read = 0

Zinzout.zout(@unmatched_file) do |out|
@db.transaction do
Zinzout.zin(@solr_extract).each_with_index do |line, i|
records_read += 1
pool.post(line, i) do |ln, i|
ln.chomp!
components = ln.split("\t")
count = components.pop.to_i
term = components.join(" ")
unmatched = AuthorityBrowse::LocSKOSRDF::UnmatchedEntry.new(label: term, count: count, id: AuthorityBrowse::Normalize.match_text(term))
resp = best_match(unmatched)
case resp.count
when 0
lock.with_write_lock { out.puts unmatched.to_solr }
else
rec = resp.first
@increase_count.call(id: rec[:id], count: count)
end
milemarker.increment_and_log_batch_line
end
end
milemarker.increment_and_log_batch_line
end

pool.shutdown
pool.wait_for_termination
milemarker.log_final_line
end

total_matches = @names.where { count > 0 }.count
milemarker.log_final_line
milemarker.log "Matches: #{total_matches}; Non matches: #{records_read - total_matches}"
end
end

pool.shutdown
pool.wait_for_termination
milemarker.log_final_line
solr_extract = ARGV.shift
db_name = ARGV.shift
unmatched_file = ARGV.shift

# :nocov:
if ENV["APP_ENV"] != "test"
if defined? JRUBY_VERSION
DumpCountsWrapper.new(solr_extract, db_name, unmatched_file).run
else
warn "Multithreaded and needs to run under JRuby."
exit 1
end
end
# :nocov:

total_matches = names.where { count > 0 }.count
milemarker.log_final_line
milemarker.log "Matches: #{total_matches}; Non matches: #{records_read - total_matches}"
# Need some sort of placeholder for stuff from the dump that doesn't match any LoC
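
The wrapper's three prepared statements use standard Sequel placeholder syntax, where a :$name symbol is bound when .call is invoked. A small self-contained sketch of the same calls against a throwaway SQLite table (the in-memory database and the inserted values are illustrative only, not part of this commit):

require "sequel"

db = Sequel.sqlite                      # in-memory database, illustration only
db.create_table(:names) do
  String  :id, primary_key: true
  String  :match_text
  Boolean :deprecated, default: false
  Integer :count, default: 0
end
names = db[:names]
names.insert(id: "n1", match_text: "twain shania")

# Prepared selects: :$match_text is bound at call time.
match      = names.select(:id).where(match_text: :$match_text, deprecated: false).prepare(:select, :match_text_match)
deprecated = names.select(:id).where(match_text: :$match_text, deprecated: true).prepare(:select, :match_text_dep_match)
# Prepared update: adds :$count to the stored count for the given id.
bump = names.where(id: :$id).prepare(:update, :increase_count, count: Sequel[:count] + :$count)

row = match.call(match_text: "twain shania").first    # => {:id => "n1"}
row ||= deprecated.call(match_text: "twain shania").first
bump.call(id: row[:id], count: 6)
names.first[:count]                                    # => 6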
Binary file added spec/fixtures/author_authoritative_browse.tsv.gz
@@ -0,0 +1,42 @@
# Inputs: a gzipped author_authoritative_browse (aab) solr extract,
# a db file,
# and a gzipped output file for unmatched entries

require_relative "../../bin/names/add_counts_to_db_and_dump_unmatched_as_solr"

RSpec.describe DumpCountsWrapper do
before(:each) do
`mkdir -p tmp`
@solr_extract = "spec/fixtures/author_authoritative_browse.tsv.gz"
@db_file = "tmp/database.db"
@unmatched_file = "tmp/unmatched.json.gz"
@logger = instance_double(Logger, info: nil)
@db = AuthorityBrowse.db(@db_file)
@db.create_table(:names) do
String :id, primary_key: true
String :label
String :match_text
Boolean :xrefs
Boolean :deprecated
Integer :count, default: 0
String :json, text: true
end
end
it "runs and matches nothing" do
expect(File.exist?(@unmatched_file)).to eq(false)
described_class.new(@solr_extract, @db_file, @unmatched_file, @logger).run(1)
expect(File.exist?(@unmatched_file)).to eq(true)
expect(`zgrep Twain #{@unmatched_file}`).not_to eq("")
end
it "runs and matches something" do
@db[:names].insert(label: "Twain, Shania", match_text: "twain shania", xrefs: false, id: "http://id.loc.gov/authorities/names/no95055361", json: "{}", deprecated: false)
expect(@db[:names].first[:count]).to eq(0)
described_class.new(@solr_extract, @db_file, @unmatched_file, @logger).run(1)
# because Twain, Shania has 6 in the @solr_extract fixture
expect(@db[:names].first[:count]).to eq(6)
end
after(:each) do
`rm tmp/*`
@db.disconnect
end
end
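
For context, the parsing in the wrapper's pool block (split on tabs, pop the last field as the count, re-join the rest with spaces) implies each line of the gzipped extract is term-then-count, tab-separated; the spec's comment says the fixture gives "Twain, Shania" a count of 6. A tiny standalone illustration of that split (the literal line is a guess at the fixture's layout):

line = "Twain, Shania\t6"          # hypothetical extract line
components = line.chomp.split("\t")
count = components.pop.to_i        # => 6
term  = components.join(" ")       # => "Twain, Shania"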
4 changes: 2 additions & 2 deletions spec/integrations/dump_terms_and_counts_spec.rb
@@ -1,4 +1,4 @@
require_relative "../../bin/dump_terms_and_counts.rb"
require_relative "../../bin/dump_terms_and_counts"
RSpec.describe TermFetcherWrapper do
before(:each) do
`mkdir -p tmp`
@@ -10,7 +10,7 @@

context ".run" do
it "runs" do
params =
params =
{
:q => "*:*",
"terms.limit" => 1_000,
3 changes: 2 additions & 1 deletion spec/spec_helper.rb
@@ -1,10 +1,11 @@
# frozen_string_literal: true

require "pry"
require "authority_browse"
require "byebug"
require "webmock/rspec"
require "simplecov"
SimpleCov.start
require "authority_browse"
ENV["APP_ENV"] = "test"

RSpec.configure do |config|
