196 lines
6.5 KiB
Python
196 lines
6.5 KiB
Python
|
|
"""Capture d'écran continue + extraction OCR de l'état HSBG."""
|
||
|
|
import asyncio
|
||
|
|
import base64
|
||
|
|
import io
|
||
|
|
import re
|
||
|
|
import structlog
|
||
|
|
|
||
|
|
log = structlog.get_logger()
|
||
|
|
|
||
|
|
try:
|
||
|
|
import mss
|
||
|
|
import mss.tools
|
||
|
|
MSS_OK = True
|
||
|
|
except ImportError:
|
||
|
|
MSS_OK = False
|
||
|
|
log.warning("vision.mss_missing", install="pip install mss")
|
||
|
|
|
||
|
|
try:
|
||
|
|
from PIL import Image, ImageEnhance, ImageFilter
|
||
|
|
PIL_OK = True
|
||
|
|
except ImportError:
|
||
|
|
PIL_OK = False
|
||
|
|
|
||
|
|
try:
|
||
|
|
import pytesseract
|
||
|
|
OCR_OK = True
|
||
|
|
except ImportError:
|
||
|
|
OCR_OK = False
|
||
|
|
log.warning("vision.tesseract_missing",
|
||
|
|
install="sudo apt-get install tesseract-ocr && pip install pytesseract")
|
||
|
|
|
||
|
|
|
||
|
|
class ScreenshotManager:
|
||
|
|
"""
|
||
|
|
Capture et analyse les frames du jeu HSBG.
|
||
|
|
|
||
|
|
Zones de capture (coordonnées relatives 0.0-1.0):
|
||
|
|
- gold: haut gauche (or disponible)
|
||
|
|
- tier: haut droite (niveau de taverne)
|
||
|
|
- hero_hp: centre haut (HP du héros)
|
||
|
|
- board: centre (board du joueur)
|
||
|
|
- tavern: bas (serviteurs en taverne)
|
||
|
|
"""
|
||
|
|
|
||
|
|
ZONES = {
|
||
|
|
"gold": (0.02, 0.02, 0.12, 0.10),
|
||
|
|
"tier": (0.85, 0.02, 0.98, 0.14),
|
||
|
|
"hero_hp": (0.44, 0.01, 0.56, 0.09),
|
||
|
|
"board": (0.10, 0.38, 0.90, 0.72),
|
||
|
|
"tavern": (0.02, 0.72, 0.98, 0.98),
|
||
|
|
}
|
||
|
|
|
||
|
|
def __init__(self, settings):
|
||
|
|
self.settings = settings
|
||
|
|
self._running = False
|
||
|
|
self._latest_bytes: bytes | None = None
|
||
|
|
self._latest_state: dict = {}
|
||
|
|
self._task: asyncio.Task | None = None
|
||
|
|
|
||
|
|
if OCR_OK and settings.tesseract_path:
|
||
|
|
pytesseract.pytesseract.tesseract_cmd = settings.tesseract_path
|
||
|
|
|
||
|
|
async def start(self):
|
||
|
|
"""Démarre la boucle de capture en arrière-plan."""
|
||
|
|
if not MSS_OK:
|
||
|
|
log.warning("vision.skipped", reason="mss non installé")
|
||
|
|
return
|
||
|
|
self._running = True
|
||
|
|
self._task = asyncio.create_task(self._loop())
|
||
|
|
log.info("vision.capture_started", interval=self.settings.screenshot_interval)
|
||
|
|
|
||
|
|
async def stop(self):
|
||
|
|
"""Arrête proprement la boucle de capture."""
|
||
|
|
self._running = False
|
||
|
|
if self._task:
|
||
|
|
self._task.cancel()
|
||
|
|
try:
|
||
|
|
await self._task
|
||
|
|
except asyncio.CancelledError:
|
||
|
|
pass
|
||
|
|
log.info("vision.capture_stopped")
|
||
|
|
|
||
|
|
async def _loop(self):
|
||
|
|
"""Boucle principale de capture."""
|
||
|
|
while self._running:
|
||
|
|
try:
|
||
|
|
await self._tick()
|
||
|
|
await asyncio.sleep(self.settings.screenshot_interval)
|
||
|
|
except asyncio.CancelledError:
|
||
|
|
break
|
||
|
|
except Exception as e:
|
||
|
|
log.warning("vision.loop_error", error=str(e))
|
||
|
|
await asyncio.sleep(2)
|
||
|
|
|
||
|
|
async def _tick(self):
|
||
|
|
"""Une itération: capture + analyse."""
|
||
|
|
loop = asyncio.get_event_loop()
|
||
|
|
data = await loop.run_in_executor(None, self._grab)
|
||
|
|
if data:
|
||
|
|
self._latest_bytes = data
|
||
|
|
state = await loop.run_in_executor(None, self._extract, data)
|
||
|
|
if state:
|
||
|
|
self._latest_state = state
|
||
|
|
|
||
|
|
def _grab(self) -> bytes | None:
|
||
|
|
"""Prend une capture d'écran (exécuté dans un thread)."""
|
||
|
|
if not MSS_OK:
|
||
|
|
return None
|
||
|
|
try:
|
||
|
|
with mss.mss() as sct:
|
||
|
|
monitor = sct.monitors[1] # Écran principal
|
||
|
|
frame = sct.grab(monitor)
|
||
|
|
return mss.tools.to_png(frame.rgb, frame.size)
|
||
|
|
except Exception as e:
|
||
|
|
log.warning("screenshot.grab_failed", error=str(e))
|
||
|
|
return None
|
||
|
|
|
||
|
|
def _extract(self, data: bytes) -> dict:
|
||
|
|
"""Extrait les valeurs numériques via OCR (exécuté dans un thread)."""
|
||
|
|
if not PIL_OK or not OCR_OK:
|
||
|
|
return {}
|
||
|
|
try:
|
||
|
|
img = Image.open(io.BytesIO(data))
|
||
|
|
state = {}
|
||
|
|
|
||
|
|
for zone_name, coords in self.ZONES.items():
|
||
|
|
crop = self._crop(img, coords)
|
||
|
|
if crop:
|
||
|
|
text = self._ocr(crop)
|
||
|
|
if text:
|
||
|
|
state[f"raw_{zone_name}"] = text
|
||
|
|
|
||
|
|
# Parser les valeurs numériques
|
||
|
|
state["gold"] = self._parse_num(state.get("raw_gold", ""))
|
||
|
|
state["tavern_tier"] = self._parse_num(state.get("raw_tier", ""))
|
||
|
|
state["hero_hp"] = self._parse_num(state.get("raw_hero_hp", ""))
|
||
|
|
|
||
|
|
return state
|
||
|
|
except Exception as e:
|
||
|
|
log.warning("vision.extract_failed", error=str(e))
|
||
|
|
return {}
|
||
|
|
|
||
|
|
def _crop(self, img, coords: tuple):
|
||
|
|
"""Découpe une zone de l'image."""
|
||
|
|
if not PIL_OK:
|
||
|
|
return None
|
||
|
|
try:
|
||
|
|
w, h = img.size
|
||
|
|
x1, y1, x2, y2 = coords
|
||
|
|
return img.crop((int(x1 * w), int(y1 * h), int(x2 * w), int(y2 * h)))
|
||
|
|
except Exception:
|
||
|
|
return None
|
||
|
|
|
||
|
|
def _ocr(self, img) -> str:
|
||
|
|
"""OCR optimisé pour HSBG (chiffres + lettres)."""
|
||
|
|
if not OCR_OK or not PIL_OK:
|
||
|
|
return ""
|
||
|
|
try:
|
||
|
|
# Pipeline de prétraitement pour améliorer l'OCR
|
||
|
|
enhanced = ImageEnhance.Contrast(img).enhance(2.5)
|
||
|
|
enhanced = enhanced.convert("L") # Grayscale
|
||
|
|
enhanced = enhanced.filter(ImageFilter.SHARPEN)
|
||
|
|
# Agrandir pour meilleure précision
|
||
|
|
w, h = enhanced.size
|
||
|
|
enhanced = enhanced.resize((w * 2, h * 2), Image.LANCZOS)
|
||
|
|
|
||
|
|
text = pytesseract.image_to_string(
|
||
|
|
enhanced,
|
||
|
|
config="--psm 7 --oem 3 -c tessedit_char_whitelist=0123456789/ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz ",
|
||
|
|
)
|
||
|
|
return text.strip()
|
||
|
|
except Exception:
|
||
|
|
return ""
|
||
|
|
|
||
|
|
def _parse_num(self, text: str) -> int:
|
||
|
|
"""Extrait un nombre d'un texte OCR."""
|
||
|
|
nums = re.findall(r"\d+", text or "")
|
||
|
|
return int(nums[0]) if nums else 0
|
||
|
|
|
||
|
|
# ─── API publique ─────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
def get_b64(self) -> str | None:
|
||
|
|
"""Dernière capture en base64 (pour affichage frontend)."""
|
||
|
|
if self._latest_bytes:
|
||
|
|
return base64.b64encode(self._latest_bytes).decode()
|
||
|
|
return None
|
||
|
|
|
||
|
|
def get_state(self) -> dict:
|
||
|
|
"""Dernier état extrait."""
|
||
|
|
return self._latest_state.copy()
|
||
|
|
|
||
|
|
async def capture_now(self) -> dict:
|
||
|
|
"""Déclenche une capture manuelle et retourne l'état."""
|
||
|
|
await self._tick()
|
||
|
|
return self.get_state()
|