Skip to content

Commit

Permalink
Compress streams in notebook outputs (#160946)
Browse files Browse the repository at this point in the history
* Revert "Compress notebook output streams before rendering (#160667)"

This reverts commit 4230c22.

* Compress stream output items

* Minor perf improvements

* Misc

* Comments

* Added tests

* Merge issue

* More merge issues

* Misc

* Address code review comments
  • Loading branch information
DonJayamanne authored Oct 11, 2022
1 parent 4322170 commit 43957cc
Show file tree
Hide file tree
Showing 9 changed files with 341 additions and 141 deletions.
27 changes: 15 additions & 12 deletions extensions/ipynb/src/serializers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import type * as nbformat from '@jupyterlab/nbformat';
import { NotebookCell, NotebookCellData, NotebookCellKind, NotebookCellOutput } from 'vscode';
import { CellOutputMetadata } from './common';
import { textMimeTypes } from './deserializers';
import { compressOutputItemStreams } from './streamCompressor';

const textDecoder = new TextDecoder();

Expand Down Expand Up @@ -277,17 +276,21 @@ type JupyterOutput =

function convertStreamOutput(output: NotebookCellOutput): JupyterOutput {
const outputs: string[] = [];
const compressedStream = output.items.length ? new TextDecoder().decode(compressOutputItemStreams(output.items[0].mime, output.items)) : '';
// Ensure each line is a separate entry in an array (ending with \n).
const lines = compressedStream.split('\n');
// If the last item in `outputs` is not empty and the first item in `lines` is not empty, then concate them.
// As they are part of the same line.
if (outputs.length && lines.length && lines[0].length > 0) {
outputs[outputs.length - 1] = `${outputs[outputs.length - 1]}${lines.shift()!}`;
}
for (const line of lines) {
outputs.push(line);
}
output.items
.filter((opit) => opit.mime === CellOutputMimeTypes.stderr || opit.mime === CellOutputMimeTypes.stdout)
.map((opit) => textDecoder.decode(opit.data))
.forEach(value => {
// Ensure each line is a separate entry in an array (ending with \n).
const lines = value.split('\n');
// If the last item in `outputs` is not empty and the first item in `lines` is not empty, then concate them.
// As they are part of the same line.
if (outputs.length && lines.length && lines[0].length > 0) {
outputs[outputs.length - 1] = `${outputs[outputs.length - 1]}${lines.shift()!}`;
}
for (const line of lines) {
outputs.push(line);
}
});

for (let index = 0; index < (outputs.length - 1); index++) {
outputs[index] = `${outputs[index]}\n`;
Expand Down
63 changes: 0 additions & 63 deletions extensions/ipynb/src/streamCompressor.ts

This file was deleted.

26 changes: 26 additions & 0 deletions src/vs/workbench/api/common/extHostNotebookDocument.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,32 @@ export class ExtHostCell {
output.items.length = 0;
}
output.items.push(...newItems);

if (output.items.length > 1 && output.items.every(item => notebookCommon.isTextStreamMime(item.mime))) {
// Look for the mimes in the items, and keep track of their order.
// Merge the streams into one output item, per mime type.
const mimeOutputs = new Map<string, Uint8Array[]>();
const mimeTypes: string[] = [];
output.items.forEach(item => {
let items: Uint8Array[];
if (mimeOutputs.has(item.mime)) {
items = mimeOutputs.get(item.mime)!;
} else {
items = [];
mimeOutputs.set(item.mime, items);
mimeTypes.push(item.mime);
}
items.push(item.data);
});
output.items.length = 0;
mimeTypes.forEach(mime => {
const compressed = notebookCommon.compressOutputItemStreams(mimeOutputs.get(mime)!);
output.items.push({
mime,
data: compressed.buffer
});
});
}
}
}

Expand Down
75 changes: 74 additions & 1 deletion src/vs/workbench/api/test/browser/extHostNotebook.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { TestRPCProtocol } from 'vs/workbench/api/test/common/testRPCProtocol';
import { DisposableStore } from 'vs/base/common/lifecycle';
import { NullLogService } from 'vs/platform/log/common/log';
import { mock } from 'vs/base/test/common/mock';
import { IModelAddedData, MainContext, MainThreadCommandsShape, MainThreadNotebookShape } from 'vs/workbench/api/common/extHost.protocol';
import { IModelAddedData, MainContext, MainThreadCommandsShape, MainThreadNotebookShape, NotebookCellsChangedEventDto, NotebookOutputItemDto } from 'vs/workbench/api/common/extHost.protocol';
import { ExtHostNotebookController } from 'vs/workbench/api/common/extHostNotebook';
import { ExtHostNotebookDocument } from 'vs/workbench/api/common/extHostNotebookDocument';
import { CellKind, CellUri, NotebookCellsChangeType } from 'vs/workbench/contrib/notebook/common/notebookCommon';
Expand Down Expand Up @@ -542,4 +542,77 @@ suite('NotebookCell#Document', function () {
assert.ok(cellChange.metadata === undefined);
assert.ok(cellChange.outputs === undefined);
});

async function replaceOutputs(cellIndex: number, outputId: string, outputItems: NotebookOutputItemDto[]) {
const changeEvent = Event.toPromise(extHostNotebookDocuments.onDidChangeNotebookDocument);
extHostNotebookDocuments.$acceptModelChanged(notebook.uri, new SerializableObjectWithBuffers<NotebookCellsChangedEventDto>({
versionId: notebook.apiNotebook.version + 1,
rawEvents: [{
kind: NotebookCellsChangeType.Output,
index: cellIndex,
outputs: [{ outputId, items: outputItems }]
}]
}), false);
await changeEvent;
}
async function appendOutputItem(cellIndex: number, outputId: string, outputItems: NotebookOutputItemDto[]) {
const changeEvent = Event.toPromise(extHostNotebookDocuments.onDidChangeNotebookDocument);
extHostNotebookDocuments.$acceptModelChanged(notebook.uri, new SerializableObjectWithBuffers<NotebookCellsChangedEventDto>({
versionId: notebook.apiNotebook.version + 1,
rawEvents: [{
kind: NotebookCellsChangeType.OutputItem,
index: cellIndex,
append: true,
outputId,
outputItems
}]
}), false);
await changeEvent;
}
test('Append multiple text/plain output items', async function () {
await replaceOutputs(1, '1', [{ mime: 'text/plain', valueBytes: VSBuffer.fromString('foo') }]);
await appendOutputItem(1, '1', [{ mime: 'text/plain', valueBytes: VSBuffer.fromString('bar') }]);
await appendOutputItem(1, '1', [{ mime: 'text/plain', valueBytes: VSBuffer.fromString('baz') }]);


assert.strictEqual(notebook.apiNotebook.cellAt(1).outputs.length, 1);
assert.strictEqual(notebook.apiNotebook.cellAt(1).outputs[0].items.length, 3);
assert.strictEqual(notebook.apiNotebook.cellAt(1).outputs[0].items[0].mime, 'text/plain');
assert.strictEqual(VSBuffer.wrap(notebook.apiNotebook.cellAt(1).outputs[0].items[0].data).toString(), 'foo');
assert.strictEqual(notebook.apiNotebook.cellAt(1).outputs[0].items[1].mime, 'text/plain');
assert.strictEqual(VSBuffer.wrap(notebook.apiNotebook.cellAt(1).outputs[0].items[1].data).toString(), 'bar');
assert.strictEqual(notebook.apiNotebook.cellAt(1).outputs[0].items[2].mime, 'text/plain');
assert.strictEqual(VSBuffer.wrap(notebook.apiNotebook.cellAt(1).outputs[0].items[2].data).toString(), 'baz');
});
test('Append multiple stdout stream output items to an output with another mime', async function () {
await replaceOutputs(1, '1', [{ mime: 'text/plain', valueBytes: VSBuffer.fromString('foo') }]);
await appendOutputItem(1, '1', [{ mime: 'application/vnd.code.notebook.stdout', valueBytes: VSBuffer.fromString('bar') }]);
await appendOutputItem(1, '1', [{ mime: 'application/vnd.code.notebook.stdout', valueBytes: VSBuffer.fromString('baz') }]);

assert.strictEqual(notebook.apiNotebook.cellAt(1).outputs.length, 1);
assert.strictEqual(notebook.apiNotebook.cellAt(1).outputs[0].items.length, 3);
assert.strictEqual(notebook.apiNotebook.cellAt(1).outputs[0].items[0].mime, 'text/plain');
assert.strictEqual(notebook.apiNotebook.cellAt(1).outputs[0].items[1].mime, 'application/vnd.code.notebook.stdout');
assert.strictEqual(notebook.apiNotebook.cellAt(1).outputs[0].items[2].mime, 'application/vnd.code.notebook.stdout');
});
test('Compress multiple stdout stream output items', async function () {
await replaceOutputs(1, '1', [{ mime: 'application/vnd.code.notebook.stdout', valueBytes: VSBuffer.fromString('foo') }]);
await appendOutputItem(1, '1', [{ mime: 'application/vnd.code.notebook.stdout', valueBytes: VSBuffer.fromString('bar') }]);
await appendOutputItem(1, '1', [{ mime: 'application/vnd.code.notebook.stdout', valueBytes: VSBuffer.fromString('baz') }]);

assert.strictEqual(notebook.apiNotebook.cellAt(1).outputs.length, 1);
assert.strictEqual(notebook.apiNotebook.cellAt(1).outputs[0].items.length, 1);
assert.strictEqual(notebook.apiNotebook.cellAt(1).outputs[0].items[0].mime, 'application/vnd.code.notebook.stdout');
assert.strictEqual(VSBuffer.wrap(notebook.apiNotebook.cellAt(1).outputs[0].items[0].data).toString(), 'foobarbaz');
});
test('Compress multiple stderr stream output items', async function () {
await replaceOutputs(1, '1', [{ mime: 'application/vnd.code.notebook.stderr', valueBytes: VSBuffer.fromString('foo') }]);
await appendOutputItem(1, '1', [{ mime: 'application/vnd.code.notebook.stderr', valueBytes: VSBuffer.fromString('bar') }]);
await appendOutputItem(1, '1', [{ mime: 'application/vnd.code.notebook.stderr', valueBytes: VSBuffer.fromString('baz') }]);

assert.strictEqual(notebook.apiNotebook.cellAt(1).outputs.length, 1);
assert.strictEqual(notebook.apiNotebook.cellAt(1).outputs[0].items.length, 1);
assert.strictEqual(notebook.apiNotebook.cellAt(1).outputs[0].items[0].mime, 'application/vnd.code.notebook.stderr');
assert.strictEqual(VSBuffer.wrap(notebook.apiNotebook.cellAt(1).outputs[0].items[0].data).toString(), 'foobarbaz');
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import { NOTEBOOK_WEBVIEW_BOUNDARY } from 'vs/workbench/contrib/notebook/browser
import { preloadsScriptStr } from 'vs/workbench/contrib/notebook/browser/view/renderers/webviewPreloads';
import { transformWebviewThemeVars } from 'vs/workbench/contrib/notebook/browser/view/renderers/webviewThemeMapping';
import { MarkupCellViewModel } from 'vs/workbench/contrib/notebook/browser/viewModel/markupCellViewModel';
import { CellUri, INotebookRendererInfo, isTextStreamMime, NotebookSetting, RendererMessagingSpec } from 'vs/workbench/contrib/notebook/common/notebookCommon';
import { CellUri, INotebookRendererInfo, NotebookSetting, RendererMessagingSpec } from 'vs/workbench/contrib/notebook/common/notebookCommon';
import { INotebookKernel } from 'vs/workbench/contrib/notebook/common/notebookKernelService';
import { IScopedRendererMessaging } from 'vs/workbench/contrib/notebook/common/notebookRendererMessagingService';
import { INotebookService } from 'vs/workbench/contrib/notebook/common/notebookService';
Expand All @@ -44,7 +44,6 @@ import { WebviewWindowDragMonitor } from 'vs/workbench/contrib/webview/browser/w
import { IEditorGroupsService } from 'vs/workbench/services/editor/common/editorGroupsService';
import { IWorkbenchEnvironmentService } from 'vs/workbench/services/environment/common/environmentService';
import { FromWebviewMessage, IAckOutputHeight, IClickedDataUrlMessage, ICodeBlockHighlightRequest, IContentWidgetTopRequest, IControllerPreload, ICreationContent, ICreationRequestMessage, IFindMatch, IMarkupCellInitialization, RendererMetadata, ToWebviewMessage } from './webviewMessages';
import { compressOutputItemStreams } from 'vs/workbench/contrib/notebook/browser/view/renderers/stdOutErrorPreProcessor';
import { DeferredPromise } from 'vs/base/common/async';

export interface ICachedInset<K extends ICommonCellInfo> {
Expand Down Expand Up @@ -1277,14 +1276,12 @@ var requirejs = (function() {
let updatedContent: ICreationContent | undefined = undefined;
if (content.type === RenderOutputType.Extension) {
const output = content.source.model;
const firstBuffer = isTextStreamMime(content.mimeType) ?
compressOutputItemStreams(content.mimeType, output.outputs) :
output.outputs.find(op => op.mime === content.mimeType)!.data.buffer;
const firstBuffer = output.outputs.find(op => op.mime === content.mimeType)!.data;
updatedContent = {
type: RenderOutputType.Extension,
outputId: outputCache.outputId,
mimeType: content.mimeType,
valueBytes: firstBuffer,
valueBytes: firstBuffer.buffer,
metadata: output.metadata,
};
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { TextModel } from 'vs/editor/common/model/textModel';
import { PLAINTEXT_LANGUAGE_ID } from 'vs/editor/common/languages/modesRegistry';
import { ILanguageService } from 'vs/editor/common/languages/language';
import { NotebookCellOutputTextModel } from 'vs/workbench/contrib/notebook/common/model/notebookCellOutputTextModel';
import { CellInternalMetadataChangedEvent, CellKind, ICell, ICellDto2, ICellOutput, IOutputDto, IOutputItemDto, NotebookCellCollapseState, NotebookCellInternalMetadata, NotebookCellMetadata, NotebookCellOutputsSplice, TransientOptions } from 'vs/workbench/contrib/notebook/common/notebookCommon';
import { CellInternalMetadataChangedEvent, CellKind, compressOutputItemStreams, ICell, ICellDto2, ICellOutput, IOutputDto, IOutputItemDto, isTextStreamMime, NotebookCellCollapseState, NotebookCellInternalMetadata, NotebookCellMetadata, NotebookCellOutputsSplice, TransientOptions } from 'vs/workbench/contrib/notebook/common/notebookCommon';

export class NotebookCellTextModel extends Disposable implements ICell {
private readonly _onDidChangeOutputs = this._register(new Emitter<NotebookCellOutputsSplice>());
Expand Down Expand Up @@ -297,6 +297,31 @@ export class NotebookCellTextModel extends Disposable implements ICell {
} else {
output.replaceData(items);
}
if (output.outputs.length > 1 && output.outputs.every(item => isTextStreamMime(item.mime))) {
// Look for the mimes in the items, and keep track of their order.
// Merge the streams into one output item, per mime type.
const mimeOutputs = new Map<string, Uint8Array[]>();
const mimeTypes: string[] = [];
output.outputs.forEach(item => {
let items: Uint8Array[];
if (mimeOutputs.has(item.mime)) {
items = mimeOutputs.get(item.mime)!;
} else {
items = [];
mimeOutputs.set(item.mime, items);
mimeTypes.push(item.mime);
}
items.push(item.data.buffer);
});
output.outputs.length = 0;
mimeTypes.forEach(mime => {
const compressed = compressOutputItemStreams(mimeOutputs.get(mime)!);
output.outputs.push({
mime,
data: compressed
});
});
}

this._onDidChangeOutputItems.fire();
return true;
Expand Down
Loading

0 comments on commit 43957cc

Please sign in to comment.