feat(person-on-events): Enable CI to run using both old and new queries (#9814)
neilkakkar authored May 19, 2022
1 parent b5fa21e commit 9750b7d
Showing 11 changed files with 135 additions and 20 deletions.
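For context before the file-by-file diff: "running both old and new queries" is toggled by the ENABLE_ACTOR_ON_EVENTS_TEAMS environment variable. A minimal illustrative sketch of the idea in Python (in CI the two runs are separate matrix jobs, not a loop):

    import os
    import subprocess

    # "" selects the old person-join query path; "all" selects the new
    # person-on-events path (both values appear in this commit).
    for env_value in ("", "all"):
        subprocess.run(
            ["pytest", "ee"],
            env={**os.environ, "ENABLE_ACTOR_ON_EVENTS_TEAMS": env_value},
            check=True,
        )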
16 changes: 14 additions & 2 deletions .github/actions/run-backend-tests/action.yml
@@ -25,6 +25,9 @@ inputs:
group:
required: true
type: number
person-on-events:
required: true
type: boolean

runs:
using: 'composite'
@@ -85,7 +88,7 @@ runs:
pytest -m "not ee and not async_migrations" posthog/ --cov --cov-report=xml:coverage-postgres.xml --durations=150 --durations-min=1.0
- name: Run ee/ tests
- if: ${{ inputs.ee == 'true' }}
+ if: ${{ inputs.ee == 'true' && inputs.person-on-events != 'true'}}
shell: bash
run: |
pytest ee \
@@ -97,6 +100,15 @@ runs:
--snapshot-update\
--durations=100 --durations-min=1.0
- name: Run ee/ tests with person-on-events, no snapshot commits
if: ${{ inputs.ee == 'true' && inputs.person-on-events == 'true' }}
shell: bash
run: |
ENABLE_ACTOR_ON_EVENTS_TEAMS=all pytest ee \
--splits ${{ inputs.concurrency }} \
--group ${{ inputs.group }} \
--snapshot-update
- name: Run pytest.mark.ee tests
if: ${{ inputs.ee == 'true' && inputs.group == '1' }}
shell: bash
@@ -112,7 +124,7 @@ runs:

- name: Upload updated timing data as artifacts
uses: actions/upload-artifact@v2
- if: ${{ inputs.ee == 'true' }}
+ if: ${{ inputs.ee == 'true' && inputs.person-on-events != 'true'}}
with:
name: timing_data-${{ inputs.group }}
path: .test_durations
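A hedged summary of the if: gates this composite action now applies, as an illustrative Python function (step names abbreviated; not part of the commit):

    from typing import List

    def steps_to_run(ee: bool, person_on_events: bool, group: int) -> List[str]:
        steps = []
        if ee and not person_on_events:
            # the regular ee run still updates snapshots and timing data
            steps.append("Run ee/ tests")
        if ee and person_on_events:
            # same suite on the new query path; snapshots are not committed back
            steps.append("Run ee/ tests with ENABLE_ACTOR_ON_EVENTS_TEAMS=all")
        if ee and group == 1:
            steps.append("Run pytest.mark.ee tests")
        if ee and not person_on_events:
            steps.append("Upload updated timing data as artifacts")
        return steps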
48 changes: 46 additions & 2 deletions .github/workflows/ci-backend.yml
@@ -181,6 +181,7 @@ jobs:
ee: [true]
foss: [false]
name: ['']
person-on-events: [false]
# :NOTE: Keep concurrency and group's in sync
concurrency: [5]
group: [1, 2, 3, 4, 5]
@@ -207,6 +208,48 @@ jobs:
clickhouse-server-image: 'yandex/clickhouse-server:21.6.5'
concurrency: 1
group: 1
# :TRICKY: Run person-on-events tests only for one CH instance
# But still run in parallel
- python-version: '3.8.12'
ee: true
foss: false
person-on-events: true
name: 'Person on Events'
clickhouse-server-image: 'clickhouse/clickhouse-server:21.11.11.1'
concurrency: 5
group: 1
- python-version: '3.8.12'
ee: true
foss: false
person-on-events: true
name: 'Person on Events'
clickhouse-server-image: 'clickhouse/clickhouse-server:21.11.11.1'
concurrency: 5
group: 2
- python-version: '3.8.12'
ee: true
foss: false
person-on-events: true
name: 'Person on Events'
clickhouse-server-image: 'clickhouse/clickhouse-server:21.11.11.1'
concurrency: 5
group: 3
- python-version: '3.8.12'
ee: true
foss: false
person-on-events: true
name: 'Person on Events'
clickhouse-server-image: 'clickhouse/clickhouse-server:21.11.11.1'
concurrency: 5
group: 4
- python-version: '3.8.12'
ee: true
foss: false
person-on-events: true
name: 'Person on Events'
clickhouse-server-image: 'clickhouse/clickhouse-server:21.11.11.1'
concurrency: 5
group: 5

steps:
- uses: actions/checkout@v2
@@ -222,12 +265,13 @@ jobs:
python-version: ${{ matrix.python-version }}
ee: ${{ matrix.ee }}
foss: ${{ matrix.foss }}
- clickhouse-server-image-version: ${{ matrix.clickhouse-server-image-version }}
+ clickhouse-server-image: ${{ matrix.clickhouse-server-image }}
concurrency: ${{ matrix.concurrency }}
group: ${{ matrix.group }}
person-on-events: ${{ matrix.person-on-events }}

- uses: EndBug/add-and-commit@v9 # You can change this to use a specific version.
- if: ${{ matrix.clickhouse-server-image-version == '21.11.11.1' }}
+ if: ${{ matrix.clickhouse-server-image == 'clickhouse/clickhouse-server:21.11.11.1' && !matrix.person-on-events }}
with:
# The arguments for the `git add` command (see the paragraph below for more info)
# Default: '.'
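The five new "Person on Events" include entries above differ only in their group value; an equivalent illustrative sketch (Python, not part of the workflow):

    # One entry per test group, all pinned to the newer ClickHouse image.
    person_on_events_includes = [
        {
            "python-version": "3.8.12",
            "ee": True,
            "foss": False,
            "person-on-events": True,
            "name": "Person on Events",
            "clickhouse-server-image": "clickhouse/clickhouse-server:21.11.11.1",
            "concurrency": 5,
            "group": group,
        }
        for group in range(1, 6)
    ]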
23 changes: 21 additions & 2 deletions ee/clickhouse/models/event.py
@@ -58,7 +58,7 @@ def create_event(
return str(event_uuid)


- def bulk_create_events(events: List[Dict[str, Any]]):
+ def bulk_create_events(events: List[Dict[str, Any]], person_mapping: Optional[Dict[str, Person]] = None) -> None:
"""
TEST ONLY
Insert events in bulk. List of dicts:
@@ -90,10 +90,22 @@ def bulk_create_events(events: List[Dict[str, Any]]):
elements_chain = elements_to_string(elements=event.get("elements")) # type: ignore

inserts.append(
"(%(uuid_{i})s, %(event_{i})s, %(properties_{i})s, %(timestamp_{i})s, %(team_id_{i})s, %(distinct_id_{i})s, %(elements_chain_{i})s, %(created_at_{i})s, now(), 0)".format(
"(%(uuid_{i})s, %(event_{i})s, %(properties_{i})s, %(timestamp_{i})s, %(team_id_{i})s, %(distinct_id_{i})s, %(elements_chain_{i})s, %(person_id_{i})s, %(person_properties_{i})s, %(group0_properties_{i})s, %(group1_properties_{i})s, %(group2_properties_{i})s, %(group3_properties_{i})s, %(group4_properties_{i})s, %(created_at_{i})s, now(), 0)".format(
i=index
)
)

# use the person mapping to populate person properties on the given event
if person_mapping and person_mapping.get(event["distinct_id"]):
person_properties = person_mapping[event["distinct_id"]].properties
person_id = person_mapping[event["distinct_id"]].uuid

event = {
**event,
"person_properties": {**person_properties, **event.get("person_properties", {})},
"person_id": person_id,
}

event = {
"uuid": str(event["event_uuid"]) if event.get("event_uuid") else str(uuid.uuid4()),
"event": event["event"],
@@ -103,6 +115,13 @@
"distinct_id": str(event["distinct_id"]),
"elements_chain": elements_chain,
"created_at": timestamp,
"person_id": event["person_id"] if event.get("person_id") else "00000000-0000-0000-0000-000000000000",
"person_properties": json.dumps(event["person_properties"]) if event.get("person_properties") else "{}",
"group0_properties": json.dumps(event["group0_properties"]) if event.get("group0_properties") else "{}",
"group1_properties": json.dumps(event["group1_properties"]) if event.get("group1_properties") else "{}",
"group2_properties": json.dumps(event["group2_properties"]) if event.get("group2_properties") else "{}",
"group3_properties": json.dumps(event["group3_properties"]) if event.get("group3_properties") else "{}",
"group4_properties": json.dumps(event["group4_properties"]) if event.get("group4_properties") else "{}",
}

params = {**params, **{"{}_{}".format(key, index): value for key, value in event.items()}}
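A hedged usage sketch of the two test-only helpers together (dict shapes are inferred from the helper bodies, not an exhaustive schema; team is assumed to be an existing Team):

    # Persons go in first; the returned distinct_id -> Person mapping lets
    # bulk_create_events stamp person_id and person_properties onto each row.
    # Events whose distinct_id is not in the mapping fall back to the zero UUID
    # and "{}" defaults shown above.
    person_mapping = bulk_create_persons(
        [{"distinct_ids": ["user_1"], "team_id": team.pk, "properties": {"email": "a@b.com"}}]
    )
    bulk_create_events(
        [{"event": "paid", "distinct_id": "user_1", "team": team}],
        person_mapping,
    )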
4 changes: 4 additions & 0 deletions ee/clickhouse/models/person.py
@@ -58,6 +58,7 @@ def person_distinct_id_deleted(sender, instance: PersonDistinctId, **kwargs):

def bulk_create_persons(persons_list: List[Dict]):
persons = []
person_mapping = {}
for _person in persons_list:
with ExitStack() as stack:
if _person.get("created_at"):
@@ -75,6 +76,7 @@ def bulk_create_persons(persons_list: List[Dict]):
PersonDistinctId(person_id=person.pk, distinct_id=distinct_id, team_id=person.team_id)
)
distinct_id_inserts.append(f"('{distinct_id}', '{person.uuid}', {person.team_id}, 0, 0, now(), 0, 0)")
person_mapping[distinct_id] = person

created_at = now().strftime("%Y-%m-%d %H:%M:%S.%f")
timestamp = now().strftime("%Y-%m-%d %H:%M:%S")
@@ -86,6 +88,8 @@
sync_execute(INSERT_PERSON_BULK_SQL + ", ".join(person_inserts), flush=False)
sync_execute(BULK_INSERT_PERSON_DISTINCT_ID2 + ", ".join(distinct_id_inserts), flush=False)

return person_mapping


def create_person(
team_id: int,
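A hedged sketch of the new return contract: one mapping entry per distinct_id, all pointing at the same Person when a person carries several ids:

    mapping = bulk_create_persons(
        [{"distinct_ids": ["anon-1", "user-1"], "team_id": team.pk}]
    )
    assert mapping["anon-1"] is mapping["user-1"]  # the same Person instance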
5 changes: 1 addition & 4 deletions ee/clickhouse/queries/funnels/test/test_funnel.py
@@ -5,7 +5,6 @@
from freezegun.api import freeze_time
from rest_framework.exceptions import ValidationError

- from ee.clickhouse.materialized_columns import materialize
from ee.clickhouse.queries.funnels.funnel import ClickhouseFunnel
from ee.clickhouse.queries.funnels.funnel_persons import ClickhouseFunnelActors
from ee.clickhouse.queries.funnels.test.breakdown_cases import (
@@ -1084,6 +1083,7 @@ def test_funnel_exclusions_with_actions(self):
self._get_actor_ids_at_step(filter, 2), [person1.uuid, person3.uuid],
)

+ @test_with_materialized_columns(["test_prop"])
def test_funnel_with_denormalised_properties(self):
filters = {
"events": [
@@ -1101,8 +1101,6 @@ def test_funnel_with_denormalised_properties(self):
"date_to": "2020-01-14",
}

materialize("events", "test_prop")

filter = Filter(data=filters)
funnel = ClickhouseFunnel(filter, self.team)

@@ -1119,7 +1117,6 @@
team=self.team, event="paid", distinct_id="user_1", timestamp="2020-01-10T14:00:00Z",
)

self.assertNotIn("json", funnel.get_query().lower())
result = funnel.run()

self.assertEqual(result[0]["name"], "user signed up")
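The refactor above swaps a manual materialize() call plus an explicit query assertion for a decorator. A hedged sketch of the minimum such a decorator must do (the real helper lives in PostHog's test utilities and is assumed to do more, e.g. cleanup and query-shape checks):

    from functools import wraps

    def test_with_materialized_columns(event_properties):
        def decorator(fn):
            @wraps(fn)
            def wrapper(self, *args, **kwargs):
                for prop in event_properties:
                    materialize("events", prop)  # materialize before the test runs
                return fn(self, *args, **kwargs)
            return wrapper
        return decorator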
2 changes: 1 addition & 1 deletion ee/clickhouse/sql/events.py
@@ -179,7 +179,7 @@

BULK_INSERT_EVENT_SQL = (
lambda: f"""
- INSERT INTO {EVENTS_DATA_TABLE()} (uuid, event, properties, timestamp, team_id, distinct_id, elements_chain, created_at, _timestamp, _offset)
+ INSERT INTO {EVENTS_DATA_TABLE()} (uuid, event, properties, timestamp, team_id, distinct_id, elements_chain, person_id, person_properties, group0_properties, group1_properties, group2_properties, group3_properties, group4_properties, created_at, _timestamp, _offset)
VALUES
"""
)
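For reference, the widened column list that every VALUES tuple built in bulk_create_events must match positionally, with seven new columns between elements_chain and created_at:

    BULK_INSERT_COLUMNS = [
        "uuid", "event", "properties", "timestamp", "team_id", "distinct_id",
        "elements_chain",
        "person_id", "person_properties",
        "group0_properties", "group1_properties", "group2_properties",
        "group3_properties", "group4_properties",
        "created_at", "_timestamp", "_offset",
    ]
    assert len(BULK_INSERT_COLUMNS) == 17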
34 changes: 31 additions & 3 deletions ee/clickhouse/test/test_journeys.py
@@ -43,6 +43,10 @@ def journeys_for(
for distinct_id, events in events_by_person.items():
if create_people:
people[distinct_id] = update_or_create_person(distinct_ids=[distinct_id], team_id=team.pk)
else:
people[distinct_id] = Person.objects.get(
persondistinctid__distinct_id=distinct_id, persondistinctid__team_id=team.pk
)

for event in events:
if "timestamp" not in event:
@@ -55,6 +59,13 @@
event=event["event"],
timestamp=event["timestamp"],
properties=event.get("properties", {}),
person_id=people[distinct_id].uuid,
person_properties=people[distinct_id].properties or {},
group0_properties=event.get("group0_properties", {}),
group1_properties=event.get("group1_properties", {}),
group2_properties=event.get("group2_properties", {}),
group3_properties=event.get("group3_properties", {}),
group4_properties=event.get("group4_properties", {}),
)
)

@@ -66,16 +77,26 @@
def _create_all_events(all_events: List[Dict]):
parsed = ""
for event in all_events:
data: Dict[str, Any] = {"properties": {}, "timestamp": timezone.now().strftime("%Y-%m-%d %H:%M:%S.%f")}
data: Dict[str, Any] = {
"properties": {},
"timestamp": timezone.now().strftime("%Y-%m-%d %H:%M:%S.%f"),
"person_id": "00000000-0000-0000-0000-000000000000",
"person_properties": {},
"group0_properties": {},
"group1_properties": {},
"group2_properties": {},
"group3_properties": {},
"group4_properties": {},
}
data.update(event)
in_memory_event = InMemoryEvent(**data)
parsed += f"""
- ('{str(uuid4())}', '{in_memory_event.event}', '{json.dumps(in_memory_event.properties)}', '{in_memory_event.timestamp}', {in_memory_event.team.pk}, '{in_memory_event.distinct_id}', '', '{timezone.now().strftime("%Y-%m-%d %H:%M:%S.%f")}', now(), 0)
+ ('{str(uuid4())}', '{in_memory_event.event}', '{json.dumps(in_memory_event.properties)}', '{in_memory_event.timestamp}', {in_memory_event.team.pk}, '{in_memory_event.distinct_id}', '', '{in_memory_event.person_id}', '{json.dumps(in_memory_event.person_properties)}', '{json.dumps(in_memory_event.group0_properties)}', '{json.dumps(in_memory_event.group1_properties)}', '{json.dumps(in_memory_event.group2_properties)}', '{json.dumps(in_memory_event.group3_properties)}', '{json.dumps(in_memory_event.group4_properties)}', '{timezone.now().strftime("%Y-%m-%d %H:%M:%S.%f")}', now(), 0)
"""

sync_execute(
f"""
- INSERT INTO {EVENTS_DATA_TABLE()} (uuid, event, properties, timestamp, team_id, distinct_id, elements_chain, created_at, _timestamp, _offset) VALUES
+ INSERT INTO {EVENTS_DATA_TABLE()} (uuid, event, properties, timestamp, team_id, distinct_id, elements_chain, person_id, person_properties, group0_properties, group1_properties, group2_properties, group3_properties, group4_properties, created_at, _timestamp, _offset) VALUES
{parsed}
"""
)
@@ -89,6 +110,13 @@ class InMemoryEvent:
team: Team
timestamp: str
properties: Dict
person_id: str
person_properties: Dict
group0_properties: Dict
group1_properties: Dict
group2_properties: Dict
group3_properties: Dict
group4_properties: Dict


def _create_event(**event):
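A hedged usage sketch of journeys_for after this change (argument order and the returned people dict are inferred from the excerpt above):

    from datetime import datetime

    people = journeys_for(
        {
            "user_1": [
                {"event": "user signed up", "timestamp": datetime(2020, 1, 10)},
                {"event": "paid", "properties": {"plan": "pro"}},
            ]
        },
        team,
    )
    # Each inserted event row now carries people["user_1"].uuid and that
    # person's properties, plus "{}" defaults for group0-4 properties.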
5 changes: 5 additions & 0 deletions posthog/models/team.py
@@ -205,6 +205,11 @@ def _timezone_feature_flag_enabled(self) -> bool:
def behavioral_cohort_querying_enabled(self) -> bool:
return str(self.pk) in get_list(config.NEW_COHORT_QUERY_TEAMS)

@property
def actor_on_events_querying_enabled(self) -> bool:
enabled_teams = get_list(config.ENABLE_ACTOR_ON_EVENTS_TEAMS)
return str(self.pk) in enabled_teams or "all" in enabled_teams

def __str__(self):
if self.name:
return self.name
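A hedged behavior sketch for the new property; the setting itself is registered in posthog/settings/dynamic_settings.py below:

    #   ENABLE_ACTOR_ON_EVENTS_TEAMS = ""      -> disabled for every team (default)
    #   ENABLE_ACTOR_ON_EVENTS_TEAMS = "2,42"  -> enabled for teams 2 and 42 only
    #   ENABLE_ACTOR_ON_EVENTS_TEAMS = "all"   -> enabled everywhere (as CI sets it)
    def choose_query_path(team) -> str:
        if team.actor_on_events_querying_enabled:
            return "read person_id/person_properties directly from events"
        return "join person data at query time (old path)"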
8 changes: 4 additions & 4 deletions posthog/queries/test/test_funnel.py
@@ -278,19 +278,19 @@ def test_funnel_person_prop(self):
self.assertEqual(result[1]["count"], 1)
self.assertEqual(result[2]["count"], 1)

@test_with_materialized_columns(["test_prop"])
@test_with_materialized_columns(["test_propX"])
def test_funnel_multiple_actions(self):
# we had an issue on clickhouse where multiple actions with different property filters would incorrectly grab only the last
# properties.
# This test prevents a regression
person_factory(distinct_ids=["person1"], team_id=self.team.pk)
event_factory(distinct_id="person1", event="event1", team=self.team)
event_factory(distinct_id="person1", event="event2", properties={"test_prop": "a"}, team=self.team)
event_factory(distinct_id="person1", event="event2", properties={"test_propX": "a"}, team=self.team)

action1 = Action.objects.create(team_id=self.team.pk, name="event2")
- ActionStep.objects.create(action=action1, event="event2", properties=[{"key": "test_prop", "value": "a"}])
+ ActionStep.objects.create(action=action1, event="event2", properties=[{"key": "test_propX", "value": "a"}])
action2 = Action.objects.create(team_id=self.team.pk, name="event2")
- ActionStep.objects.create(action=action2, event="event2", properties=[{"key": "test_prop", "value": "c"}])
+ ActionStep.objects.create(action=action2, event="event2", properties=[{"key": "test_propX", "value": "c"}])

result = Funnel(
filter=Filter(
5 changes: 5 additions & 0 deletions posthog/settings/dynamic_settings.py
@@ -33,6 +33,11 @@
"Whether cohort calculations should use the new query or the old query.",
str,
),
"ENABLE_ACTOR_ON_EVENTS_TEAMS": (
get_from_env("ENABLE_ACTOR_ON_EVENTS_TEAMS", ""),
"Whether to use query path using person_id, person_properties, and group_properties on events or the old query",
str,
),
"AUTO_START_ASYNC_MIGRATIONS": (
get_from_env("AUTO_START_ASYNC_MIGRATIONS", False, type_cast=str_to_bool),
"Whether the earliest unapplied async migration should be triggered automatically on server startup.",
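A hedged stand-in for the get_list helper that team.py uses to parse this comma-separated setting (the real helper lives elsewhere in the codebase):

    def get_list(text: str) -> list:
        # comma-separated string -> list of non-empty, stripped items
        return [item.strip() for item in text.split(",") if item.strip()]

    assert get_list("") == []
    assert get_list("2,42") == ["2", "42"]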
5 changes: 3 additions & 2 deletions posthog/test/base.py
@@ -360,11 +360,12 @@ class NonAtomicTestMigrations(BaseTestMigrations, NonAtomicBaseTest):


def flush_persons_and_events():
person_mapping = {}
if len(persons_cache_tests) > 0:
- bulk_create_persons(persons_cache_tests)
+ person_mapping = bulk_create_persons(persons_cache_tests)
persons_cache_tests.clear()
if len(events_cache_tests) > 0:
- bulk_create_events(events_cache_tests)
+ bulk_create_events(events_cache_tests, person_mapping)
events_cache_tests.clear()


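A hedged end-to-end sketch of the test flow this wiring enables (the cache names are the ones used above; the dict shapes are inferred):

    # Buffer rows, then flush once: persons first, so the returned mapping can
    # denormalize person uuid/properties onto the buffered event rows.
    persons_cache_tests.append({"distinct_ids": ["user_1"], "team_id": team.pk})
    events_cache_tests.append({"event": "paid", "distinct_id": "user_1", "team": team})
    flush_persons_and_events()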
