# Copyright (c) Microsoft Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import base64
import inspect
import json
import json as json_utils
import mimetypes
import re
from collections import defaultdict
from pathlib import Path
from types import SimpleNamespace
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Coroutine,
    Dict,
    List,
    Optional,
    TypedDict,
    Union,
    cast,
)
from urllib import parse

from playwright._impl._api_structures import (
    ClientCertificate,
    Headers,
    HeadersArray,
    RemoteAddr,
    RequestSizes,
    ResourceTiming,
    SecurityDetails,
)
from playwright._impl._connection import (
    ChannelOwner,
    from_channel,
    from_nullable_channel,
)
from playwright._impl._errors import Error
from playwright._impl._event_context_manager import EventContextManagerImpl
from playwright._impl._helper import (
    URLMatch,
    WebSocketRouteHandlerCallback,
    async_readfile,
    locals_to_params,
    url_matches,
)
from playwright._impl._str_utils import escape_regex_flags
from playwright._impl._waiter import Waiter

if TYPE_CHECKING:  # pragma: no cover
    from playwright._impl._browser_context import BrowserContext
    from playwright._impl._fetch import APIResponse
    from playwright._impl._frame import Frame
    from playwright._impl._page import Page


class FallbackOverrideParameters(TypedDict, total=False):
    """Request overrides accepted by ``Request._apply_fallback_overrides``.

    ``total=False`` makes every key optional. ``Route.fallback`` and
    ``Route.continue_`` build this mapping from their keyword arguments.
    """

    # Replacement request URL.
    url: Optional[str]
    # Replacement HTTP method.
    method: Optional[str]
    # Replacement request headers (name -> value).
    headers: Optional[Dict[str, str]]
    # Replacement request body; str and bytes are handled explicitly by
    # Request._apply_fallback_overrides, other non-None values are JSON-encoded.
    postData: Optional[Union[str, bytes]]


class SerializedFallbackOverrides:
    """Mutable holder for the overrides accumulated on a ``Request``.

    All fields start as ``None`` (no override). The request body is kept as
    bytes regardless of the form in which it was supplied.
    """

    def __init__(self) -> None:
        # Overridden URL, or None to use the original one.
        self.url: Optional[str] = None
        # Overridden HTTP method, or None to use the original one.
        self.method: Optional[str] = None
        # Overridden headers, or None to use the original ones.
        self.headers: Optional[Dict[str, str]] = None
        # Overridden request body as raw bytes, or None when not overridden.
        self.post_data_buffer: Optional[bytes] = None


def serialize_headers(headers: Dict[str, str]) -> HeadersArray:
    """Convert a header mapping into a list of ``{"name", "value"}`` records.

    Entries whose value is ``None`` are dropped; all other entries (including
    empty strings) are kept in the mapping's iteration order.
    """
    serialized: HeadersArray = []
    for header_name, header_value in headers.items():
        if header_value is None:
            continue
        serialized.append({"name": header_name, "value": header_value})
    return serialized


async def to_client_certificates_protocol(
    clientCertificates: Optional[List[ClientCertificate]],
) -> Optional[List[Dict[str, str]]]:
    """Translate client certificate options into base64-carrying records.

    Returns ``None`` when no certificates are given. For each certificate the
    ``origin`` is copied, a truthy ``passphrase`` is copied, and ``pfx``,
    ``cert`` and ``key`` are base64-encoded either from the inline bytes or
    from the file named by the matching ``*Path`` option. When both are
    supplied the file content wins because it is written last.
    """
    if not clientCertificates:
        return None

    def _b64(raw: bytes) -> str:
        return base64.b64encode(raw).decode()

    # (output field / inline-bytes option, file-path option), in output order.
    material_sources = (
        ("pfx", "pfxPath"),
        ("cert", "certPath"),
        ("key", "keyPath"),
    )

    records: List[Dict[str, str]] = []
    for certificate in clientCertificates:
        options = cast(Dict[str, Any], certificate)
        record: Dict[str, str] = {"origin": certificate["origin"]}

        passphrase = options.get("passphrase")
        if passphrase:
            record["passphrase"] = passphrase

        for field, path_option in material_sources:
            inline_bytes = options.get(field)
            if inline_bytes:
                record[field] = _b64(inline_bytes)
            file_location = options.get(path_option)
            if file_location:
                record[field] = _b64(await async_readfile(file_location))

        records.append(record)
    return records


