Files
hsbg-ai/hsbg_ai/backend/vision/screenshot_manager.py

196 lines
6.5 KiB
Python
Raw Permalink Normal View History

2026-03-31 13:10:46 +02:00
"""Capture d'écran continue + extraction OCR de l'état HSBG."""
import asyncio
import base64
import io
import re

import structlog

log = structlog.get_logger()

# Optional dependencies. Each one is probed at import time and a module-level
# flag records whether it is available, so the rest of the module can check the
# flag and degrade (no capture / no OCR) instead of failing on import.
try:
    import mss
    import mss.tools
    MSS_OK = True
except ImportError:
    MSS_OK = False
    log.warning("vision.mss_missing", install="pip install mss")

try:
    from PIL import Image, ImageEnhance, ImageFilter
    PIL_OK = True
except ImportError:
    PIL_OK = False
    # Without Pillow no OCR can run at all; report it like the other two
    # optional dependencies rather than failing silently.
    log.warning("vision.pil_missing", install="pip install pillow")

try:
    import pytesseract
    OCR_OK = True
except ImportError:
    OCR_OK = False
    log.warning(
        "vision.tesseract_missing",
        install="sudo apt-get install tesseract-ocr && pip install pytesseract",
    )


class ScreenshotManager:
"""
Capture et analyse les frames du jeu HSBG.
Zones de capture (coordonnées relatives 0.0-1.0):
- gold: haut gauche (or disponible)
- tier: haut droite (niveau de taverne)
- hero_hp: centre haut (HP du héros)
- board: centre (board du joueur)
- tavern: bas (serviteurs en taverne)
"""
ZONES = {
"gold": (0.02, 0.02, 0.12, 0.10),
"tier": (0.85, 0.02, 0.98, 0.14),
"hero_hp": (0.44, 0.01, 0.56, 0.09),
"board": (0.10, 0.38, 0.90, 0.72),
"tavern": (0.02, 0.72, 0.98, 0.98),
}
def __init__(self, settings):
self.settings = settings
self._running = False
self._latest_bytes: bytes | None = None
self._latest_state: dict = {}
self._task: asyncio.Task | None = None
if OCR_OK and settings.tesseract_path:
pytesseract.pytesseract.tesseract_cmd = settings.tesseract_path
async def start(self):
"""Démarre la boucle de capture en arrière-plan."""
if not MSS_OK:
log.warning("vision.skipped", reason="mss non installé")
return
self._running = True
self._task = asyncio.create_task(self._loop())
log.info("vision.capture_started", interval=self.settings.screenshot_interval)
async def stop(self):
"""Arrête proprement la boucle de capture."""
self._running = False
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
log.info("vision.capture_stopped")
async def _loop(self):
"""Boucle principale de capture."""
while self._running:
try:
await self._tick()
await asyncio.sleep(self.settings.screenshot_interval)
except asyncio.CancelledError:
break
except Exception as e:
log.warning("vision.loop_error", error=str(e))
await asyncio.sleep(2)
async def _tick(self):
"""Une itération: capture + analyse."""
loop = asyncio.get_event_loop()
data = await loop.run_in_executor(None, self._grab)
if data:
self._latest_bytes = data
state = await loop.run_in_executor(None, self._extract, data)
if state:
self._latest_state = state
def _grab(self) -> bytes | None:
"""Prend une capture d'écran (exécuté dans un thread)."""
if not MSS_OK:
return None
try:
with mss.mss() as sct:
monitor = sct.monitors[1] # Écran principal
frame = sct.grab(monitor)
return mss.tools.to_png(frame.rgb, frame.size)
except Exception as e:
log.warning("screenshot.grab_failed", error=str(e))
return None
def _extract(self, data: bytes) -> dict:
"""Extrait les valeurs numériques via OCR (exécuté dans un thread)."""
if not PIL_OK or not OCR_OK:
return {}
try:
img = Image.open(io.BytesIO(data))
state = {}
for zone_name, coords in self.ZONES.items():
crop = self._crop(img, coords)
if crop:
text = self._ocr(crop)
if text:
state[f"raw_{zone_name}"] = text
# Parser les valeurs numériques
state["gold"] = self._parse_num(state.get("raw_gold", ""))
state["tavern_tier"] = self._parse_num(state.get("raw_tier", ""))
state["hero_hp"] = self._parse_num(state.get("raw_hero_hp", ""))
return state
except Exception as e:
log.warning("vision.extract_failed", error=str(e))
return {}
def _crop(self, img, coords: tuple):
"""Découpe une zone de l'image."""
if not PIL_OK:
return None
try:
w, h = img.size
x1, y1, x2, y2 = coords
return img.crop((int(x1 * w), int(y1 * h), int(x2 * w), int(y2 * h)))
except Exception:
return None
def _ocr(self, img) -> str:
"""OCR optimisé pour HSBG (chiffres + lettres)."""
if not OCR_OK or not PIL_OK:
return ""
try:
# Pipeline de prétraitement pour améliorer l'OCR
enhanced = ImageEnhance.Contrast(img).enhance(2.5)
enhanced = enhanced.convert("L") # Grayscale
enhanced = enhanced.filter(ImageFilter.SHARPEN)
# Agrandir pour meilleure précision
w, h = enhanced.size
enhanced = enhanced.resize((w * 2, h * 2), Image.LANCZOS)
text = pytesseract.image_to_string(
enhanced,
config="--psm 7 --oem 3 -c tessedit_char_whitelist=0123456789/ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz ",
)
return text.strip()
except Exception:
return ""
def _parse_num(self, text: str) -> int:
"""Extrait un nombre d'un texte OCR."""
nums = re.findall(r"\d+", text or "")
return int(nums[0]) if nums else 0
# ─── API publique ─────────────────────────────────────────────────────────
def get_b64(self) -> str | None:
"""Dernière capture en base64 (pour affichage frontend)."""
if self._latest_bytes:
return base64.b64encode(self._latest_bytes).decode()
return None
def get_state(self) -> dict:
"""Dernier état extrait."""
return self._latest_state.copy()
async def capture_now(self) -> dict:
"""Déclenche une capture manuelle et retourne l'état."""
await self._tick()
return self.get_state()