Skip to content

Commit

Permalink
Merge pull request #109 from Maksim-Burtsev/insert-file
Browse files Browse the repository at this point in the history
Insert from file
  • Loading branch information
maximdanilchenko authored Jan 9, 2024
2 parents 693244b + efbf6ab commit 3e99c9f
Show file tree
Hide file tree
Showing 2 changed files with 195 additions and 3 deletions.
65 changes: 64 additions & 1 deletion aiochclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import warnings
from enum import Enum
from types import TracebackType
from typing import Any, AsyncGenerator, Dict, List, Optional, Type
from typing import Any, AsyncGenerator, BinaryIO, Dict, List, Optional, Type

from aiochclient.exceptions import ChClientError
from aiochclient.http_clients.abc import HttpClientABC
Expand Down Expand Up @@ -415,6 +415,56 @@ async def cursor(self, query: str, *args) -> AsyncGenerator[Record, None]:
async for row in self.iterate(query, *args):
yield row

async def insert_file(
    self,
    query: str,
    file_obj: BinaryIO,
    params: Optional[Dict[str, Any]] = None,
) -> None:
    """Insert the contents of a file in any format supported by ClickHouse.

    The query is validated first: it has to be an ``INSERT`` statement that
    names the input format. The payload is then sent as the body of a single
    POST request, with the query passed as a request parameter.

    :param str query: ClickHouse query string which includes the format part.
    :param BinaryIO file_obj: Data to insert. It is handed to the HTTP client
        unchanged as the request body; the usage examples below pass the
        already-read bytes of the file.
    :param Optional[Dict[str, Any]] params: Params to escape inside query string.
    :raises ChClientError: If the query is not an ``INSERT`` query or has no
        ``FORMAT`` part.

    Usage:

    .. code-block:: python

        with open('data.csv', 'rb') as f:
            await client.insert_file(
                "INSERT INTO t FORMAT CSV",
                f.read(),
            )

        with open('data.json', 'rb') as f:
            await client.insert_file(
                "INSERT INTO t FORMAT JSONEachRow",
                f.read(),
            )

        response = requests.get("https://url_to_download_parquet_file")
        await client.insert_file(
            "INSERT INTO t FORMAT Parquet",
            response.content,
        )

    :return: Nothing.
    """
    self._check_insert_file_query(query)

    # Substitute prepared params into the query only when there are any.
    escaped = self._prepare_query_params(params)
    final_query = query.format(**escaped) if escaped else query

    # Client-level params are extended with the query itself; the data
    # travels in the request body.
    request_params = {**self.params, "query": final_query}

    await self._http_client.post_no_return(
        url=self.url,
        params=request_params,
        headers=self.headers,
        data=file_obj,
    )

@staticmethod
def _parse_squery(query):
statement = sqlparse.parse(query)[0]
Expand All @@ -435,3 +485,16 @@ def _parse_squery(query):
else:
is_json = False
return need_fetch, is_json, statement_type

@staticmethod
def _check_insert_file_query(query: str) -> None:
    """Validate that ``query`` is usable for inserting a file.

    The query must parse to an ``INSERT`` statement and must contain a
    ``FORMAT`` keyword, since the format of the uploaded data is taken
    from the query itself.

    :param str query: Query string to validate.
    :raises ChClientError: If the query is empty, is not an ``INSERT``
        statement, or does not contain a ``FORMAT`` keyword.
    :return: Nothing.
    """
    statements = sqlparse.parse(query)
    # An empty query parses to no statements at all; report it the same
    # way as any other non-INSERT query instead of failing with IndexError.
    if not statements or statements[0].get_type() != 'INSERT':
        raise ChClientError('It is possible to insert file only with INSERT query')

    format_keyword = statements[0].token_matching(
        (lambda tk: tk.match(sqlparse.tokens.Keyword, 'FORMAT'),), 0
    )
    if not format_keyword:
        raise ChClientError(
            'To insert file its required to specify `FORMAT [...] in the query.'
        )
133 changes: 131 additions & 2 deletions tests.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import datetime as dt
import json
import os
from decimal import Decimal
from ipaddress import IPv4Address, IPv6Address
from uuid import uuid4
Expand Down Expand Up @@ -167,6 +169,7 @@ async def all_types_db(chclient, rows):
await chclient.execute("DROP TABLE IF EXISTS all_types")
await chclient.execute("DROP TABLE IF EXISTS test_cache")
await chclient.execute("DROP TABLE IF EXISTS test_cache_mv")
await chclient.execute("DROP TABLE IF EXISTS test_insert_file")
await chclient.execute(
"""
CREATE TABLE all_types (uint8 UInt8,
Expand Down Expand Up @@ -239,7 +242,15 @@ async def all_types_db(chclient, rows):
SELECT avgState(int32), sum(float32) FROM all_types
"""
)

await chclient.execute(
"""
CREATE TABLE test_insert_file(
uint32 UInt32,
string String,
date Date
) ENGINE = Memory
"""
)
await chclient.execute("INSERT INTO all_types VALUES", *rows)


Expand Down Expand Up @@ -973,7 +984,7 @@ async def test_describe_with_fetch(self):