class Request(ChannelOwner):
    """Client-side view of a network request.

    Most values are read from the channel initializer. Overrides recorded via
    ``_apply_fallback_overrides`` (URL, method, headers, body) take precedence
    over the initializer values where they are set.
    """

    def __init__(
        self, parent: ChannelOwner, type: str, guid: str, initializer: Dict
    ) -> None:
        super().__init__(parent, type, guid, initializer)
        self._channel.mark_as_internal_type()
        self._redirected_from: Optional["Request"] = from_nullable_channel(
            initializer.get("redirectedFrom")
        )
        self._redirected_to: Optional["Request"] = None
        # Link the redirect chain in both directions.
        if self._redirected_from:
            self._redirected_from._redirected_to = self
        self._failure_text: Optional[str] = None
        # Placeholder timing; Response.__init__ and _set_response_end_timing
        # fill in the real values later.
        self._timing: ResourceTiming = {
            "startTime": 0,
            "domainLookupStart": -1,
            "domainLookupEnd": -1,
            "connectStart": -1,
            "secureConnectionStart": -1,
            "connectEnd": -1,
            "requestStart": -1,
            "responseStart": -1,
            "responseEnd": -1,
        }
        self._provisional_headers = RawHeaders(self._initializer["headers"])
        # Lazily created cache for the "rawRequestHeaders" round trip.
        self._all_headers_future: Optional[asyncio.Future[RawHeaders]] = None
        self._fallback_overrides: SerializedFallbackOverrides = (
            SerializedFallbackOverrides()
        )

    def __repr__(self) -> str:
        return f"<Request url={self.url!r} method={self.method!r}>"

    def _apply_fallback_overrides(self, overrides: FallbackOverrideParameters) -> None:
        """Merge ``overrides`` into the accumulated overrides.

        Keys that are absent keep their previously stored value. A body given
        as ``str`` is encoded, ``bytes`` is stored as is, and any other
        non-``None`` value is JSON-serialized.
        """
        self._fallback_overrides.url = overrides.get(
            "url", self._fallback_overrides.url
        )
        self._fallback_overrides.method = overrides.get(
            "method", self._fallback_overrides.method
        )
        self._fallback_overrides.headers = overrides.get(
            "headers", self._fallback_overrides.headers
        )
        post_data = overrides.get("postData")
        if isinstance(post_data, str):
            self._fallback_overrides.post_data_buffer = post_data.encode()
        elif isinstance(post_data, bytes):
            self._fallback_overrides.post_data_buffer = post_data
        elif post_data is not None:
            self._fallback_overrides.post_data_buffer = json.dumps(post_data).encode()

    @property
    def url(self) -> str:
        """Overridden URL if set, otherwise the URL from the initializer."""
        return cast(str, self._fallback_overrides.url or self._initializer["url"])

    @property
    def resource_type(self) -> str:
        return self._initializer["resourceType"]

    @property
    def method(self) -> str:
        """Overridden method if set, otherwise the method from the initializer."""
        return cast(str, self._fallback_overrides.method or self._initializer["method"])

    async def sizes(self) -> RequestSizes:
        """Fetch size information through the response channel.

        Raises:
            Error: if the request has no response.
        """
        response = await self.response()
        if not response:
            raise Error("Unable to fetch sizes for failed request")
        return await response._channel.send("sizes")

    @property
    def post_data(self) -> Optional[str]:
        """Request body decoded as text, or ``None`` when there is no body."""
        data = self._fallback_overrides.post_data_buffer
        if data:
            return data.decode()
        base64_post_data = self._initializer.get("postData")
        if base64_post_data is not None:
            return base64.b64decode(base64_post_data).decode()
        return None

    @property
    def post_data_json(self) -> Optional[Any]:
        """Request body parsed as form data or JSON.

        Returns ``None`` for an empty/missing body. A body sent with an
        ``application/x-www-form-urlencoded`` content type is parsed into a
        dict; anything else is parsed as JSON.

        Raises:
            Error: if the body is not valid JSON.
        """
        post_data = self.post_data
        if not post_data:
            return None
        # A request may carry a body without declaring a content type; treat
        # that as "not form-encoded" instead of failing with KeyError.
        content_type = self.headers.get("content-type", "")
        if "application/x-www-form-urlencoded" in content_type:
            return dict(parse.parse_qsl(post_data))
        try:
            return json.loads(post_data)
        except Exception as error:
            raise Error(
                f"POST data is not a valid JSON object: {post_data}"
            ) from error

    @property
    def post_data_buffer(self) -> Optional[bytes]:
        """Request body as raw bytes, or ``None`` when there is no body."""
        if self._fallback_overrides.post_data_buffer:
            return self._fallback_overrides.post_data_buffer
        if self._initializer.get("postData"):
            return base64.b64decode(self._initializer["postData"])
        return None

    async def response(self) -> Optional["Response"]:
        return from_nullable_channel(await self._channel.send("response"))

    @property
    def frame(self) -> "Frame":
        """Frame that issued the request.

        Raises:
            Error: if the initializer carries no frame, or the frame has no
                page yet.
        """
        if not self._initializer.get("frame"):
            raise Error("Service Worker requests do not have an associated frame.")
        frame = cast("Frame", from_channel(self._initializer["frame"]))
        if not frame._page:
            raise Error(
                "\n".join(
                    [
                        "Frame for this navigation request is not available, because the request",
                        "was issued before the frame is created. You can check whether the request",
                        "is a navigation request by calling isNavigationRequest() method.",
                    ]
                )
            )
        return frame

    def is_navigation_request(self) -> bool:
        return self._initializer["isNavigationRequest"]

    @property
    def redirected_from(self) -> Optional["Request"]:
        return self._redirected_from

    @property
    def redirected_to(self) -> Optional["Request"]:
        return self._redirected_to

    @property
    def failure(self) -> Optional[str]:
        return self._failure_text

    @property
    def timing(self) -> ResourceTiming:
        return self._timing

    def _set_response_end_timing(self, response_end_timing: float) -> None:
        """Record ``responseEnd``; also backfill ``responseStart`` if unset (-1)."""
        self._timing["responseEnd"] = response_end_timing
        if self._timing["responseStart"] == -1:
            self._timing["responseStart"] = response_end_timing

    @property
    def headers(self) -> Headers:
        """Overridden headers if set, otherwise the provisional headers."""
        override = self._fallback_overrides.headers
        if override:
            return RawHeaders._from_headers_dict_lossy(override).headers()
        return self._provisional_headers.headers()

    async def all_headers(self) -> Headers:
        return (await self._actual_headers()).headers()

    async def headers_array(self) -> HeadersArray:
        return (await self._actual_headers()).headers_array()

    async def header_value(self, name: str) -> Optional[str]:
        return (await self._actual_headers()).get(name)

    async def _actual_headers(self) -> "RawHeaders":
        """Return overridden headers, or fetch and cache "rawRequestHeaders".

        The first caller performs the channel round trip; concurrent callers
        await the same future. If the round trip fails, the cache is cleared
        (so a later call retries) and the failure is propagated to everybody
        already waiting instead of leaving them pending forever.
        """
        override = self._fallback_overrides.headers
        if override:
            return RawHeaders(serialize_headers(override))
        cached = self._all_headers_future
        if cached is None:
            cached = asyncio.Future()
            self._all_headers_future = cached
            try:
                headers = await self._channel.send("rawRequestHeaders")
            except BaseException as error:
                self._all_headers_future = None
                if not cached.done():
                    if isinstance(error, Exception):
                        cached.set_exception(error)
                        # Mark the exception as retrieved: this caller
                        # re-raises it, so it must not also be logged as
                        # "never retrieved" when nobody else is waiting.
                        cached.exception()
                    else:
                        cached.cancel()
                raise
            cached.set_result(RawHeaders(headers))
        return await cached

    def _target_closed_future(self) -> asyncio.Future:
        """Future tied to the owning page closing or crashing.

        When the request has no frame or the frame has no page, a fresh future
        that nothing resolves is returned.
        """
        frame = cast(
            Optional["Frame"], from_nullable_channel(self._initializer.get("frame"))
        )
        if not frame:
            return asyncio.Future()
        page = frame._page
        if not page:
            return asyncio.Future()
        return page._closed_or_crashed_future

    def _safe_page(self) -> "Optional[Page]":
        """Page of the issuing frame, or ``None`` when there is no frame."""
        frame = from_nullable_channel(self._initializer.get("frame"))
        if not frame:
            return None
        return cast("Frame", frame)._page


