Skip to content

Commit

Permalink
Nullable limit and before parameters in pagination, more precise Posi…
Browse files Browse the repository at this point in the history
…tion type, multiple fixes (#484)

* Fix AuditLogGetRequestBuilder docs

* AuditLogGetRequestBuilder.limit is 50 by default

* Better types in pagination

* Always use service getters in RestEntitySupplier

* Default limit for getCurrentUserGuilds, see https://discord.com/developers/docs/resources/user#get-current-user-guilds

* Add checkLimitAndGetBatchSize function

* Add limitPagination function

* RestEntitySupplier.getMessagesAround did not use limit parameter

* Max limit/batchSize for RestEntitySupplier.guilds, see https://discord.com/developers/docs/resources/user#get-current-user-guilds

* Use more specific position types where possible

* Fixes for archived threads

* Lambdas for guilds idSelector

* Fix pagination for audit log

* Higher order function for limited pagination

* unexpected bigger page will not stop pagination

* batchSize as first param in pagination

* Make sure Position.Around can not be used in UserService.getCurrentUserGuilds

* Nullable limit and position parameters to use Discord's defaults

* Clearer names in pagination

* Cannot paginate nullable items

* Clarify no maxBatchSize documented

* Move map one more level in RestEntitySupplier.getJoinedPrivateArchivedThreads()

* idLt for consistency
  • Loading branch information
lukellmann authored Jan 18, 2022
1 parent 084f026 commit 8263ee5
Show file tree
Hide file tree
Showing 20 changed files with 408 additions and 354 deletions.
133 changes: 78 additions & 55 deletions core/src/main/kotlin/Util.kt
Original file line number Diff line number Diff line change
Expand Up @@ -120,26 +120,25 @@ internal suspend fun <T> Flow<T>.indexOfFirstOrNull(predicate: suspend (T) -> Bo
.singleOrNull()?.first
}

internal fun <C : Collection<T>, T> paginate(
internal fun <Batch : Collection<Item>, Item : Any, Direction : Position.BeforeOrAfter> paginate(
start: Snowflake,
batchSize: Int,
idSelector: (T) -> Snowflake,
itemSelector: (Collection<T>) -> T?,
directionSelector: (Snowflake) -> Position,
request: suspend (position: Position) -> C,
): Flow<T> = flow {
var position = directionSelector(start)
var size = batchSize
itemSelector: (Batch) -> Item?,
idSelector: (Item) -> Snowflake,
directionSelector: (Snowflake) -> Direction,
request: suspend (Direction) -> Batch,
): Flow<Item> = flow {

var direction = directionSelector(start)

while (true) {
val response = request(position)
for (item in response) emit(item)
val batch = request(direction)
for (item in batch) emit(item)

val id = itemSelector(response)?.let(idSelector) ?: break
position = directionSelector(id)
if (batch.size < batchSize) break

if (response.size < size) break
size = response.size
val item = itemSelector(batch) ?: break
direction = directionSelector(idSelector(item))
}
}

Expand Down Expand Up @@ -180,87 +179,111 @@ internal fun <T> oldestItem(idSelector: (T) -> Snowflake): (Collection<T>) -> T?
/**
* Selects the [Position.After] the youngest item in the batch.
*/
internal fun <C : Collection<T>, T> paginateForwards(
start: Snowflake = Snowflake("0"),
internal fun <T : Any> paginateForwards(
batchSize: Int,
start: Snowflake = Snowflake.min,
idSelector: (T) -> Snowflake,
request: suspend (position: Position) -> C
): Flow<T> =
paginate(start, batchSize, idSelector, youngestItem(idSelector), Position::After, request)
request: suspend (after: Position.After) -> Collection<T>,
): Flow<T> = paginate(
start,
batchSize,
itemSelector = youngestItem(idSelector),
idSelector,
directionSelector = Position::After,
request,
)

/**
* Selects the [Position.After] the youngest item in the batch.
*/
internal fun <C : Collection<T>, T : KordEntity> paginateForwards(
start: Snowflake = Snowflake("0"),
internal fun <T : KordEntity> paginateForwards(
batchSize: Int,
request: suspend (position: Position) -> C
): Flow<T> =
paginate(start, batchSize, { it.id }, youngestItem { it.id }, Position::After, request)
start: Snowflake = Snowflake.min,
request: suspend (after: Position.After) -> Collection<T>,
): Flow<T> = paginate(
start,
batchSize,
itemSelector = youngestItem { it.id },
idSelector = { it.id },
directionSelector = Position::After,
request,
)

/**
* Selects the [Position.Before] the oldest item in the batch.
*/
internal fun <C : Collection<T>, T> paginateBackwards(
start: Snowflake = Snowflake.max,
internal fun <T : Any> paginateBackwards(
batchSize: Int,
start: Snowflake = Snowflake.max,
idSelector: (T) -> Snowflake,
request: suspend (position: Position) -> C
): Flow<T> =
paginate(start, batchSize, idSelector, oldestItem(idSelector), Position::Before, request)
request: suspend (before: Position.Before) -> Collection<T>,
): Flow<T> = paginate(
start,
batchSize,
itemSelector = oldestItem(idSelector),
idSelector,
directionSelector = Position::Before,
request,
)

/**
* Selects the [Position.Before] the oldest item in the batch.
*/
internal fun <C : Collection<T>, T : KordEntity> paginateBackwards(
start: Snowflake = Snowflake.max,
internal fun <T : KordEntity> paginateBackwards(
batchSize: Int,
request: suspend (position: Position) -> C
): Flow<T> =
paginate(start, batchSize, { it.id }, oldestItem { it.id }, Position::Before, request)
start: Snowflake = Snowflake.max,
request: suspend (before: Position.Before) -> Collection<T>,
): Flow<T> = paginate(
start,
batchSize,
itemSelector = oldestItem { it.id },
idSelector = { it.id },
directionSelector = Position::Before,
request,
)

/**
* Paginates the [Collection] returned by [request] with [start] as a initial reference in time.
* Paginates the [Collection] returned by [request] with [start] as an initial reference in time.
* [instantSelector] is used to select the new reference to fetch from.
*
* Termination scenarios:
* * [Collection]'s size fall behind [batchSize].
* * [instantSelector] returns null.
*/
internal fun <C : Collection<T>, T> paginateByDate(
start: Instant = Clock.System.now(),
internal fun <Batch : Collection<Item>, Item : Any> paginateByDate(
batchSize: Int,
instantSelector: (Collection<T>) -> Instant?,
request: suspend (Instant) -> C
): Flow<T> = flow {
start: Instant?,
instantSelector: (Batch) -> Instant?,
request: suspend (Instant) -> Batch,
): Flow<Item> = flow {

var currentTimestamp = start ?: Clock.System.now() // get default current time as late as possible

var currentTimestamp = start
while (true) {
val response = request(currentTimestamp)
val batch = request(currentTimestamp)
for (item in batch) emit(item)

for (item in response) emit(item)
if (batch.size < batchSize) break

currentTimestamp = instantSelector(response) ?: break
if (response.size < batchSize) break
currentTimestamp = instantSelector(batch) ?: break
}
}

/**
* A special function to paginate [ThreadChannel] endpoints.
* selects the earliest time reference found in the response of the request on each pagination.
* selects the earliest reference in time found in the response of the request on each pagination.
* see [paginateByDate]
*/
internal fun paginateThreads(
batchSize: Int,
start: Instant = Clock.System.now(),
request: suspend (Instant) -> Collection<ThreadChannel>
) =
paginateByDate(
start,
batchSize,
{ threads -> threads.minOfOrNull { it.archiveTimestamp } },
request
)
start: Instant?,
request: suspend (Instant) -> Collection<ThreadChannel>,
) = paginateByDate(
batchSize,
start,
instantSelector = { threads -> threads.minOfOrNull { it.archiveTimestamp } },
request,
)

public inline fun <reified T : Event> Intents.IntentsBuilder.enableEvent(): Unit = enableEvent(T::class)

Expand Down
12 changes: 5 additions & 7 deletions core/src/main/kotlin/behavior/GuildBehavior.kt
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,9 @@ public interface GuildBehavior : KordEntity, Strategizable {
/**
* Requests to get all present members in this guild.
*
* Unrestricted consumption of the returned [Flow] is a potentially performance intensive operation, it is thus recommended
* to limit the amount of messages requested by using [Flow.take], [Flow.takeWhile] or other functions that limit the amount
* of messages requested.
* Unrestricted consumption of the returned [Flow] is a potentially performance-intensive operation, it is thus
* recommended limiting the amount of messages requested by using [Flow.take], [Flow.takeWhile] or other functions
* that limit the amount of messages requested.
*
* ```kotlin
* guild.members.first { it.displayName == targetName }
Expand Down Expand Up @@ -288,7 +288,7 @@ public interface GuildBehavior : KordEntity, Strategizable {
supplier.getGuildApplicationCommandOrNull(kord.resources.applicationId, id, commandId)

/**
* Requests to get the this behavior as a [Guild].
* Requests to get this behavior as a [Guild].
*
* @throws [RequestException] if anything went wrong during the request.
* @throws [EntityNotFoundException] if the guild wasn't present.
Expand Down Expand Up @@ -350,11 +350,9 @@ public interface GuildBehavior : KordEntity, Strategizable {
*
* The returned flow is lazily executed, any [RequestException] will be thrown on
* [terminal operators](https://kotlinlang.org/docs/reference/coroutines/flow.html#terminal-flow-operators) instead.
*
* This function is not part of the officially documented Discord API and may be removed/altered/stop working in the future.
*/
@KordExperimental
public suspend fun getMembers(query: String, limit: Int = 1000): Flow<Member> = flow {
public fun getMembers(query: String, limit: Int = 1000): Flow<Member> = flow {
kord.rest.guild.getGuildMembers(id, query, limit).forEach {
emit(
Member(
Expand Down
39 changes: 21 additions & 18 deletions core/src/main/kotlin/behavior/channel/MessageChannelBehavior.kt
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import kotlin.time.TimeMark
public interface MessageChannelBehavior : ChannelBehavior, Strategizable {

/**
* Requests to get the this behavior as a [MessageChannel].
* Requests to get this behavior as a [MessageChannel].
*
* @throws [RequestException] if something went wrong during the request.
* @throws [EntityNotFoundException] if the channel wasn't present.
Expand Down Expand Up @@ -74,9 +74,9 @@ public interface MessageChannelBehavior : ChannelBehavior, Strategizable {
* Requests to get all messages in this channel.
*
* Messages retrieved by this function will be emitted in chronological order (oldest -> newest).
* Unrestricted consumption of the returned [Flow] is a potentially performance intensive operation, it is thus recommended
* to limit the amount of messages requested by using [Flow.take], [Flow.takeWhile] or other functions that limit the amount
* of messages requested.
* Unrestricted consumption of the returned [Flow] is a potentially performance-intensive operation, it is thus
* recommended limiting the amount of messages requested by using [Flow.take], [Flow.takeWhile] or other functions
* that limit the amount of messages requested.
*
* ```kotlin
* channel.getMessagesBefore(newer.id).takeWhile { it.id > older.id }
Expand Down Expand Up @@ -127,11 +127,11 @@ public interface MessageChannelBehavior : ChannelBehavior, Strategizable {
* Messages retrieved by this function will be emitted in reverse-chronological older (newest -> oldest).
*
* The flow may use paginated requests to supply messages, [limit] will limit the maximum number of messages
* supplied and may optimize the batch size accordingly. A value of [Int.MAX_VALUE] means no limit.
* supplied and may optimize the batch size accordingly. `null` means no limit.
*
* Unrestricted consumption of the returned [Flow] is a potentially performance intensive operation, it is thus recommended
* to limit the amount of messages requested by using [Flow.take], [Flow.takeWhile] or other functions that limit the amount
* of messages requested.
* Unrestricted consumption of the returned [Flow] is a potentially performance-intensive operation, it is thus
* recommended limiting the amount of messages requested by using [Flow.take], [Flow.takeWhile] or other functions
* that limit the amount of messages requested.
*
* ```kotlin
* channel.getMessagesBefore(newer.id).takeWhile { it.id > older.id }
Expand All @@ -142,7 +142,7 @@ public interface MessageChannelBehavior : ChannelBehavior, Strategizable {
*
* @throws IllegalArgumentException if a [limit] < 1 was supplied.
*/
public fun getMessagesBefore(messageId: Snowflake, limit: Int = Int.MAX_VALUE): Flow<Message> =
public fun getMessagesBefore(messageId: Snowflake, limit: Int? = null): Flow<Message> =
supplier.getMessagesBefore(channelId = id, messageId = messageId, limit = limit)

/**
Expand All @@ -151,11 +151,11 @@ public interface MessageChannelBehavior : ChannelBehavior, Strategizable {
* Messages retrieved by this function will be emitted in chronological older (oldest -> newest).
*
* The flow may use paginated requests to supply messages, [limit] will limit the maximum number of messages
* supplied and may optimize the batch size accordingly. A value of [Int.MAX_VALUE] means no limit.
* supplied and may optimize the batch size accordingly. `null` means no limit.
*
* Unrestricted consumption of the returned [Flow] is a potentially performance intensive operation, it is thus recommended
* to limit the amount of messages requested by using [Flow.take], [Flow.takeWhile] or other functions that limit the amount
* of messages requested.
* Unrestricted consumption of the returned [Flow] is a potentially performance-intensive operation, it is thus
* recommended limiting the amount of messages requested by using [Flow.take], [Flow.takeWhile] or other functions
* that limit the amount of messages requested.
*
* ```kotlin
* channel.getMessagesAfter(older.id).takeWhile { it.id < newer.id }
Expand All @@ -165,27 +165,30 @@ public interface MessageChannelBehavior : ChannelBehavior, Strategizable {
*
* @throws IllegalArgumentException if a [limit] < 1 was supplied.
*/
public fun getMessagesAfter(messageId: Snowflake, limit: Int = Int.MAX_VALUE): Flow<Message> =
public fun getMessagesAfter(messageId: Snowflake, limit: Int? = null): Flow<Message> =
supplier.getMessagesAfter(channelId = id, messageId = messageId, limit = limit)

/**
* Requests to get messages around (both older and newer) the [messageId].
* Requests to get [Message]s around (both older and newer) the [messageId].
*
* Messages retrieved by this function will be emitted in chronological older (oldest -> newest).
*
* Unlike [getMessagesAfter] and [getMessagesBefore], this flow can return **a maximum of 100 messages**.
* As such, the accepted range of [limit] is reduced to 1..100.
*
* Supplied messages will be equally distributed before and after the [messageId].
* The remaining message for an odd [limit] is undefined and may appear on either side.
* Supplied messages will be equally distributed before and after the [messageId].
* The remaining message for an odd [limit] is undefined and may appear on either side or no side at all.
*
* If a message with the given [messageId] exists, the flow might also contain it, so it **could have one more
* element than the given [limit]**.
*
* The returned flow is lazily executed, any [RequestException] will be thrown on
* [terminal operators](https://kotlinlang.org/docs/reference/coroutines/flow.html#terminal-flow-operators) instead.
*
* @throws IllegalArgumentException if the [limit] is outside the range of 1..100.
*/
public fun getMessagesAround(messageId: Snowflake, limit: Int = 100): Flow<Message> =
supplier.getMessagesAround(channelId = id, messageId = messageId, limit = 100)
supplier.getMessagesAround(channelId = id, messageId = messageId, limit = limit)

/**
* Requests to get a message with the given [messageId].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public interface NewsChannelBehavior : ThreadParentChannelBehavior {
}


override fun getPublicArchivedThreads(before: Instant, limit: Int): Flow<NewsChannelThread> {
override fun getPublicArchivedThreads(before: Instant?, limit: Int?): Flow<NewsChannelThread> {
return super.getPublicArchivedThreads(before, limit).filterIsInstance()
}

Expand Down
8 changes: 4 additions & 4 deletions core/src/main/kotlin/behavior/channel/TextChannelBehavior.kt
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public interface TextChannelBehavior : PrivateThreadParentChannelBehavior {
get() = super.activeThreads.filterIsInstance()

/**
* Requests to get the this behavior as a [TextChannel].
* Requests to get this behavior as a [TextChannel].
*
* @throws [RequestException] if anything went wrong during the request.
* @throws [EntityNotFoundException] if the channel wasn't present.
Expand Down Expand Up @@ -96,15 +96,15 @@ public interface TextChannelBehavior : PrivateThreadParentChannelBehavior {
return unsafeStartPublicThreadWithMessage(messageId, name, archiveDuration, reason) as TextChannelThread
}

override fun getPublicArchivedThreads(before: Instant, limit: Int): Flow<TextChannelThread> {
override fun getPublicArchivedThreads(before: Instant?, limit: Int?): Flow<TextChannelThread> {
return super.getPublicArchivedThreads(before, limit).filterIsInstance()
}

override fun getPrivateArchivedThreads(before: Instant, limit: Int): Flow<TextChannelThread> {
override fun getPrivateArchivedThreads(before: Instant?, limit: Int?): Flow<TextChannelThread> {
return super.getPrivateArchivedThreads(before, limit).filterIsInstance()
}

override fun getJoinedPrivateArchivedThreads(before: Snowflake, limit: Int): Flow<TextChannelThread> {
override fun getJoinedPrivateArchivedThreads(before: Snowflake?, limit: Int?): Flow<TextChannelThread> {
return super.getJoinedPrivateArchivedThreads(before, limit).filterIsInstance()
}

Expand Down
Loading

0 comments on commit 8263ee5

Please sign in to comment.