Skip to content

Commit

Permalink
Download queued shards only if there is space for them all
Browse files Browse the repository at this point in the history
  • Loading branch information
undertome committed Oct 29, 2020
1 parent 0b4e34b commit 453894b
Show file tree
Hide file tree
Showing 10 changed files with 444 additions and 117 deletions.
51 changes: 28 additions & 23 deletions src/ripple/net/HTTPStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,24 +38,14 @@ class HTTPStream

virtual ~HTTPStream() = default;

template <class T>
static std::unique_ptr<HTTPStream>
makeUnique(
Config const& config,
boost::asio::io_service::strand& strand,
beast::Journal j)
{
return std::make_unique<T>(config, strand, j);
}

[[nodiscard]] virtual boost::asio::ip::tcp::socket&
getStream() = 0;

[[nodiscard]] virtual bool
connect(
std::string& errorOut,
std::string const host,
std::string const port,
std::string const& host,
std::string const& port,
boost::asio::yield_context& yield) = 0;

virtual void
Expand All @@ -68,7 +58,13 @@ class HTTPStream
asyncRead(
boost::beast::flat_buffer& buf,
parser& p,
bool readSome,
boost::asio::yield_context& yield,
boost::system::error_code& ec) = 0;

virtual void
asyncReadSome(
boost::beast::flat_buffer& buf,
parser& p,
boost::asio::yield_context& yield,
boost::system::error_code& ec) = 0;
};
Expand All @@ -89,8 +85,8 @@ class SSLStream : public HTTPStream
bool
connect(
std::string& errorOut,
std::string const host,
std::string const port,
std::string const& host,
std::string const& port,
boost::asio::yield_context& yield) override;

void
Expand All @@ -103,7 +99,13 @@ class SSLStream : public HTTPStream
asyncRead(
boost::beast::flat_buffer& buf,
parser& p,
bool readSome,
boost::asio::yield_context& yield,
boost::system::error_code& ec) override;

void
asyncReadSome(
boost::beast::flat_buffer& buf,
parser& p,
boost::asio::yield_context& yield,
boost::system::error_code& ec) override;

