Planet Samuro (France)
Messages
Guest_74887
Planet Samuro (France)
 
Messages
3D Chat
Mini-Chat

Message Panels : Aide Technique : script du serveur AI de traduction (à installer sur une machine linux ubuntu)
<<<First<<prev.>next.>>>Last>RepReply^Discussion ^vDiscussion vDelDelete the discussion
SamuroSamuro2025 · Sent: 25/09/2025 01:54:17 · 1 / 3 · Message 1 of 3



"""
Async TCP server that:
- listens on port 12345
- each client session times out after 60 seconds of inactivity
- messages are framed as: 2-byte little-endian length prefix that counts UTF-16 code units,
  followed by UTF-16-LE encoded payload
- each input payload is "ID|SRC|TGT|input_text" where SRC/TGT are FR/EN/DE
- output payload is "ID|output_text" (ID preserved unchanged)
- text is filtered (no more than 2 consecutive identical non-digit characters)
- text is translated using facebook/nllb-200-distilled-600M
- reply is sent on the same connection in same framing/encoding, with the 2-byte prefix indicating number of UTF-16 code units
"""

import asyncio
import struct
import logging
import threading
from typing import Optional

# Transformers imports
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM

# Network settings: listen on every interface, fixed port.
HOST = "0.0.0.0"
PORT = 12345
# Seconds a client may stay silent before its session is closed.
INACTIVITY_TIMEOUT = 60.0

# Two-letter protocol codes -> NLLB language tags.
LANG_MAP = {"FR": "fra_Latn", "EN": "eng_Latn", "DE": "deu_Latn"}

logging.basicConfig(
    format="%(asctime)s %(levelname)s %(message)s",
    level=logging.INFO,
)


def filter_text(text: str) -> str:
    """
    Collapse runs of a repeated non-digit character to at most two occurrences.

    Digit characters are always kept, however often they repeat. Falsy input
    (for example the empty string) is returned unchanged.
    """
    if not text:
        return text

    kept = []
    for char in text:
        # A non-digit is dropped only when the two characters kept right
        # before it are that very same character (it would be a 3rd repeat).
        is_extra_repeat = (
            not char.isdigit()
            and len(kept) >= 2
            and kept[-1] == char
            and kept[-2] == char
        )
        if not is_extra_repeat:
            kept.append(char)

    return "".join(kept)


