diff --git a/pyproject.toml b/pyproject.toml
index bf681ac..9317a58 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -15,3 +15,7 @@ dependencies = [
"aiohttp>=3.11.11",
"tree-sitter-language-pack>=0.2.0",
]
+
+[project.scripts]
+typstar-anki = "anki.main:main"
+
diff --git a/src/anki/__init__.py b/src/anki/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/anki/anki_api.py b/src/anki/anki_api.py
new file mode 100644
index 0000000..82c4e1e
--- /dev/null
+++ b/src/anki/anki_api.py
@@ -0,0 +1,76 @@
+import asyncio
+import base64
+from typing import List
+
+import aiohttp
+
+from .flashcard import Flashcard
+
+
+class AnkiConnectError(Exception):
+ pass
+
+
+class AnkiConnectApi:
+ url: str
+
+ def __init__(self, url="http://127.0.0.1:8765"):
+ self.url = url
+
+ async def push_flashcards(self, cards: List[Flashcard]):
+ add = []
+ update = []
+
+ for card in cards:
+ if card.is_new():
+ add.append(card)
+ else:
+ update.append(card)
+ await asyncio.gather(self._add(add), self._update(update))
+
+ async def _request_api(self, action, **params):
+ async with aiohttp.ClientSession() as session:
+ data = {
+ "action": action,
+ "version": 6,
+ "params": params,
+ }
+ try:
+ async with session.post(url=self.url, json=data) as response:
+ result = await response.json(encoding="utf-8")
+ if err := result["error"]:
+ raise AnkiConnectError(err)
+ return result["result"]
+ except aiohttp.ClientError as e:
+ raise AnkiConnectError(f"Could not connect to Anki: {e}")
+
+ async def _update_note_model(self, card: Flashcard):
+ await self._request_api("updateNoteModel", note=card.as_anki_model())
+
+ async def _store_media(self, card):
+ await self._request_api("storeMediaFile",
+ filename=card.svg_filename(True),
+ data=base64.b64encode(card.svg_front).decode())
+ await self._request_api("storeMediaFile",
+ filename=card.svg_filename(False),
+ data=base64.b64encode(card.svg_back).decode())
+
+ async def _add(self, cards: List[Flashcard]):
+ notes = []
+ for card in cards:
+ data = {
+ "deckName": card.deck,
+ "options": {
+ "allowDuplicate": True, # won't work with svgs
+ },
+ }
+ data.update(card.as_anki_model(True))
+ notes.append(data)
+ result = await self._request_api("addNotes", notes=notes)
+ for idx, note_id in enumerate(result):
+ cards[idx].update_id(note_id)
+ await self._update(cards)
+
+ async def _update(self, cards: List[Flashcard]):
+ await asyncio.gather(*(self._update_note_model(card) for card in cards),
+ *(self._store_media(card) for card in cards))
diff --git a/src/anki/file_handler.py b/src/anki/file_handler.py
new file mode 100644
index 0000000..e09397b
--- /dev/null
+++ b/src/anki/file_handler.py
@@ -0,0 +1,40 @@
+from typing import List
+
+import tree_sitter
+
+
+class FileHandler:
+ file_path: str
+ file_content: List[str]
+
+ def __init__(self, path):
+ self.file_path = path
+ self.read()
+
+ def get_bytes(self) -> bytes:
+ return bytes("".join(self.file_content), encoding="utf-8")
+
+ def get_node_content(self, node: tree_sitter.Node, remove_outer=False):
+ content = "".join(
+ self.file_content[node.start_point.row:node.end_point.row + 1]
+ )[node.start_point.column:-(len(self.file_content[node.end_point.row]) - node.end_point.column)]
+ return content[1:-1] if remove_outer else content
+
+ def update_node_content(self, node: tree_sitter.Node, value):
+ new_lines = self.file_content[:node.start_point.row]
+ first_line = self.file_content[node.start_point.row][:node.start_point.column]
+ last_line = self.file_content[node.end_point.row][node.end_point.column:]
+ new_lines.extend((
+ line + "\n" for line in (first_line + str(value) + last_line).split("\n")
+ if line != ""
+ ))
+ new_lines.extend(self.file_content[node.end_point.row + 1:])
+ self.file_content = new_lines
+
+ def read(self):
+ with open(self.file_path, encoding="utf-8") as f:
+ self.file_content = f.readlines()
+
+ def write(self):
+ with open(self.file_path, "w", encoding="utf-8") as f:
+ f.writelines(self.file_content)
diff --git a/src/anki/flashcard.py b/src/anki/flashcard.py
new file mode 100644
index 0000000..d487b63
--- /dev/null
+++ b/src/anki/flashcard.py
@@ -0,0 +1,71 @@
+import tree_sitter
+
+
+class Flashcard:
+ note_id: int
+ front: str
+ back: str
+ deck: str
+ id_updated: bool
+
+ note_id_node: tree_sitter.Node
+ front_node: tree_sitter.Node
+ back_node: tree_sitter.Node
+
+ svg_front: bytes
+ svg_back: bytes
+
+ def __init__(self, front: str, back: str, deck: str = None, note_id: int = None):
+ if not deck:
+ deck = "Default"
+ if not note_id:
+ note_id = 0
+ self.front = front
+ self.back = back
+ self.deck = deck
+ self.note_id = note_id
+ self.id_updated = False
+
+ def __str__(self):
+ return f"Flashcard(id={self.note_id}, front={self.front})"
+
+ def as_typst(self, front: bool) -> str:
+ return f"#flashcard({self.note_id})[{self.front if front else ''}][{self.back if not front else ''}]"
+
+ def as_html(self, front: bool) -> str:
+ prefix = f"
{self.front}: {self.back}{' ' * 10}
" # indexable via anki search
+ image = f'
'
+ return prefix + image
+
+ def as_anki_model(self, tmp: bool = False) -> dict:
+ model = {
+ "modelName": "Basic",
+ "fields": {
+ "Front": f"tmp typst: {self.front}" if tmp else self.as_html(True),
+ "Back": f"tmp typst: {self.back}" if tmp else self.as_html(False),
+ },
+ "tags": ["typst"]
+ }
+ if not self.is_new():
+ model["id"] = self.note_id
+ return model
+
+ def svg_filename(self, front: bool) -> str:
+ return f"typst_{self.note_id}_{'front' if front else 'back'}.svg"
+
+ def is_new(self) -> bool:
+ return self.note_id == 0 or self.note_id is None
+
+ def set_ts_nodes(self, front: tree_sitter.Node, back: tree_sitter.Node, note_id: tree_sitter.Node):
+ self.front_node = front
+ self.back_node = back
+ self.note_id_node = note_id
+
+ def update_id(self, value):
+ if self.note_id != value:
+ self.note_id = value
+ self.id_updated = True
+
+ def set_svgs(self, front, back):
+ self.svg_front = front
+ self.svg_back = back
diff --git a/src/anki/main.py b/src/anki/main.py
new file mode 100644
index 0000000..12af433
--- /dev/null
+++ b/src/anki/main.py
@@ -0,0 +1,52 @@
+import asyncio
+import glob
+import os
+
+from anki.anki_api import AnkiConnectApi
+from anki.file_handler import FileHandler
+from anki.parser import FlashcardParser
+from anki.typst_compiler import TypstCompiler
+
+parser = FlashcardParser()
+compiler = TypstCompiler(os.getcwd())
+api = AnkiConnectApi()
+
+
+async def export_flashcards(path):
+ # parse flashcards
+ print("Parsing flashcards...")
+ flashcards = []
+ file_handlers = []
+ for file in glob.glob(f"{path}/**/*.typ", recursive=True):
+ fh = FileHandler(file)
+ cards = parser.parse_file(fh)
+ file_handlers.append((fh, cards))
+ flashcards.extend(cards)
+
+ # async typst compilation
+ print("Compiling flashcards...")
+ await compiler.compile_flashcards(flashcards)
+
+ # async anki push per deck
+ print("Pushing flashcards to anki...")
+ await api.push_flashcards(flashcards)
+
+ # write id updates to files
+ print("Updating ids in source...")
+ for fh, cards in file_handlers:
+ file_updated = False
+ for c in cards:
+ if c.id_updated:
+ fh.update_node_content(c.note_id_node, c.note_id)
+ file_updated = True
+ if file_updated:
+ fh.write()
+ print("Done")
+
+
+def main():
+ asyncio.run(export_flashcards(os.getcwd()))
+
+
+if __name__ == "__main__":
+ main()
diff --git a/src/anki/parser.py b/src/anki/parser.py
new file mode 100644
index 0000000..3217a76
--- /dev/null
+++ b/src/anki/parser.py
@@ -0,0 +1,57 @@
+from typing import List
+
+import tree_sitter
+from tree_sitter_language_pack import get_language, get_parser
+
+from .file_handler import FileHandler
+from .flashcard import Flashcard
+
+ts_flashcard_query = """
+(call
+ item: [
+ (call
+ item: (call
+ item: (ident) @fncall
+ (group
+ (number) @id))
+ (content) @front)
+ (call
+ item: (ident) @fncall
+ (group
+ (number) @id
+ (string) @front))
+ ]
+ (#eq? @fncall "flashcard")
+ ((content) @back
+) @flashcard)
+"""
+
+
+class FlashcardParser:
+ typst_language: tree_sitter.Language
+ typst_parser: tree_sitter.Parser
+ flashcard_query: tree_sitter.Query
+
+ def __init__(self):
+ self.typst_language = get_language("typst")
+ self.typst_parser = get_parser("typst")
+ self.flashcard_query = self.typst_language.query(ts_flashcard_query)
+
+ def parse_file(self, file: FileHandler) -> List[Flashcard]:
+ cards = []
+ tree = self.typst_parser.parse(file.get_bytes(), encoding="utf8")
+ captures = self.flashcard_query.captures(tree.root_node)
+
+ n = len(captures["flashcard"]) if captures else 0
+ for idx in range(n):
+ note_id = captures["id"][idx]
+ front = captures["front"][idx]
+ back = captures["back"][idx]
+ card = Flashcard(
+ file.get_node_content(front, True),
+ file.get_node_content(back, True),
+ note_id=int(file.get_node_content(note_id))
+ )
+ card.set_ts_nodes(front, back, note_id)
+ cards.append(card)
+ return cards
diff --git a/src/anki/typst_compiler.py b/src/anki/typst_compiler.py
new file mode 100644
index 0000000..2467a40
--- /dev/null
+++ b/src/anki/typst_compiler.py
@@ -0,0 +1,61 @@
+import asyncio
+import os
+from typing import List
+
+from .flashcard import Flashcard
+
+default_preamble = """
+#set page(width: auto, height: auto, margin: (rest: 0%))
+#let flashcard(id, front, back) = {
+ strong(front)
+ [\\ ]
+ back
+}
+"""
+
+
+class TypstCompilationError(ValueError):
+ pass
+
+
+class TypstCompiler:
+ preamble: str
+ typst_cmd: str
+ typst_root_dir: str
+ max_processes: int
+
+ def __init__(self, typst_root_dir: str = ".", typst_cmd: str = "typst", preamble: str = None):
+ if preamble is None:
+ preamble = default_preamble
+ self.typst_cmd = typst_cmd
+ self.typst_root_dir = typst_root_dir
+ self.preamble = preamble
+ self.max_processes = round(os.cpu_count() * 1.5)
+
+ async def _compile(self, src: str) -> bytes:
+ proc = await asyncio.create_subprocess_shell(
+ f"{self.typst_cmd} compile - - --root {self.typst_root_dir} --format svg",
+ stdin=asyncio.subprocess.PIPE,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.PIPE,
+ )
+ proc.stdin.write(bytes(src, encoding="utf-8"))
+ proc.stdin.close()
+ await proc.wait()
+ if err := await proc.stderr.read():
+ raise TypstCompilationError(bytes.decode(err, encoding="utf-8"))
+ return await proc.stdout.read()
+
+ async def _compile_flashcard(self, card: Flashcard):
+ front = await self._compile(self.preamble + "\n" + card.as_typst(True))
+ back = await self._compile(self.preamble + "\n" + card.as_typst(False))
+ card.set_svgs(front, back)
+
+ async def compile_flashcards(self, cards: List[Flashcard]):
+ semaphore = asyncio.Semaphore(self.max_processes)
+
+ async def compile_coro(card):
+ async with semaphore:
+ return await self._compile_flashcard(card)
+
+ return await asyncio.gather(*(compile_coro(card) for card in cards))