class Route(ChannelOwner):
    """Client-side handle for an intercepted request.

    A route is "being handled" between ``_start_handling()`` and
    ``_report_handled()``; the public methods refuse to run outside of that
    window (see ``_check_not_handled``).
    """

    def __init__(
        self, parent: ChannelOwner, type: str, guid: str, initializer: Dict
    ) -> None:
        super().__init__(parent, type, guid, initializer)
        self._channel.mark_as_internal_type()
        # Set while a handler is running; resolved with True when the route
        # was handled and False when it fell through (see fallback()).
        self._handling_future: Optional[asyncio.Future["bool"]] = None
        # NOTE(review): initialised to None here; presumably assigned by the
        # owning context before fetch() is used - confirm against the caller.
        self._context: "BrowserContext" = cast("BrowserContext", None)
        self._did_throw = False

    def _start_handling(self) -> "asyncio.Future[bool]":
        self._handling_future = asyncio.Future()
        return self._handling_future

    def _report_handled(self, done: bool) -> None:
        chain = self._handling_future
        assert chain
        self._handling_future = None
        chain.set_result(done)

    def _check_not_handled(self) -> None:
        if not self._handling_future:
            raise Error("Route is already handled!")

    def __repr__(self) -> str:
        return f"<Route request={self.request}>"

    @property
    def request(self) -> Request:
        return from_channel(self._initializer["request"])

    async def abort(self, errorCode: str = None) -> None:
        """Send "abort" for this route with the optional ``errorCode``."""
        await self._handle_route(
            lambda: self._race_with_page_close(
                self._channel.send(
                    "abort",
                    {
                        "errorCode": errorCode,
                    },
                )
            )
        )

    async def fulfill(
        self,
        status: int = None,
        headers: Dict[str, str] = None,
        body: Union[str, bytes] = None,
        json: Any = None,
        path: Union[str, Path] = None,
        contentType: str = None,
        response: "APIResponse" = None,
    ) -> None:
        """Fulfill the route; see ``_inner_fulfill`` for parameter handling."""
        await self._handle_route(
            lambda: self._inner_fulfill(
                status, headers, body, json, path, contentType, response
            )
        )

    async def _inner_fulfill(
        self,
        status: int = None,
        headers: Dict[str, str] = None,
        body: Union[str, bytes] = None,
        json: Any = None,
        path: Union[str, Path] = None,
        contentType: str = None,
        response: "APIResponse" = None,
    ) -> None:
        """Build the "fulfill" payload and send it.

        Body precedence: ``json`` (serialized, exclusive with ``body``), then
        ``body``, then the body of ``response``, then the content of ``path``.
        Status and headers default to those of ``response`` when it is given.
        The content type comes from ``contentType``, else ``application/json``
        when ``json`` was supplied, else a guess based on ``path``.

        Raises:
            Error: if both ``body`` and ``json`` are supplied.
        """
        # Must stay the first statement: it snapshots the call arguments.
        params = locals_to_params(locals())

        if json is not None:
            if body is not None:
                raise Error("Can specify either body or json parameters")
            body = json_utils.dumps(json)

        if response:
            del params["response"]
            params["status"] = (
                params["status"] if params.get("status") else response.status
            )
            params["headers"] = (
                params["headers"] if params.get("headers") else response.headers
            )
            from playwright._impl._fetch import APIResponse

            if body is None and path is None and isinstance(response, APIResponse):
                if response._request._connection is self._connection:
                    # Same connection: pass the response by id instead of
                    # shipping its body.
                    params["fetchResponseUid"] = response._fetch_uid
                else:
                    body = await response.body()

        length = 0
        if isinstance(body, str):
            params["body"] = body
            params["isBase64"] = False
            length = len(body.encode())
        elif isinstance(body, bytes):
            params["body"] = base64.b64encode(body).decode()
            params["isBase64"] = True
            length = len(body)
        elif path:
            del params["path"]
            file_content = Path(path).read_bytes()
            params["body"] = base64.b64encode(file_content).decode()
            params["isBase64"] = True
            length = len(file_content)

        headers = {k.lower(): str(v) for k, v in params.get("headers", {}).items()}
        if params.get("contentType"):
            headers["content-type"] = params["contentType"]
        elif json is not None:
            # Same test as the serialization above, so falsy JSON payloads
            # ({}, [], 0, False) are labelled as JSON too.
            headers["content-type"] = "application/json"
        elif path:
            headers["content-type"] = (
                mimetypes.guess_type(str(Path(path)))[0] or "application/octet-stream"
            )
        if length and "content-length" not in headers:
            headers["content-length"] = str(length)
        params["headers"] = serialize_headers(headers)

        await self._race_with_page_close(self._channel.send("fulfill", params))

    async def _handle_route(self, callback: Callable) -> None:
        """Run ``callback`` and mark the route handled on success.

        On failure ``_did_throw`` is set and the exception is re-raised.

        Raises:
            Error: if the route is not currently being handled.
        """
        self._check_not_handled()
        try:
            await callback()
            self._report_handled(True)
        except Exception:
            self._did_throw = True
            raise

    async def fetch(
        self,
        url: str = None,
        method: str = None,
        headers: Dict[str, str] = None,
        postData: Union[Any, str, bytes] = None,
        maxRedirects: int = None,
        maxRetries: int = None,
        timeout: float = None,
    ) -> "APIResponse":
        """Perform the request through the context's request object.

        Delegates to ``self._context.request._inner_fetch`` with this route's
        request and the given overrides; it does not mark the route handled.
        """
        return await self._connection.wrap_api_call(
            lambda: self._context.request._inner_fetch(
                self.request,
                url,
                method,
                headers,
                postData,
                maxRedirects=maxRedirects,
                maxRetries=maxRetries,
                timeout=timeout,
            )
        )

    async def fallback(
        self,
        url: str = None,
        method: str = None,
        headers: Dict[str, str] = None,
        postData: Union[Any, str, bytes] = None,
    ) -> None:
        """Record overrides on the request and report the route as not handled.

        Raises:
            Error: if the route is not currently being handled.
        """
        # Must stay the first statement: it snapshots the call arguments.
        overrides = cast(FallbackOverrideParameters, locals_to_params(locals()))
        self._check_not_handled()
        self.request._apply_fallback_overrides(overrides)
        self._report_handled(False)

    async def continue_(
        self,
        url: str = None,
        method: str = None,
        headers: Dict[str, str] = None,
        postData: Union[Any, str, bytes] = None,
    ) -> None:
        """Record overrides on the request and send "continue"."""
        # Must stay the first statement: it snapshots the call arguments.
        overrides = cast(FallbackOverrideParameters, locals_to_params(locals()))

        async def _inner() -> None:
            self.request._apply_fallback_overrides(overrides)
            await self._inner_continue(False)

        return await self._handle_route(_inner)

    async def _inner_continue(self, is_fallback: bool = False) -> None:
        """Send "continue" with the overrides accumulated on the request."""
        options = self.request._fallback_overrides
        await self._race_with_page_close(
            self._channel.send(
                "continue",
                {
                    "url": options.url,
                    "method": options.method,
                    "headers": (
                        serialize_headers(options.headers) if options.headers else None
                    ),
                    "postData": (
                        base64.b64encode(options.post_data_buffer).decode()
                        if options.post_data_buffer is not None
                        else None
                    ),
                    "isFallback": is_fallback,
                },
            )
        )

    async def _redirected_navigation_request(self, url: str) -> None:
        await self._handle_route(
            lambda: self._race_with_page_close(
                self._channel.send("redirectNavigationRequest", {"url": url})
            )
        )

    async def _race_with_page_close(self, future: Coroutine) -> None:
        """Await ``future`` unless the request's target closes first.

        An exception from ``future`` is re-raised. If the target-closed future
        completes, the task is still awaited but its outcome is discarded.
        """
        fut = asyncio.create_task(future)
        # Rewrite the user's stack to the new task which runs in the background.
        setattr(
            fut,
            "__pw_stack__",
            getattr(asyncio.current_task(self._loop), "__pw_stack__", inspect.stack()),
        )
        target_closed_future = self.request._target_closed_future()
        await asyncio.wait(
            [fut, target_closed_future],
            return_when=asyncio.FIRST_COMPLETED,
        )
        if fut.done() and fut.exception():
            raise cast(BaseException, fut.exception())
        if target_closed_future.done():
            await asyncio.gather(fut, return_exceptions=True)


