| import os |
| import json |
| import re |
| import gradio as gr |
| import requests |
| from duckduckgo_search import DDGS |
| from typing import List |
| from pydantic import BaseModel, Field |
| from langchain_community.vectorstores import FAISS |
| from langchain_community.embeddings import HuggingFaceEmbeddings |
| from langchain_core.documents import Document |
| from huggingface_hub import InferenceClient |
| import logging |
| import pandas as pd |
| import tempfile |
|
|
| |
# Log at INFO level; every record carries a timestamp and its severity.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)

# Token handed to the Hugging Face InferenceClient; None when the variable is unset.
huggingface_token = os.environ.get("HUGGINGFACE_TOKEN")

# Token limit assumed for each model when budgeting the completion length.
MODEL_TOKEN_LIMITS = {
    "mistralai/Mistral-7B-Instruct-v0.3": 32768,
    "mistralai/Mixtral-8x7B-Instruct-v0.1": 32768,
    "mistralai/Mistral-Nemo-Instruct-2407": 32768,
    "meta-llama/Meta-Llama-3.1-8B-Instruct": 8192,
    "meta-llama/Meta-Llama-3.1-70B-Instruct": 8192,
}

# Models offered in the UI, in the insertion order of the table above.
MODELS = list(MODEL_TOKEN_LIMITS)

# System prompt used when the caller does not supply one.
DEFAULT_SYSTEM_PROMPT = """You are a world-class financial AI assistant, capable of complex reasoning and reflection.
Reason through the query inside <thinking> tags, and then provide your final response inside <output> tags.
Providing comprehensive and accurate information based on web search results is essential.
Your goal is to synthesize the given context into a coherent and detailed response that directly addresses the user's query.
Please ensure that your response is well-structured, factual.
If you detect that you made a mistake in your reasoning at any point, correct yourself inside <reflection> tags."""
|
|
def process_excel_file(file, model, temperature, num_calls, use_embeddings, system_prompt):
    """Answer every question in an uploaded workbook and save the answers.

    Each row's ``Question`` is run through ``get_response_with_search``; the
    last text produced for the row is stored in a new ``Response`` column and
    the result is written to a temporary ``.xlsx`` file.

    Args:
        file: The uploaded workbook, either a path string or an object that
            exposes the path as ``.name``.
        model: Model identifier forwarded to ``get_response_with_search``.
        temperature: Sampling temperature forwarded to the search call.
        num_calls: Forwarded to ``get_response_with_search``.
        use_embeddings: Forwarded to ``get_response_with_search``.
        system_prompt: Prompt used for rows that have no usable
            ``System Prompt`` cell (column absent, cell empty or blank).

    Returns:
        Path of the generated workbook, or ``None`` if anything failed (the
        error is logged).
    """
    try:
        # Accept both an upload object exposing ``.name`` and a plain path.
        source_path = getattr(file, "name", file)
        df = pd.read_excel(source_path)
        has_prompt_column = 'System Prompt' in df.columns
        results = []

        for _, row in df.iterrows():
            question = row['Question']

            # Empty cells come back as NaN; fall back to the prompt passed in
            # so the row is still answered with a real system prompt.
            row_prompt = row['System Prompt'] if has_prompt_column else None
            if row_prompt is None or pd.isna(row_prompt) or not str(row_prompt).strip():
                row_prompt = system_prompt

            response_generator = get_response_with_search(
                question, model, num_calls, temperature, use_embeddings, row_prompt
            )

            # The generator yields progressively longer output; keep the last.
            full_response = ""
            for partial_response, _ in response_generator:
                full_response = partial_response

            if not full_response:
                full_response = "No response generated. Please check the input parameters and try again."

            results.append(full_response)

        df['Response'] = results

        # Reserve the output path and close the handle before pandas writes:
        # a NamedTemporaryFile cannot be reopened by name on Windows while it
        # is still open.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.xlsx') as tmp:
            output_path = tmp.name
        df.to_excel(output_path, index=False)
        return output_path
    except Exception as e:
        logging.error(f"Error processing Excel file: {str(e)}")
        return None
|
|
def upload_file(file):
    """Return the path held in ``file.name``, or ``None`` when ``file`` is falsy."""
    if not file:
        return None
    return file.name
