Issues with user-defined codecs #108

Open
aleixalcacer opened this issue May 5, 2023 · 4 comments

@aleixalcacer
Member

aleixalcacer commented May 5, 2023

I'm facing issues when creating a simple codec that just makes a copy of the data to get familiar with Blosc's registering machinery. I attach the code:

import blosc2
import numpy as np

# Create a user-defined codec (just a memcpy)

def encoder(input, output, meta, schunk: blosc2.SChunk):
    print(f"Encoder output size: {output.size}")
    output[:schunk.blocksize] = input[:schunk.blocksize]
    return schunk.blocksize

def decoder(input, output, meta, schunk: blosc2.SChunk):
    output[:schunk.blocksize] = input[:schunk.blocksize]
    return schunk.blocksize

# Register the codec
codec_id = 200
blosc2.register_codec('test1', codec_id, encoder, decoder)

# Compress this array with the new codec

shape = (100, 100)
a = np.ones(shape, dtype=np.int64)


cparams = {
    'codec': codec_id,
    'nthreads': 1,
    'filters': [],
    'splitmode': blosc2.SplitMode.NEVER_SPLIT,
}

dparams = {
    'nthreads': 1,
}

chunks = shape
blocks = (50, 50)

c_a = blosc2.asarray(a, chunks=chunks, blocks=blocks, cparams=cparams, dparams=dparams)

However, when I run the previous code, I get the following:

Encoder output size: 20000
Encoder output size: 20000
Encoder output size: 20000
Encoder output size: 19968

ValueError: could not broadcast input array from shape (20000,) into shape (19968,)

Looking at this, it appears that the last block of the chunk is smaller than the others. Do you know what is happening? Is there something I'm doing wrong?
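
For reference, the numbers in the output above can be checked by hand. The sketch below uses only the shapes and dtype from the script. It shows that each 50x50 int64 block is 20000 bytes, that the chunk holds four such blocks, and that the last output buffer is 32 bytes short of a full block. It does not explain where those 32 bytes go; that is not established here.

```python
import math

shape = (100, 100)   # array shape; also used as the chunk shape in the script
blocks = (50, 50)    # block shape
itemsize = 8         # bytes per np.int64 element

blocksize = math.prod(blocks) * itemsize   # bytes in one block
chunksize = math.prod(shape) * itemsize    # bytes in one chunk
nblocks = chunksize // blocksize           # blocks per chunk
shortfall = blocksize - 19968              # 19968 is the last size printed above

print(blocksize, chunksize, nblocks, shortfall)  # 20000 80000 4 32
```

The four `Encoder output size` lines therefore correspond to the four blocks of the single chunk.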

@FrancescAlted
Member

I have had a look into this. The problem with your approach is that you were trying to copy everything without checking whether there is enough space available in the output buffer. This can be fixed with something like:

def encoder(input, output, meta, schunk: blosc2.SChunk):
    try:
        output[:schunk.blocksize] = input[:schunk.blocksize]
    except ValueError:
        return 0
    return schunk.blocksize

But the best way to copy is to just return 0; this way, Blosc will know that the codec could not compress the block and it will copy the original buffer as-is:

def encoder(input, output, meta, schunk: blosc2.SChunk):
    # By returning 0, we are saying that we are not compressing anything
    return 0
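
A third option, sketched here as an illustration only, is to check the available space explicitly instead of catching the exception. The name `copy_encoder` is hypothetical, and the sketch assumes that the length of `input` equals the block size; it uses `len()` so that it can be exercised with plain `bytearray` objects, without Blosc2 installed.

```python
def copy_encoder(input, output, meta=None, schunk=None):
    """Copy input into output if it fits; otherwise return 0."""
    nbytes = len(input)  # assumption: input holds exactly one block
    if len(output) < nbytes:
        # Not enough room in the destination buffer: report "could not
        # compress" by returning 0, as described above.
        return 0
    output[:nbytes] = input[:nbytes]
    return nbytes


# Simulate the four calls from the original report with plain bytearrays.
block = bytearray(b"\x01" * 20000)
results = [copy_encoder(block, bytearray(size))
           for size in (20000, 20000, 20000, 19968)]
print(results)  # [20000, 20000, 20000, 0]
```

With the sizes from the report, the first three blocks are copied and the last one falls back to returning 0.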

With the next script:

import blosc2
import numpy as np
import sys

# Create a User-defined codec (just a memcpy)

def encoder(input, output, meta, schunk: blosc2.SChunk):
    #print(f"Encoder output size: {output.size, input.size, schunk.blocksize}")
    # By returning 0, we are saying that we are not compressing anything
    return 0
    # Alternative, but more time-consuming (disabled here; use it in place of
    # the `return 0` above to try it):
    # try:
    #     output[:schunk.blocksize] = input[:schunk.blocksize]
    # except ValueError:
    #     return 0
    # return schunk.blocksize

def decoder(input, output, meta, schunk: blosc2.SChunk):
    #print(f"Decoder output size: {output.size, input.size, schunk.blocksize}")
    output[:schunk.blocksize] = input[:schunk.blocksize]
    return schunk.blocksize

# Register the codec
codec_id = 200
blosc2.register_codec('test1', codec_id, encoder, decoder)

# Compress this array with the new codec

shape = (100, 100)
a = np.ones(shape, dtype=np.int64)


cparams = {
    'codec': codec_id,
    'nthreads': 1,
    'filters': [],
    'splitmode': blosc2.SplitMode.NEVER_SPLIT,
}

dparams = {
    'nthreads': 1,
}

chunks = shape
blocks = (50, 50)

c_a = blosc2.asarray(a, chunks=chunks, blocks=blocks, cparams=cparams, dparams=dparams)
print(c_a.info)
print(c_a[:])

I am getting this output:

type    : NDArray
shape   : (100, 100)
chunks  : (100, 100)
blocks  : (50, 50)
dtype   : int64
cratio  : 1.00
cparams : {
    'blocksize': 20000,
    'clevel': 1,
    'codec': 200,
    'codec_meta': 0,
    'filters': [
        <Filter.NOFILTER: 0>,
        <Filter.NOFILTER: 0>,
        <Filter.NOFILTER: 0>,
        <Filter.NOFILTER: 0>,
        <Filter.NOFILTER: 0>,
        <Filter.NOFILTER: 0>
    ],
    'filters_meta': [0, 0, 0, 0, 0, 0],
    'nthreads': 1,
    'splitmode': <SplitMode.NEVER_SPLIT: 2>,
    'typesize': 8,
    'use_dict': 0
}
dparams : {'nthreads': 1}

[[1 1 1 ... 1 1 1]
 [1 1 1 ... 1 1 1]
 [1 1 1 ... 1 1 1]
 ...
 [1 1 1 ... 1 1 1]
 [1 1 1 ... 1 1 1]
 [1 1 1 ... 1 1 1]]
Error in sys.excepthook:

Original exception was:

Which is what you want. There is still the error:

Error in sys.excepthook:

Original exception was:

that frankly, I don't know where it comes from...

@FrancescAlted
Member

FWIW, here is a working version that does not raise the sys.excepthook exception (based on pytest-dev/execnet#30):

import blosc2
import numpy as np
import sys

# Create a User-defined codec (just a memcpy)

def encoder(input, output, meta, schunk: blosc2.SChunk):
    # By returning 0, we are saying that we are not compressing anything
    return 0

def decoder(input, output, meta, schunk: blosc2.SChunk):
    output[:schunk.blocksize] = input[:schunk.blocksize]
    return schunk.blocksize

def main():
    # Register the codec
    codec_id = 200
    blosc2.register_codec('test1', codec_id, encoder, decoder)

    # Compress this array with the new codec

    shape = (100, 100)
    a = np.ones(shape, dtype=np.int64)

    cparams = {
        'codec': codec_id,
        'nthreads': 1,
        'filters': [],
        'splitmode': blosc2.SplitMode.NEVER_SPLIT,
    }

    dparams = {
        'nthreads': 1,
    }

    chunks = shape
    blocks = (50, 50)

    c_a = blosc2.asarray(a, chunks=chunks, blocks=blocks, cparams=cparams, dparams=dparams)
    print(c_a.info)
    print(c_a[:])

if __name__ == '__main__':
    try:
        sys.exit(main())
    finally:
        # This block is crucial to avoid having issues with
        # Python spitting nonsense thread exceptions. We have already
        # handled what we could, so close stderr and stdout.
        try:
            sys.stdout.close()
        except:
            pass
        try:
            sys.stderr.close()
        except:
            pass
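
The cleanup at the end of the script can also be factored into a small helper. This is only a sketch: `close_quietly` is a hypothetical name, not part of Blosc2 or the standard library, and it is demonstrated on in-memory streams rather than on the real `sys.stdout` and `sys.stderr`.

```python
import io


def close_quietly(*streams):
    """Close each stream, ignoring any error raised while closing."""
    for stream in streams:
        try:
            stream.close()
        except Exception:
            # Nothing useful can be done at interpreter shutdown.
            pass


# Demonstration with in-memory stand-ins for stdout and stderr.
fake_out, fake_err = io.StringIO(), io.StringIO()
close_quietly(fake_out, fake_err)
closed_flags = (fake_out.closed, fake_err.closed)
```

In the script above, the `finally` block would then reduce to a single call, `close_quietly(sys.stdout, sys.stderr)`.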

@FrancescAlted
Member

After thinking twice, we should try to make your original code work, because we want a user-defined codec to keep working even when it cannot compress a chunk (which is not the case currently). Unfortunately, this requires some important changes in the underlying C-Blosc2 library (which is equally affected), and should be postponed until we have time. If you want to tackle this one, that would be great ;-)

@FrancescAlted FrancescAlted reopened this May 9, 2023
@aleixalcacer
Member Author

Thanks for the response, Francesc. I think the same as you. When I have time, I will be happy to try to fix it :)
