mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-09 00:01:28 -05:00
Don't check the at_hash (access token hash) in OIDC ID Tokens if we don't use the access token (#18374)
Co-authored-by: Eric Eastwood <erice@element.io>
This commit is contained in:
parent
ea376126a0
commit
fd5d3d852d
1
changelog.d/18374.misc
Normal file
1
changelog.d/18374.misc
Normal file
@ -0,0 +1 @@
|
|||||||
|
Don't validate the `at_hash` (access token hash) field in OIDC ID Tokens if we don't end up actually using the OIDC Access Token.
|
||||||
@ -586,6 +586,24 @@ class OidcProvider:
|
|||||||
or self._user_profile_method == "userinfo_endpoint"
|
or self._user_profile_method == "userinfo_endpoint"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def _uses_access_token(self) -> bool:
|
||||||
|
"""Return True if the `access_token` will be used during the login process.
|
||||||
|
|
||||||
|
This is useful to determine whether the access token
|
||||||
|
returned by the identity provider, and
|
||||||
|
any related metadata (such as the `at_hash` field in
|
||||||
|
the ID token), should be validated.
|
||||||
|
"""
|
||||||
|
# Currently, Synapse only uses the access_token to fetch user metadata
|
||||||
|
# from the userinfo endpoint. Therefore we only have a single criteria
|
||||||
|
# to check right now but this may change in the future and this function
|
||||||
|
# should be updated if more usages are introduced.
|
||||||
|
#
|
||||||
|
# For example, if we start to use the access_token given to us by the
|
||||||
|
# IdP for more things, such as accessing Resource Server APIs.
|
||||||
|
return self._uses_userinfo
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def issuer(self) -> str:
|
def issuer(self) -> str:
|
||||||
"""The issuer identifying this provider."""
|
"""The issuer identifying this provider."""
|
||||||
@ -957,9 +975,16 @@ class OidcProvider:
|
|||||||
"nonce": nonce,
|
"nonce": nonce,
|
||||||
"client_id": self._client_auth.client_id,
|
"client_id": self._client_auth.client_id,
|
||||||
}
|
}
|
||||||
if "access_token" in token:
|
if self._uses_access_token and "access_token" in token:
|
||||||
# If we got an `access_token`, there should be an `at_hash` claim
|
# If we got an `access_token`, there should be an `at_hash` claim
|
||||||
# in the `id_token` that we can check against.
|
# in the `id_token` that we can check against. Setting this
|
||||||
|
# instructs authlib to check the value of `at_hash` in the
|
||||||
|
# ID token.
|
||||||
|
#
|
||||||
|
# We only need to verify the access token if we actually make
|
||||||
|
# use of it. Which currently only happens when we need to fetch
|
||||||
|
# the user's information from the userinfo_endpoint. Thus, this
|
||||||
|
# check is also gated on self._uses_userinfo.
|
||||||
claims_params["access_token"] = token["access_token"]
|
claims_params["access_token"] = token["access_token"]
|
||||||
|
|
||||||
claims_options = {"iss": {"values": [metadata["issuer"]]}}
|
claims_options = {"iss": {"values": [metadata["issuer"]]}}
|
||||||
|
|||||||
@ -1029,6 +1029,50 @@ class OidcHandlerTestCase(HomeserverTestCase):
|
|||||||
args = parse_qs(kwargs["data"].decode("utf-8"))
|
args = parse_qs(kwargs["data"].decode("utf-8"))
|
||||||
self.assertEqual(args["redirect_uri"], [TEST_REDIRECT_URI])
|
self.assertEqual(args["redirect_uri"], [TEST_REDIRECT_URI])
|
||||||
|
|
||||||
|
@override_config(
|
||||||
|
{
|
||||||
|
"oidc_config": {
|
||||||
|
**DEFAULT_CONFIG,
|
||||||
|
"redirect_uri": TEST_REDIRECT_URI,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
def test_code_exchange_ignores_access_token(self) -> None:
|
||||||
|
"""
|
||||||
|
Code exchange completes successfully and doesn't validate the `at_hash`
|
||||||
|
(access token hash) field of an ID token when the access token isn't
|
||||||
|
going to be used.
|
||||||
|
|
||||||
|
The access token won't be used in this test because Synapse (currently)
|
||||||
|
only needs it to fetch a user's metadata if it isn't included in the ID
|
||||||
|
token itself.
|
||||||
|
|
||||||
|
Because we have included "openid" in the requested scopes for this IdP
|
||||||
|
(see `SCOPES`), user metadata is be included in the ID token. Thus the
|
||||||
|
access token isn't needed, and it's unnecessary for Synapse to validate
|
||||||
|
the access token.
|
||||||
|
|
||||||
|
This is a regression test for a situation where an upstream identity
|
||||||
|
provider was providing an invalid `at_hash` value, which Synapse errored
|
||||||
|
on, yet Synapse wasn't using the access token for anything.
|
||||||
|
"""
|
||||||
|
# Exchange the code against the fake IdP.
|
||||||
|
userinfo = {
|
||||||
|
"sub": "foo",
|
||||||
|
"username": "foo",
|
||||||
|
"phone": "1234567",
|
||||||
|
}
|
||||||
|
with self.fake_server.id_token_override(
|
||||||
|
{
|
||||||
|
"at_hash": "invalid-hash",
|
||||||
|
}
|
||||||
|
):
|
||||||
|
request, _ = self.start_authorization(userinfo)
|
||||||
|
self.get_success(self.handler.handle_oidc_callback(request))
|
||||||
|
|
||||||
|
# If no error was rendered, then we have success.
|
||||||
|
self.render_error.assert_not_called()
|
||||||
|
|
||||||
@override_config(
|
@override_config(
|
||||||
{
|
{
|
||||||
"oidc_config": {
|
"oidc_config": {
|
||||||
|
|||||||
@ -20,7 +20,9 @@
|
|||||||
#
|
#
|
||||||
|
|
||||||
|
|
||||||
|
import base64
|
||||||
import json
|
import json
|
||||||
|
from hashlib import sha256
|
||||||
from typing import Any, ContextManager, Dict, List, Optional, Tuple
|
from typing import Any, ContextManager, Dict, List, Optional, Tuple
|
||||||
from unittest.mock import Mock, patch
|
from unittest.mock import Mock, patch
|
||||||
from urllib.parse import parse_qs
|
from urllib.parse import parse_qs
|
||||||
@ -154,10 +156,23 @@ class FakeOidcServer:
|
|||||||
json_payload = json.dumps(payload)
|
json_payload = json.dumps(payload)
|
||||||
return jws.serialize_compact(protected, json_payload, self._key).decode("utf-8")
|
return jws.serialize_compact(protected, json_payload, self._key).decode("utf-8")
|
||||||
|
|
||||||
def generate_id_token(self, grant: FakeAuthorizationGrant) -> str:
|
def generate_id_token(
|
||||||
|
self, grant: FakeAuthorizationGrant, access_token: str
|
||||||
|
) -> str:
|
||||||
|
# Generate a hash of the access token for the optional
|
||||||
|
# `at_hash` field in an ID Token.
|
||||||
|
#
|
||||||
|
# 3.1.3.6. ID Token, https://openid.net/specs/openid-connect-core-1_0.html#CodeIDToken
|
||||||
|
at_hash = (
|
||||||
|
base64.urlsafe_b64encode(sha256(access_token.encode("ascii")).digest()[:16])
|
||||||
|
.rstrip(b"=")
|
||||||
|
.decode("ascii")
|
||||||
|
)
|
||||||
|
|
||||||
now = int(self._clock.time())
|
now = int(self._clock.time())
|
||||||
id_token = {
|
id_token = {
|
||||||
**grant.userinfo,
|
**grant.userinfo,
|
||||||
|
"at_hash": at_hash,
|
||||||
"iss": self.issuer,
|
"iss": self.issuer,
|
||||||
"aud": grant.client_id,
|
"aud": grant.client_id,
|
||||||
"iat": now,
|
"iat": now,
|
||||||
@ -243,7 +258,7 @@ class FakeOidcServer:
|
|||||||
}
|
}
|
||||||
|
|
||||||
if "openid" in grant.scope:
|
if "openid" in grant.scope:
|
||||||
token["id_token"] = self.generate_id_token(grant)
|
token["id_token"] = self.generate_id_token(grant, access_token)
|
||||||
|
|
||||||
return dict(token)
|
return dict(token)
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user