mirror of
https://github.com/Ascyii/typstar.git
synced 2026-01-01 05:24:24 -05:00
feat(anki): basic export
This commit is contained in:
0
src/anki/__init__.py
Normal file
0
src/anki/__init__.py
Normal file
76
src/anki/anki_api.py
Normal file
76
src/anki/anki_api.py
Normal file
@@ -0,0 +1,76 @@
|
||||
import asyncio
|
||||
import base64
|
||||
from typing import List
|
||||
|
||||
import aiohttp
|
||||
|
||||
from .flashcard import Flashcard
|
||||
|
||||
|
||||
class AnkiConnectError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class AnkiConnectApi:
|
||||
url: str
|
||||
|
||||
def __init__(self, url="http://127.0.0.1:8765"):
|
||||
self.url = url
|
||||
|
||||
async def push_flashcards(self, cards: List[Flashcard]):
|
||||
add = []
|
||||
update = []
|
||||
|
||||
for card in cards:
|
||||
if card.is_new():
|
||||
add.append(card)
|
||||
else:
|
||||
update.append(card)
|
||||
await asyncio.gather(self._add(add), self._update(update))
|
||||
|
||||
async def _request_api(self, action, **params):
|
||||
async with aiohttp.ClientSession() as session:
|
||||
data = {
|
||||
"action": action,
|
||||
"version": 6,
|
||||
"params": params,
|
||||
}
|
||||
try:
|
||||
async with session.post(url=self.url, json=data) as response:
|
||||
result = await response.json(encoding="utf-8")
|
||||
if err := result["error"]:
|
||||
raise AnkiConnectError(err)
|
||||
return result["result"]
|
||||
except aiohttp.ClientError as e:
|
||||
raise AnkiConnectError(f"Could not connect to Anki: {e}")
|
||||
|
||||
async def _update_note_model(self, card: Flashcard):
|
||||
await self._request_api("updateNoteModel", note=card.as_anki_model())
|
||||
|
||||
async def _store_media(self, card):
|
||||
await self._request_api("storeMediaFile",
|
||||
filename=card.svg_filename(True),
|
||||
data=base64.b64encode(card.svg_front).decode())
|
||||
await self._request_api("storeMediaFile",
|
||||
filename=card.svg_filename(False),
|
||||
data=base64.b64encode(card.svg_back).decode())
|
||||
|
||||
async def _add(self, cards: List[Flashcard]):
|
||||
notes = []
|
||||
for card in cards:
|
||||
data = {
|
||||
"deckName": card.deck,
|
||||
"options": {
|
||||
"allowDuplicate": True, # won't work with svgs
|
||||
},
|
||||
}
|
||||
data.update(card.as_anki_model(True))
|
||||
notes.append(data)
|
||||
result = await self._request_api("addNotes", notes=notes)
|
||||
for idx, note_id in enumerate(result):
|
||||
cards[idx].update_id(note_id)
|
||||
await self._update(cards)
|
||||
|
||||
async def _update(self, cards: List[Flashcard]):
|
||||
await asyncio.gather(*(self._update_note_model(card) for card in cards),
|
||||
*(self._store_media(card) for card in cards))
|
||||
40
src/anki/file_handler.py
Normal file
40
src/anki/file_handler.py
Normal file
@@ -0,0 +1,40 @@
|
||||
from typing import List
|
||||
|
||||
import tree_sitter
|
||||
|
||||
|
||||
class FileHandler:
|
||||
file_path: str
|
||||
file_content: List[str]
|
||||
|
||||
def __init__(self, path):
|
||||
self.file_path = path
|
||||
self.read()
|
||||
|
||||
def get_bytes(self) -> bytes:
|
||||
return bytes("".join(self.file_content), encoding="utf-8")
|
||||
|
||||
def get_node_content(self, node: tree_sitter.Node, remove_outer=False):
|
||||
content = "".join(
|
||||
self.file_content[node.start_point.row:node.end_point.row + 1]
|
||||
)[node.start_point.column:-(len(self.file_content[node.end_point.row]) - node.end_point.column)]
|
||||
return content[1:-1] if remove_outer else content
|
||||
|
||||
def update_node_content(self, node: tree_sitter.Node, value):
|
||||
new_lines = self.file_content[:node.start_point.row]
|
||||
first_line = self.file_content[node.start_point.row][:node.start_point.column]
|
||||
last_line = self.file_content[node.end_point.row][node.end_point.column:]
|
||||
new_lines.extend((
|
||||
line + "\n" for line in (first_line + str(value) + last_line).split("\n")
|
||||
if line != ""
|
||||
))
|
||||
new_lines.extend(self.file_content[node.end_point.row + 1:])
|
||||
self.file_content = new_lines
|
||||
|
||||
def read(self):
|
||||
with open(self.file_path, encoding="utf-8") as f:
|
||||
self.file_content = f.readlines()
|
||||
|
||||
def write(self):
|
||||
with open(self.file_path, "w", encoding="utf-8") as f:
|
||||
f.writelines(self.file_content)
|
||||
71
src/anki/flashcard.py
Normal file
71
src/anki/flashcard.py
Normal file
@@ -0,0 +1,71 @@
|
||||
import tree_sitter
|
||||
|
||||
|
||||
class Flashcard:
|
||||
note_id: int
|
||||
front: str
|
||||
back: str
|
||||
deck: str
|
||||
id_updated: bool
|
||||
|
||||
note_id_node: tree_sitter.Node
|
||||
front_node: tree_sitter.Node
|
||||
back_node: tree_sitter.Node
|
||||
|
||||
svg_front: bytes
|
||||
svg_back: bytes
|
||||
|
||||
def __init__(self, front: str, back: str, deck: str = None, note_id: int = None):
|
||||
if not deck:
|
||||
deck = "Default"
|
||||
if not note_id:
|
||||
note_id = 0
|
||||
self.front = front
|
||||
self.back = back
|
||||
self.deck = deck
|
||||
self.note_id = note_id
|
||||
self.id_updated = False
|
||||
|
||||
def __str__(self):
|
||||
return f"Flashcard(id={self.note_id}, front={self.front})"
|
||||
|
||||
def as_typst(self, front: bool) -> str:
|
||||
return f"#flashcard({self.note_id})[{self.front if front else ''}][{self.back if not front else ''}]"
|
||||
|
||||
def as_html(self, front: bool) -> str:
|
||||
prefix = f"<p hidden>{self.front}: {self.back}{' ' * 10}</p>" # indexable via anki search
|
||||
image = f'<img src="{self.svg_filename(front)}" width=100 />'
|
||||
return prefix + image
|
||||
|
||||
def as_anki_model(self, tmp: bool = False) -> dict:
|
||||
model = {
|
||||
"modelName": "Basic",
|
||||
"fields": {
|
||||
"Front": f"tmp typst: {self.front}" if tmp else self.as_html(True),
|
||||
"Back": f"tmp typst: {self.back}" if tmp else self.as_html(False),
|
||||
},
|
||||
"tags": ["typst"]
|
||||
}
|
||||
if not self.is_new():
|
||||
model["id"] = self.note_id
|
||||
return model
|
||||
|
||||
def svg_filename(self, front: bool) -> str:
|
||||
return f"typst_{self.note_id}_{'front' if front else 'back'}.svg"
|
||||
|
||||
def is_new(self) -> bool:
|
||||
return self.note_id == 0 or self.note_id is None
|
||||
|
||||
def set_ts_nodes(self, front: tree_sitter.Node, back: tree_sitter.Node, note_id: tree_sitter.Node):
|
||||
self.front_node = front
|
||||
self.back_node = back
|
||||
self.note_id_node = note_id
|
||||
|
||||
def update_id(self, value):
|
||||
if self.note_id != value:
|
||||
self.note_id = value
|
||||
self.id_updated = True
|
||||
|
||||
def set_svgs(self, front, back):
|
||||
self.svg_front = front
|
||||
self.svg_back = back
|
||||
52
src/anki/main.py
Normal file
52
src/anki/main.py
Normal file
@@ -0,0 +1,52 @@
|
||||
import asyncio
|
||||
import glob
|
||||
import os
|
||||
|
||||
from anki.anki_api import AnkiConnectApi
|
||||
from anki.file_handler import FileHandler
|
||||
from anki.parser import FlashcardParser
|
||||
from anki.typst_compiler import TypstCompiler
|
||||
|
||||
parser = FlashcardParser()
|
||||
compiler = TypstCompiler(os.getcwd())
|
||||
api = AnkiConnectApi()
|
||||
|
||||
|
||||
async def export_flashcards(path):
|
||||
# parse flashcards
|
||||
print("Parsing flashcards...")
|
||||
flashcards = []
|
||||
file_handlers = []
|
||||
for file in glob.glob(f"{path}/**/*.typ", recursive=True):
|
||||
fh = FileHandler(file)
|
||||
cards = parser.parse_file(fh)
|
||||
file_handlers.append((fh, cards))
|
||||
flashcards.extend(cards)
|
||||
|
||||
# async typst compilation
|
||||
print("Compiling flashcards...")
|
||||
await compiler.compile_flashcards(flashcards)
|
||||
|
||||
# async anki push per deck
|
||||
print("Pushing flashcards to anki...")
|
||||
await api.push_flashcards(flashcards)
|
||||
|
||||
# write id updates to files
|
||||
print("Updating ids in source...")
|
||||
for fh, cards in file_handlers:
|
||||
file_updated = False
|
||||
for c in cards:
|
||||
if c.id_updated:
|
||||
fh.update_node_content(c.note_id_node, c.note_id)
|
||||
file_updated = True
|
||||
if file_updated:
|
||||
fh.write()
|
||||
print("Done")
|
||||
|
||||
|
||||
def main():
|
||||
asyncio.run(export_flashcards(os.getcwd()))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
57
src/anki/parser.py
Normal file
57
src/anki/parser.py
Normal file
@@ -0,0 +1,57 @@
|
||||
from typing import List
|
||||
|
||||
import tree_sitter
|
||||
from tree_sitter_language_pack import get_language, get_parser
|
||||
|
||||
from .file_handler import FileHandler
|
||||
from .flashcard import Flashcard
|
||||
|
||||
ts_flashcard_query = """
|
||||
(call
|
||||
item: [
|
||||
(call
|
||||
item: (call
|
||||
item: (ident) @fncall
|
||||
(group
|
||||
(number) @id))
|
||||
(content) @front)
|
||||
(call
|
||||
item: (ident) @fncall
|
||||
(group
|
||||
(number) @id
|
||||
(string) @front))
|
||||
]
|
||||
(#eq? @fncall "flashcard")
|
||||
((content) @back
|
||||
) @flashcard)
|
||||
"""
|
||||
|
||||
|
||||
class FlashcardParser:
|
||||
typst_language: tree_sitter.Language
|
||||
typst_parser: tree_sitter.Parser
|
||||
flashcard_query: tree_sitter.Query
|
||||
|
||||
def __init__(self):
|
||||
self.typst_language = get_language("typst")
|
||||
self.typst_parser = get_parser("typst")
|
||||
self.flashcard_query = self.typst_language.query(ts_flashcard_query)
|
||||
|
||||
def parse_file(self, file: FileHandler) -> List[Flashcard]:
|
||||
cards = []
|
||||
tree = self.typst_parser.parse(file.get_bytes(), encoding="utf8")
|
||||
captures = self.flashcard_query.captures(tree.root_node)
|
||||
|
||||
n = len(captures["flashcard"]) if captures else 0
|
||||
for idx in range(n):
|
||||
note_id = captures["id"][idx]
|
||||
front = captures["front"][idx]
|
||||
back = captures["back"][idx]
|
||||
card = Flashcard(
|
||||
file.get_node_content(front, True),
|
||||
file.get_node_content(back, True),
|
||||
note_id=int(file.get_node_content(note_id))
|
||||
)
|
||||
card.set_ts_nodes(front, back, note_id)
|
||||
cards.append(card)
|
||||
return cards
|
||||
61
src/anki/typst_compiler.py
Normal file
61
src/anki/typst_compiler.py
Normal file
@@ -0,0 +1,61 @@
|
||||
import asyncio
|
||||
import os
|
||||
from typing import List
|
||||
|
||||
from .flashcard import Flashcard
|
||||
|
||||
default_preamble = """
|
||||
#set page(width: auto, height: auto, margin: (rest: 0%))
|
||||
#let flashcard(id, front, back) = {
|
||||
strong(front)
|
||||
[\\ ]
|
||||
back
|
||||
}
|
||||
"""
|
||||
|
||||
|
||||
class TypstCompilationError(ValueError):
|
||||
pass
|
||||
|
||||
|
||||
class TypstCompiler:
|
||||
preamble: str
|
||||
typst_cmd: str
|
||||
typst_root_dir: str
|
||||
max_processes: int
|
||||
|
||||
def __init__(self, typst_root_dir: str = ".", typst_cmd: str = "typst", preamble: str = None):
|
||||
if preamble is None:
|
||||
preamble = default_preamble
|
||||
self.typst_cmd = typst_cmd
|
||||
self.typst_root_dir = typst_root_dir
|
||||
self.preamble = preamble
|
||||
self.max_processes = round(os.cpu_count() * 1.5)
|
||||
|
||||
async def _compile(self, src: str) -> bytes:
|
||||
proc = await asyncio.create_subprocess_shell(
|
||||
f"{self.typst_cmd} compile - - --root {self.typst_root_dir} --format svg",
|
||||
stdin=asyncio.subprocess.PIPE,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
)
|
||||
proc.stdin.write(bytes(src, encoding="utf-8"))
|
||||
proc.stdin.close()
|
||||
await proc.wait()
|
||||
if err := await proc.stderr.read():
|
||||
raise TypstCompilationError(bytes.decode(err, encoding="utf-8"))
|
||||
return await proc.stdout.read()
|
||||
|
||||
async def _compile_flashcard(self, card: Flashcard):
|
||||
front = await self._compile(self.preamble + "\n" + card.as_typst(True))
|
||||
back = await self._compile(self.preamble + "\n" + card.as_typst(False))
|
||||
card.set_svgs(front, back)
|
||||
|
||||
async def compile_flashcards(self, cards: List[Flashcard]):
|
||||
semaphore = asyncio.Semaphore(self.max_processes)
|
||||
|
||||
async def compile_coro(card):
|
||||
async with semaphore:
|
||||
return await self._compile_flashcard(card)
|
||||
|
||||
return await asyncio.gather(*(compile_coro(card) for card in cards))
|
||||
Reference in New Issue
Block a user