Skip to content

Commit

Permalink
Adding vector serialization and deserialization functions for PyVelox (
Browse files Browse the repository at this point in the history
…#4400)

Summary:
This PR adds support for `VectorSaver.h` functionality for serializing and deserializing vectors.

`VectorSaver.h` provides the `saveVectorToFile` and `restoreVectorFromFile` functions. This PR exposes them to Python as `save_vector` and `load_vector`, respectively.

Pull Request resolved: #4400

Reviewed By: laithsakka

Differential Revision: D45975301

Pulled By: kgpai

fbshipit-source-id: 3d52b08f8c0642ddb8b5b507ea8ea11e34df2f55
  • Loading branch information
vibhatha authored and facebook-github-bot committed May 26, 2023
1 parent fd2972d commit 527d9ca
Show file tree
Hide file tree
Showing 7 changed files with 269 additions and 51 deletions.
2 changes: 1 addition & 1 deletion pyvelox/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ if(VELOX_BUILD_PYTHON_PACKAGE)
include_directories(SYSTEM ${CMAKE_SOURCE_DIR})
add_definitions(-DCREATE_PYVELOX_MODULE -DVELOX_DISABLE_GOOGLETEST)
# Define our Python module:
pybind11_add_module(pyvelox MODULE pyvelox.cpp signatures.cpp)
pybind11_add_module(pyvelox MODULE pyvelox.cpp serde.cpp signatures.cpp)
# Link with Velox:
target_link_libraries(
pyvelox
Expand Down
70 changes: 70 additions & 0 deletions pyvelox/context.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <memory>

#include <pybind11/stl.h>
#include "velox/common/memory/Memory.h"
#include "velox/core/QueryCtx.h"

namespace facebook::velox::py {

/// PyVeloxContext is used only during function binding time. It's a utility
/// that manages the memory pool, query context and execution context for
/// Velox expressions and vectors.
///
/// A single shared instance is created lazily by the first call to
/// getSingletonInstance() and lives until cleanup() is called. Neither
/// function takes a lock, so callers are responsible for any synchronization
/// they need.
struct PyVeloxContext {
  /// Returns the shared context, constructing it on first use.
  static inline PyVeloxContext& getSingletonInstance() {
    if (!instance_) {
      // std::make_unique cannot reach the private constructor, so the
      // instance is allocated with new and handed to the unique_ptr directly.
      instance_.reset(new PyVeloxContext());
    }
    return *instance_;
  }

  /// Returns a non-owning pointer to the memory pool held by this context.
  facebook::velox::memory::MemoryPool* pool() {
    return pool_.get();
  }

  /// Returns a non-owning pointer to the query context held by this context.
  facebook::velox::core::QueryCtx* queryCtx() {
    return queryCtx_.get();
  }

  /// Returns a non-owning pointer to the execution context held by this
  /// context.
  facebook::velox::core::ExecCtx* execCtx() {
    return execCtx_.get();
  }

  /// Destroys the shared instance, if one exists. References obtained from
  /// getSingletonInstance() before this call must not be used afterwards; a
  /// later getSingletonInstance() call builds a fresh context.
  static inline void cleanup() {
    // reset() on an empty unique_ptr is a no-op, so no null check is needed.
    instance_.reset();
  }

 private:
  // Construction goes through getSingletonInstance() only; the type is
  // neither copyable nor movable.
  PyVeloxContext() = default;
  PyVeloxContext(const PyVeloxContext&) = delete;
  PyVeloxContext(PyVeloxContext&&) = delete;
  PyVeloxContext& operator=(const PyVeloxContext&) = delete;
  PyVeloxContext& operator=(PyVeloxContext&&) = delete;

  // Declaration order matters: execCtx_ is built from the raw pointers of
  // pool_ and queryCtx_, so both are initialized before it and destroyed
  // after it.
  std::shared_ptr<facebook::velox::memory::MemoryPool> pool_ =
      facebook::velox::memory::addDefaultLeafMemoryPool();
  std::shared_ptr<facebook::velox::core::QueryCtx> queryCtx_ =
      std::make_shared<facebook::velox::core::QueryCtx>();
  std::unique_ptr<facebook::velox::core::ExecCtx> execCtx_ =
      std::make_unique<facebook::velox::core::ExecCtx>(
          pool_.get(),
          queryCtx_.get());

  static inline std::unique_ptr<PyVeloxContext> instance_;
};

} // namespace facebook::velox::py
8 changes: 5 additions & 3 deletions pyvelox/pyvelox.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

#include "pyvelox.h"
#include "serde.h"
#include "signatures.h"

namespace facebook::velox::py {
Expand Down Expand Up @@ -202,13 +203,13 @@ static VectorPtr evaluateExpression(
}
}
auto rowType = ROW(std::move(names), std::move(types));
memory::MemoryPool* pool = PyVeloxContext::getInstance().pool();
memory::MemoryPool* pool = PyVeloxContext::getSingletonInstance().pool();
RowVectorPtr rowVector = std::make_shared<RowVector>(
pool, rowType, BufferPtr{nullptr}, numRows, inputs);
core::TypedExprPtr typed = core::Expressions::inferTypes(expr, rowType, pool);
exec::ExprSet set({typed}, PyVeloxContext::getInstance().execCtx());
exec::ExprSet set({typed}, PyVeloxContext::getSingletonInstance().execCtx());
exec::EvalCtx evalCtx(
PyVeloxContext::getInstance().execCtx(), &set, rowVector.get());
PyVeloxContext::getSingletonInstance().execCtx(), &set, rowVector.get());
SelectivityVector rows(numRows);
std::vector<VectorPtr> result;
set.eval(rows, evalCtx, result);
Expand Down Expand Up @@ -292,6 +293,7 @@ PYBIND11_MODULE(pyvelox, m) {

addVeloxBindings(m);
addSignatureBindings(m);
addSerdeBindings(m);
m.attr("__version__") = "dev";
}
#endif
Expand Down
53 changes: 6 additions & 47 deletions pyvelox/pyvelox.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,53 +31,12 @@
#include <velox/vector/FlatVector.h>
#include "folly/json.h"

#include "context.h"

namespace facebook::velox::py {

namespace py = pybind11;

// Holder for the memory pool, query context and exec context used by the
// bindings, reachable through a lazily created static instance.
// NOTE(review): these lines appear to be the pre-commit definition that this
// diff removes from pyvelox.h in favour of the one in context.h — confirm
// against the unrendered diff.
struct PyVeloxContext {
// The constructor is public here, so instances other than the one stored in
// instance_ can be created; copying and moving are disabled.
PyVeloxContext() = default;
PyVeloxContext(const PyVeloxContext&) = delete;
PyVeloxContext(const PyVeloxContext&&) = delete;
PyVeloxContext& operator=(const PyVeloxContext&) = delete;
PyVeloxContext& operator=(const PyVeloxContext&&) = delete;

// Returns the static instance, creating it on first use. No locking is
// performed around the check-and-create.
static inline PyVeloxContext& getInstance() {
if (!instance_) {
instance_ = std::make_unique<PyVeloxContext>();
}
return *instance_.get();
}

// Non-owning accessors for the objects held by this context.
facebook::velox::memory::MemoryPool* pool() {
return pool_.get();
}
facebook::velox::core::QueryCtx* queryCtx() {
return queryCtx_.get();
}
facebook::velox::core::ExecCtx* execCtx() {
return execCtx_.get();
}

// Destroys the static instance if one exists; a later getInstance() call
// creates a new one.
static inline void cleanup() {
if (instance_) {
instance_.reset();
}
}

private:
// execCtx_ is constructed from the raw pointers of pool_ and queryCtx_, so it
// is declared after both of them.
std::shared_ptr<facebook::velox::memory::MemoryPool> pool_ =
facebook::velox::memory::addDefaultLeafMemoryPool();
std::shared_ptr<facebook::velox::core::QueryCtx> queryCtx_ =
std::make_shared<facebook::velox::core::QueryCtx>();
std::unique_ptr<facebook::velox::core::ExecCtx> execCtx_ =
std::make_unique<facebook::velox::core::ExecCtx>(
pool_.get(),
queryCtx_.get());

static inline std::unique_ptr<PyVeloxContext> instance_;
};

static std::string serializeType(
const std::shared_ptr<const velox::Type>& type);

Expand Down Expand Up @@ -426,13 +385,13 @@ static void addVectorBindings(
});

m.def("from_list", [](const py::list& list) mutable {
return pyListToVector(list, PyVeloxContext::getInstance().pool());
return pyListToVector(list, PyVeloxContext::getSingletonInstance().pool());
});
m.def(
"constant_vector",
[](const py::handle& obj, vector_size_t length, TypePtr type) {
return pyToConstantVector(
obj, length, PyVeloxContext::getInstance().pool(), type);
obj, length, PyVeloxContext::getSingletonInstance().pool(), type);
},
py::arg("value"),
py::arg("length"),
Expand All @@ -442,7 +401,7 @@ static void addVectorBindings(
"dictionary_vector",
[](VectorPtr baseVector, const py::list& indices_list) {
BufferPtr indices_buffer = AlignedBuffer::allocate<vector_size_t>(
indices_list.size(), PyVeloxContext::getInstance().pool());
indices_list.size(), PyVeloxContext::getSingletonInstance().pool());
vector_size_t* indices_ptr = indices_buffer->asMutable<vector_size_t>();
for (size_t i = 0; i < indices_list.size(); i++) {
if (!py::isinstance<py::int_>(indices_list[i]))
Expand All @@ -456,7 +415,7 @@ static void addVectorBindings(
baseVector->typeKind(),
std::move(indices_buffer),
std::move(baseVector),
PyVeloxContext::getInstance().pool());
PyVeloxContext::getSingletonInstance().pool());
});
}

