Skip to content
Merged
137 changes: 137 additions & 0 deletions google/auth/aio/transport/mtls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Helper functions for mTLS in async for discovery of certs.
"""

import asyncio
import logging

from google.auth import exceptions
import google.auth.transport._mtls_helper
import google.auth.transport.mtls

_LOGGER = logging.getLogger(__name__)


async def _run_in_executor(func, *args):
"""Run a blocking function in an executor to avoid blocking the event loop.

This implements the non-blocking execution strategy for disk I/O operations.
"""
try:
# For python versions 3.9 and newer versions
return await asyncio.to_thread(func, *args)
except AttributeError:
# Fallback for older Python versions
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, func, *args)


def default_client_cert_source():
"""Get a callback which returns the default client SSL credentials.

Returns:
Awaitable[Callable[[], [bytes, bytes]]]: A callback which returns the default
client certificate bytes and private key bytes, both in PEM format.

Raises:
google.auth.exceptions.DefaultClientCertSourceError: If the default
client SSL credentials don't exist or are malformed.
"""
if not google.auth.transport.mtls.has_default_client_cert_source(
include_context_aware=False
):
raise exceptions.MutualTLSChannelError(
"Default client cert source doesn't exist"
)

async def callback():
try:
_, cert_bytes, key_bytes = await get_client_cert_and_key()
except (OSError, RuntimeError, ValueError) as caught_exc:
new_exc = exceptions.MutualTLSChannelError(caught_exc)
raise new_exc from caught_exc

return cert_bytes, key_bytes

return callback


async def get_client_ssl_credentials(
certificate_config_path=None,
):
"""Returns the client side certificate, private key and passphrase.

We look for certificates and keys with the following order of priority:
1. Certificate and key specified by certificate_config.json.
Currently, only X.509 workload certificates are supported.

Args:
certificate_config_path (str): The certificate_config.json file path.

Returns:
Tuple[bool, bytes, bytes, bytes]:
A boolean indicating if cert, key and passphrase are obtained, the
cert bytes and key bytes both in PEM format, and passphrase bytes.

Raises:
google.auth.exceptions.ClientCertError: if problems occurs when getting
the cert, key and passphrase.
"""

# Attempt to retrieve X.509 Workload cert and key.
cert, key = await _run_in_executor(
google.auth.transport._mtls_helper._get_workload_cert_and_key,
certificate_config_path,
False,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this could be cleaner if you use a keyword argument: include_context_aware=False

)

if cert and key:
return True, cert, key, None

return False, None, None, None


async def get_client_cert_and_key(client_cert_callback=None):
"""Returns the client side certificate and private key. The function first
tries to get certificate and key from client_cert_callback; if the callback
is None or doesn't provide certificate and key, the function tries application
default SSL credentials.

Args:
client_cert_callback (Optional[Callable[[], (bytes, bytes)]]): An
optional callback which returns client certificate bytes and private
key bytes both in PEM format.

Returns:
Tuple[bool, bytes, bytes]:
A boolean indicating if cert and key are obtained, the cert bytes
and key bytes both in PEM format.

Raises:
google.auth.exceptions.ClientCertError: if problems occurs when getting
the cert and key.
"""
if client_cert_callback:
result = client_cert_callback()
try:
cert, key = await result
except TypeError:
cert, key = result
return True, cert, key

has_cert, cert, key, _ = await get_client_ssl_credentials()
return has_cert, cert, key
22 changes: 16 additions & 6 deletions google/auth/transport/_mtls_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
from google.auth import exceptions

CONTEXT_AWARE_METADATA_PATH = "~/.secureConnect/context_aware_metadata.json"

# Default gcloud config path, to be used with path.expanduser for cross-platform compatibility.
CERTIFICATE_CONFIGURATION_DEFAULT_PATH = "~/.config/gcloud/certificate_config.json"
_CERT_PROVIDER_COMMAND = "cert_provider_command"
_CERT_REGEX = re.compile(
Expand Down Expand Up @@ -103,14 +105,18 @@ def _load_json_file(path):
return json_data


def _get_workload_cert_and_key(certificate_config_path=None):
def _get_workload_cert_and_key(
certificate_config_path=None, include_context_aware=True
):
"""Read the workload identity cert and key files specified in the certificate config provided.
If no config path is provided, check the environment variable: "GOOGLE_API_CERTIFICATE_CONFIG"
first, then the well known gcloud location: "~/.config/gcloud/certificate_config.json".

