Python

Functions for managing downloads.

read_sql

python
def read_sql(query: str,
             billing_project_id: Optional[str] = None,
             from_file: bool = False,
             reauth: bool = False,
             use_bqstorage_api: bool = False) -> pd.DataFrame

Load data from BigQuery using a query. This is a thin wrapper around pandas.read_gbq.

Args

  • query: Valid standard SQL query against basedosdados.
  • billing_project_id: Project that will be billed. You can find your Project ID in the Google Cloud console.
  • from_file: Uses the credentials from file, located in ~/.basedosdados/credentials/.
  • reauth: Re-authorize Google Cloud Project in case you need to change user or reset configurations.
  • use_bqstorage_api: Use the BigQuery Storage API to download query results more quickly, at an increased cost. To use this API, first enable it in the Google Cloud Console. You must also have the bigquery.readsessions.create permission on the project you are billing queries to.

Returns

  • Query result as a pandas DataFrame.
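
A minimal usage sketch (the billing project ID is a placeholder for your own Google Cloud project; the query reuses the table from the download example further down):

python
import basedosdados as bd

# Query a public basedosdados table, billing the bytes scanned to your own project.
# "<YOUR_PROJECT_ID>" is a placeholder.
df = bd.read_sql(
    query="select * from basedosdados.br_suporte.diretorio_municipios limit 10",
    billing_project_id="<YOUR_PROJECT_ID>",
)
print(df.shape)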

read_table

python
def read_table(dataset_id: str,
               table_id: str,
               billing_project_id: Optional[str] = None,
               query_project_id: str = "basedosdados",
               limit: Optional[int] = None,
               from_file: bool = False,
               reauth: bool = False,
               use_bqstorage_api: bool = False) -> pd.DataFrame

Load data from BigQuery using dataset_id and table_id.

Args

  • dataset_id: Dataset id available in basedosdados. It should always come with table_id.
  • table_id: Table id available in basedosdados.dataset_id. It should always come with dataset_id.
  • billing_project_id: Project that will be billed. You can find your Project ID in the Google Cloud console.
  • query_project_id: The project where the table lives. Change this if you want to query tables from other projects.
  • limit: Number of rows to read from table.
  • from_file: Uses the credentials from file, located in ~/.basedosdados/credentials/.
  • reauth: Re-authorize Google Cloud Project in case you need to change user or reset configurations.
  • use_bqstorage_api: Use the BigQuery Storage API to download query results more quickly, at an increased cost. To use this API, first enable it in the Google Cloud Console. You must also have the bigquery.readsessions.create permission on the project you are billing queries to.

Returns

  • Query result as a pandas DataFrame.
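
For example, a sketch that reads the first rows of a table (placeholder billing project; dataset and table ids taken from the download example below):

python
import basedosdados as bd

# Read up to 100 rows of a basedosdados table into a DataFrame.
df = bd.read_table(
    dataset_id="br_suporte",
    table_id="diretorio_municipios",
    billing_project_id="<YOUR_PROJECT_ID>",  # placeholder
    limit=100,
)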

download

python
def download(savepath: Union[str, Path],
             query: Optional[str] = None,
             dataset_id: Optional[str] = None,
             table_id: Optional[str] = None,
             billing_project_id: Optional[str] = None,
             query_project_id: str = "basedosdados",
             limit: Optional[int] = None,
             from_file: bool = False,
             reauth: bool = False,
             compression: str = "GZIP") -> None

Download a table or query result from the basedosdados BigQuery project (or another project).

  • Using a query:

    download(savepath='municipios.csv', query='select * from basedosdados.br_suporte.diretorio_municipios limit 10')

  • Using dataset_id & table_id:

    download(savepath='municipios.csv', dataset_id='br_suporte', table_id='diretorio_municipios')

You can also add arguments to modify save parameters:

download(savepath='municipios.csv', dataset_id='br_suporte', table_id='diretorio_municipios', index=False, sep='|')

Args

  • savepath: File path to save the result. Only supports .csv.
  • query: Valid standard SQL query against basedosdados. If query is provided, dataset_id and table_id are not required.
  • dataset_id: Dataset id available in basedosdados. It should always come with table_id.
  • table_id: Table id available in basedosdados.dataset_id. It should always come with dataset_id.
  • billing_project_id: Project that will be billed. You can find your Project ID in the Google Cloud console.
  • query_project_id: The project where the table lives. Change this if you want to query tables from other projects.
  • limit: Number of rows to download.
  • from_file: Uses the credentials from file, located in ~/.basedosdados/credentials/.
  • reauth: Re-authorize Google Cloud Project in case you need to change user or reset configurations.
  • compression: Compression type. Only GZIP is available for now.

Raises

  • Exception: If neither query nor both dataset_id and table_id are provided.

Functions to get metadata from BD's API.

get_datasets

python
def get_datasets(dataset_id: Optional[str] = None,
                 dataset_name: Optional[str] = None,
                 page: int = 1,
                 page_size: int = 10,
                 backend: Optional[Backend] = None) -> list[dict]

Get a list of available datasets, either by dataset_id or dataset_name.

Args

  • dataset_id: Dataset slug in Google BigQuery (GBQ).
  • dataset_name: Dataset name in Base dos Dados metadata.
  • page: Page for pagination.
  • page_size: Page size for pagination.
  • backend: Backend instance, injected automatically.

Returns

  • List of datasets.
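
A short sketch, assuming the function is exported at the package top level (the search term is arbitrary):

python
import basedosdados as bd

# List the first page of datasets whose name matches a term.
datasets = bd.get_datasets(dataset_name="municipio", page=1, page_size=10)
for dataset in datasets:
    print(dataset)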

get_tables

python
def get_tables(dataset_id: Optional[str] = None,
               table_id: Optional[str] = None,
               table_name: Optional[str] = None,
               page: int = 1,
               page_size: int = 10,
               backend: Optional[Backend] = None) -> list[dict]

Get a list of available tables, either by dataset_id, table_id or table_name.

Args

  • dataset_id: Dataset slug in Google BigQuery (GBQ).
  • table_id: Table slug in Google BigQuery (GBQ).
  • table_name: Table name in Base dos Dados metadata.
  • page: Page for pagination.
  • page_size: Page size for pagination.
  • backend: Backend instance, injected automatically.

Returns

  • List of tables.
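
A short sketch, again assuming a top-level import (the dataset slug comes from the download example above):

python
import basedosdados as bd

# List tables that belong to a given dataset slug.
tables = bd.get_tables(dataset_id="br_suporte", page_size=10)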

get_columns

python
def get_columns(table_id: Optional[str] = None,
                column_id: Optional[str] = None,
                columns_name: Optional[str] = None,
                page: int = 1,
                page_size: int = 10,
                backend: Optional[Backend] = None) -> list[dict]

Get a list of available columns, either by table_id, column_id or column_name.

Args

  • table_id: Table slug in Google BigQuery (GBQ).
  • column_id: Column slug in Google BigQuery (GBQ).
  • columns_name: Column name in Base dos Dados metadata.
  • page: Page for pagination.
  • page_size: Page size for pagination.
  • backend: Backend instance, injected automatically.

Returns

  • List of columns.
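
A short sketch, assuming a top-level import (the table slug is a placeholder):

python
import basedosdados as bd

# List columns of a table by its slug.
columns = bd.get_columns(table_id="diretorio_municipios", page_size=10)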

search

python
def search(q: Optional[str] = None,
           page: int = 1,
           page_size: int = 10,
           backend: Optional[Backend] = None) -> list[dict]

Search for datasets, querying all available metadata for the term q.

Args

  • q: Search term.
  • page: Page for pagination.
  • page_size: Page size for pagination.
  • backend: Backend instance, injected automatically.

Returns

  • List of datasets and metadata.
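
A short sketch, assuming a top-level import (the search term is arbitrary):

python
import basedosdados as bd

# Full-text search over dataset metadata.
results = bd.search(q="educacao", page=1, page_size=10)
for result in results:
    print(result)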

Module for managing datasets on the server.

Dataset Objects

python
class Dataset(Base)

Manage datasets in BigQuery.

__init__

python
def __init__(dataset_id: str, **kwargs)

Initializes a new instance of the class with the specified dataset ID.

Args

  • dataset_id: The identifier of the dataset. Hyphens in the ID will be replaced with underscores.
  • **kwargs: Additional keyword arguments to be passed to the superclass initializer.

dataset_config

python
@property
@lru_cache
def dataset_config() -> dict[str, Any]

Dataset config file.

publicize

python
def publicize(mode: str = "all", dataset_is_public: bool = True) -> None

Changes the IAM configuration to make a BigQuery dataset public.

Args

  • mode: Which dataset to make public [prod|staging|all].
  • dataset_is_public: Control if prod dataset is public or not. By default, staging datasets like dataset_id_staging are not public.

exists

python
def exists(mode: str = "staging") -> bool

Check if dataset exists.

create

python
def create(mode: str = "all",
           if_exists: str = "raise",
           dataset_is_public: bool = True,
           location: Optional[str] = None) -> None

Creates BigQuery datasets given dataset_id.

It can create two datasets:

  • <dataset_id> (mode = prod)
  • <dataset_id>_staging (mode = staging)

If mode is all, it creates both.

Args

  • mode: Which dataset to create [prod|staging|all].
  • if_exists: What to do if dataset exists
    • raise: Raises Conflict exception
    • replace: Drop all tables and replace dataset
    • update: Update dataset description
    • pass: Do nothing
  • dataset_is_public: Control if prod dataset is public or not. By default, staging datasets like dataset_id_staging are not public.
  • location: Location of dataset data. List of possible region names: BigQuery locations

Raises

  • Warning: Dataset already exists and if_exists is set to raise
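
A sketch of creating and checking a dataset (the dataset id is a placeholder; this requires write access to your own Google Cloud project, not to basedosdados itself):

python
import basedosdados as bd

# Create the staging dataset <dataset_id>_staging, doing nothing if it already exists.
ds = bd.Dataset(dataset_id="br_exemplo")
ds.create(mode="staging", if_exists="pass")
print(ds.exists(mode="staging"))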

delete

python
def delete(mode: str = "all") -> None

Deletes dataset in BigQuery. Toggle mode to choose which dataset to delete.

Args

  • mode: Which dataset to delete [prod|staging|all]

update

python
def update(mode: str = "all", location: Optional[str] = None) -> None

Update dataset description. Toggle mode to choose which dataset to update.

Args

  • mode: Which dataset to update [prod|staging|all]
  • location: Location of dataset data. List of possible region names: BigQuery locations

Class for managing tables in Storage and BigQuery.

Table Objects

python
class Table(Base)

Manage tables in Google Cloud Storage and BigQuery.

__init__

python
def __init__(dataset_id: str, table_id: str, **kwargs)

Initializes a new instance of the class with the specified dataset and table identifiers.

Args

  • dataset_id: The identifier of the dataset. Hyphens will be replaced with underscores.
  • table_id: The identifier of the table. Hyphens will be replaced with underscores.
  • **kwargs: Additional keyword arguments to be passed to the superclass initializer.

Attributes

  • table_id: The sanitized table identifier (hyphens replaced with underscores).
  • dataset_id: The sanitized dataset identifier (hyphens replaced with underscores).
  • table_full_name: Dictionary containing fully qualified table names for different environments:
    • 'prod': Production BigQuery table name.
    • 'staging': Staging BigQuery table name.
    • 'all': Deep copy of the table_full_name dictionary.

table_config

python
@property
@lru_cache(256)
def table_config() -> dict[str, Any]

Load table config.

table_exists

python
def table_exists(mode: str) -> bool

Check if table exists in BigQuery.

Args

  • mode: Which dataset to check [prod|staging].

create

python
def create(path: Optional[Union[str, Path]] = None,
           source_format: str = "csv",
           csv_delimiter: str = ",",
           csv_skip_leading_rows: int = 1,
           csv_allow_jagged_rows: bool = False,
           if_table_exists: str = "raise",
           if_storage_data_exists: str = "raise",
           if_dataset_exists: str = "pass",
           dataset_is_public: bool = True,
           location: Optional[str] = None,
           chunk_size: Optional[int] = None,
           biglake_table: bool = False,
           set_biglake_connection_permissions: bool = True) -> None

Creates a BigQuery table in the staging dataset.

If a path is provided, data is automatically saved in storage, and a datasets folder and BigQuery location are created, in addition to creating the table and its configuration files.

The new table is located at <dataset_id>_staging.<table_id> in BigQuery.

Data can be found in Storage at <bucket_name>/staging/<dataset_id>/<table_id>/* and is used to build the table.

The following data types are supported:

  • Comma-Delimited CSV
  • Apache Avro
  • Apache Parquet

Data can also be partitioned following the Hive partitioning scheme <key1>=<value1>/<key2>=<value2>. For example, year=2012/country=BR. The partition is automatically detected by searching for partitions in the table_config.yaml file.

Args

  • path: The path to the file to be uploaded to create the table.
  • source_format: The format of the data source. Only 'csv', 'avro', and 'parquet' are supported. Defaults to 'csv'.
  • csv_delimiter: The separator for fields in a CSV file. The separator can be any ISO-8859-1 single-byte character. Defaults to ','.
  • csv_skip_leading_rows: The number of rows at the top of a CSV file that BigQuery will skip when loading the data. Defaults to 1.
  • csv_allow_jagged_rows: Indicates if BigQuery should allow extra values that are not represented in the table schema. Defaults to False.
  • if_table_exists: Determines what to do if the table already exists:
    • raise: Raises a Conflict exception
    • replace: Replaces the table
    • pass: Does nothing
  • if_storage_data_exists: Determines what to do if the data already exists on your bucket:
    • raise: Raises a Conflict exception
    • replace: Replaces the table
    • pass: Does nothing
  • if_dataset_exists: Determines what to do if the dataset already exists:
    • raise: Raises a Conflict exception
    • replace: Replaces the dataset
    • pass: Does nothing
  • dataset_is_public: Controls if the prod dataset is public or not. By default, staging datasets like dataset_id_staging are not public.
  • location: The location of the dataset data. List of possible region names: BigQuery locations
  • chunk_size: The size of a chunk of data whenever iterating (in bytes). This must be a multiple of 256 KB per the API specification. If not specified, the chunk_size of the blob itself is used. If that is not specified, a default value of 40 MB is used.
  • biglake_table: Sets this as a BigLake table. BigLake tables allow end-users to query from external data (such as GCS) even if they don't have access to the source data. IAM is managed like any other BigQuery native table. See BigLake intro for more on BigLake.
  • set_biglake_connection_permissions: If set to True, attempts to grant the BigLake connection service account access to the table's data in GCS.
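
A sketch of creating a staging table from a local CSV (dataset id, table id, and path are placeholders):

python
import basedosdados as bd

# Upload the CSV to Storage and create <dataset_id>_staging.<table_id> in BigQuery.
tb = bd.Table(dataset_id="br_exemplo", table_id="tabela_exemplo")
tb.create(
    path="data/tabela_exemplo.csv",
    source_format="csv",
    if_table_exists="replace",
    if_storage_data_exists="replace",
    if_dataset_exists="pass",
)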

update

python
def update(mode: str = "prod",
           custom_schema: Optional[list[dict[str, str]]] = None) -> None

Updates BigQuery schema and description.

Args

  • mode: Which table to update [prod].
  • custom_schema: Optional custom schema to use when updating the table, given as a list of column dictionaries.

publish

python
def publish(if_exists: str = "raise",
            custom_publish_sql: Optional[str] = None,
            custom_schema: Optional[list[dict[str, str]]] = None) -> None

Creates the BigQuery table in the production dataset.

Table should be located at <dataset_id>.<table_id>.

It creates a view that uses the query from <metadata_path>/<dataset_id>/<table_id>/publish.sql.

Make sure that all columns from the query also exist at <metadata_path>/<dataset_id>/<table_id>/table_config.yaml, including the partitions.

Args

  • if_exists: What to do if table exists.
    • raise: Raises Conflict exception
    • replace: Replace table
    • pass: Do nothing
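
A sketch of publishing the production view once the staging table exists (placeholder ids; assumes the publish.sql and table_config.yaml files are already in place):

python
import basedosdados as bd

# Create the production view <dataset_id>.<table_id> from publish.sql.
tb = bd.Table(dataset_id="br_exemplo", table_id="tabela_exemplo")
tb.publish(if_exists="replace")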

delete

python
def delete(mode: str = "all") -> None

Deletes table in BigQuery.

Args

  • mode: Which table to delete [prod|staging|all].

append

python
def append(filepath: Union[str, Path],
           partitions: Optional[Union[str, dict[str, str]]] = None,
           if_exists: str = "replace",
           chunk_size: Optional[int] = None,
           **upload_args) -> None

Appends new data to an existing BigQuery table.

As long as the new data has the same schema as the existing table, the data in filepath is appended to it.

Args

  • filepath: Path to the file to upload and append to the table.
  • partitions: Hive structured partition as a string or dict.
    • str: <key>=<value>/<key2>=<value2>
    • dict: dict(key=value, key2=value2)
  • if_exists: What to do if data with same name exists in storage.
    • raise: Raises Conflict exception
    • replace: Replace table
    • pass: Do nothing
  • chunk_size: The size of a chunk of data whenever iterating (in bytes). This must be a multiple of 256 KB per the API specification. If not specified, the chunk_size of the blob itself is used. If that is not specified, a default value of 40 MB is used.
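
A sketch of appending a new partition to an existing table (placeholder ids, path, and partition key):

python
import basedosdados as bd

# Append a Hive-partitioned file to the existing staging table.
tb = bd.Table(dataset_id="br_exemplo", table_id="tabela_exemplo")
tb.append(
    filepath="data/ano=2024/tabela_exemplo.csv",
    partitions="ano=2024",
    if_exists="replace",
)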

Class for managing the files in Google Cloud Storage.

Storage Objects

python
class Storage(Base)

Manage files on Google Cloud Storage.

__init__

python
def __init__(dataset_id: str, table_id: str, **kwargs)

Initializes the storage upload class with the specified dataset and table identifiers.

Args

  • dataset_id: The identifier of the dataset. Hyphens will be replaced with underscores.
  • table_id: The identifier of the table. Hyphens will be replaced with underscores.
  • **kwargs: Additional keyword arguments to pass to the superclass initializer.

Attributes

  • bucket: The storage bucket object used for staging.
  • dataset_id: The normalized dataset identifier.
  • table_id: The normalized table identifier.

init

python
def init(replace: bool = False, very_sure: bool = False) -> None

Initialize bucket and folders.

The following folders are created:

  • raw : contains raw data
  • staging : preprocessed data ready to upload to BigQuery

Args

  • replace: Whether to replace if bucket already exists.
  • very_sure: Confirms that you are aware that everything in the bucket will be erased if it is replaced.

Raises

  • Warning: If replace is True but very_sure is still False.

upload

python
def upload(path: Union[str, Path],
           mode: str = "all",
           partitions: Optional[Union[str, dict[str, str]]] = None,
           if_exists: str = "raise",
           chunk_size: Optional[int] = None,
           **upload_args) -> None

Upload to storage at <bucket_name>/<mode>/<dataset_id>/<table_id>.

You can:

  • Add a single file: path = <file_path>
  • Add a folder: path = <folder_path>. The folder should only contain files, not folders.
  • Add partitioned files: path = <folder_path>. This folder must follow the hive partitioning scheme, e.g. <table_id>/<key>=<value>/<key2>=<value2>/<partition>.csv

Remember all files must follow a single schema. Otherwise, things might fail in the future.

Modes:

  • raw : raw files from datasource
  • staging : pre-treated files ready to upload to BigQuery
  • header: header of the tables
  • auxiliary_files: auxiliary files from each table
  • architecture: architecture sheet of the tables
  • all: if no treatment is needed, use all.

Args

  • path: Where to find the file or folder to upload to storage.
  • mode: Which folder to upload to [raw|staging|header|auxiliary_files|architecture|all].
  • partitions: If adding a single file, use this to add it to a specific partition. Can be a string or dict.
  • if_exists: What to do if data exists.
    • raise: Raises Conflict exception
    • replace: Replace table
    • pass: Do nothing
  • chunk_size: The size of a chunk of data when iterating (in bytes).
  • upload_args: Extra arguments accepted by google.cloud.storage.blob.Blob.upload_from_file
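
A sketch of uploading a pre-treated file to the staging folder (placeholder ids and path):

python
import basedosdados as bd

# Send a local CSV to <bucket_name>/staging/<dataset_id>/<table_id>/.
st = bd.Storage(dataset_id="br_exemplo", table_id="tabela_exemplo")
st.upload("data/tabela_exemplo.csv", mode="staging", if_exists="replace")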

download

python
def download(filename: str = "*",
             savepath: Union[Path, str] = Path("."),
             partitions: Optional[Union[str, dict[str, str]]] = None,
             mode: str = "staging",
             if_not_exists: str = "raise") -> None

Download files from Google Cloud Storage at <mode>/<dataset_id>/<table_id>/<partitions>/<filename> and replicate the folder hierarchy under savepath.

Modes:

  • raw : raw files from datasource
  • staging : pre-treated files ready to upload to BigQuery
  • header: header of the tables
  • auxiliary_files: auxiliary files from each table
  • architecture: architecture sheet of the tables

You can use the partitions argument to choose files from a partition.

Args

  • filename: Specify which file to download. If "*", downloads all files within the bucket folder. Defaults to "*".
  • savepath: Where to save the data on your computer. Must be a path to a directory.
  • partitions: If downloading a single file, use this to specify the partition path from which to download. Can be a string <key>=<value>/<key2>=<value2> or dict dict(key=value, key2=value2).
  • mode: Which folder to download from [raw|staging|header|auxiliary_files|architecture].
  • if_not_exists: What to do if data not found.
    • raise: Raises FileNotFoundError.
    • pass: Do nothing and exit the function.

Raises

  • FileNotFoundError: If the given path <mode>/<dataset_id>/<table_id>/<partitions>/<filename> could not be found or there are no files to download.
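
A sketch of downloading all staging files of a table to a local folder (placeholder ids and path):

python
import basedosdados as bd

# Replicate staging/<dataset_id>/<table_id>/ under ./downloads/.
st = bd.Storage(dataset_id="br_exemplo", table_id="tabela_exemplo")
st.download(savepath="downloads/", mode="staging")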

delete_file

python
def delete_file(filename: str,
                mode: str,
                partitions: Optional[Union[str, dict[str, str]]] = None,
                not_found_ok: bool = False) -> None

Delete file from path <bucket_name>/<mode>/<dataset_id>/<table_id>/<partitions>/<filename>.

Args

  • filename: Name of the file to be deleted.
  • mode: Which folder to delete the file from [raw|staging|header|auxiliary_files|architecture|all].
  • partitions: Hive structured partition as a string <key>=<value>/<key2>=<value2> or dict dict(key=value, key2=value2).
  • not_found_ok: If True, do not raise an error when the file is not found.

delete_table

python
def delete_table(mode: str = "staging",
                 bucket_name: Optional[str] = None,
                 not_found_ok: bool = False) -> None

Delete a table from storage. Requests are sent in batches.

Args

  • mode: Which folder to delete the table from [raw|staging|header|auxiliary_files|architecture].
  • bucket_name: The bucket name from which to delete the table. If None, defaults to the bucket initialized when instantiating the Storage object.
  • not_found_ok: If True, do not raise an error when the table is not found.

Raises

  • FileNotFoundError: If the requested table could not be found.

copy_table

python
def copy_table(source_bucket_name: str = "basedosdados",
               destination_bucket_name: Optional[str] = None,
               mode: str = "staging",
               new_table_id: Optional[str] = None) -> None

Copy a table from a source bucket to your bucket. Requests are sent in batches.

Args

  • source_bucket_name: The bucket name from which to copy data. You can change it to copy from another external bucket.
  • destination_bucket_name: The bucket name where data will be copied to. If None, defaults to the bucket initialized when instantiating the Storage object. You can check it with the Storage().bucket property.
  • mode: Which folder to copy [raw|staging|header|auxiliary_files|architecture].
  • new_table_id: New table id to be copied to. If None, defaults to the table id initialized when instantiating the Storage object.
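
A sketch of copying a table's staging files from the public basedosdados bucket into your own bucket (placeholder ids):

python
import basedosdados as bd

# Copy staging/<dataset_id>/<table_id>/ from the "basedosdados" bucket to your own bucket.
st = bd.Storage(dataset_id="br_exemplo", table_id="tabela_exemplo")
st.copy_table(mode="staging")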
