Apidog

올인원 협업 API 개발 플랫폼

API 설계

API 문서

API 디버깅

API 모킹

API 자동화 테스트

초보자를 위한 BigQuery API 사용법

Young-jae

Young-jae

Updated on April 14, 2025

구글 빅쿼리는 조직들이 대규모 데이터 분석을 처리하는 방식을 혁신했습니다. 서버리스 아키텍처, 확장성, 친숙한 SQL 인터페이스 덕분에 방대한 데이터 세트에서 인사이트를 도출하는 강력한 도구가 되었습니다. 구글 클라우드 콘솔 또는 bq 명령줄 도구를 통해 빅쿼리와 상호작용하는 것은 일반적이지만, 자동화, 통합 및 맞춤형 애플리케이션 개발의 진정한 힘은 포괄적인 애플리케이션 프로그래밍 인터페이스(API) 세트를 통해 열립니다.

이 가이드는 빅쿼리 API를 사용하는 방법에 대한 단계별 탐색을 제공합니다. 이를 통해 프로그램적으로 데이터 웨어하우스와 상호작용하고, 데이터 파이프라인을 구축하며, 애플리케이션에 빅쿼리를 통합하고, 리소스를 효과적으로 관리할 수 있습니다. 사용 가능한 다양한 API 유형, 환경 설정 방법, 파이썬 클라이언트 라이브러리를 사용한 실제 예제 및 고급 사용 사례를 위한 전문 API를 소개합니다.

💡
아름다운 API 문서를 생성하는 훌륭한 API 테스트 도구를 원하십니까?

개발 팀이 함께 작업할 수 있는 통합된 올인원 플랫폼을 원하십니까? 최대 생산성를 위해?

Apidog은 모든 요구를 충족하고 Postman을 훨씬 더 저렴한 가격에 대체합니다!
버튼

빅쿼리 API 이해하기

코드에 들어가기 전에 빅쿼리와 프로그램적으로 상호작용할 수 있는 핵심 개념과 다양한 방법을 이해하는 것이 중요합니다.

핵심 빅쿼리 개념:

  • 프로젝트: 모든 리소스를 보유하는 구글 클라우드의 최상위 컨테이너로, 빅쿼리 데이터 세트, 테이블, 작업을 포함합니다. 청구 및 권한 관리는 일반적으로 프로젝트 수준에서 관리됩니다.
  • 데이터세트: 특정 프로젝트 내에서 테이블 및 뷰를 위한 컨테이너입니다. 데이터 세트는 데이터를 조직하고 접근을 제어하는 데 도움이 됩니다. 전통적인 시스템의 스키마 또는 데이터베이스와 비슷하다고 생각할 수 있습니다.
  • 테이블: 정의된 스키마에 따라 로우와 열에 구조화된 데이터를 저장합니다. 빅쿼리는 다양한 데이터 유형과 중첩/반복 필드를 지원합니다.
  • 작업: 쿼리를 실행하거나 데이터를 로드, 내보내기 또는 테이블을 복사하는 등 빅쿼리에서 수행되는 비동기 작업을 나타냅니다. API를 사용하면 이러한 작업을 시작, 모니터링 및 관리할 수 있습니다.

빅쿼리 API 유형:

빅쿼리는 프로그램적으로 서비스와 상호작용할 수 있는 여러 방법을 제공합니다:

