Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import logging | |
| import json | |
| import uvicorn | |
| import secrets | |
| from fastapi import FastAPI, Response, Request | |
| from fastapi.responses import StreamingResponse, RedirectResponse, HTMLResponse, JSONResponse | |
| from fastapi.exceptions import RequestValidationError | |
| from fastapi.security import APIKeyHeader | |
| from starlette.exceptions import HTTPException | |
| from starlette.status import HTTP_422_UNPROCESSABLE_ENTITY, HTTP_401_UNAUTHORIZED, HTTP_403_FORBIDDEN | |
| from fastapi.encoders import jsonable_encoder | |
| from pydantic import BaseModel | |
| from typing import Union, Optional | |
| import g4f | |
| import g4f.debug | |
| from g4f.client import AsyncClient | |
| from g4f.typing import Messages | |
| from g4f.cookies import read_cookie_files | |
| def create_app(): | |
| app = FastAPI() | |
| api = Api2(app) | |
| api.register_routes() | |
| api.register_authorization() | |
| api.register_validation_exception_handler() | |
| if not AppConfig.ignore_cookie_files: | |
| read_cookie_files() | |
| return app | |
| def create_app_debug(): | |
| g4f.debug.logging = True | |
| return create_app() | |
| class ChatCompletionsForm(BaseModel): | |
| messages: Messages | |
| model: str | |
| provider: Optional[str] = None | |
| stream: bool = False | |
| temperature: Optional[float] = None | |
| max_tokens: Optional[int] = None | |
| stop: Union[list[str], str, None] = None | |
| api_key: Optional[str] = None | |
| web_search: Optional[bool] = None | |
| proxy: Optional[str] = None | |
| class AppConfig(): | |
| list_ignored_providers: Optional[list[str]] = None | |
| g4f_api_key: Optional[str] = None | |
| ignore_cookie_files: bool = False | |
| defaults: dict = {} | |
| def set_config(cls, **data): | |
| for key, value in data.items(): | |
| setattr(cls, key, value) | |
| class Api2: | |
| def __init__(self, app: FastAPI) -> None: | |
| self.app = app | |
| self.client = AsyncClient() | |
| self.get_g4f_api_key = APIKeyHeader(name="g4f-api-key") | |
| def register_authorization(self): | |
| async def authorization(request: Request, call_next): | |
| if AppConfig.g4f_api_key and request.url.path in ["/v1/chat/completions", "/v1/completions"]: | |
| try: | |
| user_g4f_api_key = await self.get_g4f_api_key(request) | |
| except HTTPException as e: | |
| if e.status_code == 403: | |
| return JSONResponse( | |
| status_code=HTTP_401_UNAUTHORIZED, | |
| content=jsonable_encoder({"detail": "G4F API key required"}), | |
| ) | |
| if not secrets.compare_digest(AppConfig.g4f_api_key, user_g4f_api_key): | |
| return JSONResponse( | |
| status_code=HTTP_403_FORBIDDEN, | |
| content=jsonable_encoder({"detail": "Invalid G4F API key"}), | |
| ) | |
| return await call_next(request) | |
| def register_validation_exception_handler(self): | |
| async def validation_exception_handler(request: Request, exc: RequestValidationError): | |
| details = exc.errors() | |
| modified_details = [{ | |
| "loc": error["loc"], | |
| "message": error["msg"], | |
| "type": error["type"], | |
| } for error in details] | |
| return JSONResponse( | |
| status_code=HTTP_422_UNPROCESSABLE_ENTITY, | |
| content=jsonable_encoder({"detail": modified_details}), | |
| ) | |
| def register_routes(self): | |
| async def read_root(): | |
| return RedirectResponse("/v1", 302) | |
| async def read_root_v1(): | |
| return HTMLResponse('g4f API: Go to ' | |
| '<a href="/api/v1/chat/completions">chat/completions</a> ' | |
| 'or <a href="/api/v1/models">models</a>.') | |
| async def models(): | |
| model_list = { | |
| model: g4f.models.ModelUtils.convert[model] | |
| for model in g4f.Model.__all__() | |
| } | |
| model_list = [{ | |
| 'id': model_id, | |
| 'object': 'model', | |
| 'created': 0, | |
| 'owned_by': model.base_provider | |
| } for model_id, model in model_list.items()] | |
| return JSONResponse(model_list) | |
| async def model_info(model_name: str): | |
| try: | |
| model_info = g4f.models.ModelUtils.convert[model_name] | |
| return JSONResponse({ | |
| 'id': model_name, | |
| 'object': 'model', | |
| 'created': 0, | |
| 'owned_by': model_info.base_provider | |
| }) | |
| except: | |
| return JSONResponse({"error": "The model does not exist."}) | |
| async def chat_completions(config: ChatCompletionsForm, request: Request = None, provider: str = None): | |
| try: | |
| config.provider = provider if config.provider is None else config.provider | |
| if config.api_key is None and request is not None: | |
| auth_header = request.headers.get("Authorization") | |
| if auth_header is not None: | |
| auth_header = auth_header.split(None, 1)[-1] | |
| if auth_header and auth_header != "Bearer": | |
| config.api_key = auth_header | |
| response = self.client.chat.completions.create( | |
| **{ | |
| **AppConfig.defaults, | |
| **config.dict(exclude_none=True), | |
| }, | |
| ignored=AppConfig.list_ignored_providers | |
| ) | |
| except Exception as e: | |
| logging.exception(e) | |
| return Response(content=format_exception(e, config), status_code=500, media_type="application/json") | |
| if not config.stream: | |
| return JSONResponse((await response).to_json()) | |
| async def streaming(): | |
| try: | |
| async for chunk in response: | |
| yield f"data: {json.dumps(chunk.to_json())}\n\n" | |
| except GeneratorExit: | |
| pass | |
| except Exception as e: | |
| logging.exception(e) | |
| yield f'data: {format_exception(e, config)}\n\n' | |
| yield "data: [DONE]\n\n" | |
| return StreamingResponse(streaming(), media_type="text/event-stream") | |
| async def completions(): | |
| return Response(content=json.dumps({'info': 'Not working yet.'}, indent=4), media_type="application/json") | |
| def format_exception(e: Exception, config: ChatCompletionsForm) -> str: | |
| last_provider = g4f.get_last_provider(True) | |
| return json.dumps({ | |
| "error": {"message": f"{e.__class__.__name__}: {e}"}, | |
| "model": last_provider.get("model") if last_provider else config.model, | |
| "provider": last_provider.get("name") if last_provider else config.provider | |
| }) | |
| def run_api( | |
| host: str = '0.0.0.0', | |
| port: int = 1337, | |
| bind: str = None, | |
| debug: bool = False, | |
| workers: int = None, | |
| use_colors: bool = None | |
| ) -> None: | |
| print(f'Starting server... [g4f v-{g4f.version.utils.current_version}]' + (" (debug)" if debug else "")) | |
| if use_colors is None: | |
| use_colors = debug | |
| if bind is not None: | |
| host, port = bind.split(":") | |
| uvicorn.run( | |
| f"run_app:create_app{'_debug' if debug else ''}", | |
| host=host, port=int(port), | |
| workers=workers, | |
| use_colors=use_colors, | |
| factory=True, | |
| reload=debug | |
| ) | |
| list_ignored_providers = [ | |
| "GptForLove", | |
| "ChatBase", | |
| "Chatgpt4Online", | |
| "ChatAnywhere", | |
| "GptGo", | |
| "You", | |
| ] | |
| if __name__ == "__main__": | |
| run_api(debug=True) | |