Source code for annofabapi.api

import copy
import json
import logging
import time
from collections.abc import Callable, Collection
from functools import wraps
from json import JSONDecodeError
from typing import Any, TypeVar, overload

import backoff
import requests
from requests.auth import AuthBase
from requests.cookies import RequestsCookieJar

from annofabapi.credentials import IdPass, Pat, Tokens
from annofabapi.exceptions import InvalidMfaCodeError, MfaEnabledUserExecutionError, NotLoggedInError
from annofabapi.generated_api import AbstractAnnofabApi
from annofabapi.util.type_util import assert_noreturn

logger = logging.getLogger(__name__)

DEFAULT_ENDPOINT_URL = "https://annofab.com"
"""Annofab WebAPIのデフォルトのエンドポイントURL"""

DEFAULT_WAITING_TIME_SECONDS_WITH_429_STATUS_CODE = 300
"""HTTP Status Codeが429のときの、デフォルト(Retry-Afterヘッダがないとき)の待ち時間です。"""


def _read_mfa_code_from_stdin() -> str:
    """標準入力からMFAコードを読み込みます。"""
    inputted_mfa_code = ""
    while inputted_mfa_code == "":
        inputted_mfa_code = input("Enter Annofab MFA Code: ")
    return inputted_mfa_code


def _mask_sensitive_value_for_dict(data: dict[str, Any], keys: Collection[str]) -> dict[str, Any]:
    """
    dictに含まれているセンシティブな情報を"***"でマスクします。

    Args:
        data: マスク対象のdict
        keys: マスク対象の複数のkey

    Returns:
        センシティブな情報がマスクされたdict。
        1つ以上の値をマスクした場合は、複製されたdictが返ります。
        1つもマスクしていない場合は、引数`data`そのものが返ります。

    """
    MASKED_VALUE = "***"
    diff_keys = set(keys) - set(data.keys())
    if len(diff_keys) == len(keys):
        # マスク対象のキーがない
        return data

    copied_data = copy.deepcopy(data)
    for key in keys:
        if key in copied_data:
            copied_data[key] = MASKED_VALUE

    return copied_data


def _raise_for_status(response: requests.Response) -> None:
    """
    HTTP Status CodeがErrorの場合、``requests.exceptions.HTTPError`` を発生させる。
    そのとき ``response.text`` もHTTPErrorに加えて、HTTPError発生時にエラーの原因が分かるようにする。


    Args:
        response: Response

    Raises:
        requests.exceptions.HTTPError:

    """
    try:
        response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        http_error_msg = f"{e.args[0]} , {response.text}"
        e.args = (http_error_msg,)
        raise e


def _log_error_response(arg_logger: logging.Logger, response: requests.Response) -> None:
    """
    HTTP Statusが400以上ならば、loggerにresponse/request情報を出力する


    Args:
        arg_logger: logger
        response: Response

    """

    def mask_str_request_body(str_request_body: str) -> Any:  # noqa: ANN401
        """
        文字列型であるrequest_bodyがJSON形式だとみなして、センシティブな情報をマスクします。
        """
        try:
            # JSON文字列だとみなして、Pythonオブジェクトへの変換を試みる
            json_request_body = json.loads(str_request_body)
        except JSONDecodeError:
            return str_request_body

        return _create_request_body_for_logger(json_request_body)

    if 400 <= response.status_code < 600:
        # logにAuthorizationを出力しないようにマスクする
        headers_for_logger = _mask_sensitive_value_for_dict(dict(response.request.headers), {"Authorization"})

        # request_bodyのpassword関係をマスクして、logに出力する
        request_body_for_logger: Any | None = None
        if isinstance(response.request.body, bytes):
            try:
                # 文字列への変換を試みる
                str_request_body = response.request.body.decode()
                request_body_for_logger = mask_str_request_body(str_request_body)
            except UnicodeError:
                request_body_for_logger = response.request.body

        elif isinstance(response.request.body, str):
            request_body_for_logger = mask_str_request_body(response.request.body)

        arg_logger.error(
            "HTTP error occurred :: %s",
            {
                "response": {
                    "status_code": response.status_code,
                    "text": response.text,
                },
                "request": {
                    "http_method": response.request.method,
                    "url": response.request.url,
                    "body": request_body_for_logger,
                    "headers": headers_for_logger,
                },
            },
        )


