Google BigQuery đã cách mạng hóa cách các tổ chức xử lý phân tích dữ liệu quy mô lớn. Kiến trúc không máy chủ, khả năng mở rộng và giao diện SQL quen thuộc của nó làm cho nó trở thành một công cụ mạnh mẽ để khai thác thông tin từ các tập dữ liệu khổng lồ. Mặc dù việc tương tác với BigQuery thông qua Google Cloud Console hoặc công cụ dòng lệnh bq
là phổ biến, sức mạnh thực sự của tự động hóa, tích hợp và phát triển ứng dụng tùy chỉnh được mở khóa thông qua bộ API (Giao diện lập trình ứng dụng) toàn diện của nó.
Hướng dẫn này cung cấp một khám phá từng bước về cách sử dụng BigQuery APIs, cho phép bạn tương tác với kho dữ liệu của mình một cách lập trình, xây dựng đường ống dữ liệu, tích hợp BigQuery vào các ứng dụng của bạn và quản lý tài nguyên một cách hiệu quả. Chúng tôi sẽ đề cập đến các loại API khác nhau có sẵn, cách thiết lập môi trường của bạn, các ví dụ thực tiễn sử dụng thư viện khách hàng Python và giới thiệu các API chuyên biệt cho các trường hợp sử dụng nâng cao.
Bạn muốn một nền tảng tích hợp, Tất cả trong Một để nhóm phát triển của bạn làm việc cùng nhau với năng suất tối đa?
Apidog đáp ứng tất cả các yêu cầu của bạn, và thay thế Postman với mức giá dễ chịu hơn!

