datbkpro committed on
Commit
27b3d88
·
verified ·
1 Parent(s): f0e51ba

Update services/gemini_realtime_service.py

Browse files
Files changed (1) hide show
  1. services/gemini_realtime_service.py +84 -42
services/gemini_realtime_service.py CHANGED
@@ -11,6 +11,7 @@ from google.genai.types import (
11
  SpeechConfig,
12
  VoiceConfig,
13
  )
 
14
 
15
  class GeminiRealtimeService:
16
  """Dịch vụ Gemini Realtime API với audio streaming thực"""
@@ -24,17 +25,21 @@ class GeminiRealtimeService:
24
  self.voice_name = "Puck"
25
  self.input_queue = asyncio.Queue()
26
  self.output_queue = asyncio.Queue()
 
27
 
28
  async def initialize(self):
29
  """Khởi tạo client Gemini"""
30
  if not self.api_key:
31
  raise ValueError("Gemini API key is required")
32
 
33
- self.client = genai.Client(
34
- api_key=self.api_key,
35
- http_options={"api_version": "v1alpha"},
36
- )
37
- return True
 
 
 
38
 
39
  def encode_audio(self, data: np.ndarray) -> str:
40
  """Encode audio data to base64"""
@@ -61,8 +66,8 @@ class GeminiRealtimeService:
61
  ),
62
  )
63
 
64
- # Kết nối session
65
- self.session = await self.client.aio.live.connect(
66
  model="gemini-2.0-flash-exp",
67
  config=config
68
  )
@@ -78,9 +83,8 @@ class GeminiRealtimeService:
78
 
79
  print("✅ Gemini Realtime Audio session started")
80
 
81
- # Khởi động background tasks
82
- asyncio.create_task(self._audio_sender())
83
- asyncio.create_task(self._audio_receiver())
84
 
85
  return True
86
 
@@ -95,12 +99,33 @@ class GeminiRealtimeService:
95
  print(error_msg)
96
  return False
97
 
98
- async def _audio_sender(self):
99
- """Gửi audio data đến Gemini"""
100
  try:
 
101
  async with self.session as session:
102
- async for audio_chunk in self._audio_stream_generator():
103
- await session.send(audio_chunk)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
104
  except Exception as e:
105
  error_msg = f"❌ Lỗi gửi audio: {e}"
106
  if self.callback:
@@ -122,36 +147,35 @@ class GeminiRealtimeService:
122
  print(f"❌ Lỗi audio generator: {e}")
123
  break
124
 
125
- async def _audio_receiver(self):
126
  """Nhận audio response từ Gemini"""
127
  try:
128
- async with self.session as session:
129
- async for response in session:
130
- if hasattr(response, 'data') and response.data:
131
- # Audio response
132
- audio_data = np.frombuffer(response.data, dtype=np.int16)
133
-
134
- if self.callback:
135
- await self.callback({
136
- 'type': 'audio',
137
- 'audio_data': audio_data,
138
- 'sample_rate': 24000,
139
- 'status': 'audio_streaming'
140
- })
 
 
 
 
 
 
 
 
 
 
 
 
141
 
142
- # Đưa vào output queue để phát ra loa
143
- self.output_queue.put_nowait((24000, audio_data))
144
-
145
- elif hasattr(response, 'text') and response.text:
146
- # Text response
147
- if self.callback:
148
- await self.callback({
149
- 'type': 'text',
150
- 'content': response.text,
151
- 'role': 'assistant',
152
- 'status': 'text_response'
153
- })
154
-
155
  except Exception as e:
156
  error_msg = f"❌ Lỗi nhận audio: {e}"
157
  if self.callback:
@@ -221,6 +245,16 @@ class GeminiRealtimeService:
221
 
222
  return response.text
223
 
 
 
 
 
 
 
 
 
 
 
224
  except Exception as e:
225
  error_msg = f"❌ Lỗi gửi text: {e}"
226
  if self.callback:
@@ -229,12 +263,20 @@ class GeminiRealtimeService:
229
  'message': error_msg,
230
  'status': 'error'
231
  })
232
- return None
233
 
234
  async def close(self):
235
  """Đóng kết nối"""
236
  self.is_active = False
237
 
 
 
 
 
 
 
 
 
238
  if self.callback:
