Skip to content

Commit

Permalink
Merge pull request #1924 from murgatroid99/v1.4.x_upmerge_1
Browse files Browse the repository at this point in the history
Merge 1.4.x branch into master
  • Loading branch information
murgatroid99 authored Oct 6, 2021
2 parents 3e5d102 + 134171c commit 8aec160
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 39 deletions.
11 changes: 9 additions & 2 deletions packages/grpc-js-xds/interop/xds-interop-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,13 @@ class CallStatsTracker {

private subscribers: CallSubscriber[] = [];

private removeSubscriber(subscriber: CallSubscriber) {
const index = this.subscribers.indexOf(subscriber);
if (index >= 0) {
this.subscribers.splice(index, 1);
}
}

getCallStats(callCount: number, timeoutSec: number): Promise<LoadBalancerStatsResponse> {
return new Promise((resolve, reject) => {
let finished = false;
Expand All @@ -142,7 +149,7 @@ class CallStatsTracker {
setTimeout(() => {
if (!finished) {
finished = true;
this.subscribers.splice(this.subscribers.indexOf(subscriber), 1);
this.removeSubscriber(subscriber);
resolve(subscriber.getFinalStats());
}
}, timeoutSec * 1000)
Expand All @@ -155,7 +162,7 @@ class CallStatsTracker {
for (const subscriber of callSubscribers) {
subscriber.addCallStarted();
if (!subscriber.needsMoreCalls()) {
this.subscribers.splice(this.subscribers.indexOf(subscriber), 1);
this.removeSubscriber(subscriber);
}
}
return {
Expand Down
2 changes: 1 addition & 1 deletion packages/grpc-js-xds/scripts/xds.sh
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ GRPC_NODE_TRACE=xds_client,xds_resolver,cds_balancer,eds_balancer,priority,weigh
--gcp_suffix=$(date '+%s') \
--verbose \
${XDS_V3_OPT-} \
--client_cmd="$(which node) grpc-node/packages/grpc-js-xds/build/interop/xds-interop-client \
--client_cmd="$(which node) --enable-source-maps grpc-node/packages/grpc-js-xds/build/interop/xds-interop-client \
--server=xds:///{server_uri} \
--stats_port={stats_port} \
--qps={qps} \
Expand Down
2 changes: 1 addition & 1 deletion packages/grpc-js-xds/src/xds-bootstrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ function validateXdsServerConfig(obj: any): XdsServerConfig {
return {
serverUri: obj.server_uri,
channelCreds: obj.channel_creds.map(validateChannelCredsConfig),
serverFeatures: obj.server_features
serverFeatures: obj.server_features ?? []
};
}

Expand Down
45 changes: 30 additions & 15 deletions packages/grpc-js-xds/src/xds-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,16 @@ export class XdsClient {
return;
}
trace('Loaded bootstrap info: ' + JSON.stringify(bootstrapInfo, undefined, 2));
if (bootstrapInfo.xdsServers.length < 1) {
trace('Failed to initialize xDS Client. No servers provided in bootstrap info.');
// Bubble this error up to any listeners
this.reportStreamError({
code: status.INTERNAL,
details: 'Failed to initialize xDS Client. No servers provided in bootstrap info.',
metadata: new Metadata(),
});
return;
}
if (bootstrapInfo.xdsServers[0].serverFeatures.indexOf('xds_v3') >= 0) {
this.apiVersion = XdsApiVersion.V3;
} else {
Expand Down Expand Up @@ -425,8 +435,7 @@ export class XdsClient {
{channelOverride: channel}
);
this.maybeStartLrsStream();
},
(error) => {
}).catch((error) => {
trace('Failed to initialize xDS Client. ' + error.message);
// Bubble this error up to any listeners
this.reportStreamError({
Expand Down Expand Up @@ -507,13 +516,15 @@ export class XdsClient {
}
}

private handleAdsCallError(error: ServiceError) {
private handleAdsCallStatus(streamStatus: StatusObject) {
trace(
'ADS stream ended. code=' + error.code + ' details= ' + error.details
'ADS stream ended. code=' + streamStatus.code + ' details= ' + streamStatus.details
);
this.adsCallV2 = null;
this.adsCallV3 = null;
this.reportStreamError(error);
if (streamStatus.code !== status.OK) {
this.reportStreamError(streamStatus);
}
/* If the backoff timer is no longer running, we do not need to wait any
* more to start the new call. */
if (!this.adsBackoff.isRunning()) {
Expand All @@ -535,9 +546,10 @@ export class XdsClient {
this.adsCallV2.on('data', (message: DiscoveryResponse__Output) => {
this.handleAdsResponse(message);
});
this.adsCallV2.on('error', (error: ServiceError) => {
this.handleAdsCallError(error);
this.adsCallV2.on('status', (status: StatusObject) => {
this.handleAdsCallStatus(status);
});
this.adsCallV2.on('error', () => {});
return true;
}

Expand All @@ -555,9 +567,10 @@ export class XdsClient {
this.adsCallV3.on('data', (message: DiscoveryResponse__Output) => {
this.handleAdsResponse(message);
});
this.adsCallV3.on('error', (error: ServiceError) => {
this.handleAdsCallError(error);
this.adsCallV3.on('status', (status: StatusObject) => {
this.handleAdsCallStatus(status);
});
this.adsCallV3.on('error', () => {});
return true;
}

Expand Down Expand Up @@ -763,9 +776,9 @@ export class XdsClient {
this.receivedLrsSettingsForCurrentStream = true;
}

private handleLrsCallError(error: ServiceError) {
private handleLrsCallStatus(streamStatus: StatusObject) {
trace(
'LRS stream ended. code=' + error.code + ' details= ' + error.details
'LRS stream ended. code=' + streamStatus.code + ' details= ' + streamStatus.details
);
this.lrsCallV2 = null;
this.lrsCallV3 = null;
Expand All @@ -789,9 +802,10 @@ export class XdsClient {
this.lrsCallV2.on('data', (message: LoadStatsResponse__Output) => {
this.handleLrsResponse(message);
});
this.lrsCallV2.on('error', (error: ServiceError) => {
this.handleLrsCallError(error);
this.lrsCallV2.on('status', (status: StatusObject) => {
this.handleLrsCallStatus(status);
});
this.lrsCallV2.on('error', () => {});
return true;
}

Expand All @@ -807,9 +821,10 @@ export class XdsClient {
this.lrsCallV3.on('data', (message: LoadStatsResponse__Output) => {
this.handleLrsResponse(message);
});
this.lrsCallV3.on('error', (error: ServiceError) => {
this.handleLrsCallError(error);
this.lrsCallV3.on('status', (status: StatusObject) => {
this.handleLrsCallStatus(status);
});
this.lrsCallV3.on('error', () => {});
return true;
}

Expand Down
1 change: 0 additions & 1 deletion packages/grpc-js-xds/src/xds-stream-state/eds-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ export class EdsState implements XdsStreamState<ClusterLoadAssignment__Output> {
}
}
trace('Received EDS updates for cluster names ' + Array.from(allClusterNames));
this.handleMissingNames(allClusterNames);
return null;
}

Expand Down
6 changes: 6 additions & 0 deletions packages/grpc-js-xds/src/xds-stream-state/lds-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,15 +163,21 @@ export class LdsState implements XdsStreamState<Listener__Output> {
this.latestResponses = responses;
this.latestIsV2 = isV2;
const allTargetNames = new Set<string>();
const allRouteConfigNames = new Set<string>();
for (const message of responses) {
allTargetNames.add(message.name);
const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL_V3, message.api_listener!.api_listener!.value);
if (httpConnectionManager.rds) {
allRouteConfigNames.add(httpConnectionManager.rds.route_config_name);
}
const watchers = this.watchers.get(message.name) ?? [];
for (const watcher of watchers) {
watcher.onValidUpdate(message, isV2);
}
}
trace('Received RDS response with route config names ' + Array.from(allTargetNames));
this.handleMissingNames(allTargetNames);
this.rdsState.handleMissingNames(allRouteConfigNames);
return null;
}

Expand Down
3 changes: 1 addition & 2 deletions packages/grpc-js-xds/src/xds-stream-state/rds-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ export class RdsState implements XdsStreamState<RouteConfiguration__Output> {
return true;
}

private handleMissingNames(allRouteConfigNames: Set<string>) {
handleMissingNames(allRouteConfigNames: Set<string>) {
for (const [routeConfigName, watcherList] of this.watchers.entries()) {
if (!allRouteConfigNames.has(routeConfigName)) {
for (const watcher of watcherList) {
Expand Down Expand Up @@ -200,7 +200,6 @@ export class RdsState implements XdsStreamState<RouteConfiguration__Output> {
}
}
trace('Received RDS response with route config names ' + Array.from(allRouteConfigNames));
this.handleMissingNames(allRouteConfigNames);
return null;
}

Expand Down
14 changes: 2 additions & 12 deletions packages/grpc-js/src/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,18 +192,8 @@ export class ChannelImplementation implements Channel {
);
}
if (options) {
if (
typeof options !== 'object' ||
!Object.values(options).every(
(value) =>
typeof value === 'string' ||
typeof value === 'number' ||
typeof value === 'undefined'
)
) {
throw new TypeError(
'Channel options must be an object with string or number values'
);
if (typeof options !== 'object') {
throw new TypeError('Channel options must be an object');
}
}
this.originalTarget = target;
Expand Down
16 changes: 11 additions & 5 deletions packages/grpc-js/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,10 @@ export class Server {
};
}

const deferredCallback = (error: Error | null, port: number) => {
process.nextTick(() => callback(error, port));
}

const setupServer = (): http2.Http2Server | http2.Http2SecureServer => {
let http2Server: http2.Http2Server | http2.Http2SecureServer;
if (creds._isSecure()) {
Expand Down Expand Up @@ -388,6 +392,7 @@ export class Server {
const http2Server = setupServer();
return new Promise<number | Error>((resolve, reject) => {
function onError(err: Error): void {
trace('Failed to bind ' + subchannelAddressToString(address) + ' with error ' + err.message);
resolve(err);
}

Expand Down Expand Up @@ -463,6 +468,7 @@ export class Server {
const http2Server = setupServer();
return new Promise<BindResult>((resolve, reject) => {
function onError(err: Error): void {
trace('Failed to bind ' + subchannelAddressToString(address) + ' with error ' + err.message);
resolve(bindWildcardPort(addressList.slice(1)));
}

Expand Down Expand Up @@ -518,7 +524,7 @@ export class Server {
// We only want one resolution result. Discard all future results
resolverListener.onSuccessfulResolution = () => {};
if (addressList.length === 0) {
callback(new Error(`No addresses resolved for port ${port}`), 0);
deferredCallback(new Error(`No addresses resolved for port ${port}`), 0);
return;
}
let bindResultPromise: Promise<BindResult>;
Expand All @@ -541,26 +547,26 @@ export class Server {
if (bindResult.count === 0) {
const errorString = `No address added out of total ${addressList.length} resolved`;
logging.log(LogVerbosity.ERROR, errorString);
callback(new Error(errorString), 0);
deferredCallback(new Error(errorString), 0);
} else {
if (bindResult.count < addressList.length) {
logging.log(
LogVerbosity.INFO,
`WARNING Only ${bindResult.count} addresses added out of total ${addressList.length} resolved`
);
}
callback(null, bindResult.port);
deferredCallback(null, bindResult.port);
}
},
(error) => {
const errorString = `No address added out of total ${addressList.length} resolved`;
logging.log(LogVerbosity.ERROR, errorString);
callback(new Error(errorString), 0);
deferredCallback(new Error(errorString), 0);
}
);
},
onError: (error) => {
callback(new Error(error.details), 0);
deferredCallback(new Error(error.details), 0);
},
};

Expand Down

0 comments on commit 8aec160

Please sign in to comment.