From 39cc7dc2801adf9b8b921817d95b6994383e2afd Mon Sep 17 00:00:00 2001 From: Denis Bykhov Date: Fri, 4 Oct 2024 21:30:57 +0500 Subject: [PATCH] DatalakeService include retry for object upload (#6807) Signed-off-by: Denis Bykhov --- server/datalake/src/index.ts | 28 ++++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/server/datalake/src/index.ts b/server/datalake/src/index.ts index 2b3b48b4b24..071a063c503 100644 --- a/server/datalake/src/index.ts +++ b/server/datalake/src/index.ts @@ -127,8 +127,10 @@ export class DatalakeService implements StorageAdapter { size } - await ctx.with('put', {}, async () => { - return await this.client.putObject(ctx, workspaceId, objectName, stream, metadata) + await ctx.with('put', {}, async (ctx) => { + await withRetry(ctx, 5, async () => { + return await this.client.putObject(ctx, workspaceId, objectName, stream, metadata) + }) }) return { @@ -187,3 +189,25 @@ export function processConfigFromEnv (storageConfig: StorageConfiguration): stri storageConfig.storages.push(config) storageConfig.default = 'datalake' } + +async function withRetry ( + ctx: MeasureContext, + retries: number, + op: () => Promise, + delay: number = 100 +): Promise { + let error: any + while (retries > 0) { + retries-- + try { + return await op() + } catch (err: any) { + error = err + ctx.error('error', { err }) + if (retries !== 0) { + await new Promise((resolve) => setTimeout(resolve, delay)) + } + } + } + throw error +}