|
26 | 26 | "ChatCompletionsRequestHandler", |
27 | 27 | "OpenAIRequestHandler", |
28 | 28 | "OpenAIRequestHandlerFactory", |
| 29 | + "ResponsesRequestHandler", |
29 | 30 | "TextCompletionsRequestHandler", |
30 | 31 | ] |
31 | 32 |
|
@@ -711,3 +712,250 @@ def extract_metrics( |
711 | 712 | text_words=len(text.split()) if text else 0, |
712 | 713 | text_characters=len(text) if text else 0, |
713 | 714 | ) |
| 715 | + |
| 716 | + |
@OpenAIRequestHandlerFactory.register("/v1/responses")
class ResponsesRequestHandler(OpenAIRequestHandler):
    """
    Request handler for the OpenAI Responses API endpoint.

    Handles the /v1/responses format which uses `input` instead of `messages`,
    `instructions` for system prompts, and a different response/streaming shape
    than chat completions. Supports both streaming and non-streaming responses.
    """

    def __init__(self):
        # Streaming accumulators: filled incrementally by add_streaming_line
        # and read back by compile_streaming.
        self.streaming_texts: list[str] = []
        self.streaming_usage: dict[str, int | dict[str, int]] | None = None
        self.streaming_response_id: str | None = None

    def _format_prompts(
        self, column_data: list, column_type: str
    ) -> list[dict[str, Any]]:
        """Convert one dataset column into Responses API content parts.

        NOTE(review): there is no branch for "video_column", so video items
        are silently dropped even though _build_input_items iterates that
        column — confirm whether videos are intentionally unsupported.
        """
        formatted_data: list[dict[str, Any]] = []
        for item in column_data:
            if column_type == "text_column":
                formatted_data.append({"type": "input_text", "text": item})
            elif column_type == "image_column":
                formatted_data.append(
                    {
                        "type": "input_image",
                        "image_url": item.get("image"),
                    }
                )
            elif column_type == "audio_column":
                # The Responses API has no dedicated audio part; audio bytes
                # are sent as a base64-encoded input_file instead.
                formatted_data.append(
                    {
                        "type": "input_file",
                        "file_data": base64.b64encode(item.get("audio", b"")).decode(
                            "utf-8"
                        ),
                    }
                )
        return formatted_data

    def _build_input_items(
        self,
        data: GenerationRequest,
        response: GenerationResponse | None,
        prev_requests: list[GenerationRequestArguments],
    ) -> list[dict[str, Any]]:
        """Build the ``input`` array for the Responses API.

        The Responses API uses a flat ``input`` list of role-tagged message
        dicts (with nested content parts like ``input_text``, ``input_image``)
        instead of chat completions' ``messages`` array.
        """
        input_items: list[dict[str, Any]] = []

        # Replay conversation history by splicing in the input arrays of the
        # previously formatted requests, in order.
        for req in prev_requests:
            if req.body and "input" in req.body:
                prev_input = req.body["input"]
                if isinstance(prev_input, list):
                    input_items.extend(prev_input)

        # Interleave text/image/video/audio columns into a single user
        # message's content list.
        prompts = [
            self._format_prompts(data.columns.get(col, []), col)
            for col in ("text_column", "image_column", "video_column", "audio_column")
        ]
        content_parts = list(roundrobin(*prompts))
        if content_parts:
            input_items.append({"role": "user", "content": content_parts})

        # When formatting a history turn, append the assistant's reply so the
        # model sees the full prior exchange.
        if response and response.text:
            input_items.append({"role": "assistant", "content": response.text})

        return input_items

    def format(
        self,
        data: GenerationRequest,
        response: GenerationResponse | None = None,
        history: HistoryT[GenerationRequest, GenerationResponse] | None = None,
        **kwargs,
    ) -> GenerationRequestArguments:
        """Translate a GenerationRequest into /v1/responses request arguments.

        History turns are formatted recursively first so their ``input``
        arrays can be prepended to the current turn's input.
        """
        prev_requests: list[GenerationRequestArguments] = []
        if history:
            prev_requests = [
                self.format(req, response=res, **kwargs) for req, res in history
            ]

        arguments = GenerationRequestArguments()
        arguments.body = {}

        if kwargs.get("model") is not None:
            arguments.body["model"] = kwargs["model"]

        if kwargs.get("stream"):
            arguments.stream = True
            arguments.body["stream"] = True
            # Unlike chat completions, we don't send stream_options here.
            # The Responses API's stream_options only controls obfuscation,
            # not usage reporting. vLLM always includes usage data in the
            # response.completed SSE event for this endpoint.
            # Unfortunately, this complicates getting accurate stats when canceled.

        if data.output_metrics.text_tokens:
            arguments.body["max_output_tokens"] = data.output_metrics.text_tokens
            # stop/ignore_eos are vLLM-specific sampling params that force
            # the model to generate exactly N tokens, matching the behavior
            # of the chat completions handler for controlled benchmarking.
            arguments.body["stop"] = None
            arguments.body["ignore_eos"] = True
        elif kwargs.get("max_tokens") is not None:
            arguments.body["max_output_tokens"] = kwargs["max_tokens"]

        if kwargs.get("extras"):
            arguments.model_combine(kwargs["extras"])

        # The Responses API expresses the system prompt via ``instructions``
        # rather than a system-role message.
        prefix = " ".join(data.columns.get("prefix_column", []))
        if prefix:
            arguments.body["instructions"] = prefix

        arguments.body["input"] = self._build_input_items(data, response, prev_requests)

        return arguments

    def compile_non_streaming(
        self,
        request: GenerationRequest,
        arguments: GenerationRequestArguments,
        response: dict,
    ) -> GenerationResponse:
        """Convert a full (non-streaming) Responses API payload into a
        GenerationResponse."""
        text = self._extract_output_text(response)
        usage = response.get("usage", {})
        input_metrics, output_metrics = self.extract_metrics(usage, text)

        return GenerationResponse(
            request_id=request.request_id,
            request_args=arguments.model_dump_json(),
            response_id=response.get("id"),
            text=text,
            input_metrics=input_metrics,
            output_metrics=output_metrics,
        )

    def compile_streaming(
        self, request: GenerationRequest, arguments: GenerationRequestArguments
    ) -> GenerationResponse:
        """Assemble a GenerationResponse from state accumulated while
        streaming (see add_streaming_line)."""
        text = "".join(self.streaming_texts)
        input_metrics, output_metrics = self.extract_metrics(self.streaming_usage, text)

        return GenerationResponse(
            request_id=request.request_id,
            request_args=arguments.model_dump_json(),
            response_id=self.streaming_response_id,
            text=text,
            input_metrics=input_metrics,
            output_metrics=output_metrics,
        )

    def extract_line_data(self, line: str) -> dict[str, Any] | None:
        """Parse a Responses API SSE line.

        The Responses API streams paired ``event: <type>`` and ``data: <json>``
        lines, unlike chat completions which only uses ``data:`` lines. The
        event type is redundantly embedded in the JSON payload's ``type`` field,
        so ``event:`` lines are skipped, keeping only ``data:`` lines.

        Returns an empty dict for ignorable lines, None at end of stream, and
        the decoded JSON payload otherwise.
        """
        line = line.strip()

        if not line or not line.startswith("data:"):
            return {}

        payload = line[len("data:") :].strip()
        # Compare against the stripped payload so "[DONE]" is recognized with
        # or without a space after "data:"; the previous exact-string check
        # ("data: [DONE]") let "data:[DONE]" fall through to json.loads and
        # raise.
        if payload == "[DONE]":
            return None

        return json.loads(payload)

    def add_streaming_line(self, line: str) -> int | None:
        """Consume one SSE line.

        Returns 1 when new output text was captured, 0 for ignorable
        lines/events, and None when the stream is finished.
        """
        if not (data := self.extract_line_data(line)):
            return None if data is None else 0

        event_type = data.get("type", "")

        # The response id lives on the nested ``response`` object of lifecycle
        # events (e.g. response.created); SSE payloads carry no top-level
        # "id" key. The previous guard (`"id" in data`) therefore never
        # matched, leaving the id to be captured only from the terminal event.
        if self.streaming_response_id is None:
            resp = data.get("response", {})
            if isinstance(resp, dict) and "id" in resp:
                self.streaming_response_id = resp["id"]

        if event_type == "response.output_text.delta":
            delta = data.get("delta", "")
            if delta:
                self.streaming_texts.append(delta)
                return 1
            return 0

        if event_type in (
            "response.completed",
            "response.failed",
            "response.incomplete",
        ):
            # All three are terminal SSE events. response.completed is the
            # normal case; response.failed and response.incomplete may be sent
            # by some providers instead. Each carries a final response object
            # with optional usage data. Returning None signals the streaming
            # loop in http.py to break out of the stream.
            resp = data.get("response", {})
            if isinstance(resp, dict):
                usage = resp.get("usage")
                if usage:
                    self.streaming_usage = usage
                if self.streaming_response_id is None and "id" in resp:
                    self.streaming_response_id = resp["id"]
            return None

        return 0

    def extract_metrics(
        self, usage: dict[str, int | dict[str, int]] | None, text: str
    ) -> tuple[UsageMetrics, UsageMetrics]:
        """Split a Responses API usage payload into (input, output) metrics.

        Falls back to word/character counts derived from ``text`` when no
        usage payload is available (e.g. a canceled stream).
        """
        # Responses API uses "input_tokens"/"output_tokens" in its usage
        # payload, unlike chat completions' "prompt_tokens"/"completion_tokens".
        if not usage:
            return UsageMetrics(), UsageMetrics(
                text_words=len(text.split()) if text else 0,
                text_characters=len(text) if text else 0,
            )

        usage_metrics: dict[str, int] = cast("dict[str, int]", usage)

        return UsageMetrics(
            text_tokens=usage_metrics.get("input_tokens", 0),
        ), UsageMetrics(
            text_tokens=usage_metrics.get("output_tokens", 0),
            text_words=len(text.split()) if text else 0,
            text_characters=len(text) if text else 0,
        )

    @staticmethod
    def _extract_output_text(response: dict) -> str:
        """Extract generated text from a Responses API response object.

        Concatenates the ``output_text`` parts of every ``message`` item in
        the response's ``output`` array, skipping non-message items such as
        reasoning or tool calls.
        """
        texts: list[str] = []
        for item in response.get("output", []):
            if item.get("type") != "message":
                continue
            for part in item.get("content", []):
                if part.get("type") == "output_text":
                    texts.append(part.get("text", ""))
        return "".join(texts)
0 commit comments