diff --git a/packages/grpc-js-xds/src/xds-stream-state/cds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/cds-state.ts index 7720c5673..5dae09fb4 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/cds-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/cds-state.ts @@ -137,17 +137,21 @@ export class CdsState implements XdsStreamState { } handleResponses(responses: Cluster__Output[], isV2: boolean): string | null { + const validResponses: Cluster__Output[] = []; + let errorMessage: string | null = null; for (const message of responses) { - if (!this.validateResponse(message)) { + if (this.validateResponse(message)) { + validResponses.push(message); + } else { trace('CDS validation failed for message ' + JSON.stringify(message)); - return 'CDS Error: Cluster validation failed'; + errorMessage = 'CDS Error: Cluster validation failed'; } } - this.latestResponses = responses; + this.latestResponses = validResponses; this.latestIsV2 = isV2; const allEdsServiceNames: Set = new Set(); const allClusterNames: Set = new Set(); - for (const message of responses) { + for (const message of validResponses) { allClusterNames.add(message.name); const edsServiceName = message.eds_cluster_config?.service_name ?? ''; allEdsServiceNames.add( @@ -161,7 +165,7 @@ export class CdsState implements XdsStreamState { trace('Received CDS updates for cluster names ' + Array.from(allClusterNames)); this.handleMissingNames(allClusterNames); this.edsState.handleMissingNames(allEdsServiceNames); - return null; + return errorMessage; } reportStreamError(status: StatusObject): void { diff --git a/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts index 3861f4d2a..7d28ed5ff 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts @@ -146,16 +146,20 @@ export class EdsState implements XdsStreamState { } handleResponses(responses: ClusterLoadAssignment__Output[], isV2: boolean) { + const validResponses: ClusterLoadAssignment__Output[] = []; + let errorMessage: string | null = null; for (const message of responses) { - if (!this.validateResponse(message)) { + if (this.validateResponse(message)) { + validResponses.push(message); + } else { trace('EDS validation failed for message ' + JSON.stringify(message)); - return 'EDS Error: ClusterLoadAssignment validation failed'; + errorMessage = 'EDS Error: ClusterLoadAssignment validation failed'; } } - this.latestResponses = responses; + this.latestResponses = validResponses; this.latestIsV2 = isV2; const allClusterNames: Set = new Set(); - for (const message of responses) { + for (const message of validResponses) { allClusterNames.add(message.cluster_name); const watchers = this.watchers.get(message.cluster_name) ?? []; for (const watcher of watchers) { @@ -163,7 +167,7 @@ export class EdsState implements XdsStreamState { } } trace('Received EDS updates for cluster names ' + Array.from(allClusterNames)); - return null; + return errorMessage; } reportStreamError(status: StatusObject): void { diff --git a/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts index 10e71babf..5706e3766 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts @@ -154,17 +154,21 @@ export class LdsState implements XdsStreamState { } handleResponses(responses: Listener__Output[], isV2: boolean): string | null { + const validResponses: Listener__Output[] = []; + let errorMessage: string | null = null; for (const message of responses) { - if (!this.validateResponse(message, isV2)) { + if (this.validateResponse(message, isV2)) { + validResponses.push(message); + } else { trace('LDS validation failed for message ' + JSON.stringify(message)); - return 'LDS Error: Route validation failed'; + errorMessage = 'LDS Error: Route validation failed'; } } - this.latestResponses = responses; + this.latestResponses = validResponses; this.latestIsV2 = isV2; const allTargetNames = new Set(); const allRouteConfigNames = new Set(); - for (const message of responses) { + for (const message of validResponses) { allTargetNames.add(message.name); const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL_V3, message.api_listener!.api_listener!.value); if (httpConnectionManager.rds) { @@ -178,7 +182,7 @@ export class LdsState implements XdsStreamState { trace('Received RDS response with route config names ' + Array.from(allTargetNames)); this.handleMissingNames(allTargetNames); this.rdsState.handleMissingNames(allRouteConfigNames); - return null; + return errorMessage; } reportStreamError(status: StatusObject): void { diff --git a/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts index ec7abe55a..5a3854323 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts @@ -183,16 +183,20 @@ export class RdsState implements XdsStreamState { } handleResponses(responses: RouteConfiguration__Output[], isV2: boolean): string | null { + const validResponses: RouteConfiguration__Output[] = []; + let errorMessage: string | null = null; for (const message of responses) { - if (!this.validateResponse(message, isV2)) { + if (this.validateResponse(message, isV2)) { + validResponses.push(message); + } else { trace('RDS validation failed for message ' + JSON.stringify(message)); - return 'RDS Error: Route validation failed'; + errorMessage = 'RDS Error: Route validation failed'; } } - this.latestResponses = responses; + this.latestResponses = validResponses; this.latestIsV2 = isV2; const allRouteConfigNames = new Set(); - for (const message of responses) { + for (const message of validResponses) { allRouteConfigNames.add(message.name); const watchers = this.watchers.get(message.name) ?? []; for (const watcher of watchers) { @@ -200,7 +204,7 @@ export class RdsState implements XdsStreamState { } } trace('Received RDS response with route config names ' + Array.from(allRouteConfigNames)); - return null; + return errorMessage; } reportStreamError(status: StatusObject): void {