Comment utiliser les API BigQuery pour les débutants

Ce guide explore l'utilisation des API BigQuery, pour interagir avec l'entrepôt de données, créer des pipelines et gérer les ressources.

Louis Dupont

Louis Dupont

5 June 2025

Comment utiliser les API BigQuery pour les débutants

```html

Google BigQuery a révolutionné la façon dont les organisations gèrent l'analyse de données à grande échelle. Son architecture sans serveur, son évolutivité et son interface SQL familière en font un outil puissant pour tirer des informations de vastes ensembles de données. Bien qu'interagir avec BigQuery via la Google Cloud Console ou l'outil en ligne de commande bq soit courant, la véritable puissance de l'automatisation, de l'intégration et du développement d'applications personnalisées est débloquée grâce à son ensemble complet d'interfaces de programmation d'applications (API).

Ce guide fournit une exploration étape par étape de la façon d'utiliser les API BigQuery, vous permettant d'interagir par programmation avec votre entrepôt de données, de créer des pipelines de données, d'intégrer BigQuery dans vos applications et de gérer efficacement les ressources. Nous aborderons les différents types d'API disponibles, comment configurer votre environnement, des exemples pratiques utilisant la bibliothèque cliente Python et présenterons des API spécialisées pour les cas d'utilisation avancés.

💡
Vous voulez un excellent outil de test d'API qui génère une belle documentation d'API ?

Vous voulez une plateforme intégrée, tout-en-un, pour que votre équipe de développeurs travaille ensemble avec une productivité maximale ?

Apidog répond à toutes vos demandes et remplace Postman à un prix beaucoup plus abordable !
button

Comprendre les API BigQuery

Avant de plonger dans le code, il est crucial de comprendre les concepts de base et les différentes façons dont vous pouvez interagir avec BigQuery par programmation.

Concepts de base de BigQuery :

Types d'API BigQuery :

BigQuery offre plusieurs façons d'interagir avec ses services par programmation :

API REST : Il s'agit de l'API fondamentale construite sur HTTP et JSON. Elle fournit un accès direct aux ressources et aux opérations BigQuery. Vous pouvez interagir avec elle à l'aide de requêtes HTTP standard (GET, POST, PUT, DELETE) ciblant des points de terminaison spécifiques (par exemple, https://bigquery.googleapis.com/bigquery/v2/projects/{projectId}/datasets). Bien que puissante et offrant un contrôle précis, l'utilisation directe de l'API REST nécessite de gérer manuellement l'authentification, le formatage des requêtes, l'analyse des réponses et la gestion des erreurs. L'authentification implique généralement des jetons d'accès OAuth 2.0.

Bibliothèques clientes : Google fournit des bibliothèques clientes de haut niveau pour divers langages de programmation populaires (y compris Python, Java, Go, Node.js, C#, PHP, Ruby). Ces bibliothèques encapsulent l'API REST sous-jacente, offrant une expérience plus idiomatique et conviviale pour les développeurs. Elles simplifient les tâches courantes, gèrent l'authentification (souvent automatiquement via les Application Default Credentials), gèrent les nouvelles tentatives et réduisent la quantité de code passe-partout que vous devez écrire. Il s'agit de l'approche recommandée pour la plupart des développements d'applications.

API spécialisées : Pour des tâches spécifiques à hautes performances ou spécialisées, BigQuery propose des API dédiées :

Configuration de votre environnement

Avant de pouvoir commencer à effectuer des appels d'API, vous devez configurer votre environnement local ou serveur.

Conditions préalables :

  1. Compte Google Cloud : Vous avez besoin d'un compte Google Cloud actif.
  2. Projet Google Cloud : Créez un nouveau projet ou sélectionnez-en un existant dans la Google Cloud Console. Notez votre ID de projet.
  3. Activer l'API BigQuery : Assurez-vous que l'API BigQuery est activée pour votre projet. Vous pouvez le faire via la Cloud Console (APIs & Services > Library > Rechercher « BigQuery API » > Activer). Vous devrez peut-être également activer d'autres API comme l'API BigQuery Storage Read ou l'API BigQuery Connection en fonction de votre cas d'utilisation.
  4. Facturation : Assurez-vous que la facturation est activée pour votre projet. Les opérations BigQuery entraînent des coûts basés sur le stockage des données, l'analyse traitée et les insertions en streaming.

Authentification :

Votre application doit s'authentifier auprès de Google Cloud pour prouver son identité et son autorisation d'accéder aux ressources BigQuery. La méthode recommandée pour la plupart des scénarios est Application Default Credentials (ADC).

  1. Créez un compte de service dans la Cloud Console (IAM & Admin > Service Accounts).
  2. Attribuez les rôles BigQuery nécessaires (par exemple, BigQuery Data Editor, BigQuery Job User, BigQuery User) au compte de service.
  3. Téléchargez le fichier de clé du compte de service (format JSON).
  4. Définissez la variable d'environnement GOOGLE_APPLICATION_CREDENTIALS sur le chemin absolu du fichier de clé JSON téléchargé. Les bibliothèques clientes utiliseront automatiquement ce fichier de clé pour l'authentification si la variable d'environnement est définie.

Installation des bibliothèques clientes (exemple Python) :

Nous nous concentrerons sur Python pour nos exemples. Vous pouvez installer les bibliothèques nécessaires à l'aide de pip :

pip install google-cloud-bigquery
# Facultatif : Installez la bibliothèque de l'API de stockage pour des lectures plus rapides
pip install google-cloud-bigquery-storage
# Facultatif : Installez l'intégration pandas et db-dtypes pour une meilleure gestion des types
pip install pandas db-dtypes pyarrow

Assurez-vous que Python est installé (version 3.7+ recommandée pour les dernières fonctionnalités de la bibliothèque).

Utilisation de la bibliothèque cliente BigQuery (exemples Python)

Maintenant, explorons les opérations BigQuery courantes à l'aide de la bibliothèque Python google-cloud-bigquery.

1. Importation et initialisation du client :

Tout d'abord, importez la bibliothèque. Ensuite, créez une instance client. Si ADC est configuré correctement, le client s'authentifiera automatiquement.

from google.cloud import bigquery
import pandas as pd

# Construire un objet client BigQuery.
# Si GOOGLE_APPLICATION_CREDENTIALS est défini, il utilise le compte de service.
# Si gcloud auth application-default login a été exécuté, il utilise ces informations d'identification.
# Si vous êtes sur l'infrastructure GCP, il utilise le compte de service de l'instance.
client = bigquery.Client()

# Vous pouvez spécifier explicitement l'ID du projet si nécessaire,
# sinon il est souvent déduit de l'environnement/des informations d'identification ADC.
# client = bigquery.Client(project='your-project-id')

print("Client créé avec succès.")

2. Exécution de requêtes :

L'opération la plus courante consiste à exécuter des requêtes SQL.

# Définir votre requête SQL
query = """
    SELECT name, SUM(number) as total_people
    FROM `bigquery-public-data.usa_names.usa_1910_2013`
    WHERE state = 'TX'
    GROUP BY name, state
    ORDER BY total_people DESC
    LIMIT 10
"""

# Effectuer une requête API et attendre la fin de la tâche.
query_job = client.query(query)  # Requête API
print(f"Tâche démarrée : {query_job.job_id}")

# Attendre la fin de la tâche et obtenir les résultats.
rows = query_job.result()  # Attend la fin de la tâche.

print("\nTop 10 des noms au TX (1910-2013) :")
for row in rows:
    # Les valeurs de ligne sont accessibles par nom de champ ou par index.
    print(f"Nom : {row.name}, Nombre : {row['total_people']}") # Accès par attribut ou clé

# Convertir les résultats en un DataFrame Pandas
df = rows.to_dataframe()
print("\nRésultats sous forme de DataFrame Pandas :")
print(df.head())
query = """
    SELECT corpus, COUNT(word) as distinct_words
    FROM `bigquery-public-data.samples.shakespeare`
    GROUP BY corpus
    ORDER BY distinct_words DESC;
"""
job_config = bigquery.QueryJobConfig(
    # Utilisez la syntaxe SQL standard pour les requêtes.
    use_legacy_sql=False
)

# Démarrer la requête, en passant la configuration supplémentaire.
query_job = client.query(query, job_config=job_config) # N'attend pas
print(f"Tâche asynchrone démarrée : {query_job.job_id}")

# --- Plus tard dans votre application ---
# Vérifier l'état de la tâche (facultatif)
# from google.cloud.exceptions import NotFound
# try:
#     job = client.get_job(query_job.job_id, location=query_job.location)
#     print(f"État de la tâche {job.job_id} : {job.state}")
#     if job.state == "DONE":
#         if job.error_result:
#             print(f"Échec de la tâche : {job.errors}")
#         else:
#             results = job.result() # Obtenir les résultats
#             print("Résultats récupérés.")
#             # Traiter les résultats...
# except NotFound:
#     print(f"Tâche {query_job.job_id} introuvable.")

# Ou attendez simplement la fin si nécessaire
results = query_job.result() # Cela bloquera jusqu'à ce que la tâche soit terminée
print("Tâche asynchrone terminée.")
for row in results:
    print(f"Corpus : {row.corpus}, Mots distincts : {row.distinct_words}")

from google.cloud.bigquery import ScalarQueryParameter, ArrayQueryParameter, StructQueryParameter, QueryJobConfig

# Exemple : Rechercher les noms commençant par un préfixe spécifique dans un état donné
state_param = "NY"
prefix_param = "Ma"
min_count_param = 1000

query = """
    SELECT name, SUM(number) as total_people
    FROM `bigquery-public-data.usa_names.usa_1910_2013`
    WHERE state = @state_abbr AND name LIKE @name_prefix
    GROUP BY name
    HAVING total_people >= @min_count
    ORDER BY total_people DESC;
"""

job_config = QueryJobConfig(
    query_parameters=[
        ScalarQueryParameter("state_abbr", "STRING", state_param),
        # Utilisez 'val%' pour l'opérateur LIKE
        ScalarQueryParameter("name_prefix", "STRING", f"{prefix_param}%"),
        ScalarQueryParameter("min_count", "INT64", min_count_param),
    ]
)

query_job = client.query(query, job_config=job_config)
print(f"Tâche de requête paramétrée démarrée : {query_job.job_id}")

rows = query_job.result()

print(f"\nNoms commençant par '{prefix_param}' dans {state_param} avec >= {min_count_param} personnes :")
for row in rows:
    print(f"Nom : {row.name}, Nombre : {row.total_people}")

3. Gestion des ensembles de données :

Vous pouvez créer, lister, obtenir des détails et supprimer des ensembles de données.

# Définir l'ID et l'emplacement de l'ensemble de données
project_id = client.project
dataset_id = f"{project_id}.my_new_dataset"
dataset_location = "US" # par exemple, "US", "EU", "asia-northeast1"

# Construire un objet Dataset complet à envoyer à l'API.
dataset = bigquery.Dataset(dataset_id)
dataset.location = dataset_location
dataset.description = "Ensemble de données créé via la bibliothèque cliente Python"

try:
    # Effectuer une requête API pour créer l'ensemble de données.
    dataset = client.create_dataset(dataset, timeout=30)  # Effectuer une requête API.
    print(f"Ensemble de données créé {client.project}.{dataset.dataset_id}")

    # Lister les ensembles de données dans le projet
    print("\nEnsembles de données dans le projet :")
    datasets = list(client.list_datasets()) # Requête API
    if datasets:
        for ds in datasets:
            print(f"\t{ds.dataset_id}")
    else:
        print(f"\tLe projet {client.project} ne contient aucun ensemble de données.")

    # Obtenir les informations de l'ensemble de données
    retrieved_dataset = client.get_dataset(dataset_id) # Requête API
    print(f"\nInformations de l'ensemble de données récupérées pour {dataset_id} :")
    print(f"\tDescription : {retrieved_dataset.description}")
    print(f"\tEmplacement : {retrieved_dataset.location}")

except Exception as e:
    print(f"Erreur lors des opérations sur l'ensemble de données : {e}")

finally:
    # Nettoyage : Supprimer l'ensemble de données
    try:
        client.delete_dataset(
            dataset_id, delete_contents=True, not_found_ok=True
        )  # Requête API
        print(f"\nEnsemble de données '{dataset_id}' supprimé avec succès.")
    except Exception as e:
         print(f"Erreur lors de la suppression de l'ensemble de données {dataset_id} : {e}")

4. Gestion des tables :

Des opérations similaires existent pour les tables : création de tables (définition du schéma), chargement de données, obtention de métadonnées et suppression de tables.

# Utilisation de l'ID d'ensemble de données créé précédemment (assurez-vous qu'il existe ou supprimez l'étape de suppression ci-dessus)
dataset_id_for_table = "my_new_dataset" # Utiliser un ID d'ensemble de données valide
table_id = f"{client.project}.{dataset_id_for_table}.my_new_table"

# Définir le schéma
schema = [
    bigquery.SchemaField("full_name", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("email", "STRING", mode="NULLABLE"),
]

# Créer la table
table = bigquery.Table(table_id, schema=schema)
try:
    # Assurez-vous que l'ensemble de données existe en premier
    client.create_dataset(dataset_id_for_table, exists_ok=True)

    table = client.create_table(table)  # Requête API
    print(
        f"Table créée {table.project}.{table.dataset_id}.{table.table_id}"
    )

    # --- Chargement des données (Exemple : à partir d'un DataFrame Pandas) ---
    data = {'full_name': ['Alice Smith', 'Bob Johnson'],
            'age': [30, 45],
            'email': ['alice@example.com', None]}
    dataframe = pd.DataFrame(data)

    job_config = bigquery.LoadJobConfig(
        # Spécifier le schéma est recommandé, assure les types appropriés
        schema=schema,
        # Facultatif : écraser les données de la table
        write_disposition="WRITE_TRUNCATE",
        # Ou ajouter : write_disposition="WRITE_APPEND",
    )

    load_job = client.load_table_from_dataframe(
        dataframe, table_id, job_config=job_config
    )  # Requête API
    print(f"Démarrage de la tâche {load_job.job_id} pour charger les données du DataFrame")

    load_job.result()  # Attend la fin de la tâche.
    print("La tâche de chargement du DataFrame est terminée.")

    destination_table = client.get_table(table_id) # Requête API
    print(f"Chargement de {destination_table.num_rows} lignes dans la table {table_id}")

    # --- Chargement des données (Exemple : à partir d'URI Google Cloud Storage) ---
    # Supposons qu'un fichier CSV gs://your-bucket/data.csv existe avec des données compatibles
    # uri = "gs://your-bucket/data.csv"
    # job_config_gcs = bigquery.LoadJobConfig(
    #     schema=schema,
    #     skip_leading_rows=1, # Ignorer la ligne d'en-tête
    #     source_format=bigquery.SourceFormat.CSV,
    #     write_disposition="WRITE_APPEND", # Ajouter aux données existantes
    # )
    # load_job_gcs = client.load_table_from_uri(
    #     uri, table_id, job_config=job_config_gcs
    # )
    # print(f"Démarrage de la tâche {load_job_gcs.job_id} pour charger les données de GCS")
    # load_job_gcs.result()
    # print("La tâche de chargement GCS est terminée.")
    # destination_table = client.get_table(table_id)
    # print(f"Nombre total de lignes après le chargement GCS : {destination_table.num_rows}")

except Exception as e:
    print(f"Erreur lors des opérations sur la table : {e}")

finally:
    # Nettoyage : Supprimer la table
    try:
        client.delete_table(table_id, not_found_ok=True)  # Requête API
        print(f"Table '{table_id}' supprimée avec succès.")
        # Supprimez éventuellement l'ensemble de données à nouveau s'il était uniquement destiné à cet exemple
        # client.delete_dataset(dataset_id_for_table, delete_contents=True, not_found_ok=True)
        # print(f"Ensemble de données '{dataset_id_for_table}' supprimé avec succès.")
    except Exception as e:
        print(f"Erreur lors de la suppression de la table {table_id} : {e}")

5. Utilisation des tâches :

Toutes les opérations asynchrones (requête, chargement, exportation, copie) créent des ressources Job. Vous pouvez lister et gérer ces tâches.

# Lister les tâches récentes
print("\nTâches BigQuery récentes :")
for job in client.list_jobs(max_results=10): # Requête API
    print(f"ID de la tâche : {job.job_id}, Type : {job.job_type}, État : {job.state}, Créé : {job.created}")

# Obtenir une tâche spécifique (remplacez-la par un ID de tâche valide des exécutions précédentes)
# try:
#     job_id_to_get = "..." # Remplacez par un véritable ID de tâche
#     location = "US"      # Remplacez par l'emplacement de la tâche si ce n'est pas par défaut
#     retrieved_job = client.get_job(job_id_to_get, location=location) # Requête API
#     print(f"\nDétails de la tâche {retrieved_job.job_id} :")
#     print(f"\tÉtat : {retrieved_job.state}")
#     if retrieved_job.error_result:
#         print(f"\tErreur : {retrieved_job.errors}")
# except NotFound:
#     print(f"Tâche {job_id_to_get} introuvable.")
# except Exception as e:
#     print(f"Erreur lors de la récupération de la tâche : {e}")

Tirer parti des API spécialisées (concepts et cas d'utilisation)

Bien que la bibliothèque cliente de base couvre de nombreux cas d'utilisation, les API spécialisées offrent des performances ou des fonctionnalités améliorées pour des tâches spécifiques.

1. API BigQuery Storage Read (Python) :

# Nécessite : pip install google-cloud-bigquery-storage pyarrow pandas db-dtypes

from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types, GetDataStreamRequest

# --- Utilisation de Pandas read_gbq (intégration la plus simple) ---
# Cela utilise automatiquement l'API Storage si elle est installée et avantageuse
# table_id_read = "bigquery-public-data.usa_names.usa_1910_2013"
# cols_to_read = ["name", "number", "state"]
# row_filter = "state = 'CA' AND number > 5000"
#
# try:
#      df_storage = pd.read_gbq(
#          table_id_read,
#          project_id=client.project,
#          columns=cols_to_read,
#          row_filter=row_filter,
#          use_bqstorage_api=True, # Demander explicitement l'API Storage
#          progress_bar_type='tqdm' # Barre de progression facultative
#      )
#      print("\nLire les données à l'aide de l'API Storage via pandas.read_gbq :")
#      print(df_storage.head())
#      print(f"Lire {len(df_storage)} lignes.")
# except Exception as e:
#      print(f"Erreur de lecture avec l'API Storage via read_gbq : {e}")


# --- Utilisation manuelle de l'API Storage (plus de contrôle) ---
# bqstorageclient = bigquery_storage_v1.BigQueryReadClient()
# table = f"projects/{project_id}/datasets/{dataset_id}/tables/{table_name}" # Remplacez par les détails de votre table

# requested_session = types.ReadSession(
#     table=table,
#     data_format=types.DataFormat.ARROW,
#     read_options=types.ReadSession.TableReadOptions(
#         selected_fields=["col1", "col2"], # Spécifier les colonnes
#         row_restriction="col1 > 100"     # Spécifier le filtre
#     ),
# )
# parent = f"projects/{project_id}"

# read_session = bqstorageclient.create_read_session(
#     parent=parent,
#     read_session=requested_session,
#     max_stream_count=1, # Demander le nombre de flux parallèles
# )

# stream = read_session.streams[0]
# reader = bqstorageclient.read_rows(stream.name)
# frames = [message.arrow_record_batch for message in reader.messages()]
# if frames:
#     arrow_table = pa.Table.from_batches(frames)
#     df_manual = arrow_table.to_pandas()
#     print("\nLire les données manuellement à l'aide de l'API Storage :")
#     print(df_manual.head())
# else:
#     print("Aucune donnée lue à l'aide de l'API Storage manuelle.")

2. API BigQuery Connection :

3. API Analytics Hub :

4. API BigLake :

Utilisation directe de l'API REST

Bien que les bibliothèques clientes soient généralement préférées, vous pouvez utiliser l'API REST directement si :

Effectuer des requêtes :

Vous utiliserez généralement un client HTTP (comme la bibliothèque curl ou requests de Python). Vous devez :

  1. Obtenir un jeton d'accès OAuth 2.0 (par exemple, en utilisant gcloud auth print-access-token).
  2. Construire l'URL du point de terminaison de l'API correct.
  3. Créer le corps de la requête JSON conformément à la spécification de la méthode de l'API.
  4. Inclure le jeton d'accès dans l'en-tête Authorization: Bearer <token>.
  5. Gérer la réponse HTTP (codes d'état, analyse JSON, messages d'erreur).

Exemple : Exécution d'une requête via REST (jobs.query)

# 1. Obtenir le jeton d'accès
TOKEN=$(gcloud auth print-access-token)

# 2. Définir l'ID du projet et le corps de la requête
PROJECT_ID="your-project-id" # Remplacez par votre ID de projet
REQUEST_BODY=$(cat <<

Explore more

Fathom-R1-14B : Modèle de raisonnement IA avancé d'Inde

Fathom-R1-14B : Modèle de raisonnement IA avancé d'Inde

L'IA en expansion rapide. Fathom-R1-14B (14,8 milliards de paramètres) excelle en raisonnement mathématique et général, conçu par Fractal AI Research.

5 June 2025

Mistral Code : L'assistant de codage le plus personnalisable basé sur l'IA pour les entreprises

Mistral Code : L'assistant de codage le plus personnalisable basé sur l'IA pour les entreprises

Découvrez Mistral Code, l'IA d'aide au code la plus personnalisable pour les entreprises.

5 June 2025

Comment Claude Code transforme le codage de l'IA en 2025

Comment Claude Code transforme le codage de l'IA en 2025

Découvrez Claude Code en 2025 : codage IA révolutionné. Fonctionnalités, démo, et pourquoi il gagne du terrain après Windsurf d'Anthropic. Indispensable !

5 June 2025

Pratiquez le Design-first d'API dans Apidog

Découvrez une manière plus simple de créer et utiliser des API