To fulfill their commitment to delivering safer and more reliable transportation, Uber relies heavily on data-driven decisions at every level. Since 2014, they’ve developed a Big Data solution that ensures reliability, scalability, and ease of use, and are now focusing on increasing their platform’s speed and efficiency.
Let’s take a deep dive into Uber’s Big Data platform and learn how they’re expanding their ecosystem to become more reliable and efficient.
Generation 1: The Beginning
Before 2014, Uber’s limited data could fit into a few traditional online transaction processing (OLTP) databases (MySQL and PostgreSQL in their case).
To leverage data, engineers had to access each database or table individually, and users had to write their own code if they needed to combine data from different databases. There was no global access or global view of all stored data; it was scattered across different OLTP databases. The total data size was on the order of a few terabytes (1 terabyte is 1,000 gigabytes), and latency to access this data was very low (often sub-minute).
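To make the "write your own code" point concrete, here is a minimal sketch of a hand-written cross-database join. It assumes two in-memory SQLite databases as stand-ins for the separate MySQL and PostgreSQL stores; the table names, columns, and values are hypothetical and not taken from Uber's systems.

```python
import sqlite3

# Two independent in-memory databases stand in for separate OLTP stores.
trips_db = sqlite3.connect(":memory:")
riders_db = sqlite3.connect(":memory:")

trips_db.execute("CREATE TABLE trips (trip_id INTEGER, rider_id INTEGER, fare REAL)")
trips_db.executemany(
    "INSERT INTO trips VALUES (?, ?, ?)",
    [(1, 10, 12.5), (2, 11, 7.0), (3, 10, 20.0)],
)
riders_db.execute("CREATE TABLE riders (rider_id INTEGER, city TEXT)")
riders_db.executemany(
    "INSERT INTO riders VALUES (?, ?)",
    [(10, "San Francisco"), (11, "Paris")],
)

# No single query can span both connections, so the join happens in user code.
city_by_rider = dict(riders_db.execute("SELECT rider_id, city FROM riders"))
fare_by_city = {}
for rider_id, fare in trips_db.execute("SELECT rider_id, fare FROM trips"):
    city = city_by_rider[rider_id]
    fare_by_city[city] = fare_by_city.get(city, 0.0) + fare

print(fare_by_city)
```

Every analyst who needed a combined view had to repeat this kind of glue code, which is the gap a central warehouse with one SQL interface closes.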
Below is the architecture prior to 2014:
With Uber’s business growing exponentially (in terms of number of cities/countries they operated in, and the number of riders/drivers using the service in each city), the amount of incoming data increased along with it and the need to access and analyze all the data in one place required their first generation of an analytical data warehouse.
To be as data-driven as possible, they needed to ensure that analytical data was accessible to analysts, all in one place.
To achieve this, they first separated data users into 3 main categories:
- City operations teams (thousands of users): On-the-ground crews that manage and scale Uber’s transportation network in each market
- Data scientists and analysts (hundreds of users): Analysts and scientists spread across different functional groups that need data to help deliver the best possible transportation and delivery experience to users
- Engineering teams (hundreds of users): Engineers across the company focused on building automated data applications, such as Fraud Detection and Driver Onboarding platforms
Uber’s first generation analytical data warehouse focused on aggregating all of their data in one place, as well as streamlining data access. For the aggregation, they decided to use Vertica as their data warehouse software for its fast, scalable, and column-oriented design. They also developed multiple ad hoc ETL (Extract, Transform, and Load) jobs that copied data from different sources (e.g. AWS S3, OLTP databases, and service logs) into Vertica. To streamline data access, Uber standardized SQL as their solution’s interface and built an online query service to accept user queries and submit them to the underlying query engine.
Below is Uber’s First Generation analytical data warehouse:
The first generation of Uber’s Big Data platform allowed them to aggregate all of their data in one place and provide a standard SQL interface for users to access data. For the first time, users had a global view and could access all the data in one place. This resulted in a large number of new teams using data analysis as the foundation for their technology and product decisions.
Within a few months, the size of their analytical data grew to tens of terabytes and the number of users increased to several hundred. The use of SQL as a simple standard interface enabled city operators to easily interact with the data without knowing about the underlying technologies. In addition, different engineering teams started to build services and products tailored to user needs that were informed by this data (e.g. UberPool). New teams were also formed to better use and serve this data (e.g. machine learning and experimentation teams).
Since data was ingested through ad hoc ETL jobs and the data platform lacked a formal schema mechanism, data reliability became a concern: most of the source data was in JSON format, and ingestion jobs weren’t resilient to changes in the producer code.
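The fragility described here can be illustrated with a small sketch. The function `ingest_trip` and the field names are hypothetical; the point is only that a hard-coded JSON field lookup has no contract with the producer, so a silent rename upstream breaks ingestion.

```python
import json

def ingest_trip(raw: str) -> tuple:
    """Hypothetical ad hoc ingestion step: pull fixed fields out of a JSON event."""
    event = json.loads(raw)
    # Field names are hard-coded; nothing checks them against a shared schema.
    return (event["trip_id"], event["fare"])

old_event = '{"trip_id": 1, "fare": 12.5}'
# The producer later renames "fare" to "fare_usd" without telling consumers.
new_event = '{"trip_id": 2, "fare_usd": 7.0}'

rows, failures = [], []
for raw in (old_event, new_event):
    try:
        rows.append(ingest_trip(raw))
    except KeyError as missing:
        # The job only finds out at run time, after the producer has shipped.
        failures.append(str(missing))

print(rows, failures)
```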
As the company grew, scaling the data warehouse became increasingly expensive. To cut down costs, they started deleting older, obsolete data to free up space for new data. On top of this, much of Uber’s Big Data platform was not horizontally scalable (they couldn’t partition the data across additional machines), since the primary goal was to meet the critical business need for centralized data access. There was simply not enough time to ensure all parts were horizontally scalable.
Uber’s data warehouse was effectively being used as a data lake: piled up raw data, with data modeling being performed, and data being served, all in one place. ETL jobs that ingested data into the data warehouse were also very fragile due to the lack of a formal contract between the services producing the data and the downstream data consumers (the use of flexible JSON format resulted in the lack of schema enforcement for the source data).
Moreover, the same data could be ingested multiple times if different users performed different transformations during ingestion. This resulted in extra pressure on upstream data sources (i.e. online data stores) and affected their quality of service. This also resulted in multiple copies of almost the same data being stored in the warehouse, further increasing storage costs.
In the case of data quality issues, backfilling was very time-and-labor consuming because the ETL jobs were ad hoc and source-dependent and data projections and transformation were performed during ingestion. It was also difficult to ingest any new data sets and types due to lack of standardization in their ingestion jobs.
Generation 2: The Arrival of Hadoop
To address the limitations of the first generation platform, Uber re-architected it around the Hadoop ecosystem. Specifically, they introduced a Hadoop data lake:
- All raw data was ingested from different online data stores only once
- No transformations occurred during ingestion
This design shift significantly lowered the pressure on their online datastore and allowed them to transition from ad hoc ingestion jobs to a scalable ingestion platform. In order for users to access data in Hadoop, they introduced:
- Presto to enable interactive ad hoc user queries
- Apache Spark to facilitate programmatic access to raw data (in both SQL and non-SQL formats)
- Apache Hive to serve as the workhorse for extremely large queries
These different query engines allowed users to use the tools that best addressed their needs, making Uber’s platform more flexible and accessible. To keep the platform scalable, they ensured all data modeling and transformation only happened in Hadoop. This enabled fast backfilling and recovery when issues arose. Only the most critically modeled tables (i.e. those leveraged by city operators in real time to run pure, quick SQL queries) were transferred to Uber’s data warehouse. This significantly lowered the operational cost of running a huge data warehouse while also directing users to Hadoop-based query engines that were designed with their specific needs in mind.
Uber also adopted Apache Parquet as their standard file format. Since Parquet is a columnar storage format (i.e. it stores data tables by column rather than by row), this resulted in storage savings and compute resource gains for serving analytical queries. Moreover, Parquet’s seamless integration with Apache Spark made this solution a popular choice for accessing Hadoop data.
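A rough way to see why a columnar layout helps analytical queries is to count how many values a query has to touch under each layout. The sketch below uses plain Python lists and dictionaries, not the Parquet format itself, and the sample records are made up.

```python
# Row-oriented layout: all fields of a record are stored together.
rows = [
    {"trip_id": 1, "city": "Paris", "fare": 6.0, "rating": 5},
    {"trip_id": 2, "city": "Lyon", "fare": 12.0, "rating": 4},
    {"trip_id": 3, "city": "Paris", "fare": 18.0, "rating": 5},
]

# Column-oriented layout: all values of one column are stored together.
columns = {name: [row[name] for row in rows] for name in rows[0]}

# Query: average fare. A row layout reads every field of every record...
row_values_touched = sum(len(row) for row in rows)
# ...while a column layout reads only the "fare" column.
col_values_touched = len(columns["fare"])

avg_fare = sum(columns["fare"]) / len(columns["fare"])
print(row_values_touched, col_values_touched, avg_fare)
```

With very wide tables and queries that touch only a few columns, the gap between the two counts grows accordingly.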
Below is Uber’s second generation Big Data platform:
This second generation platform leveraged Hadoop to enable horizontal scaling by incorporating technologies such as Parquet, Spark, and Hive. In turn, tens of petabytes (1 petabyte is roughly 1,000 terabytes) of data were ingested, stored, and served. In addition to incorporating a Hadoop data lake, Uber also made all data services in this ecosystem horizontally scalable, thereby improving the efficiency and stability of its Big Data platform. Having this universal horizontal scalability to address immediate business needs allowed them to focus their energy on building the next generation of the data platform (as opposed to ad hoc problem solving).
Unlike the first generation, in which data pipelines were vulnerable to upstream data format changes, the second iteration of the Big Data platform allowed Uber to enforce a schema on all the data, transitioning it from JSON to Parquet to store schema and data together.
To accomplish this:
- They built a central schema service to collect, store, and serve schemas as well as different client libraries to integrate different services with this central schema service
- Fragile, ad hoc ingestion jobs were replaced with a standard platform to transfer all source data in its original, nested format into the Hadoop data lake
- Any required operations on and transformation of the data happened after ingestion via horizontally scalable batch jobs in Hadoop
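The idea of a central schema service can be sketched as a registry that producers and ingestion jobs both consult. Everything below is an illustrative assumption: `SCHEMAS`, `register_schema`, and `conforms` are hypothetical names, and an in-memory dictionary stands in for the real service and its client libraries.

```python
# Hypothetical in-memory stand-in for a central schema service.
SCHEMAS = {}

def register_schema(name, fields):
    """Store a schema as a mapping of field name -> expected Python type."""
    SCHEMAS[name] = dict(fields)

def conforms(name, record):
    """Return True if the record has exactly the registered fields and types."""
    schema = SCHEMAS[name]
    if set(record) != set(schema):
        return False
    return all(isinstance(record[field], expected) for field, expected in schema.items())

register_schema("trip", {"trip_id": int, "city": str, "fare": float})

good = {"trip_id": 1, "city": "Paris", "fare": 12.5}
renamed_field = {"trip_id": 2, "city": "Paris", "fare_usd": 7.0}
wrong_type = {"trip_id": "3", "city": "Lyon", "fare": 9.0}

print(conforms("trip", good), conforms("trip", renamed_field), conforms("trip", wrong_type))
```

Because the check runs against one shared definition, a renamed or retyped field is caught before it reaches the data lake rather than inside a downstream job.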
With Uber’s business continuing to scale, they soon had tens of petabytes (tens of millions of gigabytes!) of data:
- On a daily basis, there were tens of terabytes (tens of thousands of gigabytes) of new data added to their data lake
- Their Big Data platform grew to over 10,000 vcores (virtual cores) with over 100,000 running batch jobs on any given day
- This resulted in Uber’s Hadoop data lake becoming the centralized source-of-truth for all their analytical data
A new set of challenges arose when scaling with tens of petabytes of data:
- The massive number of small files stored in Uber’s HDFS (Hadoop Distributed File System) began adding extra pressure on the HDFS NameNodes, which are the centerpiece of an HDFS file system and keep the directory tree of all files in the system. These small files resulted from more data being ingested as well as more users writing ad hoc batch jobs, which generated even more output data.
- Data latency was still far from meeting Uber’s business needs. New data was only accessible to users once every 24 hours, which was too slow for real-time decisions. While moving ETL and modeling into Hadoop made this process more scalable, these steps were still bottlenecks since the ETL jobs had to recreate the entire modeled table in every run.
- Both ingestion of the new data and modeling of the related derived table were based on creating new snapshots of the entire dataset and swapping the old and new tables to provide users access to fresh data. Ingestion jobs had to return to the source datastore, create a new snapshot, and ingest or convert the entire data into consumable, columnar Parquet files during every run. With data stores growing, these jobs could take over 20 hours with over 1,000 Spark executors to run.
- A big part of each job involved converting both historical and new data from the latest snapshot. While only a little over 100 gigabytes of new data was added every day for each table, each run of the ingestion job had to convert the entire dataset of over 100 terabytes for that specific table. This was also true for ETL and modeling jobs that recreated new derived tables on every run.
- Jobs had to rely on snapshot-based ingestion of the source data because of the high ratio of updates on historical data. By nature, Uber’s data contains a lot of update operations (e.g. rider and driver ratings, fare adjustments, etc.). Since HDFS and Parquet don’t support data updates, all ingestion jobs needed to: 1) create new snapshots from the updated source data, 2) ingest the new snapshot into Hadoop, 3) convert it into Parquet format, and 4) swap the output tables to view the new data.
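A quick back-of-the-envelope calculation shows how wasteful the snapshot approach was. It uses the article's rounded figures (roughly 100 gigabytes of new data per day against a table of roughly 100 terabytes) and assumes decimal units; the variable names are illustrative only.

```python
GB_PER_TB = 1_000  # decimal units, matching the rounding used in this article

new_data_gb = 100                # new data per table per day (lower bound)
table_size_gb = 100 * GB_PER_TB  # full table converted on every run (lower bound)

# Snapshot-based ingestion reprocesses the whole table every run.
snapshot_work_gb = table_size_gb
# An ideal incremental job would only process what actually changed.
incremental_work_gb = new_data_gb

amplification = snapshot_work_gb / incremental_work_gb
fresh_fraction = new_data_gb / table_size_gb

print(f"work amplification: {amplification:.0f}x")
print(f"share of each run that is new data: {fresh_fraction:.1%}")
```

Under these assumptions only about a tenth of a percent of each run's work goes to data that is actually new, which is the inefficiency the next generation sets out to remove.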
Below is the Snapshot-based Data Ingestion within the platform:
Generation 3: Rebuilding for the Long Term
By early 2017, Uber’s Big Data platform was used by engineering and operations teams across the company, enabling them to access new and historical data all in one place. Users could easily access data in Hive, Presto, Spark, Vertica, Notebook, and more warehouse options all through a single UI (user interface) portal tailored to their needs. By that point, the platform had reached the following scale:
- Over 100 petabytes of data in HDFS
- 100,000 vcores in their compute cluster
- 100,000 Presto queries per day
- 10,000 Spark jobs per day, and
- 20,000 Hive queries per day
Uber’s Hadoop analytics architecture was hitting scalability limitations and many services were affected by high data latency. Since their underlying infrastructure was horizontally scalable to address the immediate business needs, they had enough time to study their data content, data access patterns, and user-specific requirements to identify the most pressing concerns before building the next generation of the platform.
Uber’s research revealed four main points:
1. HDFS scalability limitation
This limitation is faced by many companies that rely on HDFS to scale their big data infrastructure. By design, HDFS is bottlenecked by its NameNode capacity, so storing large numbers of small files can significantly affect performance. This limitation usually occurs when data size grows beyond ten petabytes and becomes a real issue beyond 50–100 petabytes.
There are relatively straightforward solutions to scale HDFS from a few tens to a few hundreds of petabytes:
- Leveraging ViewFS (a way to manage multiple Hadoop file system namespaces/NameNodes) and using HDFS NameNode Federation
- By controlling the number of small files and moving different parts of their data to separate clusters, Uber was able to mitigate this HDFS limitation
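To see why controlling small files matters, it helps to count how many files the NameNode has to track for the same volume of data at different file sizes. The sketch below assumes 1 petabyte of data in decimal units and compares hypothetical 1-megabyte files with the 128-megabyte Parquet file size mentioned later in this article.

```python
def files_needed(total_mb: int, file_size_mb: int) -> int:
    """Number of files required to hold total_mb at a fixed file size (rounded up)."""
    return -(-total_mb // file_size_mb)

ONE_PB_IN_MB = 1_000_000_000  # 1 petabyte expressed in megabytes (decimal units)

small_files = files_needed(ONE_PB_IN_MB, 1)    # hypothetical 1 MB files
large_files = files_needed(ONE_PB_IN_MB, 128)  # 128 MB files

print(small_files, large_files, small_files / large_files)
```

Since the NameNode keeps the directory tree of every file, the same petabyte costs it 128 times more entries when written as 1-megabyte files.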
2. Faster data in Hadoop
Uber’s business operates in real time and as such, their services require access to data that is as fresh as possible. As a result, 24-hour latency was way too slow for many use cases and there was a huge demand for faster data delivery. Their second generation Big Data platform’s snapshot-based ingestion method was inefficient and prevented them from ingesting data with lower latency. To speed up data delivery, they had to re-architect their pipeline to the incremental ingestion of only updated and new data.
3. Support of updates and deletes in Hadoop and Parquet
Uber’s data contains a lot of updates, ranging in age from:
- The past few days (e.g. a rider or driver-partner adjusting a recent trip fare)
- A few weeks (e.g. a rider rating their last trip the next time they take a new trip)
- A few months (e.g. backfilling or adjusting past data due to a business need)
With snapshot-based ingestion of data, they ingest a fresh copy of this source data every 24 hours. In other words, they ingest all updates at one time, once per day.
With the need for fresher data and incremental ingestion, their solution had to be able to support update and delete operations for existing data. However, since Uber’s Big Data is stored in HDFS and Parquet, it wasn’t possible to directly support update operations on the existing data. On the other hand, their data contains extremely wide tables (1,000 columns per table) with five or more levels of nesting while user queries usually only touch a few of these columns, which prevented them from using non-columnar formats in a cost-efficient way.
To prepare their Big Data platform for long-term growth, they had to find a way to solve this limitation within their HDFS file system so that they could support update/delete operations too.
4. Faster ETL and modeling
Similar to raw data ingestion, ETL and modeling jobs were snapshot-based, requiring the platform to rebuild derived tables in every run. To reduce data latency for modeled tables, ETL jobs also needed to become incremental. This required the ETL jobs to incrementally pull out only the changed data from the raw source table and update the previous derived output table instead of rebuilding the entire output table every few hours.
With the above requirements in mind, Uber built Hadoop Upserts anD Incremental (Hudi), an open source Spark library that provides an abstraction layer on top of HDFS and Parquet to support the required update and delete operations.
As a result, any Big Data platform that needs to support update/delete operations for their historical data can leverage Hudi. Hudi enabled Uber to update, insert, and delete existing Parquet data in Hadoop. Hudi also allows data users to incrementally pull out only changed data, significantly improving query efficiency and allowing for incremental updates of derived modeled tables.
Raw data in Uber’s Hadoop ecosystem is partitioned based on time, and any of the old partitions can potentially receive updates at a later time. For a data user or an ETL job relying on these raw source tables, the only way to know what date partition contains updated data is to scan the entire source table and filter out records based on some known notion of time. This results in a computationally expensive query requiring a full source table scan and prevents ETL jobs from running very frequently.
With Hudi, users can simply pass on their last checkpoint timestamp and retrieve all the records that have been updated since, regardless of whether these updates are new records added to recent date partitions (e.g. a new trip happening today) or updates to older data (e.g. an updated trip from 6 months ago), without running an expensive query that scans the entire source table. Using the Hudi library, Uber was able to move away from the snapshot-based ingestion of raw data to an incremental ingestion model that enabled them to reduce data latency from 24 hours to less than one hour.
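The checkpoint-based pull can be sketched in plain Python. This is a toy model and not Hudi's API or storage design: `upsert`, `pull_since`, and the commit log are hypothetical names that only illustrate how tracking changes by commit time avoids scanning every date partition.

```python
from collections import defaultdict

# partition (trip date) -> {row_key: (commit_ts, value)}
table = defaultdict(dict)
# commit_ts -> list of (partition, row_key) touched by that commit
commit_log = defaultdict(list)

def upsert(commit_ts, partition, row_key, value):
    """Write a record and remember which commit touched it."""
    table[partition][row_key] = (commit_ts, value)
    commit_log[commit_ts].append((partition, row_key))

def pull_since(checkpoint_ts):
    """Return records committed after checkpoint_ts without scanning all partitions."""
    changed = {}
    for ts in sorted(commit_log):
        if ts > checkpoint_ts:
            for partition, row_key in commit_log[ts]:
                changed[(partition, row_key)] = table[partition][row_key][1]
    return changed

upsert(1, "2017-01-05", "trip-a", {"fare": 10.0})
upsert(2, "2017-07-01", "trip-b", {"fare": 8.0})
upsert(3, "2017-01-05", "trip-a", {"fare": 12.0})  # late adjustment to an old partition
upsert(3, "2017-07-02", "trip-c", {"fare": 5.0})   # brand new trip

changes = pull_since(2)
print(changes)
```

The pull returns both the new trip and the adjusted six-month-old trip, even though they live in different date partitions.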
Below is Uber’s Hudi-incorporated Big Data platform:
The third generation of their Big Data platform incorporates faster, incremental data ingestion (using their open source Marmaray framework), as well as more efficient storage and serving of data via Hudi.
Generic Data Ingestion
Along with Hudi, Uber also formalized the hand-over of upstream datastore changes between the storage and big data teams through Apache Kafka. Upstream datastore events (as well as classic logging messages from different applications and services) stream into Kafka with a unified Avro encoding (Avro is a compact binary serialization format whose schemas are defined in JSON), with standard global metadata headers (i.e. timestamp, row key, version, data center information, and originating host) attached. Both the Streaming and Big Data teams use these storage changelog events as their source input data for further processing.
Uber’s data ingestion platform, Marmaray, runs in mini-batches and picks up the upstream storage changelogs from Kafka, applying them on top of the existing data in Hadoop using Hudi. Again, Hudi supports upsert operations, allowing users to add new records and update or delete historical data. Ingestion Spark jobs run every 10–15 minutes, providing a 30-minute raw data latency in Hadoop (having headroom for 1–2 ingestion job failures or retries). To avoid inefficiencies resulting from ingesting the same source data into Hadoop more than once, the setup doesn’t allow any transformations during raw data ingestion. This led to the decision to make their raw data ingestion framework an EL platform as opposed to a traditional ETL platform. Under this model, users are encouraged to perform desired transformations within Hadoop and in batch mode after upstream data arrives in its raw nested format.
Since implementing these changes, Uber has saved a significant amount of computational resources by avoiding unnecessary or inefficient ingestion operations. The reliability of their raw data also significantly improved, as they were able to avoid error-prone transformations during ingestion. Users could run their transformations on top of raw source data using any Big Data processing engine. In case of any issues, users could re-run their transformations and still meet their SLAs (service level agreements) by using more compute resources and a higher degree of parallelism to finish the batch transformation jobs faster.
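The mini-batch model can be sketched as a function that applies one batch of changelog events to a keyed table. This is a simplified illustration and not Marmaray's or Hudi's actual interface; the event shape (`op`, `row_key`, `value`) and the function name are assumptions. Note that values are stored exactly as received, reflecting the EL (no transformation) approach.

```python
def apply_mini_batch(table, changelog_events):
    """Apply one mini-batch of changelog events to a keyed table, untransformed."""
    for event in changelog_events:
        if event["op"] == "delete":
            table.pop(event["row_key"], None)
        else:
            # "upsert": insert the row if absent, otherwise overwrite it.
            table[event["row_key"]] = event["value"]
    return table

hadoop_table = {}
batch_1 = [
    {"op": "upsert", "row_key": "trip-a", "value": {"fare": 10.0}},
    {"op": "upsert", "row_key": "trip-b", "value": {"fare": 8.0}},
]
batch_2 = [
    {"op": "upsert", "row_key": "trip-a", "value": {"fare": 12.0}},
    {"op": "delete", "row_key": "trip-b", "value": None},
    {"op": "upsert", "row_key": "trip-c", "value": {"fare": 5.0}},
]

for batch in (batch_1, batch_2):
    apply_mini_batch(hadoop_table, batch)

print(hadoop_table)
```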
Incremental Data Modeling
Considering the large number of upstream data stores that need to be ingested into Hadoop (over 3,000 raw Hadoop tables as of 2017), Uber also built a generic ingestion platform that facilitated the ingestion of raw data into Hadoop in a unified and configurable way. Their Big Data platform updated raw Hadoop tables incrementally with a data latency of 10–15 minutes, allowing for fast access to source data.
To ensure that modeled tables were also available with low latency, inefficiencies had to be avoided (i.e. full derived table recreation or full source raw table scans) in the modeling ETL jobs too. Hudi allows ETL jobs to fetch only the changed data from the source table. Modeling jobs only need to pass a checkpoint timestamp to the Hudi reader during each iterative run to receive a stream of new or updated records from the raw source table (regardless of the data partition where the actual record is stored).
The use of a Hudi writer during an ETL job enabled Uber to update old partitions in the derived modeled tables without recreating the entire partition or table. As such, their modeling ETL jobs used Hudi readers to incrementally fetch only the changed data from the source table and used Hudi writers to incrementally update the derived output table.
ETL jobs finished in less than 30 minutes, providing end-to-end latency of less than one hour for all derived tables in Hadoop.
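As a rough sketch of incremental modeling, the job below keeps a derived revenue-per-city table up to date by applying only the changed trips from each run. The function and table names are hypothetical, and plain dictionaries stand in for the Hudi reader and writer.

```python
def incremental_model(changed_trips, trip_facts, revenue_by_city):
    """Update the derived table using only the trips that changed since the last run."""
    for trip_id, trip in changed_trips.items():
        previous = trip_facts.get(trip_id)
        if previous is not None:
            # Back out the old contribution before applying the updated one.
            revenue_by_city[previous["city"]] -= previous["fare"]
        revenue_by_city[trip["city"]] = revenue_by_city.get(trip["city"], 0.0) + trip["fare"]
        trip_facts[trip_id] = trip

trip_facts, revenue_by_city = {}, {}

# Run 1: two new trips arrive.
incremental_model(
    {"trip-a": {"city": "Paris", "fare": 10.0}, "trip-b": {"city": "Lyon", "fare": 8.0}},
    trip_facts, revenue_by_city,
)
# Run 2: one old trip is adjusted and one new trip arrives.
incremental_model(
    {"trip-a": {"city": "Paris", "fare": 12.0}, "trip-c": {"city": "Paris", "fare": 5.0}},
    trip_facts, revenue_by_city,
)

print(revenue_by_city)
```

The second run touches two records instead of rebuilding the whole derived table, which is what keeps each run short.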
In order to provide data users of Hadoop tables with different options to access all data or only new or updated data, Hadoop raw tables using Hudi storage format provide two different reading modes:
- Latest mode view: Provides a holistic view on the entire Hadoop table at that point in time. This view includes the latest merged values for all records as well as all the existing records in a table.
- Incremental mode view: Fetches only the new and updated records from a specific Hadoop table based on a given timestamp. This view returns only the rows that have recently been inserted or have been updated since the latest checkpoint. If a specific row is updated more than once since the last checkpoint, this mode returns all these intermediate changed values (rather than just returning the latest merged one).
Below are the two reading views for all Hadoop tables stored in Hudi file format:
Users generally alternate between these two table views based on their needs:
- When they run an ad hoc query to analyze data based on the latest state, they use the latest mode view of the table (e.g. to fetch the total weekly number of trips per city in the U.S.).
- When a user has an iterative job or query that needs to fetch only changed or new records since its latest execution, they use the incremental mode view.
Both views are available for all Hadoop tables at all times.
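The difference between the two views can be shown with a toy commit history. This sketch is not Hudi code; the tuple layout `(commit_ts, row_key, value)` and both function names are illustrative assumptions.

```python
# Each entry is (commit_ts, row_key, value).
commits = [
    (1, "trip-a", {"fare": 10.0}),
    (2, "trip-b", {"fare": 8.0}),
    (3, "trip-a", {"fare": 11.0}),
    (4, "trip-a", {"fare": 12.0}),
]

def latest_view(commits):
    """Latest mode: one merged row per key, reflecting the most recent value."""
    merged = {}
    for _ts, key, value in sorted(commits, key=lambda c: c[0]):
        merged[key] = value
    return merged

def incremental_view(commits, checkpoint_ts):
    """Incremental mode: every change after the checkpoint, including intermediates."""
    return [(ts, key, value) for ts, key, value in commits if ts > checkpoint_ts]

print(latest_view(commits))
print(incremental_view(commits, checkpoint_ts=2))
```

Here the latest view shows a single row for trip-a, while the incremental view since checkpoint 2 returns both of its intermediate fare changes.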
Standardized Data Model
In addition to providing different views of the same table, Uber also standardized their data model to provide two types of tables for all raw Hadoop data:
- Changelog history table: Contains the history of all changelogs received for a specific upstream table. This table enabled users to scan through the history of changes for a given table and can be merged per key to provide the latest value for each row.
- Merged snapshot table: Housed the latest merged view of the upstream tables. This table contains the compacted merged view of all the historical changelogs received per key.
Below is how different Hive raw tables are generated for a specific upstream source datastore using the stream of given changelogs:
Standardizing the Hive data model improved data quality for Uber’s entire Big Data ecosystem. This model incorporated a merged snapshot table containing the latest values for each row_key as well as a changelog history table containing the history of all value changes for each row_key.
The stream of changelogs may or may not contain the entire row (all columns) for a given key. While merged snapshot tables always provide all the columns for a specific key, the changelog history table may be sparse if the upstream stream of changelogs only provides partial row changelogs. This functionality improves efficiency by avoiding resending the entire row when only one or a few column values have changed. For users who want to fetch the changed values from the changelog history table and join them against the merged snapshot table to build the full row, the changelog history table also records the data partition that holds the same key in the merged snapshot table. This allows the two tables to be joined more efficiently on a specific partition, avoiding a full table scan of the merged snapshot table.
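The relationship between the two tables can be sketched as follows. The record layout and function names are hypothetical; the sketch only shows partial changelogs being merged per key, and the stored partition being used to look up the full row without scanning the whole snapshot.

```python
# Changelog history: one entry per received change; rows may be partial.
changelog_history = [
    {"row_key": "trip-a", "partition": "2017-01-05",
     "changes": {"city": "Paris", "fare": 10.0}},
    {"row_key": "trip-a", "partition": "2017-01-05",
     "changes": {"rating": 5}},
    {"row_key": "trip-b", "partition": "2017-01-06",
     "changes": {"city": "Lyon", "fare": 8.0}},
    {"row_key": "trip-a", "partition": "2017-01-05",
     "changes": {"fare": 12.0}},
]

def build_merged_snapshot(history):
    """Merge changelogs per key, in arrival order, into partition -> key -> row."""
    snapshot = {}
    for entry in history:
        rows = snapshot.setdefault(entry["partition"], {})
        rows.setdefault(entry["row_key"], {}).update(entry["changes"])
    return snapshot

def full_row_for(entry, snapshot):
    """Join one changelog entry to its merged row using the stored partition."""
    return snapshot[entry["partition"]][entry["row_key"]]

merged_snapshot = build_merged_snapshot(changelog_history)
last_change = changelog_history[-1]
full_row = full_row_for(last_change, merged_snapshot)

print(last_change["changes"], full_row)
```

The last changelog entry carries only the changed fare, yet the partition it records leads straight to the complete merged row.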
Below is the relationship between different components of the Big Data platform:
Building a more extensible data transfer platform allowed Uber to easily aggregate all data pipelines in a standard way under one service as well as support any-to-any connectivity between any data source and data sink (destination).
Generation 4: What’s Next?
Since rolling out the third generation of the Big Data platform in 2017, users across the company have been able to quickly and reliably access data in Hadoop, but there’s always room to grow.
Below is a summary of Uber’s ongoing efforts to enhance their Big Data platform for improved data quality, data latency, efficiency, scalability, and reliability.
To enhance data quality, Uber identified two key areas for improvement:
1. Avoid non-schema-conforming data
When some of the upstream data stores don’t mandatorily enforce or check data schema before storage (e.g. storing a key-value where the value is a JSON blob), this results in bad data entering the Hadoop ecosystem, thereby affecting all downstream users also relying on this data.
To prevent an influx of bad data, Uber is transitioning all upstream data stores towards performing mandatory schema checks on data content and rejecting data entries if there are any issues with the data (e.g. not conforming to the schema).
2. Quality of the actual data content
While schemas ensure that data contains the correct data types, they don’t check the actual data values (e.g. a schema can require an integer, but not a number in the range [0, 150]).
To improve data quality, Uber is expanding its schema service to support semantic checks. These semantic checks (i.e. Uber-specific data types) allow them to add extra constraints on the actual data content beyond basic structural type-checking.
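The gap between a structural type check and a semantic check is easy to show in a few lines. The function names below are hypothetical, and the [0, 150] range is the example used above.

```python
def type_check(value) -> bool:
    """Structural check: is the value an integer? (bool is excluded on purpose)"""
    return isinstance(value, int) and not isinstance(value, bool)

def semantic_check(value, low=0, high=150) -> bool:
    """Semantic check: an integer that also falls inside the allowed range."""
    return type_check(value) and low <= value <= high

for candidate in (42, -5, 151, "42"):
    print(candidate, type_check(candidate), semantic_check(candidate))
```

A value such as -5 passes the type check but fails the semantic check, which is exactly the kind of bad data a schema alone lets through.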
Uber is also aiming to reduce raw data latency in Hadoop to 5 minutes and data latency for modeled tables to 10 minutes. This will allow more use cases to move away from stream processing to more efficient mini-batch processing that uses Hudi’s incremental data pulls.
They’re also expanding their Hudi project to support an additional view mode, which will include the existing read-optimized view, as well as a new real-time view which shows data with latency of just a few minutes. This real-time view relies on an open source solution (and part of Hudi) called Merge-On-Read, or Hudi 2.0.
To improve data efficiency, Uber is moving away from relying on dedicated hardware for any of their services and towards service dockerization (converting services to run within Docker containers, which are standalone, executable packages of software that include everything needed to run an application).
In addition, they’re unifying all of their resource schedulers within and across their Hadoop ecosystem to bridge the gap between their Hadoop and non-data services across the company. This allows all jobs and services to be scheduled in a unified fashion regardless of the medium it will be executed in.
As Uber grows, data locality becomes a big concern for Hadoop applications, and a successful unified resource manager can bring together all existing schedulers (i.e. YARN, Mesos, and Myriad).
Scalability and Reliability
As part of Uber’s effort to improve the scalability and reliability of their platform, they identified several issues related to possible edge cases.
While their ingestion platform was developed as a generic, pluggable model, the actual ingestion of upstream data still includes a lot of source-dependent pipeline configurations. This makes the ingestion pipeline fragile and increases the maintenance costs of operating several thousands of these pipelines.
To ensure they have unified data ingestion, regardless of the data source, Uber started a project in collaboration with its Data Storage team to unify the content, format, and metadata of the changelogs from all upstream data sources, regardless of their technological makeup. This project will ensure that information about these specific upstream technologies will only be an additional metadata added to the actual changelog value (as opposed to having totally different changelog content and metadata for different data sources) and data ingestion will happen regardless of the upstream source.
Their next-gen Hudi will allow them to generate much larger Parquet files (larger than the current 128-megabyte file size) by default within a few minutes for all of their data sources. It will also remove any sensitivities around the ratio of updates versus inserts.
Hudi 1.0 relies on a technique called copy-on-write that rewrites the entire source Parquet file whenever there is an updated record. This significantly increases the write amplification, especially when the ratio of updates to inserts increases, and prevents creation of larger Parquet files in HDFS.
The new version of Hudi is designed to overcome this limitation by storing the updated record in a separate delta file and asynchronously merging it with the base Parquet file based on a given policy (e.g. when there is enough updated data to reduce the cost of rewriting a large base Parquet file). Having Hadoop data stored in larger Parquet files as well as a more reliable source-independent data ingestion platform allows Uber’s analytical data platform to continue to grow as their business thrives.
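A simple calculation illustrates the write amplification difference between the two approaches. The numbers are hypothetical: a 128-megabyte base file, 1-kilobyte records, 1,000 separate update commits that each touch one record in that file, and a single merge at the end. Real behavior depends on the merge policy and workload.

```python
BASE_FILE_KB = 128 * 1024  # one 128 MB base Parquet file, in kilobytes
RECORD_KB = 1              # hypothetical size of one updated record
UPDATE_COMMITS = 1_000     # hypothetical commits, each updating one record

# Copy-on-write: every commit rewrites the entire base file.
copy_on_write_kb = UPDATE_COMMITS * BASE_FILE_KB

# Delta-file approach: each commit appends only the changed record,
# and the base file is rewritten once when the deltas are merged.
delta_writes_kb = UPDATE_COMMITS * RECORD_KB
merge_on_read_kb = delta_writes_kb + BASE_FILE_KB

print(copy_on_write_kb, merge_on_read_kb, copy_on_write_kb / merge_on_read_kb)
```

Under these assumptions the delta-file approach writes roughly a thousand times less data, and the cost of rewriting a large base file is paid once per merge rather than once per update.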
The information contained in this article was inspired by and drawn from this article. I am NOT an employee of Uber, nor do I have access to their infrastructure.
I’m a Data Engineer always looking to broaden my knowledge and I’m passionate about Big Data. I also enjoy spreading the knowledge I gain in topics that I find interesting. This article is meant to do just that. I play video games, watch tv, and love to travel (when it’s safe to do so). I’m a happy father of 2 fur babies and when I’m not giving them attention, I’m blogging about data and big data infrastructures!