feat(tableau): use pagination for all connection queries #5204

Merged
merged 2 commits on Jun 22, 2022
14 changes: 7 additions & 7 deletions metadata-ingestion/docs/sources/tableau/tableau.md
@@ -44,7 +44,7 @@ Workbooks from Tableau are ingested as Container in datahub. <br/>
 - GraphQL query <br/>
 ```graphql
 {
-  workbooksConnection(first: 15, offset: 0, filter: {projectNameWithin: ["default", "Project 2"]}) {
+  workbooksConnection(first: 10, offset: 0, filter: {projectNameWithin: ["default", "Project 2"]}) {
     nodes {
       id
       name
@@ -73,7 +73,7 @@ Dashboards from Tableau are ingested as Dashboard in datahub. <br/>
 - GraphQL query <br/>
 ```graphql
 {
-  workbooksConnection(first: 15, offset: 0, filter: {projectNameWithin: ["default", "Project 2"]}) {
+  workbooksConnection(first: 10, offset: 0, filter: {projectNameWithin: ["default", "Project 2"]}) {
     nodes {
       .....
       dashboards {
@@ -185,7 +185,7 @@ Embedded Data source from Tableau is ingested as a Dataset in datahub.
 - GraphQL query <br/>
 ```graphql
 {
-  workbooksConnection(first: 15, offset: 0, filter: {projectNameWithin: ["default"]}) {
+  workbooksConnection(first: 10, offset: 0, filter: {projectNameWithin: ["default"]}) {
     nodes {
       ....
       embeddedDatasources {
@@ -265,7 +265,7 @@ Published Data source from Tableau is ingested as a Dataset in datahub.
 - GraphQL query <br/>
 ```graphql
 {
-  publishedDatasourcesConnection(filter: {idWithin: ["00cce29f-b561-bb41-3557-8e19660bb5dd", "618c87db-5959-338b-bcc7-6f5f4cc0b6c6"]}) {
+  publishedDatasourcesConnection(first: 10, offset: 0, filter: {idWithin: ["00cce29f-b561-bb41-3557-8e19660bb5dd", "618c87db-5959-338b-bcc7-6f5f4cc0b6c6"]}) {
     nodes {
       __typename
       id
@@ -343,7 +343,7 @@ For custom sql data sources, the query is viewable in UI under View Definition tab.
 - GraphQL query <br/>
 ```graphql
 {
-  customSQLTablesConnection(filter: {idWithin: ["22b0b4c3-6b85-713d-a161-5a87fdd78f40"]}) {
+  customSQLTablesConnection(first: 10, offset: 0, filter: {idWithin: ["22b0b4c3-6b85-713d-a161-5a87fdd78f40"]}) {
     nodes {
       id
       name
@@ -408,8 +408,8 @@ Lineage is emitted as received from Tableau's metadata API for

 ## Troubleshooting

-### Why are only some workbooks ingested from the specified project?
+### Why are only some workbooks/custom SQLs/published datasources ingested from the specified project?

 This may happen when the Tableau API returns a NODE_LIMIT_EXCEEDED error in response to a metadata query and returns partial results with the message "Showing partial results. The request exceeded the ‘n’ node limit. Use pagination, additional filtering, or both in the query to adjust results." To resolve this, consider:
-- reducing the page size using the `workbooks_page_size` config param in the datahub recipe (defaults to 10).
+- reducing the page size using the `page_size` config param in the datahub recipe (defaults to 10).
 - increasing the Tableau [metadata query node limit](https://help.tableau.com/current/server/en-us/cli_configuration-set_tsm.htm#metadata_nodelimit) configuration to a higher value.
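For reference, a minimal sketch of setting the new `page_size` knob when running ingestion programmatically. This assumes datahub's standard `Pipeline` API and a console sink; every connection value below is a placeholder:

```python
# A sketch, not the canonical recipe: connection values are placeholders.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "tableau",
            "config": {
                "connect_uri": "https://tableau.example.com",  # placeholder
                "username": "tableau_user",  # placeholder
                "password": "tableau_password",  # placeholder
                "page_size": 5,  # fewer metadata objects per query, to stay under the node limit
            },
        },
        "sink": {"type": "console"},
    }
)
pipeline.run()
pipeline.raise_from_status()
```

Lowering `page_size` trades more round trips for smaller responses, which is usually the quickest fix when the server-side node limit cannot be raised.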
43 changes: 34 additions & 9 deletions metadata-ingestion/src/datahub/ingestion/source/tableau.py
@@ -5,7 +5,7 @@
 from typing import Any, Dict, Iterable, List, Optional, Tuple, Union

 import dateutil.parser as dp
-from pydantic import validator
+from pydantic import root_validator, validator
 from pydantic.fields import Field
 from tableauserverclient import (
     PersonalAccessTokenAuth,
@@ -132,10 +132,16 @@ class TableauConfig(ConfigModel):
         description="Ingest details for tables external to (not embedded in) tableau as entities.",
     )

-    workbooks_page_size: int = Field(
+    workbooks_page_size: Optional[int] = Field(
+        default=None,
+        description="@deprecated(use page_size instead) Number of workbooks to query at a time using Tableau api.",
+    )
+
+    page_size: int = Field(
         default=10,
-        description="Number of workbooks to query at a time using Tableau api.",
+        description="Number of metadata objects (e.g. CustomSQLTable, PublishedDatasource, etc) to query at a time using Tableau api.",
     )

     env: str = Field(
         default=builder.DEFAULT_ENV,
         description="Environment to use in namespace when constructing URNs.",
@@ -145,6 +151,17 @@
     def remove_trailing_slash(cls, v):
         return config_clean.remove_trailing_slashes(v)

+    @root_validator()
+    def show_warning_for_deprecated_config_field(
+        cls, values: Dict[str, Any]
+    ) -> Dict[str, Any]:
+        if values.get("workbooks_page_size") is not None:
+            logger.warn(
+                "Config workbooks_page_size is deprecated. Please use config page_size instead."
+            )
+
+        return values
+

 class WorkbookKey(PlatformKey):
     workbook_id: str
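To see the deprecation fallback end to end, here is a self-contained sketch of the same pattern under pydantic v1; `PagingConfig` is a hypothetical stand-in for `TableauConfig`:

```python
# Hypothetical reduction of the TableauConfig change above (pydantic v1).
import logging
from typing import Any, Dict, Optional

from pydantic import BaseModel, root_validator

logger = logging.getLogger(__name__)


class PagingConfig(BaseModel):
    workbooks_page_size: Optional[int] = None  # deprecated alias
    page_size: int = 10

    @root_validator()
    def warn_on_deprecated_field(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        # Warn, but keep honoring the old field so existing recipes still work.
        if values.get("workbooks_page_size") is not None:
            logger.warning(
                "Config workbooks_page_size is deprecated. Please use config page_size instead."
            )
        return values


config = PagingConfig(workbooks_page_size=25)
# emit_workbooks resolves the effective size with the same expression:
effective = (
    config.page_size
    if config.workbooks_page_size is None
    else config.workbooks_page_size
)
assert effective == 25
```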
@@ -247,6 +264,9 @@ def get_connection_object(
         count: int = 0,
         current_count: int = 0,
     ) -> Tuple[dict, int, int]:
+        logger.debug(
+            f"Query {connection_type} to get {count} objects with offset {current_count}"
+        )
         query_data = query_metadata(
             self.server, query, connection_type, count, current_count, query_filter
         )
@@ -267,7 +287,12 @@
         has_next_page = connection_object.get("pageInfo", {}).get("hasNextPage", False)
         return connection_object, total_count, has_next_page

-    def emit_workbooks(self, workbooks_page_size: int) -> Iterable[MetadataWorkUnit]:
+    def emit_workbooks(self) -> Iterable[MetadataWorkUnit]:
+        count_on_query = (
+            self.config.page_size
+            if self.config.workbooks_page_size is None
+            else self.config.workbooks_page_size
+        )

         projects = (
             f"projectNameWithin: {json.dumps(self.config.projects)}"
@@ -282,8 +307,8 @@
         current_count = 0
         while has_next_page:
             count = (
-                workbooks_page_size
-                if current_count + workbooks_page_size < total_count
+                count_on_query
+                if current_count + count_on_query < total_count
                 else total_count - current_count
             )
             (
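The clamping above generalizes to every connection type touched by this PR. Below is a minimal standalone sketch of the loop, where `fetch_page` is a hypothetical stand-in for `get_connection_object`:

```python
from typing import Callable, Iterable, List, Tuple


# fetch_page(count, offset) returns (nodes, total_count, has_next_page),
# mirroring the (connection_object, total_count, has_next_page) tuple above.
def paginate(
    fetch_page: Callable[[int, int], Tuple[List[dict], int, bool]],
    page_size: int,
) -> Iterable[dict]:
    total_count = page_size  # optimistic guess; corrected by the first response
    has_next_page = True
    current_count = 0
    while has_next_page:
        # Clamp the last request so we never ask for more than remains.
        count = (
            page_size
            if current_count + page_size < total_count
            else total_count - current_count
        )
        nodes, total_count, has_next_page = fetch_page(count, current_count)
        current_count += count
        yield from nodes


# Example: a fake 7-item connection paged 3 at a time.
data = [{"id": i} for i in range(7)]


def fake_fetch(count: int, offset: int) -> Tuple[List[dict], int, bool]:
    return data[offset : offset + count], len(data), offset + count < len(data)


assert list(paginate(fake_fetch, page_size=3)) == data
```

The final request asks only for `total_count - current_count` items, so the server never receives an over-long last page.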
@@ -410,7 +435,7 @@ def _create_upstream_table_lineage(
         return upstream_tables

     def emit_custom_sql_datasources(self) -> Iterable[MetadataWorkUnit]:
-        count_on_query = len(self.custom_sql_ids_being_used)
+        count_on_query = self.config.page_size
         custom_sql_filter = "idWithin: {}".format(
             json.dumps(self.custom_sql_ids_being_used)
         )
@@ -779,7 +804,7 @@ def emit_datasource(
         )

     def emit_published_datasources(self) -> Iterable[MetadataWorkUnit]:
-        count_on_query = len(self.datasource_ids_being_used)
+        count_on_query = self.config.page_size
         datasource_filter = "idWithin: {}".format(
             json.dumps(self.datasource_ids_being_used)
         )
@@ -1148,7 +1173,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
         if self.server is None or not self.server.is_signed_in():
             return
         try:
-            yield from self.emit_workbooks(self.config.workbooks_page_size)
+            yield from self.emit_workbooks()
             if self.datasource_ids_being_used:
                 yield from self.emit_published_datasources()
             if self.custom_sql_ids_being_used: