From 167f66a4284d6bf258b3e99ff1d7aded0d4c98a1 Mon Sep 17 00:00:00 2001 From: Rafi Shamim Date: Tue, 23 Jan 2024 10:40:02 -0500 Subject: [PATCH 1/2] roachtest: make ruby-pg test work on Ubuntu 22.04 This required updating the version under test, which led to a few new test failures that we track now. Release note: None --- pkg/cmd/roachtest/tests/ruby_pg.go | 15 +- pkg/cmd/roachtest/tests/ruby_pg_blocklist.go | 71 ++-- pkg/cmd/roachtest/tests/ruby_pg_helpers.rb | 375 ++++++++++++------- 3 files changed, 297 insertions(+), 164 deletions(-) diff --git a/pkg/cmd/roachtest/tests/ruby_pg.go b/pkg/cmd/roachtest/tests/ruby_pg.go index d021707829f7..413a893336f8 100644 --- a/pkg/cmd/roachtest/tests/ruby_pg.go +++ b/pkg/cmd/roachtest/tests/ruby_pg.go @@ -22,12 +22,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" - "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" "github.com/cockroachdb/cockroach/pkg/roachprod/config" rperrors "github.com/cockroachdb/cockroach/pkg/roachprod/errors" "github.com/cockroachdb/cockroach/pkg/roachprod/install" - "github.com/cockroachdb/cockroach/pkg/roachprod/vm" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -38,7 +36,7 @@ var testSummaryRegexp = regexp.MustCompile("^([0-9]+) examples, [0-9]+ failures" // WARNING: DO NOT MODIFY the name of the below constant/variable without approval from the docs team. // This is used by docs automation to produce a list of supported versions for ORM's. -var rubyPGVersion = "v1.3.5" +var rubyPGVersion = "v1.4.6" // This test runs Ruby PG's full test suite against a single cockroach node. func registerRubyPG(r registry.Registry) { @@ -239,13 +237,10 @@ func registerRubyPG(r registry.Registry) { } r.Add(registry.TestSpec{ - Name: "ruby-pg", - Timeout: 1 * time.Hour, - Owner: registry.OwnerSQLFoundations, - // TODO(DarrylWong): This test currently fails on Ubuntu 22.04 so we run it on 20.04. - // See: https://github.com/cockroachdb/cockroach/issues/112109 - // Once this issue is fixed we should remove this Ubuntu Version override. 
- Cluster: r.MakeClusterSpec(1, spec.UbuntuVersion(vm.FocalFossa)), + Name: "ruby-pg", + Timeout: 1 * time.Hour, + Owner: registry.OwnerSQLFoundations, + Cluster: r.MakeClusterSpec(1), Leases: registry.MetamorphicLeases, NativeLibs: registry.LibGEOS, CompatibleClouds: registry.AllExceptAWS, diff --git a/pkg/cmd/roachtest/tests/ruby_pg_blocklist.go b/pkg/cmd/roachtest/tests/ruby_pg_blocklist.go index da2026236449..c622233fdda9 100644 --- a/pkg/cmd/roachtest/tests/ruby_pg_blocklist.go +++ b/pkg/cmd/roachtest/tests/ruby_pg_blocklist.go @@ -41,6 +41,7 @@ var rubyPGBlocklist = blocklist{ `Basic type mapping PG::BasicTypeMapForResults connection wide type mapping should do cidr type conversions`: "unknown", `Basic type mapping PG::BasicTypeMapForResults connection wide type mapping should do text datetime without time zone type conversions`: "unknown", `GC.compact should compact PG::TypeMapByClass #328`: "unknown", + `PG::Connection #get_result should send remaining data before waiting`: "unknown", `PG::Connection accepts nil as the timeout in #wait_for_notify `: "unknown", `PG::Connection allows a query to be cancelled`: "unknown", `PG::Connection calls a block for NOTIFY events if one is given`: "unknown", @@ -48,13 +49,9 @@ var rubyPGBlocklist = blocklist{ `PG::Connection calls the block supplied to wait_for_notify with the notify payload if it accepts three arguments`: "unknown", `PG::Connection calls the block supplied to wait_for_notify with the notify payload if it accepts two arguments`: "unknown", `PG::Connection calls the block supplied to wait_for_notify with the notify payload if it doesn't accept arguments`: "unknown", - `PG::Connection can handle client errors in #copy_data for input`: "unknown", - `PG::Connection can handle server errors in #copy_data for input`: "unknown", - `PG::Connection can handle server errors in #copy_data for output`: "unknown", - `PG::Connection can process #copy_data input queries`: "unknown", - `PG::Connection can process #copy_data input queries with lots of data`: "unknown", `PG::Connection can receive notices while waiting for NOTIFY without exceeding the timeout`: "unknown", `PG::Connection can wait for NOTIFY events`: "unknown", + `PG::Connection carries the connection in case of connection errors`: "unknown", `PG::Connection connection information related to SSL can retrieve a single ssl connection attribute`: "unknown", `PG::Connection connection information related to SSL can retrieve connection's ssl state`: "unknown", `PG::Connection connects using URI with UnixSocket host`: "unknown", @@ -63,37 +60,36 @@ var rubyPGBlocklist = blocklist{ `PG::Connection doesn't collapse sequential notifications`: "unknown", `PG::Connection doesn't leave stale server connections after finish`: "unknown", `PG::Connection emits a suitable error_message at connection errors`: "unknown", - `PG::Connection gracefully handle SQL statements while in #copy_data for input`: "unknown", - `PG::Connection gracefully handle SQL statements while in #copy_data for output`: "unknown", `PG::Connection in nonblocking mode can send query with params`: "unknown", `PG::Connection in nonblocking mode needs to flush data after send_query`: "unknown", `PG::Connection in nonblocking mode rejects to send lots of COPY data`: "unknown", `PG::Connection in nonblocking mode returns immediately from get_copy_data(nonblock=true)`: "unknown", + `PG::Connection large objects large object can handle big data`: "unknown", + `PG::Connection large objects not read past the end of a large object`: 
"unknown", `PG::Connection multinationalization support Ruby 1.9.x default_internal encoding allows users of the async interface to set the client_encoding to the default_internal`: "unknown", `PG::Connection multinationalization support Ruby 1.9.x default_internal encoding honors the Encoding.default_internal if it's set and the synchronous interface is used`: "unknown", `PG::Connection multinationalization support encodes exception messages with the connection's encoding (#96)`: "unknown", `PG::Connection multinationalization support handles clearing result in or after set_notice_receiver`: "unknown", `PG::Connection multinationalization support receives properly encoded messages in the notice callbacks`: "unknown", `PG::Connection multinationalization support receives properly encoded text from wait_for_notify`: "unknown", + `PG::Connection multinationalization support respect and convert character encoding of input strings should convert error string to #put_copy_end`: "unknown", `PG::Connection multinationalization support respect and convert character encoding of input strings should convert query string and parameters to #exec_params`: "unknown", `PG::Connection multinationalization support respect and convert character encoding of input strings should convert query string and parameters to #send_query_params`: "unknown", `PG::Connection multinationalization support respect and convert character encoding of input strings should convert strings and parameters to #prepare and #exec_prepared`: "unknown", `PG::Connection multinationalization support respect and convert character encoding of input strings should convert strings and parameters to #send_prepare and #send_query_prepared`: "unknown", `PG::Connection multinationalization support returns properly encoded text from notifies`: "unknown", - `PG::Connection multinationalization support rubyforge #22925: m17n support can use an encoding with high index for client encoding`: "unknown", `PG::Connection multinationalization support rubyforge #22925: m17n support raises appropriate error if set_client_encoding is called with invalid arguments`: "unknown", `PG::Connection multinationalization support rubyforge #22925: m17n support returns the results in the correct encoding even if the client_encoding has changed since the results were fetched`: "unknown", `PG::Connection multinationalization support rubyforge #22925: m17n support should return results in the same encoding as the client (EUC-JP)`: "unknown", `PG::Connection multinationalization support rubyforge #22925: m17n support should return results in the same encoding as the client (iso-8859-1)`: "unknown", `PG::Connection multinationalization support rubyforge #22925: m17n support the connection should return ASCII-8BIT when it's set to SQL_ASCII`: "unknown", - `PG::Connection multinationalization support rubyforge #22925: m17n support the connection should use JOHAB dummy encoding when it's set to JOHAB`: "unknown", + `PG::Connection multinationalization support rubyforge #22925: m17n support the connection should use the BINARY encoding when it's set to JOHAB`: "unknown", `PG::Connection multinationalization support rubyforge #22925: m17n support uses the client encoding for escaped identifier`: "unknown", `PG::Connection multinationalization support rubyforge #22925: m17n support uses the client encoding for escaped literal`: "unknown", `PG::Connection multinationalization support rubyforge #22925: m17n support uses the client encoding for escaped string`: "unknown", 
`PG::Connection multinationalization support rubyforge #22925: m17n support uses the client encoding for quote_ident`: "unknown", `PG::Connection multinationalization support rubyforge #22925: m17n support uses the previous string encoding for escaped string`: "unknown", `PG::Connection multinationalization support rubyforge #22925: m17n support uses the previous string encoding for quote_ident`: "unknown", - `PG::Connection not read past the end of a large object`: "unknown", `PG::Connection returns notifications which are already in the queue before wait_for_notify is called without waiting for the socket to become readable`: "unknown", `PG::Connection sends nil as the payload if the notification wasn't given one`: "unknown", `PG::Connection set_single_row_mode should receive rows before entire query is finished`: "unknown", @@ -105,11 +101,24 @@ var rubyPGBlocklist = blocklist{ `PG::Connection type casting with default result type map should respect a type mapping for result`: "unknown", `PG::Connection type casting with default result type map should work with arbitrary number of params in conjunction with type casting`: "unknown", `PG::Connection with async established connection provides the server generated error message`: "unknown", + `PG::Connection with multiple PostgreSQL servers honors target_session_attrs requirements`: "unknown", + `PG::Connection#copy_data can handle client errors in #copy_data for input`: "unknown", + `PG::Connection#copy_data can handle server errors in #copy_data for input`: "unknown", + `PG::Connection#copy_data can handle server errors in #copy_data for output`: "unknown", + `PG::Connection#copy_data can process #copy_data input queries`: "unknown", + `PG::Connection#copy_data can process #copy_data input queries with lots of data`: "unknown", + `PG::Connection#copy_data doesn't lose client error when #copy_data can not be finished`: "unknown", + `PG::Connection#copy_data gracefully handle SQL statements while in #copy_data for input`: "unknown", + `PG::Connection#copy_data gracefully handle SQL statements while in #copy_data for output`: "unknown", + `PG::Connection#discard_results returns false on connection failures`: "unknown", + `PG::Connection#inspect should print host, port and user of a fresh connection, but not more`: "unknown", + `PG::Connection#inspect should tell about non UTF8 client encoding`: "unknown", `PG::Connection#transaction automatically rolls back a transaction if an exception is raised`: "unknown", + `PG::Connection#transaction commits even if the block includes an early break/return`: "unknown", `PG::Connection#transaction doesn't worry about an already finished connection`: "unknown", `PG::Connection#transaction passes the connection to the block and returns the block result`: "unknown", - `PG::Connection#transaction stops a thread that runs a blocking transaction with async_exec`: "unknown", - `PG::Connection#transaction stops a thread that runs a failing transaction with async_exec`: "unknown", + `PG::Connection#transaction stops a thread that runs a blocking transaction with exec`: "unknown", + `PG::Connection#transaction stops a thread that runs a failing transaction with exec`: "unknown", `PG::Connection#transaction stops a thread that runs a no query but a transacted ruby sleep`: "unknown", `PG::Result encapsulates database object names for integrity constraint violations`: "unknown", `PG::Result encapsulates errors in a PG::Error object`: "unknown", @@ -118,6 +127,7 @@ var rubyPGBlocklist = blocklist{ `PG::TypeMapByOid 
should allow mixed type conversions in binary format`: "unknown", `PG::TypeMapByOid should allow mixed type conversions in text format`: "unknown", `PG::TypeMapByOid should build a TypeMapByColumn when assigned and the number of rows is high enough`: "unknown", + `running with sync_* methods PG::Connection #get_result should send remaining data before waiting`: "unknown", `running with sync_* methods PG::Connection accepts nil as the timeout in #wait_for_notify `: "unknown", `running with sync_* methods PG::Connection allows a query to be cancelled`: "unknown", `running with sync_* methods PG::Connection calls a block for NOTIFY events if one is given`: "unknown", @@ -125,13 +135,9 @@ var rubyPGBlocklist = blocklist{ `running with sync_* methods PG::Connection calls the block supplied to wait_for_notify with the notify payload if it accepts three arguments`: "unknown", `running with sync_* methods PG::Connection calls the block supplied to wait_for_notify with the notify payload if it accepts two arguments`: "unknown", `running with sync_* methods PG::Connection calls the block supplied to wait_for_notify with the notify payload if it doesn't accept arguments`: "unknown", - `running with sync_* methods PG::Connection can handle client errors in #copy_data for input`: "unknown", - `running with sync_* methods PG::Connection can handle server errors in #copy_data for input`: "unknown", - `running with sync_* methods PG::Connection can handle server errors in #copy_data for output`: "unknown", - `running with sync_* methods PG::Connection can process #copy_data input queries`: "unknown", - `running with sync_* methods PG::Connection can process #copy_data input queries with lots of data`: "unknown", `running with sync_* methods PG::Connection can receive notices while waiting for NOTIFY without exceeding the timeout`: "unknown", `running with sync_* methods PG::Connection can wait for NOTIFY events`: "unknown", + `running with sync_* methods PG::Connection carries the connection in case of connection errors`: "unknown", `running with sync_* methods PG::Connection connection information related to SSL can retrieve a single ssl connection attribute`: "unknown", `running with sync_* methods PG::Connection connection information related to SSL can retrieve connection's ssl state`: "unknown", `running with sync_* methods PG::Connection connects using URI with UnixSocket host`: "unknown", @@ -140,12 +146,12 @@ var rubyPGBlocklist = blocklist{ `running with sync_* methods PG::Connection doesn't collapse sequential notifications`: "unknown", `running with sync_* methods PG::Connection doesn't leave stale server connections after finish`: "unknown", `running with sync_* methods PG::Connection emits a suitable error_message at connection errors`: "unknown", - `running with sync_* methods PG::Connection gracefully handle SQL statements while in #copy_data for input`: "unknown", - `running with sync_* methods PG::Connection gracefully handle SQL statements while in #copy_data for output`: "unknown", `running with sync_* methods PG::Connection in nonblocking mode can send query with params`: "unknown", `running with sync_* methods PG::Connection in nonblocking mode needs to flush data after send_query`: "unknown", `running with sync_* methods PG::Connection in nonblocking mode rejects to send lots of COPY data`: "unknown", `running with sync_* methods PG::Connection in nonblocking mode returns immediately from get_copy_data(nonblock=true)`: "unknown", + `running with sync_* methods PG::Connection large 
objects large object can handle big data`: "unknown", + `running with sync_* methods PG::Connection large objects not read past the end of a large object`: "unknown", `running with sync_* methods PG::Connection multinationalization support Ruby 1.9.x default_internal encoding allows users of the async interface to set the client_encoding to the default_internal`: "unknown", `running with sync_* methods PG::Connection multinationalization support Ruby 1.9.x default_internal encoding honors the Encoding.default_internal if it's set and the synchronous interface is used`: "unknown", `running with sync_* methods PG::Connection multinationalization support encodes exception messages with the connection's encoding (#96)`: "unknown", @@ -157,20 +163,18 @@ var rubyPGBlocklist = blocklist{ `running with sync_* methods PG::Connection multinationalization support respect and convert character encoding of input strings should convert strings and parameters to #prepare and #exec_prepared`: "unknown", `running with sync_* methods PG::Connection multinationalization support respect and convert character encoding of input strings should convert strings and parameters to #send_prepare and #send_query_prepared`: "unknown", `running with sync_* methods PG::Connection multinationalization support returns properly encoded text from notifies`: "unknown", - `running with sync_* methods PG::Connection multinationalization support rubyforge #22925: m17n support can use an encoding with high index for client encoding`: "unknown", `running with sync_* methods PG::Connection multinationalization support rubyforge #22925: m17n support raises appropriate error if set_client_encoding is called with invalid arguments`: "unknown", `running with sync_* methods PG::Connection multinationalization support rubyforge #22925: m17n support returns the results in the correct encoding even if the client_encoding has changed since the results were fetched`: "unknown", `running with sync_* methods PG::Connection multinationalization support rubyforge #22925: m17n support should return results in the same encoding as the client (EUC-JP)`: "unknown", `running with sync_* methods PG::Connection multinationalization support rubyforge #22925: m17n support should return results in the same encoding as the client (iso-8859-1)`: "unknown", `running with sync_* methods PG::Connection multinationalization support rubyforge #22925: m17n support the connection should return ASCII-8BIT when it's set to SQL_ASCII`: "unknown", - `running with sync_* methods PG::Connection multinationalization support rubyforge #22925: m17n support the connection should use JOHAB dummy encoding when it's set to JOHAB`: "unknown", + `running with sync_* methods PG::Connection multinationalization support rubyforge #22925: m17n support the connection should use the BINARY encoding when it's set to JOHAB`: "unknown", `running with sync_* methods PG::Connection multinationalization support rubyforge #22925: m17n support uses the client encoding for escaped identifier`: "unknown", `running with sync_* methods PG::Connection multinationalization support rubyforge #22925: m17n support uses the client encoding for escaped literal`: "unknown", `running with sync_* methods PG::Connection multinationalization support rubyforge #22925: m17n support uses the client encoding for escaped string`: "unknown", `running with sync_* methods PG::Connection multinationalization support rubyforge #22925: m17n support uses the client encoding for quote_ident`: "unknown", `running with 
sync_* methods PG::Connection multinationalization support rubyforge #22925: m17n support uses the previous string encoding for escaped string`: "unknown", `running with sync_* methods PG::Connection multinationalization support rubyforge #22925: m17n support uses the previous string encoding for quote_ident`: "unknown", - `running with sync_* methods PG::Connection not read past the end of a large object`: "unknown", `running with sync_* methods PG::Connection returns notifications which are already in the queue before wait_for_notify is called without waiting for the socket to become readable`: "unknown", `running with sync_* methods PG::Connection sends nil as the payload if the notification wasn't given one`: "unknown", `running with sync_* methods PG::Connection set_single_row_mode should receive rows before entire query is finished`: "unknown", @@ -182,12 +186,27 @@ var rubyPGBlocklist = blocklist{ `running with sync_* methods PG::Connection type casting with default result type map should respect a type mapping for result`: "unknown", `running with sync_* methods PG::Connection type casting with default result type map should work with arbitrary number of params in conjunction with type casting`: "unknown", `running with sync_* methods PG::Connection with async established connection provides the server generated error message`: "unknown", + `running with sync_* methods PG::Connection with multiple PostgreSQL servers honors target_session_attrs requirements`: "unknown", + `running with sync_* methods PG::Connection#copy_data can handle client errors after all data is consumed in #copy_data for output`: "unknown", + `running with sync_* methods PG::Connection#copy_data can handle client errors in #copy_data for input`: "unknown", + `running with sync_* methods PG::Connection#copy_data can handle client errors in #copy_data for output`: "unknown", + `running with sync_* methods PG::Connection#copy_data can handle incomplete #copy_data output queries`: "unknown", + `running with sync_* methods PG::Connection#copy_data can handle server errors in #copy_data for input`: "unknown", + `running with sync_* methods PG::Connection#copy_data can handle server errors in #copy_data for output`: "unknown", + `running with sync_* methods PG::Connection#copy_data can process #copy_data input queries`: "unknown", + `running with sync_* methods PG::Connection#copy_data can process #copy_data input queries with lots of data`: "unknown", + `running with sync_* methods PG::Connection#copy_data can process #copy_data output queries`: "unknown", + `running with sync_* methods PG::Connection#copy_data doesn't lose client error when #copy_data can not be finished`: "unknown", + `running with sync_* methods PG::Connection#copy_data gracefully handle SQL statements while in #copy_data for input`: "unknown", + `running with sync_* methods PG::Connection#copy_data gracefully handle SQL statements while in #copy_data for output`: "unknown", + `running with sync_* methods PG::Connection#copy_data should raise an error for non copy statements in #copy_data`: "unknown", + `running with sync_* methods PG::Connection#discard_results discards previous results`: "unknown", + `running with sync_* methods PG::Connection#discard_results returns false on connection failures`: "unknown", + `running with sync_* methods PG::Connection#inspect should print host, port and user of a fresh connection, but not more`: "unknown", + `running with sync_* methods PG::Connection#inspect should tell about non UTF8 client encoding`: 
"unknown", `running with sync_* methods PG::Connection#transaction automatically rolls back a transaction if an exception is raised`: "unknown", - `running with sync_* methods PG::Connection#transaction doesn't worry about an already finished connection`: "unknown", + `running with sync_* methods PG::Connection#transaction commits even if the block includes an early break/return`: "unknown", `running with sync_* methods PG::Connection#transaction passes the connection to the block and returns the block result`: "unknown", - `running with sync_* methods PG::Connection#transaction stops a thread that runs a blocking transaction with async_exec`: "unknown", - `running with sync_* methods PG::Connection#transaction stops a thread that runs a failing transaction with async_exec`: "unknown", - `running with sync_* methods PG::Connection#transaction stops a thread that runs a no query but a transacted ruby sleep`: "unknown", `with a Fiber scheduler can cancel a query`: "unknown", `with a Fiber scheduler can receive COPY data`: "unknown", `with a Fiber scheduler can retrieve several results`: "unknown", diff --git a/pkg/cmd/roachtest/tests/ruby_pg_helpers.rb b/pkg/cmd/roachtest/tests/ruby_pg_helpers.rb index b8c4c1c87ab1..d9547ad722c4 100644 --- a/pkg/cmd/roachtest/tests/ruby_pg_helpers.rb +++ b/pkg/cmd/roachtest/tests/ruby_pg_helpers.rb @@ -10,8 +10,9 @@ require 'openssl' require_relative 'helpers/scheduler.rb' require_relative 'helpers/tcp_gate_scheduler.rb' +require_relative 'helpers/tcp_gate_switcher.rb' -DEFAULT_TEST_DIR_STR = File.join(Dir.pwd, "tmp_test_specs") +DEFAULT_TEST_DIR_STR = Dir.pwd TEST_DIR_STR = ENV['RUBY_PG_TEST_DIR'] || DEFAULT_TEST_DIR_STR TEST_DIRECTORY = Pathname.new(TEST_DIR_STR) DATA_OBJ_MEMSIZE = 40 @@ -25,7 +26,10 @@ def self::included( mod ) if mod.respond_to?( :around ) mod.before( :all ) do - @conn = connect_testing_db + @port = $pg_server.port + @conninfo = $pg_server.conninfo + @unix_socket = $pg_server.unix_socket + @conn = $pg_server.connect end mod.around( :each ) do |example| @@ -95,96 +99,99 @@ def self::included( mod ) module_function ############### - ### Create a string that contains the ANSI codes specified and return it - def ansi_code( *attributes ) - attributes.flatten! - attributes.collect! {|at| at.to_s } + module Loggable + ### Create a string that contains the ANSI codes specified and return it + def ansi_code( *attributes ) + attributes.flatten! + attributes.collect! {|at| at.to_s } - return '' unless /(?:vt10[03]|xterm(?:-color)?|linux|screen)/i =~ ENV['TERM'] - attributes = ANSI_ATTRIBUTES.values_at( *attributes ).compact.join(';') + return '' unless /(?:vt10[03]|xterm(?:-color)?|linux|screen)/i =~ ENV['TERM'] + attributes = ANSI_ATTRIBUTES.values_at( *attributes ).compact.join(';') - # $stderr.puts " attr is: %p" % [attributes] - if attributes.empty? - return '' - else - return "\e[%sm" % attributes + # $stderr.puts " attr is: %p" % [attributes] + if attributes.empty? + return '' + else + return "\e[%sm" % attributes + end end - end - ### Colorize the given +string+ with the specified +attributes+ and return it, handling - ### line-endings, color reset, etc. - def colorize( *args ) - string = '' + ### Colorize the given +string+ with the specified +attributes+ and return it, handling + ### line-endings, color reset, etc. + def colorize( *args ) + string = '' - if block_given? - string = yield - else - string = args.shift - end + if block_given? 
+ string = yield + else + string = args.shift + end - ending = string[/(\s)$/] || '' - string = string.rstrip + ending = string[/(\s)$/] || '' + string = string.rstrip - return ansi_code( args.flatten ) + string + ansi_code( 'reset' ) + ending - end + return ansi_code( args.flatten ) + string + ansi_code( 'reset' ) + ending + end - ### Output a message with highlighting. - def message( *msg ) - $stderr.puts( colorize(:bold) { msg.flatten.join(' ') } ) - end + ### Output a message with highlighting. + def message( *msg ) + $stderr.puts( colorize(:bold) { msg.flatten.join(' ') } ) + end - ### Output a logging message if $VERBOSE is true - def trace( *msg ) - return unless $VERBOSE - output = colorize( msg.flatten.join(' '), 'yellow' ) - $stderr.puts( output ) - end + ### Output a logging message if $VERBOSE is true + def trace( *msg ) + return unless $VERBOSE + output = colorize( msg.flatten.join(' '), 'yellow' ) + $stderr.puts( output ) + end - ### Return the specified args as a string, quoting any that have a space. - def quotelist( *args ) - return args.flatten.collect {|part| part.to_s =~ /\s/ ? part.to_s.inspect : part.to_s } - end + ### Return the specified args as a string, quoting any that have a space. + def quotelist( *args ) + return args.flatten.collect {|part| part.to_s =~ /\s/ ? part.to_s.inspect : part.to_s } + end - ### Run the specified command +cmd+ with system(), failing if the execution - ### fails. - def run( *cmd ) - cmd.flatten! + ### Run the specified command +cmd+ with system(), failing if the execution + ### fails. + def run( *cmd ) + cmd.flatten! - if cmd.length > 1 - trace( quotelist(*cmd) ) - else - trace( cmd ) - end + if cmd.length > 1 + trace( quotelist(*cmd) ) + else + trace( cmd ) + end - system( *cmd ) - raise "Command failed: [%s]" % [cmd.join(' ')] unless $?.success? - end + system( *cmd ) + raise "Command failed: [%s]" % [cmd.join(' ')] unless $?.success? + end - ### Run the specified command +cmd+ after redirecting stdout and stderr to the specified - ### +logpath+, failing if the execution fails. - def log_and_run( logpath, *cmd ) - cmd.flatten! + ### Run the specified command +cmd+ after redirecting stdout and stderr to the specified + ### +logpath+, failing if the execution fails. + def log_and_run( logpath, *cmd ) + cmd.flatten! - if cmd.length > 1 - trace( quotelist(*cmd) ) - else - trace( cmd ) - end + if cmd.length > 1 + trace( quotelist(*cmd) ) + else + trace( cmd ) + end - # Eliminate the noise of creating/tearing down the database by - # redirecting STDERR/STDOUT to a logfile - logfh = File.open( logpath, File::WRONLY|File::CREAT|File::APPEND ) - system( *cmd, [STDOUT, STDERR] => logfh ) + # Eliminate the noise of creating/tearing down the database by + # redirecting STDERR/STDOUT to a logfile + logfh = File.open( logpath, File::WRONLY|File::CREAT|File::APPEND ) + system( *cmd, [STDOUT, STDERR] => logfh ) - raise "Command failed: [%s]" % [cmd.join(' ')] unless $?.success? + raise "Command failed: [%s]" % [cmd.join(' ')] unless $?.success? 
+ end end + extend Loggable ### Check the current directory for directories that look like they're ### testing directories from previous tests, and tell any postgres instances @@ -212,61 +219,92 @@ def stop_existing_postmasters end end - def define_testing_conninfo - ENV['PGPORT'] ||= "26257" - @port = ENV['PGPORT'].to_i - ENV['PGHOST'] = 'localhost' - ENV['PGUSER'] = 'test_admin' - @conninfo = "user=test_admin host=localhost port=#{@port} dbname=test" - @unix_socket = TEST_DIRECTORY.to_s - end + class PostgresServer + include Loggable - ### Set up a CockroachDB database instance for testing. - def setup_testing_db( description ) - stop_existing_postmasters() + attr_reader :port + attr_reader :conninfo + attr_reader :unix_socket - trace "Setting up test database for #{description}" - @test_pgdata = TEST_DIRECTORY + 'data' - @test_pgdata.mkpath + ### Set up a PostgreSQL database instance for testing. + def initialize( name, port: 26257, postgresql_conf: '' ) + trace "Setting up test database for #{name}" + @name = name + @port = port + @test_dir = TEST_DIRECTORY + "tmp_test_#{@name}" + @test_pgdata = @test_dir + 'data' + @test_pgdata.mkpath - define_testing_conninfo + @logfile = @test_dir + 'setup.log' + trace "Command output logged to #{@logfile}" - @logfile = TEST_DIRECTORY + 'setup.log' - trace "Command output logged to #{@logfile}" + begin + unless (@test_pgdata+"postgresql.conf").exist? + FileUtils.rm_rf( @test_pgdata, :verbose => $DEBUG ) + end - begin - unless (@test_pgdata+"postgresql.conf").exist? - FileUtils.rm_rf( @test_pgdata, :verbose => $DEBUG ) - trace "GG" - trace "Running initdb" + unless @port == 26257 + # The main database instance is started by the roachtest + # script, so skip this step if we're using the default port. + trace "Starting cockroachdb" + log_and_run @logfile, '/home/ubuntu/cockroach', 'start-single-node', '--insecure', + "--store=#{@test_pgdata.to_s}", + "--advertise-addr=localhost", + "--sql-addr=localhost:#{@port}" + sleep(2) + log_and_run @logfile, '/home/ubuntu/cockroach', 'sql', '--insecure', '-e', 'CREATE USER test_admin' + log_and_run @logfile, '/home/ubuntu/cockroach', 'sql', '--insecure', '-e', 'GRANT admin TO test_admin' + end + + td = @test_pgdata + @conninfo = "user=test_admin host=localhost port=#{@port} dbname=test" + @unix_socket = @test_dir.to_s + rescue => err + $stderr.puts "%p during test setup: %s" % [ err.class, err.message ] + $stderr.puts "See #{@logfile} for details." 
+ $stderr.puts err.backtrace if $DEBUG + fail end + end + + def generate_ssl_certs(output_dir) + gen = CertGenerator.new(output_dir) + + trace "create ca-key" + ca_key = gen.create_key('ruby-pg-ca-key') + ca_cert = gen.create_ca_cert('ruby-pg-ca-cert', ca_key, '/CN=ruby-pg root key') + trace "create server cert" + key = gen.create_key('ruby-pg-server-key') + csr = gen.create_signing_request('ruby-pg-server-csr', '/CN=localhost', key) + gen.create_cert_from_csr('ruby-pg-server-cert', csr, ca_cert, ca_key, dns_names: %w[localhost] ) + + trace "create client cert" + key = gen.create_key('ruby-pg-client-key') + csr = gen.create_signing_request('ruby-pg-client-csr', '/CN=ruby-pg client', key) + gen.create_cert_from_csr('ruby-pg-client-cert', csr, ca_cert, ca_key) + end + + def create_test_db trace "Creating the test DB" log_and_run @logfile, '/home/ubuntu/cockroach', 'sql', '--insecure', '-e', 'DROP DATABASE IF EXISTS test' log_and_run @logfile, '/home/ubuntu/cockroach', 'sql', '--insecure', '-e', 'CREATE DATABASE test' - - rescue => err - $stderr.puts "%p during test setup: %s" % [ err.class, err.message ] - $stderr.puts "See #{@logfile} for details." - $stderr.puts err.backtrace if $DEBUG - fail end - end - def connect_testing_db - define_testing_conninfo - conn = PG.connect( @conninfo ) - conn.set_notice_processor do |message| - $stderr.puts( description + ':' + message ) if $DEBUG - end + def connect + conn = PG.connect( @conninfo ) + conn.set_notice_processor do |message| + $stderr.puts( @name + ':' + message ) if $DEBUG + end - return conn - end + return conn + end - def teardown_testing_db - trace "Tearing down test database" - # This is changed to a no-op so that we can inspect the database after the - # test runs. + def teardown + trace "Tearing down test database for #{@name}" + # This is changed to a no-op so that we can inspect the database after the + # test runs. 
+ end end class CertGenerator @@ -378,24 +416,6 @@ def create_cert_from_csr(name, csr, ca_cert, ca_key, valid_years: 10, dns_names: end end - def generate_ssl_certs(output_dir) - gen = CertGenerator.new(output_dir) - - trace "create ca-key" - ca_key = gen.create_key('ruby-pg-ca-key') - ca_cert = gen.create_ca_cert('ruby-pg-ca-cert', ca_key, '/CN=ruby-pg root key') - - trace "create server cert" - key = gen.create_key('ruby-pg-server-key') - csr = gen.create_signing_request('ruby-pg-server-csr', '/CN=localhost', key) - gen.create_cert_from_csr('ruby-pg-server-cert', csr, ca_cert, ca_key, dns_names: %w[localhost] ) - - trace "create client cert" - key = gen.create_key('ruby-pg-client-key') - csr = gen.create_signing_request('ruby-pg-client-csr', '/CN=ruby-pg client', key) - gen.create_cert_from_csr('ruby-pg-client-cert', csr, ca_cert, ca_key) - end - def check_for_lingering_connections( conn ) conn.exec( "SELECT * FROM pg_stat_activity" ) do |res| conns = res.find_all {|row| row['pid'].to_i != conn.backend_pid && ["client backend", nil].include?(row["backend_type"]) } @@ -510,6 +530,97 @@ def wait_for_flush(conn) end end end + + def scheduler_setup + # Run examples with gated scheduler + sched = Helpers::TcpGateScheduler.new(external_host: 'localhost', external_port: ENV['PGPORT'].to_i, debug: ENV['PG_DEBUG']=='1') + Fiber.set_scheduler(sched) + @conninfo_gate = @conninfo.gsub(/(^| )port=\d+/, " port=#{sched.internal_port} sslmode=disable") + + # Run examples with default scheduler + #Fiber.set_scheduler(Helpers::Scheduler.new) + #@conninfo_gate = @conninfo + + # Run examples without scheduler + #def Fiber.schedule; yield; end + #@conninfo_gate = @conninfo + end + + def scheduler_teardown + Fiber.set_scheduler(nil) + end + + def scheduler_stop + if Fiber.scheduler && Fiber.scheduler.respond_to?(:finish) + Fiber.scheduler.finish + end + end + + def thread_with_timeout(timeout) + th = Thread.new do + yield + end + unless th.join(timeout) + th.kill + $scheduler_timeout = true + raise("scheduler timeout in:\n#{th.backtrace.join("\n")}") + end + end + + def run_with_scheduler(timeout=10) + thread_with_timeout(timeout) do + scheduler_setup + Fiber.schedule do + conn = PG.connect(@conninfo_gate) + + yield conn + + conn.finish + scheduler_stop + end + end + scheduler_teardown + end + + def gate_setup + # Run examples with gate + gate = Helpers::TcpGateSwitcher.new(external_host: 'localhost', external_port: ENV['PGPORT'].to_i, debug: ENV['PG_DEBUG']=='1') + @conninfo_gate = @conninfo.gsub(/(^| )port=\d+/, " port=#{gate.internal_port} sslmode=disable") + + # Run examples without gate + #@conninfo_gate = @conninfo + gate + end + + def gate_stop(gate) + gate&.finish + end + + def run_with_gate(timeout=10) + thread_with_timeout(timeout) do + gate = gate_setup + conn = PG.connect(@conninfo_gate) + + yield conn, gate + + conn.finish + gate_stop(gate) + end + end + + # Define environment variables for the time of the given block + # + # All environment variables are restored to the original value or undefined after the block. 
+	def with_env_vars(**kwargs)
+		kwargs = kwargs.map{|k,v| [k.to_s, v && v.to_s] }.to_h
+		old_values = kwargs.map{|k,_| [k, ENV[k]] }.to_h
+		ENV.update(kwargs)
+		begin
+			yield
+		ensure
+			ENV.update(old_values)
+		end
+	end
 end
 
 
@@ -538,12 +649,20 @@ def wait_for_flush(conn)
 	config.filter_run_excluding( :unix_socket ) if RUBY_PLATFORM=~/mingw|mswin/i
 	config.filter_run_excluding( :scheduler ) if RUBY_VERSION < "3.0" || !Fiber.respond_to?(:scheduler)
 	config.filter_run_excluding( :scheduler_address_resolve ) if RUBY_VERSION < "3.1"
+	config.filter_run_excluding( :ipv6 ) if Addrinfo.getaddrinfo("localhost", nil, nil, :STREAM).size < 2
 
 	### Automatically set up and tear down the database
 	config.before(:suite) do |*args|
-		PG::TestingHelpers.setup_testing_db("the spec suite")
+		PG::TestingHelpers.stop_existing_postmasters
+
+		ENV['PGHOST'] = 'localhost'
+		ENV['PGPORT'] ||= "26257"
+		port = ENV['PGPORT'].to_i
+		ENV['PGUSER'] = 'test_admin'
+		$pg_server = PG::TestingHelpers::PostgresServer.new("specs", port: port)
+		$pg_server.create_test_db
 	end
 
 	config.after(:suite) do
-		PG::TestingHelpers.teardown_testing_db
+		$pg_server.teardown
 	end
 end

From cb674c239069480b57ebd9ba574eb3e3907e1342 Mon Sep 17 00:00:00 2001
From: Jayant Shrivastava
Date: Thu, 11 Jan 2024 14:47:10 -0500
Subject: [PATCH 2/2] changefeedccl: introduce quota to parallelio

Problem:

In the investigation in #111829, it was observed that a lot of CPU was
being wasted performing unions on intsets. This is caused by the
`parallelIO` struct, which relies on performing unions to check for
conflicting keys.

The problem is not that intsets are slow; it is how they are used:
`parallelIO` uses one goroutine to both process incoming requests and
emit outgoing results. It accepts incoming requests unconditionally,
enqueuing them if they cannot be emitted due to conflicting in-flight
requests. As outgoing results are processed, each outgoing result is
cross-checked against, in the worst case, all enqueued requests to see
if they can be emitted. This cross-checking requires unions.

incoming request -> request queue -> request handler -> result
                      ^                ^
                      | cross check all entries to see |
                      | if a new request can be emitted |

A problem arises when the incoming request queue grows to some critical
length at which it significantly slows down the cross-checking. This
slows down result processing, which in turn slows down consumption from
the request queue. This negative feedback loop causes the request queue
to grow so large that results take very long to process, creating a
bottleneck which throttles the entire changefeed. See the comments on
#115536 for more details.

The request queue is unbounded. The only reason it doesn't cause an OOM
is that the incoming requests are bounded (by the per-changefeed memory
limit).

Solution:

This change solves the problem by setting a quota on the maximum number
of requests that can be processed by the library at the same time. The
quota defaults to 128 requests and can be changed using a new cluster
setting, `changefeed.parallel_io.request_quota`.

Before this change, the API of the parallelio library was easy to
misuse: it required the caller to select on both the request channel
and the result channel to prevent deadlock, and there were no public
methods, which made it unclear how to use the API correctly. This
change introduces an explicit API with public methods. However, it
keeps the same two-channel scheme, because removing it would require a
larger refactor. This is left as a TODO.
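
As an illustration, here is a minimal sketch of how the single
goroutine that both produces requests and consumes results is expected
to drive the new API (the helper name emitOne and the handle callback
are hypothetical; AdmitRequest, GetResult, Consume, and
ErrNotEnoughQuota are the names introduced below):

    // emitOne admits one request, consuming completed results to free
    // quota whenever the pool is exhausted, then sends the request.
    func emitOne(
    	ctx context.Context, p *ParallelIO, r IORequest, handle func(IOResult),
    ) error {
    	for {
    		req, send, err := p.AdmitRequest(ctx, r)
    		if errors.Is(err, ErrNotEnoughQuota) {
    			// Quota is freed only when a result is consumed.
    			select {
    			case <-ctx.Done():
    				return ctx.Err()
    			case res := <-p.GetResult():
    				handle(res) // handle must call res.Consume().
    			}
    			continue
    		} else if err != nil {
    			return err
    		}
    		select {
    		case <-ctx.Done():
    			return ctx.Err()
    		case send <- req:
    			return nil
    		}
    	}
    }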
Closes: https://github.com/cockroachdb/cockroach/issues/115536 Release note: None Epic: None --- pkg/ccl/changefeedccl/BUILD.bazel | 1 + pkg/ccl/changefeedccl/batching_sink.go | 67 +++++-- pkg/ccl/changefeedccl/parallel_io.go | 218 +++++++++++++++------ pkg/ccl/changefeedccl/sink.go | 9 +- pkg/ccl/changefeedccl/sink_pubsub_v2.go | 3 + pkg/ccl/changefeedccl/sink_webhook_test.go | 3 +- pkg/ccl/changefeedccl/sink_webhook_v2.go | 3 + 7 files changed, 218 insertions(+), 86 deletions(-) diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 660b6199d56e..9428ec81adc9 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -142,6 +142,7 @@ go_library( "//pkg/util/mon", "//pkg/util/parquet", "//pkg/util/protoutil", + "//pkg/util/quotapool", "//pkg/util/randutil", "//pkg/util/retry", "//pkg/util/span", diff --git a/pkg/ccl/changefeedccl/batching_sink.go b/pkg/ccl/changefeedccl/batching_sink.go index 2f759d8df048..6341d7a49503 100644 --- a/pkg/ccl/changefeedccl/batching_sink.go +++ b/pkg/ccl/changefeedccl/batching_sink.go @@ -16,12 +16,15 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/intsets" + "github.com/cockroachdb/cockroach/pkg/util/log/logcrash" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" ) // SinkClient is an interface to an external sink, where messages are written @@ -66,9 +69,10 @@ type batchingSink struct { minFlushFrequency time.Duration retryOpts retry.Options - ts timeutil.TimeSource - metrics metricsRecorder - knobs batchingSinkKnobs + ts timeutil.TimeSource + metrics metricsRecorder + settings *cluster.Settings + knobs batchingSinkKnobs // eventCh is the channel used to send requests from the Sink caller routines // to the batching routine. Messages can either be a flushReq or a rowEvent. 
@@ -157,7 +161,7 @@ func freeRowEvent(e *rowEvent) {
 	eventPool.Put(e)
 }
 
-var batchPool sync.Pool = sync.Pool{
+var batchPool = sync.Pool{
 	New: func() interface{} {
 		return new(sinkBatch)
 	},
@@ -331,7 +335,7 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
 		s.metrics.recordSinkIOInflightChange(int64(batch.numMessages))
 		return s.client.Flush(ctx, batch.payload)
 	}
-	ioEmitter := newParallelIO(ctx, s.retryOpts, s.ioWorkers, ioHandler, s.metrics)
+	ioEmitter := NewParallelIO(ctx, s.retryOpts, s.ioWorkers, ioHandler, s.metrics, s.settings)
 	defer ioEmitter.Close()
 
 	// Flushing requires tracking the number of inflight messages and confirming
@@ -339,11 +343,12 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
 	inflight := 0
 	var sinkFlushWaiter chan struct{}
 
-	handleResult := func(result *ioResult) {
-		batch, _ := result.request.(*sinkBatch)
+	handleResult := func(result IOResult) {
+		req, err := result.Consume()
+		batch, _ := req.(*sinkBatch)
 
-		if result.err != nil {
-			s.handleError(result.err)
+		if err != nil {
+			s.handleError(err)
 		} else {
 			s.metrics.recordEmittedBatch(
 				batch.bufferTime, batch.numMessages, batch.mvcc, batch.numKVBytes, sinkDoesNotCompress,
@@ -352,12 +357,11 @@
 
 		inflight -= batch.numMessages
 
-		if (result.err != nil || inflight == 0) && sinkFlushWaiter != nil {
+		if (err != nil || inflight == 0) && sinkFlushWaiter != nil {
 			close(sinkFlushWaiter)
 			sinkFlushWaiter = nil
 		}
 
-		freeIOResult(result)
 		batch.alloc.Release(ctx)
 		freeSinkBatchEvent(batch)
 	}
@@ -373,22 +377,41 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
 			return err
 		}
 
-		// Emitting needs to also handle any incoming results to avoid a deadlock
-		// with trying to emit while the emitter is blocked on returning a result.
-		for {
+		req, send, err := ioEmitter.AdmitRequest(ctx, batchBuffer)
+		if errors.Is(err, ErrNotEnoughQuota) {
+			// Quota can only be freed by consuming a result.
 			select {
 			case <-ctx.Done():
 				return ctx.Err()
-			case ioEmitter.requestCh <- batchBuffer:
-			case result := <-ioEmitter.resultCh:
-				handleResult(result)
-				continue
 			case <-s.doneCh:
+				return nil
+			case result := <-ioEmitter.GetResult():
+				handleResult(result)
 			}
-			break
+
+			// The request should be emitted after freeing quota since this is
+			// a single-producer scenario.
+			req, send, err = ioEmitter.AdmitRequest(ctx, batchBuffer)
+			if errors.Is(err, ErrNotEnoughQuota) {
+				logcrash.ReportOrPanic(ctx, &s.settings.SV, "expected request to be emitted after waiting for quota")
+				return errors.AssertionFailedf("expected request to be emitted after waiting for quota")
+			} else if err != nil {
+				return err
+			}
+		} else if err != nil {
+			return err
 		}
 
-		return nil
+		// The request was admitted, so it must be sent. There are no concurrent
+		// requests being sent that would use up the quota.
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-s.doneCh:
+			return nil
+		case send <- req:
+			return nil
+		}
 	}
 
 	flushAll := func() error {
@@ -478,7 +501,7 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
 			default:
 				s.handleError(fmt.Errorf("received unknown request of unknown type: %v", r))
 			}
-		case result := <-ioEmitter.resultCh:
+		case result := <-ioEmitter.GetResult():
 			handleResult(result)
 		case <-flushTimer.Ch():
 			flushTimer.MarkRead()
@@ -505,6 +528,7 @@ func makeBatchingSink(
 	pacerFactory func() *admission.Pacer,
 	timeSource timeutil.TimeSource,
 	metrics metricsRecorder,
+	settings *cluster.Settings,
 ) Sink {
 	sink := &batchingSink{
 		client:           client,
@@ -515,6 +539,7 @@ func makeBatchingSink(
 		retryOpts:        retryOpts,
 		ts:               timeSource,
 		metrics:          metrics,
+		settings:         settings,
 		eventCh:          make(chan interface{}, flushQueueDepth),
 		wg:               ctxgroup.WithContext(ctx),
 		hasher:           makeHasher(),
diff --git a/pkg/ccl/changefeedccl/parallel_io.go b/pkg/ccl/changefeedccl/parallel_io.go
index bc351bf35575..0961bdf4e4a6 100644
--- a/pkg/ccl/changefeedccl/parallel_io.go
+++ b/pkg/ccl/changefeedccl/parallel_io.go
@@ -13,13 +13,17 @@ import (
 	"sync"
 	"time"
 
+	"github.com/cockroachdb/cockroach/pkg/settings"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
 	"github.com/cockroachdb/cockroach/pkg/util/intsets"
+	"github.com/cockroachdb/cockroach/pkg/util/quotapool"
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/cockroachdb/errors"
 )
 
-// parallelIO allows performing blocking "IOHandler" calls on in parallel.
+// ParallelIO allows performing blocking "IOHandler" calls in parallel.
 // IORequests implement a Keys() function returning keys on which ordering is
 // preserved.
 // Example: if the events [[a,b], [b,c], [c,d], [e,f]] are all submitted in that
 // order, [a,b] and [e,f] will be emitted concurrently. [b,c] will block until
 // [a,b] completes, then [c,d] will block until [b,c] completes. If [a,b]
 // errored, [b,c] would never be sent, and an error would be returned with [c,d]
 // in an ioResult struct sent to resultCh. After sending an error to resultCh
 // all workers are torn down and no further requests are received or handled.
-type parallelIO struct {
+type ParallelIO struct {
 	retryOpts retry.Options
 	wg        ctxgroup.Group
 	metrics   metricsRecorder
@@ -36,8 +40,9 @@
 
 	ioHandler IOHandler
 
-	requestCh chan IORequest
-	resultCh  chan *ioResult // readers should freeIOResult after handling result events
+	quota     *quotapool.IntPool
+	requestCh chan AdmittedIORequest
+	resultCh  chan IOResult
 }
 
 // IORequest represents an abstract unit of IO that has a set of keys upon which
@@ -47,56 +52,47 @@
 	NumMessages() int
 }
 
-// ioResult stores the full request that was sent as well as an error if even
-// after retries the IOHanlder was unable to succeed.
-type ioResult struct {
-	request IORequest
-	err     error
-	// Time representing when this result was received from the sink.
-	arrivalTime time.Time
-}
-
 var resultPool = sync.Pool{
 	New: func() interface{} {
-		return new(ioResult)
+		return new(ioRequest)
 	},
 }
 
-func newIOResult(req IORequest, err error) *ioResult {
-	res := resultPool.Get().(*ioResult)
-	res.request = req
-	res.err = err
-	res.arrivalTime = timeutil.Now()
+// newIORequest is used to allocate *ioRequest structs using a pool.
+func newIORequest(req IORequest, a *quotapool.IntAlloc) *ioRequest {
+	res := resultPool.Get().(*ioRequest)
+	res.r = req
+	res.a = a
 	return res
 }
-func freeIOResult(e *ioResult) {
-	*e = ioResult{}
-	resultPool.Put(e)
-}
 
-type queuedRequest struct {
-	req       IORequest
-	admitTime time.Time
+
+// freeIORequest frees the ioRequest and returns it to the pool.
+func freeIORequest(e *ioRequest) {
+	*e = ioRequest{}
+	resultPool.Put(e)
 }
 
 // IOHandler performs a blocking IO operation on an IORequest
 type IOHandler func(context.Context, IORequest) error
 
-func newParallelIO(
+// NewParallelIO creates a new ParallelIO.
+func NewParallelIO(
 	ctx context.Context,
 	retryOpts retry.Options,
 	numWorkers int,
 	handler IOHandler,
 	metrics metricsRecorder,
-) *parallelIO {
+	settings *cluster.Settings,
+) *ParallelIO {
 	wg := ctxgroup.WithContext(ctx)
-	io := &parallelIO{
+	io := &ParallelIO{
 		retryOpts: retryOpts,
 		wg:        wg,
 		metrics:   metrics,
 		ioHandler: handler,
-		requestCh: make(chan IORequest, numWorkers),
-		resultCh:  make(chan *ioResult, numWorkers),
+		quota:     quotapool.NewIntPool("changefeed-parallel-io", uint64(requestQuota.Get(&settings.SV))),
+		requestCh: make(chan AdmittedIORequest, numWorkers),
+		resultCh:  make(chan IOResult, numWorkers),
 		doneCh:    make(chan struct{}),
 	}
 
@@ -109,7 +105,7 @@
 
 // Close stops all workers immediately and returns once they shut down. Inflight
 // requests sent to requestCh may never result in being sent to resultCh.
-func (p *parallelIO) Close() {
+func (p *ParallelIO) Close() {
 	close(p.doneCh)
 	_ = p.wg.Wait()
 }
@@ -123,6 +119,103 @@ var testingEnableQueuingDelay = func() func() {
 	}
 }
 
+// ioRequest is a wrapper around the IORequest that handles quota to limit the
+// number of in-flight requests.
+type ioRequest struct {
+	// Set upon initialization.
+	r IORequest
+	a *quotapool.IntAlloc
+
+	// If the request conflicts with in-flight requests, this is the time at
+	// which the request is placed in the pending queue.
+	pendingQueueAdmitTime time.Time
+
+	// err is the result of this request. resultTime is when it was obtained.
+	err        error
+	resultTime time.Time
+}
+
+// Consume implements IOResult.
+func (r *ioRequest) Consume() (IORequest, error) {
+	result := r.r
+	err := r.err
+
+	r.a.Release()
+	freeIORequest(r)
+
+	return result, err
+}
+
+// IOResult contains the original IORequest and its resultant error.
+type IOResult interface {
+	// Consume returns the original request and result error. It removes the
+	// request from ParallelIO, freeing its resources and budget in the request
+	// quota.
+	Consume() (IORequest, error)
+}
+
+// requestQuota is the number of requests which can be admitted into the
+// parallelio system before blocking the producer.
+var requestQuota = settings.RegisterIntSetting(
+	settings.ApplicationLevel,
+	"changefeed.parallel_io.request_quota",
+	"the number of requests which can be admitted into the parallelio"+
+		" system before blocking the producer",
+	128,
+	settings.PositiveInt,
+	settings.WithVisibility(settings.Reserved),
+)
+
+// ErrNotEnoughQuota indicates that a request was not emitted due to a lack of
+// quota.
+var ErrNotEnoughQuota = quotapool.ErrNotEnoughQuota
+
+// AdmitRequest returns an AdmittedIORequest and a channel to send it on
+// if there is quota available for the request. Otherwise, it returns
+// ErrNotEnoughQuota.
+//
+// Quota can be freed by calling GetResult() and Consume()ing the IOResult.
+//
+// TODO(jayants): This should use an `Acquire` instead of a `TryAcquire`, and
+// the API should not use channels. The reasons things are done this way are:
+//
+// 1. The callers (batching sink) use one goroutine to both produce requests
+// and consume results. If it blocks on producing, it will deadlock because
+// that goroutine cannot free up quota by consuming. This is why the
+// `TryAcquire` is used. The caller should use separate goroutines: one for
+// consuming and one for producing.
+//
+// 2. Both the batching sink and ParallelIO use a `done` channel to close. They
+// should use context cancellation. The `done` channel is problematic because
+// `Acquire` cannot select on that channel, and ParallelIO cannot detect if the
+// batching sink's channel has been closed. Using contexts and cancelling them
+// fixes this problem.
+func (p *ParallelIO) AdmitRequest(
+	ctx context.Context, r IORequest,
+) (req AdmittedIORequest, send chan AdmittedIORequest, err error) {
+	a, err := p.quota.TryAcquire(ctx, 1)
+
+	if errors.Is(err, quotapool.ErrNotEnoughQuota) {
+		return nil, nil, ErrNotEnoughQuota
+	} else if err != nil {
+		return nil, nil, err
+	}
+
+	ra := newIORequest(r, a)
+
+	return ra, p.requestCh, nil
+}
+
+// AdmittedIORequest is an admitted IORequest.
+type AdmittedIORequest interface{}
+
+var _ AdmittedIORequest = (*ioRequest)(nil)
+
+// GetResult returns a channel which can be waited upon to read the next
+// IOResult.
+func (p *ParallelIO) GetResult() chan IOResult {
+	return p.resultCh
+}
+
 // processIO starts numEmitWorkers worker threads to run the IOHandler on
 // non-conflicting IORequests each retrying according to the retryOpts, then:
 // - Reads incoming messages from requestCh, sending them to any worker if there
@@ -147,8 +240,8 @@ var testingEnableQueuingDelay = func() func() {
 //
 // The conflict checking is done via an intset.Fast storing the union of all
 // keys currently being sent, followed by checking each pending batch's intset.
-func (p *parallelIO) processIO(ctx context.Context, numEmitWorkers int) error {
-	emitWithRetries := func(ctx context.Context, payload IORequest) error {
+func (p *ParallelIO) processIO(ctx context.Context, numEmitWorkers int) error {
+	emitWithRetries := func(ctx context.Context, r IORequest) error {
 		if testQueuingDelay > 0*time.Second {
 			select {
 			case <-ctx.Done():
@@ -160,28 +253,29 @@
 		initialSend := true
 		return retry.WithMaxAttempts(ctx, p.retryOpts, p.retryOpts.MaxRetries+1, func() error {
 			if !initialSend {
-				p.metrics.recordInternalRetry(int64(payload.Keys().Len()), false)
+				p.metrics.recordInternalRetry(int64(r.Keys().Len()), false)
 			}
 			initialSend = false
-			return p.ioHandler(ctx, payload)
+			return p.ioHandler(ctx, r)
 		})
 	}
 
 	// Multiple worker routines handle the IO operations, retrying when necessary.
- workerEmitCh := make(chan IORequest, numEmitWorkers) + workerEmitCh := make(chan *ioRequest, numEmitWorkers) defer close(workerEmitCh) - workerResultCh := make(chan *ioResult, numEmitWorkers) + workerResultCh := make(chan *ioRequest, numEmitWorkers) for i := 0; i < numEmitWorkers; i++ { p.wg.GoCtx(func(ctx context.Context) error { for req := range workerEmitCh { - result := newIOResult(req, emitWithRetries(ctx, req)) + req.err = emitWithRetries(ctx, req.r) + req.resultTime = timeutil.Now() select { case <-ctx.Done(): return ctx.Err() case <-p.doneCh: return nil - case workerResultCh <- result: + case workerResultCh <- req: } } return nil @@ -196,8 +290,8 @@ func (p *parallelIO) processIO(ctx context.Context, numEmitWorkers int) error { // submitIO -> handleResult -> submitIO -> handleResult chain which is complex // to manage. To avoid this, results are added to a pending list to be handled // separately. - var pendingResults []*ioResult - submitIO := func(req IORequest) error { + var pendingResults []*ioRequest + submitIO := func(req *ioRequest) error { for { select { case <-ctx.Done(): @@ -214,42 +308,42 @@ func (p *parallelIO) processIO(ctx context.Context, numEmitWorkers int) error { // The main routine keeps track of incoming and completed requests, where // admitted requests yet to be completed have their Keys() tracked in an - // intset, and any incoming request with keys already in the intset are placed + // intset, and any incoming ioRequest with keys already in the intset are placed // in a Queue to be sent to IO workers once the conflicting requests complete. var inflight intsets.Fast - var pending []queuedRequest + var pending []*ioRequest metricsRec := p.metrics.newParallelIOMetricsRecorder() - handleResult := func(res *ioResult) error { + handleResult := func(res *ioRequest) error { if res.err == nil { // Clear out the completed keys to check for newly valid pending requests. - requestKeys := res.request.Keys() + requestKeys := res.r.Keys() inflight.DifferenceWith(requestKeys) metricsRec.setInFlightKeys(int64(inflight.Len())) // Check for a pending request that is now able to be sent i.e. is not // conflicting with any inflight requests or any requests that arrived // earlier than itself in the pending queue. pendingKeys := intsets.Fast{} - for i, pendingReq := range pending { - if !inflight.Intersects(pendingReq.req.Keys()) && !pendingKeys.Intersects(pendingReq.req.Keys()) { - inflight.UnionWith(pendingReq.req.Keys()) + for i, req := range pending { + if !inflight.Intersects(req.r.Keys()) && !pendingKeys.Intersects(req.r.Keys()) { + inflight.UnionWith(req.r.Keys()) metricsRec.setInFlightKeys(int64(inflight.Len())) pending = append(pending[:i], pending[i+1:]...) - metricsRec.recordPendingQueuePop(int64(pendingReq.req.NumMessages()), timeutil.Since(pendingReq.admitTime)) - if err := submitIO(pendingReq.req); err != nil { + metricsRec.recordPendingQueuePop(int64(req.r.NumMessages()), timeutil.Since(req.pendingQueueAdmitTime)) + if err := submitIO(req); err != nil { return err } break } - pendingKeys.UnionWith(pendingReq.req.Keys()) + pendingKeys.UnionWith(req.r.Keys()) } } // Copy the arrival time for the metrics recorder below. // Otherwise, it would be possible for res to be admitted to the - // resultCh and freed before we read rec.arrivalTime. - arrivalTime := res.arrivalTime + // resultCh and freed before we read res.resultTime. 
+		arrivalTime := res.resultTime
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
@@ -268,8 +362,8 @@ func (p *parallelIO) processIO(ctx context.Context, numEmitWorkers int) error {
 		if inflight.Intersects(keys) {
 			return true
 		}
-		for _, pendingReq := range pending {
-			if pendingReq.req.Keys().Intersects(keys) {
+		for _, req := range pending {
+			if req.r.Keys().Intersects(keys) {
 				return true
 			}
 		}
@@ -287,14 +381,16 @@ func (p *parallelIO) processIO(ctx context.Context, numEmitWorkers int) error {
 		}
 		select {
-		case req := <-p.requestCh:
-			if hasConflictingKeys(req.Keys()) {
+		case admittedReq := <-p.requestCh:
+			req := admittedReq.(*ioRequest)
+			if hasConflictingKeys(req.r.Keys()) {
 				// If a request conflicts with any currently unhandled requests, add it
 				// to the pending queue to be rechecked for validity later.
-				pending = append(pending, queuedRequest{req: req, admitTime: timeutil.Now()})
-				metricsRec.recordPendingQueuePush(int64(req.NumMessages()))
+				req.pendingQueueAdmitTime = timeutil.Now()
+				pending = append(pending, req)
+				metricsRec.recordPendingQueuePush(int64(req.r.NumMessages()))
 			} else {
-				newInFlightKeys := req.Keys()
+				newInFlightKeys := req.r.Keys()
 				inflight.UnionWith(newInFlightKeys)
 				metricsRec.setInFlightKeys(int64(inflight.Len()))
 				if err := submitIO(req); err != nil {
diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go
index 34c86d862e4a..a4bf4f3b003b 100644
--- a/pkg/ccl/changefeedccl/sink.go
+++ b/pkg/ccl/changefeedccl/sink.go
@@ -244,7 +244,8 @@ func getSink(
 	if WebhookV2Enabled.Get(&serverCfg.Settings.SV) {
 		return validateOptionsAndMakeSink(changefeedbase.WebhookValidOptions, func() (Sink, error) {
 			return makeWebhookSink(ctx, sinkURL{URL: u}, encodingOpts, webhookOpts,
-				numSinkIOWorkers(serverCfg), newCPUPacerFactory(ctx, serverCfg), timeutil.DefaultTimeSource{}, metricsBuilder)
+				numSinkIOWorkers(serverCfg), newCPUPacerFactory(ctx, serverCfg), timeutil.DefaultTimeSource{},
+				metricsBuilder, serverCfg.Settings)
 		})
 	} else {
 		return validateOptionsAndMakeSink(changefeedbase.WebhookValidOptions, func() (Sink, error) {
@@ -258,8 +259,10 @@ func getSink(
 			testingKnobs = knobs
 		}
 		if PubsubV2Enabled.Get(&serverCfg.Settings.SV) {
-			return makePubsubSink(ctx, u, encodingOpts, opts.GetPubsubConfigJSON(), AllTargets(feedCfg), opts.IsSet(changefeedbase.OptUnordered),
-				numSinkIOWorkers(serverCfg), newCPUPacerFactory(ctx, serverCfg), timeutil.DefaultTimeSource{}, metricsBuilder, testingKnobs)
+			return makePubsubSink(ctx, u, encodingOpts, opts.GetPubsubConfigJSON(), AllTargets(feedCfg),
+				opts.IsSet(changefeedbase.OptUnordered), numSinkIOWorkers(serverCfg),
+				newCPUPacerFactory(ctx, serverCfg), timeutil.DefaultTimeSource{},
+				metricsBuilder, serverCfg.Settings, testingKnobs)
 		} else {
 			return makeDeprecatedPubsubSink(ctx, u, encodingOpts, AllTargets(feedCfg), opts.IsSet(changefeedbase.OptUnordered), metricsBuilder, testingKnobs)
 		}
diff --git a/pkg/ccl/changefeedccl/sink_pubsub_v2.go b/pkg/ccl/changefeedccl/sink_pubsub_v2.go
index 48648807c70e..b15d94310c29 100644
--- a/pkg/ccl/changefeedccl/sink_pubsub_v2.go
+++ b/pkg/ccl/changefeedccl/sink_pubsub_v2.go
@@ -19,6 +19,7 @@ import (
 	pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
 	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
 	"github.com/cockroachdb/cockroach/pkg/cloud"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/util/admission"
 	"github.com/cockroachdb/cockroach/pkg/util/envutil"
 	"github.com/cockroachdb/cockroach/pkg/util/json"
@@ -420,6 +421,7 @@ func makePubsubSink(
 	pacerFactory func() *admission.Pacer,
 	source timeutil.TimeSource,
 	mb metricsRecorderBuilder,
+	settings *cluster.Settings,
 	knobs *TestingKnobs,
 ) (Sink, error) {
 	batchCfg, retryOpts, err := getSinkConfigFromJson(jsonConfig, sinkJSONConfig{
@@ -463,5 +465,6 @@ func makePubsubSink(
 		pacerFactory,
 		source,
 		mb(requiresResourceAccounting),
+		settings,
 	), nil
 }
diff --git a/pkg/ccl/changefeedccl/sink_webhook_test.go b/pkg/ccl/changefeedccl/sink_webhook_test.go
index f0f20ba3a37e..7f1bfa69622e 100644
--- a/pkg/ccl/changefeedccl/sink_webhook_test.go
+++ b/pkg/ccl/changefeedccl/sink_webhook_test.go
@@ -22,6 +22,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
 	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -85,7 +86,7 @@ func setupWebhookSinkWithDetails(
 	if err != nil {
 		return nil, err
 	}
-	sinkSrc, err := makeWebhookSink(ctx, sinkURL{URL: u}, encodingOpts, sinkOpts, parallelism, nilPacerFactory, source, nilMetricsRecorderBuilder)
+	sinkSrc, err := makeWebhookSink(ctx, sinkURL{URL: u}, encodingOpts, sinkOpts, parallelism, nilPacerFactory, source, nilMetricsRecorderBuilder, cluster.MakeClusterSettings())
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/ccl/changefeedccl/sink_webhook_v2.go b/pkg/ccl/changefeedccl/sink_webhook_v2.go
index b7fff6ddeced..d654d94e9f01 100644
--- a/pkg/ccl/changefeedccl/sink_webhook_v2.go
+++ b/pkg/ccl/changefeedccl/sink_webhook_v2.go
@@ -22,6 +22,7 @@ import (
 	"time"

 	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/util/admission"
 	"github.com/cockroachdb/cockroach/pkg/util/httputil"
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
@@ -349,6 +350,7 @@ func makeWebhookSink(
 	pacerFactory func() *admission.Pacer,
 	source timeutil.TimeSource,
 	mb metricsRecorderBuilder,
+	settings *cluster.Settings,
 ) (Sink, error) {
 	batchCfg, retryOpts, err := getSinkConfigFromJson(opts.JSONConfig, sinkJSONConfig{})
 	if err != nil {
@@ -371,5 +373,6 @@ func makeWebhookSink(
 		pacerFactory,
 		source,
 		mb(requiresResourceAccounting),
+		settings,
 	), nil
 }
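
A few notes for reviewers follow; the sketches are illustrations only, not part of the patch. First, the calling pattern the new AdmitRequest/GetResult API expects: AdmitRequest hands back both the admitted request and the channel to send it on, and quota is only returned once results are consumed from GetResult. The sendWithAdmission helper and the consumeOne callback below are hypothetical names in the same package; the loop assumes ErrNotEnoughQuota is the package-level sentinel AdmitRequest returns on quota exhaustion, as in the hunk above, and errors.Is is from cockroachdb/errors.

// sendWithAdmission is a hypothetical caller loop written against the new
// API. It retries AdmitRequest on ErrNotEnoughQuota, draining one result
// between attempts so that quota can actually be freed, then sends the
// admitted request on the channel AdmitRequest handed back.
func sendWithAdmission(
	ctx context.Context, p *ParallelIO, r IORequest, consumeOne func(IOResult),
) error {
	for {
		admitted, sendCh, err := p.AdmitRequest(ctx, r)
		if errors.Is(err, ErrNotEnoughQuota) {
			// Quota is freed by consuming results, so block on one result
			// rather than spinning on AdmitRequest.
			select {
			case res := <-p.GetResult():
				consumeOne(res)
			case <-ctx.Done():
				return ctx.Err()
			}
			continue
		} else if err != nil {
			return err
		}
		select {
		case sendCh <- admitted:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

This also shows why the TODO asks for separate producer and consumer goroutines: a single goroutine can only avoid the deadlock by interleaving consumption into its send path, as above.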
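On point 2 of the TODO: quotapool's blocking Acquire can only be interrupted through its context argument, which is why a `done` channel and a blocking Acquire cannot coexist. A minimal sketch of the shutdown shape the TODO argues for, assuming a *quotapool.IntPool like the p.quota field this patch already uses; acquireOne is an invented name:

// acquireOne blocks for quota but stays interruptible: shutdown is modeled
// as cancellation of ctx rather than a closed `done` channel that Acquire
// has no way to select on.
func acquireOne(ctx context.Context, pool *quotapool.IntPool) (*quotapool.IntAlloc, error) {
	// Acquire returns an error once ctx is cancelled, so the owner shutting
	// things down only needs to call the CancelFunc that created ctx.
	return pool.Acquire(ctx, 1)
}

Wiring this up would be `ctx, cancel := context.WithCancel(parent)` at construction time, with `cancel()` invoked where close(doneCh) happens today.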
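Finally, the admission rule that processIO's handleResult and hasConflictingKeys implement together can be stated compactly: a request may start only if its key set intersects neither the in-flight key set nor the keys of any request queued ahead of it. A standalone, simplified sketch of just that predicate; the request type and names here are invented, and the real code tracks *ioRequest plus metrics state:

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/util/intsets"
)

type request struct{ keys intsets.Fast }

// admissible mirrors processIO's conflict check: reject if r overlaps the
// in-flight keys, or overlaps anything queued ahead of it in pending.
func admissible(r request, inflight intsets.Fast, pending []request) bool {
	if inflight.Intersects(r.keys) {
		return false
	}
	for _, p := range pending {
		if p.keys.Intersects(r.keys) {
			return false
		}
	}
	return true
}

func main() {
	var inflight intsets.Fast
	inflight.Add(1) // key 1 is currently being emitted

	var a, b request
	a.keys.Add(2)
	b.keys.Add(1)
	fmt.Println(admissible(a, inflight, nil)) // true: no overlap
	fmt.Println(admissible(b, inflight, nil)) // false: key 1 is in flight
}

Checking earlier pending requests, not just in-flight keys, is what preserves per-key ordering: a newer conflicting request can never jump the queue ahead of an older one.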