Skip to content

Commit

Permalink
Merge pull request #369 from brendandburns/watch
Browse files Browse the repository at this point in the history
Add some missing error handling.
  • Loading branch information
k8s-ci-robot authored Nov 19, 2019
2 parents 34cfa68 + c7892e3 commit 36c9095
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 0 deletions.
5 changes: 5 additions & 0 deletions src/watch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export class DefaultRequest implements RequestInterface {
}

export class Watch {
public static SERVER_SIDE_CLOSE = { error: 'Connection closed on server' };
public config: KubeConfig;
private readonly requestImpl: RequestInterface;

Expand Down Expand Up @@ -64,6 +65,10 @@ export class Watch {
// ignore parse errors
}
});
stream.on('error', (err) => {
done(err);
});
stream.on('close', () => done(null));
const req = this.requestImpl.webRequest(requestOptions, (error, response, body) => {
if (error) {
done(error);
Expand Down
117 changes: 117 additions & 0 deletions src/watch_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,123 @@ describe('Watch', () => {
expect(doneErr).to.deep.equal(errIn);
});

it('should handle errors correctly', () => {
const kc = new KubeConfig();
Object.assign(kc, fakeConfig);
const fakeRequestor = mock(DefaultRequest);
const watch = new Watch(kc, instance(fakeRequestor));

const obj1 = {
type: 'ADDED',
object: {
foo: 'bar',
},
};

const errIn = { error: 'err' };
const fakeRequest = {
pipe: (stream) => {
stream.write(JSON.stringify(obj1) + '\n');
stream.emit('error', errIn);
},
};

when(fakeRequestor.webRequest(anything(), anyFunction())).thenReturn(fakeRequest);

const path = '/some/path/to/object';

const receivedTypes: string[] = [];
const receivedObjects: string[] = [];
let doneCalled = false;
let doneErr: any;

watch.watch(
path,
{},
(phase: string, obj: string) => {
receivedTypes.push(phase);
receivedObjects.push(obj);
},
(err: any) => {
doneCalled = true;
doneErr = err;
},
);

verify(fakeRequestor.webRequest(anything(), anyFunction()));

const [opts, doneCallback] = capture(fakeRequestor.webRequest).last();
const reqOpts: request.OptionsWithUri = opts as request.OptionsWithUri;

expect(reqOpts.uri).to.equal(`${server}${path}`);
expect(reqOpts.method).to.equal('GET');
expect(reqOpts.json).to.equal(true);

expect(receivedTypes).to.deep.equal([obj1.type]);
expect(receivedObjects).to.deep.equal([obj1.object]);

expect(doneCalled).to.equal(true);
expect(doneErr).to.deep.equal(errIn);
});

it('should handle server side close correctly', () => {
const kc = new KubeConfig();
Object.assign(kc, fakeConfig);
const fakeRequestor = mock(DefaultRequest);
const watch = new Watch(kc, instance(fakeRequestor));

const obj1 = {
type: 'ADDED',
object: {
foo: 'bar',
},
};

const fakeRequest = {
pipe: (stream) => {
stream.write(JSON.stringify(obj1) + '\n');
stream.emit('close');
},
};

when(fakeRequestor.webRequest(anything(), anyFunction())).thenReturn(fakeRequest);

const path = '/some/path/to/object';

const receivedTypes: string[] = [];
const receivedObjects: string[] = [];
let doneCalled = false;
let doneErr: any;

watch.watch(
path,
{},
(phase: string, obj: string) => {
receivedTypes.push(phase);
receivedObjects.push(obj);
},
(err: any) => {
doneCalled = true;
doneErr = err;
},
);

verify(fakeRequestor.webRequest(anything(), anyFunction()));

const [opts, doneCallback] = capture(fakeRequestor.webRequest).last();
const reqOpts: request.OptionsWithUri = opts as request.OptionsWithUri;

expect(reqOpts.uri).to.equal(`${server}${path}`);
expect(reqOpts.method).to.equal('GET');
expect(reqOpts.json).to.equal(true);

expect(receivedTypes).to.deep.equal([obj1.type]);
expect(receivedObjects).to.deep.equal([obj1.object]);

expect(doneCalled).to.equal(true);
expect(doneErr).to.be.null;
});

it('should ignore JSON parse errors', () => {
const kc = new KubeConfig();
Object.assign(kc, fakeConfig);
Expand Down

0 comments on commit 36c9095

Please sign in to comment.