REST API: HTTP와 JSON으로 구축된 기본 API입니다. 빅쿼리 리소스 및 작업에 직접 접근할 수 있습니다. 특정 엔드포인트(예: https://bigquery.googleapis.com/bigquery/v2/projects/{projectId}/datasets)를 타겟으로 하는 표준 HTTP 요청(GET, POST, PUT, DELETE)을 사용하여 상호작용할 수 있습니다. 강력하고 세밀한 제어를 제공하지만 REST API를 직접 사용하려면 인증, 요청 형식, 응답 파싱 및 오류 처리를 수동으로 처리해야 합니다. 인증은 일반적으로 OAuth 2.0 액세스 토큰을 포함합니다.

클라이언트 라이브러리: 구글은 다양한 인기 프로그래밍 언어(파이썬, 자바, 고, Node.js, C#, PHP, 루비 포함)를 위한 고수준 클라이언트 라이브러리를 제공합니다. 이 라이브러리는 기본 REST API를 래핑하여 더욱 관용적이고 개발자 친화적인 경험을 제공합니다. 일반적인 작업을 단순화하고 인증(종종 애플리케이션 기본 자격증명을 통해 자동으로 처리), 재시도 관리 및 작성해야 할 보일러플레이트 코드의 양을 줄입니다. 이는 대부분의 애플리케이션 개발에 추천하는 접근 방식입니다.

전문API: 특히 고성능 또는 특수 작업을 위한 빅쿼리 전용 API를 제공합니다:

  • 빅쿼리 스토리지 읽기 API: 빅쿼리 관리 스토리지에서 데이터를 직접 읽기 위해 설계되었습니다. 대량의 데이터 세트에 대해 표준 쿼리 결과 내보내기보다 훨씬 빠르며 Apache Spark, Apache Beam 및 Pandas와 같은 데이터 처리 프레임워크와 잘 통합됩니다. 여러 스트림을 통한 병렬 읽기를 허용합니다.
  • 빅쿼리 연결 API: 클라우드 SQL, 스패너 또는 클라우드 스토리지와 같은 외부 데이터 소스와 연결을 생성하고 관리할 수 있습니다. 이를 통해 빅쿼리에서 EXTERNAL_QUERY 기능을 사용하여 이러한 외부 시스템의 데이터를 직접 쿼리할 수 있습니다.
  • 분석 허브 API: 서로 다른 조직이나 비즈니스 단위 간의 안전하고 확장 가능한 데이터 공유를 촉진합니다. “리스트” – 데이터 세트에 대한 공유 가능한 참조를 생성할 수 있습니다. 소비자는 이러한 목록을 구독하여 자신의 프로젝트 내에서 공유 데이터를 액세스할 수 있습니다.
  • 빅레이크 API: 빅레이크 테이블을 관리하는 데 사용됩니다. 빅레이크는 열린 형식(예: Parquet, ORC, Avro)으로 저장된 데이터를 대상으로 하는 빅쿼리 테이블을 생성할 수 있습니다. 이를 통해 관리되는 빅쿼리 스토리지와 데이터 레이크 스토리지 모두를 쿼리하기 위한 통합 인터페이스를 제공합니다.

환경 설정하기

API 호출을 시작하려면 로컬 또는 서버 환경을 구성해야 합니다.

사전 요구 사항:

  1. 구글 클라우드 계정: 활성 구글 클라우드 계정이 필요합니다.
  2. 구글 클라우드 프로젝트: 구글 클라우드 콘솔에서 새 프로젝트를 생성하거나 기존 프로젝트를 선택합니다. 프로젝트 ID를 기억해 두세요.
  3. 빅쿼리 API 활성화: 프로젝트에 대해 빅쿼리 API가 활성화되어 있는지 확인합니다. 이를 클라우드 콘솔에서 (API 및 서비스 > 라이브러리 > "빅쿼리 API" 검색 > 활성화) 수행할 수 있습니다. 사용 사례에 따라 빅쿼리 스토리지 읽기 API 또는 빅쿼리 연결 API와 같은 다른 API도 활성화해야 할 수 있습니다.
  4. 청구: 프로젝트에 대해 청구가 활성화되어 있는지 확인합니다. 빅쿼리 작업은 데이터 저장, 처리된 분석 및 스트리밍 삽입에 따라 비용이 발생합니다.

인증:

귀하의 애플리케이션은 구글 클라우드에 인증하여 식별성과 빅쿼리 리소스에 접근할 수 있는 권한을 증명해야 합니다. 대부분의 시나리오에서 권장되는 방법은 애플리케이션 기본 자격증명(ADC)입니다.

  • 애플리케이션 기본 자격증명 (ADC): 이 전략을 통해 귀하의 애플리케이션은 실행되는 환경에 따라 자격 증명을 자동으로 찾을 수 있습니다. 키를 코드에 직접 포함할 필요가 없습니다.
  • 로컬 개발: 구글 클라우드 CLI(gcloud)를 설치합니다. gcloud auth application-default login 명령을 실행합니다. 이렇게 하면 브라우저 창이 열리고 구글 계정으로 로그인하여 SDK가 자격증명에 접근할 수 있도록 허용합니다. 클라이언트 라이브러리는 이러한 자격증명을 자동으로 감지합니다.
  • 컴퓨트 엔진, 클라우드 함수, 앱 엔진 등: ADC는 코드가 실행 중인 리소스와 연결된 서비스 계정을 자동으로 사용합니다.
  • 서비스 계정: 구글 클라우드 외부에서 실행 중이거나 특정 권한이 필요한 애플리케이션의 경우 서비스 계정을 사용할 수 있습니다.
  1. 클라우드 콘솔(IAM 및 관리자 > 서비스 계정)에서 서비스 계정을 생성합니다.
  2. 필요한 빅쿼리 역할(예: BigQuery Data Editor, BigQuery Job User, BigQuery User)을 서비스 계정에 부여합니다.
  3. 서비스 계정 키 파일(JSON 형식)을 다운로드합니다.
  4. 환경 변수를 GOOGLE_APPLICATION_CREDENTIALS로 설정하고 다운로드한 JSON 키 파일의 절대 경로를 지정합니다. 환경 변수가 설정되면 클라이언트 라이브러리는 이 키 파일을 인증에 자동으로 사용합니다.

클라이언트 라이브러리 설치 (파이썬 예제):

우리는 예제를 위해 파이썬에 집중합니다. pip를 사용하여 필요한 라이브러리를 설치할 수 있습니다:

pip install google-cloud-bigquery
# 선택 사항: 더 빠른 읽기를 위한 스토리지 API 라이브러리 설치
pip install google-cloud-bigquery-storage
# 선택 사항: 더 나은 유형 처리를 위한 pandas 통합 및 db-dtypes 설치
pip install pandas db-dtypes pyarrow

최신 라이브러리 기능을 위해 Python이 설치되어 있는지 확인하십시오(버전 3.7+ 권장).

빅쿼리 클라이언트 라이브러리 사용하기(파이썬 예제)

이제 google-cloud-bigquery 파이썬 라이브러리를 사용하여 일반적인 빅쿼리 작업을 탐색하겠습니다.

1. 클라이언트 가져오기 및 초기화:

먼저 라이브러리를 가져옵니다. 그런 다음 클라이언트 인스턴스를 생성합니다. ADC가 올바르게 구성되면 클라이언트가 자동으로 인증됩니다.

from google.cloud import bigquery
import pandas as pd

# 빅쿼리 클라이언트 객체 구성.
# GOOGLE_APPLICATION_CREDENTIALS가 설정되어 있으면 서비스 계정을 사용합니다.
# gcloud auth application-default login이 실행되면 해당 자격증명을 사용합니다.
# GCP 인프라에서 실행 중이면 인스턴스의 서비스 계정을 사용합니다.
client = bigquery.Client()

# 필요시 프로젝트 ID를 명시적으로 지정할 수 있습니다.
# 그렇지 않으면 환경/ADC 자격증명에서 자주 추론됩니다.
# client = bigquery.Client(project='your-project-id')

print("클라이언트가 성공적으로 생성되었습니다.")

2. 쿼리 실행:

가장 일반적인 작업은 SQL 쿼리를 실행하는 것입니다.

  • 간단한 동기 쿼리: 빠르게 완료될 것으로 예상되는 쿼리입니다. query_and_wait()는 쿼리를 전송하고 결과를 기다립니다.
# SQL 쿼리 정의
query = """
    SELECT name, SUM(number) as total_people
    FROM `bigquery-public-data.usa_names.usa_1910_2013`
    WHERE state = 'TX'
    GROUP BY name, state
    ORDER BY total_people DESC
    LIMIT 10
"""

# API 요청을 만들고 작업이 완료될 때까지 기다립니다.
query_job = client.query(query)  # API 요청
print(f"작업 시작됨: {query_job.job_id}")

# 작업이 완료될 때까지 기다리고 결과를 가져옵니다.
rows = query_job.result()  # 작업이 완료될 때까지 기다립니다.

print("\nTX에서의 상위 10개 이름 (1910-2013):")
for row in rows:
    # 행 값은 필드 이름 또는 인덱스에 따라 접근이 가능합니다.
    print(f"이름: {row.name}, 수: {row['total_people']}") # 속성 또는 키로 접근

# 결과를 Pandas 데이터프레임으로 변환
df = rows.to_dataframe()
print("\nPandas 데이터프레임으로서의 결과:")
print(df.head())
  • 비동기 쿼리: 장기 실행 쿼인의 경우 작업을 시작하고 나중에 상태를 확인합니다.
query = """
    SELECT corpus, COUNT(word) as distinct_words
    FROM `bigquery-public-data.samples.shakespeare`
    GROUP BY corpus
    ORDER BY distinct_words DESC;
"""
job_config = bigquery.QueryJobConfig(
    # 쿼리에 대해 표준 SQL 구문 사용.
    use_legacy_sql=False
)

# 추가 구성을 전달하며 쿼리를 시작합니다.
query_job = client.query(query, job_config=job_config) # 기다리지 않음
print(f"비동기 작업 시작됨: {query_job.job_id}")

# --- 애플리케이션의 나중에 ---
# 작업 상태 확인 (선택 사항)
# from google.cloud.exceptions import NotFound
# try:
#     job = client.get_job(query_job.job_id, location=query_job.location)
#     print(f"작업 {job.job_id} 상태: {job.state}")
#     if job.state == "DONE":
#         if job.error_result:
#             print(f"작업 실패: {job.errors}")
#         else:
#             results = job.result() # 결과 가져오기
#             print("결과를 가져왔습니다.")
#             # 결과 처리...
# except NotFound:
#     print(f"작업 {query_job.job_id}를 찾을 수 없습니다.")

# 필요시 완료 대기
results = query_job.result() # 이 작업이 완료될 때까지 블로킹됩니다.
print("비동기 작업이 완료되었습니다.")
for row in results:
    print(f"말뭉치: {row.corpus}, 고유 단어 수: {row.distinct_words}")

  • 매개변수화된 쿼리: 보안(사용자 입력이 쿼리에 포함될 때 SQL 인젝션 방지)에 필수적입니다.
from google.cloud.bigquery import ScalarQueryParameter, ArrayQueryParameter, StructQueryParameter, QueryJobConfig

# 예: 특정 주에서 특정 접두사로 시작하는 이름 찾기
state_param = "NY"
prefix_param = "Ma"
min_count_param = 1000

query = """
    SELECT name, SUM(number) as total_people
    FROM `bigquery-public-data.usa_names.usa_1910_2013`
    WHERE state = @state_abbr AND name LIKE @name_prefix
    GROUP BY name
    HAVING total_people >= @min_count
    ORDER BY total_people DESC;
"""

job_config = QueryJobConfig(
    query_parameters=[
        ScalarQueryParameter("state_abbr", "STRING", state_param),
        # LIKE 연산자에 대해 'val%' 사용
        ScalarQueryParameter("name_prefix", "STRING", f"{prefix_param}%"),
        ScalarQueryParameter("min_count", "INT64", min_count_param),
    ]
)

query_job = client.query(query, job_config=job_config)
print(f"매개변수화된 쿼리 작업 시작됨: {query_job.job_id}")

rows = query_job.result()

print(f"\n'{prefix_param}'로 시작하는 {state_param} 주의 이름 (인구 >= {min_count_param}명):")
for row in rows:
    print(f"이름: {row.name}, 수: {row.total_people}")

3. 데이터 세트 관리하기:

데이터 세트를 생성하고, 목록을 생성하고, 세부정보를 가져오고, 삭제할 수 있습니다.

# 데이터 세트 ID 및 위치 정의
project_id = client.project
dataset_id = f"{project_id}.my_new_dataset"
dataset_location = "US" # 예: "US", "EU", "asia-northeast1"

# API에 전송할 전체 데이터 세트 객체 구성.
dataset = bigquery.Dataset(dataset_id)
dataset.location = dataset_location
dataset.description = "파이썬 클라이언트 라이브러리를 통해 생성된 데이터 세트"

try:
    # 데이터 세트를 생성하기 위한 API 요청합니다.
    dataset = client.create_dataset(dataset, timeout=30)  # API 요청입니다.
    print(f"생성된 데이터 세트 {client.project}.{dataset.dataset_id}")

    # 프로젝트의 데이터 세트 목록
    print("\n프로젝트의 데이터 세트:")
    datasets = list(client.list_datasets()) # API 요청
    if datasets:
        for ds in datasets:
            print(f"\t{ds.dataset_id}")
    else:
        print(f"\t{client.project} 프로젝트에는 데이터 세트가 없습니다.")

    # 데이터 세트 정보 가져오기
    retrieved_dataset = client.get_dataset(dataset_id) # API 요청
    print(f"\n{dataset_id}의 데이터 세트 정보 가져오기:")
    print(f"\t설명: {retrieved_dataset.description}")
    print(f"\t위치: {retrieved_dataset.location}")

except Exception as e:
    print(f"데이터 세트 작업 중 오류: {e}")

finally:
    # 정리하기: 데이터 세트 삭제
    try:
        client.delete_dataset(
            dataset_id, delete_contents=True, not_found_ok=True
        )  # API 요청
        print(f"\n데이터 세트 '{dataset_id}'이 성공적으로 삭제되었습니다.")
    except Exception as e:
         print(f"데이터 세트 {dataset_id} 삭제 중 오류: {e}")

4. 테이블 관리하기:

테이블에 대해 유사한 작업이 존재합니다: 테이블 생성(스키마 정의), 데이터 로드, 메타데이터 얻기 및 테이블 삭제입니다.

# 이전에 생성된 데이터 세트 ID 사용(존재하는지 확인하거나 위 삭제 단계 생략)
dataset_id_for_table = "my_new_dataset" # 유효한 데이터 세트 ID 사용
table_id = f"{client.project}.{dataset_id_for_table}.my_new_table"

# 스키마 정의
schema = [
    bigquery.SchemaField("full_name", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("email", "STRING", mode="NULLABLE"),
]

# 테이블 생성
table = bigquery.Table(table_id, schema=schema)
try:
    # 먼저 데이터 세트가 존재하는지 확인합니다.
    client.create_dataset(dataset_id_for_table, exists_ok=True)

    table = client.create_table(table)  # API 요청
    print(
        f"테이블 {table.project}.{table.dataset_id}.{table.table_id}가 생성되었습니다."
    )

    # --- 데이터 로드(예: Pandas 데이터프레임에서) ---
    data = {'full_name': ['Alice Smith', 'Bob Johnson'],
            'age': [30, 45],
            'email': ['alice@example.com', None]}
    dataframe = pd.DataFrame(data)

    job_config = bigquery.LoadJobConfig(
        # 스키마를 지정하는 것이 권장되며, 적절한 유형을 보장합니다.
        schema=schema,
        # 선택 사항: 테이블 데이터 덮어쓰기
        write_disposition="WRITE_TRUNCATE",
        # 또는 추가: write_disposition="WRITE_APPEND",
    )

    load_job = client.load_table_from_dataframe(
        dataframe, table_id, job_config=job_config
    )  # API 요청
    print(f"데이터프레임에서 데이터를 로드하기 위한 작업 시작 {load_job.job_id}")

    load_job.result()  # 작업 완료 대기.
    print("데이터프레임 로드 작업이 완료되었습니다.")

    destination_table = client.get_table(table_id) # API 요청
    print(f"테이블 {table_id}에 {destination_table.num_rows}개의 행이 로드되었습니다.")

    # --- 데이터 로드(예: Google Cloud Storage URI에서) ---
    # 호환 데이터로 gs://your-bucket/data.csv가 존재한다고 가정합니다.
    # uri = "gs://your-bucket/data.csv"
    # job_config_gcs = bigquery.LoadJobConfig(
    #     schema=schema,
    #     skip_leading_rows=1, # 헤더 행 건너뛰기
    #     source_format=bigquery.SourceFormat.CSV,
    #     write_disposition="WRITE_APPEND", # 기존 데이터에 추가
    # )
    # load_job_gcs = client.load_table_from_uri(
    #     uri, table_id, job_config=job_config_gcs
    # )
    # print(f"GCS에서 데이터 로드 작업 시작 {load_job_gcs.job_id}")
    # load_job_gcs.result()
    # print("GCS 로드 작업이 완료되었습니다.")
    # destination_table = client.get_table(table_id)
    # print(f"GCS 로드 후 총 행: {destination_table.num_rows}")

except Exception as e:
    print(f"테이블 작업 중 오류: {e}")

finally:
    # 정리하기: 테이블 삭제
    try:
        client.delete_table(table_id, not_found_ok=True)  # API 요청
        print(f"테이블 '{table_id}'이 성공적으로 삭제되었습니다.")
        # 필요시 예제를 위해 데이터 세트를 다시 삭제합니다.
        # client.delete_dataset(dataset_id_for_table, delete_contents=True, not_found_ok=True)
        # print(f"데이터 세트 '{dataset_id_for_table}'이 성공적으로 삭제되었습니다.")
    except Exception as e:
        print(f"테이블 {table_id} 삭제 중 오류: {e}")

5. 작업 관리하기:

모든 비동기 작업(쿼리, 로드, 내보내기, 복사)은 작업 리소스를 생성합니다. 이러한 작업을 나열하고 관리할 수 있습니다.

# 최근 작업 목록
print("\n최근 빅쿼리 작업:")
for job in client.list_jobs(max_results=10): # API 요청
    print(f"작업 ID: {job.job_id}, 유형: {job.job_type}, 상태: {job.state}, 생성일: {job.created}")

# 특정 작업 가져오기(이전 실행에서 유효한 작업 ID로 대체)
# try:
#     job_id_to_get = "..." # 실제 작업 ID로 대체
#     location = "US"      # 기본이 아닐 경우 작업의 위치로 대체
#     retrieved_job = client.get_job(job_id_to_get, location=location) # API 요청
#     print(f"\n작업 {retrieved_job.job_id} 상세 정보:")
#     print(f"\t상태: {retrieved_job.state}")
#     if retrieved_job.error_result:
#         print(f"\t오류: {retrieved_job.errors}")
# except NotFound:
#     print(f"작업 {job_id_to_get}를 찾을 수 없습니다.")
# except Exception as e:
#     print(f"작업 검색 중 오류: {e}")

전문 API 활용하기 (개념 및 사용 사례)

핵심 클라이언트 라이브러리가 많은 사용 사례를 다루지만, 전문 API는 특정 작업에 대한 성능 또는 기능을 향상시킵니다.

1. 빅쿼리 스토리지 읽기 API (파이썬):

  • 목적: 빅쿼리 테이블에서 표준 쿼리 실행 엔진을 우회하여 매우 빠른 데이터 검색. 대량 데이터 세트를 내보내거나 ML 훈련 파이프라인 또는 데이터 처리 프레임워크에 데이터를 공급하는 데 이상적입니다.
  • 작동 방식: 테이블, 열 및 행 필터를 지정하여 ReadSession을 생성합니다. API는 하나 이상의 Streams를 반환합니다. 이러한 스트림에서 데이터를 읽으며, 종종 병렬로 수행됩니다.
  • 클라이언트 라이브러리: google-cloud-bigquery-storage
  • 개념적 파이썬 사용법 (Pandas 통합 포함):
# 필요: pip install google-cloud-bigquery-storage pyarrow pandas db-dtypes

from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types, GetDataStreamRequest

# --- Pandas read_gbq 사용하기 (가장 간단한 통합) ---
# 설치되어 있으면 자동으로 스토리지 API 사용
# table_id_read = "bigquery-public-data.usa_names.usa_1910_2013"
# cols_to_read = ["name", "number", "state"]
# row_filter = "state = 'CA' AND number > 5000"
#
# try:
#      df_storage = pd.read_gbq(
#          table_id_read,
#          project_id=client.project,
#          columns=cols_to_read,
#          row_filter=row_filter,
#          use_bqstorage_api=True, # 명시적으로 스토리지 API 요청
#          progress_bar_type='tqdm' # 선택적 진행률 표시줄
#      )
#      print("\nStorage API를 통해 읽은 데이터:")
#      print(df_storage.head())
#      print(f"{len(df_storage)}개의 행이 읽혔습니다.")
# except Exception as e:
#      print(f"Storage API를 통해 read_gbq 읽기 중 오류: {e}")


# --- 수동 스토리지 API 사용하기 (더 많은 제어) ---
# bqstorageclient = bigquery_storage_v1.BigQueryReadClient()
# table = f"projects/{project_id}/datasets/{dataset_id}/tables/{table_name}" # 테이블 세부정보로 교체

# requested_session = types.ReadSession(
#     table=table,
#     data_format=types.DataFormat.ARROW,
#     read_options=types.ReadSession.TableReadOptions(
#         selected_fields=["col1", "col2"], # 열 지정
#         row_restriction="col1 > 100"     # 필터 지정
#     ),
# )
# parent = f"projects/{project_id}"

# read_session = bqstorageclient.create_read_session(
#     parent=parent,
#     read_session=requested_session,
#     max_stream_count=1, # 병렬 스트림 수 요청
# )

# stream = read_session.streams[0]
# reader = bqstorageclient.read_rows(stream.name)
# frames = [message.arrow_record_batch for message in reader.messages()]
# if frames:
#     arrow_table = pa.Table.from_batches(frames)
#     df_manual = arrow_table.to_pandas()
#     print("\nStorage API를 사용하여 수동으로 읽은 데이터:")
#     print(df_manual.head())
# else:
#     print("수동 Storage API를 사용하여 읽은 데이터가 없습니다.")

2. 빅쿼리 연결 API:

  • 목적: 데이터 이동 없이 빅쿼리에서 외부 데이터 소스(클라우드 SQL, 스패너, 클라우드 스토리지)를 직접 쿼리합니다.
  • 작동 방식:
  1. API(또는 클라우드 콘솔/bq 도구)를 사용하여 외부 소스 유형 및 세부 정보를 지정하여 Connection 리소스를 생성합니다.
  2. 외부 리소스에 대해 연결의 서비스 계정에 적절한 권한을 부여합니다(예: 클라우드 SQL 사용자 역할).
  3. BigQuery SQL 내에서 EXTERNAL_QUERY("connection_id", "external_sql_query") 기능을 사용합니다.
  • 클라이언트 라이브러리: google-cloud-bigquery-connectioncreate_connection, get_connection, list_connections, delete_connection와 같은 메서드를 제공합니다.

3. 분석 허브 API:

  • 목적: 전문화된 데이터 세트를 조직 또는 팀 간에 안전하게 공유합니다.
  • 핵심 개념:
  • 데이터 교환: 데이터를 공유하기 위한 컨테이너입니다. 공공 또는 비공식적일 수 있습니다.
  • 리스트: 교환 내에서 공유되는 특정 빅쿼리 데이터 세트(또는 잠재적으로 미래의 뷰/테이블)에 대한 참조입니다. 게시자는 리스트를 생성하고, 구독자는 이에 액세스합니다.
  • APIs: 교환, 리스트 및 구독을 프로그래밍 방식으로 관리하기 위해 REST 및 클라이언트 라이브러리를 통해 사용할 수 있습니다(예: google-cloud-analyticshub).

4. 빅레이크 API:

  • 목적: 빅쿼리 인터페이스와 거버넌스를 사용하여 클라우드 스토리지에서 데이터를 쿼리할 수 있는 빅레이크 테이블을 관리합니다.
  • 작동 방식: GCS 버킷의 데이터 파일(Parquet, ORC 등)을 참조하는 테이블을 빅쿼리에서 정의합니다. 빅레이크 API(현재 주로 REST)는 이러한 테이블의 메타데이터를 생성, 업데이트 및 관리하는 데 사용됩니다. 쿼리는 표준 빅쿼리 SQL을 통해 수행됩니다.

REST API 직접 사용하기

클라이언트 라이브러리는 일반적으로 선호되지만 다음과 같은 경우 REST API를 직접 사용할 수 있습니다:

  • 공식 구글 클라이언트 라이브러리가 없는 언어를 사용하는 경우.
  • 아직 라이브러리에서 노출되지 않은 최첨단 기능에 대한 접근이 필요한 경우.
  • HTTP 요청/응답 사이클을 완전히 제어할 필요가 있는 경우.

요청 만들기:

일반적으로 curl 또는 파이썬의 requests 라이브러리와 같은 HTTP 클라이언트를 사용합니다. 다음이 필요합니다:

  1. OAuth 2.0 액세스 토큰을 얻습니다(예: gcloud auth print-access-token 사용).
  2. 올바른 API 엔드포인트 URL을 구성합니다.
  3. API 메서드의 사양에 따라 JSON 요청 본문을 생성합니다.
  4. Authorization: Bearer <token> 헤더에 액세스 토큰을 포함합니다.
  5. HTTP 응답을 처리합니다(상태 코드, JSON 파싱, 오류 메시지).

예제: REST를 통해 쿼리 실행하기 (jobs.query)

# 1. 액세스 토큰 얻기
TOKEN=$(gcloud auth print-access-token)

# 2. 프로젝트 ID 및 요청 본문 정의
PROJECT_ID="your-project-id" # 프로젝트 ID로 교체
REQUEST_BODY=$(cat <<EOF
{
  "query": "SELECT name, SUM(number) as total_people FROM \`bigquery-public-data.usa_names.usa_1910_2013\` WHERE state = 'CA' GROUP BY name ORDER BY total_people DESC LIMIT 5;",
  "useLegacySql": false
}
EOF
)

# 3. curl로 API 호출하기
curl -X POST \
  "https://bigquery.googleapis.com/bigquery/v2/projects/${PROJECT_ID}/jobs" \
  -H "Authorization: Bearer ${TOKEN}" \
  -H "Content-Type: application/json; charset=utf-8" \
  -d "${REQUEST_BODY}"

# 응답에는 작업 정보가 포함되며, 작업 ID도 포함됩니다.
# 이후 jobs.getQueryResults를 호출하여 작업이 완료된 후 실제 데이터 행을 검색해야 합니다.

이 예제는 쿼리 작업을 시작하는 것에만 해당됩니다. 결과를 검색하려면 작업 상태를 폴링하고 jobs.getQueryResults 엔드포인트를 호출해야 합니다. 이는 클라이언트 라이브러리와 비교할 때 추가 단계가 있음을 강조합니다.

모범 사례 및 팁

  • 클라이언트 라이브러리 선호: 간단함, 강인성 및 유지 보수를 용이하게 하기 위해 가능한 한 구글 클라우드 클라이언트 라이브러리를 사용하십시오.
  • ADC 사용: 애플리케이션 기본 자격증명을 통한 인증은 다양한 환경에서의 배포를 단순화합니다. 자격 증명을 코드에 포함하는 것을 피하십시오.
  • 매개변수화된 쿼리: SQL 인젝션 취약점을 방지하기 위해 외부 입력을 포함할 때 항상 매개변수화된 쿼리를 사용하십시오.
  • 쿼리 최적화: 효율적인 SQL을 작성하십시오. SELECT * 를 피하고 WHERE 절을 효과적으로 사용하고, 큰 테이블에 대한 성능을 위해 파티셔닝 및 클러스터링을 이해하십시오. 쿼리 계획 설명 기능을 사용하십시오.
  • 오류 처리: 네트워크 문제나 API 특정 오류(예: 쿼타 초과, 찾을 수 없음)에 대해 강력한 오류 처리를 구현하십시오. 클라이언트 라이브러리는 종종 일시적인 오류에 대한 기본 재시도 메커니즘을 포함합니다.
  • 리소스 관리: 테스트 또는 임시 처리를 위해 생성된 데이터 세트 또는 테이블을 명시적으로 삭제하여 불필요한 저장 비용을 피하십시오.
  • 비용 및 쿼타 모니터링: 쿼리별 빅쿼리 가격(처리된 바이트, 저장 비용)에 주의하십시오. 사용량을 모니터링하고 API 쿼타(예: 동시 쿼리 제한, API 요청 속도 제한)에 대해 알고 있으십시오. 쿼리 비용을 제어하기 위해 최대 바이트 청구(maximum_bytes_billedJobConfig에 사용하십시오.)와 같은 기능을 사용하십시오.
  • 전문 API 사용: 대규모 데이터 내보내기/읽기를 위해 스토리지 읽기 API를 활용하고, 외부 소스를 효율적으로 쿼리하기 위해 연결 API를 활용하십시오.

결론

빅쿼리 API는 데이터 웨어하우스와 프로그램적으로 상호작용할 수 있는 강력하고 유연한 방법을 제공합니다. 데이터를 자동으로 로드하거나 애플리케이션의 일환으로 복잡한 분석 쿼리를 실행하거나 빅쿼리 인사이트를 대시보드에 통합하거나 리소스를 동적으로 관리해야 하는 경우, API는 필요한 도구를 제공합니다.

다양한 API 유형을 이해하고 환경을 올바르게 설정하며 클라이언트 라이브러리(특히 파이썬)의 편리함을 활용함으로써 인터랙티브 콘솔을 넘어서 빅쿼리의 잠재력을 최대한 활용할 수 있습니다. 공통 작업을 위한 핵심 API로 시작하고, 성능이 중요한 작업을 위해 스토리지 읽기 API와 같은 전문 API를 탐색하며, 궁극적인 제어를 위해 REST API도 항상 사용할 수 있음을 기억하십시오. 빅쿼리를 기반으로 애플리케이션을 구축할 때 이러한 API는 데이터 아키텍처의 필수 구성 요소가 될 것입니다.