class NLLBTranslator:
    """
    Wrapper around facebook/nllb-200-distilled-600M.

    - Loads the tokenizer and the model once, in the constructor.
    - Serialises tokenisation and generation with a lock, because the source
      language is set by mutating the shared tokenizer (``src_lang``).
    - Resolves the target-language token id through several fallbacks and
      caches each resolved id, so later calls are a plain dict lookup.
    """

    def __init__(self, model_name: str = "facebook/nllb-200-distilled-600M", device: int = -1):
        """
        Load tokenizer and model.

        Args:
            model_name: Hugging Face model identifier to load.
            device: CUDA device index; a negative value or None keeps the model on CPU.
        """
        logging.info("Loading tokenizer and model (%s). This may take a while...", model_name)
        self.tokenizer = AutoTokenizer.from_pretrained(model_name, src_lang="eng_Latn")
        self.model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
        self._lock = threading.Lock()
        # language code -> token id, filled lazily by _find_lang_token_id
        self._lang_id_cache = {}
        # move model to GPU if requested and available
        if device is not None and device >= 0:
            try:
                import torch
                self.model.to(torch.device(f"cuda:{device}"))
                logging.info("Model moved to CUDA device %d", device)
            except Exception:
                # Best effort: the server stays usable on CPU.
                logging.warning("Could not move model to CUDA; continuing on CPU")
        logging.info("Model and tokenizer loaded")

    def _find_lang_token_id(self, lang_code: str) -> int:
        """
        Return the tokenizer id of the language token for ``lang_code``.

        The result is cached per language code. Lookup order: the
        ``lang_code_to_id`` / ``lang2id`` mappings when the tokenizer has them,
        then ``convert_tokens_to_ids`` on the bare code and on ``<code>``, and
        finally a scan of the vocabulary.

        Raises:
            ValueError: if no lookup strategy yields an id.
        """
        cached = self._lang_id_cache.get(lang_code)
        if cached is not None:
            return cached
        tok_id = self._lookup_lang_token_id(lang_code)
        self._lang_id_cache[lang_code] = tok_id
        return tok_id

    def _lookup_lang_token_id(self, lang_code: str) -> int:
        """Uncached lookup behind ``_find_lang_token_id``; raises ValueError when nothing matches."""
        # 1) / 2) explicit mapping attributes (not present on every tokenizer version)
        for attr_name in ("lang_code_to_id", "lang2id"):
            mapping = getattr(self.tokenizer, attr_name, None)
            if isinstance(mapping, dict) and lang_code in mapping:
                return mapping[lang_code]

        # 3) direct token lookup: bare code first, then the "<code>" spelling
        bracketed = f"<{lang_code}>"
        unk_id = getattr(self.tokenizer, "unk_token_id", None)
        for candidate in (lang_code, bracketed):
            try:
                tok_id = self.tokenizer.convert_tokens_to_ids(candidate)
            except Exception:
                # Treat a failing lookup like a miss and try the next strategy.
                continue
            if tok_id and tok_id != unk_id:
                return tok_id

        # 4) last resort: scan the vocabulary (builds the full vocab dict)
        try:
            vocab = self.tokenizer.get_vocab()
        except Exception:
            vocab = None

        if vocab:
            if bracketed in vocab:
                return vocab[bracketed]
            # try token without angle brackets
            if lang_code in vocab:
                return vocab[lang_code]
            # try loose matches
            for name, idx in vocab.items():
                if name.strip() == bracketed:
                    return idx

        raise ValueError(f"Could not find tokenizer id for language code {lang_code}")

    def translate(self, text: str, src_code: str, tgt_code: str, max_length: int = 512) -> str:
        """
        Translate ``text`` from ``src_code`` (e.g. fra_Latn) to ``tgt_code`` (e.g. deu_Latn).

        Empty or whitespace-only text returns "" without running the model.
        Tokenisation and generation happen under the instance lock, so
        concurrent callers are processed one at a time.

        Args:
            text: text to translate.
            src_code: NLLB tag of the source language.
            tgt_code: NLLB tag of the target language.
            max_length: token limit used both for input truncation and for generation.

        Raises:
            ValueError: if no token id can be found for ``tgt_code``.
        """
        if not text or not text.strip():
            return ""

        # resolve forced BOS id
        forced_bos = self._find_lang_token_id(tgt_code)

        # Tokenise and generate inside the lock to avoid concurrent mutation of tokenizer state
        with self._lock:
            # If tokenizer supports src_lang attribute, set it for this call
            use_src_attr = hasattr(self.tokenizer, "src_lang")
            if use_src_attr:
                try:
                    setattr(self.tokenizer, "src_lang", src_code)
                except Exception:
                    use_src_attr = False

            # If tokenizer doesn't support src_lang, prepend explicit source token
            text_to_tokenize = text if use_src_attr else f"<{src_code}> {text}"

            inputs = self.tokenizer(text_to_tokenize, return_tensors="pt", truncation=True, max_length=max_length)
            # Put the input tensors on the same device as the model weights.
            device = next(self.model.parameters()).device
            inputs = {k: v.to(device) for k, v in inputs.items()}

            out_tokens = self.model.generate(
                **inputs,
                forced_bos_token_id=forced_bos,
                max_length=max_length,
                num_beams=4,
                # early_stopping=True was observed to cut long sentences short.
                early_stopping=False,
            )

            # skip_special_tokens removes the language tokens from the decoded text
            return self.tokenizer.batch_decode(out_tokens, skip_special_tokens=True)[0]


# Process-wide translator, created on first use; the lock guards that creation.
_TRANSLATOR: Optional[NLLBTranslator] = None
_TRANSLATOR_LOCK = threading.Lock()


def get_translator() -> NLLBTranslator:
    """Return the shared NLLBTranslator, constructing it on the first call."""
    global _TRANSLATOR
    if _TRANSLATOR is not None:
        return _TRANSLATOR
    with _TRANSLATOR_LOCK:
        # Re-check under the lock: another thread may have created it meanwhile.
        if _TRANSLATOR is None:
            _TRANSLATOR = NLLBTranslator()
        return _TRANSLATOR


def _translate_in_worker(text: str, src_code: str, tgt_code: str) -> str:
    """Fetch the shared translator (loading it on first use) and translate ``text``.

    Intended to be called through ``run_in_executor`` so that neither the
    one-off model load nor the generation runs on the event-loop thread.
    """
    return get_translator().translate(text, src_code, tgt_code)