239
  await self.callback({
240
  'type': 'status',
 
11
  SpeechConfig,
12
  VoiceConfig,
13
  )
14
+ from google.api_core import exceptions as google_exceptions
15
 
16
  class GeminiRealtimeService:
17
  """Dịch vụ Gemini Realtime API với audio streaming thực"""
 
25
  self.voice_name = "Puck"
26
  self.input_queue = asyncio.Queue()
27
  self.output_queue = asyncio.Queue()
28
+ self._session_task = None
29
 
30
  async def initialize(self):
31
  """Khởi tạo client Gemini"""
32
  if not self.api_key:
33
  raise ValueError("Gemini API key is required")
34
 
35
+ try:
36
+ self.client = genai.Client(
37
+ api_key=self.api_key,
38
+ http_options={"api_version": "v1alpha"},
39
+ )
40
+ return True
41
+ except Exception as e:
42
+ raise Exception(f"Không thể khởi tạo Gemini client: {str(e)}")
43
 
44
  def encode_audio(self, data: np.ndarray) -> str:
45
  """Encode audio data to base64"""
 
66
  ),
67
  )
68
 
69
+ # SỬA LỖI: Sử dụng async with đúng cách
70
+ self.session = self.client.aio.live.connect(
71
  model="gemini-2.0-flash-exp",
72
  config=config
73
  )
 
83
 
84
  print("✅ Gemini Realtime Audio session started")
85
 
86
+ # Khởi động background task để xử lý session
87
+ self._session_task = asyncio.create_task(self._handle_session())
 
88
 
89
  return True
90
 
 
99
  print(error_msg)
100
  return False
101
 
102
+ async def _handle_session(self):
103
+ """Xử session realtime với async with đúng cách"""
104
  try:
105
+ # SỬA LỖI: Sử dụng async with đúng cách
106
  async with self.session as session:
107
+ # Khởi động sender và receiver
108
+ sender_task = asyncio.create_task(self._audio_sender(session))
109
+ receiver_task = asyncio.create_task(self._audio_receiver(session))
110
+
111
+ # Chờ cả hai task hoàn thành
112
+ await asyncio.gather(sender_task, receiver_task)
113
+
114
+ except Exception as e:
115
+ error_msg = f"❌ Lỗi trong session: {e}"
116
+ if self.callback:
117
+ await self.callback({
118
+ 'type': 'error',
119
+ 'message': error_msg,
120
+ 'status': 'error'
121
+ })
122
+ print(error_msg)
123
+
124
+ async def _audio_sender(self, session):
125
+ """Gửi audio data đến Gemini"""
126
+ try:
127
+ async for audio_chunk in self._audio_stream_generator():
128
+ await session.send(audio_chunk)
129
  except Exception as e:
130
  error_msg = f"❌ Lỗi gửi audio: {e}"
131
  if self.callback:
 
147
  print(f"❌ Lỗi audio generator: {e}")
148
  break
149
 
150
+ async def _audio_receiver(self, session):
151
  """Nhận audio response từ Gemini"""
152
  try:
153
+ async for response in session:
154
+ if hasattr(response, 'data') and response.data:
155
+ # Audio response
156
+ audio_data = np.frombuffer(response.data, dtype=np.int16)
157
+
158
+ if self.callback:
159
+ await self.callback({
160
+ 'type': 'audio',
161
+ 'audio_data': audio_data,
162
+ 'sample_rate': 24000,
163
+ 'status': 'audio_streaming'
164
+ })
165
+
166
+ # Đưa vào output queue để phát ra loa
167
+ self.output_queue.put_nowait((24000, audio_data))
168
+
169
+ elif hasattr(response, 'text') and response.text:
170
+ # Text response
171
+ if self.callback:
172
+ await self.callback({
173
+ 'type': 'text',
174
+ 'content': response.text,
175
+ 'role': 'assistant',
176
+ 'status': 'text_response'
177
+ })
178
 
 
 
 
 
 
 
 
 
 
 
 
 
 
179
  except Exception as e:
180
  error_msg = f"❌ Lỗi nhận audio: {e}"
181
  if self.callback:
 
245
 
246
  return response.text
247
 
248
+ except google_exceptions.ResourceExhausted:
249
+ error_msg = "❌ Quota Gemini đã hết. Vui lòng kiểm tra billing."
250
+ if self.callback:
251
+ await self.callback({
252
+ 'type': 'error',
253
+ 'message': error_msg,
254
+ 'status': 'quota_exceeded'
255
+ })
256
+ return error_msg
257
+
258
  except Exception as e:
259
  error_msg = f"❌ Lỗi gửi text: {e}"
260
  if self.callback:
 
263
  'message': error_msg,
264
  'status': 'error'
265
  })
266
+ return error_msg
267
 
268
  async def close(self):
269
  """Đóng kết nối"""
270
  self.is_active = False
271
 
272
+ # Hủy task nếu đang chạy
273
+ if self._session_task:
274
+ self._session_task.cancel()
275
+ try:
276
+ await self._session_task
277
+ except asyncio.CancelledError:
278
+ pass
279
+
280
  if self.callback:
281
  await self.callback({
282
  'type': 'status',