Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `datacontract test` field type check now compares the full structured type tree for `object` and `array` logical types.
- Unknown and unsupported types are silently ignored rather than failing the check. Specifically the `map` type is not supported until ODCS version v3.2.0 and is also ignored.

### Added
- The `test` and `import` commands support programmatically overriding the Databricks configuration, which is otherwise read from environment variables.


## [1.0.4] - 2026-06-22
Expand Down
33 changes: 33 additions & 0 deletions datacontract/configuration/source_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import os

from pydantic import BaseModel


class SourceConfig(BaseModel):
    """Programmatic connection settings, grouped per source type.

    Only Databricks settings are modelled here. Leaving ``databricks`` unset
    means the effective settings come from environment variables alone.
    """

    # Explicit Databricks settings supplied by the caller; None when not provided.
    databricks: "DatabricksSourceConfig | None" = None

    def databricks_config(self) -> "DatabricksSourceConfig":
        """Return the effective Databricks settings.

        Starts from the explicitly supplied ``databricks`` value (or an empty
        ``DatabricksSourceConfig`` when none was given) and returns the result
        of its ``resolve()`` call, which fills unset fields from env vars.
        """
        explicit = self.databricks
        if explicit:
            return explicit.resolve()
        return DatabricksSourceConfig().resolve()


class DatabricksSourceConfig(BaseModel):
    """Databricks connection and authentication settings.

    Every field is optional. Use ``resolve()`` to obtain a copy in which
    unset fields are filled from the ``DATACONTRACT_DATABRICKS_*``
    environment variables.
    """

    server_hostname: str | None = None
    http_path: str | None = None
    token: str | None = None
    client_id: str | None = None
    client_secret: str | None = None
    profile: str | None = None
    auth_type: str | None = None

    def resolve(self) -> "DatabricksSourceConfig":
        """Return a new config with missing values taken from env vars.

        A value set on this instance wins over the matching environment
        variable. Falsy values (``None`` or an empty string) count as
        missing and fall back to the environment. ``self`` is not modified.
        """
        env_var_by_field = {
            "server_hostname": "DATACONTRACT_DATABRICKS_SERVER_HOSTNAME",
            "http_path": "DATACONTRACT_DATABRICKS_HTTP_PATH",
            "token": "DATACONTRACT_DATABRICKS_TOKEN",
            "client_id": "DATACONTRACT_DATABRICKS_CLIENT_ID",
            "client_secret": "DATACONTRACT_DATABRICKS_CLIENT_SECRET",
            "profile": "DATACONTRACT_DATABRICKS_PROFILE",
            "auth_type": "DATACONTRACT_DATABRICKS_AUTH_TYPE",
        }
        # `or` short-circuits, so the environment is only consulted for
        # fields that were not set explicitly.
        resolved = {
            field: getattr(self, field) or os.getenv(env_var)
            for field, env_var in env_var_by_field.items()
        }
        return DatabricksSourceConfig(**resolved)
5 changes: 5 additions & 0 deletions datacontract/data_contract.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

from open_data_contract_standard.model import OpenDataContractStandard, Team

from datacontract.configuration.source_config import SourceConfig

if typing.TYPE_CHECKING:
from duckdb.duckdb import DuckDBPyConnection
from pyspark.sql import SparkSession
Expand Down Expand Up @@ -39,6 +41,7 @@ def __init__(
check_categories: set[str] | None = None,
fastapi_url: str = None,
include_failed_samples: bool = False,
source_config: SourceConfig | None = None,
):
self._data_contract_file = data_contract_file
self._data_contract_str = data_contract_str
Expand All @@ -56,6 +59,7 @@ def __init__(
self._check_categories = check_categories
self._fastapi_url = fastapi_url
self._include_failed_samples = include_failed_samples
self.source_config = source_config

@classmethod
def init(cls, template: typing.Optional[str], schema: typing.Optional[str] = None) -> OpenDataContractStandard:
Expand Down Expand Up @@ -147,6 +151,7 @@ def test(self) -> Run:
schema_name=self._schema_name,
check_categories=self._check_categories,
include_failed_samples=self._include_failed_samples,
source_config=self.source_config,
)

except DataContractException as e:
Expand Down
3 changes: 3 additions & 0 deletions datacontract/engines/data_contract_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import requests
from open_data_contract_standard.model import OpenDataContractStandard, Server

from datacontract.configuration.source_config import SourceConfig
from datacontract.engines.checks.create_checks import create_checks