def _encode_frame(payload: str) -> bytes:
    """Return ``payload`` as one wire frame: 2-byte LE code-unit count + UTF-16-LE data.

    A payload longer than 0xFFFF code units is truncated to fit the 16-bit
    length prefix, without leaving half of a surrogate pair at the end.
    """
    max_units = 0xFFFF
    data = payload.encode("utf-16-le")
    if len(data) > max_units * 2:
        data = data[: max_units * 2]
        (last_unit,) = struct.unpack_from("<H", data, len(data) - 2)
        if 0xD800 <= last_unit <= 0xDBFF:
            # The cut fell between a high and a low surrogate: drop the orphan.
            data = data[:-2]
    return struct.pack("<H", len(data) // 2) + data


async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    """
    Serve one client connection until it closes, errors out or goes idle.

    Each request frame is a 2-byte little-endian count of UTF-16 code units
    followed by that many UTF-16-LE code units holding "ID|SRC|TGT|input_text".
    The reply uses the same framing and carries "ID|output_text". Frames that
    cannot be decoded, are malformed or have an empty ID are logged and
    skipped without a reply. Unsupported language codes and translation
    failures are answered with an empty output text.
    """
    peer = writer.get_extra_info("peername")
    logging.info("Connection from %s", peer)
    loop = asyncio.get_running_loop()
    try:
        while True:
            # Read 2-byte length prefix (number of UTF-16 code units)
            try:
                header = await asyncio.wait_for(reader.readexactly(2), timeout=INACTIVITY_TIMEOUT)
            except asyncio.IncompleteReadError:
                logging.info("Client %s closed connection", peer)
                break
            except asyncio.TimeoutError:
                logging.info("Client %s inactive for %s seconds, closing", peer, INACTIVITY_TIMEOUT)
                break
            except OSError:
                # e.g. connection reset by the peer
                logging.info("Connection error while reading from %s, closing", peer)
                break

            (code_unit_count,) = struct.unpack("<H", header)
            expected_bytes = code_unit_count * 2

            # Read exact bytes corresponding to code units
            payload = b""
            if expected_bytes:
                try:
                    payload = await asyncio.wait_for(reader.readexactly(expected_bytes), timeout=INACTIVITY_TIMEOUT)
                except asyncio.IncompleteReadError:
                    logging.info("Client %s closed during payload read", peer)
                    break
                except asyncio.TimeoutError:
                    logging.info("Client %s inactive during payload read, closing", peer)
                    break
                except OSError:
                    logging.info("Connection error while reading from %s, closing", peer)
                    break

            # Decode payload as UTF-16-LE; the frame was fully consumed, so the
            # stream stays in sync even when this message is skipped.
            try:
                text = payload.decode("utf-16-le")
            except Exception:
                logging.exception("Failed to decode UTF-16-LE payload from %s; skipping message", peer)
                continue

            # Expect "ID|SRC|TGT|input_text"; the text itself may contain "|"
            parts = text.split("|", 3)
            if len(parts) != 4:
                logging.warning("Malformed message from %s: %r", peer, text)
                continue

            msg_id, src_short, tgt_short, body = parts
            msg_id = msg_id.strip()
            src_short = src_short.strip().upper()
            tgt_short = tgt_short.strip().upper()

            if not msg_id:
                logging.warning("Empty ID from %s: %r", peer, text)
                continue

            if src_short not in LANG_MAP or tgt_short not in LANG_MAP:
                logging.warning("Unsupported language codes from %s: %s -> %s (ID=%s)", peer, src_short, tgt_short, msg_id)
                # respond with ID| (empty output) to keep protocol consistent
                translated = ""
            else:
                filtered = filter_text(body)
                # Model loading and generation both run in the default executor
                # so the event loop keeps serving other clients meanwhile.
                try:
                    translated = await loop.run_in_executor(
                        None, _translate_in_worker, filtered, LANG_MAP[src_short], LANG_MAP[tgt_short]
                    )
                except Exception:
                    logging.exception("Translation error for %s (ID=%s)", peer, msg_id)
                    translated = ""

            # Send "ID|output_text" back in the same framing
            try:
                writer.write(_encode_frame(f"{msg_id}|{translated}"))
                await writer.drain()
            except Exception:
                logging.exception("Failed to send reply to %s (ID=%s)", peer, msg_id)
                break

    finally:
        try:
            writer.close()
            await writer.wait_closed()
        except Exception:
            # Best effort: the connection may already be gone.
            pass
        logging.info("Connection with %s closed", peer)


async def main():
    """Start the TCP server on HOST:PORT and serve clients until cancelled."""
    server = await asyncio.start_server(handle_client, HOST, PORT)
    bound_address = server.sockets[0].getsockname()
    logging.info("Serving on %s", bound_address)
    async with server:
        await server.serve_forever()


# Script entry point: run the server; Ctrl+C ends it with a log line.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logging.info("Server stopped by user")



SamuroSamuro2025 · Sent: 25/09/2025 01:58:49 · 2 / 3 · Message 2 of 3
Comment installer :

1) Avoir un serveur Linux Ubuntu.
2) Installer Python 3, puis les bibliothèques utilisées par le script (par exemple : pip install transformers torch).
3) Lancer le script.
4) En cas d'erreur, copier le message d'erreur dans une IA de votre choix, qui vous dira quoi faire pour installer ce qui manque.

SamuroSamuro2025 · Sent: 26/09/2025 10:52:36 · 3 / 3 · Message 3 of 3
modification :

  early_stopping=True,

changer à :

  early_stopping=False,

sinon ça coupe parfois les longues phrases.
<<<First<<prev.>next.>>>Last>RepReply^Discussion ^vDiscussion vDelDelete the discussion