T = TypeVar("T")


@overload
def _create_request_body_for_logger(data: bytes) -> str:
    pass


@overload
def _create_request_body_for_logger(data: T) -> T: ...


def _create_request_body_for_logger(data: Any) -> Any:
    """
    ログに出力するためのrequest_bodyを生成する。
     * パスワードやトークンなどの機密情報をマスクする
     * bytes型の場合は `(bytes)`と記載する。

    Args:
        data: request_body

    Returns:
        ログ出力用のrequest_body
    """
    if isinstance(data, bytes):
        # bytes型のときは値を出力しても意味がないので、bytesであることが分かるようにする
        return "(bytes)"

    # dictの場合は機密情報をマスクする
    sensitive_keys = {"password", "old_password", "new_password", "id_token", "refresh_token", "access_token", "session", "mfa_code"}
    if isinstance(data, dict):
        return _mask_sensitive_value_for_dict(data, keys=sensitive_keys)

    return data


def _create_query_params_for_logger(params: dict[str, Any]) -> dict[str, Any]:
    """
    ログに出力するためのquery_paramsを生成する。
     * AWS関係のcredential情報をマスクする。

    Args:
        params: query_params

    Returns:
        ログ出力用のparams
    """
    return _mask_sensitive_value_for_dict(params, keys={"X-Amz-Security-Token", "X-Amz-Credential"})


def _should_retry_with_status(status_code: int) -> bool:
    """
    HTTP Status Codeからリトライすべきかどうかを返す。

    Notes:
        429(Too many requests)の場合も、`@my_backoff`ではリトライしません。
        レスポンスヘッダーの`Retry-After`を参照するため、`@my_backoff`ではなく、直接リトライするコードを書いています。

    Returns:
        trueならばリトライする
    """

    # 501の場合は、未実装のためリトライしない
    if status_code == requests.codes.not_implemented:
        return False
    if 500 <= status_code < 600:  # noqa: SIM103
        return True
    return False


def my_backoff(function) -> Callable:  # noqa: ANN001
    """
    リトライした方が良い場合は、バックオフする
    """

    @wraps(function)
    def wrapped(*args, **kwargs):  # noqa: ANN202
        def should_give_up(e: Exception) -> bool:
            """
            ギブアップ(リトライしない)かどうか

            Returns:
                True: give up(リトライしない), False: リトライする

            """
            if isinstance(e, requests.exceptions.HTTPError):
                if e.response is None:
                    return True
                return not _should_retry_with_status(e.response.status_code)

            else:
                # リトライする
                return False

        return backoff.on_exception(
            backoff.expo,
            (requests.exceptions.HTTPError, requests.exceptions.ConnectionError, ConnectionError),
            jitter=backoff.full_jitter,
            max_time=300,
            giveup=should_give_up,
            # loggerの名前をbackoffからannofabapiに変更する
            logger=logger,
        )(function)(*args, **kwargs)

    return wrapped


