| Message Panels : Aide Technique : script du serveur AI de traduction (à installer sur une machine linux ubuntu) |
| << | <First | < | <prev. | > | next.> | >> | Last> | Rep | Reply | ^ | Discussion ^ | v | Discussion v | Del | Delete the discussion |
| 2025 | Sent: 25/09/2025 01:54:17 | 1 / 3 | Message 1 from 3 |
"""
Async TCP translation server.

- listens on port 12345
- each client session times out after 60 seconds of inactivity
- messages are framed as: 2-byte little-endian length prefix that counts
  UTF-16 code units, followed by the UTF-16-LE encoded payload
- each input payload is "ID|SRC|TGT|input_text" where SRC/TGT are FR/EN/DE
- output payload is "ID|output_text" (ID preserved unchanged)
- text is filtered (no more than 2 consecutive identical non-digit characters)
- text is translated using facebook/nllb-200-distilled-600M
- the reply is sent on the same connection with the same framing/encoding

The model is loaded once at startup, before the server starts listening, so
that a missing dependency or a failed model download is reported when the
script is launched rather than on the first client request.
"""

import asyncio
import logging
import struct
import threading
from typing import Dict, Optional, Tuple

HOST = "0.0.0.0"
PORT = 12345
INACTIVITY_TIMEOUT = 60.0  # seconds

# Largest payload, in UTF-16 code units, that the 2-byte length prefix can express.
MAX_CODE_UNITS = 0xFFFF

# Frame header: one unsigned 16-bit little-endian integer (code-unit count).
_HEADER = struct.Struct("<H")

# Map two-letter codes to NLLB language tags
LANG_MAP = {
    "FR": "fra_Latn",
    "EN": "eng_Latn",
    "DE": "deu_Latn",
}

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")


def filter_text(text: str) -> str:
    """
    Collapse runs of identical non-digit characters to at most two.

    Digits are exempt from the rule (they can repeat arbitrarily) and also
    interrupt a run: "aa1aa" is returned unchanged.
    """
    if not text:
        return text

    out_chars = []
    prev_char: Optional[str] = None
    run_len = 0
    for ch in text:
        if ch.isdigit():
            out_chars.append(ch)
            prev_char = None
            run_len = 0
            continue
        if ch == prev_char:
            run_len += 1
        else:
            prev_char = ch
            run_len = 1
        if run_len <= 2:
            out_chars.append(ch)
        # else: skip extra repeats
    return "".join(out_chars)


def parse_request(text: str) -> Tuple[str, str, str, str]:
    """
    Split an "ID|SRC|TGT|input_text" payload into its four fields.

    Returns (msg_id, src, tgt, body): msg_id is stripped, src/tgt are stripped
    and upper-cased, body is returned untouched (it may itself contain "|").

    Raises ValueError("Malformed message") when fewer than four fields are
    present and ValueError("Empty ID") when the ID is blank.
    """
    parts = text.split("|", 3)
    if len(parts) != 4:
        raise ValueError("Malformed message")
    msg_id, src, tgt, body = parts
    msg_id = msg_id.strip()
    if not msg_id:
        raise ValueError("Empty ID")
    return msg_id, src.strip().upper(), tgt.strip().upper(), body


def encode_frame(text: str) -> bytes:
    """
    Encode text as one wire frame: 2-byte code-unit count + UTF-16-LE payload.

    Payloads longer than MAX_CODE_UNITS are truncated. If the cut falls in the
    middle of a surrogate pair, the dangling high surrogate is dropped as well
    so the receiver always gets valid UTF-16.
    """
    data = text.encode("utf-16-le")
    max_bytes = MAX_CODE_UNITS * 2
    if len(data) > max_bytes:
        data = data[:max_bytes]
        (last_unit,) = _HEADER.unpack_from(data, len(data) - 2)
        if 0xD800 <= last_unit <= 0xDBFF:  # high surrogate without its pair
            data = data[:-2]
    return _HEADER.pack(len(data) // 2) + data


class NLLBTranslator:
    """
    Wrapper around facebook/nllb-200-distilled-600M.

    - Loads tokenizer and model once.
    - Uses a lock to avoid concurrent tokenizer state mutation (e.g., src_lang).
    - Finds forced_bos_token_id robustly even if tokenizer lacks lang_code_to_id.
    """

    def __init__(self, model_name: str = "facebook/nllb-200-distilled-600M", device: int = -1):
        """
        Load the tokenizer and model.

        model_name: Hugging Face model identifier.
        device: CUDA device index, or a negative value / None to stay on CPU.
        """
        # Imported here so the module itself only needs the standard library;
        # main() constructs the translator at startup, so a missing package is
        # still reported when the script is launched.
        from transformers import AutoModelForSeq2SeqLM, AutoTokenizer

        logging.info("Loading tokenizer and model (%s). This may take a while...", model_name)
        self.tokenizer = AutoTokenizer.from_pretrained(model_name, src_lang="eng_Latn")
        self.model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
        self._lock = threading.Lock()
        # language code -> token id, filled on first use
        self._lang_id_cache: Dict[str, int] = {}

        # move model to GPU if requested and available
        if device is not None and device >= 0:
            try:
                import torch

                self.model.to(torch.device(f"cuda:{device}"))
                logging.info("Model moved to CUDA device %d", device)
            except Exception:
                logging.warning("Could not move model to CUDA; continuing on CPU", exc_info=True)
        logging.info("Model and tokenizer loaded")

    def _find_lang_token_id(self, lang_code: str) -> int:
        """
        Return the tokenizer id of the language token for lang_code.

        Results are cached per language code. Raises ValueError if no lookup
        strategy finds the token.
        """
        cached = self._lang_id_cache.get(lang_code)
        if cached is not None:
            return cached
        tok_id = self._lookup_lang_token_id(lang_code)
        self._lang_id_cache[lang_code] = tok_id
        return tok_id

    def _lookup_lang_token_id(self, lang_code: str) -> int:
        """Probe the tokenizer for lang_code, cheapest strategy first."""
        tokenizer = self.tokenizer
        unk_id = getattr(tokenizer, "unk_token_id", None)
        bracketed = f"<{lang_code}>"

        def convert(token: str) -> Optional[int]:
            # Treat failures, falsy ids and the unknown-token id as "not found".
            try:
                tok_id = tokenizer.convert_tokens_to_ids(token)
            except Exception:
                return None
            if tok_id and tok_id != unk_id:
                return tok_id
            return None

        # 1) the bare language code as a token (e.g. "fra_Latn")
        tok_id = convert(lang_code)
        if tok_id is not None:
            return tok_id

        # 2) mapping attributes exposed by some tokenizer versions
        for attr in ("lang_code_to_id", "lang2id"):
            mapping = getattr(tokenizer, attr, None)
            if isinstance(mapping, dict) and lang_code in mapping:
                return mapping[lang_code]

        # 3) explicit token like "<fra_Latn>"
        tok_id = convert(bracketed)
        if tok_id is not None:
            return tok_id

        # 4) scan vocabulary (slow: builds the whole vocabulary dict)
        try:
            vocab = tokenizer.get_vocab()
        except Exception:
            vocab = None
        if vocab:
            if bracketed in vocab:
                return vocab[bracketed]
            if lang_code in vocab:
                return vocab[lang_code]
            # loose match: tolerate surrounding whitespace in vocabulary entries
            for name, idx in vocab.items():
                if name.strip() == bracketed:
                    return idx

        raise ValueError(f"Could not find tokenizer id for language code {lang_code}")

    def translate(self, text: str, src_code: str, tgt_code: str, max_length: int = 512) -> str:
        """
        Translate text from src_code (e.g., fra_Latn) to tgt_code (e.g., deu_Latn).

        max_length bounds both the tokenized input (truncated beyond it) and
        the generated output. Concurrent calls are serialized by a lock held
        around the tokenizer state change and the generation.

        Raises ValueError if tgt_code has no language token in the tokenizer.
        """
        # resolve forced BOS id
        forced_bos = self._find_lang_token_id(tgt_code)

        with self._lock:
            # If tokenizer supports src_lang attribute, set it for this call
            use_src_attr = hasattr(self.tokenizer, "src_lang")
            if use_src_attr:
                try:
                    self.tokenizer.src_lang = src_code
                except Exception:
                    use_src_attr = False

            # If tokenizer doesn't support src_lang, prepend explicit source token
            text_to_tokenize = text if use_src_attr else f"<{src_code}> {text}"

            inputs = self.tokenizer(
                text_to_tokenize,
                return_tensors="pt",
                truncation=True,
                max_length=max_length,
            )

            # Put the input tensors on the same device as the model.
            device = next(self.model.parameters()).device
            inputs = {k: v.to(device) for k, v in inputs.items()}

            out_tokens = self.model.generate(
                **inputs,
                forced_bos_token_id=forced_bos,
                max_length=max_length,
                num_beams=4,
                # With early_stopping=True long sentences were sometimes cut short.
                early_stopping=False,
            )

            # skip_special_tokens removes the language tokens from the output
            return self.tokenizer.batch_decode(out_tokens, skip_special_tokens=True)[0]


# Lazy global translator instance
_TRANSLATOR: Optional[NLLBTranslator] = None
_TRANSLATOR_LOCK = threading.Lock()


def get_translator() -> NLLBTranslator:
    """Return the shared NLLBTranslator, creating it on first use."""
    global _TRANSLATOR
    if _TRANSLATOR is None:
        with _TRANSLATOR_LOCK:
            # Re-check under the lock: another thread may have created it meanwhile.
            if _TRANSLATOR is None:
                _TRANSLATOR = NLLBTranslator()
    return _TRANSLATOR


def _translate_blocking(text: str, src_code: str, tgt_code: str) -> str:
    """Fetch the shared translator and translate; meant to run in a worker thread."""
    return get_translator().translate(text, src_code, tgt_code)


async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    """
    Serve one client connection until it closes, errors or goes idle.

    Each well-formed request gets exactly one reply frame "ID|output_text".
    Unsupported language codes and translation failures yield an empty
    output_text. Undecodable or malformed payloads are logged and skipped
    without a reply.
    """
    peer = writer.get_extra_info("peername")
    logging.info("Connection from %s", peer)
    loop = asyncio.get_running_loop()
    try:
        while True:
            # Read 2-byte length prefix (number of UTF-16 code units)
            try:
                header = await asyncio.wait_for(
                    reader.readexactly(_HEADER.size), timeout=INACTIVITY_TIMEOUT
                )
            except asyncio.IncompleteReadError:
                logging.info("Client %s closed connection", peer)
                break
            except asyncio.TimeoutError:
                logging.info("Client %s inactive for %s seconds, closing", peer, INACTIVITY_TIMEOUT)
                break

            (code_unit_count,) = _HEADER.unpack(header)
            expected_bytes = code_unit_count * 2

            # Read exact bytes corresponding to code units
            payload = b""
            if expected_bytes:
                try:
                    payload = await asyncio.wait_for(
                        reader.readexactly(expected_bytes), timeout=INACTIVITY_TIMEOUT
                    )
                except asyncio.IncompleteReadError:
                    logging.info("Client %s closed during payload read", peer)
                    break
                except asyncio.TimeoutError:
                    logging.info("Client %s inactive during payload read, closing", peer)
                    break

            try:
                text = payload.decode("utf-16-le")
            except UnicodeDecodeError:
                logging.exception("Failed to decode UTF-16-LE payload from %s; skipping message", peer)
                continue

            try:
                msg_id, src_short, tgt_short, body = parse_request(text)
            except ValueError as err:
                logging.warning("%s from %s: %r", err, peer, text)
                continue

            translated = ""
            if src_short not in LANG_MAP or tgt_short not in LANG_MAP:
                # respond with ID| (empty output) to keep protocol consistent
                logging.warning(
                    "Unsupported language codes from %s: %s -> %s (ID=%s)",
                    peer, src_short, tgt_short, msg_id,
                )
            else:
                filtered = filter_text(body)
                # Nothing to translate: skip the model for empty/blank text.
                if filtered.strip():
                    try:
                        # Translator lookup and translation both run in a worker
                        # thread so the event loop is never blocked.
                        translated = await loop.run_in_executor(
                            None,
                            _translate_blocking,
                            filtered,
                            LANG_MAP[src_short],
                            LANG_MAP[tgt_short],
                        )
                    except Exception:
                        logging.exception("Translation error for %s (ID=%s)", peer, msg_id)
                        translated = ""

            try:
                writer.write(encode_frame(f"{msg_id}|{translated}"))
                await writer.drain()
            except Exception:
                logging.exception("Failed to send reply to %s (ID=%s)", peer, msg_id)
                break
    finally:
        try:
            writer.close()
            await writer.wait_closed()
        except Exception:
            # The peer may already be gone; closing is best-effort.
            logging.debug("Error while closing connection with %s", peer, exc_info=True)
        logging.info("Connection with %s closed", peer)


async def main() -> None:
    """Load the model, then accept clients on HOST:PORT until cancelled."""
    loop = asyncio.get_running_loop()
    # Load the model before listening: setup problems show up at launch and
    # the first client does not have to wait for the model to load.
    await loop.run_in_executor(None, get_translator)

    server = await asyncio.start_server(handle_client, HOST, PORT)
    addr = server.sockets[0].getsockname()
    logging.info("Serving on %s", addr)
    async with server:
        await server.serve_forever()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logging.info("Server stopped by user")
| 2025 | Sent: 25/09/2025 01:58:49 | 2 / 3 | Message 2 from 3 |
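Comment installer : 1) disposer d'un serveur Linux Ubuntu ; 2) installer Python 3 ainsi que les bibliothèques importées par le script (`transformers` et `torch`, par exemple avec `pip install transformers torch`) ; 3) lancer le script ; 4) en cas d'erreur, copier le message d'erreur dans une IA de votre choix, qui vous indiquera quoi installer pour corriger ce qui manque. |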
| 2025 | Sent: 26/09/2025 10:52:36 | 3 / 3 | Message 3 from 3 |
Modification : dans l'appel à `self.model.generate(...)`, remplacer `early_stopping=True` par `early_stopping=False` ; sinon les longues phrases sont parfois tronquées. |
| << | <First | < | <prev. | > | next.> | >> | Last> | Rep | Reply | ^ | Discussion ^ | v | Discussion v | Del | Delete the discussion |