"""Capture d'écran continue + extraction OCR de l'état HSBG.""" import asyncio import base64 import io import re import structlog log = structlog.get_logger() try: import mss import mss.tools MSS_OK = True except ImportError: MSS_OK = False log.warning("vision.mss_missing", install="pip install mss") try: from PIL import Image, ImageEnhance, ImageFilter PIL_OK = True except ImportError: PIL_OK = False try: import pytesseract OCR_OK = True except ImportError: OCR_OK = False log.warning("vision.tesseract_missing", install="sudo apt-get install tesseract-ocr && pip install pytesseract") class ScreenshotManager: """ Capture et analyse les frames du jeu HSBG. Zones de capture (coordonnées relatives 0.0-1.0): - gold: haut gauche (or disponible) - tier: haut droite (niveau de taverne) - hero_hp: centre haut (HP du héros) - board: centre (board du joueur) - tavern: bas (serviteurs en taverne) """ ZONES = { "gold": (0.02, 0.02, 0.12, 0.10), "tier": (0.85, 0.02, 0.98, 0.14), "hero_hp": (0.44, 0.01, 0.56, 0.09), "board": (0.10, 0.38, 0.90, 0.72), "tavern": (0.02, 0.72, 0.98, 0.98), } def __init__(self, settings): self.settings = settings self._running = False self._latest_bytes: bytes | None = None self._latest_state: dict = {} self._task: asyncio.Task | None = None if OCR_OK and settings.tesseract_path: pytesseract.pytesseract.tesseract_cmd = settings.tesseract_path async def start(self): """Démarre la boucle de capture en arrière-plan.""" if not MSS_OK: log.warning("vision.skipped", reason="mss non installé") return self._running = True self._task = asyncio.create_task(self._loop()) log.info("vision.capture_started", interval=self.settings.screenshot_interval) async def stop(self): """Arrête proprement la boucle de capture.""" self._running = False if self._task: self._task.cancel() try: await self._task except asyncio.CancelledError: pass log.info("vision.capture_stopped") async def _loop(self): """Boucle principale de capture.""" while self._running: try: await self._tick() await asyncio.sleep(self.settings.screenshot_interval) except asyncio.CancelledError: break except Exception as e: log.warning("vision.loop_error", error=str(e)) await asyncio.sleep(2) async def _tick(self): """Une itération: capture + analyse.""" loop = asyncio.get_event_loop() data = await loop.run_in_executor(None, self._grab) if data: self._latest_bytes = data state = await loop.run_in_executor(None, self._extract, data) if state: self._latest_state = state def _grab(self) -> bytes | None: """Prend une capture d'écran (exécuté dans un thread).""" if not MSS_OK: return None try: with mss.mss() as sct: monitor = sct.monitors[1] # Écran principal frame = sct.grab(monitor) return mss.tools.to_png(frame.rgb, frame.size) except Exception as e: log.warning("screenshot.grab_failed", error=str(e)) return None def _extract(self, data: bytes) -> dict: """Extrait les valeurs numériques via OCR (exécuté dans un thread).""" if not PIL_OK or not OCR_OK: return {} try: img = Image.open(io.BytesIO(data)) state = {} for zone_name, coords in self.ZONES.items(): crop = self._crop(img, coords) if crop: text = self._ocr(crop) if text: state[f"raw_{zone_name}"] = text # Parser les valeurs numériques state["gold"] = self._parse_num(state.get("raw_gold", "")) state["tavern_tier"] = self._parse_num(state.get("raw_tier", "")) state["hero_hp"] = self._parse_num(state.get("raw_hero_hp", "")) return state except Exception as e: log.warning("vision.extract_failed", error=str(e)) return {} def _crop(self, img, coords: tuple): """Découpe une zone de l'image.""" if not PIL_OK: return None try: w, h = img.size x1, y1, x2, y2 = coords return img.crop((int(x1 * w), int(y1 * h), int(x2 * w), int(y2 * h))) except Exception: return None def _ocr(self, img) -> str: """OCR optimisé pour HSBG (chiffres + lettres).""" if not OCR_OK or not PIL_OK: return "" try: # Pipeline de prétraitement pour améliorer l'OCR enhanced = ImageEnhance.Contrast(img).enhance(2.5) enhanced = enhanced.convert("L") # Grayscale enhanced = enhanced.filter(ImageFilter.SHARPEN) # Agrandir pour meilleure précision w, h = enhanced.size enhanced = enhanced.resize((w * 2, h * 2), Image.LANCZOS) text = pytesseract.image_to_string( enhanced, config="--psm 7 --oem 3 -c tessedit_char_whitelist=0123456789/ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz ", ) return text.strip() except Exception: return "" def _parse_num(self, text: str) -> int: """Extrait un nombre d'un texte OCR.""" nums = re.findall(r"\d+", text or "") return int(nums[0]) if nums else 0 # ─── API publique ───────────────────────────────────────────────────────── def get_b64(self) -> str | None: """Dernière capture en base64 (pour affichage frontend).""" if self._latest_bytes: return base64.b64encode(self._latest_bytes).decode() return None def get_state(self) -> dict: """Dernier état extrait.""" return self._latest_state.copy() async def capture_now(self) -> dict: """Déclenche une capture manuelle et retourne l'état.""" await self._tick() return self.get_state()