Partition Registry is a platform-independent library for managing sources, providers, and partitions. It records which time intervals of a data source have been written and whether they are ready to consume, so independent pipelines can coordinate data storage and retrieval in a structured way.
Key concepts:
- Source: A place where data is stored, such as a database table or an Excel file.
- Provider: An entity that supplies data to a source, such as an Airflow DAG or a Kubernetes job.
- Partition: A specific time interval of data within a source.

Use cases:
- Distributed Airflow Management: Useful for data engineers who manage isolated Airflow instances that need to be aware of each other's task states.
- Cron Job Coordination: Helps monitor data updates that run concurrently across multiple cron jobs, ensuring data is ready for specific time intervals.
To run the service with Docker:
- Install Docker from Docker's official site.
- Navigate to the root folder: `./partition-registry`.
- Run `docker compose up`.
Docker will start three containers:
- PostgreSQL (default address: localhost:5432)
- Adminer (default URL: http://localhost:8080/)
- Web Application (default URL: http://localhost:5498/)
Adminer or any other database client can be used to view all created entities. Documentation for the web service is available at http://localhost:5498/docs.
To set up a local development environment:
- Install `direnv` from Direnv's official site.
- Install `pyenv` following the instructions at Berkeley's GGKBase.
- Go to the `partition-registry` folder.
- Run `direnv allow` in the terminal. This step installs Poetry and the required Python version.
- Verify the `python3` path with `which python3`; it should point to `.../partition-registry/.venv/bin/python3`.
The web service exposes the following operations:
- Source Registration: Register your source to receive the access token that providers need in order to supply data to it.
  - Endpoint: `POST /sources/register`
- Provider Registration: Register a provider with the source's `access_token` so that only authorized providers can supply data to the source.
  - Endpoint: `POST /providers/register`
- Partition Registration: Register partitions, the atomic units of the system.
  - Endpoint: `POST /partitions/register`
- Partition Lock/Unlock: Lock or unlock partitions to control whether their data is available.
  - Lock endpoint: `POST /partitions/lock`
  - Unlock endpoint: `POST /partitions/unlock`
- Check Readiness: Verify whether the data in a source is ready for a specific period.
  - Endpoint: `GET /sources/{source_name}/check_readiness`
- The first registration of a source creates the source object and returns its access token.
- Subsequent attempts to register the same source neither create a new entity nor return an access token, so keep the token from the first response.
```python
import requests
from urllib.parse import urljoin

WEB_SERVICE_URL = "http://127.0.0.1:5498"
SOURCE_NAME = 'public.some_source'
SOURCE_OWNER = "owner@example.com"  # placeholder: use the source owner's email

# Register the source; only the first successful call returns an access token.
response = requests.post(
    urljoin(WEB_SERVICE_URL, 'sources/register'),
    params={"source_name": SOURCE_NAME, "owner": SOURCE_OWNER},
)
status_code = response.status_code
data = response.json()
```
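Because only the first registration returns an access token, a script that is re-run could otherwise lose it. The sketch below caches the first registration response in a local JSON file and reuses it afterwards. The function name and cache file are hypothetical helpers, not part of Partition Registry, and the response is stored verbatim because its exact fields are not documented here.

```python
import json
from pathlib import Path
from typing import Any, Callable, Dict


def register_source_once(register: Callable[[], Dict[str, Any]], cache_file: Path) -> Dict[str, Any]:
    """Return the cached registration response, calling `register` only if no cache exists.

    `register` should POST to sources/register, raise on HTTP errors (for example via
    response.raise_for_status()) and return the parsed JSON, so failures are never cached.
    """
    if cache_file.exists():
        # A previous run already registered the source: reuse its stored response.
        return json.loads(cache_file.read_text())
    data = register()
    cache_file.write_text(json.dumps(data))
    return data
```

For example, `register` can be a lambda wrapping the `requests.post(...)` call above followed by `.json()`. The cache holds the token in plain text, so keep the file out of version control.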
- The access token must match the one returned when the source was registered; otherwise the provider is not granted access to that source.
```python
import requests
from urllib.parse import urljoin

WEB_SERVICE_URL = "http://127.0.0.1:5498"
PROVIDER_NAME = 'provider@example.com'  # placeholder: use your provider's name
ACCESS_TOKEN = '...'  # <--- Your Key to access the Source

# Register the provider against the source identified by ACCESS_TOKEN.
response = requests.post(
    urljoin(WEB_SERVICE_URL, 'providers/register'),
    params={"provider_name": PROVIDER_NAME, "access_token": ACCESS_TOKEN},
)
status_code = response.status_code
data = response.json()
```
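Rather than hard-coding `ACCESS_TOKEN` in a script, it can be read from an environment variable, for example one exported in your shell or through direnv. This is a minimal sketch; the variable name `PARTITION_REGISTRY_ACCESS_TOKEN` is a hypothetical choice, not something Partition Registry defines.

```python
import os
from typing import Mapping

# Hypothetical variable name; not defined by Partition Registry itself.
TOKEN_VAR = "PARTITION_REGISTRY_ACCESS_TOKEN"


def load_access_token(env: Mapping[str, str] = os.environ, var: str = TOKEN_VAR) -> str:
    """Return the source access token from the environment, failing loudly if it is missing."""
    token = env.get(var, "")
    if not token:
        raise RuntimeError(f"{var} is not set; export the token returned by sources/register")
    return token
```

With this helper, the assignment above becomes `ACCESS_TOKEN = load_access_token()`.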
- A partition must be registered before you can work with it.
- If no timezone is provided, the timestamps are treated as UTC.
- The source and the provider must both be registered before a partition can be registered.
```python
import datetime as dt
from dateutil import tz
import requests
from urllib.parse import urljoin

WEB_SERVICE_URL = "http://127.0.0.1:5498"
# Partition boundaries as timezone-aware UTC datetimes.
PARTITION_START = dt.datetime(2000, 1, 1, 0, 0, 0, 0, tzinfo=tz.UTC)
PARTITION_END = dt.datetime(2000, 1, 2, 0, 0, 0, 0, tzinfo=tz.UTC)
SOURCE_NAME = 'public.some_source'
PROVIDER_NAME = 'provider@example.com'  # placeholder: use your registered provider name

# Register the partition covering PARTITION_START to PARTITION_END.
response = requests.post(
    urljoin(WEB_SERVICE_URL, 'partitions/register'),
    params={
        "start": str(PARTITION_START),
        "end": str(PARTITION_END),
        "source_name": SOURCE_NAME,
        "provider_name": PROVIDER_NAME,
    },
)
status_code = response.status_code
data = response.json()
```
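When backfilling a longer period, a provider may want to register it as a series of consecutive partitions. The sketch below assumes one-day partitions (the granularity used in the example above) and normalizes naive timestamps to UTC on the client side, mirroring the rule that timestamps without a timezone are treated as UTC. `to_utc` and `daily_partitions` are hypothetical helpers, not part of the project.

```python
import datetime as dt
from typing import Iterator, Tuple


def to_utc(ts: dt.datetime) -> dt.datetime:
    """Return ts as an aware UTC datetime; naive values are assumed to be UTC already."""
    if ts.tzinfo is None:
        return ts.replace(tzinfo=dt.timezone.utc)
    return ts.astimezone(dt.timezone.utc)


def daily_partitions(start: dt.datetime, end: dt.datetime) -> Iterator[Tuple[dt.datetime, dt.datetime]]:
    """Split the range from start to end into consecutive one-day (start, end) pairs in UTC.

    The last pair is shorter when the range does not end on a day boundary;
    an empty or reversed range yields nothing.
    """
    current, end = to_utc(start), to_utc(end)
    step = dt.timedelta(days=1)  # 24 hours; in UTC this always matches a calendar day
    while current < end:
        boundary = min(current + step, end)
        yield current, boundary
        current = boundary
```

Each pair can then be sent as the `start` and `end` parameters of `partitions/register` (using `str()` as in the example above), one request per partition.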
- A partition can be locked or unlocked.
- A partition must be registered before it can be locked or unlocked.
- While a partition is locked for a specific interval, all dependents are notified that the source is not ready to be consumed for that period.
```python
import datetime as dt
from dateutil import tz
import requests
from urllib.parse import urljoin

WEB_SERVICE_URL = "http://127.0.0.1:5498"
PARTITION_START = dt.datetime(2000, 1, 1, 0, 0, 0, 0, tzinfo=tz.UTC)
PARTITION_END = dt.datetime(2000, 1, 2, 0, 0, 0, 0, tzinfo=tz.UTC)
SOURCE_NAME = 'public.some_source'
PROVIDER_NAME = 'provider@example.com'  # placeholder: use your registered provider name

# The same parameters identify the partition for both lock and unlock.
partition_params = {
    "start": str(PARTITION_START),
    "end": str(PARTITION_END),
    "source_name": SOURCE_NAME,
    "provider_name": PROVIDER_NAME,
}

# Lock the partition, e.g. before writing data for this interval.
lock_response = requests.post(urljoin(WEB_SERVICE_URL, 'partitions/lock'), params=partition_params)
lock_status_code = lock_response.status_code
lock_data = lock_response.json()

# Unlock the partition once its data is ready to consume.
unlock_response = requests.post(urljoin(WEB_SERVICE_URL, 'partitions/unlock'), params=partition_params)
unlock_status_code = unlock_response.status_code
unlock_data = unlock_response.json()
```
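A provider typically locks a partition before writing it and unlocks it once the write has finished. The lock and unlock calls shown above can be wrapped in a context manager; this is a sketch under the assumption that locking signals "being written". It unlocks only when the body succeeds, so a failed write leaves the interval locked and consumers do not treat partial data as ready until a later run unlocks it. `locked_partition` is a hypothetical helper, and `post` is any requests-style function such as `requests.post`.

```python
from contextlib import contextmanager
from typing import Any, Callable, Dict, Iterator
from urllib.parse import urljoin


@contextmanager
def locked_partition(post: Callable[..., Any], base_url: str, params: Dict[str, str]) -> Iterator[None]:
    """Lock a registered partition on entry and unlock it only if the body succeeds.

    `params` holds start, end, source_name and provider_name, as in the example above.
    """
    post(urljoin(base_url, "partitions/lock"), params=params).raise_for_status()
    # If the body raises, the exception propagates from this yield and the
    # unlock below is skipped, keeping the interval marked as not ready.
    yield
    post(urljoin(base_url, "partitions/unlock"), params=params).raise_for_status()
```

Usage: `with locked_partition(requests.post, WEB_SERVICE_URL, partition_params): write_data()`, where `write_data` stands for your own (hypothetical) loading code.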
- Readiness is checked for a single source over the period given by `start` and `end`; the source name is part of the URL path.

```python
import datetime as dt
from dateutil import tz
import requests
from urllib.parse import urljoin

WEB_SERVICE_URL = "http://127.0.0.1:5498"
PARTITION_START = dt.datetime(2000, 1, 1, 0, 0, 0, 0, tzinfo=tz.UTC)
PARTITION_END = dt.datetime(2000, 1, 2, 0, 0, 0, 0, tzinfo=tz.UTC)
SOURCE_NAME = 'public.some_source'

# Ask whether SOURCE_NAME is ready to consume for the given period.
response = requests.get(
    urljoin(WEB_SERVICE_URL, f'sources/{SOURCE_NAME}/check_readiness'),
    params={
        "source_name": SOURCE_NAME,
        "start": str(PARTITION_START),
        "end": str(PARTITION_END),
    },
)
status_code = response.status_code
data = response.json()
```
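For the cron-job and Airflow coordination use cases, a consumer usually polls the readiness check until the period is ready or a deadline passes. The response format is not documented here, so this sketch takes an `is_ready` callable that you implement on top of the request above (for example, by inspecting `response.json()`). `wait_until_ready`, the default interval, and the injectable `sleep`/`clock` parameters (used for testing) are assumptions, not part of the project.

```python
import time
from typing import Callable


def wait_until_ready(
    is_ready: Callable[[], bool],
    timeout: float,
    interval: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> bool:
    """Poll is_ready() every `interval` seconds; return True once ready, False on timeout."""
    deadline = clock() + timeout
    while True:
        if is_ready():
            return True
        remaining = deadline - clock()
        if remaining <= 0:
            return False
        # Never sleep past the deadline, so the final check happens right at it.
        sleep(min(interval, remaining))
```

Returning a boolean instead of raising lets the caller decide what a timeout means, for example failing an Airflow task or skipping a cron run.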