Source code for annofabapi.api2

import logging
import time
from typing import Any, Dict, Optional, Tuple

import requests
from requests.cookies import RequestsCookieJar

import annofabapi.utils
from annofabapi.api import (
    DEFAULT_WAITING_TIME_SECONDS_WITH_429_STATUS_CODE,
    AnnofabApi,
    _create_request_body_for_logger,
    _log_error_response,
    _raise_for_status,
    _should_retry_with_status,
)
from annofabapi.generated_api2 import AbstractAnnofabApi2

logger = logging.getLogger(__name__)


class AnnofabApi2(AbstractAnnofabApi2):
    """
    Class providing the methods that correspond to Web API v2.

    Note:
        This API is still under development, so backward-incompatible changes may be introduced.

    Args:
        api: API v1 instance (some APIs rely on the v1 login method).
    """

    def __init__(self, api: AnnofabApi) -> None:
        self.api = api
        self.url_prefix = f"{api.endpoint_url}/api/v2"

    #: Signed cookie information. Stored by ``get_signed_access_v2`` and sent with every non-"/sign-url" request.
    cookies: Optional[RequestsCookieJar] = None

    #########################################
    # Private Method
    #########################################
    @staticmethod
    def _parse_retry_after_seconds(retry_after_value: Optional[str]) -> float:
        """
        Convert a ``Retry-After`` header value into a number of seconds to wait.

        The header may carry either delay-seconds or an HTTP-date. A plain ``float()`` call fails on
        the HTTP-date form, so both forms are handled here.

        Args:
            retry_after_value: Raw ``Retry-After`` header value, or None when the header is absent.

        Returns:
            A finite, non-negative number of seconds. ``DEFAULT_WAITING_TIME_SECONDS_WITH_429_STATUS_CODE``
            is returned when the header is absent, unparseable, or not finite.
        """
        # Imported locally because they are only needed on the (rare) 429 path.
        import math
        from datetime import datetime, timezone
        from email.utils import parsedate_to_datetime

        if retry_after_value is None:
            return DEFAULT_WAITING_TIME_SECONDS_WITH_429_STATUS_CODE

        try:
            seconds = float(retry_after_value)
        except ValueError:
            # Not delay-seconds; try the HTTP-date form.
            try:
                retry_at = parsedate_to_datetime(retry_after_value)
            except (TypeError, ValueError):
                # TypeError is raised by older Python versions, ValueError by newer ones.
                return DEFAULT_WAITING_TIME_SECONDS_WITH_429_STATUS_CODE
            if retry_at.tzinfo is None:
                # A naive result means the date carried no usable zone; treat it as UTC.
                retry_at = retry_at.replace(tzinfo=timezone.utc)
            seconds = (retry_at - datetime.now(timezone.utc)).total_seconds()

        if not math.isfinite(seconds):
            # "nan" / "inf" parse as floats, but time.sleep() cannot handle them.
            return DEFAULT_WAITING_TIME_SECONDS_WITH_429_STATUS_CODE

        # time.sleep() rejects negative values; a date in the past means "retry now".
        return max(seconds, 0.0)

    @annofabapi.api.my_backoff
    def _request_wrapper(
        self,
        http_method: str,
        url_path: str,
        *,
        query_params: Optional[Dict[str, Any]] = None,
        header_params: Optional[Dict[str, Any]] = None,
        request_body: Optional[Any] = None,  # noqa: ANN401
        raise_for_status: bool = True,
    ) -> Tuple[Any, requests.Response]:
        """
        Send an HTTP request and return the response.

        Requests to "/sign-url" are retried once the token has been refreshed when the server answers
        401. Every other request is sent with the stored signed cookies; it is retried after fetching
        new signed cookies on a 403 from CloudFront, and after waiting on a 429.

        Args:
            http_method: HTTP method name (lower-cased before use).
            url_path: Path appended to ``self.url_prefix``.
            query_params: Query parameters.
            header_params: Request headers.
            request_body: Request body. In this method it is only used for the debug log and
                forwarded to the retry call.
            raise_for_status: If True, ``_log_error_response`` and ``_raise_for_status`` are always
                applied to the response (an HTTPError is raised for 4XX/5XX status codes).
                If False, they are applied only when ``_should_retry_with_status`` says the status
                code should be retried; otherwise the tuple is returned without raising.

        Returns:
            Tuple[content, Response]. The type of ``content`` depends on the content type:
            dict for application/json, str for text/*, and bytes otherwise.
        """
        url = f"{self.url_prefix}{url_path}"
        # NOTE(review): request_body is not passed to _create_kwargs, so it is never sent — confirm this is intended.
        kwargs = self.api._create_kwargs(query_params, header_params)

        def _retry() -> Tuple[Any, requests.Response]:
            # Re-issue the same request with exactly the same arguments.
            # NOTE(review): no retry bound is visible here; presumably the caller/decorator limits it — verify.
            return self._request_wrapper(
                http_method,
                url_path,
                query_params=query_params,
                header_params=header_params,
                request_body=request_body,
                raise_for_status=raise_for_status,
            )

        if url_path == "/sign-url":
            # Send the HTTP request
            response = getattr(self.api.session, http_method.lower())(url, **kwargs)

            # On an Unauthorized error, refresh the token and run the request again
            if response.status_code == requests.codes.unauthorized:
                self.api.refresh_token()
                return _retry()

        else:
            kwargs["cookies"] = self.cookies

            # Send the HTTP request
            response = self.api.session.request(method=http_method.lower(), url=url, **kwargs)

            logger.debug(
                "Sent a request :: %s",
                {
                    "request": {
                        "http_method": http_method.lower(),
                        "url": url,
                        "query_params": query_params,
                        "header_params": header_params,
                        "request_body": _create_request_body_for_logger(request_body)
                        if request_body is not None
                        else None,
                    },
                    "response": {
                        "status_code": response.status_code,
                        "content_length": len(response.content),
                    },
                },
            )

            # A 403 error came from CloudFront: fetch new signed cookies, then retry
            if response.status_code == requests.codes.forbidden and response.headers.get("server") == "CloudFront":
                self._get_signed_access_v2(url_path)
                return _retry()

            if response.status_code == requests.codes.too_many_requests:
                retry_after_value = response.headers.get("Retry-After")
                waiting_time_seconds = self._parse_retry_after_seconds(retry_after_value)

                logger.warning(
                    "HTTPステータスコードが'%s'なので、%s秒待ってからリトライします。 :: %s",
                    response.status_code,
                    waiting_time_seconds,
                    {
                        "response": {
                            "status_code": response.status_code,
                            "text": response.text,
                            "headers": {"Retry-After": retry_after_value},
                        },
                        "request": {
                            "http_method": http_method.lower(),
                            "url": url,
                            "query_params": query_params,
                        },
                    },
                )
                time.sleep(waiting_time_seconds)
                return _retry()

        response.encoding = "utf-8"
        content = self.api._response_to_content(response)

        # Raise when the caller asked for it, or when the status code is one that should be retried
        if raise_for_status or _should_retry_with_status(response.status_code):
            _log_error_response(logger, response)
            _raise_for_status(response)

        return content, response

    def _get_signed_access_v2(self, url_path: str) -> None:
        """
        Fetch the signed cookies for ``url_path`` and store them on this instance.

        Args:
            url_path: API v2 path (without the "/api/v2" prefix) that the cookies are requested for.
        """
        query_params = {"url": f"/api/v2{url_path}"}
        self.get_signed_access_v2(query_params)

    #########################################
    # Public Method : Cache
    #########################################
    def get_signed_access_v2(self, query_params: Dict[str, Any]) -> Tuple[Dict[str, Any], requests.Response]:
        """
        Fetch the signed cookies and keep them on this instance.

        Args:
            query_params (Dict[str, Any]): Query Parameters
                url (str): URL of the page to access

        Returns:
            Tuple[SignedCookie, requests.Response]
        """
        url_path = "/sign-url"
        http_method = "GET"
        content, response = self._request_wrapper(http_method, url_path, query_params=query_params)
        # Keep the signed cookies on the instance
        self.cookies = response.cookies
        return content, response