Skip to content

Commit

Permalink
feat(tableau): use pagination for all connection queries (datahub-pro…
Browse files Browse the repository at this point in the history
  • Loading branch information
mayurinehate authored and maggiehays committed Aug 1, 2022
1 parent a8241b2 commit 2af6d21
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 16 deletions.
14 changes: 7 additions & 7 deletions metadata-ingestion/docs/sources/tableau/tableau.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ Workbooks from Tableau are ingested as Container in datahub. <br/>
- GraphQL query <br/>
```graphql
{
workbooksConnection(first: 15, offset: 0, filter: {projectNameWithin: ["default", "Project 2"]}) {
workbooksConnection(first: 10, offset: 0, filter: {projectNameWithin: ["default", "Project 2"]}) {
nodes {
id
name
Expand Down Expand Up @@ -73,7 +73,7 @@ Dashboards from Tableau are ingested as Dashboard in datahub. <br/>
- GraphQL query <br/>
```graphql
{
workbooksConnection(first: 15, offset: 0, filter: {projectNameWithin: ["default", "Project 2"]}) {
workbooksConnection(first: 10, offset: 0, filter: {projectNameWithin: ["default", "Project 2"]}) {
nodes {
.....
dashboards {
Expand Down Expand Up @@ -185,7 +185,7 @@ Embedded Data source from Tableau is ingested as a Dataset in datahub.
- GraphQL query <br/>
```graphql
{
workbooksConnection(first: 15, offset: 0, filter: {projectNameWithin: ["default"]}) {
workbooksConnection(first: 10, offset: 0, filter: {projectNameWithin: ["default"]}) {
nodes {
....
embeddedDatasources {
Expand Down Expand Up @@ -265,7 +265,7 @@ Published Data source from Tableau is ingested as a Dataset in datahub.
- GraphQL query <br/>
```graphql
{
publishedDatasourcesConnection(filter: {idWithin: ["00cce29f-b561-bb41-3557-8e19660bb5dd", "618c87db-5959-338b-bcc7-6f5f4cc0b6c6"]}) {
publishedDatasourcesConnection(first: 10, offset: 0, filter: {idWithin: ["00cce29f-b561-bb41-3557-8e19660bb5dd", "618c87db-5959-338b-bcc7-6f5f4cc0b6c6"]}) {
nodes {
__typename
id
Expand Down Expand Up @@ -343,7 +343,7 @@ For custom sql data sources, the query is viewable in UI under View Definition t
- GraphQL query <br/>
```graphql
{
customSQLTablesConnection(filter: {idWithin: ["22b0b4c3-6b85-713d-a161-5a87fdd78f40"]}) {
customSQLTablesConnection(first: 10, offset: 0, filter: {idWithin: ["22b0b4c3-6b85-713d-a161-5a87fdd78f40"]}) {
nodes {
id
name
Expand Down Expand Up @@ -408,8 +408,8 @@ Lineage is emitted as received from Tableau's metadata API for

## Troubleshooting

### Why are only some workbooks ingested from the specified project?
### Why are only some workbooks/custom SQLs/published datasources ingested from the specified project?

This may happen when the Tableau API returns a NODE_LIMIT_EXCEEDED error in response to a metadata query and returns partial results with the message "Showing partial results. The request exceeded the 'n' node limit. Use pagination, additional filtering, or both in the query to adjust results." To resolve this, consider:
- reducing the page size using the `workbooks_page_size` config param in datahub recipe (Defaults to 10).
- reducing the page size using the `page_size` config param in the DataHub recipe (defaults to 10).
- increasing tableau configuration [metadata query node limit](https://help.tableau.com/current/server/en-us/cli_configuration-set_tsm.htm#metadata_nodelimit) to higher value.
43 changes: 34 additions & 9 deletions metadata-ingestion/src/datahub/ingestion/source/tableau.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union

import dateutil.parser as dp
from pydantic import validator
from pydantic import root_validator, validator
from pydantic.fields import Field
from tableauserverclient import (
PersonalAccessTokenAuth,
Expand Down Expand Up @@ -132,10 +132,16 @@ class TableauConfig(ConfigModel):
description="Ingest details for tables external to (not embedded in) tableau as entities.",
)

workbooks_page_size: int = Field(
workbooks_page_size: Optional[int] = Field(
default=None,
description="@deprecated(use page_size instead) Number of workbooks to query at a time using Tableau api.",
)

page_size: int = Field(
default=10,
description="Number of workbooks to query at a time using Tableau api.",
description="Number of metadata objects (e.g. CustomSQLTable, PublishedDatasource, etc) to query at a time using Tableau api.",
)

env: str = Field(
default=builder.DEFAULT_ENV,
description="Environment to use in namespace when constructing URNs.",
Expand All @@ -145,6 +151,17 @@ class TableauConfig(ConfigModel):
def remove_trailing_slash(cls, v):
return config_clean.remove_trailing_slashes(v)

@root_validator()
def show_warning_for_deprecated_config_field(
    cls, values: Dict[str, Any]
) -> Dict[str, Any]:
    """Warn (once, at config-parse time) when the deprecated
    `workbooks_page_size` option is set; `page_size` supersedes it.

    Returns the values dict unchanged — this validator only logs.
    """
    if values.get("workbooks_page_size") is not None:
        # logger.warn is a deprecated alias; use logger.warning.
        logger.warning(
            "Config workbooks_page_size is deprecated. Please use config page_size instead."
        )

    return values


class WorkbookKey(PlatformKey):
workbook_id: str
Expand Down Expand Up @@ -247,6 +264,9 @@ def get_connection_object(
count: int = 0,
current_count: int = 0,
) -> Tuple[dict, int, int]:
logger.debug(
f"Query {connection_type} to get {count} objects with offset {current_count}"
)
query_data = query_metadata(
self.server, query, connection_type, count, current_count, query_filter
)
Expand All @@ -267,7 +287,12 @@ def get_connection_object(
has_next_page = connection_object.get("pageInfo", {}).get("hasNextPage", False)
return connection_object, total_count, has_next_page

def emit_workbooks(self, workbooks_page_size: int) -> Iterable[MetadataWorkUnit]:
def emit_workbooks(self) -> Iterable[MetadataWorkUnit]:
count_on_query = (
self.config.page_size
if self.config.workbooks_page_size is None
else self.config.workbooks_page_size
)

projects = (
f"projectNameWithin: {json.dumps(self.config.projects)}"
Expand All @@ -282,8 +307,8 @@ def emit_workbooks(self, workbooks_page_size: int) -> Iterable[MetadataWorkUnit]
current_count = 0
while has_next_page:
count = (
workbooks_page_size
if current_count + workbooks_page_size < total_count
count_on_query
if current_count + count_on_query < total_count
else total_count - current_count
)
(
Expand Down Expand Up @@ -410,7 +435,7 @@ def _create_upstream_table_lineage(
return upstream_tables

def emit_custom_sql_datasources(self) -> Iterable[MetadataWorkUnit]:
count_on_query = len(self.custom_sql_ids_being_used)
count_on_query = self.config.page_size
custom_sql_filter = "idWithin: {}".format(
json.dumps(self.custom_sql_ids_being_used)
)
Expand Down Expand Up @@ -779,7 +804,7 @@ def emit_datasource(
)

def emit_published_datasources(self) -> Iterable[MetadataWorkUnit]:
count_on_query = len(self.datasource_ids_being_used)
count_on_query = self.config.page_size
datasource_filter = "idWithin: {}".format(
json.dumps(self.datasource_ids_being_used)
)
Expand Down Expand Up @@ -1148,7 +1173,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
if self.server is None or not self.server.is_signed_in():
return
try:
yield from self.emit_workbooks(self.config.workbooks_page_size)
yield from self.emit_workbooks()
if self.datasource_ids_being_used:
yield from self.emit_published_datasources()
if self.custom_sql_ids_being_used:
Expand Down

0 comments on commit 2af6d21

Please sign in to comment.