Expand Down
88 changes: 88 additions & 0 deletions pyvelox/serde.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "serde.h"
#include "context.h"

#include <velox/vector/VectorSaver.h>

namespace facebook::velox::py {

namespace py = pybind11;

namespace {
/// Adapter bound to Python as `load_vector`: restores a vector from the file
/// at `filePath`, supplying the memory pool of the PyVeloxContext singleton so
/// that the Python caller only has to pass the path.
VectorPtr pyRestoreVectorFromFileHelper(const char* FOLLY_NONNULL filePath) {
  auto& context = PyVeloxContext::getSingletonInstance();
  return restoreVectorFromFile(filePath, context.pool());
}
} // namespace

void addSerdeBindings(py::module& m, bool asModuleLocalDefinitions) {
using namespace facebook::velox;

m.def(
"save_vector",
&saveVectorToFile,
R"delimiter(
Serializes the vector into binary format and writes it to a new file.
Parameters
----------
vector : Union[FlatVector, ConstantVector, DictionaryVector]
The vector to be saved.
file_path: str
The path to which the vector will be saved.
Returns
-------
None
Examples
--------
>>> import pyvelox.pyvelox as pv
>>> vec = pv.from_list([1, 2, 3])
>>> pv.save_vector(vec, '/tmp/flatvector.bin')
)delimiter",
py::arg("vector"),
py::arg("file_path"));
m.def(
"load_vector",
&pyRestoreVectorFromFileHelper,
R"delimiter(
Reads and deserializes a vector from a file stored by save_vector.
Parameters
----------
file_path: str
The path from which the vector will be loaded.
Returns
-------
Union[FlatVector, ConstantVector, DictionaryVector]
Examples
--------
>>> import pyvelox.pyvelox as pv
>>> pv.load_vector('/tmp/flatvector.bin')
<pyvelox.pyvelox.FlatVector_BIGINT object at 0x7f8f6f818bb0>
)delimiter",
py::arg("file_path"));
}

} // namespace facebook::velox::py
36 changes: 36 additions & 0 deletions pyvelox/serde.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <pybind11/pybind11.h>
#include <pybind11/stl.h>