|
|
def download_file(file_path):
    """Hand the given file path back unchanged."""
    return file_path
|
|
def get_embeddings():
    """Create the HuggingFace embedding model used for vector search."""
    embedding_model_name = "sentence-transformers/stsb-roberta-large"
    return HuggingFaceEmbeddings(model_name=embedding_model_name)
|
|
def duckduckgo_search(query, max_results=5):
    """Run a DuckDuckGo text search and return the hits as a list.

    Args:
        query: Search string passed to ``DDGS.text``.
        max_results: Upper bound on the number of hits requested. Defaults to
            5, the value that was previously hard-coded.

    Returns:
        A list of the results produced by ``DDGS.text``. Other code in this
        file reads the ``title``, ``body`` and ``href`` keys of each entry.
    """
    with DDGS() as ddgs:
        return list(ddgs.text(query, max_results=max_results))
|
|
# Schema holding the sources to cite; `sources` is a required list of strings.
# Documented with comments rather than a docstring so the generated model
# schema keeps exactly the description given on the field.
class CitingSources(BaseModel):
    sources: List[str] = Field(
        default=...,
        description="List of sources to cite. Should be an URL of the source.",
    )
|
|
def chatbot_interface(message, history, model, temperature, num_calls, use_embeddings, system_prompt):
    """Stream chat-history updates for one user message.

    This is a generator: each yielded value is the full conversation, a list
    of ``(user, assistant)`` pairs, with the last pair holding the response
    received so far.

    Args:
        message: The user's text. ``None``, empty or whitespace-only input is
            treated as "nothing to send".
        history: The conversation so far (``None`` is treated as empty).
        model, temperature, num_calls, use_embeddings, system_prompt:
            Forwarded unchanged to ``respond``.

    Yields:
        The updated history after every partial response. For blank input the
        unchanged history is yielded once. If ``respond`` raises, the last
        pair's answer is replaced by an error message and yielded.
    """
    history = list(history) if history else []

    # A value given to ``return`` inside a generator never reaches the
    # consumer, so the unchanged history has to be yielded explicitly.
    if not message or not message.strip():
        yield history
        return

    history = history + [(message, "")]

    try:
        for response in respond(message, history, model, temperature, num_calls, use_embeddings, system_prompt):
            history[-1] = (message, response)
            yield history
    except Exception as e:
        logging.error(f"Error in chatbot_interface: {str(e)}")
        error_message = f"An error occurred: {str(e)}. Please try again."
        history[-1] = (message, error_message)
        yield history
|
|
def retry_last_response(history, model, temperature, num_calls, use_embeddings, system_prompt):
    """Regenerate the answer to the most recent user message.

    Always behaves as a generator of history updates, so a consumer can treat
    every case the same way (previously an empty history was returned as a
    plain value while the normal case returned a generator object).

    Args:
        history: The conversation as a list of ``(user, assistant)`` pairs.
        model, temperature, num_calls, use_embeddings, system_prompt:
            Forwarded unchanged to ``chatbot_interface``.

    Yields:
        The unchanged history once when there is nothing to retry (empty
        history, or a last entry with no user message); otherwise every
        update produced by ``chatbot_interface`` for the re-submitted message.
    """
    if not history:
        yield history
        return

    last_user_msg = history[-1][0]
    if not last_user_msg:
        # The last entry has no user text (e.g. an assistant-only greeting),
        # so there is no question to ask again; leave the history alone.
        yield history
        return

    # Drop the stale exchange; chatbot_interface appends a fresh one.
    yield from chatbot_interface(
        last_user_msg, history[:-1], model, temperature, num_calls, use_embeddings, system_prompt
    )