Expand All @@ -117,10 +119,7 @@ class SSLStream : public HTTPStream
class RawStream : public HTTPStream
{
public:
RawStream(
Config const& config,
boost::asio::io_service::strand& strand,
beast::Journal j);
RawStream(boost::asio::io_service::strand& strand);

virtual ~RawStream() = default;

Expand All @@ -130,8 +129,8 @@ class RawStream : public HTTPStream
bool
connect(
std::string& errorOut,
std::string const host,
std::string const port,
std::string const& host,
std::string const& port,
boost::asio::yield_context& yield) override;

void
Expand All @@ -144,7 +143,13 @@ class RawStream : public HTTPStream
asyncRead(
boost::beast::flat_buffer& buf,
parser& p,
bool readSome,
boost::asio::yield_context& yield,
boost::system::error_code& ec) override;

void
asyncReadSome(
boost::beast::flat_buffer& buf,
parser& p,
boost::asio::yield_context& yield,
boost::system::error_code& ec) override;

Expand Down
16 changes: 9 additions & 7 deletions src/ripple/net/impl/HTTPDownloader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,10 @@ HTTPDownloader::do_session(
//////////////////////////////////////////////
// Prepare for download and establish the
// connection:
std::uint64_t const rangeStart = size(p);

stream_ = ssl ? HTTPStream::makeUnique<SSLStream>(config_, strand_, j_)
: HTTPStream::makeUnique<RawStream>(config_, strand_, j_);
if (ssl)
stream_ = std::make_unique<SSLStream>(config_, strand_, j_);
else
stream_ = std::make_unique<RawStream>(strand_);

std::string error;
if (!stream_->connect(error, host, port, yield))
Expand All @@ -166,6 +166,8 @@ HTTPDownloader::do_session(
req.set(http::field::host, host);
req.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING);

std::uint64_t const rangeStart = size(p);

// Requesting a portion of the file
if (rangeStart)
{
Expand All @@ -182,7 +184,7 @@ HTTPDownloader::do_session(
// Read the response
http::response_parser<http::empty_body> connectParser;
connectParser.skip(true);
stream_->asyncRead(read_buf_, connectParser, false, yield, ec);
stream_->asyncRead(read_buf_, connectParser, yield, ec);
if (ec)
return failAndExit("async_read", p);

Expand All @@ -198,7 +200,7 @@ HTTPDownloader::do_session(
http::response_parser<http::empty_body> rangeParser;
rangeParser.skip(true);

stream_->asyncRead(read_buf_, rangeParser, false, yield, ec);
stream_->asyncRead(read_buf_, rangeParser, yield, ec);
if (ec)
return failAndExit("async_read_range_verify", p);

Expand Down Expand Up @@ -268,7 +270,7 @@ HTTPDownloader::do_session(
return exit();
}

stream_->asyncRead(read_buf_, *p, true, yield, ec);
stream_->asyncReadSome(read_buf_, *p, yield, ec);
}

JLOG(j_.trace()) << "download completed: " << dstPath.string();
Expand Down
62 changes: 37 additions & 25 deletions src/ripple/net/impl/HTTPStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,19 @@ SSLStream::getStream()
bool
SSLStream::connect(
std::string& errorOut,
std::string const host,
std::string const port,
std::string const& host,
std::string const& port,
boost::asio::yield_context& yield)
{
using namespace boost::asio;
using namespace boost::beast;

boost::system::error_code ec;

auto fail = [&errorOut](std::string const& errorIn) {
errorOut = errorIn;
auto fail = [&errorOut, &ec](
std::string const& errorIn,
std::string const& message = "") {
errorOut = errorIn + ": " + (message.empty() ? ec.message() : message);
return false;
};

Expand All @@ -65,7 +67,7 @@ SSLStream::connect(
}
catch (std::exception const& e)
{
return fail(std::string("exception: ") + e.what());
return fail("exception", e.what());
}

ec = ssl_ctx_.preConnectVerify(*stream_, host);
Expand Down Expand Up @@ -101,21 +103,23 @@ void
SSLStream::asyncRead(
boost::beast::flat_buffer& buf,
parser& p,
bool readSome,
boost::asio::yield_context& yield,
boost::system::error_code& ec)
{
if (readSome)
boost::beast::http::async_read_some(*stream_, buf, p, yield[ec]);
else
boost::beast::http::async_read(*stream_, buf, p, yield[ec]);
boost::beast::http::async_read(*stream_, buf, p, yield[ec]);
}

RawStream::RawStream(
Config const& config,
boost::asio::io_service::strand& strand,
beast::Journal j)
: strand_(strand)
void
SSLStream::asyncReadSome(
boost::beast::flat_buffer& buf,
parser& p,
boost::asio::yield_context& yield,
boost::system::error_code& ec)
{
boost::beast::http::async_read_some(*stream_, buf, p, yield[ec]);
}

RawStream::RawStream(boost::asio::io_service::strand& strand) : strand_(strand)
{
}

Expand All @@ -129,17 +133,19 @@ RawStream::getStream()
bool
RawStream::connect(
std::string& errorOut,
std::string const host,
std::string const port,
std::string const& host,
std::string const& port,
boost::asio::yield_context& yield)
{
using namespace boost::asio;
using namespace boost::beast;

boost::system::error_code ec;

auto fail = [&errorOut](std::string const& errorIn) {
errorOut = errorIn;
auto fail = [&errorOut, &ec](
std::string const& errorIn,
std::string const& message = "") {
errorOut = errorIn + ": " + (message.empty() ? ec.message() : message);
return false;
};

Expand All @@ -154,7 +160,7 @@ RawStream::connect(
}
catch (std::exception const& e)
{
return fail(std::string("exception: ") + e.what());
return fail("exception", e.what());
}

boost::asio::async_connect(
Expand All @@ -178,14 +184,20 @@ void
RawStream::asyncRead(
boost::beast::flat_buffer& buf,
parser& p,
bool readSome,
boost::asio::yield_context& yield,
boost::system::error_code& ec)
{
if (readSome)
boost::beast::http::async_read_some(*stream_, buf, p, yield[ec]);
else
boost::beast::http::async_read(*stream_, buf, p, yield[ec]);
boost::beast::http::async_read(*stream_, buf, p, yield[ec]);
}

void
RawStream::asyncReadSome(
boost::beast::flat_buffer& buf,
parser& p,
boost::asio::yield_context& yield,
boost::system::error_code& ec)
{
boost::beast::http::async_read_some(*stream_, buf, p, yield[ec]);
}

} // namespace ripple
8 changes: 4 additions & 4 deletions src/ripple/nodestore/DatabaseShard.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,13 @@ class DatabaseShard : public Database
virtual boost::optional<std::uint32_t>
prepareLedger(std::uint32_t validLedgerSeq) = 0;

/** Prepare a shard index to be imported into the database
/** Prepare one or more shard indexes to be imported into the database
@param shardIndex Shard index to be prepared for import
@return true if shard index successfully prepared for import
@param shardIndexes Shard indexes to be prepared for import
@return true if all shard indexes successfully prepared for import
*/
virtual bool
prepareShard(std::uint32_t shardIndex) = 0;
prepareShards(std::vector<std::uint32_t> const& shardIndexes) = 0;

/** Remove a previously prepared shard index for import
Expand Down
Loading

0 comments on commit 453894b

Please sign in to comment.