[docs] class AnnofabApi(AbstractAnnofabApi): """ Web APIに対応したメソッドが存在するクラス。 Args: credentials: Annofabにログインするときの認証情報 endpoint_url: Annofab APIのエンドポイント。 input_mfa_code_via_stdin: MFAコードを標準入力から入力するかどうか Falseを渡して且つMFAコードの入力を求められるアカウントを利用する場合、mfa_codeを引数にloginメソッドを直接呼び出さなければならず、そうしない場合は例外を送出する Attributes: tokens: login, refresh_tokenで取得したtoken情報 cookies: Signed Cookie情報 """ def __init__(self, credentials: IdPass | Pat, *, endpoint_url: str = DEFAULT_ENDPOINT_URL, input_mfa_code_via_stdin: bool = False) -> None: if isinstance(credentials, IdPass) and (not credentials.user_id or not credentials.password): raise ValueError("login_user_id or login_password is empty.") if isinstance(credentials, Pat) and not credentials.token: raise ValueError("pat is empty.") self.credentials = credentials self.endpoint_url = endpoint_url self.input_mfa_code_via_stdin = input_mfa_code_via_stdin self.url_prefix = f"{endpoint_url}/api/v1" self.session = requests.Session() self.tokens: Tokens | Pat | None = None self.cookies: RequestsCookieJar | None = None self.__account_id: str | None = None self.__user_id: str | None = None class _MyToken(AuthBase): """ requestsモジュールのauthに渡す情報。 http://docs.python-requests.org/en/master/user/advanced/#custom-authentication """ def __init__(self, id_token: str) -> None: self.id_token = id_token def __call__(self, req): # noqa: ANN001, ANN204 req.headers["Authorization"] = self.id_token return req ######################################### # Private Method ######################################### @staticmethod def _encode_query_params(query_params: dict[str, Any]) -> dict[str, Any]: """query_paramsのvalueがlist or dictのときは、JSON形式の文字列に変換する。 `getAnnotationList` webapiで指定できる `query`などのように、2階層のquery_paramsに対応させる。 Args: query_params (dict[str,Any]): [description] Returns: dict[str, str]: [description] """ new_params = {} if query_params is not None: for key, value in query_params.items(): if isinstance(value, (list, dict)): new_params[key] = json.dumps(value) else: new_params[key] = value return new_params def _create_kwargs( self, params: dict[str, Any] | None = None, headers: dict[str, Any] | None = None, request_body: Any | None = None, # noqa: ANN401 ) -> dict[str, Any]: """ requestsモジュールのget,...メソッドに渡すkwargsを生成する。 Returns: kwargs情報 """ # query_param # query_paramsのvalueがlist or dictのときは、JSON形式の文字列に変換する。 # `getAnnotationList` webapiで指定できる `query`などのように、2階層のquery_paramsに対応させる。 new_params = {} if params is not None: for key, value in params.items(): if isinstance(value, (list, dict)): new_params[key] = json.dumps(value) else: new_params[key] = value kwargs: dict[str, Any] = { "params": new_params, "headers": headers, } if self.tokens is not None: token = self.tokens.auth_token kwargs.update({"auth": self._MyToken(token)}) if request_body is not None: if isinstance(request_body, (dict, list)): kwargs.update({"json": request_body}) elif isinstance(request_body, str): kwargs.update({"data": request_body.encode("utf-8")}) else: kwargs.update({"data": request_body}) return kwargs @staticmethod def _response_to_content(response: requests.Response) -> Any: # noqa: ANN401 """ Responseのcontentを、Content-Typeに対応した型に変換する。 Args: response: Returns: JSONの場合はDict, textの場合はstringのcontent """ content_type = response.headers["Content-Type"] # `Content-Type: application/json;charset=utf-8`などcharsetが含まれている場合にも対応できるようにする。 tokens = content_type.split(";") media_type = tokens[0].strip() if media_type == "application/json": content = response.json() if len(response.content) != 0 else {} elif media_type.find("text/") >= 0: content = response.text else: content = response.content return content @my_backoff def _execute_http_request( self, http_method: str, url: str, *, params: dict[str, Any] | None = None, data: Any | None = None, # noqa: ANN401 json: Any | None = None, # pylint: disable=redefined-outer-name # noqa: ANN401 headers: dict[str, Any] | None = None, stream: bool = False, raise_for_status: bool = True, **kwargs, ) -> requests.Response: """ Session情報を使って、HTTP Requestを投げます。Annofab WebAPIで取得したAWS S3のURLなどに、アクセスすることを想定しています。 引数は ``requests.Session.request`` にそのまま渡します。 Args: raise_for_status: Trueの場合HTTP Status Codeが4XX,5XXのときはHTTPErrorをスローします Returns: requests.Response: [description] Raises: requests.exceptions.HTTPError: http status codeが4XXX,5XXXのとき """ return self._execute_http_request_without_backoff( http_method=http_method, url=url, params=params, data=data, json=json, headers=headers, stream=stream, raise_for_status=raise_for_status, **kwargs, ) def _execute_http_request_without_backoff( self, http_method: str, url: str, *, params: dict[str, Any] | None = None, data: Any | None = None, # noqa: ANN401 json: Any | None = None, # pylint: disable=redefined-outer-name # noqa: ANN401 headers: dict[str, Any] | None = None, stream: bool = False, raise_for_status: bool = True, **kwargs, ) -> requests.Response: """ Session情報を使って、HTTP Requestを投げます。Annofab WebAPIで取得したAWS S3のURLなどに、アクセスすることを想定しています。 ただし、backoffは行いません。 """ response = self.session.request(method=http_method, url=url, params=params, data=data, headers=headers, json=json, stream=stream, **kwargs) # response.requestよりメソッド引数のrequest情報の方が分かりやすいので、メソッド引数のrequest情報を出力する。 logger.debug( "Sent a request :: %s", { "requests": { "http_method": http_method, "url": url, "query_params": _create_query_params_for_logger(params) if params is not None else None, "request_body_json": _create_request_body_for_logger(json) if json is not None else None, "request_body_data": _create_request_body_for_logger(data) if data is not None else None, "header_params": headers, }, "response": {"status_code": response.status_code, "headers": {"Content-Length": response.headers.get("Content-Length")}}, }, ) # リクエスト過多の場合、待ってから再度アクセスする if response.status_code == requests.codes.too_many_requests: retry_after_value = response.headers.get("Retry-After") waiting_time_seconds = float(retry_after_value) if retry_after_value is not None else DEFAULT_WAITING_TIME_SECONDS_WITH_429_STATUS_CODE logger.info( "HTTPステータスコードが'%s'なので、%s秒待ってからリトライします。 :: %s", response.status_code, waiting_time_seconds, { "response": { "status_code": response.status_code, "text": response.text, "headers": {"Retry-After": retry_after_value}, }, "request": { "http_method": http_method, "url": url, "query_params": _create_query_params_for_logger(params) if params is not None else None, }, }, ) time.sleep(float(waiting_time_seconds)) return self._execute_http_request( http_method=http_method, url=url, params=params, data=data, json=json, stream=stream, headers=headers, raise_for_status=raise_for_status, **kwargs, ) # リトライが必要な場合は、backoffがリトライできるようにするため、Exceptionをスローする if raise_for_status or _should_retry_with_status(response.status_code): _log_error_response(logger, response) _raise_for_status(response) return response @my_backoff def _request_wrapper( self, http_method: str, url_path: str, *, query_params: dict[str, Any] | None = None, header_params: dict[str, Any] | None = None, request_body: Any | None = None, # noqa: ANN401 raise_for_status: bool = True, ) -> tuple[Any, requests.Response]: """ Annofab WebAPIにアクセスして、レスポンスの中身とレスポンスを取得します。 Args: http_method: url_path: Annofab WebAPIのパス(例:``/my/account``) query_params: クエリパラメタ header_params: リクエストヘッダ request_body: リクエストボディ raise_for_status: Trueの場合HTTP Status Codeが4XX,5XXのときはHTTPErrorをスローします。Falseの場合はtuple[None, Response]を返します。 Returns: tuple[content, Response]. contentはcontent_typeにより型が変わる。 application/jsonならDict型, text/*ならばstr型, それ以外ならばbite型。 Raises: HTTPError: 引数 ``raise_for_status`` がTrueで、HTTP status codeが4xxx,5xxのときにスローします。 """ # TODO 判定条件が不明 if url_path.startswith("/internal/"): url = f"{self.endpoint_url}/api{url_path}" else: url = f"{self.url_prefix}{url_path}" # patを使う場合は最初にtokensをセットする # def logoutの呼び出しでtokensがNoneになった後にAPIを呼び出しても問題ないように(IdPassの場合も、自動loginしているので、その代わり) # IdPassと同じ処理に合流させてしまうと、patが無効なときに無限ループしてしまうので、ここで1回だけ呼び出す if self.tokens is None and isinstance(self.credentials, Pat): self._login_pat(self.credentials) kwargs = self._create_kwargs(query_params, header_params, request_body) response = self.session.request(method=http_method.lower(), url=url, **kwargs) # response.requestよりメソッド引数のrequest情報の方が分かりやすいので、メソッド引数のrequest情報を出力する。 logger.debug( "Sent a request :: %s", { "request": { "http_method": http_method.lower(), "url": url, "query_params": query_params, "header_params": header_params, "request_body": _create_request_body_for_logger(request_body) if request_body is not None else None, }, "response": {"status_code": response.status_code, "headers": {"Content-Length": response.headers.get("Content-Length")}}, }, ) # ID/PASSが指定されており、Unauthorized Errorならば、ログイン後に再度実行する if isinstance(self.credentials, IdPass) and response.status_code == requests.codes.unauthorized: self.refresh_token() return self._request_wrapper( http_method, url_path, query_params=query_params, header_params=header_params, request_body=request_body, raise_for_status=raise_for_status, ) elif response.status_code == requests.codes.too_many_requests: retry_after_value = response.headers.get("Retry-After") waiting_time_seconds = float(retry_after_value) if retry_after_value is not None else DEFAULT_WAITING_TIME_SECONDS_WITH_429_STATUS_CODE logger.info( "HTTPステータスコードが'%s'なので、%s秒待ってからリトライします。 :: %s", response.status_code, waiting_time_seconds, { "response": { "status_code": response.status_code, "text": response.text, "headers": {"Retry-After": retry_after_value}, }, "request": { "http_method": http_method.lower(), "url": url, "query_params": query_params, }, }, ) time.sleep(waiting_time_seconds) return self._request_wrapper( http_method, url_path, query_params=query_params, header_params=header_params, request_body=request_body, raise_for_status=raise_for_status, ) response.encoding = "utf-8" content = self._response_to_content(response) # リトライすべき場合はExceptionを返す if raise_for_status or _should_retry_with_status(response.status_code): _log_error_response(logger, response) _raise_for_status(response) return content, response def _get_signed_cookie(self, project_id, query_params: dict[str, Any] | None = None) -> tuple[dict[str, Any], requests.Response]: # noqa: ANN001 """ アノテーション仕様の履歴情報を取得するために、非公開APIにアクセスする。 変更される可能性あり. Args: project_id: プロジェクトID Returns: tuple[Content, Response) """ url_path = f"/internal/projects/{project_id}/sign-headers" http_method = "GET" keyword_params: dict[str, Any] = {"query_params": query_params} return self._request_wrapper(http_method, url_path, **keyword_params) def _request_get_with_cookie(self, project_id: str, url: str) -> requests.Response: """ Signed Cookie を使って、AnnofabのURLにGET requestを投げる。 Args: project_id: プロジェクトID url: アクセス対象のURL Returns: Response """ # Sessionオブジェクトに保存されているCookieを利用して、URLにアクセスする response = self._execute_http_request("get", url, raise_for_status=False) # CloudFrontから403 Errorが発生したときは、別プロジェクトのcookieを渡している可能性があるので、 # Signed Cookieを発行して、再度リクエストを投げる if response.status_code == requests.codes.forbidden and response.headers.get("server") == "CloudFront": query_params = {} if "/input_data_set/" in url: query_params.update({"resource": "input_data_set"}) else: query_params.update({"resource": "project"}) _, r = self._get_signed_cookie(project_id, query_params=query_params) for cookie in r.cookies: self.session.cookies.set_cookie(cookie) response = self._execute_http_request("get", url) return response ######################################### # Public Method : Login ######################################### def _login_respond_to_auth_challenge(self, id_pass: IdPass, mfa_code: str, session: str) -> dict[str, Any]: """ MFAコードによるログインを実行します。 ``self.input_mfa_code_via_stdin`` が ``True`` AND ``mfa_code`` が正しくない場合は、標準入力から再度MFAコードの入力を求めます。 Args: mfa_code: MFAコード session: `login` APIのレスポンスに格納されている`session` Raises: InvalidMfaCodeError: ``self.input_mfa_code_via_stdin`` が ``False`` AND ``mfa_code`` が正しくない場合 """ request_body = {"user_id": id_pass.user_id, "mfa_code": mfa_code, "session": session} url = f"{self.url_prefix}/login-respond-to-auth-challenge" response = self._execute_http_request("post", url, json=request_body, raise_for_status=False) json_obj = response.json() # MFAコードが間違っているかどうかの判定が、メッセージでしかできなかったので、暫定的にメッセージで判定する if response.status_code == requests.codes.bad_request: assert len(json_obj["errors"]) > 0 error_message = json_obj["errors"][0]["message"] if error_message in {"検証コードが間違っています", "検証コードの期限が切れています"}: # 分かりやすいメッセージにするため「検証コード」を「MFAコード」に置き換える new_error_message = error_message.replace("検証コード", "MFAコード") if self.input_mfa_code_via_stdin: logger.info(new_error_message) new_mfa_code = _read_mfa_code_from_stdin() return self._login_respond_to_auth_challenge(id_pass, new_mfa_code, session) else: raise InvalidMfaCodeError(new_error_message) _log_error_response(logger, response) _raise_for_status(response) return response.json()
[docs] def login(self, mfa_code: str | None = None) -> None: """ ログインして、トークンをインスタンスに保持します。 MFAが有効化されている場合は、loginRespondToAuthChallenge APIを実行してトークンを取得します。 ``self.input_mfa_code_via_stdin == True`` の場合は、標準入力からMFAコードの入力を求めます。 Args: mfa_code: ``loginRespondToAuthChallenge``のレスポンスから取得したMFAコード。この引数はexperimentalです。将来削除される可能性があります。 Returns: tuple[Token, requests.Response] Raises: InvalidMfaCodeError: ``self.input_mfa_code_via_stdin`` が ``False`` AND ``mfa_code`` が正しくない場合 MfaEnabledUserExecutionError: ``self.input_mfa_code_via_stdin`` が ``False`` AND ``mfa_code`` が未指定の場合 """ if isinstance(self.credentials, IdPass): self._login_id_pass(self.credentials, mfa_code) elif isinstance(self.credentials, Pat): self._login_pat(self.credentials) else: assert_noreturn(self.credentials)
def _login_id_pass(self, id_pass: IdPass, mfa_code: str | None = None) -> None: login_info = {"user_id": id_pass.user_id, "password": id_pass.password} url = f"{self.url_prefix}/login" login_response = self._execute_http_request("post", url, json=login_info) login_json_obj = login_response.json() if "token" not in login_json_obj: # `login` APIのレスポンスのスキーマがLoginNeedChallengeResponseのとき if mfa_code is None: if self.input_mfa_code_via_stdin: mfa_code = _read_mfa_code_from_stdin() else: raise MfaEnabledUserExecutionError(id_pass.user_id) mfa_json_obj = self._login_respond_to_auth_challenge(id_pass, mfa_code, login_json_obj["session"]) token_dict = mfa_json_obj["token"] else: # `login` APIのレスポンスのスキーマがloginRespondToAuthChallengeのとき token_dict = login_json_obj["token"] self.tokens = Tokens.from_dict(token_dict) logger.debug("Logged in successfully. user_id = %s", id_pass.user_id) def _login_pat(self, pat: Pat) -> None: self.tokens = pat
[docs] def logout(self) -> None: """ ログアウトします。 ログアウト後は、インスタンス変数 ``tokens`` をNoneにします。 Returns: tuple[Token, requests.Response] Raises: NotLoggedInError: ログインしてない状態で関数を呼び出したときのエラー """ if self.tokens is None: raise NotLoggedInError if isinstance(self.tokens, Pat): self.tokens = None return request_body = self.tokens.to_dict() url = f"{self.url_prefix}/logout" self._execute_http_request("POST", url, json=request_body) self.tokens = None
[docs] def refresh_token(self) -> None: """ トークンを再発行して、新しいトークン情報をインスタンスに保持します。 パーソナルアクセストークンでのアクセスをしている場合はrefreshを行いません。 ログインしていない場合やリフレッシュトークンの有効期限が切れている場合は、login APIを実行して新しいトークン情報をインスタンスに保持します。 """ if self.tokens is None: # 一度もログインしていないときは、login APIを実行して、トークン情報をインスタンスに保持する(login関数内でインスタンスに保持している) self.login() return if isinstance(self.tokens, Pat): return request_body = {"refresh_token": self.tokens.refresh_token} url = f"{self.url_prefix}/refresh-token" response = self._execute_http_request("POST", url, json=request_body) # Unauthorized Errorならば、login APIを実行して、取得したトークン情報をインスタンスに保持する if response.status_code == requests.codes.unauthorized: self.login() return self.tokens = Tokens.from_dict(response.json())
######################################### # Public Method : Other ######################################### @property def account_id(self) -> str: """ Annofabにログインするユーザのaccount_id """ if self.__account_id is not None: return self.__account_id else: content, _ = self.get_my_account() account_id = content["account_id"] self.__account_id = account_id return account_id @property def login_user_id(self) -> str: """ Annofabにログインするユーザのuser_id """ if self.__user_id is not None: return self.__user_id if isinstance(self.credentials, IdPass): self.__user_id = self.credentials.user_id return self.__user_id else: content, _ = self.get_my_account() user_id = content["user_id"] self.__user_id = user_id return user_id