|
|
def respond(message, history, model, temperature, num_calls, use_embeddings, system_prompt):
    """Log the request, then stream the answer text for ``message``.

    Yields the first element of every pair produced by
    ``get_response_with_search``. If that call raises, a single error message
    naming the model is yielded instead. ``history`` is accepted but not used.
    """
    logging.info(f"User Query: {message}")
    logging.info(f"Model Used: {model}")
    logging.info(f"Use Embeddings: {use_embeddings}")
    logging.info(f"System Prompt: {system_prompt}")

    search_options = dict(
        num_calls=num_calls,
        temperature=temperature,
        use_embeddings=use_embeddings,
        system_prompt=system_prompt,
    )
    try:
        for update in get_response_with_search(message, model, **search_options):
            main_content, _ = update
            yield main_content
    except Exception as e:
        logging.error(f"Error with {model}: {str(e)}")
        yield f"An error occurred with the {model} model: {str(e)}. Please try again or select a different model."
|
|
def create_web_search_vectors(search_results):
    """Build a FAISS store from the search results that carry a ``body``.

    Each kept result becomes one document whose text is the title, the body
    and a ``Source:`` line; the result's ``href`` is stored as ``source``
    metadata. Results without a ``body`` key are skipped.
    """
    embedder = get_embeddings()

    docs = [
        Document(
            page_content=f"{hit['title']}\n{hit['body']}\nSource: {hit['href']}",
            metadata={"source": hit['href']},
        )
        for hit in search_results
        if 'body' in hit
    ]

    return FAISS.from_documents(docs, embedder)
|
|
def summarize_article(article, content, model, system_prompt, user_query, client, temperature=0.2):
    """Ask the model to summarize one search result in light of the others.

    Args:
        article: Search hit; its ``title``, ``href`` and ``body`` are read.
        content: Combined text of all search results, used as extra context.
        model: Model identifier, used to look up the token limit.
        system_prompt: System message sent with the request.
        user_query: The user's original question.
        client: Object exposing ``chat_completion(...)``; in this file it is
            an ``InferenceClient``.
        temperature: Sampling temperature for the completion.

    Returns:
        The stripped text of the first choice that carries content, the string
        ``"Error summarizing article: ..."`` if the request raised, or
        ``"Unable to generate summary."`` if no choice had content.
    """
    # Only the first 1000 characters of the article and of the shared context
    # are sent, to keep the prompt short. These notes used to sit inside the
    # f-string, where they were sent to the model as part of the prompt.
    article_excerpt = article['body'][:1000]
    context_excerpt = content[:1000]

    prompt = f"""Summarize the following article in the context of broader web search results:

Article:
Title: {article['title']}
URL: {article['href']}
Content: {article_excerpt}...

Additional Context:
{context_excerpt}...

User Query: {user_query}

Write a detailed and complete research document which addresses the User Query, incorporating both the specific article and the broader context. Focus on the most relevant information.
"""

    # Rough estimate of about four characters per token, counting both
    # messages that are sent. (Dividing the word count by four, as before,
    # undercounts the input by a wide margin.)
    input_tokens = (len(str(system_prompt or "")) + len(prompt)) // 4

    model_token_limit = MODEL_TOKEN_LIMITS.get(model, 8192)

    # Cap the completion at 6500 tokens and never request fewer than one,
    # even when the estimated input already exceeds the model's limit.
    max_new_tokens = max(1, min(model_token_limit - input_tokens, 6500))

    try:
        response = client.chat_completion(
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": prompt}
            ],
            max_tokens=max_new_tokens,
            temperature=temperature,
            stream=False,
            top_p=0.8,
        )

        for choice in getattr(response, 'choices', None) or []:
            reply = getattr(choice, 'message', None)
            text = getattr(reply, 'content', None)
            # Skip choices without text instead of failing on None.strip().
            if text is not None:
                return text.strip()
    except Exception as e:
        logging.error(f"Error summarizing article: {str(e)}")
        return f"Error summarizing article: {str(e)}"

    return "Unable to generate summary."
|
|
def get_response_with_search(query, model, num_calls=3, temperature=0.2, use_embeddings=True, system_prompt=DEFAULT_SYSTEM_PROMPT):
    """Search the web for ``query`` and stream a growing list of summaries.

    Every search hit is summarized with ``summarize_article``; after each one
    the formatted text of all summaries so far is yielded together with an
    empty string. ``num_calls`` and ``use_embeddings`` are accepted but not
    used here.
    """
    hits = duckduckgo_search(query)
    client = InferenceClient(model, token=huggingface_token)

    # Titles and bodies of all hits, shared as context for every summary.
    overall_context = "\n".join(f"{hit['title']}\n{hit['body']}" for hit in hits)

    collected = []
    for hit in hits:
        summary_text = summarize_article(
            hit, overall_context, model, system_prompt, query, client, temperature
        )
        entry = {
            "title": hit['title'],
            "url": hit['href'],
            "summary": summary_text,
        }
        collected.append(entry)
        yield format_output(collected), ""
