-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
306 lines (244 loc) · 8.05 KB
/
utils.py
File metadata and controls
306 lines (244 loc) · 8.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
# utils.py
import atexit
import json
import re
import shutil
import subprocess
from datetime import datetime
from pathlib import Path
from typing import Any, Dict
from config import (
IST,
NLU_LOG,
AGENT_LOG,
GMAIL_SCOPES,
MONGO_COLLECTION_NLU,
MONGO_COLLECTION_AGENT,
)
from db import get_db
_gui_logger = None
# Gmail imports
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
# -------------- TTS / speak() --------------
try:
import pyttsx3
_tts_engine = pyttsx3.init()
_tts_engine.setProperty("rate", 180)
_tts_engine.setProperty("volume", 1.0)
def speak(text: str) -> None:
"""Text-to-speech (with console print)."""
try:
print(f"🗣️ {text}")
_tts_engine.say(text)
_tts_engine.runAndWait()
except Exception as e:
print(f"(tts) error: {e}")
except Exception:
def speak(text: str) -> None:
"""Fallback speak that only prints to console."""
print(f"🗣️ {text}")
# -------------- Time helpers --------------
def now_local() -> datetime:
"""Return current local time in IST."""
return datetime.now(IST)
def iso_now() -> str:
"""Return ISO timestamp in IST."""
return now_local().isoformat()
def make_aware(dt: datetime) -> datetime:
"""Ensure a datetime is timezone-aware (IST)."""
if dt.tzinfo is None:
return dt.replace(tzinfo=IST)
return dt.astimezone(IST)
# -------------- Logging (JSONL) --------------
_buffer = [] # buffered NLU logs
def append_jsonl(record: Dict[str, Any]) -> None:
"""Append a record to the NLU JSONL log with small buffering."""
global _buffer
if not record:
return
_buffer.append(record)
if len(_buffer) >= 5:
with NLU_LOG.open("a", encoding="utf-8") as f:
for r in _buffer:
# NOTE: default=str handles ObjectId and other non-JSON types
f.write(json.dumps(r, ensure_ascii=False, default=str) + "\n")
_buffer.clear()
def flush_jsonl() -> None:
"""Flush remaining buffered NLU logs."""
global _buffer
if not _buffer:
return
with NLU_LOG.open("a", encoding="utf-8") as f:
for r in _buffer:
f.write(json.dumps(r, ensure_ascii=False) + "\n")
_buffer = []
atexit.register(flush_jsonl)
def _insert_mongo(collection_name: str, doc: Dict[str, Any]) -> None:
"""Insert a document into MongoDB (best-effort, non-fatal)."""
if not doc or not collection_name:
return
try:
db = get_db()
if db is None:
return
db[collection_name].insert_one(doc)
except Exception as e:
# Don't crash the app if Mongo is down
print(f"(mongo) insert error into {collection_name}: {e}")
def log_agent(obj: Dict[str, Any]) -> None:
"""Log agent events to AGENT_LOG and MongoDB."""
if not obj:
return
# File log (existing behaviour)
AGENT_LOG.parent.mkdir(parents=True, exist_ok=True)
with AGENT_LOG.open("a", encoding="utf-8") as f:
f.write(json.dumps(obj, ensure_ascii=False) + "\n")
# Mongo log (new)
_insert_mongo(MONGO_COLLECTION_AGENT, obj)
# -------------- Shell / system helpers --------------
def which(programs):
"""Return first found executable from a list (or string)."""
if isinstance(programs, (list, tuple)):
for p in programs:
if shutil.which(p):
return p
return None
return shutil.which(programs)
def open_with(programs, args=None, return_program: bool = False):
"""
Launch a GUI app with arguments, suppressing stdout/stderr.
If return_program=False (default):
→ returns bool ok
If return_program=True:
→ returns (ok: bool, exe: str | None)
"""
exe = which(programs)
if not exe:
if return_program:
return False, None
return False
try:
if args is None:
args = []
subprocess.Popen(
[exe] + args,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
print(f"(launcher) launched: {exe} {' '.join(args)}".strip())
if return_program:
return True, exe
return True
except Exception as e:
print(f"(launcher) failed {exe}: {e}")
if return_program:
return False, None
return False
def run_cmd(cmd) -> str:
"""Run a command and return its output (or error)."""
try:
out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, text=True)
return out
except subprocess.CalledProcessError as e:
return e.output
except Exception as e:
return str(e)
def expand_dir_keyword(keyword: str) -> Path:
"""
Map spoken folder keywords like 'downloads' or 'documents'
to real paths under the user's home.
"""
home = Path.home()
mapping = {
"home": home,
"downloads": home / "Downloads",
"documents": home / "Documents",
"desktop": home / "Desktop",
"pictures": home / "Pictures",
"music": home / "Music",
"videos": home / "Videos",
}
if not keyword:
return home
kraw = keyword.strip().lower()
k = kraw.rstrip("s") + ("s" if kraw.endswith("s") else "s")
for key, path in mapping.items():
if k.startswith(key.rstrip("s")):
return path
return home
def looks_like_url(s: str) -> bool:
"""Rudimentary URL / domain heuristic."""
s = s.strip().lower()
return (
s.startswith("http://")
or s.startswith("https://")
or re.match(r"^[a-z0-9\-\.]+\.[a-z]{2,}(/.*)?$", s) is not None
)
# -------------- Gmail helper --------------
def get_gmail_service():
"""
Returns an authenticated Gmail API service.
On first run, opens a browser window for OAuth consent and saves token.json.
"""
creds = None
token_path = Path("token.json")
if token_path.exists():
creds = Credentials.from_authorized_user_file(str(token_path), GMAIL_SCOPES)
# Refresh or fetch new token if needed
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
try:
creds.refresh(Request())
except Exception as e:
print(f"(gmail) token refresh failed: {e}")
creds = None
if not creds:
if not Path("credentials.json").exists():
print("(gmail) Missing credentials.json for OAuth.")
return None
flow = InstalledAppFlow.from_client_secrets_file(
"credentials.json", GMAIL_SCOPES
)
# This opens a local browser for login once
creds = flow.run_local_server(port=0)
# Save for next time
with token_path.open("w", encoding="utf-8") as f:
f.write(creds.to_json())
try:
service = build("gmail", "v1", credentials=creds)
return service
except Exception as e:
print(f"(gmail) Failed to build service: {e}")
return None
def log_nlu(record: Dict[str, Any]) -> None:
"""
Log NLU command both to local JSONL (NLU_LOG)
and to MongoDB (MONGO_COLLECTION_NLU), if configured.
"""
if not record:
return
append_jsonl(record)
_insert_mongo(MONGO_COLLECTION_NLU, record)
def attach_gui_logger(logger) -> None:
"""
Optional hook so GUI (NiceGUI) can receive log lines.
`logger` is expected to have a .put(str) method (like ThreadLog).
"""
global _gui_logger
_gui_logger = logger
def gui_log(msg: str) -> None:
"""
Log to console AND, if available, to the GUI logger.
Use this instead of print() when you want messages
to show up both in terminal and in the NiceGUI console.
"""
print(msg)
global _gui_logger
if _gui_logger is not None:
try:
_gui_logger.put(msg)
except Exception:
pass