# Strong references to in-flight fire-and-forget tasks. The event loop only
# keeps weak references to tasks, so a task nobody else references may be
# garbage-collected before it finishes.
_BACKGROUND_TASKS: set = set()


def _create_task_and_ignore_exception(
    loop: asyncio.AbstractEventLoop, coro: Coroutine
) -> None:
    """Schedule ``coro`` on ``loop`` and discard any ``Exception`` it raises.

    The task is kept alive in ``_BACKGROUND_TASKS`` until it completes and is
    removed from the set by a done-callback.
    """

    async def _ignore_exception() -> None:
        try:
            await coro
        except Exception:
            # Deliberate best-effort: callers do not observe the outcome.
            pass

    task = loop.create_task(_ignore_exception())
    _BACKGROUND_TASKS.add(task)
    task.add_done_callback(_BACKGROUND_TASKS.discard)


class ServerWebSocketRoute:
    """Server-facing side of a ``WebSocketRoute``.

    Handlers registered here are stored on the wrapped route as its
    ``_on_server_*`` callbacks; ``send``/``close`` issue "sendToServer" and
    "closeServer" on the wrapped route's channel without waiting for them.
    """

    def __init__(self, ws: "WebSocketRoute"):
        self._ws = ws

    def on_message(self, handler: Callable[[Union[str, bytes]], Any]) -> None:
        """Register the callback for messages coming from the server."""
        route = self._ws
        route._on_server_message = handler

    def on_close(self, handler: Callable[[Optional[int], Optional[str]], Any]) -> None:
        """Register the callback for the server closing the connection."""
        route = self._ws
        route._on_server_close = handler

    def connect_to_server(self) -> None:
        """Always raises: connecting is only valid on the page-side route."""
        raise NotImplementedError(
            "connectToServer must be called on the page-side WebSocketRoute"
        )

    @property
    def url(self) -> str:
        route = self._ws
        return route._initializer["url"]

    def close(self, code: int = None, reason: str = None) -> None:
        """Ask the channel to close the server connection (fire-and-forget)."""
        route = self._ws
        close_params = {"code": code, "reason": reason, "wasClean": True}
        _create_task_and_ignore_exception(
            route._loop, route._channel.send("closeServer", close_params)
        )

    def send(self, message: Union[str, bytes]) -> None:
        """Send a message to the server; non-str payloads go out as base64."""
        if isinstance(message, str):
            payload = {"message": message, "isBase64": False}
        else:
            payload = {
                "message": base64.b64encode(message).decode(),
                "isBase64": True,
            }
        route = self._ws
        _create_task_and_ignore_exception(
            route._loop, route._channel.send("sendToServer", payload)
        )


