Functions for managing downloads.
def read_sql(query: str,
billing_project_id: Optional[str] = None,
from_file: bool = False,
reauth: bool = False,
use_bqstorage_api: bool = False) -> pd.DataFrame
Load data from BigQuery using a query. This is a wrapper around pandas.read_gbq.
Args
query
: Valid SQL Standard Query to basedosdados.
billing_project_id
: Project that will be billed. Find your Project ID here.
from_file
: Uses the credentials from file, located in ~/.basedosdados/credentials/.
reauth
: Re-authorize Google Cloud Project in case you need to change user or reset configurations.
use_bqstorage_api
: Use the BigQuery Storage API to download query results quickly, but at an increased cost. To use this API, first enable it in the Cloud Console. You must also have the bigquery.readsessions.create permission on the project you are billing queries to.
Returns
pd.DataFrame: Query result.
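A minimal usage sketch, assuming the package is imported at the top level as basedosdados; the billing project id is a placeholder for your own GCP project:
import basedosdados as bd

df = bd.read_sql(
    query="select * from basedosdados.br_suporte.diretorio_municipios limit 10",
    billing_project_id="your-project-id",  # placeholder: your own GCP project
)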
def read_table(dataset_id: str,
table_id: str,
billing_project_id: Optional[str] = None,
query_project_id: str = "basedosdados",
limit: Optional[int] = None,
from_file: bool = False,
reauth: bool = False,
use_bqstorage_api: bool = False) -> pd.DataFrame
Load data from BigQuery using dataset_id and table_id.
Args
dataset_id
: Dataset id available in basedosdados. It should always come with table_id.
table_id
: Table id available in basedosdados.dataset_id. It should always come with dataset_id.
billing_project_id
: Project that will be billed. Find your Project ID here.
query_project_id
: Which project the table lives in. You can change this if you want to query different projects.
limit
: Number of rows to read from the table.
from_file
: Uses the credentials from file, located in ~/.basedosdados/credentials/.
reauth
: Re-authorize Google Cloud Project in case you need to change user or reset configurations.
use_bqstorage_api
: Use the BigQuery Storage API to download query results quickly, but at an increased cost. To use this API, first enable it in the Cloud Console. You must also have the bigquery.readsessions.create permission on the project you are billing queries to.
Returns
pd.DataFrame: Query result.
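A similar sketch using dataset and table identifiers (ids taken from the examples on this page; the billing project id is again a placeholder):
import basedosdados as bd

df = bd.read_table(
    dataset_id="br_suporte",
    table_id="diretorio_municipios",
    billing_project_id="your-project-id",  # placeholder
    limit=100,
)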
def download(savepath: Union[str, Path],
query: Optional[str] = None,
dataset_id: Optional[str] = None,
table_id: Optional[str] = None,
billing_project_id: Optional[str] = None,
query_project_id: str = "basedosdados",
limit: Optional[int] = None,
from_file: bool = False,
reauth: bool = False,
compression: str = "GZIP") -> None
Download a table or query result from basedosdados BigQuery (or other projects).
Using a query:
download(savepath='municipios.csv', query='select * from basedosdados.br_suporte.diretorio_municipios limit 10')
Using dataset_id & table_id:
download(savepath='municipios.csv', dataset_id='br_suporte', table_id='diretorio_municipios')
You can also add arguments to modify save parameters:
download(savepath='municipios.csv', dataset_id='br_suporte', table_id='diretorio_municipios', index=False, sep='|')
Args
savepath
: File path to save the result. Only supports .csv.
query
: Valid SQL Standard Query to basedosdados. If query is available, dataset_id and table_id are not required.
dataset_id
: Dataset id available in basedosdados. It should always come with table_id.
table_id
: Table id available in basedosdados.dataset_id. It should always come with dataset_id.
billing_project_id
: Project that will be billed. Find your Project ID here.
query_project_id
: Which project the table lives in. You can change this if you want to query different projects.
limit
: Number of rows.
from_file
: Uses the credentials from file, located in ~/.basedosdados/credentials/.
reauth
: Re-authorize Google Cloud Project in case you need to change user or reset configurations.
compression
: Compression type. Only GZIP is available for now.
Raises
Exception
: If either table_id, dataset_id or query are empty.
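A short sketch of saving a table extract to disk; the output path and billing project id are placeholders:
import basedosdados as bd

bd.download(
    savepath="diretorio_municipios.csv",   # placeholder output file (.csv)
    dataset_id="br_suporte",
    table_id="diretorio_municipios",
    billing_project_id="your-project-id",  # placeholder
    limit=1000,
)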
Functions to get metadata from BD's API.
def get_datasets(dataset_id: Optional[str] = None,
dataset_name: Optional[str] = None,
page: int = 1,
page_size: int = 10,
backend: Optional[Backend] = None) -> list[dict]
Get a list of available datasets, either by dataset_id or dataset_name.
Args
dataset_id
: Dataset slug in Google BigQuery (GBQ).
dataset_name
: Dataset name in Base dos Dados metadata.
page
: Page for pagination.
page_size
: Page size for pagination.
backend
: Backend instance, injected automatically.
Returns
list[dict]: List of datasets matching the filters.
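A minimal sketch, assuming get_datasets is exposed at the package top level; the name filter is illustrative:
import basedosdados as bd

datasets = bd.get_datasets(dataset_name="municipio", page_size=5)  # illustrative filter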
def get_tables(dataset_id: Optional[str] = None,
table_id: Optional[str] = None,
table_name: Optional[str] = None,
page: int = 1,
page_size: int = 10,
backend: Optional[Backend] = None) -> list[dict]
Get a list of available tables, either by dataset_id, table_id or table_name.
Args
dataset_id
: Dataset slug in Google BigQuery (GBQ).
table_id
: Table slug in Google BigQuery (GBQ).
table_name
: Table name in Base dos Dados metadata.
page
: Page for pagination.
page_size
: Page size for pagination.
backend
: Backend instance, injected automatically.
Returns
list[dict]: List of tables matching the filters.
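A minimal sketch (using the same basedosdados import as above); the dataset slug is illustrative:
tables = bd.get_tables(dataset_id="br_suporte", page_size=10)  # illustrative dataset slug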
def get_columns(table_id: Optional[str] = None,
column_id: Optional[str] = None,
columns_name: Optional[str] = None,
page: int = 1,
page_size: int = 10,
backend: Optional[Backend] = None) -> list[dict]
Get a list of available columns, either by table_id, column_id or columns_name.
Args
table_id
: Table slug in Google BigQuery (GBQ).
column_id
: Column slug in Google BigQuery (GBQ).
columns_name
: Column name in Base dos Dados metadata.
page
: Page for pagination.
page_size
: Page size for pagination.
backend
: Backend instance, injected automatically.
Returns
list[dict]: List of columns matching the filters.
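A minimal sketch (using the same basedosdados import as above); the table slug is illustrative:
columns = bd.get_columns(table_id="diretorio_municipios")  # illustrative table slug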
def search(q: Optional[str] = None,
page: int = 1,
page_size: int = 10,
backend: Optional[Backend] = None) -> list[dict]
Search for datasets, querying all available metadata for the term q.
Args
q
: Search term.
page
: Page for pagination.
page_size
: Page size for pagination.
backend
: Backend instance, injected automatically.
Returns
list[dict]: Search results.
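A minimal sketch (using the same basedosdados import as above); the search term is illustrative:
results = bd.search(q="municipios", page_size=10)  # illustrative search term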
Module for managing datasets on the server.
class Dataset(Base)
Manage datasets in BigQuery.
def __init__(dataset_id: str, **kwargs)
Initializes a new instance of the class with the specified dataset ID.
Args
dataset_id
: The identifier of the dataset. Hyphens in the ID will be replaced with underscores.
**kwargs
: Additional keyword arguments to be passed to the superclass initializer.
@property
@lru_cache
def dataset_config() -> dict[str, Any]
Dataset config file.
def publicize(mode: str = "all", dataset_is_public: bool = True) -> None
Changes IAM configuration to make the BigQuery dataset public.
Args
mode
: Which dataset to publicize [prod|staging|all].
dataset_is_public
: Control if prod dataset is public or not. By default, staging datasets like dataset_id_staging are not public.
def exists(mode: str = "staging") -> bool
Check if dataset exists.
def create(mode: str = "all",
if_exists: str = "raise",
dataset_is_public: bool = True,
location: Optional[str] = None) -> None
Creates BigQuery datasets given dataset_id.
It can create two datasets:
<dataset_id> (mode = prod)
<dataset_id>_staging (mode = staging)
If mode is all, it creates both.
Args
mode
: Which dataset to create [prod|staging|all].
if_exists
: What to do if dataset exists
raise: Raises Conflict exception
replace: Drop all tables and replace dataset
update: Update dataset description
pass: Do nothing
dataset_is_public
: Control if prod dataset is public or not. By default, staging datasets like dataset_id_staging are not public.
location
: Location of dataset data. List of possible region names: BigQuery locations
Raises
Warning
: Dataset already exists and if_exists is set to raise.
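A minimal sketch of creating and publicizing a dataset, assuming Dataset is importable from the package top level and credentials are already configured; the dataset id is a placeholder:
from basedosdados import Dataset

ds = Dataset(dataset_id="my_new_dataset")  # placeholder dataset id
ds.create(mode="all", if_exists="pass")
ds.publicize(mode="prod")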
def delete(mode: str = "all") -> None
Deletes dataset in BigQuery. Toggle mode to choose which dataset to delete.
Args
mode
: Which dataset to delete [prod|staging|all]
def update(mode: str = "all", location: Optional[str] = None) -> None
Update dataset description. Toggle mode to choose which dataset to update.
Args
mode
: Which dataset to update [prod|staging|all]
location
: Location of dataset data. List of possible region names: BigQuery locations
Class for managing tables in Storage and BigQuery.
class Table(Base)
Manage tables in Google Cloud Storage and BigQuery.
def __init__(dataset_id: str, table_id: str, **kwargs)
Initializes a new instance of the class with the specified dataset and table identifiers.
Args
dataset_id
: The identifier of the dataset. Hyphens will be replaced with underscores.
table_id
: The identifier of the table. Hyphens will be replaced with underscores.
**kwargs
: Additional keyword arguments to be passed to the superclass initializer.
Attributes
table_id
: The sanitized table identifier (hyphens replaced with underscores).
dataset_id
: The sanitized dataset identifier (hyphens replaced with underscores).
table_full_name
: Dictionary containing fully qualified table names for different environments.
@property
@lru_cache(256)
def table_config() -> dict[str, Any]
Load table config.
def table_exists(mode: str) -> bool
Check if table exists in BigQuery.
Args
mode
: Which dataset to check [prod|staging].
def create(path: Optional[Union[str, Path]] = None,
source_format: str = "csv",
csv_delimiter: str = ",",
csv_skip_leading_rows: int = 1,
csv_allow_jagged_rows: bool = False,
if_table_exists: str = "raise",
if_storage_data_exists: str = "raise",
if_dataset_exists: str = "pass",
dataset_is_public: bool = True,
location: Optional[str] = None,
chunk_size: Optional[int] = None,
biglake_table: bool = False,
set_biglake_connection_permissions: bool = True) -> None
Creates a BigQuery table in the staging dataset.
If a path is provided, data is automatically saved in storage, and a datasets folder and BigQuery location are created, in addition to creating the table and its configuration files.
The new table is located at <dataset_id>_staging.<table_id> in BigQuery.
Data can be found in Storage at <bucket_name>/staging/<dataset_id>/<table_id>/* and is used to build the table.
The following data types are supported: csv, avro, and parquet.
Data can also be partitioned following the Hive partitioning scheme
<key1>=<value1>/<key2>=<value2>; for example, year=2012/country=BR.
The partition is automatically detected by searching for partitions in the table_config.yaml file.
Args
path
: The path to the file to be uploaded to create the table.
source_format
: The format of the data source. Only 'csv', 'avro', and 'parquet' are supported. Defaults to 'csv'.
csv_delimiter
: The separator for fields in a CSV file. The separator can be any ISO-8859-1 single-byte character. Defaults to ','.
csv_skip_leading_rows
: The number of rows at the top of a CSV file that BigQuery will skip when loading the data. Defaults to 1.
csv_allow_jagged_rows
: Indicates if BigQuery should allow extra values that are not represented in the table schema. Defaults to False.
if_table_exists
: Determines what to do if the table already exists:
raise: Raises a Conflict exception
replace: Replaces the table
pass: Does nothing
if_storage_data_exists
: Determines what to do if the data already exists on your bucket:
raise: Raises a Conflict exception
replace: Replaces the existing data
pass: Does nothing
if_dataset_exists
: Determines what to do if the dataset already exists:
raise: Raises a Conflict exception
replace: Replaces the dataset
pass: Does nothing
dataset_is_public
: Controls if the prod dataset is public or not. By default, staging datasets like dataset_id_staging are not public.
location
: The location of the dataset data. List of possible region names: BigQuery locations
chunk_size
: The size of a chunk of data whenever iterating (in bytes). This must be a multiple of 256 KB per the API specification. If not specified, the chunk_size of the blob itself is used. If that is not specified, a default value of 40 MB is used.
biglake_table
: Sets this as a BigLake table. BigLake tables allow end-users to query from external data (such as GCS) even if they don't have access to the source data. IAM is managed like any other BigQuery native table. See BigLake intro for more on BigLake.
set_biglake_connection_permissions
: If set to True, attempts to grant the BigLake connection service account access to the table's data in GCS.
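A minimal sketch of creating a staging table from a local CSV, assuming Table is importable from the package top level; the ids and file path are placeholders:
from basedosdados import Table

tb = Table(dataset_id="my_dataset", table_id="my_table")  # placeholder ids
tb.create(
    path="data/my_table.csv",          # placeholder local file
    source_format="csv",
    if_table_exists="replace",
    if_storage_data_exists="replace",
    if_dataset_exists="pass",
)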
def update(mode: str = "prod",
custom_schema: Optional[list[dict[str, str]]] = None) -> None
Updates BigQuery schema and description.
Args
mode
: Which table to update [prod].
not_found_ok
: What to do if the table is not found.
def publish(if_exists: str = "raise",
custom_publish_sql: Optional[str] = None,
custom_schema: Optional[list[dict[str, str]]] = None) -> None
Creates the BigQuery table in the production dataset.
The table should be located at <dataset_id>.<table_id>.
It creates a view that uses the query from <metadata_path>/<dataset_id>/<table_id>/publish.sql.
Make sure that all columns from the query also exist in <metadata_path>/<dataset_id>/<table_id>/table_config.sql, including the partitions.
Args
if_exists
: What to do if table exists.
raise: Raises Conflict exception
replace: Replace table
pass: Do nothing
def delete(mode: str = "all") -> None
Deletes table in BigQuery.
Args
mode
: Which table to delete [prod|staging].
def append(filepath: Union[str, Path],
partitions: Optional[Union[str, dict[str, str]]] = None,
if_exists: str = "replace",
chunk_size: Optional[int] = None,
**upload_args) -> None
Appends new data to an existing BigQuery table.
As long as the data has the same schema, it appends the data in the filepath to the existing table.
Args
filepath
: Where to find the file that you want to upload to create a table with.
partitions
: Hive structured partition as a string or dict.
<key>=<value>/<key2>=<value2>
dict(key=value, key2=value2)
if_exists
: What to do if data with the same name exists in storage.
raise: Raises Conflict exception
replace: Replace the existing data
pass: Do nothing
chunk_size
: The size of a chunk of data whenever iterating (in bytes). This must be a multiple of 256 KB per the API specification. If not specified, the chunk_size of the blob itself is used. If that is not specified, a default value of 40 MB is used.
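A sketch of appending more rows to an existing staging table (using the same Table import as the create example above); ids and file path are placeholders:
tb = Table(dataset_id="my_dataset", table_id="my_table")      # placeholder ids
tb.append(filepath="data/new_rows.csv", if_exists="replace")  # placeholder file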
Class for managing the files in Google Cloud Storage.
class Storage(Base)
Manage files on Google Cloud Storage.
def __init__(dataset_id: str, table_id: str, **kwargs)
Initializes the storage upload class with the specified dataset and table identifiers.
Args
dataset_id
: The identifier of the dataset. Hyphens will be replaced with underscores.
table_id
: The identifier of the table. Hyphens will be replaced with underscores.
**kwargs
: Additional keyword arguments to pass to the superclass initializer.
Attributes
bucket
: The storage bucket object used for staging.
dataset_id
: The normalized dataset identifier.
table_id
: The normalized table identifier.
def init(replace: bool = False, very_sure: bool = False) -> None
Initialize bucket and folders.
Folders should be:
raw: contains raw data
staging: preprocessed data ready to upload to BigQuery
Args
replace
: Whether to replace if the bucket already exists.
very_sure
: Are you aware that everything will be erased if you replace the bucket?
Raises
Warning
: very_sure argument is still False.
def upload(path: Union[str, Path],
mode: str = "all",
partitions: Optional[Union[str, dict[str, str]]] = None,
if_exists: str = "raise",
chunk_size: Optional[int] = None,
**upload_args) -> None
Upload to storage at <bucket_name>/<mode>/<dataset_id>/<table_id>.
You can:
Add a single file: path = <file_path>
Add a folder: path = <folder_path>. The folder should only contain files, not folders.
Add partitioned files: path = <folder_path>. This folder must follow the hive partitioning scheme, e.g. <table_id>/<key>=<value>/<key2>=<value2>/<partition>.csv
Remember all files must follow a single schema. Otherwise, things might fail in the future.
Modes:
raw: raw files from datasource
staging: pre-treated files ready to upload to BigQuery
header: header of the tables
auxiliary_files: auxiliary files from each table
architecture: architecture sheet of the tables
all: if no treatment is needed, use all.
Args
path
: Where to find the file or folder to upload to storage.
mode
: Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture|all]
partitions
: If adding a single file, use this to add it to a specific partition. Can be a string or dict.
if_exists
: What to do if data exists.
raise: Raises Conflict exception
replace: Replace the existing data
pass: Do nothing
chunk_size
: The size of a chunk of data when iterating (in bytes).
upload_args
: Extra arguments accepted by google.cloud.storage.blob.Blob.upload_from_file
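A minimal sketch of uploading a local file to the staging folder, assuming Storage is importable from the package top level; ids and path are placeholders:
from basedosdados import Storage

st = Storage(dataset_id="my_dataset", table_id="my_table")  # placeholder ids
st.upload(path="data/my_table.csv", mode="staging", if_exists="replace")  # placeholder path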
def download(filename: str = "*",
savepath: Union[Path, str] = Path("."),
partitions: Optional[Union[str, dict[str, str]]] = None,
mode: str = "staging",
if_not_exists: str = "raise") -> None
Download files from Google Storage at path mode/dataset_id/table_id/partitions/filename and replicate the folder hierarchy on save.
Modes:
raw: raw files from datasource
staging: pre-treated files ready to upload to BigQuery
header: header of the tables
auxiliary_files: auxiliary files from each table
architecture: architecture sheet of the tables
You can use the partitions argument to choose files from a partition.
Args
filename
: Specify which file to download. If "*", downloads all files within the bucket folder. Defaults to "*".
savepath
: Where to save the data on your computer. Must be a path to a directory.
partitions
: If downloading a single file, use this to specify the partition path from which to download. Can be a string <key>=<value>/<key2>=<value2> or a dict dict(key=value, key2=value2).
mode
: Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture]
if_not_exists
: What to do if data is not found.
raise: Raises FileNotFoundError.
pass: Do nothing and exit the function.
Raises
FileNotFoundError
: If the given path <mode>/<dataset_id>/<table_id>/<partitions>/<filename> could not be found or there are no files to download.
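For example, to pull a table's staging files into a local directory (using the same Storage import as the upload sketch above; ids and directory are placeholders):
st = Storage(dataset_id="my_dataset", table_id="my_table")  # placeholder ids
st.download(savepath="downloads/", mode="staging")          # placeholder directory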
def delete_file(filename: str,
mode: str,
partitions: Optional[Union[str, dict[str, str]]] = None,
not_found_ok: bool = False) -> None
Deletes the file at path <bucket_name>/<mode>/<dataset_id>/<table_id>/<partitions>/<filename>.
Args
filename
: Name of the file to be deleted.
mode
: Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture|all]
partitions
: Hive structured partition as a string <key>=<value>/<key2>=<value2> or a dict dict(key=value, key2=value2).
not_found_ok
: What to do if the file is not found.
def delete_table(mode: str = "staging",
bucket_name: Optional[str] = None,
not_found_ok: bool = False) -> None
Deletes a table from storage, sending requests in batches.
Args
mode
: Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture]
bucket_name
: The bucket name from which to delete the table. If None, defaults to the bucket initialized when instantiating the Storage object.
not_found_ok
: What to do if the table is not found.
Raises
FileNotFoundError
: If the requested table could not be found.
def copy_table(source_bucket_name: str = "basedosdados",
destination_bucket_name: Optional[str] = None,
mode: str = "staging",
new_table_id: Optional[str] = None) -> None
Copies a table from a source bucket to your bucket, sending requests in batches.
Args
source_bucket_name
: The bucket name from which to copy data. You can change it to copy from another external bucket.
destination_bucket_name
: The bucket name where data will be copied to. If None, defaults to the bucket initialized when instantiating the Storage object. You can check it with the Storage().bucket property.
mode
: Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture]
new_table_id
: New table id to be copied to. If None, defaults to the table id initialized when instantiating the Storage object.
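A sketch of copying a table's staging files from the public basedosdados bucket into the bucket configured for your Storage object (using the same Storage import as above); the ids are illustrative:
st = Storage(dataset_id="br_suporte", table_id="diretorio_municipios")  # illustrative ids
st.copy_table(source_bucket_name="basedosdados", mode="staging")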