mirror of
https://github.com/Ascyii/typstar.git
synced 2026-01-01 05:24:24 -05:00
feat(anki): specify deck in typst comments
This commit is contained in:
@@ -1,12 +1,19 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
import base64
|
import base64
|
||||||
from typing import List
|
from collections import defaultdict
|
||||||
|
from typing import Iterable, List
|
||||||
|
|
||||||
import aiohttp
|
import aiohttp
|
||||||
|
|
||||||
from .flashcard import Flashcard
|
from .flashcard import Flashcard
|
||||||
|
|
||||||
|
|
||||||
|
async def _gather_exceptions(coroutines):
|
||||||
|
for result in await asyncio.gather(*coroutines, return_exceptions=True):
|
||||||
|
if isinstance(result, Exception):
|
||||||
|
raise result
|
||||||
|
|
||||||
|
|
||||||
class AnkiConnectError(Exception):
|
class AnkiConnectError(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@@ -21,18 +28,26 @@ class AnkiConnectApi:
|
|||||||
self.api_key = api_key
|
self.api_key = api_key
|
||||||
self.semaphore = asyncio.Semaphore(2) # increase in case Anki implements multithreading
|
self.semaphore = asyncio.Semaphore(2) # increase in case Anki implements multithreading
|
||||||
|
|
||||||
async def push_flashcards(self, cards: List[Flashcard]):
|
async def push_flashcards(self, cards: Iterable[Flashcard]):
|
||||||
add = []
|
add: dict[str, List[Flashcard]] = defaultdict(list)
|
||||||
update = []
|
update: dict[str, List[Flashcard]] = defaultdict(list)
|
||||||
|
n_add: int = 0
|
||||||
|
n_update: int = 0
|
||||||
|
|
||||||
for card in cards:
|
for card in cards:
|
||||||
if card.is_new():
|
if card.is_new():
|
||||||
add.append(card)
|
add[card.deck].append(card)
|
||||||
|
n_add += 1
|
||||||
else:
|
else:
|
||||||
update.append(card)
|
update[card.deck].append(card)
|
||||||
print(f"Pushing {len(add)} new flashcards and {len(update)} updated flashcards to Anki...")
|
n_update += 1
|
||||||
await self._add(add)
|
|
||||||
await self._update(update)
|
print(f"Pushing {n_add} new flashcards and {n_update} updated flashcards to Anki...")
|
||||||
|
await self._create_required_decks({*add.keys(), *update.keys()})
|
||||||
|
await self._add_new_cards(add)
|
||||||
|
await _gather_exceptions(
|
||||||
|
[*self._update_cards_requests(add), *self._update_cards_requests(update, True)]
|
||||||
|
)
|
||||||
|
|
||||||
async def _request_api(self, action, **params):
|
async def _request_api(self, action, **params):
|
||||||
async with aiohttp.ClientSession() as session:
|
async with aiohttp.ClientSession() as session:
|
||||||
@@ -63,25 +78,44 @@ class AnkiConnectApi:
|
|||||||
filename=card.svg_filename(False),
|
filename=card.svg_filename(False),
|
||||||
data=base64.b64encode(card.svg_back).decode())
|
data=base64.b64encode(card.svg_back).decode())
|
||||||
|
|
||||||
async def _add(self, cards: List[Flashcard]):
|
async def _change_deck(self, deck: str, cards: List[int]):
|
||||||
notes = []
|
await self._request_api("changeDeck", deck=deck, cards=cards)
|
||||||
for card in cards:
|
|
||||||
data = {
|
|
||||||
"deckName": card.deck,
|
|
||||||
"options": {
|
|
||||||
"allowDuplicate": True, # won't work with svgs
|
|
||||||
},
|
|
||||||
}
|
|
||||||
data.update(card.as_anki_model(True))
|
|
||||||
notes.append(data)
|
|
||||||
result = await self._request_api("addNotes", notes=notes)
|
|
||||||
for idx, note_id in enumerate(result):
|
|
||||||
cards[idx].update_id(note_id)
|
|
||||||
await self._update(cards)
|
|
||||||
|
|
||||||
async def _update(self, cards: List[Flashcard]):
|
async def _add_new_cards(self, cards_map: dict[str, List[Flashcard]]):
|
||||||
results = await asyncio.gather(*(self._update_note_model(card) for card in cards),
|
notes: List[Flashcard] = []
|
||||||
*(self._store_media(card) for card in cards), return_exceptions=True)
|
notes_data: List[dict] = []
|
||||||
for result in results:
|
for cards in cards_map.values():
|
||||||
if isinstance(result, Exception):
|
for card in cards:
|
||||||
raise result
|
data = {
|
||||||
|
"deckName": card.deck,
|
||||||
|
"options": {
|
||||||
|
"allowDuplicate": True, # won't work with svgs
|
||||||
|
},
|
||||||
|
}
|
||||||
|
data.update(card.as_anki_model(True))
|
||||||
|
notes.append(card)
|
||||||
|
notes_data.append(data)
|
||||||
|
result = await self._request_api("addNotes", notes=notes_data)
|
||||||
|
for idx, note_id in enumerate(result):
|
||||||
|
notes[idx].update_id(note_id)
|
||||||
|
|
||||||
|
async def _create_required_decks(self, required: Iterable[str]):
|
||||||
|
existing = await self._request_api("deckNamesAndIds")
|
||||||
|
requests = []
|
||||||
|
for deck in required:
|
||||||
|
if deck not in existing:
|
||||||
|
requests.append(self._request_api("createDeck", deck=deck))
|
||||||
|
await _gather_exceptions(requests)
|
||||||
|
|
||||||
|
def _update_cards_requests(self, cards_map: dict[str, List[Flashcard]], update_deck: bool = True):
|
||||||
|
requests = []
|
||||||
|
for deck, cards in cards_map.items():
|
||||||
|
card_ids = []
|
||||||
|
for card in cards:
|
||||||
|
requests.append(self._update_note_model(card))
|
||||||
|
requests.append(self._store_media(card))
|
||||||
|
if update_deck:
|
||||||
|
card_ids.append(card.note_id)
|
||||||
|
if card_ids:
|
||||||
|
requests.append(self._change_deck(deck, card_ids))
|
||||||
|
return requests
|
||||||
|
|||||||
@@ -25,7 +25,7 @@ async def export_flashcards(root_dir, clear_cache, typst_cmd, anki_url, anki_key
|
|||||||
await compiler.compile_flashcards(flashcards)
|
await compiler.compile_flashcards(flashcards)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# async anki push per deck
|
# async anki push
|
||||||
await api.push_flashcards(flashcards)
|
await api.push_flashcards(flashcards)
|
||||||
finally:
|
finally:
|
||||||
# write id updates to files
|
# write id updates to files
|
||||||
|
|||||||
@@ -1,9 +1,10 @@
|
|||||||
import glob
|
import glob
|
||||||
import json
|
import json
|
||||||
|
import re
|
||||||
|
|
||||||
from functools import cache
|
from functools import cache
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import List
|
from typing import List, Tuple
|
||||||
|
|
||||||
import appdirs
|
import appdirs
|
||||||
import tree_sitter
|
import tree_sitter
|
||||||
@@ -32,11 +33,17 @@ ts_flashcard_query = """
|
|||||||
) @flashcard)
|
) @flashcard)
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
ts_deck_query = """
|
||||||
|
((comment) @deck)
|
||||||
|
"""
|
||||||
|
deck_regex = re.compile(r"\W+ANKI:\s*(\S*)")
|
||||||
|
|
||||||
|
|
||||||
class FlashcardParser:
|
class FlashcardParser:
|
||||||
typst_language: tree_sitter.Language
|
typst_language: tree_sitter.Language
|
||||||
typst_parser: tree_sitter.Parser
|
typst_parser: tree_sitter.Parser
|
||||||
flashcard_query: tree_sitter.Query
|
flashcard_query: tree_sitter.Query
|
||||||
|
deck_query: tree_sitter.Query
|
||||||
|
|
||||||
file_handlers: List[tuple[FileHandler, List[Flashcard]]]
|
file_handlers: List[tuple[FileHandler, List[Flashcard]]]
|
||||||
file_hashes: dict[str, str]
|
file_hashes: dict[str, str]
|
||||||
@@ -46,28 +53,43 @@ class FlashcardParser:
|
|||||||
self.typst_language = get_language("typst")
|
self.typst_language = get_language("typst")
|
||||||
self.typst_parser = get_parser("typst")
|
self.typst_parser = get_parser("typst")
|
||||||
self.flashcard_query = self.typst_language.query(ts_flashcard_query)
|
self.flashcard_query = self.typst_language.query(ts_flashcard_query)
|
||||||
|
self.deck_query = self.typst_language.query(ts_deck_query)
|
||||||
self.file_handlers = []
|
self.file_handlers = []
|
||||||
self._load_file_hashes()
|
self._load_file_hashes()
|
||||||
|
|
||||||
def _parse_file(self, file: FileHandler, preamble: str) -> List[Flashcard]:
|
def _parse_file(self, file: FileHandler, preamble: str) -> List[Flashcard]:
|
||||||
cards = []
|
cards = []
|
||||||
tree = self.typst_parser.parse(file.get_bytes(), encoding="utf8")
|
tree = self.typst_parser.parse(file.get_bytes(), encoding="utf8")
|
||||||
captures = self.flashcard_query.captures(tree.root_node)
|
card_captures = self.flashcard_query.captures(tree.root_node)
|
||||||
if not captures:
|
if not card_captures:
|
||||||
return cards
|
return cards
|
||||||
|
deck_captures = self.deck_query.captures(tree.root_node)
|
||||||
|
|
||||||
def row_compare(node):
|
def row_compare(node):
|
||||||
return node.start_point.row
|
return node.start_point.row
|
||||||
|
|
||||||
captures["id"].sort(key=row_compare)
|
card_captures["id"].sort(key=row_compare)
|
||||||
captures["front"].sort(key=row_compare)
|
card_captures["front"].sort(key=row_compare)
|
||||||
captures["back"].sort(key=row_compare)
|
card_captures["back"].sort(key=row_compare)
|
||||||
|
|
||||||
|
deck_refs: List[Tuple[int, str]] = []
|
||||||
|
deck_refs_idx = -1
|
||||||
|
current_deck = None
|
||||||
|
if deck_captures:
|
||||||
|
deck_captures["deck"].sort(key=row_compare)
|
||||||
|
for comment in deck_captures["deck"]:
|
||||||
|
if match := deck_regex.match(file.get_node_content(comment)):
|
||||||
|
deck_refs.append((comment.start_point.row, None if match[1].isspace() else match[1]))
|
||||||
|
|
||||||
|
for note_id, front, back in zip(card_captures["id"], card_captures["front"], card_captures["back"]):
|
||||||
|
while deck_refs_idx < len(deck_refs) - 1 and back.end_point.row >= deck_refs[deck_refs_idx + 1][0]:
|
||||||
|
deck_refs_idx += 1
|
||||||
|
current_deck = deck_refs[deck_refs_idx][1]
|
||||||
|
|
||||||
for note_id, front, back in zip(captures["id"], captures["front"], captures["back"]):
|
|
||||||
card = Flashcard(
|
card = Flashcard(
|
||||||
file.get_node_content(front, True),
|
file.get_node_content(front, True),
|
||||||
file.get_node_content(back, True),
|
file.get_node_content(back, True),
|
||||||
None,
|
current_deck,
|
||||||
int(file.get_node_content(note_id)),
|
int(file.get_node_content(note_id)),
|
||||||
preamble,
|
preamble,
|
||||||
file,
|
file,
|
||||||
|
|||||||
Reference in New Issue
Block a user