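# SmartManuals-AI (Hugging Face Space app)
# Retrieval-augmented QA over PDF/DOCX manuals: documents are chunked, embedded with
# sentence-transformers into a persistent ChromaDB store, and questions are answered
# by an instruction-tuned LLM selected in the Gradio UI.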
import os
import json
import fitz  # PyMuPDF
import docx
import chromadb
import torch
import nltk
import gradio as gr
from tqdm import tqdm
from typing import List
from PIL import Image
from nltk.tokenize import sent_tokenize
from sentence_transformers import SentenceTransformer, util
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
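# Likely pip dependencies for the imports above: pymupdf (fitz), python-docx, chromadb,
# torch, nltk, gradio, tqdm, pillow, sentence-transformers, transformers.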
# --- Ensure punkt tokenizer is available ---
try:
    nltk.data.find("tokenizers/punkt")
except LookupError:
    nltk.download("punkt")
# --- Configuration ---
MANUALS_FOLDER = "./Manuals"
CHROMA_PATH = "./chroma_store"
COLLECTION_NAME = "manual_chunks"
MODEL_OPTIONS = {
    "LLaMA 3.1 8B": "meta-llama/Llama-3.1-8B-Instruct",
    "Mistral 7B": "mistralai/Mistral-7B-Instruct-v0.3",
    "Gemma 7B": "google/gemma-1.1-7b-it",
}
HF_TOKEN = os.environ.get("HF_TOKEN")
MAX_CONTEXT_CHUNKS = 3
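# MAX_CONTEXT_CHUNKS caps how many retrieved chunks are placed into the prompt.
# HF_TOKEN is needed if any of the models above are gated on the Hub (Llama and Gemma typically are).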
# --- Utility Functions ---
def extract_text_from_pdf(path):
    """Extract plain text from every page of a PDF via PyMuPDF."""
    try:
        doc = fitz.open(path)
        return "\n".join(page.get_text().strip() for page in doc)
    except Exception as e:
        print(f"[PDF Error] {path}: {e}")
        return ""


def extract_text_from_docx(path):
    """Extract plain text from all paragraphs of a DOCX file."""
    try:
        doc = docx.Document(path)
        return "\n".join(para.text.strip() for para in doc.paragraphs)
    except Exception as e:
        print(f"[DOCX Error] {path}: {e}")
        return ""


def clean(text):
    """Drop blank lines and trim surrounding whitespace."""
    return "\n".join(line.strip() for line in text.splitlines() if line.strip())
def split_sentences(text):
    try:
        return sent_tokenize(text)
    except Exception as e:
        print(f"[Tokenizer Error] {e}. Falling back to simple split.")
        return text.split(". ")
def chunk_sentences(sentences, max_tokens=500, overlap=50):
    """Group sentences into ~max_tokens-word chunks, carrying ~overlap words between chunks."""
    chunks = []
    current = []
    total = 0
    for sentence in sentences:
        count = len(sentence.split())
        if current and total + count > max_tokens:
            chunks.append(" ".join(current))
            # Keep trailing sentences (up to ~overlap words) as context for the next chunk
            carried = []
            carried_words = 0
            for prev in reversed(current):
                carried_words += len(prev.split())
                carried.insert(0, prev)
                if carried_words >= overlap:
                    break
            current = carried
            total = carried_words
        current.append(sentence)
        total += count
    if current:
        chunks.append(" ".join(current))
    return chunks
def embed_all():
    """(Re)build the Chroma collection from every PDF/DOCX in MANUALS_FOLDER."""
    db = chromadb.PersistentClient(path=CHROMA_PATH)
    try:
        db.delete_collection(COLLECTION_NAME)
    except Exception:
        pass  # Collection did not exist yet
    collection = db.create_collection(COLLECTION_NAME)
    embedder = SentenceTransformer("all-MiniLM-L6-v2")
    all_chunks = []
    for fname in os.listdir(MANUALS_FOLDER):
        path = os.path.join(MANUALS_FOLDER, fname)
        if fname.lower().endswith(".pdf"):
            text = extract_text_from_pdf(path)
        elif fname.lower().endswith(".docx"):
            text = extract_text_from_docx(path)
        else:
            continue
        sents = split_sentences(clean(text))
        chunks = chunk_sentences(sents)
        for idx, chunk in enumerate(chunks):
            chunk_id = f"{fname}::chunk_{idx}"
            all_chunks.append({"id": chunk_id, "text": chunk, "metadata": {"source": fname}})
    # Embed and add to Chroma in small batches
    for i in tqdm(range(0, len(all_chunks), 16), desc="Embedding"):
        batch = all_chunks[i:i + 16]
        docs = [x["text"] for x in batch]
        ids = [x["id"] for x in batch]
        metas = [x["metadata"] for x in batch]
        embs = embedder.encode(docs).tolist()
        collection.add(documents=docs, ids=ids, metadatas=metas, embeddings=embs)
    return collection, embedder
# Build the retrieval index once at startup so each query does not re-embed every manual.
collection, embedder = embed_all()


def answer_query(query, model_choice):
    # Retrieve the most relevant chunks for the question
    query_embedding = embedder.encode(query).tolist()
    results = collection.query(query_embeddings=[query_embedding], n_results=MAX_CONTEXT_CHUNKS)
    context = "\n\n".join(results["documents"][0])
    # Load the selected model (reloaded on every call; consider caching for faster repeat queries)
    model_id = MODEL_OPTIONS.get(model_choice)
    tokenizer = AutoTokenizer.from_pretrained(model_id, token=HF_TOKEN)
    model = AutoModelForCausalLM.from_pretrained(model_id, token=HF_TOKEN)
    pipe = pipeline("text-generation", model=model, tokenizer=tokenizer)
    prompt = f"""
Context:
{context}

Question: {query}
Answer:"""
    out = pipe(prompt, max_new_tokens=300, do_sample=False)
    return out[0]["generated_text"].split("Answer:")[-1].strip()
# --- UI ---
with gr.Blocks() as demo:
    gr.Markdown(
        """# 📘 SmartManuals-AI
Ask technical questions from manuals (PDF & DOCX) with LLM + OCR + RAG.
"""
    )
    with gr.Row():
        question = gr.Textbox(label="Your Question", placeholder="e.g., How do I reset the console?")
        model_choice = gr.Dropdown(choices=list(MODEL_OPTIONS.keys()), value="LLaMA 3.1 8B", label="Model")
    answer = gr.Textbox(label="Answer")
    submit = gr.Button("Ask")
    submit.click(fn=answer_query, inputs=[question, model_choice], outputs=answer)

demo.launch()