From ac353e5c742fcdabbf8b8ed03ef18953ae236654 Mon Sep 17 00:00:00 2001 From: arne314 <73391160+arne314@users.noreply.github.com> Date: Sun, 22 Dec 2024 18:22:23 +0100 Subject: [PATCH] feat(anki): basic export --- pyproject.toml | 4 ++ src/anki/__init__.py | 0 src/anki/anki_api.py | 76 ++++++++++++++++++++++++++++++++++++++ src/anki/file_handler.py | 40 ++++++++++++++++++++ src/anki/flashcard.py | 71 +++++++++++++++++++++++++++++++++++ src/anki/main.py | 52 ++++++++++++++++++++++++++ src/anki/parser.py | 57 ++++++++++++++++++++++++++++ src/anki/typst_compiler.py | 61 ++++++++++++++++++++++++++++++ 8 files changed, 361 insertions(+) create mode 100644 src/anki/__init__.py create mode 100644 src/anki/anki_api.py create mode 100644 src/anki/file_handler.py create mode 100644 src/anki/flashcard.py create mode 100644 src/anki/main.py create mode 100644 src/anki/parser.py create mode 100644 src/anki/typst_compiler.py diff --git a/pyproject.toml b/pyproject.toml index bf681ac..9317a58 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,3 +15,7 @@ dependencies = [ "aiohttp>=3.11.11", "tree-sitter-language-pack>=0.2.0", ] + +[project.scripts] +typstar-anki = "anki.main:main" + diff --git a/src/anki/__init__.py b/src/anki/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/anki/anki_api.py b/src/anki/anki_api.py new file mode 100644 index 0000000..82c4e1e --- /dev/null +++ b/src/anki/anki_api.py @@ -0,0 +1,76 @@ +import asyncio +import base64 +from typing import List + +import aiohttp + +from .flashcard import Flashcard + + +class AnkiConnectError(Exception): + pass + + +class AnkiConnectApi: + url: str + + def __init__(self, url="http://127.0.0.1:8765"): + self.url = url + + async def push_flashcards(self, cards: List[Flashcard]): + add = [] + update = [] + + for card in cards: + if card.is_new(): + add.append(card) + else: + update.append(card) + await asyncio.gather(self._add(add), self._update(update)) + + async def _request_api(self, action, **params): + async with aiohttp.ClientSession() as session: + data = { + "action": action, + "version": 6, + "params": params, + } + try: + async with session.post(url=self.url, json=data) as response: + result = await response.json(encoding="utf-8") + if err := result["error"]: + raise AnkiConnectError(err) + return result["result"] + except aiohttp.ClientError as e: + raise AnkiConnectError(f"Could not connect to Anki: {e}") + + async def _update_note_model(self, card: Flashcard): + await self._request_api("updateNoteModel", note=card.as_anki_model()) + + async def _store_media(self, card): + await self._request_api("storeMediaFile", + filename=card.svg_filename(True), + data=base64.b64encode(card.svg_front).decode()) + await self._request_api("storeMediaFile", + filename=card.svg_filename(False), + data=base64.b64encode(card.svg_back).decode()) + + async def _add(self, cards: List[Flashcard]): + notes = [] + for card in cards: + data = { + "deckName": card.deck, + "options": { + "allowDuplicate": True, # won't work with svgs + }, + } + data.update(card.as_anki_model(True)) + notes.append(data) + result = await self._request_api("addNotes", notes=notes) + for idx, note_id in enumerate(result): + cards[idx].update_id(note_id) + await self._update(cards) + + async def _update(self, cards: List[Flashcard]): + await asyncio.gather(*(self._update_note_model(card) for card in cards), + *(self._store_media(card) for card in cards)) diff --git a/src/anki/file_handler.py b/src/anki/file_handler.py new file mode 100644 index 0000000..e09397b --- /dev/null +++ b/src/anki/file_handler.py @@ -0,0 +1,40 @@ +from typing import List + +import tree_sitter + + +class FileHandler: + file_path: str + file_content: List[str] + + def __init__(self, path): + self.file_path = path + self.read() + + def get_bytes(self) -> bytes: + return bytes("".join(self.file_content), encoding="utf-8") + + def get_node_content(self, node: tree_sitter.Node, remove_outer=False): + content = "".join( + self.file_content[node.start_point.row:node.end_point.row + 1] + )[node.start_point.column:-(len(self.file_content[node.end_point.row]) - node.end_point.column)] + return content[1:-1] if remove_outer else content + + def update_node_content(self, node: tree_sitter.Node, value): + new_lines = self.file_content[:node.start_point.row] + first_line = self.file_content[node.start_point.row][:node.start_point.column] + last_line = self.file_content[node.end_point.row][node.end_point.column:] + new_lines.extend(( + line + "\n" for line in (first_line + str(value) + last_line).split("\n") + if line != "" + )) + new_lines.extend(self.file_content[node.end_point.row + 1:]) + self.file_content = new_lines + + def read(self): + with open(self.file_path, encoding="utf-8") as f: + self.file_content = f.readlines() + + def write(self): + with open(self.file_path, "w", encoding="utf-8") as f: + f.writelines(self.file_content) diff --git a/src/anki/flashcard.py b/src/anki/flashcard.py new file mode 100644 index 0000000..d487b63 --- /dev/null +++ b/src/anki/flashcard.py @@ -0,0 +1,71 @@ +import tree_sitter + + +class Flashcard: + note_id: int + front: str + back: str + deck: str + id_updated: bool + + note_id_node: tree_sitter.Node + front_node: tree_sitter.Node + back_node: tree_sitter.Node + + svg_front: bytes + svg_back: bytes + + def __init__(self, front: str, back: str, deck: str = None, note_id: int = None): + if not deck: + deck = "Default" + if not note_id: + note_id = 0 + self.front = front + self.back = back + self.deck = deck + self.note_id = note_id + self.id_updated = False + + def __str__(self): + return f"Flashcard(id={self.note_id}, front={self.front})" + + def as_typst(self, front: bool) -> str: + return f"#flashcard({self.note_id})[{self.front if front else ''}][{self.back if not front else ''}]" + + def as_html(self, front: bool) -> str: + prefix = f"" # indexable via anki search + image = f'' + return prefix + image + + def as_anki_model(self, tmp: bool = False) -> dict: + model = { + "modelName": "Basic", + "fields": { + "Front": f"tmp typst: {self.front}" if tmp else self.as_html(True), + "Back": f"tmp typst: {self.back}" if tmp else self.as_html(False), + }, + "tags": ["typst"] + } + if not self.is_new(): + model["id"] = self.note_id + return model + + def svg_filename(self, front: bool) -> str: + return f"typst_{self.note_id}_{'front' if front else 'back'}.svg" + + def is_new(self) -> bool: + return self.note_id == 0 or self.note_id is None + + def set_ts_nodes(self, front: tree_sitter.Node, back: tree_sitter.Node, note_id: tree_sitter.Node): + self.front_node = front + self.back_node = back + self.note_id_node = note_id + + def update_id(self, value): + if self.note_id != value: + self.note_id = value + self.id_updated = True + + def set_svgs(self, front, back): + self.svg_front = front + self.svg_back = back diff --git a/src/anki/main.py b/src/anki/main.py new file mode 100644 index 0000000..12af433 --- /dev/null +++ b/src/anki/main.py @@ -0,0 +1,52 @@ +import asyncio +import glob +import os + +from anki.anki_api import AnkiConnectApi +from anki.file_handler import FileHandler +from anki.parser import FlashcardParser +from anki.typst_compiler import TypstCompiler + +parser = FlashcardParser() +compiler = TypstCompiler(os.getcwd()) +api = AnkiConnectApi() + + +async def export_flashcards(path): + # parse flashcards + print("Parsing flashcards...") + flashcards = [] + file_handlers = [] + for file in glob.glob(f"{path}/**/*.typ", recursive=True): + fh = FileHandler(file) + cards = parser.parse_file(fh) + file_handlers.append((fh, cards)) + flashcards.extend(cards) + + # async typst compilation + print("Compiling flashcards...") + await compiler.compile_flashcards(flashcards) + + # async anki push per deck + print("Pushing flashcards to anki...") + await api.push_flashcards(flashcards) + + # write id updates to files + print("Updating ids in source...") + for fh, cards in file_handlers: + file_updated = False + for c in cards: + if c.id_updated: + fh.update_node_content(c.note_id_node, c.note_id) + file_updated = True + if file_updated: + fh.write() + print("Done") + + +def main(): + asyncio.run(export_flashcards(os.getcwd())) + + +if __name__ == "__main__": + main() diff --git a/src/anki/parser.py b/src/anki/parser.py new file mode 100644 index 0000000..3217a76 --- /dev/null +++ b/src/anki/parser.py @@ -0,0 +1,57 @@ +from typing import List + +import tree_sitter +from tree_sitter_language_pack import get_language, get_parser + +from .file_handler import FileHandler +from .flashcard import Flashcard + +ts_flashcard_query = """ +(call + item: [ + (call + item: (call + item: (ident) @fncall + (group + (number) @id)) + (content) @front) + (call + item: (ident) @fncall + (group + (number) @id + (string) @front)) + ] + (#eq? @fncall "flashcard") + ((content) @back +) @flashcard) +""" + + +class FlashcardParser: + typst_language: tree_sitter.Language + typst_parser: tree_sitter.Parser + flashcard_query: tree_sitter.Query + + def __init__(self): + self.typst_language = get_language("typst") + self.typst_parser = get_parser("typst") + self.flashcard_query = self.typst_language.query(ts_flashcard_query) + + def parse_file(self, file: FileHandler) -> List[Flashcard]: + cards = [] + tree = self.typst_parser.parse(file.get_bytes(), encoding="utf8") + captures = self.flashcard_query.captures(tree.root_node) + + n = len(captures["flashcard"]) if captures else 0 + for idx in range(n): + note_id = captures["id"][idx] + front = captures["front"][idx] + back = captures["back"][idx] + card = Flashcard( + file.get_node_content(front, True), + file.get_node_content(back, True), + note_id=int(file.get_node_content(note_id)) + ) + card.set_ts_nodes(front, back, note_id) + cards.append(card) + return cards diff --git a/src/anki/typst_compiler.py b/src/anki/typst_compiler.py new file mode 100644 index 0000000..2467a40 --- /dev/null +++ b/src/anki/typst_compiler.py @@ -0,0 +1,61 @@ +import asyncio +import os +from typing import List + +from .flashcard import Flashcard + +default_preamble = """ +#set page(width: auto, height: auto, margin: (rest: 0%)) +#let flashcard(id, front, back) = { + strong(front) + [\\ ] + back +} +""" + + +class TypstCompilationError(ValueError): + pass + + +class TypstCompiler: + preamble: str + typst_cmd: str + typst_root_dir: str + max_processes: int + + def __init__(self, typst_root_dir: str = ".", typst_cmd: str = "typst", preamble: str = None): + if preamble is None: + preamble = default_preamble + self.typst_cmd = typst_cmd + self.typst_root_dir = typst_root_dir + self.preamble = preamble + self.max_processes = round(os.cpu_count() * 1.5) + + async def _compile(self, src: str) -> bytes: + proc = await asyncio.create_subprocess_shell( + f"{self.typst_cmd} compile - - --root {self.typst_root_dir} --format svg", + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + proc.stdin.write(bytes(src, encoding="utf-8")) + proc.stdin.close() + await proc.wait() + if err := await proc.stderr.read(): + raise TypstCompilationError(bytes.decode(err, encoding="utf-8")) + return await proc.stdout.read() + + async def _compile_flashcard(self, card: Flashcard): + front = await self._compile(self.preamble + "\n" + card.as_typst(True)) + back = await self._compile(self.preamble + "\n" + card.as_typst(False)) + card.set_svgs(front, back) + + async def compile_flashcards(self, cards: List[Flashcard]): + semaphore = asyncio.Semaphore(self.max_processes) + + async def compile_coro(card): + async with semaphore: + return await self._compile_flashcard(card) + + return await asyncio.gather(*(compile_coro(card) for card in cards))