async def test_show_tables_with_fetch(self):
    """``SHOW TABLES`` fetched as records lists every table in the test database."""
    tables = await self.ch.fetch("SHOW TABLES")
    # Presumably the four tables handled by the all_types_db fixture:
    # all_types, test_cache, test_cache_mv and test_insert_file.
    # The stale `== 3` expectation contradicted this one and is dropped.
    assert len(tables) == 4
    assert tables[0]._row.decode() == 'all_types'

async def test_aggr_merge_tree(self):
Expand Down Expand Up @@ -1126,3 +1137,121 @@ async def test_select_nested_json(self):
{'value1': 'inner', 'value2': '2018-09-22'},
{'value1': 'world', 'value2': '2018-09-23'},
]


@pytest.mark.usefixtures("class_chclient")
class TestInsertFile:
    """Tests for ``insert_file`` against the ``test_insert_file`` table.

    Each test writes a small fixture file, inserts its contents with the
    query under test and then checks the rows read back from the table.
    """

    async def _insert_from_file(self, path: str, content: str, query: str) -> None:
        """Write ``content`` to ``path`` and insert it with ``query``.

        The file is removed even when the insert raises, so a failing test
        does not leave fixture files behind in the working directory.

        :param str path: Name of the temporary fixture file.
        :param str content: Text to write to the file.
        :param str query: INSERT query passed to ``insert_file``.
        """
        with open(path, 'w') as f:
            f.write(content)
        try:
            with open(path, 'rb') as f:
                await self.ch.insert_file(query, f.read())
        finally:
            os.remove(path)

    async def test_insert_csv_file(self):
        """CSV data is inserted and read back as JSONEachRow dicts."""
        data: str = (
            "uint32,string,date\n"
            "1,test,2024-01-03\n"
            "2,hello world,2023-03-10\n"
            "123,test string,2024-01-03"
        )

        await self._insert_from_file(
            'test_data.csv',
            data,
            'INSERT INTO test_insert_file FORMAT CSV',
        )

        result = await self.ch.fetch(
            "SELECT * FROM test_insert_file FORMAT JSONEachRow"
        )
        assert result == [
            {'uint32': 1, 'string': 'test', 'date': '2024-01-03'},
            {'uint32': 2, 'string': 'hello world', 'date': '2023-03-10'},
            {'uint32': 123, 'string': 'test string', 'date': '2024-01-03'},
        ]

    async def test_insert_json_file(self):
        """JSON data is inserted with the JSONEachRow format."""
        data = [
            {"uint32": 1, "string": "test", "date": "2024-01-03"},
            {"uint32": 2, "string": "hello world", "date": "2023-03-10"},
            {"uint32": 3, "string": "", "date": "2018-09-21"},
            {"uint32": 123, "string": "test string", "date": "2024-01-03"},
        ]

        await self._insert_from_file(
            'test_data.json',
            json.dumps(data),
            "INSERT INTO test_insert_file FORMAT JSONEachRow",
        )

        result = await self.ch.fetch("SELECT * FROM test_insert_file")
        assert [row[:] for row in result] == [
            (1, 'test', dt.date(2024, 1, 3)),
            (2, 'hello world', dt.date(2023, 3, 10)),
            (3, '', dt.date(2018, 9, 21)),
            (123, 'test string', dt.date(2024, 1, 3)),
        ]

    async def test_insert_tsv_file(self):
        """Tab-separated data is inserted with the TabSeparated format."""
        # Columns are separated with explicit tab escapes: several string
        # values contain spaces, so the separator must be unambiguous.
        data = (
            "uint32\tstring\tdate\n"
            "1\tsome test string\t2024-01-03\n"
            "1\ttest\t2024-01-03\n"
            "1\ttest\t2024-01-03\n"
            "2\thello world\t2023-03-10\n"
            "2\thello world\t2023-03-10\n"
            "2\thello world\t2023-03-10\n"
            "123\tother things\t2023-03-10\n"
            "123\ttest string\t2024-01-03"
        )

        await self._insert_from_file(
            'test_data.tsv',
            data,
            "INSERT INTO test_insert_file FORMAT TabSeparated",
        )

        result = await self.ch.fetch("SELECT * FROM test_insert_file")
        assert [row[:] for row in result] == [
            (1, 'some test string', dt.date(2024, 1, 3)),
            (1, 'test', dt.date(2024, 1, 3)),
            (1, 'test', dt.date(2024, 1, 3)),
            (2, 'hello world', dt.date(2023, 3, 10)),
            (2, 'hello world', dt.date(2023, 3, 10)),
            (2, 'hello world', dt.date(2023, 3, 10)),
            (123, 'other things', dt.date(2023, 3, 10)),
            (123, 'test string', dt.date(2024, 1, 3)),
        ]

    async def test_insert_file_with_invalid_format(self):
        """Comma-separated data sent as TabSeparated raises ``ChClientError``."""
        data: str = (
            "uint32,string,date\n"
            "1,test,2024-01-03\n"
            "2,hello world,2023-03-10\n"
            "123,test string,2024-01-03"
        )

        with pytest.raises(ChClientError):
            await self._insert_from_file(
                'test_data.csv',
                data,
                'INSERT INTO test_insert_file FORMAT TabSeparated',
            )

0 comments on commit 3e99c9f

Please sign in to comment.