if typing.TYPE_CHECKING:
Expand All @@ -31,6 +32,7 @@ def execute_data_contract_test(
schema_name: str = "all",
check_categories: set[str] | None = None,
include_failed_samples: bool = False,
source_config: SourceConfig | None = None,
):
if data_contract.schema_ is None or len(data_contract.schema_) == 0:
raise DataContractException(
Expand Down Expand Up @@ -88,6 +90,7 @@ def execute_data_contract_test(
duckdb_connection,
schema_name=schema_name,
include_failed_samples=include_failed_samples,
source_config=source_config,
)


Expand Down
47 changes: 27 additions & 20 deletions datacontract/engines/ibis/connections/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from open_data_contract_standard.model import OpenDataContractStandard, Server

from datacontract.configuration.source_config import SourceConfig
from datacontract.engines.ibis.connections.duckdb_connection import get_duckdb_connection
from datacontract.model.exceptions import DataContractException, require_env
from datacontract.model.run import Check, ResultEnum, Run
Expand Down Expand Up @@ -50,6 +51,7 @@ def connect_ibis(
spark: "SparkSession" = None,
duckdb_connection=None,
schema_name: str = "all",
source_config: SourceConfig | None = None,
) -> "ibis.BaseBackend | None":
"""Return a connected ibis backend, or ``None`` if the server is unsupported.

Expand Down Expand Up @@ -91,7 +93,7 @@ def connect_ibis(
if database_name:
spark.sql(f"USE {database_name}")
return ibis.pyspark.connect(session=spark)
return _connect_databricks(ibis, server, run)
return _connect_databricks(ibis, server, run, source_config=source_config)

if server_type == "postgres":
return ibis.postgres.connect(
Expand Down Expand Up @@ -208,7 +210,12 @@ def connect_ibis(
return None


def _connect_databricks(ibis, server: Server, run: Run):
def _connect_databricks(
ibis,
server: Server,
run: Run,
source_config: SourceConfig | None = None,
):
"""Connect to Databricks SQL directly, selecting the auth method from env vars.

Auth is resolved in priority order, so an existing token-based setup keeps
Expand All @@ -226,40 +233,40 @@ def _connect_databricks(ibis, server: Server, run: Run):
The OAuth credential providers build their SDK ``Config`` lazily, so token
exchange happens when the connection is opened rather than while reading env.
"""
host = server.host or require_env("DATACONTRACT_DATABRICKS_SERVER_HOSTNAME", server_type="databricks")
config = (source_config or SourceConfig()).databricks_config()
host = (
server.host
or config.server_hostname
or require_env("DATACONTRACT_DATABRICKS_SERVER_HOSTNAME", server_type="databricks")
)

kwargs = dict(
server_hostname=host,
http_path=os.getenv("DATACONTRACT_DATABRICKS_HTTP_PATH"),
http_path=config.http_path,
catalog=server.catalog,
schema=server.schema_,
)

token = os.getenv("DATACONTRACT_DATABRICKS_TOKEN")
client_id = os.getenv("DATACONTRACT_DATABRICKS_CLIENT_ID")
client_secret = os.getenv("DATACONTRACT_DATABRICKS_CLIENT_SECRET")
profile = os.getenv("DATACONTRACT_DATABRICKS_PROFILE")
auth_type = os.getenv("DATACONTRACT_DATABRICKS_AUTH_TYPE")

if token:
if config.token:
run.log_info("Connecting to databricks with a personal access token")
return ibis.databricks.connect(access_token=token, **kwargs)
return ibis.databricks.connect(access_token=config.token, **kwargs)

if client_id and client_secret:
if config.client_id and config.client_secret:
run.log_info("Connecting to databricks with an OAuth service principal (M2M)")
sdk_host = host if host.startswith("http") else f"https://{host}"
kwargs["credentials_provider"] = _databricks_credentials_provider(
host=sdk_host, client_id=client_id, client_secret=client_secret
host=sdk_host, client_id=config.client_id, client_secret=config.client_secret
)
return ibis.databricks.connect(**kwargs)

if profile:
run.log_info(f"Connecting to databricks with config profile '{profile}'")
kwargs["credentials_provider"] = _databricks_credentials_provider(profile=profile)
if config.profile:
run.log_info(f"Connecting to databricks with config profile '{config.profile}'")
kwargs["credentials_provider"] = _databricks_credentials_provider(profile=config.profile)
return ibis.databricks.connect(**kwargs)

if auth_type:
run.log_info(f"Connecting to databricks with auth_type '{auth_type}'")
return ibis.databricks.connect(auth_type=auth_type, **kwargs)
if config.auth_type:
run.log_info(f"Connecting to databricks with auth_type '{config.auth_type}'")
return ibis.databricks.connect(auth_type=config.auth_type, **kwargs)

# Nothing configured: fail with the same clear message as before.
token = require_env("DATACONTRACT_DATABRICKS_TOKEN", server_type="databricks")
Expand Down
4 changes: 3 additions & 1 deletion datacontract/engines/ibis/ibis_check_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from open_data_contract_standard.model import OpenDataContractStandard, Server

from datacontract.configuration.source_config import SourceConfig
from datacontract.engines.checks.check_spec import CheckSpec, MetricType
from datacontract.engines.checks.type_normalize import schema_property_matches, schema_property_mismatch_reason
from datacontract.engines.ibis.connections.connect import connect_ibis
Expand Down Expand Up @@ -78,6 +79,7 @@ def execute_ibis_checks(
duckdb_connection=None,
schema_name: str = "all",
include_failed_samples: bool = False,
source_config: SourceConfig | None = None,
):
if data_contract is None:
run.log_warn("Cannot run engine ibis, as data contract is invalid")
Expand All @@ -96,7 +98,7 @@ def execute_ibis_checks(

run.log_info("Running engine ibis")
try:
con = connect_ibis(run, data_contract, server, spark, duckdb_connection, schema_name)
con = connect_ibis(run, data_contract, server, spark, duckdb_connection, schema_name, source_config)
except DataContractException:
raise
except ImportError:
Expand Down
56 changes: 28 additions & 28 deletions datacontract/imports/unity_importer.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import json
import logging
import os
from typing import List, Optional, Tuple

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import ColumnInfo, TableInfo
from open_data_contract_standard.model import OpenDataContractStandard, SchemaProperty

from datacontract.configuration.source_config import DatabricksSourceConfig, SourceConfig
from datacontract.imports.importer import Importer
from datacontract.imports.odcs_helper import (
create_odcs,
Expand All @@ -32,8 +32,12 @@ def import_source(
if source is not None:
return import_unity_from_json(source)
else:
unity_table_full_name_list = import_args.get("unity_table_full_name")
return import_unity_from_api(unity_table_full_name_list)
unity_table_full_name_list = import_args.get("unity_table_full_name") or []

source_config = import_args.get("source_config", SourceConfig())
config: DatabricksSourceConfig = source_config.databricks_config().resolve()

return import_unity_from_api(unity_table_full_name_list, config)


def import_unity_from_json(source: str) -> OpenDataContractStandard:
Expand All @@ -55,35 +59,31 @@ def import_unity_from_json(source: str) -> OpenDataContractStandard:
return convert_unity_schema(odcs, unity_schema)


def import_unity_from_api(unity_table_full_name_list: List[str] = None) -> OpenDataContractStandard:
def import_unity_from_api(
unity_table_full_name_list: list[str],
config: DatabricksSourceConfig,
) -> OpenDataContractStandard:
"""Import data contract specification from Unity Catalog API."""
try:
profile = os.getenv("DATACONTRACT_DATABRICKS_PROFILE")
host, token = os.getenv("DATACONTRACT_DATABRICKS_SERVER_HOSTNAME"), os.getenv("DATACONTRACT_DATABRICKS_TOKEN")
exception = DataContractException(
type="configuration",
name="Databricks configuration",
reason="",
engine="datacontract",
)
if not profile and not host and not token:
reason = "Either DATACONTRACT_DATABRICKS_PROFILE or both DATACONTRACT_DATABRICKS_SERVER_HOSTNAME and DATACONTRACT_DATABRICKS_TOKEN environment variables must be set"
exception.reason = reason
raise exception
if token and not host:
reason = "DATACONTRACT_DATABRICKS_SERVER_HOSTNAME environment variable is not set"
exception.reason = reason
raise exception
if host and not token:
reason = "DATACONTRACT_DATABRICKS_TOKEN environment variable is not set"
exception.reason = reason
raise exception
workspace_client = WorkspaceClient(profile=profile) if profile else WorkspaceClient(host=host, token=token)
if config.profile:
workspace_client = WorkspaceClient(profile=config.profile)

elif config.token and config.server_hostname:
workspace_client = WorkspaceClient(host=config.server_hostname, token=config.token)

elif config.server_hostname:
# override host for Application Default Credentials
workspace_client = WorkspaceClient(host=config.server_hostname)

else:
# use Application Default Credentials
workspace_client = WorkspaceClient()

except Exception as e:
raise DataContractException(
type="schema",
name="Retrieve unity catalog schema",
reason="Failed to connect to unity catalog schema",
type="configuration",
name="Databricks configuration",
reason="A valid Databricks configuration is required to import from Unity Catalog. Supply a valid DATACONTRACT_DATABRICKS_PROFILE or both DATACONTRACT_DATABRICKS_SERVER_HOSTNAME and DATACONTRACT_DATABRICKS_TOKEN.",
engine="datacontract",
original_exception=e,
)
Expand Down
Loading