Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(person-on-events): Enable CI to run using both old and new queries #9814

Merged
merged 21 commits into from
May 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions .github/actions/run-backend-tests/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ inputs:
group:
required: true
type: number
person-on-events:
required: true
type: boolean

runs:
using: 'composite'
Expand Down Expand Up @@ -85,7 +88,7 @@ runs:
pytest -m "not ee and not async_migrations" posthog/ --cov --cov-report=xml:coverage-postgres.xml --durations=150 --durations-min=1.0

- name: Run ee/ tests
if: ${{ inputs.ee == 'true' }}
if: ${{ inputs.ee == 'true' && inputs.person-on-events != 'true'}}
shell: bash
run: |
pytest ee \
Expand All @@ -97,6 +100,15 @@ runs:
--snapshot-update\
--durations=100 --durations-min=1.0

- name: Run ee/ tests with person-on-events, no snapshot commits
if: ${{ inputs.ee == 'true' && inputs.person-on-events == 'true' }}
shell: bash
run: |
ENABLE_ACTOR_ON_EVENTS_TEAMS=all pytest ee \
--splits ${{ inputs.concurrency }} \
--group ${{ inputs.group }} \
--snapshot-update
Copy link
Member

@EDsCODE EDsCODE May 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's supposed to be happening here? I'm not sure how this prevents the snapshot updates from being committed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see: these snapshot updates are never committed, so they have no impact.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep exactly


- name: Run pytest.mark.ee tests
if: ${{ inputs.ee == 'true' && inputs.group == '1' }}
shell: bash
Expand All @@ -112,7 +124,7 @@ runs:

- name: Upload updated timing data as artifacts
uses: actions/upload-artifact@v2
if: ${{ inputs.ee == 'true' }}
if: ${{ inputs.ee == 'true' && inputs.person-on-events != 'true'}}
with:
name: timing_data-${{ inputs.group }}
path: .test_durations
48 changes: 46 additions & 2 deletions .github/workflows/ci-backend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ jobs:
ee: [true]
foss: [false]
name: ['']
person-on-events: [false]
# :NOTE: Keep concurrency and groups in sync
concurrency: [5]
group: [1, 2, 3, 4, 5]
Expand All @@ -207,6 +208,48 @@ jobs:
clickhouse-server-image: 'yandex/clickhouse-server:21.6.5'
concurrency: 1
group: 1
# :TRICKY: Run person-on-events tests against only one CH instance,
# but still split them into groups that run in parallel
- python-version: '3.8.12'
ee: true
foss: false
person-on-events: true
name: 'Person on Events'
clickhouse-server-image: 'clickhouse/clickhouse-server:21.11.11.1'
concurrency: 5
group: 1
- python-version: '3.8.12'
ee: true
foss: false
person-on-events: true
name: 'Person on Events'
clickhouse-server-image: 'clickhouse/clickhouse-server:21.11.11.1'
concurrency: 5
group: 2
- python-version: '3.8.12'
ee: true
foss: false
person-on-events: true
name: 'Person on Events'
clickhouse-server-image: 'clickhouse/clickhouse-server:21.11.11.1'
concurrency: 5
group: 3
- python-version: '3.8.12'
ee: true
foss: false
person-on-events: true
name: 'Person on Events'
clickhouse-server-image: 'clickhouse/clickhouse-server:21.11.11.1'
concurrency: 5
group: 4
- python-version: '3.8.12'
ee: true
foss: false
person-on-events: true
name: 'Person on Events'
clickhouse-server-image: 'clickhouse/clickhouse-server:21.11.11.1'
concurrency: 5
group: 5

steps:
- uses: actions/checkout@v2
Expand All @@ -222,12 +265,13 @@ jobs:
python-version: ${{ matrix.python-version }}
ee: ${{ matrix.ee }}
foss: ${{ matrix.foss }}
clickhouse-server-image-version: ${{ matrix.clickhouse-server-image-version }}
clickhouse-server-image: ${{ matrix.clickhouse-server-image }}
concurrency: ${{ matrix.concurrency }}
group: ${{ matrix.group }}
person-on-events: ${{ matrix.person-on-events }}

