feat: support BigLakeConfiguration (managed Iceberg tables) #2162

Merged: 16 commits, Apr 25, 2025
16 changes: 16 additions & 0 deletions google/cloud/bigquery/enums.py
@@ -387,3 +387,19 @@ def _generate_next_value_(name, start, count, last_values):
ROUNDING_MODE_UNSPECIFIED = enum.auto()
ROUND_HALF_AWAY_FROM_ZERO = enum.auto()
ROUND_HALF_EVEN = enum.auto()


class BigLakeFileFormat(object):
    """Supported file formats for data stored in managed Iceberg (BigLake) tables."""
FILE_FORMAT_UNSPECIFIED = "FILE_FORMAT_UNSPECIFIED"
"""The default unspecified value."""

PARQUET = "PARQUET"
"""Apache Parquet format."""


class BigLakeTableFormat(object):
    """Supported metadata table formats for managed Iceberg (BigLake) tables."""
TABLE_FORMAT_UNSPECIFIED = "TABLE_FORMAT_UNSPECIFIED"
"""The default unspecified value."""

ICEBERG = "ICEBERG"
"""Apache Iceberg format."""
150 changes: 150 additions & 0 deletions google/cloud/bigquery/table.py
@@ -380,6 +380,7 @@ class Table(_TableBase):

_PROPERTY_TO_API_FIELD: Dict[str, Any] = {
**_TableBase._PROPERTY_TO_API_FIELD,
"biglake_configuration": "biglakeConfiguration",
"clustering_fields": "clustering",
"created": "creationTime",
"description": "description",
@@ -431,6 +432,29 @@ def __init__(self, table_ref, schema=None) -> None:

reference = property(_reference_getter)

@property
def biglake_configuration(self):
"""google.cloud.bigquery.table.BigLakeConfiguration: Configuration
for managed tables for Apache Iceberg.

See https://cloud.google.com/bigquery/docs/iceberg-tables for more information.
"""
prop = self._properties.get(
self._PROPERTY_TO_API_FIELD["biglake_configuration"]
)
if prop is not None:
prop = BigLakeConfiguration.from_api_repr(prop)
return prop

@biglake_configuration.setter
def biglake_configuration(self, value):
api_repr = value
if value is not None:
api_repr = value.to_api_repr()
self._properties[
self._PROPERTY_TO_API_FIELD["biglake_configuration"]
] = api_repr
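
As a hedged usage sketch (assuming the standard Client.get_table / update_table flow; the table and connection IDs are placeholders), the new property plugs into the usual update field mask:

from google.cloud import bigquery
from google.cloud.bigquery.table import BigLakeConfiguration

client = bigquery.Client()
table = client.get_table("my-project.my_dataset.my_table")  # placeholder ID

table.biglake_configuration = BigLakeConfiguration(
    connection_id="my-project.us.my-connection",
    storage_uri="gs://my-bucket/my_table/",
    file_format="PARQUET",
    table_format="ICEBERG",
)

# "biglake_configuration" resolves to "biglakeConfiguration" through
# _PROPERTY_TO_API_FIELD, so it can appear in the field mask like any
# other mutable table property.
table = client.update_table(table, ["biglake_configuration"])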

@property
def require_partition_filter(self):
"""bool: If set to true, queries over the partitioned table require a
@@ -3501,6 +3525,132 @@ def to_api_repr(self) -> Dict[str, Any]:
return resource


class BigLakeConfiguration(object):
"""Configuration for managed tables for Apache Iceberg, formerly
known as BigLake.

Args:
connection_id (Optional[str]):
The connection specifying the credentials to be used to read and write to external
storage, such as Cloud Storage. The connection_id can have the form
``{project}.{location}.{connection_id}`` or
``projects/{project}/locations/{location}/connections/{connection_id}``.
storage_uri (Optional[str]):
The fully qualified location prefix of the external folder where table data is
stored. The '*' wildcard character is not allowed. The URI should be in the
format ``gs://bucket/path_to_table/``.
file_format (Optional[str]):
The file format the table data is stored in. See BigLakeFileFormat for available
values.
        table_format (Optional[str]):
            The table format in which the metadata-only snapshots are stored.
            See BigLakeTableFormat for available values.
_properties (Optional[dict]):
Private. Used to construct object from API resource.
"""

def __init__(
self,
connection_id: Optional[str] = None,
storage_uri: Optional[str] = None,
file_format: Optional[str] = None,
table_format: Optional[str] = None,
_properties: Optional[dict] = None,
) -> None:
if _properties is None:
_properties = {}
self._properties = _properties
if connection_id is not None:
self.connection_id = connection_id
if storage_uri is not None:
self.storage_uri = storage_uri
if file_format is not None:
self.file_format = file_format
if table_format is not None:
self.table_format = table_format

@property
def connection_id(self) -> Optional[str]:
"""str: The connection specifying the credentials to be used to read and write to external
storage, such as Cloud Storage."""
return self._properties.get("connectionId")

@connection_id.setter
def connection_id(self, value: Optional[str]):
self._properties["connectionId"] = value

@property
def storage_uri(self) -> Optional[str]:
"""str: The fully qualified location prefix of the external folder where table data is
stored."""
return self._properties.get("storageUri")

@storage_uri.setter
def storage_uri(self, value: Optional[str]):
self._properties["storageUri"] = value

@property
def file_format(self) -> Optional[str]:
"""str: The file format the table data is stored in. See BigLakeFileFormat for available
values."""
return self._properties.get("fileFormat")

@file_format.setter
def file_format(self, value: Optional[str]):
self._properties["fileFormat"] = value

@property
    def table_format(self) -> Optional[str]:
        """str: The table format in which the metadata-only snapshots are stored.
        See BigLakeTableFormat for available values."""
return self._properties.get("tableFormat")

@table_format.setter
def table_format(self, value: Optional[str]):
self._properties["tableFormat"] = value

def _key(self):
return tuple(sorted(self._properties.items()))

def __eq__(self, other):
if not isinstance(other, BigLakeConfiguration):
return NotImplemented
return self._key() == other._key()

def __ne__(self, other):
return not self == other

def __hash__(self):
return hash(self._key())

def __repr__(self):
key_vals = ["{}={}".format(key, val) for key, val in self._key()]
return "BigLakeConfiguration({})".format(",".join(key_vals))

@classmethod
def from_api_repr(cls, resource: Dict[str, Any]) -> "BigLakeConfiguration":
"""Factory: construct a BigLakeConfiguration given its API representation.

Args:
            resource:
                BigLakeConfiguration representation returned from the API.

Returns:
BigLakeConfiguration parsed from ``resource``.
"""
ref = cls()
ref._properties = resource
return ref

def to_api_repr(self) -> Dict[str, Any]:
"""Construct the API resource representation of this BigLakeConfiguration.

Returns:
BigLakeConfiguration represented as an API resource.
"""
return copy.deepcopy(self._properties)


def _item_to_row(iterator, resource):
"""Convert a JSON row to the native object.

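Before moving to the tests, a short sketch of the serialization contract implemented above: from_api_repr() adopts the mapping it is given, while to_api_repr() returns a deep copy, so the returned dict cannot be used to mutate the configuration. The resource values here are illustrative only.

from google.cloud.bigquery.table import BigLakeConfiguration

resource = {"connectionId": "conn", "fileFormat": "PARQUET"}
config = BigLakeConfiguration.from_api_repr(resource)

assert config.connection_id == "conn"
assert config.table_format is None  # unset fields read back as None

# to_api_repr() deep-copies, so mutating the result leaves config unchanged.
out = config.to_api_repr()
out["fileFormat"] = "ICEBERG"
assert config.file_format == "PARQUET"
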
160 changes: 160 additions & 0 deletions tests/unit/test_table.py
@@ -435,6 +435,12 @@ def _make_resource(self):
"sourceFormat": "CSV",
"csvOptions": {"allowJaggedRows": True, "encoding": "encoding"},
},
"biglakeConfiguration": {
"connectionId": "connection",
"storageUri": "uri",
"fileFormat": "PARQUET",
"tableFormat": "ICEBERG",
},
"labels": {"x": "y"},
}

@@ -521,6 +527,15 @@ def _verifyResourceProperties(self, table, resource):
else:
self.assertIsNone(table.encryption_configuration)

if "biglakeConfiguration" in resource:
self.assertIsNotNone(table.biglake_configuration)
self.assertEqual(table.biglake_configuration.connection_id, "connection")
self.assertEqual(table.biglake_configuration.storage_uri, "uri")
self.assertEqual(table.biglake_configuration.file_format, "PARQUET")
self.assertEqual(table.biglake_configuration.table_format, "ICEBERG")
else:
self.assertIsNone(table.biglake_configuration)

def test_ctor(self):
dataset = DatasetReference(self.PROJECT, self.DS_ID)
table_ref = dataset.table(self.TABLE_NAME)
@@ -893,6 +908,60 @@ def test_table_constraints_property_getter(self):
assert isinstance(table_constraints, TableConstraints)
assert table_constraints.primary_key == PrimaryKey(columns=["id"])

def test_biglake_configuration_not_set(self):
dataset = DatasetReference(self.PROJECT, self.DS_ID)
table_ref = dataset.table(self.TABLE_NAME)
table = self._make_one(table_ref)

assert table.biglake_configuration is None

def test_biglake_configuration_set(self):
from google.cloud.bigquery.table import BigLakeConfiguration

dataset = DatasetReference(self.PROJECT, self.DS_ID)
table_ref = dataset.table(self.TABLE_NAME)
table = self._make_one(table_ref)

table._properties["biglakeConfiguration"] = {
"connectionId": "connection",
"storageUri": "uri",
"fileFormat": "PARQUET",
"tableFormat": "ICEBERG",
}

config = table.biglake_configuration

assert isinstance(config, BigLakeConfiguration)
assert config.connection_id == "connection"
assert config.storage_uri == "uri"
assert config.file_format == "PARQUET"
assert config.table_format == "ICEBERG"

def test_biglake_configuration_property_setter(self):
from google.cloud.bigquery.table import BigLakeConfiguration

dataset = DatasetReference(self.PROJECT, self.DS_ID)
table_ref = dataset.table(self.TABLE_NAME)
table = self._make_one(table_ref)

config = BigLakeConfiguration(
connection_id="connection",
storage_uri="uri",
file_format="PARQUET",
table_format="ICEBERG",
)
table.biglake_configuration = config

assert table._properties["biglakeConfiguration"] == {
"connectionId": "connection",
"storageUri": "uri",
"fileFormat": "PARQUET",
"tableFormat": "ICEBERG",
}

table.biglake_configuration = None
assert table.biglake_configuration is None

def test_table_constraints_property_setter(self):
from google.cloud.bigquery.table import (
ColumnReference,
@@ -2166,6 +2235,97 @@ def test_ctor_full_resource(self):
assert instance.snapshot_time == expected_time


class TestBigLakeConfiguration(unittest.TestCase):
@staticmethod
def _get_target_class():
from google.cloud.bigquery.table import BigLakeConfiguration

return BigLakeConfiguration

@classmethod
def _make_one(cls, *args, **kwargs):
klass = cls._get_target_class()
return klass(*args, **kwargs)

def test_ctor_empty_resource(self):
instance = self._make_one()
self.assertIsNone(instance.connection_id)
self.assertIsNone(instance.storage_uri)
self.assertIsNone(instance.file_format)
self.assertIsNone(instance.table_format)

def test_ctor_kwargs(self):
instance = self._make_one(
connection_id="conn",
storage_uri="uri",
file_format="FILE",
table_format="TABLE",
)
self.assertEqual(instance.connection_id, "conn")
self.assertEqual(instance.storage_uri, "uri")
self.assertEqual(instance.file_format, "FILE")
self.assertEqual(instance.table_format, "TABLE")

def test_ctor_full_resource(self):
resource = {
"connectionId": "conn",
"storageUri": "uri",
"fileFormat": "FILE",
"tableFormat": "TABLE",
}
instance = self._make_one(_properties=resource)
self.assertEqual(instance.connection_id, "conn")
self.assertEqual(instance.storage_uri, "uri")
self.assertEqual(instance.file_format, "FILE")
self.assertEqual(instance.table_format, "TABLE")

def test_to_api_repr(self):
resource = {
"connectionId": "conn",
"storageUri": "uri",
"fileFormat": "FILE",
"tableFormat": "TABLE",
}
instance = self._make_one(_properties=resource)
self.assertEqual(instance.to_api_repr(), resource)

def test_from_api_repr_partial(self):
klass = self._get_target_class()
api_repr = {"fileFormat": "FILE"}
instance = klass.from_api_repr(api_repr)

self.assertIsNone(instance.connection_id)
self.assertIsNone(instance.storage_uri)
self.assertEqual(instance.file_format, "FILE")
self.assertIsNone(instance.table_format)

def test_comparisons(self):
resource = {
"connectionId": "conn",
"storageUri": "uri",
"fileFormat": "FILE",
"tableFormat": "TABLE",
}

first = self._make_one(_properties=resource)
second = self._make_one(_properties=copy.deepcopy(resource))
# Exercise comparator overloads.
# first and second should be equivalent.
self.assertNotEqual(first, resource)
self.assertEqual(first, second)
self.assertEqual(hash(first), hash(second))

# Update second to ensure that first and second are no longer equivalent.
second.connection_id = "foo"
self.assertNotEqual(first, second)
self.assertNotEqual(hash(first), hash(second))

# Update first with the same change, restoring equivalence.
first.connection_id = "foo"
self.assertEqual(first, second)
self.assertEqual(hash(first), hash(second))


class TestCloneDefinition:
@staticmethod
def _get_target_class():