# import sys, os, time
# import traceback
# from dotenv import load_dotenv
# load_dotenv()
# import os, io
# # this file is to test litellm/proxy
# from concurrent.futures import ThreadPoolExecutor
# sys.path.insert(
#     0, os.path.abspath("../..")
# )  # Adds the parent directory to the system path
# import pytest, logging, requests
# import litellm
# from litellm import embedding, completion, completion_cost, Timeout
# from litellm import RateLimitError
# from github import Github
# import subprocess
# # Function to execute a command and return the output
# def run_command(command):
#     process = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True)
#     output, _ = process.communicate()
#     return output.decode().strip()
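# # Illustrative usage (the branch name below is hypothetical):
# #   run_command("git rev-parse --abbrev-ref HEAD")  # -> "my-feature-branch"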
# # Retrieve the current branch name
# branch_name = run_command("git rev-parse --abbrev-ref HEAD")
# # GitHub personal access token (with repo scope) or use username and password
# access_token = os.getenv("GITHUB_ACCESS_TOKEN")
# # Instantiate the PyGithub library's Github object
# g = Github(access_token)
# # Provide the owner and name of the repository where the pull request is located
# repository_owner = "BerriAI"
# repository_name = "litellm"
# # Get the repository object
# repo = g.get_repo(f"{repository_owner}/{repository_name}")
# # Iterate through the pull requests to find the one related to your branch
# pr_number = None  # guard against a NameError if no open PR matches the branch
# for pr in repo.get_pulls():
#     print(f"in here! {pr.head.ref}")
#     if pr.head.ref == branch_name:
#         pr_number = pr.number
#         break
# print(f"The pull request number for branch {branch_name} is: {pr_number}")
# def test_add_new_key():
#     max_retries = 3
#     retry_delay = 10  # seconds
#     for retry in range(max_retries + 1):
#         try:
#             # Your test data
#             test_data = {
#                 "models": ["gpt-3.5-turbo", "gpt-4", "claude-2", "azure-model"],
#                 "aliases": {"mistral-7b": "gpt-3.5-turbo"},
#                 "duration": "20m",
#             }
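#             # "models" limits which model names the key may call, "aliases" remaps
#             # request model names, and "duration" sets the key's time-to-live.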
| # print("testing proxy server") | |
| # # Your bearer token | |
| # token = os.getenv("PROXY_MASTER_KEY") | |
| # headers = {"Authorization": f"Bearer {token}"} | |
| # endpoint = f"https://litellm-litellm-pr-{pr_number}.up.railway.app" | |
| # # Make a request to the staging endpoint | |
| # response = requests.post( | |
| # endpoint + "/key/generate", json=test_data, headers=headers | |
| # ) | |
| # print(f"response: {response.text}") | |
| # if response.status_code == 200: | |
| # result = response.json() | |
| # break # Successful response, exit the loop | |
| # elif response.status_code == 503 and retry < max_retries: | |
| # print( | |
| # f"Retrying in {retry_delay} seconds... (Retry {retry + 1}/{max_retries})" | |
| # ) | |
| # time.sleep(retry_delay) | |
| # else: | |
| # assert False, f"Unexpected response status code: {response.status_code}" | |
| # except Exception as e: | |
| # print(traceback.format_exc()) | |
| # pytest.fail(f"An error occurred {e}") | |
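# # A successful /key/generate response carries the new key, roughly of the form
# # (illustrative values only): {"key": "sk-...", "expires": "2024-01-01T00:20:00"}.
# # That key can then be used as its own bearer token against the proxy.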
# def test_update_new_key():
#     try:
#         # Your test data
#         test_data = {
#             "models": ["gpt-3.5-turbo", "gpt-4", "claude-2", "azure-model"],
#             "aliases": {"mistral-7b": "gpt-3.5-turbo"},
#             "duration": "20m",
#         }
#         print("testing proxy server")
#         # Your bearer token
#         token = os.getenv("PROXY_MASTER_KEY")
#         headers = {"Authorization": f"Bearer {token}"}
#         endpoint = f"https://litellm-litellm-pr-{pr_number}.up.railway.app"
#         # Make a request to the staging endpoint
#         response = requests.post(
#             endpoint + "/key/generate", json=test_data, headers=headers
#         )
#         assert response.status_code == 200
#         result = response.json()
#         assert result["key"].startswith("sk-")
#         def _post_data():
#             # Update the freshly generated key's model list via /key/update
#             json_data = {"models": ["bedrock-models"], "key": result["key"]}
#             response = requests.post(
#                 endpoint + "/key/update", json=json_data, headers=headers
#             )
#             print(f"response text: {response.text}")
#             assert response.status_code == 200
#             return response
#         _post_data()
#         print(f"Received response: {result}")
#     except Exception as e:
#         pytest.fail(f"LiteLLM Proxy test failed. Exception: {str(e)}")
# def test_add_new_key_max_parallel_limit():
#     try:
#         # Your test data
#         test_data = {"duration": "20m", "max_parallel_requests": 1}
#         # Your bearer token
#         token = os.getenv("PROXY_MASTER_KEY")
#         headers = {"Authorization": f"Bearer {token}"}
#         endpoint = f"https://litellm-litellm-pr-{pr_number}.up.railway.app"
#         print(f"endpoint: {endpoint}")
#         # Make a request to the staging endpoint
#         response = requests.post(
#             endpoint + "/key/generate", json=test_data, headers=headers
#         )
#         assert response.status_code == 200
#         result = response.json()
#         # Register the "azure-model" deployment with the proxy
#         model_data = {
#             "model_name": "azure-model",
#             "litellm_params": {
#                 "model": "azure/chatgpt-v-3",
#                 "api_key": os.getenv("AZURE_API_KEY"),
#                 "api_base": os.getenv("AZURE_API_BASE"),
#                 "api_version": os.getenv("AZURE_API_VERSION"),
#             },
#         }
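#         # "litellm_params" holds the arguments the proxy forwards to litellm when
#         # routing "azure-model", effectively the kwargs of a litellm.completion() call.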
#         response = requests.post(
#             endpoint + "/model/new", json=model_data, headers=headers
#         )
#         assert response.status_code == 200
#         print(f"response text: {response.text}")
#         def _post_data():
#             json_data = {
#                 "model": "azure-model",
#                 "messages": [
#                     {
#                         "role": "user",
#                         "content": f"this is a test request, write a short poem {time.time()}",
#                     }
#                 ],
#             }
#             # Use the newly generated key (not the master key) as the bearer token
#             response = requests.post(
#                 endpoint + "/chat/completions",
#                 json=json_data,
#                 headers={"Authorization": f"Bearer {result['key']}"},
#             )
#             return response
#         def _run_in_parallel():
#             with ThreadPoolExecutor(max_workers=2) as executor:
#                 future1 = executor.submit(_post_data)
#                 future2 = executor.submit(_post_data)
#                 # Obtain the results from the futures
#                 response1 = future1.result()
#                 print(f"response1 text: {response1.text}")
#                 response2 = future2.result()
#                 print(f"response2 text: {response2.text}")
#                 if response1.status_code == 429 or response2.status_code == 429:
#                     pass
#                 else:
#                     raise Exception("expected at least one 429 rate-limit response")
#         _run_in_parallel()
#     except Exception as e:
#         pytest.fail(f"LiteLLM Proxy test failed. Exception: {str(e)}")
# def test_add_new_key_max_parallel_limit_streaming():
#     try:
#         # Your test data
#         test_data = {"duration": "20m", "max_parallel_requests": 1}
#         # Your bearer token
#         token = os.getenv("PROXY_MASTER_KEY")
#         headers = {"Authorization": f"Bearer {token}"}
#         endpoint = f"https://litellm-litellm-pr-{pr_number}.up.railway.app"
#         # Make a request to the staging endpoint
#         response = requests.post(
#             endpoint + "/key/generate", json=test_data, headers=headers
#         )
#         print(f"response: {response.text}")
#         assert response.status_code == 200
#         result = response.json()
#         def _post_data():
#             json_data = {
#                 "model": "azure-model",
#                 "messages": [
#                     {
#                         "role": "user",
#                         "content": f"this is a test request, write a short poem {time.time()}",
#                     }
#                 ],
#                 "stream": True,
#             }
#             response = requests.post(
#                 endpoint + "/chat/completions",
#                 json=json_data,
#                 headers={"Authorization": f"Bearer {result['key']}"},
#             )
#             return response
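#         # With "stream": True the proxy replies with a chunked, SSE-style body;
#         # a 429 rejection still surfaces as the response status code below.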
#         def _run_in_parallel():
#             with ThreadPoolExecutor(max_workers=2) as executor:
#                 future1 = executor.submit(_post_data)
#                 future2 = executor.submit(_post_data)
#                 # Obtain the results from the futures
#                 response1 = future1.result()
#                 response2 = future2.result()
#                 if response1.status_code == 429 or response2.status_code == 429:
#                     pass
#                 else:
#                     raise Exception("expected at least one 429 rate-limit response")
#         _run_in_parallel()
#     except Exception as e:
#         pytest.fail(f"LiteLLM Proxy test failed. Exception: {str(e)}")