import os
import tempfile
import re

import gradio as gr
from PyPDF2 import PdfReader
from docx import Document as DocxDocument
from pptx import Presentation
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
# Load the reasoning model (balanced for CPU use and reasoning quality)
model_id = "MBZUAI/LaMini-Flan-T5-783M"
tokenizer = AutoTokenizer.from_pretrained(model_id)
model = AutoModelForSeq2SeqLM.from_pretrained(model_id)
reasoning_pipeline = pipeline(
    "text2text-generation",
    model=model,
    tokenizer=tokenizer,
    max_new_tokens=512,
    do_sample=True,  # required for temperature/top_p to take effect
    temperature=0.7,
    top_p=0.9,
)
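# Quick smoke test (uncomment to verify the model loads and generates;
# the sample question is illustrative, not from the app's data):
# print(reasoning_pipeline("Explain what a page fault is.")[0]["generated_text"])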
# Embedding model for semantic search over document chunks
embedding_model = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
vectorstore = None  # built once a file is processed
# Summarizer used to append a short digest to long answers
summary_pipeline = pipeline("summarization", model="sshleifer/distilbart-cnn-12-6")
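# Note: distilbart-cnn-12-6 has a 1024-token input window; answers here are
# capped at 512 generated tokens, so they fit without truncation.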
def clean_text(text):
    lines = text.split("\n")
    cleaned = []
    for line in lines:
        line = line.strip()
        # Drop slide/page headers and course boilerplate
        if re.search(r'(Page \d+|Slide \d+|CS583|UIC|Bing Liu)', line, re.IGNORECASE):
            continue
        # Drop fragments too short to carry content
        if len(line) < 3:
            continue
        # Replace non-ASCII characters (bullets, ligatures) with spaces
        line = re.sub(r'[^\x00-\x7F]+', ' ', line)
        cleaned.append(line)
    return "\n".join(cleaned)
def extract_text(file_path, ext):
    if ext == ".pdf":
        reader = PdfReader(file_path)
        # extract_text() can return None for image-only pages
        return "\n".join([page.extract_text() or "" for page in reader.pages])
    elif ext == ".docx":
        doc = DocxDocument(file_path)
        return "\n".join([p.text for p in doc.paragraphs])
    elif ext == ".txt":
        with open(file_path, "r", encoding="utf-8", errors="replace") as f:
            return f.read()
    elif ext == ".pptx":
        prs = Presentation(file_path)
        return "\n".join(shape.text for slide in prs.slides for shape in slide.shapes if hasattr(shape, "text"))
    else:
        raise ValueError("Unsupported file format")
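# e.g. extract_text("/tmp/lecture.pptx", ".pptx") concatenates the text of
# every shape on every slide (the path shown is hypothetical).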
def process_file(file):
    global vectorstore
    try:
        # Gradio may pass a file-like object, a path string, or raw bytes
        filename = getattr(file, "name", None)
        ext = os.path.splitext(filename)[1].lower() if filename else ".pdf"
        with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as tmp:
            if hasattr(file, "read"):
                file_bytes = file.read()
            elif isinstance(file, str) and os.path.exists(file):
                with open(file, "rb") as f:
                    file_bytes = f.read()
            elif isinstance(file, bytes):
                file_bytes = file
            else:
                return "❌ Error: Could not process uploaded file."
            tmp.write(file_bytes)
            tmp.flush()
        full_text = extract_text(tmp.name, ext)
        cleaned = clean_text(full_text)
        # Split into overlapping chunks and index them for similarity search
        splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=150)
        chunks = splitter.split_text(cleaned)
        docs = [Document(page_content=c) for c in chunks]
        vectorstore = FAISS.from_documents(docs, embedding_model)
        return "✅ File processed. You can now ask questions."
    except Exception as e:
        return f"❌ Error: {str(e)}"
def generate_prompt(context, question):
    return f"""You are a helpful and intelligent academic assistant.
Use the following class material to answer a student's question.

Material:
{context}

Student's Question:
{question}

Answer the question with:
- Structured academic explanation
- Relevant details from the material (e.g., examples like FIFO, LRU)
- No repetition
- No outside knowledge
- No prompt words like "context" or "question"
- Use markdown formatting with headings and lists where helpful

If the material does not have enough information, say: "The material does not contain enough information to answer this accurately."
"""
def detect_question_type(q):
    q = q.lower().strip()
    if q.startswith(("what is", "define", "give definition")):
        return "definition"
    elif q.startswith(("how", "explain", "why")):
        return "explanation"
    elif "difference between" in q or "compare" in q:
        return "comparison"
    elif q.startswith("list") or "types of" in q:
        return "list"
    return "general"
def post_process_output(answer_text, question):
    qtype = detect_question_type(question)
    label_map = {
        "definition": "**Definition**",
        "explanation": "**Explanation**",
        "comparison": "**Comparison**",
        "list": "**Key Points**",
        "general": "**Insight**",
    }
    clean_answer = answer_text.strip()
    # Strip any prompt scaffolding the model echoed back
    if clean_answer.lower().startswith("context:") or "instructions:" in clean_answer:
        for marker in ["Context:", "Question:", "Instructions:"]:
            clean_answer = clean_answer.replace(marker, "").strip()
    # Append a short digest to long answers
    if len(clean_answer.split()) > 80:
        summary = summary_pipeline(clean_answer, max_length=60, min_length=25, do_sample=False)[0]['summary_text']
        clean_answer += f"\n\n**Summary:** {summary.strip()}"
    return f"{label_map.get(qtype)}\n\n{clean_answer}"
def ask_question(question):
    global vectorstore
    if vectorstore is None:
        return "❌ Please upload and process a file first."
    docs = vectorstore.similarity_search(question, k=3)
    if not docs:
        return "❌ No relevant information found."
    context = "\n".join([doc.page_content for doc in docs])
    prompt = generate_prompt(context, question)
    output = reasoning_pipeline(prompt)[0]['generated_text'].strip()
    # Clean unwanted leftovers from the prompt
    for marker in ["Context:", "Question:", "Instructions:", "Use structured academic language"]:
        output = output.replace(marker, "").strip()
    # Remove a leading "Answer:" if present
    if output.lower().startswith("answer:"):
        output = output[len("answer:"):].strip()
    # Trim any trailing partial sentence
    if "." in output:
        output = output.rsplit(".", 1)[0] + "."
    # Fallback if the answer is too short or generic
    if len(output) < 10 or output.lower() in ["", ".", "use structured academic language.", "use structured academic language"]:
        return "❌ The model could not generate a meaningful answer from the provided material."
    return post_process_output(output, question)
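# Possible extension (untested sketch): persist the index across restarts with
# vectorstore.save_local("faiss_index"), then reload it in recent
# langchain_community releases via
# FAISS.load_local("faiss_index", embedding_model,
#                  allow_dangerous_deserialization=True);
# "faiss_index" is a hypothetical directory name.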
# Gradio UI
with gr.Blocks(theme=gr.themes.Monochrome()) as demo:
    gr.Markdown("""
    # AI Study Assistant
    Upload your lecture slides or text files, ask questions, and get answers powered by LaMini-Flan-T5.
    """)
    with gr.Tab("Upload & Ask"):
        with gr.Row():
            file_input = gr.File(label="Upload File", file_types=[".pdf", ".docx", ".pptx", ".txt"])
            upload_btn = gr.Button("Upload")
        upload_output = gr.Textbox(label="Upload Status", interactive=False)
        upload_btn.click(fn=process_file, inputs=[file_input], outputs=[upload_output])
        gr.Markdown("---")
        with gr.Row():
            question = gr.Textbox(label="❓ Ask a question")
            ask_btn = gr.Button("Ask")
        answer = gr.Textbox(label="💡 Answer", interactive=False)
        ask_btn.click(fn=ask_question, inputs=[question], outputs=[answer])
    with gr.Tab("History"):
        gr.Markdown("""
        **⏳ Coming Soon**: Question-answer history, summarization view, and more!
        """)
if __name__ == "__main__":
    demo.launch()
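# To get a shareable public URL when running outside Hugging Face Spaces,
# one could call demo.launch(share=True) instead (standard Gradio option).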