Args:
certificate_config_path (string): The certificate config path. If no path is provided,
the environment variable will be checked first, then the well known gcloud location.
include_context_aware (bool): If context aware metadata path should be checked for the
SecureConnect mTLS configuration.

Returns:
Tuple[Optional[bytes], Optional[bytes]]: client certificate bytes in PEM format and key
Expand All @@ -121,15 +127,17 @@ def _get_workload_cert_and_key(certificate_config_path=None):
the certificate or key information.
"""

cert_path, key_path = _get_workload_cert_and_key_paths(certificate_config_path)
cert_path, key_path = _get_workload_cert_and_key_paths(
certificate_config_path, include_context_aware
)

if cert_path is None and key_path is None:
return None, None

return _read_cert_and_key_files(cert_path, key_path)


def _get_cert_config_path(certificate_config_path=None):
def _get_cert_config_path(certificate_config_path=None, include_context_aware=True):
"""Get the certificate configuration path based on the following order:

1: Explicit override, if set
Expand All @@ -141,6 +149,8 @@ def _get_cert_config_path(certificate_config_path=None):
Args:
certificate_config_path (string): The certificate config path. If provided, the well known
location and environment variable will be ignored.
include_context_aware (bool): If context aware metadata path should be checked for the
SecureConnect mTLS configuration.

Returns:
The absolute path of the certificate config file, and None if the file does not exist.
Expand All @@ -155,7 +165,7 @@ def _get_cert_config_path(certificate_config_path=None):
environment_vars.CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH,
None,
)
if env_path is not None and env_path != "":
if include_context_aware and env_path is not None and env_path != "":
certificate_config_path = env_path
else:
certificate_config_path = CERTIFICATE_CONFIGURATION_DEFAULT_PATH
Expand All @@ -166,8 +176,8 @@ def _get_cert_config_path(certificate_config_path=None):
return certificate_config_path


def _get_workload_cert_and_key_paths(config_path):
absolute_path = _get_cert_config_path(config_path)
def _get_workload_cert_and_key_paths(config_path, include_context_aware=True):
absolute_path = _get_cert_config_path(config_path, include_context_aware)
if absolute_path is None:
return None, None

Expand Down
13 changes: 9 additions & 4 deletions google/auth/transport/mtls.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,19 @@
from google.auth.transport import _mtls_helper


def has_default_client_cert_source():
def has_default_client_cert_source(include_context_aware=True):
"""Check if default client SSL credentials exists on the device.

Args:
include_context_aware (bool): include_context_aware indicates if context_aware
path location will be checked or should it be skipped.

Returns:
bool: indicating if the default client cert source exists.
"""
if (
_mtls_helper._check_config_path(_mtls_helper.CONTEXT_AWARE_METADATA_PATH)
include_context_aware
and _mtls_helper._check_config_path(_mtls_helper.CONTEXT_AWARE_METADATA_PATH)
is not None
):
return True
Expand Down Expand Up @@ -58,7 +63,7 @@ def default_client_cert_source():
google.auth.exceptions.DefaultClientCertSourceError: If the default
client SSL credentials don't exist or are malformed.
"""
if not has_default_client_cert_source():
if not has_default_client_cert_source(include_context_aware=True):
raise exceptions.MutualTLSChannelError(
"Default client cert source doesn't exist"
)
Expand Down Expand Up @@ -94,7 +99,7 @@ def default_client_encrypted_cert_source(cert_path, key_path):
google.auth.exceptions.DefaultClientCertSourceError: If any problem
occurs when loading or saving the client certificate and key.
"""
if not has_default_client_cert_source():
if not has_default_client_cert_source(include_context_aware=True):
raise exceptions.MutualTLSChannelError(
"Default client encrypted cert source doesn't exist"
)
Expand Down
4 changes: 2 additions & 2 deletions system_tests/system_tests_sync/test_service_account.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ def test_refresh_success(http_request, credentials, token_info):

assert info["email"] == credentials.service_account_email
info_scopes = _helpers.string_to_scopes(info["scope"])
assert set(info_scopes) == set(
assert set(info_scopes).issubset(set(
[
"https://www.googleapis.com/auth/userinfo.email",
"https://www.googleapis.com/auth/userinfo.profile",
]
)
))

def test_iam_signer(http_request, credentials):
credentials = credentials.with_scopes(
Expand Down
Loading
Loading