Skip to content

Commit

Permalink
Merge pull request #93 from Cloudshelf/logging-fixes
Browse files Browse the repository at this point in the history
Logging fixes
  • Loading branch information
ashleyww93 authored Dec 30, 2024
2 parents e1abd53 + 63320f7 commit e0f672c
Show file tree
Hide file tree
Showing 13 changed files with 94 additions and 70 deletions.
4 changes: 3 additions & 1 deletion src/modules/data-ingestion/collection.job.utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ export class CollectionJobUtils {
if (reason) {
tags.push(`reason_${reason}`);
}

logs?.info(
`Asking trigger to schhedule productgroup consumer job for retailer ${retailer.domain} and bulk op ${bulkOp.shopifyBulkOpId}`,
);
await ProcessProductGroupsTask.trigger(
{
remoteBulkOperationId: bulkOp.shopifyBulkOpId,
Expand Down
4 changes: 4 additions & 0 deletions src/modules/data-ingestion/product.job.utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ export class ProductJobUtils {
if (reason) {
tags.push(`reason_${reason}`);
}

logs?.info(
`Asking trigger to schhedule product consumer job for retailer ${retailer.domain} and bulk op ${bulkOp.shopifyBulkOpId}`,
);
await ProcessProductsTask.trigger(
{
remoteBulkOperationId: bulkOp.shopifyBulkOpId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,18 @@ export class BulkOperationFinishedWebhookHandler extends ShopifyWebhookHandler<u

if (bulkOp.type === BulkOperationType.ProductSync) {
//create the product consumer
await ProductJobUtils.scheduleConsumerJob(retailer, bulkOp);
await ProductJobUtils.scheduleConsumerJob(retailer, bulkOp, `webhook`, {
info: (logMessage: string, ...args: any[]) => this.logger.log(logMessage, ...args),
warn: (logMessage: string, ...args: any[]) => this.logger.warn(logMessage, ...args),
error: (logMessage: string, ...args: any[]) => this.logger.error(logMessage, ...args),
});
} else if (bulkOp.type === BulkOperationType.ProductGroupSync) {
//create the product group consumer
await CollectionJobUtils.scheduleConsumerJob(retailer, bulkOp);
await CollectionJobUtils.scheduleConsumerJob(retailer, bulkOp, `webhook`, {
info: (logMessage: string, ...args: any[]) => this.logger.log(logMessage, ...args),
warn: (logMessage: string, ...args: any[]) => this.logger.warn(logMessage, ...args),
error: (logMessage: string, ...args: any[]) => this.logger.error(logMessage, ...args),
});
} else {
this.logger.error('Unknown bulk operation type ' + bulkOp.type);
}
Expand Down
15 changes: 9 additions & 6 deletions src/modules/tools/tools.resolver.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Logger } from '@nestjs/common';
import { Args, Mutation, Query, Resolver } from '@nestjs/graphql';
import { GraphQLBoolean, GraphQLInt } from 'graphql';
import { GraphQLString } from 'graphql/type';
Expand All @@ -10,6 +11,8 @@ import { ToolsService } from './tools.service';

@Resolver()
export class ToolsResolver {
private readonly logger = new Logger('ToolsResolver');

constructor(
private readonly toolsService: ToolsService,
private readonly retailerService: RetailerService,
Expand Down Expand Up @@ -88,15 +91,15 @@ export class ToolsResolver {

if (partial) {
await ProductJobUtils.scheduleTriggerJob(retailer, false, 10, 'forceViaGql', {
info: (s, o) => console.info(s, o),
warn: (s, o) => console.warn(s, o),
error: (s, o) => console.error(s, o),
info: (logMessage: string, ...args: any[]) => this.logger.log(logMessage, ...args),
warn: (logMessage: string, ...args: any[]) => this.logger.warn(logMessage, ...args),
error: (logMessage: string, ...args: any[]) => this.logger.error(logMessage, ...args),
});
} else {
await ProductJobUtils.scheduleTriggerJob(retailer, true, undefined, 'forceViaGql', {
info: (s, o) => console.info(s, o),
warn: (s, o) => console.warn(s, o),
error: (s, o) => console.error(s, o),
info: (logMessage: string, ...args: any[]) => this.logger.log(logMessage, ...args),
warn: (logMessage: string, ...args: any[]) => this.logger.warn(logMessage, ...args),
error: (logMessage: string, ...args: any[]) => this.logger.error(logMessage, ...args),
});
}
return 'Scheduled a sync';
Expand Down
54 changes: 27 additions & 27 deletions src/modules/tools/tools.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,25 +32,25 @@ export class ToolsService {

async getWebhooks(retailer: RetailerEntity) {
return getWebhooks(retailer, {
info: this.logger.log,
error: this.logger.error,
warn: this.logger.warn,
info: (logMessage: string, ...args: any[]) => this.logger.log(logMessage, ...args),
warn: (logMessage: string, ...args: any[]) => this.logger.warn(logMessage, ...args),
error: (logMessage: string, ...args: any[]) => this.logger.error(logMessage, ...args),
});
}

async registerAllWebhooksForRetailer(retailer: RetailerEntity) {
return registerAllWebhooksForRetailer(retailer, process.env.HOST!, {
info: this.logger.log,
error: this.logger.error,
warn: this.logger.warn,
info: (logMessage: string, ...args: any[]) => this.logger.log(logMessage, ...args),
warn: (logMessage: string, ...args: any[]) => this.logger.warn(logMessage, ...args),
error: (logMessage: string, ...args: any[]) => this.logger.error(logMessage, ...args),
});
}

async registerWebhookForRetailer(retailer: RetailerEntity, topic: WebhookSubscriptionTopic, url: string) {
return registerWebhookForRetailer(retailer, topic, url, {
info: this.logger.log,
error: this.logger.error,
warn: this.logger.warn,
info: (logMessage: string, ...args: any[]) => this.logger.log(logMessage, ...args),
warn: (logMessage: string, ...args: any[]) => this.logger.warn(logMessage, ...args),
error: (logMessage: string, ...args: any[]) => this.logger.error(logMessage, ...args),
});
}

Expand All @@ -67,26 +67,26 @@ export class ToolsService {
from,
limit,
{
info: this.logger.log,
error: this.logger.error,
warn: this.logger.warn,
info: (logMessage: string, ...args: any[]) => this.logger.log(logMessage, ...args),
warn: (logMessage: string, ...args: any[]) => this.logger.warn(logMessage, ...args),
error: (logMessage: string, ...args: any[]) => this.logger.error(logMessage, ...args),
},
);
}

async deleteWebhookForStore(retailer: RetailerEntity, webhookId: string) {
return deleteWebhookForStore(retailer, webhookId, {
info: this.logger.log,
error: this.logger.error,
warn: this.logger.warn,
info: (logMessage: string, ...args: any[]) => this.logger.log(logMessage, ...args),
warn: (logMessage: string, ...args: any[]) => this.logger.warn(logMessage, ...args),
error: (logMessage: string, ...args: any[]) => this.logger.error(logMessage, ...args),
});
}

async deleteAllWebhooksForRetailer(retailer: RetailerEntity) {
return deleteAllWebhooksForRetailer(retailer, {
info: this.logger.log,
error: this.logger.error,
warn: this.logger.warn,
info: (logMessage: string, ...args: any[]) => this.logger.log(logMessage, ...args),
warn: (logMessage: string, ...args: any[]) => this.logger.warn(logMessage, ...args),
error: (logMessage: string, ...args: any[]) => this.logger.error(logMessage, ...args),
});
}

Expand All @@ -98,25 +98,25 @@ export class ToolsService {
failed: string[];
}> {
return deleteAllWebhooksForAllStores(this.entityManager, from, limit, {
info: this.logger.log,
error: this.logger.error,
warn: this.logger.warn,
info: (logMessage: string, ...args: any[]) => this.logger.log(logMessage, ...args),
warn: (logMessage: string, ...args: any[]) => this.logger.warn(logMessage, ...args),
error: (logMessage: string, ...args: any[]) => this.logger.error(logMessage, ...args),
});
}

async updateRetailerInfoWhereNull() {
return updateRetailerInfoWhereNull(this.configService.get('CLOUDSHELF_API_URL')!, this.entityManager, {
info: this.logger.log,
error: this.logger.error,
warn: this.logger.warn,
info: (logMessage: string, ...args: any[]) => this.logger.log(logMessage, ...args),
warn: (logMessage: string, ...args: any[]) => this.logger.warn(logMessage, ...args),
error: (logMessage: string, ...args: any[]) => this.logger.error(logMessage, ...args),
});
}

async sendAllRetailersToCloudshelf() {
return sendAllRetailersToCloudshelf(this.configService.get('CLOUDSHELF_API_URL')!, this.entityManager, {
info: this.logger.log,
error: this.logger.error,
warn: this.logger.warn,
info: (logMessage: string, ...args: any[]) => this.logger.log(logMessage, ...args),
warn: (logMessage: string, ...args: any[]) => this.logger.warn(logMessage, ...args),
error: (logMessage: string, ...args: any[]) => this.logger.error(logMessage, ...args),
});
}
}
3 changes: 3 additions & 0 deletions src/modules/tools/utils/registerAllWebhooksForRetailer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ import { registerWebhookForRetailer } from './registerWebhookForRetailer';

export async function registerAllWebhooksForRetailer(retailer: RetailerEntity, host: string, logs?: LogsInterface) {
const allWebhooks = await getWebhooks(retailer);
const topics = allWebhooks.map(w => w.node.topic.toString());
logs?.info(`existing webhooks`, { topics });
if (!allWebhooks.find(w => w.node.topic === WebhookSubscriptionTopic.OrdersUpdated)) {
const r1 = await registerWebhookForRetailer(
retailer,
WebhookSubscriptionTopic.OrdersUpdated,
`https://${host}/shopify/webhooks`,
logs,
);
if (!r1) {
logs?.error(
Expand Down
2 changes: 2 additions & 0 deletions src/modules/tools/utils/registerWebhookForRetailer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export async function registerWebhookForRetailer(
logs?: LogsInterface,
) {
try {
logs?.info(`Creating webook ${topic} to host ${url}`);
const authedClient = await ShopifyGraphqlUtil.getShopifyAdminApolloClientByRetailer(retailer);

const subscription: WebhookSubscriptionInput = {
Expand All @@ -35,6 +36,7 @@ export async function registerWebhookForRetailer(
});

if (!resp.data || resp.errors) {
logs?.error(`Error creating webhook`, resp.errors);
return false;
}

Expand Down
24 changes: 12 additions & 12 deletions src/trigger/data-ingestion/product-groups/process-product-groups.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ export const ProcessProductGroupsTask = task({
retailer.lastSafetySyncCompleted = new Date();
}
await ProductJobUtils.scheduleTriggerJob(retailer, false, undefined, undefined, {
info: logger.info,
warn: logger.warn,
error: logger.error,
info: (logMessage: string, ...args: any[]) => logger.info(logMessage, ...args),
warn: (logMessage: string, ...args: any[]) => logger.warn(logMessage, ...args),
error: (logMessage: string, ...args: any[]) => logger.error(logMessage, ...args),
});
}
logger.info(`Handle Complete: ${msg}`);
Expand Down Expand Up @@ -159,9 +159,9 @@ export const ProcessProductGroupsTask = task({
}
logger.info(`Upserting collections to cloudshelf`, { productGroupInputs });
await CloudshelfApiUtils.updateProductGroups(cloudshelfAPI, retailer.domain, productGroupInputs, {
info: logger.info,
error: logger.error,
warn: logger.warn,
info: (logMessage: string, ...args: any[]) => logger.info(logMessage, ...args),
warn: (logMessage: string, ...args: any[]) => logger.warn(logMessage, ...args),
error: (logMessage: string, ...args: any[]) => logger.error(logMessage, ...args),
});
logger.info(`Updating products in product groups on cloudshelf`);
for (const [productGroupId, productIds] of Object.entries(productsInGroups)) {
Expand All @@ -173,19 +173,19 @@ export const ProcessProductGroupsTask = task({
productGroupId,
reversedProductIds,
{
info: logger.info,
error: logger.error,
warn: logger.warn,
info: (logMessage: string, ...args: any[]) => logger.info(logMessage, ...args),
warn: (logMessage: string, ...args: any[]) => logger.warn(logMessage, ...args),
error: (logMessage: string, ...args: any[]) => logger.error(logMessage, ...args),
},
);
}
//
logger.info(`Finished reporting all products in all groups`);
logger.info(`Creating first cloud shelf if required`);
await CloudshelfApiUtils.createFirstCloudshelfIfRequired(cloudshelfAPI, em, retailer, {
info: logger.info,
error: logger.error,
warn: logger.warn,
info: (logMessage: string, ...args: any[]) => logger.info(logMessage, ...args),
warn: (logMessage: string, ...args: any[]) => logger.warn(logMessage, ...args),
error: (logMessage: string, ...args: any[]) => logger.error(logMessage, ...args),
});
if (payload.fullSync) {
const groupContentToSave: { id: string }[] = [];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,9 @@ export const RequestProductGroupsTask = task({
queryPayload,
payload.fullSync,
{
info: logger.info,
error: logger.error,
warn: logger.warn,
info: (logMessage: string, ...args: any[]) => logger.info(logMessage, ...args),
warn: (logMessage: string, ...args: any[]) => logger.warn(logMessage, ...args),
error: (logMessage: string, ...args: any[]) => logger.error(logMessage, ...args),
},
);

Expand Down
12 changes: 6 additions & 6 deletions src/trigger/data-ingestion/product/process-products.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ export const ProcessProductsTask = task({
if (retailer) {
retailer.lastProductSync = new Date();
await CollectionJobUtils.scheduleTriggerJob(retailer, payload.fullSync, undefined, {
info: logger.info,
warn: logger.warn,
error: logger.error,
info: (logMessage: string, ...args: any[]) => logger.info(logMessage, ...args),
warn: (logMessage: string, ...args: any[]) => logger.warn(logMessage, ...args),
error: (logMessage: string, ...args: any[]) => logger.error(logMessage, ...args),
});
}
logger.info(`Handle Complete: ${msg}`);
Expand Down Expand Up @@ -239,9 +239,9 @@ export const ProcessProductsTask = task({
for (const chunk of chunkedProductInputs) {
logger.info(`Upserting ${chunk.length} products to cloudshelf for current file`);
await CloudshelfApiUtils.upsertProducts(cloudshelfAPI, bulkOperationRecord.domain, chunk, {
info: logger.info,
error: logger.error,
warn: logger.warn,
info: (logMessage: string, ...args: any[]) => logger.info(logMessage, ...args),
warn: (logMessage: string, ...args: any[]) => logger.warn(logMessage, ...args),
error: (logMessage: string, ...args: any[]) => logger.error(logMessage, ...args),
});
}
logger.info(
Expand Down
14 changes: 7 additions & 7 deletions src/trigger/data-ingestion/product/request-products.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,12 @@ export const RequestProductsTask = task({
}
logger.info(`Requesting products for retailer ${retailer.displayName} (${retailer.id}) (${retailer.domain})`);
if (payload.fullSync) {
logger.info(`payload.fullSync is true, registering webhooks`);
logger.info(`payload.fullSync is true, registering webhooks for host`, { connectorHost });
//If its a full sync we register all the webhooks first, just to be safe.
await registerAllWebhooksForRetailer(retailer, connectorHost, {
info: logger.info,
error: logger.error,
warn: logger.warn,
info: (logMessage: string, ...args: any[]) => logger.info(logMessage, ...args),
warn: (logMessage: string, ...args: any[]) => logger.warn(logMessage, ...args),
error: (logMessage: string, ...args: any[]) => logger.error(logMessage, ...args),
});
}
await TriggerWaitForNobleReschedule(retailer);
Expand All @@ -149,9 +149,9 @@ export const RequestProductsTask = task({
queryPayload,
payload.fullSync,
{
info: logger.info,
error: logger.error,
warn: logger.warn,
info: (logMessage: string, ...args: any[]) => logger.info(logMessage, ...args),
warn: (logMessage: string, ...args: any[]) => logger.warn(logMessage, ...args),
error: (logMessage: string, ...args: any[]) => logger.error(logMessage, ...args),
},
);
retailer.nextPartialSyncRequestTime = subMinutes(new Date(), 1);
Expand Down
8 changes: 5 additions & 3 deletions src/trigger/reuseables/noble_pollfills.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ export const TriggerWaitForNobleReschedule = async (retailer: RetailerEntity) =>
do {
logger.info(`Checking for running bulk operation`);
const currentBulkOperation = await BulkOperationUtils.checkForRunningBulkOperationByRetailer(retailer, {
info: logger.info,
error: logger.error,
warn: logger.warn,
info: (logMessage: string, ...args: any[]) => logger.info(logMessage, ...args),
warn: (logMessage: string, ...args: any[]) => logger.warn(logMessage, ...args),
error: (logMessage: string, ...args: any[]) => logger.error(logMessage, ...args),
});

if (currentBulkOperation) {
Expand All @@ -28,9 +28,11 @@ export const TriggerWaitForNobleReschedule = async (retailer: RetailerEntity) =>
await wait.for({ minutes: 2 });
} else {
shouldBeWaitingForQueue = false;
logger.info(`Exising bulk operation, but not running (${currentBulkOperation.status})`);
}
} else {
shouldBeWaitingForQueue = false;
logger.info(`No running bulk operation`);
}
} while (shouldBeWaitingForQueue);
};
6 changes: 3 additions & 3 deletions src/trigger/scheduled/safety_sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ export async function internalScheduleTriggerJobs(em: EntityManager) {
for (const retailer of retailers) {
logger.debug('Creating safety sync for retailer ' + retailer.domain);
await ProductJobUtils.scheduleTriggerJob(retailer, true, undefined, 'safetySync', {
info: logger.info,
warn: logger.warn,
error: logger.error,
info: (logMessage: string, ...args: any[]) => logger.info(logMessage, ...args),
warn: (logMessage: string, ...args: any[]) => logger.warn(logMessage, ...args),
error: (logMessage: string, ...args: any[]) => logger.error(logMessage, ...args),
});
retailer.lastSafetySyncRequested = new Date();
}
Expand Down

0 comments on commit e0f672c

Please sign in to comment.