Hiểu biết về BigQuery APIs
Trước khi bắt tay vào mã, điều quan trọng là phải hiểu các khái niệm cơ bản và các cách khác nhau mà bạn có thể tương tác với BigQuery một cách lập trình.
Các khái niệm cơ bản về BigQuery:
- Dự án: Bộ chứa cấp cao nhất trong Google Cloud, giữ tất cả các tài nguyên của bạn, bao gồm các tập dữ liệu BigQuery, bảng và công việc. Thanh toán và quyền thường được quản lý ở cấp độ dự án.
- Tập dữ liệu: Một bộ chứa cho các bảng và chế độ xem trong một dự án cụ thể. Các tập dữ liệu giúp tổ chức dữ liệu của bạn và kiểm soát quyền truy cập. Hãy nghĩ về chúng như các sơ đồ hoặc cơ sở dữ liệu trong các hệ thống truyền thống.
- Bảng: Lưu trữ dữ liệu có cấu trúc của bạn theo hàng và cột với một sơ đồ được định nghĩa. BigQuery hỗ trợ các loại dữ liệu khác nhau và các trường lồng ghép/lặp lại.
- Công việc: Đại diện cho các hành động bất đồng bộ được thực hiện trong BigQuery, chẳng hạn như chạy các truy vấn, tải dữ liệu, xuất dữ liệu hoặc sao chép các bảng. Các API cho phép bạn bắt đầu, theo dõi và quản lý những công việc này.
Các loại BigQuery APIs:
BigQuery cung cấp một số cách để tương tác với các dịch vụ của nó một cách lập trình:
REST API: Đây là API cơ bản được xây dựng trên HTTP và JSON. Nó cung cấp quyền truy cập trực tiếp vào các tài nguyên và hoạt động của BigQuery. Bạn có thể tương tác với nó bằng các yêu cầu HTTP tiêu chuẩn (GET, POST, PUT, DELETE) nhắm vào các điểm cuối cụ thể (ví dụ: https://bigquery.googleapis.com/bigquery/v2/projects/{projectId}/datasets
). Dù mạnh mẽ và cung cấp quyền kiểm soát chi tiết, việc sử dụng REST API trực tiếp yêu cầu xử lý xác thực, định dạng yêu cầu, phân tích phản hồi và xử lý lỗi thủ công. Xác thực thường liên quan đến các mã thông báo truy cập OAuth 2.0.
Thư viện Client: Google cung cấp các thư viện client cấp cao cho nhiều ngôn ngữ lập trình phổ biến (bao gồm Python, Java, Go, Node.js, C#, PHP, Ruby). Các thư viện này bọc API REST cơ bản, cung cấp trải nghiệm lập trình viên thân thiện hơn, đơn giản hóa các tác vụ thông thường, xử lý xác thực (thường tự động thông qua Chứng chỉ Mặc định của Ứng dụng), quản lý việc thử lại và giảm số lượng mã khởi tạo bạn cần viết. Đây là cách tiếp cận được khuyến nghị cho hầu hết các phát triển ứng dụng.
Các API chuyên biệt: Đối với các tác vụ hiệu suất cao hoặc chuyên biệt, BigQuery cung cấp các API dành riêng:
- BigQuery Storage Read API: Được thiết kế cho việc đọc dữ liệu có thông lượng cao trực tiếp từ lưu trữ được quản lý của BigQuery. Nó nhanh hơn nhiều so với xuất kết quả truy vấn tiêu chuẩn, đặc biệt đối với các tập dữ liệu lớn, và tích hợp tốt với các framework xử lý dữ liệu như Apache Spark, Apache Beam và Pandas. Nó cho phép đọc song song thông qua nhiều luồng.
- BigQuery Connection API: Cho phép bạn tạo và quản lý các kết nối đến các nguồn dữ liệu bên ngoài, chẳng hạn như Cloud SQL, Spanner hoặc Cloud Storage. Điều này cho phép bạn truy vấn dữ liệu trong các hệ thống bên ngoài này trực tiếp từ BigQuery bằng cách sử dụng hàm
EXTERNAL_QUERY
, mà không cần phải tải dữ liệu vào BigQuery trước. - Analytics Hub API: Tạo điều kiện cho việc chia sẻ dữ liệu an toàn và quy mô giữa các tổ chức hoặc bộ phận kinh doanh khác nhau. Nó cho phép bạn tạo "Mục" - các tham chiếu có thể chia sẻ đến các tập dữ liệu - trong "Sàn giao dịch Dữ liệu". Các bên tiêu dùng có thể đăng ký những mục này để truy cập dữ liệu được chia sẻ trong các dự án của riêng họ.
- BigLake API: Được sử dụng để quản lý các bảng BigLake. BigLake cho phép bạn tạo các bảng trong BigQuery hoạt động trên dữ liệu được lưu trữ ở các định dạng mở (như Parquet, ORC, Avro) trong các hồ dữ liệu (chủ yếu là Google Cloud Storage). Điều này cung cấp một giao diện thống nhất để truy vấn giữa cả lưu trữ BigQuery được quản lý và lưu trữ hồ dữ liệu với sự bảo mật và quản lý nhất quán.
Thiết lập Môi trường của Bạn
Trước khi bạn có thể bắt đầu thực hiện các cuộc gọi API, bạn cần cấu hình môi trường cục bộ hoặc máy chủ của mình.
Các yêu cầu tiên quyết:
- Tài khoản Google Cloud: Bạn cần một tài khoản Google Cloud hoạt động.
- Dự án Google Cloud: Tạo một dự án mới hoặc chọn một dự án hiện có trong Google Cloud Console. Lưu ý ID Dự án của bạn.
- Bật BigQuery API: Đảm bảo API BigQuery đã được bật cho dự án của bạn. Bạn có thể làm điều này thông qua Cloud Console (APIs & Services > Thư viện > Tìm kiếm "BigQuery API" > Bật). Bạn có thể cũng cần phải bật các API khác như BigQuery Storage Read API hoặc BigQuery Connection API tùy thuộc vào trường hợp sử dụng của bạn.
- Thanh toán: Đảm bảo tính năng thanh toán được bật cho dự án của bạn. Các hoạt động BigQuery phát sinh phí dựa trên dung lượng lưu trữ dữ liệu, phân tích đã được xử lý và các lượt chèn luồng.
Xác thực:
Ứng dụng của bạn cần thực hiện xác thực với Google Cloud để chứng minh danh tính và quyền truy cập tài nguyên BigQuery. Phương pháp được khuyến nghị cho hầu hết các trường hợp là Chứng chỉ Mặc định của Ứng dụng (ADC).
- Chứng chỉ Mặc định của Ứng dụng (ADC): Chiến lược này cho phép ứng dụng của bạn tìm thấy chứng chỉ tự động dựa trên môi trường mà nó đang chạy, mà không cần phải chèn trực tiếp các mã khóa vào mã.
- Phát triển Cục bộ: Cài đặt Google Cloud CLI (
gcloud
). Chạy lệnhgcloud auth application-default login
. Điều này sẽ mở một cửa sổ trình duyệt để bạn đăng nhập bằng tài khoản Google của mình, cấp quyền truy cập cho SDK với chứng chỉ của bạn. Các thư viện khách hàng sẽ tự động phát hiện các chứng chỉ này. - Compute Engine, Cloud Functions, App Engine, v.v: ADC tự động sử dụng tài khoản dịch vụ liên quan đến tài nguyên mà mã của bạn đang chạy.
- Tài khoản Dịch vụ: Đối với các ứng dụng chạy bên ngoài Google Cloud hoặc yêu cầu quyền truy cập cụ thể, bạn có thể sử dụng các tài khoản dịch vụ.
- Tạo một tài khoản dịch vụ trong Cloud Console (IAM & Admin > Tài khoản Dịch vụ).
- Cấp các vai trò BigQuery cần thiết (ví dụ:
BigQuery Data Editor
,BigQuery Job User
,BigQuery User
) cho tài khoản dịch vụ. - Tải xuống tệp khóa tài khoản dịch vụ (định dạng JSON).
- Đặt biến môi trường
GOOGLE_APPLICATION_CREDENTIALS
thành đường dẫn tuyệt đối của tệp khóa JSON đã tải xuống. Các thư viện khách hàng sẽ tự động sử dụng tệp khóa này để xác thực nếu biến môi trường được thiết lập.
Cài đặt Thư viện Khách hàng (Ví dụ Python):
Chúng tôi sẽ tập trung vào Python trong các ví dụ của mình. Bạn có thể cài đặt các thư viện cần thiết bằng cách sử dụng pip
:
pip install google-cloud-bigquery
# Tùy chọn: Cài đặt thư viện API lưu trữ để đọc nhanh hơn
pip install google-cloud-bigquery-storage
# Tùy chọn: Cài đặt tích hợp pandas và db-dtypes để xử lý kiểu tốt hơn
pip install pandas db-dtypes pyarrow
Đảm bảo bạn đã cài đặt Python (phiên bản 3.7+ được khuyến nghị cho các tính năng thư viện mới nhất).
Sử dụng Thư viện Khách hàng BigQuery (Các Ví dụ Python)
Giờ đây, hãy cùng khám phá các thao tác BigQuery thông thường sử dụng thư viện Python google-cloud-bigquery
.
1. Nhập và Khởi tạo Khách hàng:
Đầu tiên, nhập thư viện. Sau đó, tạo một phiên bản khách hàng. Nếu ADC đã được cấu hình chính xác, khách hàng sẽ thực hiện xác thực tự động.
from google.cloud import bigquery
import pandas as pd
# Tạo một đối tượng khách hàng BigQuery.
# Nếu GOOGLE_APPLICATION_CREDENTIALS được thiết lập, nó sử dụng tài khoản dịch vụ.
# Nếu đã chạy gcloud auth application-default login, nó sử dụng các chứng chỉ đó.
# Nếu chạy trên hạ tầng GCP, nó sử dụng tài khoản dịch vụ của phiên bản.
client = bigquery.Client()
# Bạn có thể chỉ định ID dự án một cách rõ ràng nếu cần,
# nếu không, nó thường suy luận từ môi trường/chứng chỉ ADC.
# client = bigquery.Client(project='your-project-id')
print("Khách hàng được tạo thành công.")
2. Chạy Truy vấn:
Hoạt động phổ biến nhất là chạy các truy vấn SQL.
- Truy vấn Đồng bộ Đơn giản: Dành cho các truy vấn được mong đợi hoàn thành nhanh chóng.
query_and_wait()
gửi truy vấn và đợi kết quả.
# Định nghĩa truy vấn SQL của bạn
query = """
SELECT name, SUM(number) as total_people
FROM `bigquery-public-data.usa_names.usa_1910_2013`
WHERE state = 'TX'
GROUP BY name, state
ORDER BY total_people DESC
LIMIT 10
"""
# Thực hiện một yêu cầu API và đợi công việc hoàn thành.
query_job = client.query(query) # Yêu cầu API
print(f"Bắt đầu công việc: {query_job.job_id}")
# Đợi công việc hoàn thành và nhận kết quả.
rows = query_job.result() # Đợi công việc hoàn thành.
print("\nTop 10 tên ở TX (1910-2013):")
for row in rows:
# Các giá trị hàng có thể được truy cập bằng tên trường hoặc chỉ số.
print(f"Tên: {row.name}, Số lượng: {row['total_people']}") # Truy cập theo thuộc tính hoặc khóa
# Chuyển đổi kết quả thành Pandas DataFrame
df = rows.to_dataframe()
print("\nKết quả dưới dạng Pandas DataFrame:")
print(df.head())
- Truy vấn Bất đồng bộ: Đối với các truy vấn chạy lâu, bắt đầu công việc và kiểm tra trạng thái của nó sau.
query = """
SELECT corpus, COUNT(word) as distinct_words
FROM `bigquery-public-data.samples.shakespeare`
GROUP BY corpus
ORDER BY distinct_words DESC;
"""
job_config = bigquery.QueryJobConfig(
# Sử dụng cú pháp SQL chuẩn cho các truy vấn.
use_legacy_sql=False
)
# Bắt đầu truy vấn, truyền vào cấu hình bổ sung.
query_job = client.query(query, job_config=job_config) # Không chờ
print(f"Bắt đầu công việc bất đồng bộ: {query_job.job_id}")
# --- Sau đó trong ứng dụng của bạn ---
# Kiểm tra trạng thái công việc (tùy chọn)
# from google.cloud.exceptions import NotFound
# try:
# job = client.get_job(query_job.job_id, location=query_job.location)
# print(f"Trạng thái công việc {job.job_id}: {job.state}")
# if job.state == "DONE":
# if job.error_result:
# print(f"Công việc thất bại: {job.errors}")
# else:
# results = job.result() # Nhận kết quả
# print("Kết quả đã được thu thập.")
# # Xử lý kết quả...
# except NotFound:
# print(f"Công việc {query_job.job_id} không được tìm thấy.")
# Hoặc đơn giản là đợi hoàn thành khi cần thiết
results = query_job.result() # Điều này sẽ chặn cho đến khi công việc hoàn thành
print("Công việc bất đồng bộ hoàn thành.")
for row in results:
print(f"Corpus: {row.corpus}, Số từ riêng biệt: {row.distinct_words}")
- Truy vấn Có tham số: Cần thiết cho bảo mật (ngăn chặn SQL injection) khi đưa đầu vào từ người dùng vào các truy vấn.
from google.cloud.bigquery import ScalarQueryParameter, ArrayQueryParameter, StructQueryParameter, QueryJobConfig
# Ví dụ: Tìm tên bắt đầu bằng một tiền tố cụ thể trong một tiểu bang nhất định
state_param = "NY"
prefix_param = "Ma"
min_count_param = 1000
query = """
SELECT name, SUM(number) as total_people
FROM `bigquery-public-data.usa_names.usa_1910_2013`
WHERE state = @state_abbr AND name LIKE @name_prefix
GROUP BY name
HAVING total_people >= @min_count
ORDER BY total_people DESC;
"""
job_config = QueryJobConfig(
query_parameters=[
ScalarQueryParameter("state_abbr", "STRING", state_param),
# Sử dụng 'val%' cho toán tử LIKE
ScalarQueryParameter("name_prefix", "STRING", f"{prefix_param}%"),
ScalarQueryParameter("min_count", "INT64", min_count_param),
]
)
query_job = client.query(query, job_config=job_config)
print(f"Bắt đầu công việc truy vấn có tham số: {query_job.job_id}")
rows = query_job.result()
print(f"\nTên bắt đầu bằng '{prefix_param}' ở {state_param} với >= {min_count_param} người:")
for row in rows:
print(f"Tên: {row.name}, Số lượng: {row.total_people}")
3. Quản lý Tập dữ liệu:
Bạn có thể tạo, liệt kê, lấy thông tin chi tiết về và xóa các tập dữ liệu.
# Định nghĩa ID tập dữ liệu và vị trí
project_id = client.project
dataset_id = f"{project_id}.my_new_dataset"
dataset_location = "US" # ví dụ: "US", "EU", "asia-northeast1"
# Tạo một đối tượng Tập dữ liệu đầy đủ để gửi đến API.
dataset = bigquery.Dataset(dataset_id)
dataset.location = dataset_location
dataset.description = "Tập dữ liệu được tạo qua thư viện khach hàng Python"
try:
# Thực hiện một yêu cầu API để tạo tập dữ liệu.
dataset = client.create_dataset(dataset, timeout=30) # Thực hiện yêu cầu API.
print(f"Đã tạo tập dữ liệu {client.project}.{dataset.dataset_id}")
# Liệt kê các tập dữ liệu trong dự án
print("\nCác tập dữ liệu trong dự án:")
datasets = list(client.list_datasets()) # Yêu cầu API
if datasets:
for ds in datasets:
print(f"\t{ds.dataset_id}")
else:
print(f"\tDự án {client.project} không chứa bất kỳ tập dữ liệu nào.")
# Lấy thông tin tập dữ liệu
retrieved_dataset = client.get_dataset(dataset_id) # Yêu cầu API
print(f"\nLấy thông tin tập dữ liệu cho {dataset_id}:")
print(f"\tMô tả: {retrieved_dataset.description}")
print(f"\tVị trí: {retrieved_dataset.location}")
except Exception as e:
print(f"Lỗi trong quá trình thao tác tập dữ liệu: {e}")
finally:
# Dọn dẹp: Xóa tập dữ liệu
try:
client.delete_dataset(
dataset_id, delete_contents=True, not_found_ok=True
) # Yêu cầu API
print(f"\nXóa thành công tập dữ liệu '{dataset_id}'.")
except Exception as e:
print(f"Lỗi khi xóa tập dữ liệu {dataset_id}: {e}")
4. Quản lý Bảng:
Các thao tác tương tự tồn tại đối với bảng: tạo bảng (định nghĩa sơ đồ), tải dữ liệu, lấy siêu dữ liệu và xóa bảng.
# Sử dụng ID tập dữ liệu đã tạo trước đó (đảm bảo nó tồn tại hoặc xóa bước xóa ở trên)
dataset_id_for_table = "my_new_dataset" # Sử dụng ID tập dữ liệu hợp lệ
table_id = f"{client.project}.{dataset_id_for_table}.my_new_table"
# Định nghĩa sơ đồ
schema = [
bigquery.SchemaField("full_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("email", "STRING", mode="NULLABLE"),
]
# Tạo bảng
table = bigquery.Table(table_id, schema=schema)
try:
# Đảm bảo tập dữ liệu tồn tại trước tiên
client.create_dataset(dataset_id_for_table, exists_ok=True)
table = client.create_table(table) # Yêu cầu API
print(
f"Đã tạo bảng {table.project}.{table.dataset_id}.{table.table_id}"
)
# --- Tải Dữ liệu (Ví dụ: từ Pandas DataFrame) ---
data = {'full_name': ['Alice Smith', 'Bob Johnson'],
'age': [30, 45],
'email': ['alice@example.com', None]}
dataframe = pd.DataFrame(data)
job_config = bigquery.LoadJobConfig(
# Khuyến nghị nên chỉ định sơ đồ, đảm bảo kiểu đúng
schema=schema,
# Tùy chọn: ghi đè dữ liệu bảng
write_disposition="WRITE_TRUNCATE",
# Hoặc thêm vào: write_disposition="WRITE_APPEND",
)
load_job = client.load_table_from_dataframe(
dataframe, table_id, job_config=job_config
) # Yêu cầu API
print(f"Bắt đầu công việc {load_job.job_id} để tải dữ liệu từ DataFrame")
load_job.result() # Đợi công việc hoàn thành.
print("Công việc tải DataFrame đã kết thúc.")
destination_table = client.get_table(table_id) # Yêu cầu API
print(f"Đã tải {destination_table.num_rows} hàng vào bảng {table_id}")
# --- Tải Dữ liệu (Ví dụ: từ URI Google Cloud Storage) ---
# Giả sử một tệp CSV gs://your-bucket/data.csv tồn tại với dữ liệu tương thích
# uri = "gs://your-bucket/data.csv"
# job_config_gcs = bigquery.LoadJobConfig(
# schema=schema,
# skip_leading_rows=1, # Bỏ qua hàng tiêu đề
# source_format=bigquery.SourceFormat.CSV,
# write_disposition="WRITE_APPEND", # Thêm vào dữ liệu hiện có
# )
# load_job_gcs = client.load_table_from_uri(
# uri, table_id, job_config=job_config_gcs
# )
# print(f"Bắt đầu công việc {load_job_gcs.job_id} để tải dữ liệu từ GCS")
# load_job_gcs.result()
# print("Công việc tải từ GCS đã hoàn thành.")
# destination_table = client.get_table(table_id)
# print(f"Tổng số hàng sau khi tải từ GCS: {destination_table.num_rows}")
except Exception as e:
print(f"Lỗi trong quá trình thao tác bảng: {e}")
finally:
# Dọn dẹp: Xóa bảng
try:
client.delete_table(table_id, not_found_ok=True) # Yêu cầu API
print(f"Xóa thành công bảng '{table_id}'.")
# Tùy chọn xóa tập dữ liệu một lần nữa nếu nó chỉ được sử dụng cho ví dụ này
# client.delete_dataset(dataset_id_for_table, delete_contents=True, not_found_ok=True)
# print(f"Xóa thành công tập dữ liệu '{dataset_id_for_table}'.")
except Exception as e:
print(f"Lỗi khi xóa bảng {table_id}: {e}")
5. Làm việc với Công việc:
Tất cả các hoạt động bất đồng bộ (truy vấn, tải, xuất, sao chép) đều tạo ra các nguồn lực Công việc. Bạn có thể liệt kê và quản lý các công việc này.
# Liệt kê các công việc gần đây
print("\nCác Công việc BigQuery Gần đây:")
for job in client.list_jobs(max_results=10): # Yêu cầu API
print(f"ID Công việc: {job.job_id}, Loại: {job.job_type}, Trạng thái: {job.state}, Được tạo: {job.created}")
# Lấy một công việc cụ thể (thay thế bằng một ID công việc hợp lệ từ các lần chạy trước)
# try:
# job_id_to_get = "..." # Thay thế bằng một ID công việc thực
# location = "US" # Thay thế bằng vị trí của công việc nếu không phải mặc định
# retrieved_job = client.get_job(job_id_to_get, location=location) # Yêu cầu API
# print(f"\nThông tin cho công việc {retrieved_job.job_id}:")
# print(f"\tTrạng thái: {retrieved_job.state}")
# if retrieved_job.error_result:
# print(f"\tLỗi: {retrieved_job.errors}")
# except NotFound:
# print(f"Công việc {job_id_to_get} không được tìm thấy.")
# except Exception as e:
# print(f"Lỗi khi lấy công việc: {e}")
Tận dụng các API Chuyên biệt (Khái niệm và Trường hợp Sử dụng)
Mặc dù thư viện client cốt lõi bao phủ nhiều trường hợp sử dụng, các API chuyên biệt cung cấp hiệu suất hoặc tính năng nâng cao cho các tác vụ cụ thể.
1. BigQuery Storage Read API (Python):
- Mục đích: Truy xuất dữ liệu cực nhanh từ các bảng BigQuery, bỏ qua động cơ thực thi truy vấn tiêu chuẩn. Lý tưởng để xuất các tập dữ liệu lớn hoặc đưa dữ liệu vào các pipeline đào tạo ML hoặc các framework xử lý dữ liệu.
- Cách thức hoạt động: Bạn tạo một
ReadSession
xác định bảng, cột và bộ lọc hàng. API trả về một hoặc nhiềuStreams
. Bạn đọc dữ liệu (thường ở định dạng Apache Arrow hoặc Avro) từ những luồng này, thường là song song. - Thư viện Client:
google-cloud-bigquery-storage
- Ví dụ Sử dụng Python Khái niệm (với tích hợp Pandas):
# Yêu cầu: pip install google-cloud-bigquery-storage pyarrow pandas db-dtypes
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types, GetDataStreamRequest
# --- Sử dụng pandas read_gbq (Tích hợp đơn giản nhất) ---
# Điều này tự động sử dụng API Lưu trữ nếu được cài đặt và có lợi
# table_id_read = "bigquery-public-data.usa_names.usa_1910_2013"
# cols_to_read = ["name", "number", "state"]
# row_filter = "state = 'CA' AND number > 5000"
#
# try:
# df_storage = pd.read_gbq(
# table_id_read,
# project_id=client.project,
# columns=cols_to_read,
# row_filter=row_filter,
# use_bqstorage_api=True, # Yêu cầu rõ ràng Lưu trữ API
# progress_bar_type='tqdm' # Thanh tiến trình tùy chọn
# )
# print("\nĐọc dữ liệu sử dụng API Lưu trữ qua pandas.read_gbq:")
# print(df_storage.head())
# print(f"Đã đọc {len(df_storage)} hàng.")
# except Exception as e:
# print(f"Lỗi khi đọc với API Lưu trữ qua read_gbq: {e}")
# --- Sử dụng API Lưu trữ Thủ công (Kiểm soát Nhiều hơn) ---
# bqstorageclient = bigquery_storage_v1.BigQueryReadClient()
# table = f"projects/{project_id}/datasets/{dataset_id}/tables/{table_name}" # Thay thế bằng chi tiết bảng của bạn
# requested_session = types.ReadSession(
# table=table,
# data_format=types.DataFormat.ARROW,
# read_options=types.ReadSession.TableReadOptions(
# selected_fields=["col1", "col2"], # Chỉ định cột
# row_restriction="col1 > 100" # Chỉ định bộ lọc
# ),
# )
# parent = f"projects/{project_id}"
# read_session = bqstorageclient.create_read_session(
# parent=parent,
# read_session=requested_session,
# max_stream_count=1, # Yêu cầu số lượng luồng song song
# )
# stream = read_session.streams[0]
# reader = bqstorageclient.read_rows(stream.name)
# frames = [message.arrow_record_batch for message in reader.messages()]
# if frames:
# arrow_table = pa.Table.from_batches(frames)
# df_manual = arrow_table.to_pandas()
# print("\nĐọc dữ liệu thủ công bằng cách sử dụng API Lưu trữ:")
# print(df_manual.head())
# else:
# print("Không có dữ liệu nào được đọc bằng API Lưu trữ thủ công.")
2. BigQuery Connection API:
- Mục đích: Truy vấn các nguồn dữ liệu bên ngoài (Cloud SQL, Spanner, Cloud Storage) trực tiếp từ BigQuery mà không cần di chuyển dữ liệu.
- Cách thức hoạt động:
- Sử dụng API (hoặc Cloud Console/
bq
tool) để tạo một nguồn lựcConnection
, chỉ định loại và chi tiết nguồn bên ngoài. - Cấp cho tài khoản dịch vụ của kết nối quyền truy cập phù hợp vào tài nguyên bên ngoài (ví dụ: vai trò Người dùng Cloud SQL).
- Sử dụng hàm
EXTERNAL_QUERY("connection_id", "external_sql_query")
trong SQL BigQuery của bạn.
- Thư viện Client:
google-cloud-bigquery-connection
cung cấp các phương thức nhưcreate_connection
,get_connection
,list_connections
,delete_connection
.
3. Analytics Hub API:
- Mục đích: Chia sẻ các tập dữ liệu đã được biên soạn một cách an toàn giữa các tổ chức hoặc đội nhóm.
- Các khái niệm chính:
- Sàn giao dịch Dữ liệu: Một bộ chứa để chia sẻ dữ liệu. Có thể là công khai hoặc riêng tư.
- Mục: Một tham chiếu đến một tập dữ liệu BigQuery cụ thể (hoặc có thể là view/bảng trong tương lai) được chia sẻ trong một sàn giao dịch. Nhà xuất bản tạo mục; các bên đăng ký truy cập chúng.
- Các API: Có sẵn qua REST và thư viện khách hàng (ví dụ:
google-cloud-analyticshub
) để quản lý các sàn giao dịch, mục, và đăng ký một cách lập trình.
4. BigLake API:
- Mục đích: Quản lý các bảng BigLake, cho phép truy vấn dữ liệu trong Cloud Storage bằng giao diện và quản lý của BigQuery.
- Cách thức hoạt động: Định nghĩa một bảng trong BigQuery tham chiếu đến các tệp dữ liệu (Parquet, ORC, v.v.) trong một bucket GCS. API BigLake (chủ yếu là REST hiện tại) được sử dụng để tạo, cập nhật và quản lý siêu dữ liệu cho các bảng này. Việc truy vấn diễn ra thông qua SQL BigQuery tiêu chuẩn.
Sử dụng REST API Trực tiếp
Mặc dù các thư viện khách hàng thường được ưa chuộng, bạn có thể sử dụng REST API trực tiếp nếu:
- Bạn đang sử dụng một ngôn ngữ không có thư viện khách hàng chính thức của Google.
- Bạn cần truy cập vào các tính năng tiên tiến chưa được xuất hiện trong các thư viện.
- Bạn yêu cầu kiểm soát tuyệt đối đối với chu kỳ yêu cầu/phản hồi HTTP.
Thực hiện Yêu cầu:
Bạn thường sẽ sử dụng một khách hàng HTTP (như curl
hoặc thư viện requests
của Python). Bạn cần:
- Nhận mã thông báo truy cập OAuth 2.0 (ví dụ: sử dụng
gcloud auth print-access-token
). - Xây dựng đúng URL điểm cuối API.
- Tạo thân yêu cầu JSON theo đặc tả của phương thức API.
- bao gồm mã thông báo truy cập trong tiêu đề
Authorization: Bearer <token>
. - Xử lý phản hồi HTTP (mã trạng thái, phân tích JSON, thông báo lỗi).
Ví dụ: Chạy một Truy vấn thông qua REST (jobs.query
)
# 1. Nhận Mã thông báo Truy cập
TOKEN=$(gcloud auth print-access-token)
# 2. Định nghĩa ID Dự án và Thân yêu cầu
PROJECT_ID="your-project-id" # Thay thế bằng ID dự án của bạn
REQUEST_BODY=$(cat <<EOF
{
"query": "SELECT name, SUM(number) as total_people FROM \`bigquery-public-data.usa_names.usa_1910_2013\` WHERE state = 'CA' GROUP BY name ORDER BY total_people DESC LIMIT 5;",
"useLegacySql": false
}
EOF
)
# 3. Thực hiện cuộc gọi API sử dụng curl
curl -X POST \
"https://bigquery.googleapis.com/bigquery/v2/projects/${PROJECT_ID}/jobs" \
-H "Authorization: Bearer ${TOKEN}" \
-H "Content-Type: application/json; charset=utf-8" \
-d "${REQUEST_BODY}"
# Phản hồi sẽ chứa thông tin công việc, bao gồm ID công việc.
# Bạn sau đó sẽ cần thực hiện các cuộc gọi tiếp theo đến jobs.getQueryResults
# sử dụng ID công việc để truy xuất các dòng dữ liệu thực tế một khi công việc hoàn thành.
Ví dụ này chỉ khởi tạo công việc truy vấn. Việc truy xuất kết quả yêu cầu phải theo dõi trạng thái công việc và sau đó gọi điểm cuối jobs.getQueryResults
. Điều này nhấn mạnh các bước thêm mà bạn phải thực hiện so với các thư viện khách hàng.
Những Thực tiễn và Mẹo Tốt Nhất
- Ưu tiên các Thư viện Khách hàng: Sử dụng các thư viện khách hàng Google Cloud bất cứ khi nào có thể để đơn giản hóa, tăng tính ổn định và dễ bảo trì.
- Sử dụng ADC: Dựa vào Chứng chỉ Mặc định của Ứng dụng cho xác thực giúp đơn giản hóa việc triển khai trên các môi trường khác nhau. Tránh việc nhúng các chứng chỉ trong mã.
- Truy vấn Có tham số: Luôn sử dụng các truy vấn có tham số khi đưa đầu vào từ bên ngoài vào để ngăn chặn các lỗ hổng SQL injection.
- Tối ưu hóa Truy vấn: Viết SQL hiệu quả. Tránh
SELECT *
, sử dụng các điều khoảnWHERE
một cách hiệu quả, hiểu cách phân vùng và phân cụm để tối ưu hóa hiệu suất trên các bảng lớn. Sử dụng tính năng giải thích kế hoạch truy vấn. - Xử lý Lỗi: Thực hiện xử lý lỗi mạnh mẽ, đặc biệt cho các vấn đề về mạng hoặc lỗi riêng của API (ví dụ: vượt quá hạn ngạch, không tìm thấy). Các thư viện khách hàng thường bao gồm các cơ chế thử lại tích hợp cho các lỗi tạm thời.
- Quản lý Tài nguyên: Xóa rõ ràng các tập dữ liệu hoặc bảng được tạo trong quá trình thử nghiệm hoặc xử lý tạm thời để tránh chi phí lưu trữ không cần thiết.
- Theo dõi Chi phí và Hạn ngạch: Cần chú ý đến việc định giá BigQuery (tính theo byte đã xử lý cho các truy vấn, chi phí lưu trữ). Theo dõi việc sử dụng của bạn và nhận thức về các hạn ngạch API (ví dụ: giới hạn truy vấn đồng thời, giới hạn tỷ lệ yêu cầu API). Sử dụng các tính năng như số byte tối đa được tính phí (
maximum_bytes_billed
trongJobConfig
) để kiểm soát chi phí truy vấn. - Sử dụng các API Chuyên biệt: Sử dụng API Lưu trữ Đọc cho các xuất/đọc dữ liệu lớn và API Kết nối để truy vấn các nguồn bên ngoài một cách hiệu quả.
Kết luận
Các BigQuery APIs cung cấp một cách mạnh mẽ và linh hoạt để tương tác một cách lập trình với kho dữ liệu của bạn. Dù bạn cần tự động hóa việc tải dữ liệu, chạy các truy vấn phân tích phức tạp trong ứng dụng, tích hợp những hiểu biết từ BigQuery vào các bảng điều khiển, hay quản lý tài nguyên một cách động, các API cung cấp những công cụ cần thiết.
Bằng cách hiểu các loại API khác nhau, thiết lập môi trường của bạn một cách chính xác, và tận dụng sự tiện lợi của các thư viện khách hàng (đặc biệt là trong Python), bạn có thể mở khóa tiềm năng đầy đủ của BigQuery vượt ra ngoài console tương tác. Bắt đầu với API cốt lõi cho các tác vụ thông thường, khám phá các API chuyên biệt như API Lưu trữ Đọc cho các hoạt động quan trọng về hiệu suất, và nhớ rằng API REST luôn sẵn có cho kiểm soát tối đa. Khi bạn xây dựng các ứng dụng trên BigQuery, những API này sẽ là thành phần thiết yếu của kiến trúc dữ liệu của bạn.