From e8d072ff709670684d265761a7500967d4fcdfed Mon Sep 17 00:00:00 2001 From: Simon Whitty Date: Sun, 20 Feb 2022 15:24:55 +1100 Subject: [PATCH] ChunkedAsyncSequence --- Sources/ChunkedAsyncSequence.swift | 56 ++++++++++++++++++++++++++++++ Sources/HTTPConnection.swift | 4 +-- Sources/HTTPRequestDecoder.swift | 13 +++---- Sources/Socket/AsyncSocket.swift | 35 +++++++++++++++++-- Sources/Socket/Socket.swift | 19 ++++++++++ Tests/ConsumingAsyncSequence.swift | 4 ++- 6 files changed, 120 insertions(+), 11 deletions(-) create mode 100644 Sources/ChunkedAsyncSequence.swift diff --git a/Sources/ChunkedAsyncSequence.swift b/Sources/ChunkedAsyncSequence.swift new file mode 100644 index 00000000..3bd62a57 --- /dev/null +++ b/Sources/ChunkedAsyncSequence.swift @@ -0,0 +1,56 @@ +// +// ChunkedAsyncSequence.swift +// FlyingFox +// +// Created by Simon Whitty on 20/02/2022. +// Copyright © 2022 Simon Whitty. All rights reserved. +// +// Distributed under the permissive MIT license +// Get the latest version from here: +// +// https://github.com/swhitty/FlyingFox +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. +// + +protocol ChuckedAsyncSequence: AsyncSequence { + + /// Retrieve next n elements can be requested from an AsyncSequence. + /// - Parameter count: Number of elements to be read from the sequence + /// - Returns: Returns all the requested number or elements or nil when the + /// sequence ends before all of requested elements are retrieved. + func next(count: Int) async throws -> [Element]? +} + +extension ChuckedAsyncSequence { + + /// Default implementation that does not read elements in chunks, but slowly + /// reads the chunk one element at a time. + func next(count: Int) async throws -> [Element]? { + var buffer = [Element]() + var iterator = makeAsyncIterator() + + while buffer.count < count, + let element = try await iterator.next() { + buffer.append(element) + } + + return buffer.count == count ? buffer : nil + } +} diff --git a/Sources/HTTPConnection.swift b/Sources/HTTPConnection.swift index b5780988..7abb2c9b 100644 --- a/Sources/HTTPConnection.swift +++ b/Sources/HTTPConnection.swift @@ -42,7 +42,7 @@ struct HTTPConnection { } // some AsyncSequence - var requests: HTTPRequestSequence> { + var requests: HTTPRequestSequence { HTTPRequestSequence(bytes: socket.bytes) } @@ -55,7 +55,7 @@ struct HTTPConnection { } } -struct HTTPRequestSequence: AsyncSequence, AsyncIteratorProtocol where S.Element == UInt8 { +struct HTTPRequestSequence: AsyncSequence, AsyncIteratorProtocol where S.Element == UInt8 { typealias Element = HTTPRequest private let bytes: S diff --git a/Sources/HTTPRequestDecoder.swift b/Sources/HTTPRequestDecoder.swift index 1a24bbe8..d930cc7b 100644 --- a/Sources/HTTPRequestDecoder.swift +++ b/Sources/HTTPRequestDecoder.swift @@ -33,7 +33,7 @@ import Foundation struct HTTPRequestDecoder { - static func decodeRequest(from bytes: S) async throws -> HTTPRequest where S: AsyncSequence, S.Element == UInt8 { + static func decodeRequest(from bytes: S) async throws -> HTTPRequest where S: ChuckedAsyncSequence, S.Element == UInt8 { let status = try await bytes.takeLine() let comps = status .trimmingCharacters(in: .whitespacesAndNewlines) @@ -82,15 +82,16 @@ struct HTTPRequestDecoder { return (HTTPHeader(name), value) } - static func readBody(from bytes: S, length: String?) async throws -> Data where S.Element == UInt8 { + static func readBody(from bytes: S, length: String?) async throws -> Data where S.Element == UInt8 { guard let length = length.flatMap(Int.init) else { return Data() } - return try await bytes - .collectUntil { $0.count == length } - .map { Data($0) } - .first() + guard let buffer = try await bytes.next(count: length) else { + throw Error("ChuckedAsyncSequence prematurely ended") + } + + return Data(buffer) } } diff --git a/Sources/Socket/AsyncSocket.swift b/Sources/Socket/AsyncSocket.swift index 48d96775..87fbc9c7 100644 --- a/Sources/Socket/AsyncSocket.swift +++ b/Sources/Socket/AsyncSocket.swift @@ -73,6 +73,21 @@ struct AsyncSocket: Sendable { } while true } + func read(bytes: Int) async throws -> [UInt8] { + var buffer = [UInt8]() + while buffer.count < bytes { + let toRead = min(bytes - buffer.count, 8192) + do { + try buffer.append(contentsOf: socket.read(atMost: toRead)) + } catch SocketError.blocked { + try await pool.suspend(untilReady: socket) + } catch { + throw error + } + } + return buffer + } + func write(_ data: Data) async throws { var sent = data.startIndex while sent < data.endIndex { @@ -104,11 +119,27 @@ struct AsyncSocket: Sendable { } while true } - var bytes: ClosureSequence { - ClosureSequence(closure: read) + var bytes: ByteSequence { + ByteSequence(socket: self) } var sockets: ClosureSequence { ClosureSequence(closure: accept) } } + +struct ByteSequence: ChuckedAsyncSequence, AsyncIteratorProtocol { + typealias Element = UInt8 + + let socket: AsyncSocket + + func makeAsyncIterator() -> ByteSequence { self } + + mutating func next() async throws -> UInt8? { + try await socket.read() + } + + func next(count: Int) async throws -> [UInt8]? { + try await socket.read(bytes: count) + } +} diff --git a/Sources/Socket/Socket.swift b/Sources/Socket/Socket.swift index 5375414b..31ed95c7 100644 --- a/Sources/Socket/Socket.swift +++ b/Sources/Socket/Socket.swift @@ -135,6 +135,25 @@ struct Socket: Sendable, Hashable { } } + func read(atMost length: Int) throws -> [UInt8] { + try [UInt8](unsafeUninitializedCapacity: length) { buffer, count in + count = try read(into: &buffer, length: length) + } + } + + private func read(into buffer: inout UnsafeMutableBufferPointer, length: Int) throws -> Int { + let count = Socket.read(file, buffer.baseAddress, length) + if count == 0 { + throw SocketError.disconnected + } else if count > 0 { + return count + } else if errno == EWOULDBLOCK { + throw SocketError.blocked + } else { + throw SocketError.makeFailed("Read") + } + } + func write(_ data: Data, from index: Data.Index) throws -> Data.Index { guard index < data.endIndex else { return data.endIndex } return try data.withUnsafeBytes { diff --git a/Tests/ConsumingAsyncSequence.swift b/Tests/ConsumingAsyncSequence.swift index 7d6fcad2..2013e4f8 100644 --- a/Tests/ConsumingAsyncSequence.swift +++ b/Tests/ConsumingAsyncSequence.swift @@ -29,7 +29,9 @@ // SOFTWARE. // -final class ConsumingAsyncSequence: AsyncSequence, AsyncIteratorProtocol { +@testable import FlyingFox + +final class ConsumingAsyncSequence: ChuckedAsyncSequence, AsyncIteratorProtocol { private var iterator: AnySequence.Iterator