Skip to content

Commit

Permalink
WIP testing Buffer issue
Browse files Browse the repository at this point in the history
  • Loading branch information
shannonwells committed Sep 13, 2024
1 parent 58ca110 commit bd9767c
Show file tree
Hide file tree
Showing 9 changed files with 59 additions and 39 deletions.
2 changes: 1 addition & 1 deletion .tool-versions
Original file line number Diff line number Diff line change
@@ -1 +1 @@
nodejs 18.18.2
nodejs 18.20.4
3 changes: 2 additions & 1 deletion lib/codec/encoding.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ export function bitWidth(value: number): number {
* @param {number} width - width of each bit-packed group
* @param {number} length - length of the encoded data, in bytes (?)
* @param {DecodedArray} output
* @param {boolean} [disableEnvelope] - set to true to consume the entire buffer, false to assume (and therefore skip) a 4-byte header
*/
export function readRleBitPackedHybrid(reader: DataReader, width: number, length: number, output: DecodedArray, disableEnvelope?: boolean) {

Expand Down Expand Up @@ -121,7 +122,7 @@ export function readBitPacked(reader: DataReader,
seen: number): number {
let count = header >> 1 << 3 // values to read
const mask = (1 << bitWidth) - 1
// when reading definition levels v2 on readColumnChunk, ArrayBuffer len is 69 only

let data = 0
if (reader.offset < reader.view.byteLength) {
data = reader.view.getUint8(reader.offset++)
Expand Down
6 changes: 4 additions & 2 deletions lib/codec/plain_dictionary.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import {readRleBitPackedHybrid} from "./encoding";
export const decodeValues = function (type: string, cursor: Cursor, count: number, opts: Options) {

Check failure on line 5 in lib/codec/plain_dictionary.ts

View workflow job for this annotation

GitHub Actions / test

'opts' is defined but never used. Allowed unused args must match /^_/u
const bitWidth = cursor.buffer.subarray(cursor.offset, cursor.offset + 1).readInt8(0);
cursor.offset += 1;
// old:
// return rle.decodeValues(type, cursor, count, Object.assign({}, opts, { disableEnvelope: true, bitWidth }));
const reader: DataReader = {
view: new DataView(cursor.buffer.buffer, cursor.offset),
offset: 0,
}
let output: DecodedArray = new Array(count);

Check failure on line 14 in lib/codec/plain_dictionary.ts

View workflow job for this annotation

GitHub Actions / test

'output' is never reassigned. Use 'const' instead
readRleBitPackedHybrid(reader, bitWidth, count, output, true)
cursor.offset += reader.offset
const disableEnvelope = true;
readRleBitPackedHybrid(reader, bitWidth, count, output, disableEnvelope)
return output;
};
9 changes: 6 additions & 3 deletions lib/codec/rle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// https://github.com/apache/parquet-format/blob/master/Encodings.md

import varint from 'varint';
import { Cursor } from './types';
import {Cursor} from './types';
import {readBitPacked, readRle, readRleBitPackedHybrid, readVarInt} from "./encoding";

Check failure on line 7 in lib/codec/rle.ts

View workflow job for this annotation

GitHub Actions / test

'readRle' is defined but never used. Allowed unused vars must match /^_/u

Check failure on line 7 in lib/codec/rle.ts

View workflow job for this annotation

GitHub Actions / test

'readRleBitPackedHybrid' is defined but never used. Allowed unused vars must match /^_/u

function encodeRunBitpacked(values: number[], opts: { bitWidth: number }) {
Expand Down Expand Up @@ -111,6 +111,10 @@ export const encodeValues = function (
// opts.bitWidth is undefined when the boolean values are being passed
// decode a bitpacked value
// setting old code to true here only results in the RLE/bitpacked hybrid test failing, so we know that code is bad.
// cursor: Cursor containing the data to be decoded
// count: the number of values expected to result from the decoding
// opts: bitWidth is required.
// returns: a DecodedArray
export function decodeRunBitpacked(cursor: Cursor, count: number, opts: { bitWidth: number }): Array<number> {

Check failure on line 118 in lib/codec/rle.ts

View workflow job for this annotation

GitHub Actions / test

Array type using 'Array<number>' is forbidden. Use 'number[]' instead
const run_old_code = true;
let output = new Array(count).fill(0);

Check failure on line 120 in lib/codec/rle.ts

View workflow job for this annotation

GitHub Actions / test

'output' is never reassigned. Use 'const' instead
Expand All @@ -127,8 +131,7 @@ export function decodeRunBitpacked(cursor: Cursor, count: number, opts: { bitWid

cursor.offset += opts.bitWidth * (count / 8);
} else {
let arrayBuf = cursor.buffer.buffer.slice(0, count);
const view = new DataView(arrayBuf, cursor.offset, count);
const view = new DataView(cursor.buffer.buffer, cursor.offset);
const reader = {view, offset: 0}
const header = readVarInt(reader);
readBitPacked(reader, header, opts.bitWidth, output, 0)
Expand Down
2 changes: 1 addition & 1 deletion lib/datapageV2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ export const readDefinitionLevelsV2 = (reader: DataReader,
// V2 we know the length
const values = new Array(daph2.num_values)
const bitWidth = getBitWidth(dLevelMax)
let disableEnvelope = daph2.definition_levels_byte_length === 0
const disableEnvelope = true
readRleBitPackedHybrid(reader, bitWidth, daph2.definition_levels_byte_length, values, disableEnvelope)
return values
}
Expand Down
16 changes: 3 additions & 13 deletions lib/reader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import { GetObjectCommand, HeadObjectCommand, S3Client } from '@aws-sdk/client-s
import type { Readable } from 'stream';
import type { Blob } from 'buffer';
import {readDefinitionLevelsV2, readRepetitionLevelsV2} from "./datapageV2";
import {dataReaderFromCursor} from "./util";

const { getBloomFiltersFor } = bloomFilterReader;

Expand Down Expand Up @@ -96,7 +97,6 @@ class ParquetCursor {
this.columnList
);

// now this one is *@($&ing up, it's dematerializing records
this.rowGroup = parquet_shredder.materializeRecords(this.schema, rowBuffer);
this.rowGroupIndex++;
this.cursorIndex = 0;
Expand Down Expand Up @@ -933,7 +933,7 @@ async function decodePages(buffer: Buffer, opts: Options) {
pageData.values = pageData.values!.map((d) => opts.dictionary![d]);
}

const length = pageData.rlevels !== undefined ? pageData.dlevels?.length : 0;
const length = pageData.rlevels !== undefined ? pageData.rlevels?.length : 0;

if (pageData.rlevels?.length) {
data.rlevels = pageData.rlevels;
Expand Down Expand Up @@ -1062,12 +1062,6 @@ async function decodeDataPage(cursor: Cursor, header: parquet_thrift.PageHeader,
};
}

/**
 * Creates a DataView over the bytes of a cursor's Buffer, starting at the
 * cursor's current offset and ending at the end of that Buffer.
 *
 * A Node Buffer is often a window onto a larger ArrayBuffer (small Buffers are
 * carved out of a shared allocation pool), so `cursor.buffer.buffer` alone does
 * not start at the Buffer's first byte. The Buffer's own `byteOffset` has to be
 * added, and the view is bounded so it cannot read past the Buffer's bytes.
 *
 * @param cursor - supplies the Buffer and the position within it
 * @param offset - not used; kept so existing call sites keep compiling
 * @returns a DataView whose byte 0 is `cursor.buffer[cursor.offset]`
 */
function dataViewFromCursor(cursor: Cursor, offset?: number): DataView {
  const source = cursor.buffer;
  return new DataView(
    source.buffer,
    source.byteOffset + cursor.offset,
    source.byteLength - cursor.offset
  );
}

async function decodeDataPageV2(cursor: Cursor, header: parquet_thrift.PageHeader, opts: Options): Promise<Record<string, any>> {
const cursorEnd = cursor.offset + header.compressed_page_size;
const dataPageHeaderV2 = header.data_page_header_v2!;
Expand All @@ -1077,12 +1071,8 @@ async function decodeDataPageV2(cursor: Cursor, header: parquet_thrift.PageHeade
const valueEncoding = parquet_util.getThriftEnum(parquet_thrift.Encoding, dataPageHeaderV2.encoding);

/* read repetition levels */
const use_old_rlevels = false;
let rLevels: Array<any>;
let reader: DataReader = {
view: dataViewFromCursor(cursor),
offset: 0
}
let reader = dataReaderFromCursor(cursor, 0)

rLevels = readRepetitionLevelsV2(reader, dataPageHeaderV2, opts.rLevelMax || 0);
reader.offset = dataPageHeaderV2.repetition_levels_byte_length;
Expand Down
7 changes: 7 additions & 0 deletions lib/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import fs, { WriteStream } from 'fs';
import * as parquet_thrift from '../gen-nodejs/parquet_types';
import { FileMetaDataExt, WriterOptions } from './declare';
import { Int64 } from 'thrift';
import {Cursor, DataReader} from "./codec/types";

// Use this so users only need to implement the minimal amount of the WriteStream interface
export type WriteStreamMinimal = Pick<WriteStream, 'write' | 'end'>;
Expand Down Expand Up @@ -229,3 +230,9 @@ export const fieldIndexOf = function (arr: unknown[][], elem: unknown[]) {
/**
 * Returns a fresh Int64 constructed from the result of `int.valueOf()`.
 *
 * @param int - the Int64 to copy
 * @returns a new Int64 instance built from `int.valueOf()`
 */
export const cloneInteger = (int: Int64) => new Int64(int.valueOf());


/**
 * Builds a DataReader over the bytes of a cursor's Buffer, starting at the
 * cursor's current offset and ending at the end of that Buffer.
 *
 * A Node Buffer is often a window onto a larger ArrayBuffer (small Buffers are
 * carved out of a shared allocation pool), so `data.buffer.buffer` alone does
 * not start at the Buffer's first byte. The Buffer's own `byteOffset` has to be
 * added, and the view is bounded so reads cannot run past the Buffer's bytes.
 *
 * @param data - cursor supplying the Buffer and the position within it
 * @param offset - initial `offset` of the returned reader, relative to the
 *   start of the view; defaults to 0
 * @returns a reader whose view byte 0 is `data.buffer[data.offset]`
 */
export function dataReaderFromCursor(data: Cursor, offset?: number): DataReader {
  const source = data.buffer;
  const view = new DataView(
    source.buffer,
    source.byteOffset + data.offset,
    source.byteLength - data.offset
  );
  return { view, offset: offset ?? 0 };
}
2 changes: 1 addition & 1 deletion test/codec_rle.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ function dataViewFromArray(data) {
const ab = new ArrayBuffer(data.length,{ maxByteLength: data.length });
let view = new DataView(ab, 0);
data.forEach((val,idx) => view.setUint8(idx, val));
return view;
return view
}

describe('ParquetCodec::RLE', function () {
Expand Down
51 changes: 34 additions & 17 deletions test/lib/codec/rle.test.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,36 @@
import { expect } from 'chai';
import { decodeRunBitpacked } from '../../../lib/codec/rle';
import type { Cursor } from '../../../lib/codec/types';
import {expect} from 'chai';
import {decodeRunBitpacked} from '../../../lib/codec/rle';
import {readRleBitPackedHybrid} from "../../../lib/codec/encoding";

describe('RLE Codec', function () {
describe('#decodeRunBitpacked', function () {
it('can decode a known bitpack value', function () {
const cursor: Cursor = {
// 136, 1, left off the front
buffer: Buffer.from([7, 251, 127, 127, 28, 1, 9, 254, 251, 191, 63]),
offset: 0,
};
const values = decodeRunBitpacked(cursor, 24, { bitWidth: 1 });
expect(values.length).equals(24);
});
it('can decode a known bitpack value', function () {
  // 136 = 0b10001000, 0x80 = 128 = 0b10000000
  const bitPackedDecBuffer = Buffer.from([136, 1, 7, 251, 127, 127, 28, 1, 9, 254, 251, 191, 63]);

  // Small Buffers are slices of Node's shared allocation pool, so `.buffer` is
  // the whole pool rather than these 13 bytes. A DataView built without
  // byteOffset/byteLength therefore reads unrelated memory (earlier runs of this
  // test decoded fragments of the library's own source text). Restrict the view
  // to the Buffer's own window.
  const myView = new DataView(
    bitPackedDecBuffer.buffer,
    bitPackedDecBuffer.byteOffset,
    bitPackedDecBuffer.byteLength
  );

  // Guard against the pool-aliasing problem: the view must mirror the Buffer.
  bitPackedDecBuffer.forEach((val, i) => {
    expect(myView.getUint8(i), `view byte ${i}`).equals(val);
  });

  const reader = {view: myView, offset: 0};
  const values = new Array(26).fill(0);
  const expected = [
    1, 1, 1, 1, 0, 1, 1, 0,
    0, 1, 1, 1, 0, 1, 1, 0,
    1, 1, 0, 0, 1, 1, 1, 0,
    0, 0
  ];
  readRleBitPackedHybrid(reader, 1, bitPackedDecBuffer.length, values, true);
  expect(values.length).equals(expected.length)
  // NOTE(review): the expected values above are not yet confirmed - verify against the encoder.
  values.forEach((val, i) => {
    expect(val, `${val} != ${expected[i]} for i = ${i}`).equals(expected[i])
  })
});

describe('#decodeRunBitpacked', function () {
// use the example from the documentation for RLE/Bitpacked hybrid,
// https://parquet.apache.org/docs/file-format/data-pages/encodings/#RLE
it('writes and reads bit packed values for documentation example correctly', () => {
Expand All @@ -26,7 +43,7 @@ describe('RLE Codec', function () {

// the number of values? or the number of bits used for encoding a set of values ?
const bitPackedRunLength = 8;
const bitPackedScaledRunLength = bitPackedRunLength/8;
const bitPackedScaledRunLength = bitPackedRunLength / 8;
// in the grammar it says it's EITHER the left-shifted value OR 1 if the shifted value is 0?? but that makes
// no sense and results in an error.
const shiftedBPSRL = (bitPackedScaledRunLength << 1) | 1;
Expand All @@ -41,10 +58,10 @@ describe('RLE Codec', function () {
const bitWidth = 3;

// number of expected values in the result = 8
const cursor = { buffer: Buffer.from(view.buffer), offset: 0 }
const values = decodeRunBitpacked(cursor, decVals.length, {bitWidth} );
const cursor = {buffer: Buffer.from(view.buffer), offset: 0}
const values = decodeRunBitpacked(cursor, decVals.length, {bitWidth});
expect(values.length).equals(decVals.length);
values.forEach((val,i) => {
values.forEach((val, i) => {
expect(val).equals(decVals[i]);
})
});
Expand Down

0 comments on commit bd9767c

Please sign in to comment.