Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimized buffered byte writing #62

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions performance/reference/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,33 @@ def dtype_from_element(value: tp.Optional[tp.Hashable]) -> np.dtype:
return np.array(value).dtype


# nditer flags: hand back 1D chunks ('external_loop') copied through an
# internal buffer ('buffered'), and accept empty arrays ('zerosize_ok').
NDITER_FLAGS = ('external_loop', 'buffered', 'zerosize_ok')
# Target chunk size in bytes (16 MiB). Dividing by the item size gives the
# nditer buffer size in elements, e.g. 2,097,152 elements for 8-byte items.
BUFFERSIZE_NUMERATOR = 16 * 1024 ** 2


def array_bytes_to_file(
        array: np.ndarray,
        file: tp.BinaryIO,
        ) -> None:
    """Write the raw bytes of ``array`` to ``file`` in buffered chunks.

    Arrays that are Fortran-contiguous (and not also C-contiguous) are
    written in Fortran order; every other array is written in C order.

    Args:
        array: the array whose element bytes are written.
        file: an object with a ``write`` method accepting ``bytes``.
    """
    if array.itemsize == 0:
        # Avoid dividing by zero; 0 asks nditer for its default buffer size.
        buffersize = 0
    else:
        buffersize = max(BUFFERSIZE_NUMERATOR // array.itemsize, 1)

    flags = array.flags
    order = 'F' if flags.f_contiguous and not flags.c_contiguous else 'C'

    write = file.write
    for chunk in np.nditer(
            array,
            flags=NDITER_FLAGS,
            buffersize=buffersize,
            order=order,
            ):
        # With 'external_loop' each chunk is 1D, so 'C' is its only ordering.
        write(chunk.tobytes('C'))

def get_new_indexers_and_screen_ref(
indexers: np.ndarray,
positions: np.ndarray,
Expand Down Expand Up @@ -260,6 +287,3 @@ def count_iteration(iterable: tp.Iterable):
count += 1
return count




1 change: 1 addition & 0 deletions src/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from ._arraykit import resolve_dtype_iter as resolve_dtype_iter
from ._arraykit import isna_element as isna_element
from ._arraykit import dtype_from_element as dtype_from_element
from ._arraykit import array_bytes_to_file as array_bytes_to_file
from ._arraykit import delimited_to_arrays as delimited_to_arrays
from ._arraykit import iterable_str_to_array_1d as iterable_str_to_array_1d
from ._arraykit import get_new_indexers_and_screen as get_new_indexers_and_screen
Expand Down
3 changes: 2 additions & 1 deletion src/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,5 @@ def resolve_dtype(__d1: np.dtype, __d2: np.dtype) -> np.dtype: ...
def resolve_dtype_iter(__dtypes: tp.Iterable[np.dtype]) -> np.dtype: ...
def isna_element(__value: tp.Any) -> bool: ...
def dtype_from_element(__value: tp.Optional[tp.Hashable]) -> np.dtype: ...
def get_new_indexers_and_screen(indexers: np.ndarray, positions: np.ndarray) -> tp.Tuple[np.ndarray, np.ndarray]: ...
def array_bytes_to_file(__array: np.ndarray, __file: tp.IO) -> int: ...
def get_new_indexers_and_screen(indexers: np.ndarray, positions: np.ndarray) -> tp.Tuple[np.ndarray, np.ndarray]: ...
91 changes: 91 additions & 0 deletions src/_arraykit.c
Original file line number Diff line number Diff line change
Expand Up @@ -3158,6 +3158,96 @@ array_deepcopy(PyObject *m, PyObject *args, PyObject *kwargs)
return AK_ArrayDeepCopy(m, (PyArrayObject*)array, memo);
}



// Writes the raw bytes of an array to an open, writable file-like object by
// calling its write() method with memoryviews over the array data, and returns
// the number of bytes handed to write(). This is similar to what tofile() does,
// but tofile() cannot be used on a _ZipWriteFile when writing into a zip
// (raises io.UnsupportedOperation: fileno).
//
// args: (array, file). `file` is not type-checked beyond needing a callable
// `write` attribute. Returns a new reference to a Python int, or NULL with an
// exception set on failure.
static PyObject *
array_bytes_to_file(PyObject *Py_UNUSED(m), PyObject *args)
{
    PyObject *array;
    PyObject *file;

    if (!PyArg_ParseTuple(args, "OO:array_bytes_to_file", &array, &file)) {
        return NULL;
    }
    // NOTE(review): presumably returns NULL with an exception set for
    // non-array input; kept before any reference is acquired so it cannot leak.
    AK_CHECK_NUMPY_ARRAY(array);
    PyArrayObject *a = (PyArrayObject *)array;

    PyObject *write_func = PyObject_GetAttrString(file, "write");
    if (!write_func) {
        return NULL;
    }

    PyArrayIterObject *it = NULL;
    PyObject *mv = NULL;
    PyObject *ret = NULL;
    // PyArray_ITEMSIZE is available in both NumPy 1.x and 2.x, unlike direct
    // access to the descriptor's elsize field.
    Py_ssize_t elsize = (Py_ssize_t)PyArray_ITEMSIZE(a);
    Py_ssize_t written = 0;

    // Same split as PyArray_ToFile: one bulk write for C-contiguous data,
    // otherwise element-by-element iteration in C order.
    if (PyArray_ISCONTIGUOUS(a)) {
        Py_ssize_t nbytes = (Py_ssize_t)PyArray_SIZE(a) * elsize;
        // The view only needs read access; PyBUF_READ or PyBUF_WRITE are the
        // only valid flag values for PyMemoryView_FromMemory.
        mv = PyMemoryView_FromMemory((char *)PyArray_DATA(a), nbytes, PyBUF_READ);
        if (!mv) {
            goto error;
        }
        ret = PyObject_CallFunctionObjArgs(write_func, mv, NULL);
        Py_DECREF(mv);
        mv = NULL;
        if (!ret) {
            goto error; // write() raised; propagate its exception
        }
        Py_DECREF(ret);
        written = nbytes;
    }
    else {
        it = (PyArrayIterObject *)PyArray_IterNew(array);
        if (!it) {
            goto error;
        }
        while (it->index < it->size) {
            mv = PyMemoryView_FromMemory((char *)it->dataptr, elsize, PyBUF_READ);
            if (!mv) {
                goto error;
            }
            ret = PyObject_CallFunctionObjArgs(write_func, mv, NULL);
            Py_DECREF(mv);
            mv = NULL;
            if (!ret) {
                goto error;
            }
            Py_DECREF(ret);
            written += elsize;
            PyArray_ITER_NEXT(it);
        }
        Py_DECREF(it);
        it = NULL;
    }
    Py_DECREF(write_func);

    // Number of bytes passed to write(); NULL (with exception set) on failure.
    return PyLong_FromSsize_t(written);

error:
    // Release everything still owned on any failure path.
    Py_XDECREF(mv);
    Py_XDECREF(it);
    Py_DECREF(write_func);
    return NULL;
}

// can create memory view object and pass this to the write method
// PyObject *PyMemoryView_FromMemory(char *mem, Py_ssize_t size, int flags)
// PyObject *PyMemoryView_GetContiguous(PyObject *obj, int buffertype, char order)

// from PyArray_ToString: create an empty bytes object and write to it

// fwrite((const void *)it->dataptr,
// (size_t) PyArray_DESCR(self)->elsize,
// 1, fp)


// ret = PyBytes_FromStringAndSize(NULL, (Py_ssize_t) numbytes);
// if (ret == NULL) {
// Py_DECREF(it);
// return NULL;
// }
// dptr = PyBytes_AS_STRING(ret);
// i = it->size;
// elsize = PyArray_DESCR(self)->elsize;
// while (i--) {
// memcpy(dptr, it->dataptr, elsize);
// dptr += elsize;
// PyArray_ITER_NEXT(it);
// }
// Py_DECREF(it);



//------------------------------------------------------------------------------
// type resolution

Expand Down Expand Up @@ -3873,6 +3963,7 @@ static PyMethodDef arraykit_methods[] = {
(PyCFunction)array_deepcopy,
METH_VARARGS | METH_KEYWORDS,
NULL},
{"array_bytes_to_file", array_bytes_to_file, METH_VARARGS, NULL},
{"resolve_dtype", resolve_dtype, METH_VARARGS, NULL},
{"resolve_dtype_iter", resolve_dtype_iter, METH_O, NULL},
{"delimited_to_arrays",
Expand Down
51 changes: 50 additions & 1 deletion test/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@
import collections
import datetime
import unittest
import itertools
import typing as tp
from contextlib import contextmanager
import os
from os import PathLike
from pathlib import Path
import tempfile

import warnings
from io import StringIO
import numpy as np # type: ignore
Expand All @@ -17,6 +25,7 @@
from arraykit import array_deepcopy
from arraykit import isna_element
from arraykit import dtype_from_element
from arraykit import array_bytes_to_file
from arraykit import split_after_count
from arraykit import count_iteration

Expand All @@ -25,6 +34,27 @@

from performance.reference.util import mloc as mloc_ref

PathSpecifier = tp.Union[str, PathLike]


@contextmanager
def temp_file(suffix: tp.Optional[str] = None,
        path: bool = False
        ) -> tp.Iterator[PathSpecifier]:
    """Context manager yielding the name of a new, closed temporary file.

    The file is created (and closed) on entry and removed on exit.

    Args:
        suffix: optional file-name suffix passed to ``NamedTemporaryFile``.
        path: if True, yield a ``pathlib.Path``; otherwise yield a ``str``.
    """
    # Create the file before entering the try block: if creation fails there
    # is nothing to clean up, and the original exception must propagate
    # rather than being masked by a failure in the cleanup code.
    with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as f:
        tmp_name = f.name
    try:
        yield Path(tmp_name) if path else tmp_name
    finally:
        try:
            os.unlink(tmp_name)
        except FileNotFoundError:
            # already removed by the caller
            pass
        except PermissionError:  # happens on Windows sometimes
            pass



class TestUnit(unittest.TestCase):

Expand Down Expand Up @@ -395,6 +425,26 @@ def test_dtype_from_element_str_and_bytes_dtypes(self) -> None:
self.assertEqual(np.dtype(f'|S{size}'), dtype_from_element(bytes(size)))
self.assertEqual(np.dtype(f'<U{size}'), dtype_from_element('x' * size))


#---------------------------------------------------------------------------
def test_array_bytes_to_file_a(self) -> None:
    # Round trip: bytes written by array_bytes_to_file must read back, via
    # np.fromfile with the same dtype, as an equal array.
    a1 = np.array([3, 4, 5])
    with temp_file('.npy') as fp:
        with open(fp, 'wb') as f:
            count = array_bytes_to_file(a1, f)
        self.assertGreater(count, 0)

        # The file holds raw bytes, so it must be reopened in binary mode.
        with open(fp, 'rb') as f:
            a2 = np.fromfile(f, dtype=a1.dtype)
        # Compare as lists so a length mismatch fails instead of broadcasting.
        self.assertEqual(a1.tolist(), a2.tolist())

#---------------------------------------------------------------------------

def test_dtype_from_element_int(self) -> None:
# make sure all platforms give 64 bit int
self.assertEqual(str(dtype_from_element(3)), 'int64')
Expand Down Expand Up @@ -505,6 +555,5 @@ def test_count_iteration_b(self) -> None:
self.assertEqual(post, 5)



if __name__ == '__main__':
unittest.main()