datbkpro commited on
Commit
f0e51ba
·
verified ·
1 Parent(s): 4a0d9d6

Update services/gemini_realtime_service.py

Browse files
Files changed (1) hide show
  1. services/gemini_realtime_service.py +118 -59
services/gemini_realtime_service.py CHANGED
@@ -2,7 +2,7 @@ import asyncio
2
  import base64
3
  import json
4
  import os
5
- from typing import Callable, Optional
6
  import numpy as np
7
  from google import genai
8
  from google.genai.types import (
@@ -13,7 +13,7 @@ from google.genai.types import (
13
  )
14
 
15
  class GeminiRealtimeService:
16
- """Dịch vụ Gemini Realtime API cho streaming chất lượng cao"""
17
 
18
  def __init__(self, api_key: str = None):
19
  self.api_key = api_key or os.getenv("GEMINI_API_KEY")
@@ -22,6 +22,8 @@ class GeminiRealtimeService:
22
  self.is_active = False
23
  self.callback = None
24
  self.voice_name = "Puck"
 
 
25
 
26
  async def initialize(self):
27
  """Khởi tạo client Gemini"""
@@ -34,8 +36,12 @@ class GeminiRealtimeService:
34
  )
35
  return True
36
 
 
 
 
 
37
  async def start_session(self, voice_name: str = "Puck", callback: Callable = None):
38
- """Bắt đầu session Gemini Realtime"""
39
  try:
40
  if not self.client:
41
  await self.initialize()
@@ -55,32 +61,31 @@ class GeminiRealtimeService:
55
  ),
56
  )
57
 
58
- # Kết nối session - SỬA LỖI Ở ĐÂY
59
- self.session = self.client.aio.live.connect(
60
  model="gemini-2.0-flash-exp",
61
  config=config
62
  )
63
 
64
- # Sử dụng async with để quản lý session
65
- async with self.session as session:
66
- self.is_active = True
67
-
68
- if self.callback:
69
- await self.callback({
70
- 'type': 'status',
71
- 'message': f'✅ Đã kết nối Gemini - Giọng: {voice_name}',
72
- 'status': 'connected'
73
- })
74
-
75
- print("✅ Gemini Realtime session started")
76
-
77
- # Xử lý realtime communication
78
- await self._handle_realtime_session(session)
79
-
80
  return True
81
 
82
  except Exception as e:
83
- error_msg = f"❌ Lỗi khởi động Gemini Realtime: {e}"
84
  if self.callback:
85
  await self.callback({
86
  'type': 'error',
@@ -90,34 +95,65 @@ class GeminiRealtimeService:
90
  print(error_msg)
91
  return False
92
 
93
- async def _handle_realtime_session(self, session):
94
- """Xử session realtime"""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
95
  try:
96
- async for response in session:
97
- if hasattr(response, 'data') and response.data:
98
- # Audio response from Gemini
99
- audio_data = np.frombuffer(response.data, dtype=np.int16)
100
-
101
- if self.callback:
102
- await self.callback({
103
- 'type': 'audio',
104
- 'audio_data': audio_data,
105
- 'sample_rate': 24000,
106
- 'status': 'audio_streaming'
107
- })
108
 
109
- elif hasattr(response, 'text') and response.text:
110
- # Text response from Gemini
111
- if self.callback:
112
- await self.callback({
113
- 'type': 'text',
114
- 'content': response.text,
115
- 'role': 'assistant',
116
- 'status': 'text_response'
117
- })
118
 
 
 
 
 
 
 
 
 
 
 
 
 
 
119
  except Exception as e:
120
- error_msg = f"❌ Lỗi trong session: {e}"
121
  if self.callback:
122
  await self.callback({
123
  'type': 'error',
@@ -125,28 +161,51 @@ class GeminiRealtimeService:
125
  'status': 'error'
126
  })
127
 
128
- async def send_audio(self, audio_data: bytes):
129
- """Gửi audio data đến Gemini session"""
130
- if not self.session or not self.is_active:
131
  return False
132
 
133
  try:
134
- # Trong implementation thực tế, sẽ gửi qua session
135
- # await self.session.send(audio_data)
136
- print(f"📤 Sent audio data: {len(audio_data)} bytes")
 
 
 
 
137
  return True
138
 
139
  except Exception as e:
140
- print(f"❌ Lỗi gửi audio: {e}")
141
  return False
142
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
143
  async def send_text(self, text: str):
144
- """Gửi text message đến Gemini (fallback)"""
145
- if not self.client or not self.is_active:
146
  return None
147
 
148
  try:
149
- # Fallback: sử dụng chat completion thông thường
150
  response = await self.client.aio.models.generate_content(
151
  model="gemini-2.0-flash-exp",
152
  contents=text
@@ -173,14 +232,14 @@ class GeminiRealtimeService:
173
  return None
174
 
175
  async def close(self):
176
- """Đóng kết nối Gemini"""
177
  self.is_active = False
178
 
179
  if self.callback:
180
  await self.callback({
181
  'type': 'status',
182
- 'message': '🛑 Đã đóng kết nối Gemini',
183
  'status': 'disconnected'
184
  })
185
 
186
- print("🛑 Gemini Realtime session closed")
 
2
  import base64
3
  import json
4
  import os
5
+ from typing import Callable, Optional, AsyncGenerator
6
  import numpy as np
7
  from google import genai
8
  from google.genai.types import (
 
13
  )
14
 
15
  class GeminiRealtimeService:
16
+ """Dịch vụ Gemini Realtime API với audio streaming thực"""
17
 
18
  def __init__(self, api_key: str = None):
19
  self.api_key = api_key or os.getenv("GEMINI_API_KEY")
 
22
  self.is_active = False
23
  self.callback = None
24
  self.voice_name = "Puck"
25
+ self.input_queue = asyncio.Queue()
26
+ self.output_queue = asyncio.Queue()
27
 
28
  async def initialize(self):
29
  """Khởi tạo client Gemini"""
 
36
  )
37
  return True
38
 
39
+ def encode_audio(self, data: np.ndarray) -> str:
40
+ """Encode audio data to base64"""
41
+ return base64.b64encode(data.tobytes()).decode("UTF-8")
42
+
43
  async def start_session(self, voice_name: str = "Puck", callback: Callable = None):
44
+ """Bắt đầu session Gemini Realtime với audio streaming"""
45
  try:
46
  if not self.client:
47
  await self.initialize()
 
61
  ),
62
  )
63
 
64
+ # Kết nối session
65
+ self.session = await self.client.aio.live.connect(
66
  model="gemini-2.0-flash-exp",
67
  config=config
68
  )
69
 
70
+ self.is_active = True
71
+
72
+ if self.callback:
73
+ await self.callback({
74
+ 'type': 'status',
75
+ 'message': f'✅ Đã kết nối Gemini Audio Streaming - Giọng: {voice_name}',
76
+ 'status': 'connected'
77
+ })
78
+
79
+ print("✅ Gemini Realtime Audio session started")
80
+
81
+ # Khởi động background tasks
82
+ asyncio.create_task(self._audio_sender())
83
+ asyncio.create_task(self._audio_receiver())
84
+
 
85
  return True
86
 
87
  except Exception as e:
88
+ error_msg = f"❌ Lỗi khởi động Gemini Audio: {e}"
89
  if self.callback:
90
  await self.callback({
91
  'type': 'error',
 
95
  print(error_msg)
96
  return False
97
 
98
+ async def _audio_sender(self):
99
+ """Gửi audio data đến Gemini"""
100
+ try:
101
+ async with self.session as session:
102
+ async for audio_chunk in self._audio_stream_generator():
103
+ await session.send(audio_chunk)
104
+ except Exception as e:
105
+ error_msg = f"❌ Lỗi gửi audio: {e}"
106
+ if self.callback:
107
+ await self.callback({
108
+ 'type': 'error',
109
+ 'message': error_msg,
110
+ 'status': 'error'
111
+ })
112
+
113
+ async def _audio_stream_generator(self) -> AsyncGenerator[bytes, None]:
114
+ """Generator cho audio streaming"""
115
+ while self.is_active:
116
+ try:
117
+ audio_data = await asyncio.wait_for(self.input_queue.get(), timeout=1.0)
118
+ yield audio_data
119
+ except asyncio.TimeoutError:
120
+ continue
121
+ except Exception as e:
122
+ print(f"❌ Lỗi audio generator: {e}")
123
+ break
124
+
125
+ async def _audio_receiver(self):
126
+ """Nhận audio response từ Gemini"""
127
  try:
128
+ async with self.session as session:
129
+ async for response in session:
130
+ if hasattr(response, 'data') and response.data:
131
+ # Audio response
132
+ audio_data = np.frombuffer(response.data, dtype=np.int16)
 
 
 
 
 
 
 
133
 
134
+ if self.callback:
135
+ await self.callback({
136
+ 'type': 'audio',
137
+ 'audio_data': audio_data,
138
+ 'sample_rate': 24000,
139
+ 'status': 'audio_streaming'
140
+ })
 
 
141
 
142
+ # Đưa vào output queue để phát ra loa
143
+ self.output_queue.put_nowait((24000, audio_data))
144
+
145
+ elif hasattr(response, 'text') and response.text:
146
+ # Text response
147
+ if self.callback:
148
+ await self.callback({
149
+ 'type': 'text',
150
+ 'content': response.text,
151
+ 'role': 'assistant',
152
+ 'status': 'text_response'
153
+ })
154
+
155
  except Exception as e:
156
+ error_msg = f"❌ Lỗi nhận audio: {e}"
157
  if self.callback:
158
  await self.callback({
159
  'type': 'error',
 
161
  'status': 'error'
162
  })
163
 
164
+ async def send_audio_chunk(self, audio_chunk: np.ndarray, sample_rate: int = 16000):
165
+ """Gửi audio chunk đến Gemini"""
166
+ if not self.is_active:
167
  return False
168
 
169
  try:
170
+ # Resample nếu cần (Gemini cần 16kHz)
171
+ if sample_rate != 16000:
172
+ audio_chunk = self._resample_audio(audio_chunk, sample_rate, 16000)
173
+
174
+ # Encode và gửi
175
+ audio_bytes = audio_chunk.tobytes()
176
+ await self.input_queue.put(audio_bytes)
177
  return True
178
 
179
  except Exception as e:
180
+ print(f"❌ Lỗi gửi audio chunk: {e}")
181
  return False
182
 
183
+ async def receive_audio(self) -> tuple[int, np.ndarray] | None:
184
+ """Nhận audio từ Gemini"""
185
+ try:
186
+ return await asyncio.wait_for(self.output_queue.get(), timeout=1.0)
187
+ except asyncio.TimeoutError:
188
+ return None
189
+
190
+ def _resample_audio(self, audio_chunk: np.ndarray, original_rate: int, target_rate: int):
191
+ """Resample audio chunk (đơn giản)"""
192
+ if original_rate == target_rate:
193
+ return audio_chunk
194
+
195
+ ratio = target_rate / original_rate
196
+ new_length = int(len(audio_chunk) * ratio)
197
+ return np.interp(
198
+ np.linspace(0, len(audio_chunk) - 1, new_length),
199
+ np.arange(len(audio_chunk)),
200
+ audio_chunk
201
+ ).astype(np.int16)
202
+
203
  async def send_text(self, text: str):
204
+ """Gửi text message (fallback)"""
205
+ if not self.client:
206
  return None
207
 
208
  try:
 
209
  response = await self.client.aio.models.generate_content(
210
  model="gemini-2.0-flash-exp",
211
  contents=text
 
232
  return None
233
 
234
  async def close(self):
235
+ """Đóng kết nối"""
236
  self.is_active = False
237
 
238
  if self.callback:
239
  await self.callback({
240
  'type': 'status',
241
+ 'message': '🛑 Đã đóng kết nối Gemini Audio',
242
  'status': 'disconnected'
243
  })
244
 
245
+ print("🛑 Gemini Audio session closed")