|
|
def format_output(summaries):
    """Render the summaries as one text block: a header, then title, URL and
    summary for each entry, with a blank line after every entry."""
    sections = ["Here are the summarized search results:\n\n"]
    for entry in summaries:
        sections.append(f"News Title: {entry['title']}\n")
        sections.append(f"URL: {entry['url']}\n")
        sections.append(f"Summary: {entry['summary']}\n\n")
    return "".join(sections)
|
|
def vote(data: gr.LikeData):
    """Print whether the response in ``data.value`` was upvoted or downvoted."""
    feedback = (
        f"You upvoted this response: {data.value}"
        if data.liked
        else f"You downvoted this response: {data.value}"
    )
    print(feedback)
|
|
# Stylesheet text for the chat UI; at present it holds only a CSS comment.
css = """
/* Fine-tune chatbox size */
"""
|
|
def initial_conversation():
    """Return the starting chat history: one assistant-only welcome message."""
    welcome_text = (
        "Welcome! I'm your AI assistant for web search. Here's how you can use me:\n\n"
        "1. Ask me any question, and I'll search the web for information.\n"
        "2. You can adjust the system prompt for fine-tuned responses, whether to use embeddings, and the temperature.\n"
        "To get started, ask me a question!"
    )
    return [(None, welcome_text)]
|
|
| |
# Gradio UI: a chat panel with an input row, a collapsible panel of generation
# settings, and a collapsible panel for batch-processing an Excel workbook.
with gr.Blocks() as demo:
    gr.Markdown("# AI-powered Web Search Assistant")
    gr.Markdown("Ask questions and get answers from web search results.")

    with gr.Row():
        chatbot = gr.Chatbot(
            show_copy_button=True,
            likeable=True,
            layout="bubble",
            height=400,
            value=initial_conversation()
        )

    with gr.Row():
        message = gr.Textbox(placeholder="Ask a question", container=False, scale=7)
        submit_button = gr.Button("Submit")

    with gr.Accordion("⚙️ Parameters", open=False):
        model = gr.Dropdown(choices=MODELS, label="Select Model", value=MODELS[3])
        temperature = gr.Slider(minimum=0.1, maximum=1.0, value=0.2, step=0.1, label="Temperature")
        num_calls = gr.Slider(minimum=1, maximum=5, value=1, step=1, label="Number of API Calls")
        use_embeddings = gr.Checkbox(label="Use Embeddings", value=False)
        system_prompt = gr.Textbox(label="System Prompt", lines=5, value=DEFAULT_SYSTEM_PROMPT)

    with gr.Accordion("Batch Processing", open=False):
        excel_file = gr.File(label="Upload Excel File", file_types=[".xlsx"])
        process_button = gr.Button("Process Excel File")
        download_button = gr.File(label="Download Processed File")

    # The same settings feed the chat handlers and the batch handler.
    generation_settings = [model, temperature, num_calls, use_embeddings, system_prompt]
    chat_inputs = [message, chatbot, *generation_settings]

    # Both the button and pressing Enter in the textbox send the message.
    submit_button.click(chatbot_interface, inputs=chat_inputs, outputs=chatbot)
    message.submit(chatbot_interface, inputs=chat_inputs, outputs=chatbot)

    # The chatbot is created with likeable=True; attach the handler so the
    # like/dislike buttons actually report the vote.
    chatbot.like(vote, None, None)

    # NOTE(review): this handler writes back to the component that triggers
    # it - confirm it cannot re-trigger itself in the Gradio version in use.
    excel_file.change(upload_file, inputs=[excel_file], outputs=[excel_file])
    process_button.click(
        process_excel_file,
        inputs=[excel_file, *generation_settings],
        outputs=[download_button]
    )


if __name__ == "__main__":
    demo.launch(share=True)