257 lines
No EOL
6.5 KiB
Python
257 lines
No EOL
6.5 KiB
Python
#!/usr/bin/env python3
|
|
from pathlib import Path
|
|
from typing import Union, Tuple, List, NamedTuple, Dict
|
|
import sys
|
|
|
|
|
|
class Block(NamedTuple):
|
|
name: str
|
|
created: int
|
|
updated: int
|
|
|
|
|
|
def skip_space(s: str, pos: int) -> int:
|
|
while s[pos].isspace():
|
|
pos += 1
|
|
return pos
|
|
|
|
|
|
def consume(s: str, pos: int, c: str) -> Tuple[bool, int]:
|
|
if s[pos] == c:
|
|
return (True, pos + 1)
|
|
else:
|
|
return (False, pos)
|
|
|
|
|
|
def parse_key(s: str, pos: int) -> Tuple[Union[str, None], int]:
|
|
if s[pos] != ":":
|
|
return (None, pos)
|
|
else:
|
|
pos += 1
|
|
key = ""
|
|
while not s[pos].isspace():
|
|
key += s[pos]
|
|
pos += 1
|
|
return (key, pos)
|
|
|
|
|
|
def parse_str(s: str, pos: int) -> Tuple[Union[str, None], int]:
|
|
ok, pos = consume(s, pos, '"')
|
|
if not ok:
|
|
return (None, pos)
|
|
res: List[str] = []
|
|
prev_is_backslash = False
|
|
while True:
|
|
if s[pos] == '"':
|
|
pos += 1
|
|
if prev_is_backslash:
|
|
prev_is_backslash = False
|
|
res.append('"')
|
|
else:
|
|
break
|
|
elif s[pos] == "\\":
|
|
if prev_is_backslash: # two consecutive backslashes
|
|
prev_is_backslash = False
|
|
else:
|
|
prev_is_backslash = True
|
|
res.append(s[pos])
|
|
pos += 1
|
|
else:
|
|
prev_is_backslash = False
|
|
res.append(s[pos])
|
|
pos += 1
|
|
return ("".join(res), pos)
|
|
|
|
|
|
def parse_int(s: str, pos: int) -> Tuple[Union[int, None], int]:
|
|
if s[pos] == "-":
|
|
sgn = -1
|
|
pos += 1
|
|
else:
|
|
sgn = 1
|
|
if not s[pos].isdigit():
|
|
return (None, pos)
|
|
val = 0
|
|
while s[pos].isdigit():
|
|
val = val * 10 + int(s[pos])
|
|
pos += 1
|
|
return (sgn * val, pos)
|
|
|
|
|
|
def parse_value(s: str, pos: int) -> Tuple[Union[str, int, None], int]:
|
|
pos = skip_space(s, pos)
|
|
v, pos = parse_str(s, pos)
|
|
if v is not None:
|
|
return (v, pos)
|
|
v, pos = parse_int(s, pos)
|
|
if v is not None:
|
|
return (v, pos)
|
|
return (None, pos)
|
|
|
|
|
|
def parse_block(s: str, pos: int) -> Tuple[Union[Block, None], int]:
|
|
kv: Dict[str, Union[str, int]] = {}
|
|
pos = skip_space(s, pos)
|
|
ok, pos = consume(s, pos, "{")
|
|
if not ok:
|
|
return (None, pos)
|
|
while True:
|
|
pos = skip_space(s, pos)
|
|
|
|
# k-v pair
|
|
key, pos = parse_key(s, pos)
|
|
if not key:
|
|
return (None, pos)
|
|
pos = skip_space(s, pos)
|
|
val, pos = parse_value(s, pos)
|
|
if not val:
|
|
return (None, pos)
|
|
kv[key] = val
|
|
|
|
pos = skip_space(s, pos)
|
|
|
|
# end of block
|
|
ok, pos = consume(s, pos, "}")
|
|
if ok:
|
|
break
|
|
|
|
# next k-v pair
|
|
ok, pos = consume(s, pos, ",")
|
|
if not ok:
|
|
return (None, pos)
|
|
|
|
blk = Block(kv["block/name"], kv["block/created-at"], kv["block/updated-at"])
|
|
return (blk, pos)
|
|
|
|
|
|
def parse_pages_metadata(s: str) -> Union[List[Block], None]:
|
|
blocks: List[Block] = []
|
|
pos = 0
|
|
pos = skip_space(s, pos)
|
|
ok, pos = consume(s, pos, "[")
|
|
if not ok:
|
|
return None
|
|
while True:
|
|
block, pos = parse_block(s, pos)
|
|
if block is None:
|
|
break
|
|
blocks.append(block)
|
|
pos = skip_space(s, pos)
|
|
ok, pos = consume(s, pos, "]")
|
|
if not ok:
|
|
return None
|
|
return blocks
|
|
|
|
|
|
def parse_diff(s: str) -> Tuple[bool, Union[str, None], Union[str, None]]:
|
|
a: List[str] = []
|
|
b: List[str] = []
|
|
state = "both"
|
|
has_conflicts = False
|
|
for line in s.split("\n"):
|
|
if line.startswith("<<<<<<< "):
|
|
has_conflicts = True
|
|
if state == "both":
|
|
state = "a"
|
|
else:
|
|
return (has_conflicts, None, None)
|
|
elif line.startswith("======="):
|
|
if state == "a":
|
|
state = "b"
|
|
else:
|
|
return (has_conflicts, None, None)
|
|
elif line.startswith(">>>>>>> "):
|
|
if state == "b":
|
|
state = "both"
|
|
else:
|
|
return (has_conflicts, None, None)
|
|
else:
|
|
if state == "a":
|
|
a.append(line)
|
|
elif state == "b":
|
|
b.append(line)
|
|
else: # 'both'
|
|
a.append(line)
|
|
b.append(line)
|
|
return (has_conflicts, "\n".join(a), "\n".join(b))
|
|
|
|
|
|
def merge_block(a: Block, b: Block) -> Block:
|
|
created = min(a.created, b.created)
|
|
updated = max(a.updated, b.updated)
|
|
return Block(a.name, created, updated)
|
|
|
|
|
|
def merge_blocks(blk_a: List[Block], blk_b: List[Block]) -> List[Block]:
|
|
blk_a_dict: Dict[str, Block] = {}
|
|
blk_b_dict: Dict[str, Block] = {}
|
|
|
|
for blk in blk_a:
|
|
blk_a_dict[blk.name] = blk
|
|
|
|
for blk in blk_b:
|
|
blk_b_dict[blk.name] = blk
|
|
|
|
res: List[Block] = []
|
|
for blk in blk_a:
|
|
if blk.name in blk_b_dict:
|
|
res.append(merge_block(blk, blk_b_dict[blk.name]))
|
|
else:
|
|
res.append(blk)
|
|
for blk in blk_b:
|
|
if blk.name not in blk_a_dict:
|
|
res.append(blk)
|
|
|
|
res.sort(key=lambda x: x.name)
|
|
|
|
return res
|
|
|
|
|
|
def print_blocks(blks: List[Block]) -> str:
|
|
res = "["
|
|
for i, blk in enumerate(blks):
|
|
if i == 0:
|
|
s = f'{{:block/name "{blk.name}",\n :block/created-at {blk.created},\n :block/updated-at {blk.updated}}}\n'
|
|
elif i == len(blks) - 1:
|
|
s = f' {{:block/name "{blk.name}",\n :block/created-at {blk.created},\n :block/updated-at {blk.updated}}}'
|
|
else:
|
|
s = f' {{:block/name "{blk.name}",\n :block/created-at {blk.created},\n :block/updated-at {blk.updated}}}\n'
|
|
|
|
res += s
|
|
res += "]\n"
|
|
return res
|
|
|
|
|
|
if __name__ == "__main__":
|
|
if len(sys.argv) > 1:
|
|
fp = Path(sys.argv[1])
|
|
else:
|
|
fp = Path(".")
|
|
fp = fp / "logseq/pages-metadata.edn"
|
|
if not fp.exists():
|
|
print("`pages-metadata.edn` not found")
|
|
sys.exit(1)
|
|
with fp.open(encoding="utf-8") as f:
|
|
content = f.read()
|
|
conf, a, b = parse_diff(content)
|
|
if not conf:
|
|
print("No conflicts found")
|
|
sys.exit(0)
|
|
if not a or not b:
|
|
print("Could not parse diff")
|
|
sys.exit(1)
|
|
|
|
blk_a = parse_pages_metadata(a)
|
|
blk_b = parse_pages_metadata(b)
|
|
if not blk_a or not blk_b:
|
|
print("Could not parse `pages-metadata.edn`")
|
|
sys.exit(1)
|
|
blk = merge_blocks(blk_a, blk_b)
|
|
s = print_blocks(blk)
|
|
|
|
fp.unlink()
|
|
fp.open("w", encoding="utf-8").write(s)
|
|
|
|
print("Merged `pages-metadata.edn` successfully")
|
|
|
|
sys.exit(0) |