- uses: EndBug/add-and-commit@v9 # You can change this to use a specific version.
if: ${{ matrix.clickhouse-server-image-version == '21.11.11.1' }}
if: ${{ matrix.clickhouse-server-image == 'clickhouse/clickhouse-server:21.11.11.1' && !matrix.person-on-events }}
with:
# The arguments for the `git add` command (see the paragraph below for more info)
# Default: '.'
Expand Down
23 changes: 21 additions & 2 deletions ee/clickhouse/models/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def create_event(
return str(event_uuid)


def bulk_create_events(events: List[Dict[str, Any]]):
def bulk_create_events(events: List[Dict[str, Any]], person_mapping: Optional[Dict[str, Person]] = None) -> None:
"""
TEST ONLY
Insert events in bulk. List of dicts:
Expand Down Expand Up @@ -88,10 +88,22 @@ def bulk_create_events(events: List[Dict[str, Any]]):
elements_chain = elements_to_string(elements=event.get("elements")) # type: ignore

inserts.append(
"(%(uuid_{i})s, %(event_{i})s, %(properties_{i})s, %(timestamp_{i})s, %(team_id_{i})s, %(distinct_id_{i})s, %(elements_chain_{i})s, %(created_at_{i})s, now(), 0)".format(
"(%(uuid_{i})s, %(event_{i})s, %(properties_{i})s, %(timestamp_{i})s, %(team_id_{i})s, %(distinct_id_{i})s, %(elements_chain_{i})s, %(person_id_{i})s, %(person_properties_{i})s, %(group0_properties_{i})s, %(group1_properties_{i})s, %(group2_properties_{i})s, %(group3_properties_{i})s, %(group4_properties_{i})s, %(created_at_{i})s, now(), 0)".format(
i=index
)
)

# use the person mapping to populate person properties on the given event
if person_mapping and person_mapping.get(event["distinct_id"]):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Going to need to handle group properties in some way like this too

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aye, leaving it for a PR where groups get tackled, as that provides an easy way to test the changes are sound.

Right now, since there's no code actually using the instance setting, the new CI stuff still runs the old code, with extra event properties populated.

person_properties = person_mapping[event["distinct_id"]].properties
person_id = person_mapping[event["distinct_id"]].uuid

event = {
**event,
"person_properties": {**person_properties, **event.get("person_properties", {})},
"person_id": person_id,
}

event = {
"uuid": str(event["event_uuid"]) if event.get("event_uuid") else str(uuid.uuid4()),
"event": event["event"],
Expand All @@ -101,6 +113,13 @@ def bulk_create_events(events: List[Dict[str, Any]]):
"distinct_id": str(event["distinct_id"]),
"elements_chain": elements_chain,
"created_at": timestamp,
"person_id": event["person_id"] if event.get("person_id") else "00000000-0000-0000-0000-000000000000",
"person_properties": json.dumps(event["person_properties"]) if event.get("person_properties") else "{}",
"group0_properties": json.dumps(event["group0_properties"]) if event.get("group0_properties") else "{}",
"group1_properties": json.dumps(event["group1_properties"]) if event.get("group1_properties") else "{}",
"group2_properties": json.dumps(event["group2_properties"]) if event.get("group2_properties") else "{}",
"group3_properties": json.dumps(event["group3_properties"]) if event.get("group3_properties") else "{}",
"group4_properties": json.dumps(event["group4_properties"]) if event.get("group4_properties") else "{}",
}

params = {**params, **{"{}_{}".format(key, index): value for key, value in event.items()}}
Expand Down
4 changes: 4 additions & 0 deletions ee/clickhouse/models/person.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def person_distinct_id_deleted(sender, instance: PersonDistinctId, **kwargs):

def bulk_create_persons(persons_list: List[Dict]):
persons = []
person_mapping = {}
for _person in persons_list:
with ExitStack() as stack:
if _person.get("created_at"):
Expand All @@ -75,6 +76,7 @@ def bulk_create_persons(persons_list: List[Dict]):
PersonDistinctId(person_id=person.pk, distinct_id=distinct_id, team_id=person.team_id)
)
distinct_id_inserts.append(f"('{distinct_id}', '{person.uuid}', {person.team_id}, 0, 0, now(), 0, 0)")
person_mapping[distinct_id] = person

created_at = now().strftime("%Y-%m-%d %H:%M:%S.%f")
timestamp = now().strftime("%Y-%m-%d %H:%M:%S")
Expand All @@ -86,6 +88,8 @@ def bulk_create_persons(persons_list: List[Dict]):
sync_execute(INSERT_PERSON_BULK_SQL + ", ".join(person_inserts), flush=False)
sync_execute(BULK_INSERT_PERSON_DISTINCT_ID2 + ", ".join(distinct_id_inserts), flush=False)

return person_mapping


def create_person(
team_id: int,
Expand Down
5 changes: 1 addition & 4 deletions ee/clickhouse/queries/funnels/test/test_funnel.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from freezegun.api import freeze_time
from rest_framework.exceptions import ValidationError

from ee.clickhouse.materialized_columns import materialize
from ee.clickhouse.queries.funnels.funnel import ClickhouseFunnel
from ee.clickhouse.queries.funnels.funnel_persons import ClickhouseFunnelActors
from ee.clickhouse.queries.funnels.test.breakdown_cases import (
Expand Down Expand Up @@ -1084,6 +1083,7 @@ def test_funnel_exclusions_with_actions(self):
self._get_actor_ids_at_step(filter, 2), [person1.uuid, person3.uuid],
)

@test_with_materialized_columns(["test_prop"])
def test_funnel_with_denormalised_properties(self):
filters = {
"events": [
Expand All @@ -1101,8 +1101,6 @@ def test_funnel_with_denormalised_properties(self):
"date_to": "2020-01-14",
}

materialize("events", "test_prop")

filter = Filter(data=filters)
funnel = ClickhouseFunnel(filter, self.team)

Expand All @@ -1119,7 +1117,6 @@ def test_funnel_with_denormalised_properties(self):
team=self.team, event="paid", distinct_id="user_1", timestamp="2020-01-10T14:00:00Z",
)

self.assertNotIn("json", funnel.get_query().lower())
result = funnel.run()

self.assertEqual(result[0]["name"], "user signed up")
Expand Down
2 changes: 1 addition & 1 deletion ee/clickhouse/sql/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@

BULK_INSERT_EVENT_SQL = (
lambda: f"""
INSERT INTO {EVENTS_DATA_TABLE()} (uuid, event, properties, timestamp, team_id, distinct_id, elements_chain, created_at, _timestamp, _offset)
INSERT INTO {EVENTS_DATA_TABLE()} (uuid, event, properties, timestamp, team_id, distinct_id, elements_chain, person_id, person_properties, group0_properties, group1_properties, group2_properties, group3_properties, group4_properties, created_at, _timestamp, _offset)
VALUES
"""
)
Expand Down
34 changes: 31 additions & 3 deletions ee/clickhouse/test/test_journeys.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ def journeys_for(
for distinct_id, events in events_by_person.items():
if create_people:
people[distinct_id] = update_or_create_person(distinct_ids=[distinct_id], team_id=team.pk)
else:
people[distinct_id] = Person.objects.get(
persondistinctid__distinct_id=distinct_id, persondistinctid__team_id=team.pk
)

for event in events:
if "timestamp" not in event:
Expand All @@ -55,6 +59,13 @@ def journeys_for(
event=event["event"],
timestamp=event["timestamp"],
properties=event.get("properties", {}),
person_id=people[distinct_id].uuid,
person_properties=people[distinct_id].properties or {},
group0_properties=event.get("group0_properties", {}),
group1_properties=event.get("group1_properties", {}),
group2_properties=event.get("group2_properties", {}),
group3_properties=event.get("group3_properties", {}),
group4_properties=event.get("group4_properties", {}),
)
)

Expand All @@ -66,16 +77,26 @@ def journeys_for(
def _create_all_events(all_events: List[Dict]):
parsed = ""
for event in all_events:
data: Dict[str, Any] = {"properties": {}, "timestamp": timezone.now().strftime("%Y-%m-%d %H:%M:%S.%f")}
data: Dict[str, Any] = {
"properties": {},
"timestamp": timezone.now().strftime("%Y-%m-%d %H:%M:%S.%f"),
"person_id": "00000000-0000-0000-0000-000000000000",
"person_properties": {},
"group0_properties": {},
"group1_properties": {},
"group2_properties": {},
"group3_properties": {},
"group4_properties": {},
}
data.update(event)
in_memory_event = InMemoryEvent(**data)
parsed += f"""
('{str(uuid4())}', '{in_memory_event.event}', '{json.dumps(in_memory_event.properties)}', '{in_memory_event.timestamp}', {in_memory_event.team.pk}, '{in_memory_event.distinct_id}', '', '{timezone.now().strftime("%Y-%m-%d %H:%M:%S.%f")}', now(), 0)
('{str(uuid4())}', '{in_memory_event.event}', '{json.dumps(in_memory_event.properties)}', '{in_memory_event.timestamp}', {in_memory_event.team.pk}, '{in_memory_event.distinct_id}', '', '{in_memory_event.person_id}', '{json.dumps(in_memory_event.person_properties)}', '{json.dumps(in_memory_event.group0_properties)}', '{json.dumps(in_memory_event.group1_properties)}', '{json.dumps(in_memory_event.group2_properties)}', '{json.dumps(in_memory_event.group3_properties)}', '{json.dumps(in_memory_event.group4_properties)}', '{timezone.now().strftime("%Y-%m-%d %H:%M:%S.%f")}', now(), 0)
"""

sync_execute(
f"""
INSERT INTO {EVENTS_DATA_TABLE()} (uuid, event, properties, timestamp, team_id, distinct_id, elements_chain, created_at, _timestamp, _offset) VALUES
INSERT INTO {EVENTS_DATA_TABLE()} (uuid, event, properties, timestamp, team_id, distinct_id, elements_chain, person_id, person_properties, group0_properties, group1_properties, group2_properties, group3_properties, group4_properties, created_at, _timestamp, _offset) VALUES
{parsed}
"""
)
Expand All @@ -89,6 +110,13 @@ class InMemoryEvent:
team: Team
timestamp: str
properties: Dict
person_id: str
person_properties: Dict
group0_properties: Dict
group1_properties: Dict
group2_properties: Dict
group3_properties: Dict
group4_properties: Dict


def _create_event(**event):
Expand Down
5 changes: 5 additions & 0 deletions posthog/models/team.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,11 @@ def _timezone_feature_flag_enabled(self) -> bool:
def behavioral_cohort_querying_enabled(self) -> bool:
return str(self.pk) in get_list(config.NEW_COHORT_QUERY_TEAMS)

@property
def actor_on_events_querying_enabled(self) -> bool:
    """Report whether this team is listed in ENABLE_ACTOR_ON_EVENTS_TEAMS.

    The setting is parsed with ``get_list``; the result is True when the
    parsed list contains either this team's primary key (as a string) or
    the literal entry ``"all"``.
    """
    allowed = get_list(config.ENABLE_ACTOR_ON_EVENTS_TEAMS)
    if str(self.pk) in allowed:
        return True
    return "all" in allowed

def __str__(self):
if self.name:
return self.name
Expand Down
8 changes: 4 additions & 4 deletions posthog/queries/test/test_funnel.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,19 +278,19 @@ def test_funnel_person_prop(self):
self.assertEqual(result[1]["count"], 1)
self.assertEqual(result[2]["count"], 1)

@test_with_materialized_columns(["test_prop"])
@test_with_materialized_columns(["test_propX"])
def test_funnel_multiple_actions(self):
# we had an issue on clickhouse where multiple actions with different property filters would incorrectly grab only the last
# properties.
# This test prevents a regression
person_factory(distinct_ids=["person1"], team_id=self.team.pk)
event_factory(distinct_id="person1", event="event1", team=self.team)
event_factory(distinct_id="person1", event="event2", properties={"test_prop": "a"}, team=self.team)
event_factory(distinct_id="person1", event="event2", properties={"test_propX": "a"}, team=self.team)

action1 = Action.objects.create(team_id=self.team.pk, name="event2")
ActionStep.objects.create(action=action1, event="event2", properties=[{"key": "test_prop", "value": "a"}])
ActionStep.objects.create(action=action1, event="event2", properties=[{"key": "test_propX", "value": "a"}])
action2 = Action.objects.create(team_id=self.team.pk, name="event2")
ActionStep.objects.create(action=action2, event="event2", properties=[{"key": "test_prop", "value": "c"}])
ActionStep.objects.create(action=action2, event="event2", properties=[{"key": "test_propX", "value": "c"}])

result = Funnel(
filter=Filter(
Expand Down
5 changes: 5 additions & 0 deletions posthog/settings/dynamic_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@
"Whether cohort calculations should use the new query or the old query.",
str,
),
"ENABLE_ACTOR_ON_EVENTS_TEAMS": (
get_from_env("ENABLE_ACTOR_ON_EVENTS_TEAMS", ""),
"Whether to use query path using person_id, person_properties, and group_properties on events or the old query",
str,
),
"AUTO_START_ASYNC_MIGRATIONS": (
get_from_env("AUTO_START_ASYNC_MIGRATIONS", False, type_cast=str_to_bool),
"Whether the earliest unapplied async migration should be triggered automatically on server startup.",
Expand Down
5 changes: 3 additions & 2 deletions posthog/test/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,11 +360,12 @@ class NonAtomicTestMigrations(BaseTestMigrations, NonAtomicBaseTest):


def flush_persons_and_events():
person_mapping = {}
if len(persons_cache_tests) > 0:
bulk_create_persons(persons_cache_tests)
person_mapping = bulk_create_persons(persons_cache_tests)
persons_cache_tests.clear()
if len(events_cache_tests) > 0:
bulk_create_events(events_cache_tests)
bulk_create_events(events_cache_tests, person_mapping)
events_cache_tests.clear()


Expand Down