diff --git a/CHANGELOG.md b/CHANGELOG.md index 7e9f12ed..3f2da912 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `datacontract test` field type check now compares the full structured type tree for `object` and `array` logical types. - Unknown and unsupported types are silently ignored rather than failing the check. Specifically the `map` type is not supported until ODCS version v3.2.0 and is also ignored. +### Added +- The `test` and `import` commands support programmatically overriding Databricks configuration, otherwise passed as env variables. ## [1.0.4] - 2026-06-22 diff --git a/datacontract/configuration/source_config.py b/datacontract/configuration/source_config.py new file mode 100644 index 00000000..fa97a639 --- /dev/null +++ b/datacontract/configuration/source_config.py @@ -0,0 +1,39 @@ +import os + +from pydantic import BaseModel, Field + + +class SourceConfig(BaseModel): + """Source connection settings supplied programmatically instead of through env vars.""" + + databricks: "DatabricksSourceConfig | None" = None + + def databricks_config(self) -> "DatabricksSourceConfig": + """Return the Databricks settings, with unset values filled from env vars.""" + return (self.databricks or DatabricksSourceConfig()).resolve() + + +class DatabricksSourceConfig(BaseModel): + """Databricks connection settings that take precedence over the DATACONTRACT_DATABRICKS_* env vars.""" + + server_hostname: str | None = None + http_path: str | None = None + # repr=False keeps secrets out of repr()/str() output, e.g. in logs or tracebacks. + token: str | None = Field(default=None, repr=False) + client_id: str | None = None + client_secret: str | None = Field(default=None, repr=False) + profile: str | None = None + auth_type: str | None = None + + def resolve(self) -> "DatabricksSourceConfig": + """Return a copy in which every falsy field is filled from its DATACONTRACT_DATABRICKS_* env var.""" + env = os.getenv + return DatabricksSourceConfig( + server_hostname=self.server_hostname or env("DATACONTRACT_DATABRICKS_SERVER_HOSTNAME"), + http_path=self.http_path or env("DATACONTRACT_DATABRICKS_HTTP_PATH"), + token=self.token or env("DATACONTRACT_DATABRICKS_TOKEN"), + client_id=self.client_id or env("DATACONTRACT_DATABRICKS_CLIENT_ID"), + client_secret=self.client_secret or env("DATACONTRACT_DATABRICKS_CLIENT_SECRET"), + profile=self.profile or 
env("DATACONTRACT_DATABRICKS_PROFILE"), + auth_type=self.auth_type or env("DATACONTRACT_DATABRICKS_AUTH_TYPE"), + ) diff --git a/datacontract/data_contract.py b/datacontract/data_contract.py index e20faa6b..732d122e 100644 --- a/datacontract/data_contract.py +++ b/datacontract/data_contract.py @@ -4,6 +4,8 @@ from open_data_contract_standard.model import OpenDataContractStandard, Team +from datacontract.configuration.source_config import SourceConfig + if typing.TYPE_CHECKING: from duckdb.duckdb import DuckDBPyConnection from pyspark.sql import SparkSession @@ -39,6 +41,7 @@ def __init__( check_categories: set[str] | None = None, fastapi_url: str = None, include_failed_samples: bool = False, + source_config: SourceConfig | None = None, ): self._data_contract_file = data_contract_file self._data_contract_str = data_contract_str @@ -56,6 +59,7 @@ def __init__( self._check_categories = check_categories self._fastapi_url = fastapi_url self._include_failed_samples = include_failed_samples + self.source_config = source_config @classmethod def init(cls, template: typing.Optional[str], schema: typing.Optional[str] = None) -> OpenDataContractStandard: @@ -147,6 +151,7 @@ def test(self) -> Run: schema_name=self._schema_name, check_categories=self._check_categories, include_failed_samples=self._include_failed_samples, + source_config=self.source_config, ) except DataContractException as e: diff --git a/datacontract/engines/data_contract_test.py b/datacontract/engines/data_contract_test.py index fa3fbb61..5f2c4188 100644 --- a/datacontract/engines/data_contract_test.py +++ b/datacontract/engines/data_contract_test.py @@ -6,6 +6,7 @@ import requests from open_data_contract_standard.model import OpenDataContractStandard, Server +from datacontract.configuration.source_config import SourceConfig from datacontract.engines.checks.create_checks import create_checks if typing.TYPE_CHECKING: @@ -31,6 +32,7 @@ def execute_data_contract_test( schema_name: str = "all", check_categories: 
set[str] | None = None, include_failed_samples: bool = False, + source_config: SourceConfig | None = None, ): if data_contract.schema_ is None or len(data_contract.schema_) == 0: raise DataContractException( @@ -88,6 +90,7 @@ def execute_data_contract_test( duckdb_connection, schema_name=schema_name, include_failed_samples=include_failed_samples, + source_config=source_config, ) diff --git a/datacontract/engines/ibis/connections/connect.py b/datacontract/engines/ibis/connections/connect.py index 8c268d2a..fdf98ba7 100644 --- a/datacontract/engines/ibis/connections/connect.py +++ b/datacontract/engines/ibis/connections/connect.py @@ -16,6 +16,7 @@ from open_data_contract_standard.model import OpenDataContractStandard, Server +from datacontract.configuration.source_config import SourceConfig from datacontract.engines.ibis.connections.duckdb_connection import get_duckdb_connection from datacontract.model.exceptions import DataContractException, require_env from datacontract.model.run import Check, ResultEnum, Run @@ -50,6 +51,7 @@ def connect_ibis( spark: "SparkSession" = None, duckdb_connection=None, schema_name: str = "all", + source_config: SourceConfig | None = None, ) -> "ibis.BaseBackend | None": """Return a connected ibis backend, or ``None`` if the server is unsupported. @@ -91,7 +93,7 @@ def connect_ibis( if database_name: spark.sql(f"USE {database_name}") return ibis.pyspark.connect(session=spark) - return _connect_databricks(ibis, server, run) + return _connect_databricks(ibis, server, run, source_config=source_config) if server_type == "postgres": return ibis.postgres.connect( @@ -208,7 +210,12 @@ def connect_ibis( return None -def _connect_databricks(ibis, server: Server, run: Run): +def _connect_databricks( + ibis, + server: Server, + run: Run, + source_config: SourceConfig | None = None, +): - """Connect to Databricks SQL directly, selecting the auth method from env vars. + """Connect to Databricks SQL directly, selecting the auth method from ``source_config`` or env vars. 
Auth is resolved in priority order, so an existing token-based setup keeps @@ -226,40 +233,40 @@ def _connect_databricks(ibis, server: Server, run: Run): The OAuth credential providers build their SDK ``Config`` lazily, so token exchange happens when the connection is opened rather than while reading env. """ - host = server.host or require_env("DATACONTRACT_DATABRICKS_SERVER_HOSTNAME", server_type="databricks") + config = (source_config or SourceConfig()).databricks_config() + host = ( + server.host + or config.server_hostname + or require_env("DATACONTRACT_DATABRICKS_SERVER_HOSTNAME", server_type="databricks") + ) + kwargs = dict( server_hostname=host, - http_path=os.getenv("DATACONTRACT_DATABRICKS_HTTP_PATH"), + http_path=config.http_path, catalog=server.catalog, schema=server.schema_, ) - token = os.getenv("DATACONTRACT_DATABRICKS_TOKEN") - client_id = os.getenv("DATACONTRACT_DATABRICKS_CLIENT_ID") - client_secret = os.getenv("DATACONTRACT_DATABRICKS_CLIENT_SECRET") - profile = os.getenv("DATACONTRACT_DATABRICKS_PROFILE") - auth_type = os.getenv("DATACONTRACT_DATABRICKS_AUTH_TYPE") - - if token: + if config.token: run.log_info("Connecting to databricks with a personal access token") - return ibis.databricks.connect(access_token=token, **kwargs) + return ibis.databricks.connect(access_token=config.token, **kwargs) - if client_id and client_secret: + if config.client_id and config.client_secret: run.log_info("Connecting to databricks with an OAuth service principal (M2M)") sdk_host = host if host.startswith("http") else f"https://{host}" kwargs["credentials_provider"] = _databricks_credentials_provider( - host=sdk_host, client_id=client_id, client_secret=client_secret + host=sdk_host, client_id=config.client_id, client_secret=config.client_secret ) return ibis.databricks.connect(**kwargs) - if profile: - run.log_info(f"Connecting to databricks with config profile '{profile}'") - kwargs["credentials_provider"] = _databricks_credentials_provider(profile=profile) + 
if config.profile: + run.log_info(f"Connecting to databricks with config profile '{config.profile}'") + kwargs["credentials_provider"] = _databricks_credentials_provider(profile=config.profile) return ibis.databricks.connect(**kwargs) - if auth_type: - run.log_info(f"Connecting to databricks with auth_type '{auth_type}'") - return ibis.databricks.connect(auth_type=auth_type, **kwargs) + if config.auth_type: + run.log_info(f"Connecting to databricks with auth_type '{config.auth_type}'") + return ibis.databricks.connect(auth_type=config.auth_type, **kwargs) # Nothing configured: fail with the same clear message as before. token = require_env("DATACONTRACT_DATABRICKS_TOKEN", server_type="databricks") diff --git a/datacontract/engines/ibis/ibis_check_execute.py b/datacontract/engines/ibis/ibis_check_execute.py index dfa3543a..8eae3575 100644 --- a/datacontract/engines/ibis/ibis_check_execute.py +++ b/datacontract/engines/ibis/ibis_check_execute.py @@ -16,6 +16,7 @@ from open_data_contract_standard.model import OpenDataContractStandard, Server +from datacontract.configuration.source_config import SourceConfig from datacontract.engines.checks.check_spec import CheckSpec, MetricType from datacontract.engines.checks.type_normalize import schema_property_matches, schema_property_mismatch_reason from datacontract.engines.ibis.connections.connect import connect_ibis @@ -78,6 +79,7 @@ def execute_ibis_checks( duckdb_connection=None, schema_name: str = "all", include_failed_samples: bool = False, + source_config: SourceConfig | None = None, ): if data_contract is None: run.log_warn("Cannot run engine ibis, as data contract is invalid") @@ -96,7 +98,7 @@ def execute_ibis_checks( run.log_info("Running engine ibis") try: - con = connect_ibis(run, data_contract, server, spark, duckdb_connection, schema_name) + con = connect_ibis(run, data_contract, server, spark, duckdb_connection, schema_name, source_config) except DataContractException: raise except ImportError: diff --git 
a/datacontract/imports/unity_importer.py b/datacontract/imports/unity_importer.py index 8f0407ef..bfbaf4ba 100644 --- a/datacontract/imports/unity_importer.py +++ b/datacontract/imports/unity_importer.py @@ -1,12 +1,12 @@ import json import logging -import os from typing import List, Optional, Tuple from databricks.sdk import WorkspaceClient from databricks.sdk.service.catalog import ColumnInfo, TableInfo from open_data_contract_standard.model import OpenDataContractStandard, SchemaProperty +from datacontract.configuration.source_config import DatabricksSourceConfig, SourceConfig from datacontract.imports.importer import Importer from datacontract.imports.odcs_helper import ( create_odcs, @@ -32,8 +32,13 @@ def import_source( if source is not None: return import_unity_from_json(source) else: - unity_table_full_name_list = import_args.get("unity_table_full_name") - return import_unity_from_api(unity_table_full_name_list) + unity_table_full_name_list = import_args.get("unity_table_full_name") or [] + + # ``or`` rather than a .get() default, so an explicit source_config=None also falls back to env vars + source_config = import_args.get("source_config") or SourceConfig() + config: DatabricksSourceConfig = source_config.databricks_config() + + return import_unity_from_api(unity_table_full_name_list, config) def import_unity_from_json(source: str) -> OpenDataContractStandard: @@ -55,35 +60,33 @@ def import_unity_from_json(source: str) -> OpenDataContractStandard: return convert_unity_schema(odcs, unity_schema) -def import_unity_from_api(unity_table_full_name_list: List[str] = None) -> OpenDataContractStandard: +def import_unity_from_api( + unity_table_full_name_list: list[str] | None = None, + config: DatabricksSourceConfig | None = None, +) -> OpenDataContractStandard: """Import data contract specification from Unity Catalog API.""" + # Both parameters stay optional so pre-existing callers keep working; without an explicit config, resolve from env vars. + config = config or SourceConfig().databricks_config() try: - profile = os.getenv("DATACONTRACT_DATABRICKS_PROFILE") - host, token = os.getenv("DATACONTRACT_DATABRICKS_SERVER_HOSTNAME"), os.getenv("DATACONTRACT_DATABRICKS_TOKEN") - exception = DataContractException( - type="configuration", - name="Databricks 
configuration", - reason="", - engine="datacontract", - ) - if not profile and not host and not token: - reason = "Either DATACONTRACT_DATABRICKS_PROFILE or both DATACONTRACT_DATABRICKS_SERVER_HOSTNAME and DATACONTRACT_DATABRICKS_TOKEN environment variables must be set" - exception.reason = reason - raise exception - if token and not host: - reason = "DATACONTRACT_DATABRICKS_SERVER_HOSTNAME environment variable is not set" - exception.reason = reason - raise exception - if host and not token: - reason = "DATACONTRACT_DATABRICKS_TOKEN environment variable is not set" - exception.reason = reason - raise exception - workspace_client = WorkspaceClient(profile=profile) if profile else WorkspaceClient(host=host, token=token) + if config.profile: + workspace_client = WorkspaceClient(profile=config.profile) + + elif config.token and config.server_hostname: + workspace_client = WorkspaceClient(host=config.server_hostname, token=config.token) + + elif config.server_hostname: + # host only: credentials come from the Databricks SDK's default authentication flow + workspace_client = WorkspaceClient(host=config.server_hostname) + + else: + # nothing set explicitly: rely on the Databricks SDK's default authentication flow + workspace_client = WorkspaceClient() + except Exception as e: raise DataContractException( - type="schema", - name="Retrieve unity catalog schema", - reason="Failed to connect to unity catalog schema", + type="configuration", + name="Databricks configuration", + reason="A valid Databricks configuration is required to import from Unity Catalog. Supply a valid DATACONTRACT_DATABRICKS_PROFILE or both DATACONTRACT_DATABRICKS_SERVER_HOSTNAME and DATACONTRACT_DATABRICKS_TOKEN.", engine="datacontract", original_exception=e, )