Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| from prodiapy import Prodia | |
| from PIL import Image | |
| from io import BytesIO | |
| import requests | |
| import random | |
| import os | |
| import base64 | |
| import json | |
| import time | |
class Prodia:
    """Small HTTP client for the Prodia REST API (face swap and job polling).

    Every request carries the API key in the ``X-Prodia-Key`` header and any
    non-200 response is raised as an ``Exception``.
    """

    def __init__(self, api_key=None, base=None, request_timeout=30):
        """Create a client.

        Args:
            api_key: Prodia API key. When ``None`` it is read from the
                ``PRODIA_API_KEY`` environment variable at construction
                time (not at import time, so a key exported later is seen).
            base: API base URL; defaults to ``https://api.prodia.com/v1``.
            request_timeout: Seconds passed to ``requests`` as the timeout
                of each HTTP call, so a stalled connection cannot hang the
                worker forever.
        """
        if api_key is None:
            api_key = os.getenv("PRODIA_API_KEY")
        self.base = base or "https://api.prodia.com/v1"
        self.headers = {
            "X-Prodia-Key": api_key,
        }
        self.request_timeout = request_timeout

    def faceswap(self, params):
        """POST ``params`` to the ``/faceswap`` endpoint and return the decoded JSON."""
        response = self._post(f"{self.base}/faceswap", params)
        return response.json()

    def get_job(self, job_id):
        """GET ``/job/<job_id>`` and return the decoded JSON."""
        response = self._get(f"{self.base}/job/{job_id}")
        return response.json()

    def wait(self, job, poll_interval=0.25, timeout=None):
        """Poll a job until its status is ``succeeded`` or ``failed``.

        Args:
            job: Job dict holding at least ``status`` and ``job`` (the id).
            poll_interval: Seconds to sleep between polls.
            timeout: Maximum number of seconds to keep polling, or ``None``
                to poll without limit (the original behaviour).

        Returns:
            The latest job dict; ``job`` itself when it is already terminal.

        Raises:
            TimeoutError: If ``timeout`` elapses before a terminal status.
            Exception: If a poll request returns a non-200 response.
        """
        deadline = None if timeout is None else time.monotonic() + timeout
        job_result = job
        while job_result['status'] not in ('succeeded', 'failed'):
            if deadline is not None and time.monotonic() >= deadline:
                raise TimeoutError(
                    f"Prodia job {job['job']} did not finish within {timeout} seconds"
                )
            time.sleep(poll_interval)
            job_result = self.get_job(job['job'])
        return job_result

    def _post(self, url, params):
        """POST ``params`` as a JSON body; raise on any non-200 response."""
        headers = {
            **self.headers,
            "Content-Type": "application/json",
        }
        response = requests.post(
            url,
            headers=headers,
            data=json.dumps(params),
            timeout=self.request_timeout,
        )
        return self._check(response)

    def _get(self, url):
        """GET ``url``; raise on any non-200 response."""
        response = requests.get(url, headers=self.headers, timeout=self.request_timeout)
        return self._check(response)

    @staticmethod
    def _check(response):
        """Return ``response`` unchanged, or raise if its status is not 200."""
        if response.status_code != 200:
            raise Exception(f"Bad Prodia Response: {response.status_code}")
        return response


# Module-level client used by infer(); the key comes from PRODIA_API_KEY.
client = Prodia()
def infer(source, target):
    """Run a face swap for two local image files.

    Args:
        source: File path of the source image, or ``None`` when the user
            has not provided one.
        target: File path of the target image, or ``None``.

    Returns:
        The ``imageUrl`` reported by the finished job, or ``None`` when an
        input is missing or the job ends with status ``failed``.
    """
    # Guard on the arguments themselves. The previous check tested the
    # module-level Gradio components `source_image` / `target_image`, which
    # are never None, so a missing upload fell through to upload_image().
    if source is None or target is None:
        return None

    source_url = upload_image(source)
    target_url = upload_image(target)

    job = client.faceswap({
        "sourceUrl": source_url,
        "targetUrl": target_url,
    })
    res = client.wait(job)

    if res['status'] == "failed":
        return None
    return res['imageUrl']
def upload_image(file, timeout=60):
    """Upload a local file to the image host and return its public URL.

    The file is first POSTed to the endpoint named by the ``IMAGE_API_1``
    environment variable, which answers with an ``id``. That id is then
    attached to a message POSTed to ``IMAGE_API_2`` (authenticated with
    ``SESSION_TOKEN``); the filename echoed back in the message's first
    attachment completes the URL.

    Args:
        file: Path of the file to upload.
        timeout: Seconds allowed for each of the two HTTP requests.

    Returns:
        ``<IMAGE_API_1>/<id>/<filename>``.

    Raises:
        FileNotFoundError: If ``file`` does not exist.
        requests.HTTPError: If either endpoint answers with an error status.
    """
    upload_endpoint = os.getenv("IMAGE_API_1")

    # Context manager so the handle is closed even if the upload fails;
    # the original left the file open.
    with open(file, 'rb') as handle:
        upload = requests.post(upload_endpoint, files={'file': handle}, timeout=timeout)
    upload.raise_for_status()
    img_id = upload.json()['id']

    payload = {
        "content": "",
        "nonce": f"{random.randint(1, 10000000)}H9X42KSEJFNNH",
        "replies": [],
        "attachments": [img_id],
    }
    res = requests.post(
        os.getenv("IMAGE_API_2"),
        json=payload,
        headers={"x-session-token": os.getenv("SESSION_TOKEN")},
        timeout=timeout,
    )
    res.raise_for_status()

    filename = res.json()['attachments'][0]['filename']
    return f"{upload_endpoint}/{img_id}/{filename}"
def image_to_base64(image: "Image.Image") -> str:
    """Return ``image`` encoded as PNG and base64, as a text string.

    Args:
        image: Object exposing ``save(fp, format=...)``, such as a PIL image.

    Returns:
        The base64 text (UTF-8 decoded) of the PNG bytes written by ``save``.
    """
    # Serialize the image into an in-memory buffer as PNG.
    buffered = BytesIO()
    image.save(buffered, format="PNG")
    # Base64-encode the raw bytes, then decode to str for callers.
    img_str = base64.b64encode(buffered.getvalue())
    return img_str.decode('utf-8')
# Gradio UI: two image inputs, one result image, and a button that runs infer().
# NOTE(review): the nesting of the `with` blocks below was reconstructed from a
# listing that had lost its indentation -- confirm the layout against the
# deployed app.
example_pairs = [
    ["example1.jpg", "example2.jpg"],
    ["example3.jpg", "example4.jpg"],
    ["example5.jpg", "example6.jpg"],
]

with gr.Blocks() as demo:
    with gr.Column():
        gr.HTML("<h1><center>Face Swap</center></h1>")

        with gr.Row():
            with gr.Row():
                # type="filepath" hands infer() a path on disk for each image.
                source_image = gr.Image(type="filepath", label="Source Image")
                target_image = gr.Image(type="filepath", label="Target Image")

            with gr.Column():
                result = gr.Image()
                run_button = gr.Button("Swap Face", variant="primary")

        swap_inputs = [source_image, target_image]
        swap_outputs = [result]

        gr.Examples(
            examples=example_pairs,
            fn=infer,
            inputs=swap_inputs,
            outputs=swap_outputs,
        )

    run_button.click(fn=infer, inputs=swap_inputs, outputs=swap_outputs)

# Queue holds at most 20 pending requests; the API surface is not exposed.
queued_demo = demo.queue(max_size=20, api_open=False)
queued_demo.launch(show_api=False, max_threads=400)