class WebSocketRoute(ChannelOwner):
    """Page-facing side of an intercepted WebSocket.

    Channel events are dispatched to user callbacks when registered. Without a
    callback, page messages are forwarded to the server only once
    ``connect_to_server()`` was called, while server messages and both close
    events are always forwarded to the opposite side.
    """

    def __init__(
        self, parent: ChannelOwner, type: str, guid: str, initializer: Dict
    ) -> None:
        super().__init__(parent, type, guid, initializer)
        self._channel.mark_as_internal_type()
        self._on_page_message: Optional[Callable[[Union[str, bytes]], Any]] = None
        self._on_page_close: Optional[Callable[[Optional[int], Optional[str]], Any]] = (
            None
        )
        self._on_server_message: Optional[Callable[[Union[str, bytes]], Any]] = None
        self._on_server_close: Optional[
            Callable[[Optional[int], Optional[str]], Any]
        ] = None
        self._server = ServerWebSocketRoute(self)
        self._connected = False

        self._channel.on("messageFromPage", self._channel_message_from_page)
        self._channel.on("messageFromServer", self._channel_message_from_server)
        self._channel.on("closePage", self._channel_close_page)
        self._channel.on("closeServer", self._channel_close_server)

    def _channel_message_from_page(self, event: Dict) -> None:
        """Deliver a page message to the handler, else forward it if connected."""
        if self._on_page_message:
            self._on_page_message(
                base64.b64decode(event["message"])
                if event["isBase64"]
                else event["message"]
            )
        elif self._connected:
            _create_task_and_ignore_exception(
                self._loop, self._channel.send("sendToServer", event)
            )

    def _channel_message_from_server(self, event: Dict) -> None:
        """Deliver a server message to the handler, else forward it to the page."""
        if self._on_server_message:
            self._on_server_message(
                base64.b64decode(event["message"])
                if event["isBase64"]
                else event["message"]
            )
        else:
            _create_task_and_ignore_exception(
                self._loop, self._channel.send("sendToPage", event)
            )

    def _channel_close_page(self, event: Dict) -> None:
        """Report a page-side close to the handler, else close the server side."""
        if self._on_page_close:
            self._on_page_close(event["code"], event["reason"])
        else:
            _create_task_and_ignore_exception(
                self._loop, self._channel.send("closeServer", event)
            )

    def _channel_close_server(self, event: Dict) -> None:
        """Report a server-side close to the handler, else close the page side."""
        if self._on_server_close:
            self._on_server_close(event["code"], event["reason"])
        else:
            _create_task_and_ignore_exception(
                self._loop, self._channel.send("closePage", event)
            )

    @property
    def url(self) -> str:
        return self._initializer["url"]

    async def close(self, code: int = None, reason: str = None) -> None:
        """Close the page side of the socket; channel failures are ignored."""
        try:
            await self._channel.send(
                "closePage", {"code": code, "reason": reason, "wasClean": True}
            )
        except Exception:
            # Deliberate best-effort close.
            pass

    def connect_to_server(self) -> "WebSocketRoute":
        """Start connecting to the real server and return the server-side route.

        The "connect" message is sent in the background through the same
        helper as every other fire-and-forget send of this class, so a failed
        send is ignored rather than reported as a never-retrieved task
        exception.

        Raises:
            Error: if the route is already connected.
        """
        if self._connected:
            raise Error("Already connected to the server")
        self._connected = True
        _create_task_and_ignore_exception(self._loop, self._channel.send("connect"))
        return cast("WebSocketRoute", self._server)

    def send(self, message: Union[str, bytes]) -> None:
        """Send a message to the page; non-str payloads go out as base64."""
        if isinstance(message, str):
            _create_task_and_ignore_exception(
                self._loop,
                self._channel.send(
                    "sendToPage", {"message": message, "isBase64": False}
                ),
            )
        else:
            _create_task_and_ignore_exception(
                self._loop,
                self._channel.send(
                    "sendToPage",
                    {
                        "message": base64.b64encode(message).decode(),
                        "isBase64": True,
                    },
                ),
            )

    def on_message(self, handler: Callable[[Union[str, bytes]], Any]) -> None:
        self._on_page_message = handler

    def on_close(self, handler: Callable[[Optional[int], Optional[str]], Any]) -> None:
        self._on_page_close = handler

    async def _after_handle(self) -> None:
        """Send "ensureOpened" unless ``connect_to_server()`` was called."""
        if self._connected:
            return
        # Ensure that websocket is "open" and can send messages without an actual server connection.
        await self._channel.send("ensureOpened")