namespace facebook::velox::py {

namespace py = pybind11;

/// Adds serialization and deserialization bindings to module m.
/// This adds bindings to save and load Vectors: `save_vector(vector,
/// file_path)` writes a vector to a file and `load_vector(file_path)` reads
/// one back.
///
/// @param m Module to add bindings to.
/// @param asModuleLocalDefinitions If true then these bindings are only
/// visible inside the module. Refer to
/// https://pybind11.readthedocs.io/en/stable/advanced/classes.html#module-local-class-bindings
/// for further details.
/// NOTE(review): the definition in serde.cpp registers only free functions
/// and does not read this flag, so it currently has no effect — confirm
/// whether it is kept purely for signature parity with the other add*Bindings
/// functions.
void addSerdeBindings(py::module& m, bool asModuleLocalDefinitions = true);

} // namespace facebook::velox::py
63 changes: 63 additions & 0 deletions pyvelox/test/test_serde.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Copyright (c) Facebook, Inc. and its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import shutil
import tempfile
import unittest
from os import path

import pyvelox.pyvelox as pv


class TestVeloxVectorSaver(unittest.TestCase):
def setUp(self):
# create a temporary directory
self.test_dir = tempfile.mkdtemp()

def tearDown(self):
# remove the temporary directory
shutil.rmtree(self.test_dir)

def make_flat_vector(self):
return pv.from_list([1, 2, 3])

def make_const_vector(self):
return pv.constant_vector(1000, 10)

def make_dict_vector(self):
base_indices = [0, 0, 1, 0, 2]
return pv.dictionary_vector(pv.from_list([1, 2, 3]), base_indices)

def test_serde_vector(self):
data = {
"flat_vector": self.make_flat_vector(),
"const_vector": self.make_const_vector(),
"dict_vector": self.make_dict_vector(),
}

paths = {
"flat_vector": path.join(self.test_dir, "flat.pyvelox"),
"const_vector": path.join(self.test_dir, "const.pyvelox"),
"dict_vector": path.join(self.test_dir, "dict.pyvelox"),
}

for vec_key, fpath_key in zip(data, paths):
vec = data[vec_key]
fpath = paths[fpath_key]
pv.save_vector(vector=vec, file_path=fpath)
loaded_vec = pv.load_vector(file_path=fpath)
self.assertEqual(len(vec), len(loaded_vec))
self.assertEqual(vec.dtype(), loaded_vec.dtype())
for i in range(len(vec)):
self.assertEqual(vec[i], loaded_vec[i])

0 comments on commit 527d9ca

Please sign in to comment.