python
This commit is contained in:
110
backend/vkmusic.py
Normal file
110
backend/vkmusic.py
Normal file
@@ -0,0 +1,110 @@
|
||||
import os
|
||||
import asyncio
|
||||
import subprocess
|
||||
from typing import List, Optional, Dict, Any
|
||||
from dataclasses import dataclass
|
||||
import httpx
|
||||
import json
|
||||
import vk_api
|
||||
from vk_api.audio import VkAudio
|
||||
|
||||
@dataclass
|
||||
class Track:
|
||||
id: str
|
||||
title: str
|
||||
artist: str
|
||||
duration: int
|
||||
thumbnail: str
|
||||
url: str
|
||||
permalink: str
|
||||
|
||||
class VKMusicService:
|
||||
def __init__(self):
|
||||
self.vk_login = os.getenv("VK_LOGIN")
|
||||
self.vk_password = os.getenv("VK_PASSWORD")
|
||||
self.vk_session = None
|
||||
self.vk_audio = None
|
||||
self.client = httpx.AsyncClient()
|
||||
|
||||
if self.vk_login and self.vk_password:
|
||||
try:
|
||||
self.vk_session = vk_api.VkApi(self.vk_login, self.vk_password)
|
||||
self.vk_session.auth()
|
||||
self.vk_audio = VkAudio(self.vk_session)
|
||||
print("✅ VK Music service initialized")
|
||||
except Exception as e:
|
||||
print(f"❌ VK Music initialization failed: {e}")
|
||||
self.vk_session = None
|
||||
self.vk_audio = None
|
||||
else:
|
||||
print("⚠️ VK_LOGIN or VK_PASSWORD not configured")
|
||||
|
||||
async def search_tracks(self, query: str, limit: int = 10) -> List[Track]:
|
||||
"""Search for tracks on VK Music"""
|
||||
try:
|
||||
if not self.vk_audio:
|
||||
print("VK Audio not available, returning empty results")
|
||||
return []
|
||||
|
||||
# Search for audio tracks
|
||||
search_results = self.vk_audio.search(q=query, count=limit)
|
||||
tracks = []
|
||||
|
||||
for item in search_results:
|
||||
# VK Audio returns dict with keys: artist, title, duration, url, etc.
|
||||
track = Track(
|
||||
id=str(item.get('id', '')),
|
||||
title=item.get('title', 'Unknown Title'),
|
||||
artist=item.get('artist', 'Unknown Artist'),
|
||||
duration=item.get('duration', 0),
|
||||
thumbnail=item.get('album', {}).get('thumb', {}).get('photo_600', '') if item.get('album') else '',
|
||||
url=item.get('url', ''),
|
||||
permalink=item.get('url', '')
|
||||
)
|
||||
tracks.append(track)
|
||||
|
||||
return tracks
|
||||
|
||||
except Exception as e:
|
||||
print(f"VK search error: {e}")
|
||||
return []
|
||||
|
||||
async def get_audio_stream(self, track_id: str, track_url: str) -> Optional[bytes]:
|
||||
"""Get audio stream for a track"""
|
||||
try:
|
||||
if not track_url:
|
||||
return None
|
||||
|
||||
# Download audio stream directly from VK URL
|
||||
response = await self.client.get(track_url)
|
||||
response.raise_for_status()
|
||||
|
||||
return response.content
|
||||
|
||||
except Exception as e:
|
||||
print(f"Audio stream error: {e}")
|
||||
return None
|
||||
|
||||
async def download_track(self, track_url: str, output_path: str) -> bool:
|
||||
"""Download track directly to file"""
|
||||
try:
|
||||
if not track_url:
|
||||
return False
|
||||
|
||||
# Download directly from VK URL
|
||||
response = await self.client.get(track_url)
|
||||
response.raise_for_status()
|
||||
|
||||
# Save to file
|
||||
with open(output_path, 'wb') as f:
|
||||
f.write(response.content)
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"Download track error: {e}")
|
||||
return False
|
||||
|
||||
async def close(self):
|
||||
"""Close HTTP client"""
|
||||
await self.client.aclose()
|
||||
Reference in New Issue
Block a user