class WebSocketRouteHandler:
    """Pairs a URL matcher with the callback handling matching WebSocket routes."""

    def __init__(
        self,
        base_url: Optional[str],
        url: URLMatch,
        handler: WebSocketRouteHandlerCallback,
    ):
        self._base_url = base_url
        self.url = url
        self.handler = handler

    @staticmethod
    def prepare_interception_patterns(
        handlers: List["WebSocketRouteHandler"],
    ) -> List[dict]:
        """Describe the handlers' matchers as interception patterns.

        String matchers become ``glob`` patterns and compiled regular
        expressions become ``regexSource``/``regexFlags`` pairs. If any
        handler uses another kind of matcher, a single catch-all glob is
        returned instead.
        """
        collected: List[dict] = []
        needs_catch_all = False
        for entry in handlers:
            matcher = entry.url
            if isinstance(matcher, str):
                collected.append({"glob": matcher})
                continue
            if isinstance(matcher, re.Pattern):
                collected.append(
                    {
                        "regexSource": matcher.pattern,
                        "regexFlags": escape_regex_flags(matcher),
                    }
                )
                continue
            needs_catch_all = True

        return [{"glob": "**/*"}] if needs_catch_all else collected

    def matches(self, ws_url: str) -> bool:
        """Return whether ``ws_url`` matches this handler's matcher."""
        return url_matches(self._base_url, ws_url, self.url, True)

    async def handle(self, websocket_route: "WebSocketRoute") -> None:
        """Invoke the callback (awaiting it if it returned a coroutine), then
        run the route's ``_after_handle`` step."""
        outcome = self.handler(websocket_route)
        if asyncio.iscoroutine(outcome):
            await outcome
        await websocket_route._after_handle()


