Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memory leak in ParserBuffer #579

Closed
joente opened this issue Oct 21, 2015 · 6 comments
Closed

Memory leak in ParserBuffer #579

joente opened this issue Oct 21, 2015 · 6 comments
Labels

Comments

@joente
Copy link

joente commented Oct 21, 2015

It looks like there is a memory leak in the ParserBuffer, caused by the _writer generator. This simplification shows the issue:

import gc
gc.collect() # => make sure we start clean

class ParserBuffer:
    # defined __slots__ in this simplification so we produce the same garbage result
    __slots__ = ('_writer', ) 

    def __init__(self):
        self._writer = self._feed_data()
        next(self._writer)

    def _feed_data(self):
        while True:
            chunk = yield

parserBuffer = ParserBuffer()
parserBuffer = None
gc.set_debug(gc.DEBUG_LEAK)
print(gc.collect())   # => Collects 3 objects

The generator is not closed and holds a reference to self and therefore creates a reference cycle. Is it really necessary to use a generator? (Or can we somehow make sure the generator is closed using self._writer.close() ?)

@joente
Copy link
Author

joente commented Oct 21, 2015

It's less elegant but maybe it's an option to not subclass bytearray but use a bytearray as attribute. The code below seems to work but i'm not sure this is the way to go.

class ParserBuffer:
    """ParserBuffer is NOT a bytearray extension anymore.

    ParserBuffer provides helper methods for parsers.
    """
    __slots__ = ('_exception', '_writer', '_data')

    def __init__(self, *args):
        self._data = bytearray(*args)
        self._exception = None
        self._writer = self._feed_data(self._data)
        next(self._writer)

    def exception(self):
        return self._exception

    def set_exception(self, exc):
        self._exception = exc

    @staticmethod
    def _feed_data(data):
        while True:
            chunk = yield
            if chunk:
                data.extend(chunk)

    def _check_exception(self):
        if self._exception:
            self._writer = self._feed_data(self._data)
            next(self._writer)
            raise self._exception

    def feed_data(self, data):
        self._writer.send(data)
        self._check_exception()

    def read(self, size):
        """read() reads specified amount of bytes."""

        while True:
            if len(self._data) >= size:
                data = self._data[:size]
                del self._data[:size]
                return data

            self._writer.send((yield))
            self._check_exception()

    def readsome(self, size=None):
        """reads size of less amount of bytes."""

        while True:
            length = len(self._data)
            if length > 0:
                if size is None or length < size:
                    size = length

                data = self._data[:size]
                del self._data[:size]
                return data

            self._writer.send((yield))
            self._check_exception()

    def readuntil(self, stop, limit=None):
        assert isinstance(stop, bytes) and stop, \
            'bytes is required: {!r}'.format(stop)

        stop_len = len(stop)

        while True:
            pos = self._data.find(stop)
            if pos >= 0:
                end = pos + stop_len
                size = end
                if limit is not None and size > limit:
                    raise errors.LineLimitExceededParserError(
                        'Line is too long.', limit)

                data = self._data[:size]
                del self._data[:size]
                return data
            else:
                if limit is not None and len(self._data) > limit:
                    raise errors.LineLimitExceededParserError(
                        'Line is too long.', limit)

            self._writer.send((yield))
            self._check_exception()

    def wait(self, size):
        """wait() waits for specified amount of bytes
        then returns data without changing internal buffer."""

        while True:
            if len(self._data) >= size:
                return self._data[:size]

            self._writer.send((yield))
            self._check_exception()

    def waituntil(self, stop, limit=None):
        """waituntil() reads until `stop` bytes sequence."""
        assert isinstance(stop, bytes) and stop, \
            'bytes is required: {!r}'.format(stop)

        stop_len = len(stop)

        while True:
            pos = self._data.find(stop)
            if pos >= 0:
                size = pos + stop_len
                if limit is not None and size > limit:
                    raise errors.LineLimitExceededParserError(
                        'Line is too long. %s' % bytes(self._data), limit)

                return self._data[:size]
            else:
                if limit is not None and len(self._data) > limit:
                    raise errors.LineLimitExceededParserError(
                        'Line is too long. %s' % bytes(self._data), limit)

            self._writer.send((yield))
            self._check_exception()

    def skip(self, size):
        """skip() skips specified amount of bytes."""

        while len(self._data) < size:
            self._writer.send((yield))
            self._check_exception()

        del self._data[:size]

    def skipuntil(self, stop):
        """skipuntil() reads until `stop` bytes sequence."""
        assert isinstance(stop, bytes) and stop, \
            'bytes is required: {!r}'.format(stop)

        stop_len = len(stop)

        while True:
            stop_line = self._data.find(stop)
            if stop_line >= 0:
                size = stop_line + stop_len
                del self._data[:size]
                return

            self._writer.send((yield))
            self._check_exception()

