Pinot is a real-time distributed OLAP datastore, used at LinkedIn to deliver scalable real-time analytics with low latency. It can ingest data from offline data sources (such as Hadoop and flat files) as well as online sources (such as Kafka). Pinot is designed to scale horizontally.
Pinot is well suited for analytical use cases on immutable append-only data that require low latency between an event being ingested and it being available to be queried.
- A column-oriented database with various compression schemes such as Run Length, Fixed Bit Length
- Pluggable indexing technologies - Sorted Index, Bitmap Index, Inverted Index
- Ability to optimize the query/execution plan based on query and segment metadata
- Near real time ingestion from Kafka and batch ingestion from Hadoop
- SQL-like language that supports selection, aggregation, filtering, group by, order by and distinct queries on fact data
- Support for multivalued fields
- Horizontally scalable and fault tolerant
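To make the two compression schemes named in the feature list concrete, here is a minimal Python sketch of run-length encoding and fixed-bit-length packing for a column of small integers. The function names are hypothetical and the layout is illustrative only; it is not Pinot's actual on-disk format.

```python
def run_length_encode(values):
    """Collapse consecutive repeats into [value, count] pairs."""
    runs = []
    for v in values:
        if runs and runs[-1][0] == v:
            runs[-1][1] += 1
        else:
            runs.append([v, 1])
    return runs


def run_length_decode(runs):
    """Expand [value, count] pairs back into the original sequence."""
    out = []
    for value, count in runs:
        out.extend([value] * count)
    return out


def bits_needed(values):
    """Smallest fixed width (in bits) that can hold every value."""
    return max(1, max(values).bit_length())


def fixed_bit_pack(values, bits):
    """Pack non-negative ints into one integer, `bits` bits per value."""
    packed = 0
    for i, v in enumerate(values):
        if v < 0 or v >= (1 << bits):
            raise ValueError(f"{v} does not fit in {bits} bits")
        packed |= v << (i * bits)
    return packed


def fixed_bit_unpack(packed, bits, count):
    """Read `count` values of `bits` bits each back out of `packed`."""
    mask = (1 << bits) - 1
    return [(packed >> (i * bits)) & mask for i in range(count)]


# A sorted or low-cardinality column compresses well with both schemes.
column = [3, 3, 3, 1, 1, 0, 2, 2]
runs = run_length_encode(column)
width = bits_needed(column)
packed = fixed_bit_pack(column, width)
```

Run-length encoding pays off when equal values are adjacent (for example in a sorted column), while fixed-bit packing helps whenever the value range is small, regardless of order.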
Because of the design choices we made to achieve these goals, there are certain limitations present in Pinot:
- Pinot is not a replacement for a database, i.e. it cannot be used as a source-of-truth store and cannot mutate data.
- Pinot is not a replacement for a search engine, i.e. full-text search and relevance ranking are not supported.
- A query cannot span multiple tables.
Pinot works very well for querying time series data with lots of dimensions and metrics. For example, it can query profile views or ad campaign performance in an analytical fashion (who viewed this profile in recent weeks, how many ads were clicked per campaign).
Before we get to the quick start, let's go over the terminology.
- Table: A table is a logical abstraction that refers to a collection of related data. It consists of columns and rows (documents). The table schema defines the column names and their metadata.
- Segment: A logical table is divided into multiple physical units referred to as segments.
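The relationship between a logical table and its segments can be sketched in Python as follows. This is a toy model under stated assumptions: the helper names, the fixed segment size and the min/max time metadata are hypothetical, chosen only to show how per-segment metadata could let a query skip segments that cannot match.

```python
def build_segments(rows, time_col, rows_per_segment):
    """Split a table's rows into segments, recording a time range per segment."""
    ordered = sorted(rows, key=lambda r: r[time_col])
    segments = []
    for start in range(0, len(ordered), rows_per_segment):
        chunk = ordered[start:start + rows_per_segment]
        segments.append({
            "rows": chunk,
            "min_time": chunk[0][time_col],   # chunk is sorted, so first is min
            "max_time": chunk[-1][time_col],  # and last is max
        })
    return segments


def segments_to_scan(segments, lo, hi):
    """Keep only segments whose time range overlaps [lo, hi]."""
    return [s for s in segments if s["max_time"] >= lo and s["min_time"] <= hi]


# Twelve rows, one per year, split into three segments of four rows each.
table = [{"yearID": year, "runs": year - 1989} for year in range(1990, 2002)]
segments = build_segments(table, "yearID", rows_per_segment=4)

# A filter on yearID = 2000 only needs to touch the last segment.
candidates = segments_to_scan(segments, 2000, 2000)
matching = [r for s in candidates for r in s["rows"] if r["yearID"] == 2000]
```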
Pinot has the following roles/components:
- Pinot Controller: Manages nodes in the cluster. Responsibilities:
  - Handles all create, update and delete operations on tables and segments.
  - Computes the assignment of a table and its segments to Pinot Servers.
- Pinot Server: Hosts one or more physical segments. Responsibilities:
  - When assigned a pre-created segment, downloads and loads it. When assigned a Kafka topic, starts consuming from a subset of the topic's partitions.
  - Executes queries and returns the response to the Pinot Broker.
- Pinot Broker: Accepts queries from clients and routes them to multiple servers (based on the routing strategy). All responses are merged and sent back to the client.
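The broker/server split described above is a scatter-gather pattern. The following Python sketch illustrates the idea for a "sum ... group by ... top N" query. It is a simplified model, not Pinot's implementation: the function names and the in-memory data layout are hypothetical, and routing is reduced to "ask every server".

```python
from collections import Counter


def server_execute(segments, group_col, metric_col):
    """Server side: aggregate over local segments and return a partial result."""
    partial = Counter()
    for segment in segments:
        for row in segment:
            partial[row[group_col]] += row[metric_col]
    return partial


def broker_execute(servers, group_col, metric_col, top_n):
    """Broker side: fan the query out, merge partial sums, keep the top N."""
    merged = Counter()
    for segments in servers.values():
        # Counter.update adds the partial sums key by key.
        merged.update(server_execute(segments, group_col, metric_col))
    # Sort by descending sum; break ties by group name for a stable result.
    ranked = sorted(merged.items(), key=lambda kv: (-kv[1], kv[0]))
    return ranked[:top_n]


# Two servers, each hosting one or more segments (lists of rows).
servers = {
    "server1": [[{"playerName": "a", "runs": 3}, {"playerName": "b", "runs": 1}]],
    "server2": [[{"playerName": "a", "runs": 2}], [{"playerName": "c", "runs": 4}]],
}
top_two = broker_execute(servers, "playerName", "runs", top_n=2)
```

Because sums are associative, each server can aggregate locally and the broker only has to merge small partial results rather than raw rows.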
Pinot leverages Apache Helix for cluster management.
More information on Pinot's design and architecture can be found here.
git clone https://github.com/linkedin/pinot.git
cd pinot
mvn install package -DskipTests
cd pinot-distribution/target/pinot-0.016-pkg
chmod +x bin/*.sh
We will load baseball stats from 1878 to 2013 into Pinot and run queries against them. The dataset has 100000 records and 15 columns (schema).
Execute the quick-start-offline.sh script in the bin folder, which performs the following:
- Converts Baseball data in CSV format into Pinot Index Segments.
- Starts Zookeeper and the Pinot components: Controller, Broker and Server.
- Uploads segment to Pinot
bin/quick-start-offline.sh
We should see the following output.
Deployed Zookeeper
Deployed controller, broker and server
Added baseballStats schema
Creating baseballStats table
Built index segment for baseballStats
Pushing segments to the controller
At this point we can post queries. Here are some sample queries:
/*Total number of documents in the table*/
select count(*) from baseballStats limit 0
/*Top 5 run scorers of all time*/
select sum('runs') from baseballStats group by playerName top 5 limit 0
/*Top 5 run scorers of the year 2000*/
select sum('runs') from baseballStats where yearID=2000 group by playerName top 5 limit 0
/*Top 10 run scorers since 2000*/
select sum('runs') from baseballStats where yearID>=2000 group by playerName top 10 limit 0
/*Select playerName,runs,homeRuns for 10 records from the table and order them by yearID*/
select playerName,runs,homeRuns from baseballStats order by yearID limit 10
There are three ways to interact with Pinot: a simple web interface, a REST API and a Java client. Open your browser, go to http://localhost:9000/query and run any of the queries provided above. See Pinot Query Syntax for more info.
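For the REST route, a query can be posted from a script. The sketch below uses only the Python standard library. Note the assumptions: the broker URL (`http://localhost:8099/query`) and the `{"pql": ...}` JSON payload are not stated in this README and may differ in your build, so treat them as placeholders and adjust as needed. The helper names are hypothetical.

```python
import json
import urllib.request

# Assumption: broker query endpoint and payload shape; not confirmed by this README.
DEFAULT_BROKER_URL = "http://localhost:8099/query"


def build_query_request(pql, url=DEFAULT_BROKER_URL):
    """Build (but do not send) an HTTP POST carrying the query as JSON."""
    body = json.dumps({"pql": pql}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def run_query(pql, url=DEFAULT_BROKER_URL, timeout=10):
    """Send the query and parse the JSON response (requires a running cluster)."""
    request = build_query_request(pql, url)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

With the quick start running, `run_query("select count(*) from baseballStats limit 0")` would return the parsed response, provided the assumed endpoint matches your setup.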
There are two ways to ingest data into Pinot: batch and realtime. The previous baseball stats example demonstrated batch ingestion. Typically these batch jobs are run on Hadoop periodically (e.g. every hour/day/week/month). Data freshness depends on job granularity.
Let's look at an example where we ingest data in realtime. We will subscribe to the meetup.com RSVP feed and index the RSVP events in real time. Execute the quick-start-realtime.sh script in the bin folder, which performs the following:
- Starts a Kafka broker.
- Sets up a meetup event listener that subscribes to the meetup.com stream and publishes it to the local Kafka broker.
- Starts Zookeeper, the Pinot Controller, the Pinot Broker and the Pinot Server.
- Configures the realtime source.
bin/quick-start-realtime.sh
Starting Kafka
Created topic "meetupRSVPEvents".
Starting controller, server and broker
Added schema and table
Realtime quick start setup complete
Starting meetup data stream and publishing to kafka
Open the Pinot Query Console at http://localhost:9000/query and run queries. Here are some sample queries:
/*Total number of documents in the table*/
select count(*) from meetupRsvp limit 0
/*Top 10 cities with the most RSVPs*/
select sum(rsvp_count) from meetupRsvp group by group_city top 10 limit 0
/*Show the 10 most recent RSVPs*/
select * from meetupRsvp order by mtime desc limit 10
/*Show top 10 rsvp'ed events*/
select sum(rsvp_count) from meetupRsvp group by event_name top 10 limit 0
At LinkedIn, Pinot powers more than 50 applications, such as Who Viewed My Profile and Who Viewed My Jobs, with interactive-level response times. It ingests close to a billion events per day in real time and processes 100 million queries per day.