From 5c55dba2a2eef34026ce41dc303f218b3ac12b80 Mon Sep 17 00:00:00 2001 From: Ashley Williams Date: Mon, 30 Dec 2024 16:22:32 +0000 Subject: [PATCH 1/2] fix: logging fixes --- src/modules/tools/tools.resolver.ts | 15 +++--- src/modules/tools/tools.service.ts | 54 +++++++++---------- .../utils/registerAllWebhooksForRetailer.ts | 3 ++ .../tools/utils/registerWebhookForRetailer.ts | 2 + .../product-groups/process-product-groups.ts | 24 ++++----- .../product-groups/request-product-groups.ts | 6 +-- .../product/process-products.ts | 12 ++--- .../product/request-products.ts | 14 ++--- src/trigger/reuseables/noble_pollfills.ts | 8 +-- src/trigger/scheduled/safety_sync.ts | 6 +-- 10 files changed, 78 insertions(+), 66 deletions(-) diff --git a/src/modules/tools/tools.resolver.ts b/src/modules/tools/tools.resolver.ts index 4d91b30..e273141 100644 --- a/src/modules/tools/tools.resolver.ts +++ b/src/modules/tools/tools.resolver.ts @@ -1,3 +1,4 @@ +import { Logger } from '@nestjs/common'; import { Args, Mutation, Query, Resolver } from '@nestjs/graphql'; import { GraphQLBoolean, GraphQLInt } from 'graphql'; import { GraphQLString } from 'graphql/type'; @@ -10,6 +11,8 @@ import { ToolsService } from './tools.service'; @Resolver() export class ToolsResolver { + private readonly logger = new Logger('ToolsResolver'); + constructor( private readonly toolsService: ToolsService, private readonly retailerService: RetailerService, @@ -88,15 +91,15 @@ export class ToolsResolver { if (partial) { await ProductJobUtils.scheduleTriggerJob(retailer, false, 10, 'forceViaGql', { - info: (s, o) => console.info(s, o), - warn: (s, o) => console.warn(s, o), - error: (s, o) => console.error(s, o), + info: (logMessage: string, ...args: any[]) => this.logger.log(logMessage, ...args), + warn: (logMessage: string, ...args: any[]) => this.logger.warn(logMessage, ...args), + error: (logMessage: string, ...args: any[]) => this.logger.error(logMessage, ...args), }); } else { await ProductJobUtils.scheduleTriggerJob(retailer, true, undefined, 'forceViaGql', { - info: (s, o) => console.info(s, o), - warn: (s, o) => console.warn(s, o), - error: (s, o) => console.error(s, o), + info: (logMessage: string, ...args: any[]) => this.logger.log(logMessage, ...args), + warn: (logMessage: string, ...args: any[]) => this.logger.warn(logMessage, ...args), + error: (logMessage: string, ...args: any[]) => this.logger.error(logMessage, ...args), }); } return 'Scheduled a sync'; diff --git a/src/modules/tools/tools.service.ts b/src/modules/tools/tools.service.ts index ca7a4a1..0734e4c 100644 --- a/src/modules/tools/tools.service.ts +++ b/src/modules/tools/tools.service.ts @@ -32,25 +32,25 @@ export class ToolsService { async getWebhooks(retailer: RetailerEntity) { return getWebhooks(retailer, { - info: this.logger.log, - error: this.logger.error, - warn: this.logger.warn, + info: (logMessage: string, ...args: any[]) => this.logger.log(logMessage, ...args), + warn: (logMessage: string, ...args: any[]) => this.logger.warn(logMessage, ...args), + error: (logMessage: string, ...args: any[]) => this.logger.error(logMessage, ...args), }); } async registerAllWebhooksForRetailer(retailer: RetailerEntity) { return registerAllWebhooksForRetailer(retailer, process.env.HOST!, { - info: this.logger.log, - error: this.logger.error, - warn: this.logger.warn, + info: (logMessage: string, ...args: any[]) => this.logger.log(logMessage, ...args), + warn: (logMessage: string, ...args: any[]) => this.logger.warn(logMessage, ...args), + error: (logMessage: string, ...args: any[]) => this.logger.error(logMessage, ...args), }); } async registerWebhookForRetailer(retailer: RetailerEntity, topic: WebhookSubscriptionTopic, url: string) { return registerWebhookForRetailer(retailer, topic, url, { - info: this.logger.log, - error: this.logger.error, - warn: this.logger.warn, + info: (logMessage: string, ...args: any[]) => this.logger.log(logMessage, ...args), + warn: (logMessage: string, ...args: any[]) => this.logger.warn(logMessage, ...args), + error: (logMessage: string, ...args: any[]) => this.logger.error(logMessage, ...args), }); } @@ -67,26 +67,26 @@ export class ToolsService { from, limit, { - info: this.logger.log, - error: this.logger.error, - warn: this.logger.warn, + info: (logMessage: string, ...args: any[]) => this.logger.log(logMessage, ...args), + warn: (logMessage: string, ...args: any[]) => this.logger.warn(logMessage, ...args), + error: (logMessage: string, ...args: any[]) => this.logger.error(logMessage, ...args), }, ); } async deleteWebhookForStore(retailer: RetailerEntity, webhookId: string) { return deleteWebhookForStore(retailer, webhookId, { - info: this.logger.log, - error: this.logger.error, - warn: this.logger.warn, + info: (logMessage: string, ...args: any[]) => this.logger.log(logMessage, ...args), + warn: (logMessage: string, ...args: any[]) => this.logger.warn(logMessage, ...args), + error: (logMessage: string, ...args: any[]) => this.logger.error(logMessage, ...args), }); } async deleteAllWebhooksForRetailer(retailer: RetailerEntity) { return deleteAllWebhooksForRetailer(retailer, { - info: this.logger.log, - error: this.logger.error, - warn: this.logger.warn, + info: (logMessage: string, ...args: any[]) => this.logger.log(logMessage, ...args), + warn: (logMessage: string, ...args: any[]) => this.logger.warn(logMessage, ...args), + error: (logMessage: string, ...args: any[]) => this.logger.error(logMessage, ...args), }); } @@ -98,25 +98,25 @@ export class ToolsService { failed: string[]; }> { return deleteAllWebhooksForAllStores(this.entityManager, from, limit, { - info: this.logger.log, - error: this.logger.error, - warn: this.logger.warn, + info: (logMessage: string, ...args: any[]) => this.logger.log(logMessage, ...args), + warn: (logMessage: string, ...args: any[]) => this.logger.warn(logMessage, ...args), + error: (logMessage: string, ...args: any[]) => this.logger.error(logMessage, ...args), }); } async updateRetailerInfoWhereNull() { return updateRetailerInfoWhereNull(this.configService.get('CLOUDSHELF_API_URL')!, this.entityManager, { - info: this.logger.log, - error: this.logger.error, - warn: this.logger.warn, + info: (logMessage: string, ...args: any[]) => this.logger.log(logMessage, ...args), + warn: (logMessage: string, ...args: any[]) => this.logger.warn(logMessage, ...args), + error: (logMessage: string, ...args: any[]) => this.logger.error(logMessage, ...args), }); } async sendAllRetailersToCloudshelf() { return sendAllRetailersToCloudshelf(this.configService.get('CLOUDSHELF_API_URL')!, this.entityManager, { - info: this.logger.log, - error: this.logger.error, - warn: this.logger.warn, + info: (logMessage: string, ...args: any[]) => this.logger.log(logMessage, ...args), + warn: (logMessage: string, ...args: any[]) => this.logger.warn(logMessage, ...args), + error: (logMessage: string, ...args: any[]) => this.logger.error(logMessage, ...args), }); } } diff --git a/src/modules/tools/utils/registerAllWebhooksForRetailer.ts b/src/modules/tools/utils/registerAllWebhooksForRetailer.ts index 89eedac..440838b 100644 --- a/src/modules/tools/utils/registerAllWebhooksForRetailer.ts +++ b/src/modules/tools/utils/registerAllWebhooksForRetailer.ts @@ -6,11 +6,14 @@ import { registerWebhookForRetailer } from './registerWebhookForRetailer'; export async function registerAllWebhooksForRetailer(retailer: RetailerEntity, host: string, logs?: LogsInterface) { const allWebhooks = await getWebhooks(retailer); + const topics = allWebhooks.map(w => w.node.topic.toString()); + logs?.info(`existing webhooks`, { topics }); if (!allWebhooks.find(w => w.node.topic === WebhookSubscriptionTopic.OrdersUpdated)) { const r1 = await registerWebhookForRetailer( retailer, WebhookSubscriptionTopic.OrdersUpdated, `https://${host}/shopify/webhooks`, + logs, ); if (!r1) { logs?.error( diff --git a/src/modules/tools/utils/registerWebhookForRetailer.ts b/src/modules/tools/utils/registerWebhookForRetailer.ts index 94d3a4d..d259bd7 100644 --- a/src/modules/tools/utils/registerWebhookForRetailer.ts +++ b/src/modules/tools/utils/registerWebhookForRetailer.ts @@ -17,6 +17,7 @@ export async function registerWebhookForRetailer( logs?: LogsInterface, ) { try { + logs?.info(`Creating webook ${topic} to host ${url}`); const authedClient = await ShopifyGraphqlUtil.getShopifyAdminApolloClientByRetailer(retailer); const subscription: WebhookSubscriptionInput = { @@ -35,6 +36,7 @@ export async function registerWebhookForRetailer( }); if (!resp.data || resp.errors) { + logs?.error(`Error creating webhook`, resp.errors); return false; } diff --git a/src/trigger/data-ingestion/product-groups/process-product-groups.ts b/src/trigger/data-ingestion/product-groups/process-product-groups.ts index 9c16add..fa768f7 100644 --- a/src/trigger/data-ingestion/product-groups/process-product-groups.ts +++ b/src/trigger/data-ingestion/product-groups/process-product-groups.ts @@ -40,9 +40,9 @@ export const ProcessProductGroupsTask = task({ retailer.lastSafetySyncCompleted = new Date(); } await ProductJobUtils.scheduleTriggerJob(retailer, false, undefined, undefined, { - info: logger.info, - warn: logger.warn, - error: logger.error, + info: (logMessage: string, ...args: any[]) => logger.info(logMessage, ...args), + warn: (logMessage: string, ...args: any[]) => logger.warn(logMessage, ...args), + error: (logMessage: string, ...args: any[]) => logger.error(logMessage, ...args), }); } logger.info(`Handle Complete: ${msg}`); @@ -159,9 +159,9 @@ export const ProcessProductGroupsTask = task({ } logger.info(`Upserting collections to cloudshelf`, { productGroupInputs }); await CloudshelfApiUtils.updateProductGroups(cloudshelfAPI, retailer.domain, productGroupInputs, { - info: logger.info, - error: logger.error, - warn: logger.warn, + info: (logMessage: string, ...args: any[]) => logger.info(logMessage, ...args), + warn: (logMessage: string, ...args: any[]) => logger.warn(logMessage, ...args), + error: (logMessage: string, ...args: any[]) => logger.error(logMessage, ...args), }); logger.info(`Updating products in product groups on cloudshelf`); for (const [productGroupId, productIds] of Object.entries(productsInGroups)) { @@ -173,9 +173,9 @@ export const ProcessProductGroupsTask = task({ productGroupId, reversedProductIds, { - info: logger.info, - error: logger.error, - warn: logger.warn, + info: (logMessage: string, ...args: any[]) => logger.info(logMessage, ...args), + warn: (logMessage: string, ...args: any[]) => logger.warn(logMessage, ...args), + error: (logMessage: string, ...args: any[]) => logger.error(logMessage, ...args), }, ); } @@ -183,9 +183,9 @@ export const ProcessProductGroupsTask = task({ logger.info(`Finished reporting all products in all groups`); logger.info(`Creating first cloud shelf if required`); await CloudshelfApiUtils.createFirstCloudshelfIfRequired(cloudshelfAPI, em, retailer, { - info: logger.info, - error: logger.error, - warn: logger.warn, + info: (logMessage: string, ...args: any[]) => logger.info(logMessage, ...args), + warn: (logMessage: string, ...args: any[]) => logger.warn(logMessage, ...args), + error: (logMessage: string, ...args: any[]) => logger.error(logMessage, ...args), }); if (payload.fullSync) { const groupContentToSave: { id: string }[] = []; diff --git a/src/trigger/data-ingestion/product-groups/request-product-groups.ts b/src/trigger/data-ingestion/product-groups/request-product-groups.ts index 3b1d3ac..6e2612e 100644 --- a/src/trigger/data-ingestion/product-groups/request-product-groups.ts +++ b/src/trigger/data-ingestion/product-groups/request-product-groups.ts @@ -116,9 +116,9 @@ export const RequestProductGroupsTask = task({ queryPayload, payload.fullSync, { - info: logger.info, - error: logger.error, - warn: logger.warn, + info: (logMessage: string, ...args: any[]) => logger.info(logMessage, ...args), + warn: (logMessage: string, ...args: any[]) => logger.warn(logMessage, ...args), + error: (logMessage: string, ...args: any[]) => logger.error(logMessage, ...args), }, ); diff --git a/src/trigger/data-ingestion/product/process-products.ts b/src/trigger/data-ingestion/product/process-products.ts index 8c815b2..a87980b 100644 --- a/src/trigger/data-ingestion/product/process-products.ts +++ b/src/trigger/data-ingestion/product/process-products.ts @@ -43,9 +43,9 @@ export const ProcessProductsTask = task({ if (retailer) { retailer.lastProductSync = new Date(); await CollectionJobUtils.scheduleTriggerJob(retailer, payload.fullSync, undefined, { - info: logger.info, - warn: logger.warn, - error: logger.error, + info: (logMessage: string, ...args: any[]) => logger.info(logMessage, ...args), + warn: (logMessage: string, ...args: any[]) => logger.warn(logMessage, ...args), + error: (logMessage: string, ...args: any[]) => logger.error(logMessage, ...args), }); } logger.info(`Handle Complete: ${msg}`); @@ -239,9 +239,9 @@ export const ProcessProductsTask = task({ for (const chunk of chunkedProductInputs) { logger.info(`Upserting ${chunk.length} products to cloudshelf for current file`); await CloudshelfApiUtils.upsertProducts(cloudshelfAPI, bulkOperationRecord.domain, chunk, { - info: logger.info, - error: logger.error, - warn: logger.warn, + info: (logMessage: string, ...args: any[]) => logger.info(logMessage, ...args), + warn: (logMessage: string, ...args: any[]) => logger.warn(logMessage, ...args), + error: (logMessage: string, ...args: any[]) => logger.error(logMessage, ...args), }); } logger.info( diff --git a/src/trigger/data-ingestion/product/request-products.ts b/src/trigger/data-ingestion/product/request-products.ts index 62fb7fb..151a0c4 100644 --- a/src/trigger/data-ingestion/product/request-products.ts +++ b/src/trigger/data-ingestion/product/request-products.ts @@ -121,11 +121,13 @@ export const RequestProductsTask = task({ logger.info(`Requesting products for retailer ${retailer.displayName} (${retailer.id}) (${retailer.domain})`); if (payload.fullSync) { logger.info(`payload.fullSync is true, registering webhooks`); + const topics = ['1']; + logger.info(`test`, { topics }); //If its a full sync we register all the webhooks first, just to be safe. await registerAllWebhooksForRetailer(retailer, connectorHost, { - info: logger.info, - error: logger.error, - warn: logger.warn, + info: (logMessage: string, ...args: any[]) => logger.info(logMessage, ...args), + warn: (logMessage: string, ...args: any[]) => logger.warn(logMessage, ...args), + error: (logMessage: string, ...args: any[]) => logger.error(logMessage, ...args), }); } await TriggerWaitForNobleReschedule(retailer); @@ -149,9 +151,9 @@ export const RequestProductsTask = task({ queryPayload, payload.fullSync, { - info: logger.info, - error: logger.error, - warn: logger.warn, + info: (logMessage: string, ...args: any[]) => logger.info(logMessage, ...args), + warn: (logMessage: string, ...args: any[]) => logger.warn(logMessage, ...args), + error: (logMessage: string, ...args: any[]) => logger.error(logMessage, ...args), }, ); retailer.nextPartialSyncRequestTime = subMinutes(new Date(), 1); diff --git a/src/trigger/reuseables/noble_pollfills.ts b/src/trigger/reuseables/noble_pollfills.ts index d64fce0..e0def23 100644 --- a/src/trigger/reuseables/noble_pollfills.ts +++ b/src/trigger/reuseables/noble_pollfills.ts @@ -11,9 +11,9 @@ export const TriggerWaitForNobleReschedule = async (retailer: RetailerEntity) => do { logger.info(`Checking for running bulk operation`); const currentBulkOperation = await BulkOperationUtils.checkForRunningBulkOperationByRetailer(retailer, { - info: logger.info, - error: logger.error, - warn: logger.warn, + info: (logMessage: string, ...args: any[]) => logger.info(logMessage, ...args), + warn: (logMessage: string, ...args: any[]) => logger.warn(logMessage, ...args), + error: (logMessage: string, ...args: any[]) => logger.error(logMessage, ...args), }); if (currentBulkOperation) { @@ -28,9 +28,11 @@ export const TriggerWaitForNobleReschedule = async (retailer: RetailerEntity) => await wait.for({ minutes: 2 }); } else { shouldBeWaitingForQueue = false; + logger.info(`Exising bulk operation, but not running (${currentBulkOperation.status})`); } } else { shouldBeWaitingForQueue = false; + logger.info(`No running bulk operation`); } } while (shouldBeWaitingForQueue); }; diff --git a/src/trigger/scheduled/safety_sync.ts b/src/trigger/scheduled/safety_sync.ts index 40d1b03..2fa1077 100644 --- a/src/trigger/scheduled/safety_sync.ts +++ b/src/trigger/scheduled/safety_sync.ts @@ -37,9 +37,9 @@ export async function internalScheduleTriggerJobs(em: EntityManager) { for (const retailer of retailers) { logger.debug('Creating safety sync for retailer ' + retailer.domain); await ProductJobUtils.scheduleTriggerJob(retailer, true, undefined, 'safetySync', { - info: logger.info, - warn: logger.warn, - error: logger.error, + info: (logMessage: string, ...args: any[]) => logger.info(logMessage, ...args), + warn: (logMessage: string, ...args: any[]) => logger.warn(logMessage, ...args), + error: (logMessage: string, ...args: any[]) => logger.error(logMessage, ...args), }); retailer.lastSafetySyncRequested = new Date(); } From 63320f7b18c54dcbc0e5ee11c606acc50ac49518 Mon Sep 17 00:00:00 2001 From: Ashley Williams Date: Mon, 30 Dec 2024 16:29:27 +0000 Subject: [PATCH 2/2] chore: webhook reason and logging --- src/modules/data-ingestion/collection.job.utils.ts | 4 +++- src/modules/data-ingestion/product.job.utils.ts | 4 ++++ .../bulk.operation.finish.webhook.handler.ts | 12 ++++++++++-- .../data-ingestion/product/request-products.ts | 4 +--- 4 files changed, 18 insertions(+), 6 deletions(-) diff --git a/src/modules/data-ingestion/collection.job.utils.ts b/src/modules/data-ingestion/collection.job.utils.ts index 03383ef..c0711db 100644 --- a/src/modules/data-ingestion/collection.job.utils.ts +++ b/src/modules/data-ingestion/collection.job.utils.ts @@ -45,7 +45,9 @@ export class CollectionJobUtils { if (reason) { tags.push(`reason_${reason}`); } - + logs?.info( + `Asking trigger to schhedule productgroup consumer job for retailer ${retailer.domain} and bulk op ${bulkOp.shopifyBulkOpId}`, + ); await ProcessProductGroupsTask.trigger( { remoteBulkOperationId: bulkOp.shopifyBulkOpId, diff --git a/src/modules/data-ingestion/product.job.utils.ts b/src/modules/data-ingestion/product.job.utils.ts index e4cafb9..4b31267 100644 --- a/src/modules/data-ingestion/product.job.utils.ts +++ b/src/modules/data-ingestion/product.job.utils.ts @@ -103,6 +103,10 @@ export class ProductJobUtils { if (reason) { tags.push(`reason_${reason}`); } + + logs?.info( + `Asking trigger to schhedule product consumer job for retailer ${retailer.domain} and bulk op ${bulkOp.shopifyBulkOpId}`, + ); await ProcessProductsTask.trigger( { remoteBulkOperationId: bulkOp.shopifyBulkOpId, diff --git a/src/modules/shopify/webhooks/bulk.operation.finish.webhook.handler.ts b/src/modules/shopify/webhooks/bulk.operation.finish.webhook.handler.ts index 83ede61..42b57f1 100644 --- a/src/modules/shopify/webhooks/bulk.operation.finish.webhook.handler.ts +++ b/src/modules/shopify/webhooks/bulk.operation.finish.webhook.handler.ts @@ -67,10 +67,18 @@ export class BulkOperationFinishedWebhookHandler extends ShopifyWebhookHandler this.logger.log(logMessage, ...args), + warn: (logMessage: string, ...args: any[]) => this.logger.warn(logMessage, ...args), + error: (logMessage: string, ...args: any[]) => this.logger.error(logMessage, ...args), + }); } else if (bulkOp.type === BulkOperationType.ProductGroupSync) { //create the product group consumer - await CollectionJobUtils.scheduleConsumerJob(retailer, bulkOp); + await CollectionJobUtils.scheduleConsumerJob(retailer, bulkOp, `webhook`, { + info: (logMessage: string, ...args: any[]) => this.logger.log(logMessage, ...args), + warn: (logMessage: string, ...args: any[]) => this.logger.warn(logMessage, ...args), + error: (logMessage: string, ...args: any[]) => this.logger.error(logMessage, ...args), + }); } else { this.logger.error('Unknown bulk operation type ' + bulkOp.type); } diff --git a/src/trigger/data-ingestion/product/request-products.ts b/src/trigger/data-ingestion/product/request-products.ts index 151a0c4..35f710d 100644 --- a/src/trigger/data-ingestion/product/request-products.ts +++ b/src/trigger/data-ingestion/product/request-products.ts @@ -120,9 +120,7 @@ export const RequestProductsTask = task({ } logger.info(`Requesting products for retailer ${retailer.displayName} (${retailer.id}) (${retailer.domain})`); if (payload.fullSync) { - logger.info(`payload.fullSync is true, registering webhooks`); - const topics = ['1']; - logger.info(`test`, { topics }); + logger.info(`payload.fullSync is true, registering webhooks for host`, { connectorHost }); //If its a full sync we register all the webhooks first, just to be safe. await registerAllWebhooksForRetailer(retailer, connectorHost, { info: (logMessage: string, ...args: any[]) => logger.info(logMessage, ...args),