2025-08-14 02:38:56 +04:00

53 lines
1.7 KiB
Python

import asyncio
import os
def save_sync_token(filename, token):
"""Атомарное сохранение токена синхронизации"""
try:
temp_file = filename + ".tmp"
with open(temp_file, "w") as f:
f.write(token)
os.replace(temp_file, filename)
except IOError as e:
print(f"[ERROR] Failed to save sync token: {str(e)}")
def load_sync_token(filename):
"""Загрузка токена синхронизации"""
try:
if os.path.exists(filename):
with open(filename, "r") as f:
token = f.read().strip()
if token:
print(f"[INFO] Loaded sync token: {token[:10]}...")
return token
return None
except IOError as e:
print(f"[ERROR] Failed to load sync token: {str(e)}")
return None
def is_trusted_room(room_id, trusted_domains):
"""Проверка, находится ли комната на доверенном сервере"""
if not room_id.startswith("!"):
print(f"[WARN] Invalid room ID format: {room_id}")
return False
# Извлекаем домен комнаты
parts = room_id.split(":", 1)
if len(parts) < 2:
return False
room_domain = parts[1].lower()
# Нормализация доверенных доменов
trusted_domains = [domain.lower().strip() for domain in trusted_domains]
return room_domain in trusted_domains
def run_blocking(func, *args):
"""Запускает блокирующую функцию в отдельном потоке"""
loop = asyncio.get_running_loop()
return loop.run_in_executor(None, func, *args)