Giới thiệu Trino (Presto) — áp dụng cho bài toán làm report tại team Platform

Giới thiệu Trino (Presto) — áp dụng cho bài toán làm report tại team Platform

Hi mọi người, trong bài viết này, em sẽ giới thiệu đến mọi người một open-source (OP) về query engine là Trino.
(**NOTE: Presto và Trino là 2 OP có kiến trúc giống nhau, đều được phát triển bởi một nhóm tác giả là các cựu nhân viên Facebook. Tuy nhiên, hiện tại 2 OP này được phát triển độc lập (chi tiết https://trino.io/blog/2020/12/27/announcing-trino.html):

  • Presto do Facebook đứng đầu
  • Trino được Apache bảo trợ với các tác giả là người đứng đầu

I. Vấn đề

  • Hiện tại, team Platform đang là điểm cuối xử lý các report về campaign, insignt data từ team Data Analysis cũng như các đội sản phẩm. Các report này đi từ dễ với chỉ 1 phép count trên spark-shell đến các report phức tạp hơn yêu cầu viết spark-applciation với nhiều phép aggregate, join từ nhiều data source (chủ yếu là từ các cụm HDFS<logs>+ Elasticsearch<bài viết>). Có khá nhiều report (~ 50%) lại có chung một logic xử lý các bước trung gian nhưng input, output lại hòan toàn khác nhau dẫn tới team luôn phải sắp xếp một vài thành viên phải tiếp nhận, xử lý các report này. Điều này đôi lúc dẫn tới lãng phí nguồn lực khi phải thực hiện những công việc lặp lại.
  • Một phần do hệ thống hiện tại chưa có một data warehouse đủ tốt dẫn tới việc các team data analysis, data science muốn tự xử lý dữ liệu đều phải yêu cầu biết sử dụng apache spark hoặc yêu cầu tới các team khác để lấy dữ liệu. Ngoài ra, hiện tại trên hệ thống để có thể visualize dữ liệu cũng khá phức tạp, vì vậy, nếu có thể team cũng muốn triển khai tích hợp thêm các OP về BI như Superset
  • Ngoài ra, với vai trò là một thành viên mới join vào nhóm, việc nắm được các logs trong hệ thống hiện tại mất khá nhiều thời gian do các data source phân tán khá nhiều nơi và không có một data catagory tập trung.

=> Từ các vấn đề trên, team có nhiệm vụ xây dựng một hệ thống giúp giải phóng các bài toán reports khỏi việc phải viết Spark Application. Tốt hơn có thể giúp team data analysis cũng như các team khác có thể tự chủ được quá trình phân tích dữ liệu

II. Lựa chọn công nghệ

Ban đầu, team đề xuất khá nhiều các open source (OP) “lạ hoắc" có thể kể đến như: Apache Druid, Apache Drill, Apache Kylin, Apache Hive, Presto,… hay build thêm một Spark Application có thể tự parse câu SQL từ client truyền lên. Lúc này, nhiệm vụ cần phải đọc, thử nghiệm, đánh giá sự phù hợp hiệu quả để giải quyết vấn đề của nhóm. Sau khoảng 1 tháng, nhóm đã khá clear được ưu nhược điểm của các OP trên và chia đựơc thành các nhóm:

  • Processing Engine (MapReduce, Spark, Flink,…): Cung cấp khả năng xử lý mạnh mẽ, tùy chỉnh cao phù hợp để viết các pipeline
  • Query Engine (Pig, Presto, Drill,…): Dễ dàng tương tác với nhiều data source qua SQL
  • OLAP Database/Engine (Hive, Druid, Pinot, Kylin,…): tối ưu về latency

Từ việc phân nhóm và có thể nhận ra được các điểm mạnh, yếu đặc trưng của các OP, nhóm đã quyết định lựa chọn Trino để có thể triển khai cho hệ thống với những đặc điểm sau:

  • Hệ thống có thể linh hoạt, dễ dàng trong việc tích hợp các data source hiện có (nhiều cụm HDFS, Elasticsearch, MySQL,…) (không phụ thuộc vào storage => tránh duplicate dữ liệu đã có)
  • Có thể chấp nhận latency vì các report thường không yêu cầu kết quả ngay lập tức
  • Hỗ trợ ANSI SQL để mọi thành viên với base SQL có thể truy vẫn được dữ liệu
  • Cộng đồng lớn, có nguồn tài liệu đa dạng

(Như đã note ở đầu bài viết, Presto và Trino có kiến trúc giống hệt nhau, nhưng do chỉ có Trino có thể tích hợp cụm Elasticsearch có Authen nên nhóm đã lựa chọn Trino)

III. Các khái niệm và kiến trúc trong Trino

  • Server Types: Có hai loại server trong Trino là Coordinator và Worker trong đó:
    + Coordinator: là “bộ não” của cụm Trino, chịu trách nhiệm phân tích cú pháp các câu lệnh, lập kế hoạch truy vấn và quản lý các worker. Trong cụm Trino phải có một coordinator cùng với một hoặc nhiều worker (Có thể config coordinator và worker trên cùng 1 server vật lý với mục đích thử nghiệm). Với mỗi query, coordinator ra một mô hình logic của truy vấn gồm các stage (giai đoạn), sau đó được chuyển thành một chuỗi các task (nhiệm vụ) và chuyển đến để các worker thực thi. Coordinator giao tiếp với worker và clients bằng API REST.
    + Worker: chịu trách nhiệm thực thi các tác vụ và xử lý dữ liệu. Worker sẽ truy vấn dữ liệu từ các connector, data source đuợc chỉ định và trao đổi dữ liệu trung gian với nhau. Coordinator sẽ chỉ lấy kết quả từ worker và trả lại cho client mà không có nhiệm vụ tính toán. Khi khởi tạo, worker sẽ quảng bá (advertises) địa chỉ của mình tới coordinator để có thể tham gia cụm.
  • Data sources: Là các khái niệm liên quan tới cách trino quản lý dữ liệu
    + Connector: Trino được phát triển dựa theo mô hình connector-based architecture. Để kết nối với các data source bên ngoài sẽ cần có một connector như một cổng trung gian (plugin) giữa Trino và data source. Connector sẽ implement bộ các interface của Trino (Trino's SPI: Metadata API, Data Location API, Data Stream API, Data Statics API,…) để thực hiện 3 nhiệm vụ chính: Lấy schema/table metadata; đọc ghi dữ liệu từ data source; convert định dạng dữ liệu để Trino engine xử lý
    + Catalog: Một catalog đại điện cho một data source mà Trino kết nối tới. Một catalog sẽ luôn ứng với một connector cụ thể. Ví dụ khi kết nối tới Elasticsearch thì sẽ có một catalog elasticsearch và sử dụng elasticsearch connector
    + Schema: Là một nhóm các bảng trong từng catalog. Có thể so sánh nó với 1 database trong MySQL
    + Table: Là tập các rows (hàng) dữ liệu, tương tự như trong relational db hay index trong elasticsearch
  • Query execution model:
    + Statament là các câu lệnh text SQL do user viết và mong muốn thực thi. Mỗi câu statement sẽ được parse thành các câu query gồm nhiều các bước để Trino thực thi
    + Stage: Mỗi query có thể gồm một hoặc nhiều stage và được phân cấp dưới dạng cây để có thể thực hiện song song hay tuần tự. Mỗi stage là nhóm các task mà có thể xử lý trên từng worker mà chưa yêu câu trao đổi dữ liệu (khá tương tự với khái niệm stage trong spark)

+ Task: Là các đơn vị thực thi của stage trên từng worker. Mỗi task sẽ có input, output data được xử lý qua nhiều operator (như filter, join,…)
+ Split: Là một đơn vị data được task thực thi. Data sẽ được chia nhỏ thành từng split và đây là lý do Trino có thể xử lý được lượng dữ liệu lớn trên một lượng tài nguyên RAM giới hạn

  • Kiến trúc thực thi trong Trino
Giao tiếp các thành phần trong Trino (1)
Activity Diagram của Trino khi connect với HDFS (2)

Một vài note trong kiến trúc này:

  • Khi kết nối tới 1 data source, Trino engine thực thi tối thiểu 2 pharse: 1 để lấy metadata và tạo ra các stage (planning); 1 để truy vấn, xử lý data trên worker
  • Data sẽ được xử lý hoàn toàn trên RAM (nhưng vẫn có cơ chế lưu tạm xuống disk cho các phép toán aggregate, join)
  • Tùy thuộc từng data source mà cách giao tiếp của trino với data source khác nhau dựa trên implement của connector ứng với data source đó. Ví dụ, như sơ đồ (2) khi Trino kết nối với HDFS, sẽ phát sinh thêm 1 thành phần là Hive Meta Store (HMS), đây là server quản lý các shema, table tương ứng với các directory trong HDFS. Tuy nhiên, với những data source khác như MySQL hay ES, sẽ không phát sinh thêm các thành phần khác, các metadata sẽ được các data source đó trực tiếp cung cấp cho connector.
  • Tùy từng connector, có thể có một số toán tử sẽ được “push down” để thực thi tại chính data source. Điều này cần được chú ý để tối ưu khi viết query để tránh lãng phí tài nguyên. Đây cũng là một lý do quan trọng để nhóm lựa chọn Trino vì một phần các report yêu cầu tìm kiếm theo keyword trong các bài báo mà với dạng search text thì khó có data source nào tối ưu được như elasticsearch. Ví dụ với câu query sau tới PosgreSQL:
Phép filter (where) sẽ được xử lý tại PosgreSQL trước khi gửi lại worker để xử lý các phép aggregate khác (count)

IV. Mô hình triển khai hiện tại

Mô hình triển khai tại team Platform — Adtech (các khối cùng màu sẽ được triển khai trên cùng 1 server vật lý)

So với các thành phần cơ bản của Trino cluster, khi triển khai thực tế tại, team đã bổ sung thêm:

  • 1 load balancher cho cụm để đảm bảo khi coordinator chết, sẽ có HA sang coordinator còn lại.
  • 1 tầng Alluxio cho cache layer tránh việc đọc liên tục cùng một lượng lớn dữ liệu từ data source bên ngoài.
  • 1 tầng Superset client giúp truy vấn dữ liệu và visualize dữ liệu

Trên đây là hệ thống mà team Platform đã triển khai với Trino, vẫn còn khá nhiều vấn đề và cải tiến cần làm để có thể tối ưu cũng như vận hành ổn định có thể kể đến như:

  • Phân quyền chi tiết user dựa trên hệ thống LDAP đang có
  • Quản lý resource cho từng group user, tối ưu tài nguyên cụm sử dụng
  • Triển khai trên K8s để tối ưu tài nguyên nhưng vẫn có thể scale dễ dàng

Hi vọng qua bài viết này, nhóm có thể cung cấp thêm cho mọi người cái nhìn tổng quan về một query engine đang có tốc độ và tiềm năng phát triển mạnh mẽ gần đây. Nhóm cũng sẵn sàng tiếp nhận những đánh giá, nhận xét, góp ý để có thể cải thiện hệ thống và trả lời những thắc mắc của mọi người.
Chúc mọi người có một ngày làm việc hiệu quả!