@fafhrd91
Copy link
Member

Did you run tests?

Sent from my iPhone

On Oct 21, 2015, at 7:27 AM, Jeroen van der Heijden [email protected] wrote:

It's less elegant but maybe it's an option to not subclass bytearray but use a bytearray as attribute. The code below seems to work but i'm not sure this is the way to go.

class ParserBuffer:
"""ParserBuffer is NOT a bytearray extension anymore.

ParserBuffer provides helper methods for parsers.
"""
__slots__ = ('_exception', '_writer', '_data')

def __init__(self, *args):
    self._data = bytearray(*args)
    self._exception = None
    self._writer = self._feed_data(self._data)
    next(self._writer)

def exception(self):
    return self._exception

def set_exception(self, exc):
    self._exception = exc

@staticmethod
def _feed_data(data):
    while True:
        chunk = yield
        if chunk:
            data.extend(chunk)

def _check_exception(self):
    if self._exception:
        self._writer = self._feed_data(self._data)
        next(self._writer)
        raise self._exception

def feed_data(self, data):
    self._writer.send(data)
    self._check_exception()

def read(self, size):
    """read() reads specified amount of bytes."""

    while True:
        if len(self._data) >= size:
            data = self._data[:size]
            del self._data[:size]
            return data

        self._writer.send((yield))
        self._check_exception()

def readsome(self, size=None):
    """reads size of less amount of bytes."""

    while True:
        length = len(self._data)
        if length > 0:
            if size is None or length < size:
                size = length

            data = self._data[:size]
            del self._data[:size]
            return data

        self._writer.send((yield))
        self._check_exception()

def readuntil(self, stop, limit=None):
    assert isinstance(stop, bytes) and stop, \
        'bytes is required: {!r}'.format(stop)

    stop_len = len(stop)

    while True:
        pos = self._data.find(stop)
        if pos >= 0:
            end = pos + stop_len
            size = end
            if limit is not None and size > limit:
                raise errors.LineLimitExceededParserError(
                    'Line is too long.', limit)

            data = self._data[:size]
            del self._data[:size]
            return data
        else:
            if limit is not None and len(self._data) > limit:
                raise errors.LineLimitExceededParserError(
                    'Line is too long.', limit)

        self._writer.send((yield))
        self._check_exception()

def wait(self, size):
    """wait() waits for specified amount of bytes
    then returns data without changing internal buffer."""

    while True:
        if len(self._data) >= size:
            return self._data[:size]

        self._writer.send((yield))
        self._check_exception()

def waituntil(self, stop, limit=None):
    """waituntil() reads until `stop` bytes sequence."""
    assert isinstance(stop, bytes) and stop, \
        'bytes is required: {!r}'.format(stop)

    stop_len = len(stop)

    while True:
        pos = self._data.find(stop)
        if pos >= 0:
            size = pos + stop_len
            if limit is not None and size > limit:
                raise errors.LineLimitExceededParserError(
                    'Line is too long. %s' % bytes(self._data), limit)

            return self._data[:size]
        else:
            if limit is not None and len(self._data) > limit:
                raise errors.LineLimitExceededParserError(
                    'Line is too long. %s' % bytes(self._data), limit)

        self._writer.send((yield))
        self._check_exception()

def skip(self, size):
    """skip() skips specified amount of bytes."""

    while len(self._data) < size:
        self._writer.send((yield))
        self._check_exception()

    del self._data[:size]

def skipuntil(self, stop):
    """skipuntil() reads until `stop` bytes sequence."""
    assert isinstance(stop, bytes) and stop, \
        'bytes is required: {!r}'.format(stop)

    stop_len = len(stop)

    while True:
        stop_line = self._data.find(stop)
        if stop_line >= 0:
            size = stop_line + stop_len
            del self._data[:size]
            return

        self._writer.send((yield))
        self._check_exception()


Reply to this email directly or view it on GitHub.

@joente
Copy link
Author

joente commented Oct 21, 2015

I did, the memory leak does not occur using this code. But maybe you have a better solution for the 'check_exception' part. I had to remove this from the _feed_data generator.

