초보자를 위한 BigQuery API 사용법

Young-jae

Young-jae

14 April 2025

초보자를 위한 BigQuery API 사용법

구글 빅쿼리는 조직들이 대규모 데이터 분석을 처리하는 방식을 혁신했습니다. 서버리스 아키텍처, 확장성, 친숙한 SQL 인터페이스 덕분에 방대한 데이터 세트에서 인사이트를 도출하는 강력한 도구가 되었습니다. 구글 클라우드 콘솔 또는 bq 명령줄 도구를 통해 빅쿼리와 상호작용하는 것은 일반적이지만, 자동화, 통합 및 맞춤형 애플리케이션 개발의 진정한 힘은 포괄적인 애플리케이션 프로그래밍 인터페이스(API) 세트를 통해 열립니다.

이 가이드는 빅쿼리 API를 사용하는 방법에 대한 단계별 탐색을 제공합니다. 이를 통해 프로그램적으로 데이터 웨어하우스와 상호작용하고, 데이터 파이프라인을 구축하며, 애플리케이션에 빅쿼리를 통합하고, 리소스를 효과적으로 관리할 수 있습니다. 사용 가능한 다양한 API 유형, 환경 설정 방법, 파이썬 클라이언트 라이브러리를 사용한 실제 예제 및 고급 사용 사례를 위한 전문 API를 소개합니다.

💡
아름다운 API 문서를 생성하는 훌륭한 API 테스트 도구를 원하십니까?

개발 팀이 함께 작업할 수 있는 통합된 올인원 플랫폼을 원하십니까? 최대 생산성를 위해?

Apidog은 모든 요구를 충족하고 Postman을 훨씬 더 저렴한 가격에 대체합니다!
버튼

빅쿼리 API 이해하기

코드에 들어가기 전에 빅쿼리와 프로그램적으로 상호작용할 수 있는 핵심 개념과 다양한 방법을 이해하는 것이 중요합니다.

핵심 빅쿼리 개념:

빅쿼리 API 유형:

빅쿼리는 프로그램적으로 서비스와 상호작용할 수 있는 여러 방법을 제공합니다:

REST API: HTTP와 JSON으로 구축된 기본 API입니다. 빅쿼리 리소스 및 작업에 직접 접근할 수 있습니다. 특정 엔드포인트(예: https://bigquery.googleapis.com/bigquery/v2/projects/{projectId}/datasets)를 타겟으로 하는 표준 HTTP 요청(GET, POST, PUT, DELETE)을 사용하여 상호작용할 수 있습니다. 강력하고 세밀한 제어를 제공하지만 REST API를 직접 사용하려면 인증, 요청 형식, 응답 파싱 및 오류 처리를 수동으로 처리해야 합니다. 인증은 일반적으로 OAuth 2.0 액세스 토큰을 포함합니다.

클라이언트 라이브러리: 구글은 다양한 인기 프로그래밍 언어(파이썬, 자바, 고, Node.js, C#, PHP, 루비 포함)를 위한 고수준 클라이언트 라이브러리를 제공합니다. 이 라이브러리는 기본 REST API를 래핑하여 더욱 관용적이고 개발자 친화적인 경험을 제공합니다. 일반적인 작업을 단순화하고 인증(종종 애플리케이션 기본 자격증명을 통해 자동으로 처리), 재시도 관리 및 작성해야 할 보일러플레이트 코드의 양을 줄입니다. 이는 대부분의 애플리케이션 개발에 추천하는 접근 방식입니다.

전문API: 특히 고성능 또는 특수 작업을 위한 빅쿼리 전용 API를 제공합니다:

환경 설정하기

API 호출을 시작하려면 로컬 또는 서버 환경을 구성해야 합니다.

사전 요구 사항:

  1. 구글 클라우드 계정: 활성 구글 클라우드 계정이 필요합니다.
  2. 구글 클라우드 프로젝트: 구글 클라우드 콘솔에서 새 프로젝트를 생성하거나 기존 프로젝트를 선택합니다. 프로젝트 ID를 기억해 두세요.
  3. 빅쿼리 API 활성화: 프로젝트에 대해 빅쿼리 API가 활성화되어 있는지 확인합니다. 이를 클라우드 콘솔에서 (API 및 서비스 > 라이브러리 > "빅쿼리 API" 검색 > 활성화) 수행할 수 있습니다. 사용 사례에 따라 빅쿼리 스토리지 읽기 API 또는 빅쿼리 연결 API와 같은 다른 API도 활성화해야 할 수 있습니다.
  4. 청구: 프로젝트에 대해 청구가 활성화되어 있는지 확인합니다. 빅쿼리 작업은 데이터 저장, 처리된 분석 및 스트리밍 삽입에 따라 비용이 발생합니다.

인증:

귀하의 애플리케이션은 구글 클라우드에 인증하여 식별성과 빅쿼리 리소스에 접근할 수 있는 권한을 증명해야 합니다. 대부분의 시나리오에서 권장되는 방법은 애플리케이션 기본 자격증명(ADC)입니다.

  1. 클라우드 콘솔(IAM 및 관리자 > 서비스 계정)에서 서비스 계정을 생성합니다.
  2. 필요한 빅쿼리 역할(예: BigQuery Data Editor, BigQuery Job User, BigQuery User)을 서비스 계정에 부여합니다.
  3. 서비스 계정 키 파일(JSON 형식)을 다운로드합니다.
  4. 환경 변수를 GOOGLE_APPLICATION_CREDENTIALS로 설정하고 다운로드한 JSON 키 파일의 절대 경로를 지정합니다. 환경 변수가 설정되면 클라이언트 라이브러리는 이 키 파일을 인증에 자동으로 사용합니다.

클라이언트 라이브러리 설치 (파이썬 예제):

우리는 예제를 위해 파이썬에 집중합니다. pip를 사용하여 필요한 라이브러리를 설치할 수 있습니다:

pip install google-cloud-bigquery
# 선택 사항: 더 빠른 읽기를 위한 스토리지 API 라이브러리 설치
pip install google-cloud-bigquery-storage
# 선택 사항: 더 나은 유형 처리를 위한 pandas 통합 및 db-dtypes 설치
pip install pandas db-dtypes pyarrow

최신 라이브러리 기능을 위해 Python이 설치되어 있는지 확인하십시오(버전 3.7+ 권장).

빅쿼리 클라이언트 라이브러리 사용하기(파이썬 예제)

이제 google-cloud-bigquery 파이썬 라이브러리를 사용하여 일반적인 빅쿼리 작업을 탐색하겠습니다.

1. 클라이언트 가져오기 및 초기화:

먼저 라이브러리를 가져옵니다. 그런 다음 클라이언트 인스턴스를 생성합니다. ADC가 올바르게 구성되면 클라이언트가 자동으로 인증됩니다.

from google.cloud import bigquery
import pandas as pd

# 빅쿼리 클라이언트 객체 구성.
# GOOGLE_APPLICATION_CREDENTIALS가 설정되어 있으면 서비스 계정을 사용합니다.
# gcloud auth application-default login이 실행되면 해당 자격증명을 사용합니다.
# GCP 인프라에서 실행 중이면 인스턴스의 서비스 계정을 사용합니다.
client = bigquery.Client()

# 필요시 프로젝트 ID를 명시적으로 지정할 수 있습니다.
# 그렇지 않으면 환경/ADC 자격증명에서 자주 추론됩니다.
# client = bigquery.Client(project='your-project-id')

print("클라이언트가 성공적으로 생성되었습니다.")

2. 쿼리 실행:

가장 일반적인 작업은 SQL 쿼리를 실행하는 것입니다.

# SQL 쿼리 정의
query = """
    SELECT name, SUM(number) as total_people
    FROM `bigquery-public-data.usa_names.usa_1910_2013`
    WHERE state = 'TX'
    GROUP BY name, state
    ORDER BY total_people DESC
    LIMIT 10
"""

# API 요청을 만들고 작업이 완료될 때까지 기다립니다.
query_job = client.query(query)  # API 요청
print(f"작업 시작됨: {query_job.job_id}")

# 작업이 완료될 때까지 기다리고 결과를 가져옵니다.
rows = query_job.result()  # 작업이 완료될 때까지 기다립니다.

print("\nTX에서의 상위 10개 이름 (1910-2013):")
for row in rows:
    # 행 값은 필드 이름 또는 인덱스에 따라 접근이 가능합니다.
    print(f"이름: {row.name}, 수: {row['total_people']}") # 속성 또는 키로 접근

# 결과를 Pandas 데이터프레임으로 변환
df = rows.to_dataframe()
print("\nPandas 데이터프레임으로서의 결과:")
print(df.head())
query = """
    SELECT corpus, COUNT(word) as distinct_words
    FROM `bigquery-public-data.samples.shakespeare`
    GROUP BY corpus
    ORDER BY distinct_words DESC;
"""
job_config = bigquery.QueryJobConfig(
    # 쿼리에 대해 표준 SQL 구문 사용.
    use_legacy_sql=False
)

# 추가 구성을 전달하며 쿼리를 시작합니다.
query_job = client.query(query, job_config=job_config) # 기다리지 않음
print(f"비동기 작업 시작됨: {query_job.job_id}")

# --- 애플리케이션의 나중에 ---
# 작업 상태 확인 (선택 사항)
# from google.cloud.exceptions import NotFound
# try:
#     job = client.get_job(query_job.job_id, location=query_job.location)
#     print(f"작업 {job.job_id} 상태: {job.state}")
#     if job.state == "DONE":
#         if job.error_result:
#             print(f"작업 실패: {job.errors}")
#         else:
#             results = job.result() # 결과 가져오기
#             print("결과를 가져왔습니다.")
#             # 결과 처리...
# except NotFound:
#     print(f"작업 {query_job.job_id}를 찾을 수 없습니다.")

# 필요시 완료 대기
results = query_job.result() # 이 작업이 완료될 때까지 블로킹됩니다.
print("비동기 작업이 완료되었습니다.")
for row in results:
    print(f"말뭉치: {row.corpus}, 고유 단어 수: {row.distinct_words}")

from google.cloud.bigquery import ScalarQueryParameter, ArrayQueryParameter, StructQueryParameter, QueryJobConfig

# 예: 특정 주에서 특정 접두사로 시작하는 이름 찾기
state_param = "NY"
prefix_param = "Ma"
min_count_param = 1000

query = """
    SELECT name, SUM(number) as total_people
    FROM `bigquery-public-data.usa_names.usa_1910_2013`
    WHERE state = @state_abbr AND name LIKE @name_prefix
    GROUP BY name
    HAVING total_people >= @min_count
    ORDER BY total_people DESC;
"""

job_config = QueryJobConfig(
    query_parameters=[
        ScalarQueryParameter("state_abbr", "STRING", state_param),
        # LIKE 연산자에 대해 'val%' 사용
        ScalarQueryParameter("name_prefix", "STRING", f"{prefix_param}%"),
        ScalarQueryParameter("min_count", "INT64", min_count_param),
    ]
)

query_job = client.query(query, job_config=job_config)
print(f"매개변수화된 쿼리 작업 시작됨: {query_job.job_id}")

rows = query_job.result()

print(f"\n'{prefix_param}'로 시작하는 {state_param} 주의 이름 (인구 >= {min_count_param}명):")
for row in rows:
    print(f"이름: {row.name}, 수: {row.total_people}")

3. 데이터 세트 관리하기:

데이터 세트를 생성하고, 목록을 생성하고, 세부정보를 가져오고, 삭제할 수 있습니다.

# 데이터 세트 ID 및 위치 정의
project_id = client.project
dataset_id = f"{project_id}.my_new_dataset"
dataset_location = "US" # 예: "US", "EU", "asia-northeast1"

# API에 전송할 전체 데이터 세트 객체 구성.
dataset = bigquery.Dataset(dataset_id)
dataset.location = dataset_location
dataset.description = "파이썬 클라이언트 라이브러리를 통해 생성된 데이터 세트"

try:
    # 데이터 세트를 생성하기 위한 API 요청합니다.
    dataset = client.create_dataset(dataset, timeout=30)  # API 요청입니다.
    print(f"생성된 데이터 세트 {client.project}.{dataset.dataset_id}")

    # 프로젝트의 데이터 세트 목록
    print("\n프로젝트의 데이터 세트:")
    datasets = list(client.list_datasets()) # API 요청
    if datasets:
        for ds in datasets:
            print(f"\t{ds.dataset_id}")
    else:
        print(f"\t{client.project} 프로젝트에는 데이터 세트가 없습니다.")

    # 데이터 세트 정보 가져오기
    retrieved_dataset = client.get_dataset(dataset_id) # API 요청
    print(f"\n{dataset_id}의 데이터 세트 정보 가져오기:")
    print(f"\t설명: {retrieved_dataset.description}")
    print(f"\t위치: {retrieved_dataset.location}")

except Exception as e:
    print(f"데이터 세트 작업 중 오류: {e}")

finally:
    # 정리하기: 데이터 세트 삭제
    try:
        client.delete_dataset(
            dataset_id, delete_contents=True, not_found_ok=True
        )  # API 요청
        print(f"\n데이터 세트 '{dataset_id}'이 성공적으로 삭제되었습니다.")
    except Exception as e:
         print(f"데이터 세트 {dataset_id} 삭제 중 오류: {e}")

4. 테이블 관리하기:

테이블에 대해 유사한 작업이 존재합니다: 테이블 생성(스키마 정의), 데이터 로드, 메타데이터 얻기 및 테이블 삭제입니다.

# 이전에 생성된 데이터 세트 ID 사용(존재하는지 확인하거나 위 삭제 단계 생략)
dataset_id_for_table = "my_new_dataset" # 유효한 데이터 세트 ID 사용
table_id = f"{client.project}.{dataset_id_for_table}.my_new_table"

# 스키마 정의
schema = [
    bigquery.SchemaField("full_name", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("email", "STRING", mode="NULLABLE"),
]

# 테이블 생성
table = bigquery.Table(table_id, schema=schema)
try:
    # 먼저 데이터 세트가 존재하는지 확인합니다.
    client.create_dataset(dataset_id_for_table, exists_ok=True)

    table = client.create_table(table)  # API 요청
    print(
        f"테이블 {table.project}.{table.dataset_id}.{table.table_id}가 생성되었습니다."
    )

    # --- 데이터 로드(예: Pandas 데이터프레임에서) ---
    data = {'full_name': ['Alice Smith', 'Bob Johnson'],
            'age': [30, 45],
            'email': ['alice@example.com', None]}
    dataframe = pd.DataFrame(data)

    job_config = bigquery.LoadJobConfig(
        # 스키마를 지정하는 것이 권장되며, 적절한 유형을 보장합니다.
        schema=schema,
        # 선택 사항: 테이블 데이터 덮어쓰기
        write_disposition="WRITE_TRUNCATE",
        # 또는 추가: write_disposition="WRITE_APPEND",
    )

    load_job = client.load_table_from_dataframe(
        dataframe, table_id, job_config=job_config
    )  # API 요청
    print(f"데이터프레임에서 데이터를 로드하기 위한 작업 시작 {load_job.job_id}")

    load_job.result()  # 작업 완료 대기.
    print("데이터프레임 로드 작업이 완료되었습니다.")

    destination_table = client.get_table(table_id) # API 요청
    print(f"테이블 {table_id}에 {destination_table.num_rows}개의 행이 로드되었습니다.")

    # --- 데이터 로드(예: Google Cloud Storage URI에서) ---
    # 호환 데이터로 gs://your-bucket/data.csv가 존재한다고 가정합니다.
    # uri = "gs://your-bucket/data.csv"
    # job_config_gcs = bigquery.LoadJobConfig(
    #     schema=schema,
    #     skip_leading_rows=1, # 헤더 행 건너뛰기
    #     source_format=bigquery.SourceFormat.CSV,
    #     write_disposition="WRITE_APPEND", # 기존 데이터에 추가
    # )
    # load_job_gcs = client.load_table_from_uri(
    #     uri, table_id, job_config=job_config_gcs
    # )
    # print(f"GCS에서 데이터 로드 작업 시작 {load_job_gcs.job_id}")
    # load_job_gcs.result()
    # print("GCS 로드 작업이 완료되었습니다.")
    # destination_table = client.get_table(table_id)
    # print(f"GCS 로드 후 총 행: {destination_table.num_rows}")

except Exception as e:
    print(f"테이블 작업 중 오류: {e}")

finally:
    # 정리하기: 테이블 삭제
    try:
        client.delete_table(table_id, not_found_ok=True)  # API 요청
        print(f"테이블 '{table_id}'이 성공적으로 삭제되었습니다.")
        # 필요시 예제를 위해 데이터 세트를 다시 삭제합니다.
        # client.delete_dataset(dataset_id_for_table, delete_contents=True, not_found_ok=True)
        # print(f"데이터 세트 '{dataset_id_for_table}'이 성공적으로 삭제되었습니다.")
    except Exception as e:
        print(f"테이블 {table_id} 삭제 중 오류: {e}")

5. 작업 관리하기:

모든 비동기 작업(쿼리, 로드, 내보내기, 복사)은 작업 리소스를 생성합니다. 이러한 작업을 나열하고 관리할 수 있습니다.

# 최근 작업 목록
print("\n최근 빅쿼리 작업:")
for job in client.list_jobs(max_results=10): # API 요청
    print(f"작업 ID: {job.job_id}, 유형: {job.job_type}, 상태: {job.state}, 생성일: {job.created}")

# 특정 작업 가져오기(이전 실행에서 유효한 작업 ID로 대체)
# try:
#     job_id_to_get = "..." # 실제 작업 ID로 대체
#     location = "US"      # 기본이 아닐 경우 작업의 위치로 대체
#     retrieved_job = client.get_job(job_id_to_get, location=location) # API 요청
#     print(f"\n작업 {retrieved_job.job_id} 상세 정보:")
#     print(f"\t상태: {retrieved_job.state}")
#     if retrieved_job.error_result:
#         print(f"\t오류: {retrieved_job.errors}")
# except NotFound:
#     print(f"작업 {job_id_to_get}를 찾을 수 없습니다.")
# except Exception as e:
#     print(f"작업 검색 중 오류: {e}")

전문 API 활용하기 (개념 및 사용 사례)

핵심 클라이언트 라이브러리가 많은 사용 사례를 다루지만, 전문 API는 특정 작업에 대한 성능 또는 기능을 향상시킵니다.

1. 빅쿼리 스토리지 읽기 API (파이썬):

# 필요: pip install google-cloud-bigquery-storage pyarrow pandas db-dtypes

from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types, GetDataStreamRequest

# --- Pandas read_gbq 사용하기 (가장 간단한 통합) ---
# 설치되어 있으면 자동으로 스토리지 API 사용
# table_id_read = "bigquery-public-data.usa_names.usa_1910_2013"
# cols_to_read = ["name", "number", "state"]
# row_filter = "state = 'CA' AND number > 5000"
#
# try:
#      df_storage = pd.read_gbq(
#          table_id_read,
#          project_id=client.project,
#          columns=cols_to_read,
#          row_filter=row_filter,
#          use_bqstorage_api=True, # 명시적으로 스토리지 API 요청
#          progress_bar_type='tqdm' # 선택적 진행률 표시줄
#      )
#      print("\nStorage API를 통해 읽은 데이터:")
#      print(df_storage.head())
#      print(f"{len(df_storage)}개의 행이 읽혔습니다.")
# except Exception as e:
#      print(f"Storage API를 통해 read_gbq 읽기 중 오류: {e}")


# --- 수동 스토리지 API 사용하기 (더 많은 제어) ---
# bqstorageclient = bigquery_storage_v1.BigQueryReadClient()
# table = f"projects/{project_id}/datasets/{dataset_id}/tables/{table_name}" # 테이블 세부정보로 교체

# requested_session = types.ReadSession(
#     table=table,
#     data_format=types.DataFormat.ARROW,
#     read_options=types.ReadSession.TableReadOptions(
#         selected_fields=["col1", "col2"], # 열 지정
#         row_restriction="col1 > 100"     # 필터 지정
#     ),
# )
# parent = f"projects/{project_id}"

# read_session = bqstorageclient.create_read_session(
#     parent=parent,
#     read_session=requested_session,
#     max_stream_count=1, # 병렬 스트림 수 요청
# )

# stream = read_session.streams[0]
# reader = bqstorageclient.read_rows(stream.name)
# frames = [message.arrow_record_batch for message in reader.messages()]
# if frames:
#     arrow_table = pa.Table.from_batches(frames)
#     df_manual = arrow_table.to_pandas()
#     print("\nStorage API를 사용하여 수동으로 읽은 데이터:")
#     print(df_manual.head())
# else:
#     print("수동 Storage API를 사용하여 읽은 데이터가 없습니다.")

2. 빅쿼리 연결 API:

  1. API(또는 클라우드 콘솔/bq 도구)를 사용하여 외부 소스 유형 및 세부 정보를 지정하여 Connection 리소스를 생성합니다.
  2. 외부 리소스에 대해 연결의 서비스 계정에 적절한 권한을 부여합니다(예: 클라우드 SQL 사용자 역할).
  3. BigQuery SQL 내에서 EXTERNAL_QUERY("connection_id", "external_sql_query") 기능을 사용합니다.

3. 분석 허브 API:

4. 빅레이크 API:

REST API 직접 사용하기

클라이언트 라이브러리는 일반적으로 선호되지만 다음과 같은 경우 REST API를 직접 사용할 수 있습니다:

요청 만들기:

일반적으로 curl 또는 파이썬의 requests 라이브러리와 같은 HTTP 클라이언트를 사용합니다. 다음이 필요합니다:

  1. OAuth 2.0 액세스 토큰을 얻습니다(예: gcloud auth print-access-token 사용).
  2. 올바른 API 엔드포인트 URL을 구성합니다.
  3. API 메서드의 사양에 따라 JSON 요청 본문을 생성합니다.
  4. Authorization: Bearer <token> 헤더에 액세스 토큰을 포함합니다.
  5. HTTP 응답을 처리합니다(상태 코드, JSON 파싱, 오류 메시지).

예제: REST를 통해 쿼리 실행하기 (jobs.query)

# 1. 액세스 토큰 얻기
TOKEN=$(gcloud auth print-access-token)

# 2. 프로젝트 ID 및 요청 본문 정의
PROJECT_ID="your-project-id" # 프로젝트 ID로 교체
REQUEST_BODY=$(cat <<EOF
{
  "query": "SELECT name, SUM(number) as total_people FROM \`bigquery-public-data.usa_names.usa_1910_2013\` WHERE state = 'CA' GROUP BY name ORDER BY total_people DESC LIMIT 5;",
  "useLegacySql": false
}
EOF
)

# 3. curl로 API 호출하기
curl -X POST \
  "https://bigquery.googleapis.com/bigquery/v2/projects/${PROJECT_ID}/jobs" \
  -H "Authorization: Bearer ${TOKEN}" \
  -H "Content-Type: application/json; charset=utf-8" \
  -d "${REQUEST_BODY}"

# 응답에는 작업 정보가 포함되며, 작업 ID도 포함됩니다.
# 이후 jobs.getQueryResults를 호출하여 작업이 완료된 후 실제 데이터 행을 검색해야 합니다.

이 예제는 쿼리 작업을 시작하는 것에만 해당됩니다. 결과를 검색하려면 작업 상태를 폴링하고 jobs.getQueryResults 엔드포인트를 호출해야 합니다. 이는 클라이언트 라이브러리와 비교할 때 추가 단계가 있음을 강조합니다.

모범 사례 및 팁

결론

빅쿼리 API는 데이터 웨어하우스와 프로그램적으로 상호작용할 수 있는 강력하고 유연한 방법을 제공합니다. 데이터를 자동으로 로드하거나 애플리케이션의 일환으로 복잡한 분석 쿼리를 실행하거나 빅쿼리 인사이트를 대시보드에 통합하거나 리소스를 동적으로 관리해야 하는 경우, API는 필요한 도구를 제공합니다.

다양한 API 유형을 이해하고 환경을 올바르게 설정하며 클라이언트 라이브러리(특히 파이썬)의 편리함을 활용함으로써 인터랙티브 콘솔을 넘어서 빅쿼리의 잠재력을 최대한 활용할 수 있습니다. 공통 작업을 위한 핵심 API로 시작하고, 성능이 중요한 작업을 위해 스토리지 읽기 API와 같은 전문 API를 탐색하며, 궁극적인 제어를 위해 REST API도 항상 사용할 수 있음을 기억하십시오. 빅쿼리를 기반으로 애플리케이션을 구축할 때 이러한 API는 데이터 아키텍처의 필수 구성 요소가 될 것입니다.

Apidog에서 API 설계-첫 번째 연습

API를 더 쉽게 구축하고 사용하는 방법을 발견하세요