# ✅ app.py (Final Hugging Face Version for SmartManuals-AI)
# ✅ No metadata filtering; all semantic search with keyword reranking
# ✅ Auto-index from Manuals/ on startup, with rerun prevention
# ✅ Gradio UI only, no file upload, progress logs

import io  # needed for the OCR fallback below
import os
import json
import hashlib

import fitz  # PyMuPDF
import chromadb
import torch
import gradio as gr
from tqdm import tqdm
from nltk.tokenize import sent_tokenize
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
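# NOTE (assumption): sent_tokenize needs NLTK's "punkt" tokenizer data, which
# a fresh Space will not have; downloading it quietly at startup is a minimal
# safeguard (newer NLTK versions may additionally want "punkt_tab").
import nltk
nltk.download("punkt", quiet=True)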
# ---------------------------
# ⚙️ Config
# ---------------------------
MANUALS_FOLDER = "./Manuals"
CHROMA_PATH = "./chroma_store"
CHUNKS_FILE = "manual_chunks_with_ocr.jsonl"
HASH_FILE = "manuals.hash"
CHUNK_SIZE = 750
CHUNK_OVERLAP = 100
MODEL_ID = "meta-llama/Llama-3.1-8B-Instruct"
HF_TOKEN = os.environ.get("HF_TOKEN")  # required: the Llama 3.1 weights are gated

collection = None
embedder = None
pipe = None
# ---------------------------
# 🚀 Load model and pipeline
# ---------------------------
def load_model():
    global pipe
    if HF_TOKEN is None:
        print("❌ HF_TOKEN is not set")
        return None
    try:
        tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, token=HF_TOKEN)
        model = AutoModelForCausalLM.from_pretrained(
            MODEL_ID, token=HF_TOKEN, torch_dtype=torch.float32
        )
        pipe = pipeline(
            "text-generation",
            model=model,
            tokenizer=tokenizer,
            max_new_tokens=512,
            temperature=0.2,
            top_p=0.9,
            do_sample=True,
            device=-1,  # CPU inference
        )
        print(f"✅ Model loaded: {MODEL_ID}")
        return tokenizer
    except Exception as e:
        print(f"❌ Model load failed: {e}")
        return None
# ---------------------------
# 🔧 Utilities
# ---------------------------
def clean_text(text):
    lines = text.splitlines()
    return "\n".join(l.strip() for l in lines if l.strip())

def split_into_chunks(sentences, max_tokens=CHUNK_SIZE, overlap=CHUNK_OVERLAP):
    chunks, current, cur_len = [], [], 0
    for sent in sentences:
        tok = len(sent.split())
        if cur_len + tok > max_tokens and current:
            chunks.append(" ".join(current))
            # Overlap is measured in words: keep only the trailing sentences
            # totalling at most `overlap` words as context for the next chunk.
            # (Slicing `current[-overlap:]` would keep up to 100 *sentences*,
            # i.e. usually the whole chunk, so chunks would barely shrink.)
            kept, kept_len = [], 0
            for s in reversed(current):
                if kept_len + len(s.split()) > overlap:
                    break
                kept.insert(0, s)
                kept_len += len(s.split())
            current, cur_len = kept, kept_len
        current.append(sent)
        cur_len += tok
    if current:
        chunks.append(" ".join(current))
    return chunks

def hash_folder(folder):
    hasher = hashlib.sha256()
    for fname in sorted(os.listdir(folder)):
        if fname.endswith(".pdf"):
            with open(os.path.join(folder, fname), "rb") as f:
                while chunk := f.read(8192):
                    hasher.update(chunk)
    return hasher.hexdigest()
# ---------------------------
# 📄 Indexing
# ---------------------------
def extract_and_chunk():
    from PIL import Image
    import pytesseract

    chunks = []
    for fname in tqdm(sorted(os.listdir(MANUALS_FOLDER))):
        if not fname.endswith(".pdf"):
            continue
        path = os.path.join(MANUALS_FOLDER, fname)
        try:
            doc = fitz.open(path)
            for i, page in enumerate(doc):
                text = page.get_text()
                if not text.strip():  # scanned page: fall back to OCR
                    img = Image.open(io.BytesIO(page.get_pixmap(dpi=300).tobytes("png")))
                    text = pytesseract.image_to_string(img)
                sents = sent_tokenize(clean_text(text))
                for j, chunk in enumerate(split_into_chunks(sents)):
                    chunks.append({
                        "source_file": fname,
                        "chunk_id": f"{fname}::p{i+1}::c{j+1}",
                        "page": i + 1,
                        "text": chunk.strip(),
                    })
        except Exception as e:
            print(f"Error reading {fname}: {e}")

    with open(CHUNKS_FILE, "w", encoding="utf-8") as f:
        for chunk in chunks:
            json.dump(chunk, f)
            f.write("\n")
    return chunks
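# NOTE (assumption): pytesseract only wraps the Tesseract binary, which is not
# installed on a Space by default. Listing "tesseract-ocr" in a packages.txt
# at the repo root is the usual way to provide it.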
# ---------------------------
# 💾 ChromaDB Embedding
# ---------------------------
def embed_chunks():
    global collection, embedder
    client = chromadb.PersistentClient(path=CHROMA_PATH)
    embedder = SentenceTransformer("all-MiniLM-L6-v2")
    try:
        client.delete_collection("manual_chunks")
    except Exception:
        pass  # collection may not exist yet
    collection = client.create_collection("manual_chunks")
    with open(CHUNKS_FILE, "r", encoding="utf-8") as f:
        metas, ids, texts = [], [], []
        for line in f:
            item = json.loads(line)
            texts.append(item["text"])
            ids.append(item["chunk_id"])
            metas.append({"source_file": item["source_file"], "page": item["page"]})
            if len(texts) == 16:  # embed and add in small batches
                embs = embedder.encode(texts).tolist()
                collection.add(documents=texts, ids=ids, metadatas=metas, embeddings=embs)
                texts, ids, metas = [], [], []
        if texts:  # flush the final partial batch
            embs = embedder.encode(texts).tolist()
            collection.add(documents=texts, ids=ids, metadatas=metas, embeddings=embs)
# ---------------------------
# 🔍 Semantic QA
# ---------------------------
def ask(question):
    if collection is None or embedder is None or pipe is None:
        return "App not ready."
    emb = embedder.encode(question).tolist()
    results = collection.query(query_embeddings=[emb], n_results=3)
    context = "\n\n".join(results["documents"][0])
    prompt = f"""
Use the context to answer. Say 'I don't know' if unsure.

Context:
{context}

Question: {question}
"""
    # return_full_text=False strips the prompt from the generated answer
    return pipe(prompt, return_full_text=False)[0]["generated_text"]
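# The header above promises keyword reranking, which ask() does not yet do.
# A minimal sketch (the helper name is hypothetical): over-fetch from Chroma,
# boost chunks sharing literal words with the question, then truncate.
def rerank_by_keywords(question, docs, top_k=3):
    q_words = set(question.lower().split())
    scored = sorted(docs, key=lambda d: -len(q_words & set(d.lower().split())))
    return scored[:top_k]
# Usage inside ask():
#   results = collection.query(query_embeddings=[emb], n_results=10)
#   context = "\n\n".join(rerank_by_keywords(question, results["documents"][0]))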
# ---------------------------
# 🔁 App Startup
# ---------------------------
def initialize():
    global collection, embedder
    if not os.path.exists(MANUALS_FOLDER):
        os.makedirs(MANUALS_FOLDER)
    new_hash = hash_folder(MANUALS_FOLDER)
    if os.path.exists(HASH_FILE):
        with open(HASH_FILE, "r") as f:
            if f.read().strip() == new_hash and os.path.exists(CHUNKS_FILE):
                print("✅ Manuals unchanged. Skipping re-embedding.")
                # Still reconnect to the persisted store; otherwise collection
                # and embedder stay None and ask() reports "App not ready."
                embedder = SentenceTransformer("all-MiniLM-L6-v2")
                client = chromadb.PersistentClient(path=CHROMA_PATH)
                collection = client.get_or_create_collection("manual_chunks")
                return
    print("📄 Indexing manuals...")
    extract_and_chunk()
    embed_chunks()
    with open(HASH_FILE, "w") as f:
        f.write(new_hash)
    print("✅ Embedding complete.")
# ---------------------------
# 🖥️ Gradio Interface
# ---------------------------
def build_ui():
    with gr.Blocks() as demo:
        gr.Markdown("## 🔍 Ask SmartManuals-AI")
        inp = gr.Textbox(label="Your question")
        out = gr.Textbox(label="Answer", lines=6)
        btn = gr.Button("Ask")
        btn.click(fn=ask, inputs=inp, outputs=out)
    return demo
# ---------------------------
# 🧠 Run App
# ---------------------------
load_model()
initialize()
demo = build_ui()
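# NOTE (assumption): recent Gradio SDK Spaces can pick up a module-level
# `demo`, but launching explicitly is the conventional, portable ending.
if __name__ == "__main__":
    demo.launch()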