@fafhrd91
Copy link
Member

Could you create pull request for this change.

On Oct 21, 2015, at 7:27 AM, Jeroen van der Heijden [email protected] wrote:

It's less elegant but maybe it's an option to not subclass bytearray but use a bytearray as attribute. The code below seems to work but i'm not sure this is the way to go.

class ParserBuffer:
"""ParserBuffer is NOT a bytearray extension anymore.

ParserBuffer provides helper methods for parsers.
"""
__slots__ = ('_exception', '_writer', '_data')

def __init__(self, *args):
    self._data = bytearray(*args)
    self._exception = None
    self._writer = self._feed_data(self._data)
    next(self._writer)

def exception(self):
    return self._exception

def set_exception(self, exc):
    self._exception = exc

@staticmethod
def _feed_data(data):
    while True:
        chunk = yield
        if chunk:
            data.extend(chunk)

def _check_exception(self):
    if self._exception:
        self._writer = self._feed_data(self._data)
        next(self._writer)
        raise self._exception

def feed_data(self, data):
    self._writer.send(data)
    self._check_exception()

def read(self, size):
    """read() reads specified amount of bytes."""

    while True:
        if len(self._data) >= size:
            data = self._data[:size]
            del self._data[:size]
            return data

        self._writer.send((yield))
        self._check_exception()

def readsome(self, size=None):
    """reads size of less amount of bytes."""

    while True:
        length = len(self._data)
        if length > 0:
            if size is None or length < size:
                size = length

            data = self._data[:size]
            del self._data[:size]
            return data

        self._writer.send((yield))
        self._check_exception()

def readuntil(self, stop, limit=None):
    assert isinstance(stop, bytes) and stop, \
        'bytes is required: {!r}'.format(stop)

    stop_len = len(stop)

    while True:
        pos = self._data.find(stop)
        if pos >= 0:
            end = pos + stop_len
            size = end
            if limit is not None and size > limit:
                raise errors.LineLimitExceededParserError(
                    'Line is too long.', limit)

            data = self._data[:size]
            del self._data[:size]
            return data
        else:
            if limit is not None and len(self._data) > limit:
                raise errors.LineLimitExceededParserError(
                    'Line is too long.', limit)

        self._writer.send((yield))
        self._check_exception()

def wait(self, size):
    """wait() waits for specified amount of bytes
    then returns data without changing internal buffer."""

    while True:
        if len(self._data) >= size:
            return self._data[:size]

        self._writer.send((yield))
        self._check_exception()

def waituntil(self, stop, limit=None):
    """waituntil() reads until `stop` bytes sequence."""
    assert isinstance(stop, bytes) and stop, \
        'bytes is required: {!r}'.format(stop)

    stop_len = len(stop)

    while True:
        pos = self._data.find(stop)
        if pos >= 0:
            size = pos + stop_len
            if limit is not None and size > limit:
                raise errors.LineLimitExceededParserError(
                    'Line is too long. %s' % bytes(self._data), limit)

            return self._data[:size]
        else:
            if limit is not None and len(self._data) > limit:
                raise errors.LineLimitExceededParserError(
                    'Line is too long. %s' % bytes(self._data), limit)

        self._writer.send((yield))
        self._check_exception()

def skip(self, size):
    """skip() skips specified amount of bytes."""

    while len(self._data) < size:
        self._writer.send((yield))
        self._check_exception()

    del self._data[:size]

def skipuntil(self, stop):
    """skipuntil() reads until `stop` bytes sequence."""
    assert isinstance(stop, bytes) and stop, \
        'bytes is required: {!r}'.format(stop)

    stop_len = len(stop)

    while True:
        stop_line = self._data.find(stop)
        if stop_line >= 0:
            size = stop_line + stop_len
            del self._data[:size]
            return

        self._writer.send((yield))
        self._check_exception()


Reply to this email directly or view it on GitHub #579 (comment).

@fafhrd91
Copy link
Member

Thanks! i added you to contributors list.

@lock
Copy link

lock bot commented Oct 29, 2019

This thread has been automatically locked since there has not been
any recent activity after it was closed. Please open a new issue for
related bugs.

If you feel like there's important points made in this discussion,
please include those exceprts into that new issue.

@lock lock bot added the outdated label Oct 29, 2019
@lock lock bot locked as resolved and limited conversation to collaborators Oct 29, 2019
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
Projects
None yet
Development

No branches or pull requests

2 participants