Skip to content

Commit

Permalink
Merge pull request #91 from Cloudshelf/trigger_improvements
Browse files Browse the repository at this point in the history
feat: webhook trigger idempotency & attempt to fix extra jobs being s…
  • Loading branch information
ashleyww93 authored Dec 30, 2024
2 parents b1e4274 + af78358 commit 8ef2142
Show file tree
Hide file tree
Showing 25 changed files with 458 additions and 409 deletions.
1 change: 0 additions & 1 deletion src/modules/data-ingestion/bulk.operation.utils.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { Injectable } from '@nestjs/common';
import {
BulkOperationByShopifyIdDocument,
BulkOperationByShopifyIdQuery,
Expand Down
17 changes: 14 additions & 3 deletions src/modules/data-ingestion/collection.job.utils.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
import { ProcessProductGroupsTask } from '../../trigger/data-ingestion/product-groups/process-product-groups';
import { RequestProductGroupsTask } from '../../trigger/data-ingestion/product-groups/request-product-groups';
import { LogsInterface } from '../cloudshelf/logs.interface';
import { RetailerEntity } from '../retailer/retailer.entity';
import { BulkOperation } from './bulk.operation.entity';

export class CollectionJobUtils {
static async scheduleTriggerJob(retailer: RetailerEntity, fullSync?: boolean, reason?: string) {
static async scheduleTriggerJob(
retailer: RetailerEntity,
fullSync?: boolean,
reason?: string,
logs?: LogsInterface,
) {
const tags: string[] = [`retailer_${retailer.id}`, fullSync ? 'type_full' : 'type_partial'];
if (reason) {
tags.push(`reason_${reason}`);
Expand All @@ -28,7 +34,12 @@ export class CollectionJobUtils {
);
}

static async scheduleConsumerJob(retailer: RetailerEntity, bulkOp: BulkOperation, reason?: string) {
static async scheduleConsumerJob(
retailer: RetailerEntity,
bulkOp: BulkOperation,
reason?: string,
logs?: LogsInterface,
) {
const delay = '1s';
const tags: string[] = [`retailer_${retailer.id}`, bulkOp.installSync ? 'type_full' : 'type_partial'];
if (reason) {
Expand All @@ -47,8 +58,8 @@ export class CollectionJobUtils {
concurrencyLimit: 1,
},
tags,

concurrencyKey: retailer.id,
idempotencyKey: bulkOp.shopifyBulkOpId,
},
);
}
Expand Down
3 changes: 2 additions & 1 deletion src/modules/data-ingestion/location.job.utils.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { SyncLocationsTask } from '../../trigger/data-ingestion/location/sync-locations';
import { LogsInterface } from '../cloudshelf/logs.interface';
import { RetailerEntity } from '../retailer/retailer.entity';

export class LocationJobUtils {
static async schedule(retailer: RetailerEntity, reason?: string) {
static async schedule(retailer: RetailerEntity, reason?: string, logs?: LogsInterface) {
const tags: string[] = [`retailer_${retailer.id}`];
if (reason) {
tags.push(`reason_${reason}`);
Expand Down
57 changes: 55 additions & 2 deletions src/modules/data-ingestion/product.job.utils.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
import { ProcessProductsTask } from '../../trigger/data-ingestion/product/process-products';
import { RequestProductsTask } from '../../trigger/data-ingestion/product/request-products';
import { LogsInterface } from '../cloudshelf/logs.interface';
import { RetailerEntity } from '../retailer/retailer.entity';
import { BulkOperation } from './bulk.operation.entity';
import { runs } from '@trigger.dev/sdk/v3';

export class ProductJobUtils {
static async scheduleTriggerJob(
retailer: RetailerEntity,
fullSync?: boolean,
delayOverride?: number,
reason?: string,
logs?: LogsInterface,
) {
const tags: string[] = [`retailer_${retailer.id}`, fullSync ? 'type_full' : 'type_partial'];
const retailerTag = `retailer_${retailer.id}`;
const syncTypeTag = fullSync ? 'type_full' : 'type_partial';
const tags: string[] = [retailerTag, syncTypeTag];
if (reason) {
tags.push(`reason_${reason}`);
}
Expand All @@ -24,6 +29,48 @@ export class ProductJobUtils {
delay = `${delayOverride}s`;
}

// if its full sync, then cancel everything else
// if its partial sync, and there is a full sync scheduled, cancel any partial syncs and then do nothing.
// if its partial sync, and there is NO full sync scheduled, then if there is any partials do nothing, otherwise schedule

const searchTags: string[] = [retailerTag];
const pendingRuns: { id: string; type: 'type_full' | 'type_partial' }[] = [];

for await (const run of runs.list({
status: ['WAITING_FOR_DEPLOY', 'DELAYED', 'EXECUTING', 'FROZEN', 'INTERRUPTED', 'QUEUED', 'REATTEMPTING'],
taskIdentifier: [RequestProductsTask.id],
tag: searchTags,
})) {
console.log(run);
pendingRuns.push({ id: run.id, type: run.tags.includes('type_full') ? 'type_full' : 'type_partial' });
}
logs?.info(`Found ${pendingRuns.length} existing jobs...`);

if (fullSync) {
for (const runToCancel of pendingRuns) {
logs?.info(`Cancelling ${runToCancel.id}`, runToCancel);
await runs.cancel(runToCancel.id);
}
} else {
const hasAnyFullSyncs = pendingRuns.filter(f => f.type === 'type_full').length > 0;
const partialSyncRuns = pendingRuns.filter(f => f.type === 'type_partial');
if (hasAnyFullSyncs) {
for (const runToCancel of partialSyncRuns) {
logs?.info(`Cancelling ${runToCancel.id}`, runToCancel);
await runs.cancel(runToCancel.id);
}
//we dont want to schedule, so return
logs?.info(`Not scheduling another run as there is already a pending full sync`);
return;
} else {
if (partialSyncRuns.length > 0) {
//we dont want to schedule, so return
logs?.info(`Not scheduling another run as there is already a pending partial`);
return;
}
}
}

await RequestProductsTask.trigger(
{
organisationId: retailer.id,
Expand All @@ -41,7 +88,12 @@ export class ProductJobUtils {
);
}

static async scheduleConsumerJob(retailer: RetailerEntity, bulkOp: BulkOperation, reason?: string) {
static async scheduleConsumerJob(
retailer: RetailerEntity,
bulkOp: BulkOperation,
reason?: string,
logs?: LogsInterface,
) {
const delay = '1s';
const tags: string[] = [`retailer_${retailer.id}`, bulkOp.installSync ? 'type_full' : 'type_partial'];
if (reason) {
Expand All @@ -60,6 +112,7 @@ export class ProductJobUtils {
},
tags,
concurrencyKey: retailer.id,
idempotencyKey: bulkOp.shopifyBulkOpId,
},
);
}
Expand Down
40 changes: 0 additions & 40 deletions src/modules/integrations/slack.service.ts

This file was deleted.

1 change: 0 additions & 1 deletion src/modules/retailer/retailer.utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ export class RetailerUtils {
return em.findOne(RetailerEntity, { domain });
}


static async updateShopInformationFromShopifyOnlineSession(
em: EntityManager,
shopifyApiInstance: Shopify,
Expand Down
4 changes: 2 additions & 2 deletions src/modules/shopify/auth/after.auth.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ import { ConfigService } from '@nestjs/config';
import { ExtendedLogger } from '../../../utils/ExtendedLogger';
import { NotificationUtils } from '../../../utils/NotificationUtils';
import { RequestUtils } from '../../../utils/RequestUtils';
import { SlackUtils } from '../../../utils/SlackUtils';
import { SentryInstrument } from '../../apm/sentry.function.instrumenter';
import { CloudshelfApiService } from '../../cloudshelf/cloudshelf.api.service';
import { shopifySchema } from '../../configuration/schemas/shopify.schema';
import { slackSchema } from '../../configuration/schemas/slack.schema';
import { LocationJobUtils } from '../../data-ingestion/location.job.utils';
import { ProductJobUtils } from '../../data-ingestion/product.job.utils';
import { RetailerService } from '../../retailer/retailer.service';
Expand All @@ -17,8 +19,6 @@ import { InjectShopify } from '@nestjs-shopify/core';
import { ShopifyWebhooksService } from '@nestjs-shopify/webhooks';
import { Shopify } from '@shopify/shopify-api';
import { Request, Response } from 'express';
import { slackSchema } from 'src/modules/configuration/schemas/slack.schema';
import { SlackUtils } from 'src/utils/SlackUtils';

@Injectable()
export class AfterAuthHandlerService implements ShopifyAuthAfterHandler {
Expand Down
4 changes: 2 additions & 2 deletions src/modules/shopify/webhooks/uninstall.webhook.handler.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import { ConfigService } from '@nestjs/config';
import { slackSchema } from '../../../modules/configuration/schemas/slack.schema';
import { ExtendedLogger } from '../../../utils/ExtendedLogger';
import { NotificationUtils } from '../../../utils/NotificationUtils';
import { SentryUtil } from '../../../utils/SentryUtil';
import { SlackUtils } from '../../../utils/SlackUtils';
import { SentryInstrument } from '../../apm/sentry.function.instrumenter';
import { CloudshelfApiService } from '../../cloudshelf/cloudshelf.api.service';
import { RetailerService } from '../../retailer/retailer.service';
import { DatabaseSessionStorage } from '../sessions/database.session.storage';
import { ShopifyWebhookHandler, WebhookHandler } from '@nestjs-shopify/webhooks';
import { slackSchema } from 'src/modules/configuration/schemas/slack.schema';
import { SlackUtils } from 'src/utils/SlackUtils';

@WebhookHandler('APP_UNINSTALLED')
export class UninstalledWebhookHandler extends ShopifyWebhookHandler<unknown> {
Expand Down
56 changes: 27 additions & 29 deletions src/modules/tools/tools.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,19 @@ import { Injectable, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { WebhookSubscriptionTopic } from '../../graphql/shopifyAdmin/generated/shopifyAdmin';
import { EntityManager } from '@mikro-orm/postgresql';
import { CloudshelfApiService } from '../cloudshelf/cloudshelf.api.service';
import { internalScheduleTriggerJobs } from '../../trigger/scheduled/safety_sync';
import { cloudshelfSchema } from '../configuration/schemas/cloudshelf.schema';
import { runtimeSchema } from '../configuration/schemas/runtime.schema';
import { RetailerEntity } from '../retailer/retailer.entity';
import { RetailerService } from '../retailer/retailer.service';
import { ToolsUtils } from './tools.utils';
import { internalScheduleTriggerJobs } from 'src/trigger/scheduled/safety_sync';
import { deleteAllWebhooksForAllStores } from './utils/deleteAllWebhooksForAllStores';
import { deleteAllWebhooksForRetailer } from './utils/deleteAllWebhooksForRetailer';
import { deleteWebhookForStore } from './utils/deleteWebhookForStore';
import { getWebhooks } from './utils/getWebhooks';
import { registerAllWebhooksForAllRetailers } from './utils/registerAllWebhooksForAllRetailers';
import { registerAllWebhooksForRetailer } from './utils/registerAllWebhooksForRetailer';
import { registerWebhookForRetailer } from './utils/registerWebhookForRetailer';
import { sendAllRetailersToCloudshelf } from './utils/sendAllRetailersToCloudshelf';
import { updateRetailerInfoWhereNull } from './utils/updateRetailerInfoWhereNull';

@Injectable()
export class ToolsService {
Expand All @@ -25,23 +31,23 @@ export class ToolsService {
}

async getWebhooks(retailer: RetailerEntity) {
return ToolsUtils.getWebhooks(retailer, {
return getWebhooks(retailer, {
info: this.logger.log,
error: this.logger.error,
warn: this.logger.warn,
});
}

async registerAllWebhooksForRetailer(retailer: RetailerEntity) {
return ToolsUtils.registerAllWebhooksForRetailer(retailer, process.env.HOST!, {
return registerAllWebhooksForRetailer(retailer, process.env.HOST!, {
info: this.logger.log,
error: this.logger.error,
warn: this.logger.warn,
});
}

async registerWebhookForRetailer(retailer: RetailerEntity, topic: WebhookSubscriptionTopic, url: string) {
return ToolsUtils.registerWebhookForRetailer(retailer, topic, url, {
return registerWebhookForRetailer(retailer, topic, url, {
info: this.logger.log,
error: this.logger.error,
warn: this.logger.warn,
Expand All @@ -55,7 +61,7 @@ export class ToolsService {
success: string[];
failed: string[];
}> {
return ToolsUtils.registerAllWebhooksForAllRetailers(
return registerAllWebhooksForAllRetailers(
this.entityManager,
this.runtimeConfigService.get('HOST')!,
from,
Expand All @@ -69,15 +75,15 @@ export class ToolsService {
}

async deleteWebhookForStore(retailer: RetailerEntity, webhookId: string) {
return ToolsUtils.deleteWebhookForStore(retailer, webhookId, {
return deleteWebhookForStore(retailer, webhookId, {
info: this.logger.log,
error: this.logger.error,
warn: this.logger.warn,
});
}

async deleteAllWebhooksForRetailer(retailer: RetailerEntity) {
return ToolsUtils.deleteAllWebhooksForRetailer(retailer, {
return deleteAllWebhooksForRetailer(retailer, {
info: this.logger.log,
error: this.logger.error,
warn: this.logger.warn,
Expand All @@ -91,34 +97,26 @@ export class ToolsService {
success: string[];
failed: string[];
}> {
return ToolsUtils.deleteAllWebhooksForAllStores(this.entityManager, from, limit, {
return deleteAllWebhooksForAllStores(this.entityManager, from, limit, {
info: this.logger.log,
error: this.logger.error,
warn: this.logger.warn,
});
}

async updateRetailerInfoWhereNull() {
return ToolsUtils.updateRetailerInfoWhereNull(
this.configService.get('CLOUDSHELF_API_URL')!,
this.entityManager,
{
info: this.logger.log,
error: this.logger.error,
warn: this.logger.warn,
},
);
return updateRetailerInfoWhereNull(this.configService.get('CLOUDSHELF_API_URL')!, this.entityManager, {
info: this.logger.log,
error: this.logger.error,
warn: this.logger.warn,
});
}

async sendAllRetailersToCloudshelf() {
return ToolsUtils.sendAllRetailersToCloudshelf(
this.configService.get('CLOUDSHELF_API_URL')!,
this.entityManager,
{
info: this.logger.log,
error: this.logger.error,
warn: this.logger.warn,
},
);
return sendAllRetailersToCloudshelf(this.configService.get('CLOUDSHELF_API_URL')!, this.entityManager, {
info: this.logger.log,
error: this.logger.error,
warn: this.logger.warn,
});
}
}
Loading

0 comments on commit 8ef2142

Please sign in to comment.