diff --git a/ext/native/base/buffer.cpp b/ext/native/base/buffer.cpp index 68cc412dd3e0..2c28a09f7b0f 100644 --- a/ext/native/base/buffer.cpp +++ b/ext/native/base/buffer.cpp @@ -152,11 +152,22 @@ bool Buffer::FlushToFile(const char *filename) { return true; } -bool Buffer::FlushSocket(uintptr_t sock, double timeout) { +bool Buffer::FlushSocket(uintptr_t sock, double timeout, bool *cancelled) { + static constexpr float CANCEL_INTERVAL = 0.25f; for (size_t pos = 0, end = data_.size(); pos < end; ) { - if (timeout >= 0.0 && !fd_util::WaitUntilReady(sock, timeout, true)) { - ELOG("FlushSocket timed out"); - return false; + bool ready = false; + double leftTimeout = timeout; + while (!ready && (leftTimeout >= 0 || cancelled)) { + if (cancelled && *cancelled) + return false; + ready = fd_util::WaitUntilReady(sock, CANCEL_INTERVAL, true); + if (!ready && leftTimeout >= 0.0) { + leftTimeout -= CANCEL_INTERVAL; + if (leftTimeout < 0) { + ELOG("FlushSocket timed out"); + return false; + } + } } int sent = send(sock, &data_[pos], (int)(end - pos), MSG_NOSIGNAL); if (sent < 0) { @@ -199,6 +210,7 @@ bool Buffer::ReadAll(int fd, int hintSize) { } bool Buffer::ReadAllWithProgress(int fd, int knownSize, float *progress, bool *cancelled) { + static constexpr float CANCEL_INTERVAL = 0.25f; std::vector buf; if (knownSize >= 65536 * 16) { buf.resize(65536); @@ -210,8 +222,12 @@ bool Buffer::ReadAllWithProgress(int fd, int knownSize, float *progress, bool *c int total = 0; while (true) { - if (cancelled && *cancelled) - return false; + bool ready = false; + while (!ready && cancelled) { + if (*cancelled) + return false; + ready = fd_util::WaitUntilReady(fd, CANCEL_INTERVAL, false); + } int retval = recv(fd, &buf[0], (int)buf.size(), 0); if (retval == 0) { return true; @@ -222,7 +238,8 @@ bool Buffer::ReadAllWithProgress(int fd, int knownSize, float *progress, bool *c char *p = Append((size_t)retval); memcpy(p, &buf[0], retval); total += retval; - *progress = (float)total / (float)knownSize; + if (progress) + *progress = (float)total / (float)knownSize; } return true; } diff --git a/ext/native/base/buffer.h b/ext/native/base/buffer.h index b34901fb1716..87018a399981 100644 --- a/ext/native/base/buffer.h +++ b/ext/native/base/buffer.h @@ -64,7 +64,7 @@ class Buffer { // written. bool Flush(int fd); bool FlushToFile(const char *filename); - bool FlushSocket(uintptr_t sock, double timeout = -1.0); // Windows portability + bool FlushSocket(uintptr_t sock, double timeout = -1.0, bool *cancelled = nullptr); // Windows portability bool ReadAll(int fd, int hintSize = 0); bool ReadAllWithProgress(int fd, int knownSize, float *progress, bool *cancelled); diff --git a/ext/native/net/http_client.cpp b/ext/native/net/http_client.cpp index ca2fb0f36c1e..29641aff7754 100644 --- a/ext/native/net/http_client.cpp +++ b/ext/native/net/http_client.cpp @@ -229,13 +229,13 @@ int Client::GET(const char *resource, Buffer *output, std::vector & const char *otherHeaders = "Accept: */*\r\n" "Accept-Encoding: gzip\r\n"; - int err = SendRequest("GET", resource, otherHeaders, progress); + int err = SendRequest("GET", resource, otherHeaders, progress, cancelled); if (err < 0) { return err; } Buffer readbuf; - int code = ReadResponseHeaders(&readbuf, responseHeaders, progress); + int code = ReadResponseHeaders(&readbuf, responseHeaders, progress, cancelled); if (code < 0) { return code; } @@ -283,11 +283,11 @@ int Client::POST(const char *resource, const std::string &data, Buffer *output, return POST(resource, data, "", output, progress); } -int Client::SendRequest(const char *method, const char *resource, const char *otherHeaders, float *progress) { - return SendRequestWithData(method, resource, "", otherHeaders, progress); +int Client::SendRequest(const char *method, const char *resource, const char *otherHeaders, float *progress, bool *cancelled) { + return SendRequestWithData(method, resource, "", otherHeaders, progress, cancelled); } -int Client::SendRequestWithData(const char *method, const char *resource, const std::string &data, const char *otherHeaders, float *progress) { +int Client::SendRequestWithData(const char *method, const char *resource, const std::string &data, const char *otherHeaders, float *progress, bool *cancelled) { if (progress) { *progress = 0.01f; } @@ -314,12 +314,24 @@ int Client::SendRequestWithData(const char *method, const char *resource, const return 0; } -int Client::ReadResponseHeaders(Buffer *readbuf, std::vector &responseHeaders, float *progress) { +int Client::ReadResponseHeaders(Buffer *readbuf, std::vector &responseHeaders, float *progress, bool *cancelled) { // Snarf all the data we can into RAM. A little unsafe but hey. - if (dataTimeout_ >= 0.0 && !fd_util::WaitUntilReady(sock(), dataTimeout_, false)) { - ELOG("HTTP headers timed out"); - return -1; - } + static constexpr float CANCEL_INTERVAL = 0.25f; + bool ready = false; + double leftTimeout = dataTimeout_; + while (!ready) { + if (cancelled && *cancelled) + return -1; + ready = fd_util::WaitUntilReady(sock(), CANCEL_INTERVAL, false); + if (!ready && leftTimeout >= 0.0) { + leftTimeout -= CANCEL_INTERVAL; + if (leftTimeout < 0) { + ELOG("HTTP headers timed out"); + return -1; + } + } + }; + // Let's hope all the headers are available in a single packet... if (readbuf->Read(sock(), 4096) < 0) { ELOG("Failed to read HTTP headers :("); return -1; @@ -396,7 +408,7 @@ int Client::ReadResponseEntity(Buffer *readbuf, const std::vector & if (!contentLength || !progress) { // No way to know how far along we are. Let's just not update the progress counter. - if (!readbuf->ReadAll(sock(), contentLength)) + if (!readbuf->ReadAllWithProgress(sock(), contentLength, nullptr, cancelled)) return -1; } else { // Let's read in chunks, updating progress between each. diff --git a/ext/native/net/http_client.h b/ext/native/net/http_client.h index 694cad33edfc..9c935858face 100644 --- a/ext/native/net/http_client.h +++ b/ext/native/net/http_client.h @@ -71,9 +71,9 @@ class Client : public net::Connection { // HEAD, PUT, DELETE aren't implemented yet, but can be done with SendRequest. - int SendRequest(const char *method, const char *resource, const char *otherHeaders = nullptr, float *progress = nullptr); - int SendRequestWithData(const char *method, const char *resource, const std::string &data, const char *otherHeaders = nullptr, float *progress = nullptr); - int ReadResponseHeaders(Buffer *readbuf, std::vector &responseHeaders, float *progress = nullptr); + int SendRequest(const char *method, const char *resource, const char *otherHeaders = nullptr, float *progress = nullptr, bool *cancelled = nullptr); + int SendRequestWithData(const char *method, const char *resource, const std::string &data, const char *otherHeaders = nullptr, float *progress = nullptr, bool *cancelled = nullptr); + int ReadResponseHeaders(Buffer *readbuf, std::vector &responseHeaders, float *progress = nullptr, bool *cancelled = nullptr); // If your response contains a response, you must read it. int ReadResponseEntity(Buffer *readbuf, const std::vector &responseHeaders, Buffer *output, float *progress = nullptr, bool *cancelled = nullptr);