class Response(ChannelOwner):
    """Client-side view of a network response.

    Construction also copies the response's timing values onto the originating
    request's ``_timing`` mapping.
    """

    def __init__(
        self, parent: ChannelOwner, type: str, guid: str, initializer: Dict
    ) -> None:
        super().__init__(parent, type, guid, initializer)
        self._channel.mark_as_internal_type()
        self._request: Request = from_channel(self._initializer["request"])
        # "responseEnd" is not part of this copy; it is set separately via
        # Request._set_response_end_timing.
        timing = self._initializer["timing"]
        self._request._timing["startTime"] = timing["startTime"]
        self._request._timing["domainLookupStart"] = timing["domainLookupStart"]
        self._request._timing["domainLookupEnd"] = timing["domainLookupEnd"]
        self._request._timing["connectStart"] = timing["connectStart"]
        self._request._timing["secureConnectionStart"] = timing["secureConnectionStart"]
        self._request._timing["connectEnd"] = timing["connectEnd"]
        self._request._timing["requestStart"] = timing["requestStart"]
        self._request._timing["responseStart"] = timing["responseStart"]
        self._provisional_headers = RawHeaders(
            cast(HeadersArray, self._initializer["headers"])
        )
        # Lazily created cache for the "rawResponseHeaders" round trip.
        self._raw_headers_future: Optional[asyncio.Future[RawHeaders]] = None
        # NOTE(review): nothing in this class resolves this future; presumably
        # it is completed elsewhere when the response finishes - confirm.
        self._finished_future: asyncio.Future[bool] = asyncio.Future()

    def __repr__(self) -> str:
        return f"<Response url={self.url!r} request={self.request}>"

    @property
    def url(self) -> str:
        return self._initializer["url"]

    @property
    def ok(self) -> bool:
        """True for status 0 or any status in the 200-299 range."""
        # Status 0 is for file:// URLs
        return self._initializer["status"] == 0 or (
            self._initializer["status"] >= 200 and self._initializer["status"] <= 299
        )

    @property
    def status(self) -> int:
        return self._initializer["status"]

    @property
    def status_text(self) -> str:
        return self._initializer["statusText"]

    @property
    def headers(self) -> Headers:
        """Headers from the initializer (no channel round trip)."""
        return self._provisional_headers.headers()

    @property
    def from_service_worker(self) -> bool:
        return self._initializer["fromServiceWorker"]

    async def all_headers(self) -> Headers:
        return (await self._actual_headers()).headers()

    async def headers_array(self) -> HeadersArray:
        return (await self._actual_headers()).headers_array()

    async def header_value(self, name: str) -> Optional[str]:
        return (await self._actual_headers()).get(name)

    async def header_values(self, name: str) -> List[str]:
        return (await self._actual_headers()).get_all(name)

    async def _actual_headers(self) -> "RawHeaders":
        """Fetch "rawResponseHeaders" once and cache the result.

        The first caller performs the channel round trip; concurrent callers
        await the same future. If the round trip fails, the cache is cleared
        (so a later call retries) and the failure is propagated to everybody
        already waiting instead of leaving them pending forever.
        """
        cached = self._raw_headers_future
        if cached is None:
            cached = asyncio.Future()
            self._raw_headers_future = cached
            try:
                headers = cast(
                    HeadersArray, await self._channel.send("rawResponseHeaders")
                )
            except BaseException as error:
                self._raw_headers_future = None
                if not cached.done():
                    if isinstance(error, Exception):
                        cached.set_exception(error)
                        # Mark the exception as retrieved: this caller
                        # re-raises it, so it must not also be logged as
                        # "never retrieved" when nobody else is waiting.
                        cached.exception()
                    else:
                        cached.cancel()
                raise
            cached.set_result(RawHeaders(headers))
        return await cached

    async def server_addr(self) -> Optional[RemoteAddr]:
        return await self._channel.send("serverAddr")

    async def security_details(self) -> Optional[SecurityDetails]:
        return await self._channel.send("securityDetails")

    async def finished(self) -> None:
        """Wait until the response finishes or its target closes.

        Raises:
            Error: "Target closed" if the request's target-closed future
                completes.
        """

        async def on_finished() -> None:
            # shield(): cancelling this watcher (see ``finally`` below) must
            # not cancel the target-closed future, which may be the page's
            # shared ``_closed_or_crashed_future``.
            await asyncio.shield(self._request._target_closed_future())
            raise Error("Target closed")

        on_finished_task = asyncio.create_task(on_finished())
        try:
            await asyncio.wait(
                cast(
                    List[Union[asyncio.Task, asyncio.Future]],
                    [self._finished_future, on_finished_task],
                ),
                return_when=asyncio.FIRST_COMPLETED,
            )
            if on_finished_task.done():
                await on_finished_task
        finally:
            # Do not leave the watcher behind: a leaked watcher would stay
            # pending, or later fail with an unretrieved "Target closed".
            if not on_finished_task.done():
                on_finished_task.cancel()

    async def body(self) -> bytes:
        """Fetch the body over the channel and decode it from base64."""
        binary = await self._channel.send("body")
        return base64.b64decode(binary)

    async def text(self) -> str:
        content = await self.body()
        return content.decode()

    async def json(self) -> Any:
        return json.loads(await self.text())

    @property
    def request(self) -> Request:
        return self._request

    @property
    def frame(self) -> "Frame":
        return self._request.frame


class WebSocket(ChannelOwner):
    """Client-side view of a page WebSocket.

    Channel notifications are re-emitted as the events listed in ``Events``.
    """

    Events = SimpleNamespace(
        Close="close",
        FrameReceived="framereceived",
        FrameSent="framesent",
        Error="socketerror",
    )

    def __init__(
        self, parent: ChannelOwner, type: str, guid: str, initializer: Dict
    ) -> None:
        super().__init__(parent, type, guid, initializer)
        self._is_closed = False
        self._page = cast("Page", parent)
        channel = self._channel
        channel.on(
            "frameSent",
            lambda payload: self._on_frame_sent(payload["opcode"], payload["data"]),
        )
        channel.on(
            "frameReceived",
            lambda payload: self._on_frame_received(
                payload["opcode"], payload["data"]
            ),
        )
        channel.on(
            "socketError",
            lambda payload: self.emit(WebSocket.Events.Error, payload["error"]),
        )
        channel.on("close", lambda payload: self._on_close())

    def __repr__(self) -> str:
        return f"<WebSocket url={self.url!r}>"

    @property
    def url(self) -> str:
        return self._initializer["url"]

    def expect_event(
        self,
        event: str,
        predicate: Callable = None,
        timeout: float = None,
    ) -> EventContextManagerImpl:
        """Build a context manager that waits for ``event`` on this socket.

        The wait is rejected on timeout, when the socket closes or errors
        (unless that is the awaited event), and when the page closes. The
        timeout defaults to the parent's timeout settings.
        """
        effective_timeout = timeout
        if effective_timeout is None:
            effective_timeout = cast(Any, self._parent)._timeout_settings.timeout()
        waiter = Waiter(self, f"web_socket.expect_event({event})")
        waiter.reject_on_timeout(
            cast(float, effective_timeout),
            f'Timeout {effective_timeout}ms exceeded while waiting for event "{event}"',
        )
        # Terminal socket events abort the wait unless they are the target.
        terminal_events = (
            (WebSocket.Events.Close, "Socket closed"),
            (WebSocket.Events.Error, "Socket error"),
        )
        for terminal_event, message in terminal_events:
            if event != terminal_event:
                waiter.reject_on_event(self, terminal_event, Error(message))
        waiter.reject_on_event(
            self._page, "close", lambda: self._page._close_error_with_reason()
        )
        waiter.wait_for_event(self, event, predicate)
        return EventContextManagerImpl(waiter.result())

    async def wait_for_event(
        self, event: str, predicate: Callable = None, timeout: float = None
    ) -> Any:
        """Wait for ``event`` and return the value it resolved with."""
        async with self.expect_event(event, predicate, timeout) as pending_event:
            pass
        return await pending_event

    def _emit_frame(self, event_name: str, opcode: int, data: str) -> None:
        """Emit a frame event: opcode 2 payloads are base64-decoded, opcode 1
        payloads are passed through, other opcodes are ignored."""
        if opcode == 2:
            self.emit(event_name, base64.b64decode(data))
        elif opcode == 1:
            self.emit(event_name, data)

    def _on_frame_sent(self, opcode: int, data: str) -> None:
        self._emit_frame(WebSocket.Events.FrameSent, opcode, data)

    def _on_frame_received(self, opcode: int, data: str) -> None:
        self._emit_frame(WebSocket.Events.FrameReceived, opcode, data)

    def is_closed(self) -> bool:
        return self._is_closed

    def _on_close(self) -> None:
        """Mark the socket closed and emit the close event with this socket."""
        self._is_closed = True
        self.emit(WebSocket.Events.Close, self)


class RawHeaders:
    """Case-insensitive, multi-value view over a list of header records.

    The original list is kept for ``headers_array()``; lookups go through a
    map keyed by lower-cased header name whose values preserve the order in
    which distinct header values were first seen.
    """

    def __init__(self, headers: "HeadersArray") -> None:
        self._headers_array = headers
        # lower-cased name -> {value: True}; the inner dict acts as an
        # insertion-ordered set of values.
        self._headers_map: Dict[str, Dict[str, bool]] = defaultdict(dict)
        for header in headers:
            self._headers_map[header["name"].lower()][header["value"]] = True

    @staticmethod
    def _from_headers_dict_lossy(headers: Dict[str, str]) -> "RawHeaders":
        """Build an instance from a plain name -> value mapping."""
        return RawHeaders(serialize_headers(headers))

    def get(self, name: str) -> Optional[str]:
        """Return all values of ``name`` joined into one string, or ``None``.

        Values are joined with a newline for ``set-cookie`` and with ", " for
        every other header.
        """
        values = self.get_all(name)
        if not values:
            return None
        separator = "\n" if name.lower() == "set-cookie" else ", "
        return separator.join(values)

    def get_all(self, name: str) -> List[str]:
        """Return the distinct values of ``name`` (empty list if absent)."""
        # Use .get() rather than indexing: indexing the defaultdict would
        # insert an empty entry for a missing header, which headers() would
        # then report as ``{name: None}``.
        return list(self._headers_map.get(name.lower(), ()))

    def headers(self) -> Dict[str, str]:
        """Return a lower-cased name -> joined value mapping of all headers."""
        return {name: cast(str, self.get(name)) for name in self._headers_map}

    def headers_array(self) -> "HeadersArray":
        """Return the header records exactly as passed to the constructor."""
        return self._headers_array
