gen_graph.py
This Python script, gen_graph.py, reads the Org Roam database and extracts the node and edge information. This metadata, provided in JSON format, is used by graph.js to present the graphical view in myzettel. This Org Mode file tangles gen_graph.py; the complete tangled version can be found here↗.
To further improve usability and performance, the graph layout is precomputed: the code for the ForceAtlas2 layout and Louvain community detection was moved from JavaScript in the browser to this Python script.
Imports
import sqlite3
import json
import pathlib
from datetime import datetime
import re
import networkx as nx
from fa2 import ForceAtlas2
import community as community_louvain
Configuration
# Org Roam SQLite database that build_json() opens read-only.
DB_PATH = pathlib.Path.home().joinpath(".config", "emacs", "org-roam.db")
# File the generated graph data is written to.
OUTPUT_PATH = pathlib.Path(
    "/data/myhome/prabu/org/my_hugo_site/static/graph.json"
)
def log(msg):
    """Print *msg* to stdout, prefixed with the current local time in brackets."""
    stamp = format(datetime.now(), "%Y-%m-%d %H:%M:%S")
    print(f"[{stamp}] {msg}")
Org File Data Extraction
def extract_date_from_org(file_path):
    """Return the value of the ``#+date:`` keyword of an Org file, if present.

    Only the first 2000 characters of the file are inspected. The keyword is
    matched case-insensitively at the start of a line.

    Args:
        file_path: Path of the Org file to inspect.

    Returns:
        The keyword's value with surrounding whitespace removed, or ``None``
        when the keyword is absent, has no value, or the file cannot be read
        or decoded as UTF-8.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            head = f.read(2000)
    except (OSError, ValueError):
        # Best effort: an unreadable or undecodable file simply has no date
        # (UnicodeDecodeError is a ValueError subclass).
        return None

    # Use [ \t]* rather than \s*: \s also matches "\n", so an empty "#+date:"
    # line would otherwise capture the *following* line as its value.
    match = re.search(
        r"^#\+date:[ \t]*(\S.*)$", head, re.MULTILINE | re.IGNORECASE
    )
    return match.group(1).strip() if match else None
def extract_excerpt_from_org(file_path, max_chars=200):
    """Return a short plain-text excerpt: the first paragraph of an Org file.

    Only the first 3000 characters of the file are inspected. Leading
    property-drawer lines (starting with ``:``), keyword lines (starting with
    ``#+``) and blank lines are skipped, as are headings. The excerpt is the
    first run of consecutive non-blank lines after that, joined with spaces,
    with Org link syntax and emphasis marker characters removed.

    Args:
        file_path: Path of the Org file to read.
        max_chars: Maximum length of the returned excerpt.

    Returns:
        The excerpt, or ``None`` when no body text is found or the file
        cannot be read or decoded as UTF-8.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            content = f.read(3000)
    except (OSError, ValueError):
        # Best effort: an unreadable or undecodable file has no excerpt
        # (UnicodeDecodeError is a ValueError subclass).
        return None

    in_header = True
    excerpt_lines = []
    for line in content.splitlines():
        stripped = line.strip()
        # Skip properties drawer, metadata keywords and blank lines up front.
        if in_header:
            if not stripped or stripped.startswith((":", "#+")):
                continue
            in_header = False
        # Skip Org headings: stars at column 0 followed by whitespace. A plain
        # startswith("*") test would also drop body lines that begin with
        # bold markup such as "*Important* ...".
        if re.match(r"\*+(?:\s|$)", line):
            continue
        if not stripped:
            if excerpt_lines:
                # First blank line after content ends the paragraph.
                break
            continue
        excerpt_lines.append(stripped)

    excerpt = " ".join(excerpt_lines)
    # Strip Org link syntax: [[url][text]] -> text, [[url]] -> url.
    excerpt = re.sub(r"\[\[([^\]]+)\]\[([^\]]+)\]\]", r"\2", excerpt)
    excerpt = re.sub(r"\[\[([^\]]+)\]\]", r"\1", excerpt)
    # Strip emphasis marker characters. NOTE: this removes every occurrence,
    # including those inside ordinary text such as URLs or snake_case words.
    excerpt = re.sub(r"[/*_=~]", "", excerpt)
    return excerpt[:max_chars].strip() or None
Org File Filters
def extract_hugo_section(file_path):
    """Return the value of the ``#+hugo_section:`` keyword of an Org file.

    Only the first 2000 characters of the file are inspected. The keyword is
    matched case-insensitively at the start of a line.

    Args:
        file_path: Path of the Org file to inspect.

    Returns:
        The keyword's value with surrounding whitespace removed, or ``None``
        when the keyword is absent, has no value, or the file cannot be read
        or decoded as UTF-8.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            head = f.read(2000)
    except (OSError, ValueError):
        # Best effort: an unreadable or undecodable file has no section
        # (UnicodeDecodeError is a ValueError subclass).
        return None

    # Use [ \t]* rather than \s*: \s also matches "\n", so an empty
    # "#+hugo_section:" line would otherwise capture the following line and
    # make the file look like it declares a section.
    match = re.search(
        r"^#\+hugo_section:[ \t]*(\S.*)$", head, re.MULTILINE | re.IGNORECASE
    )
    return match.group(1).strip() if match else None
def is_noexport(file_path):
    """Tell whether an Org file carries the ``noexport`` file tag.

    Looks at the first 2000 characters for a ``#+filetags:`` line that
    contains ``:noexport:`` (case-insensitive). Returns ``False`` when the
    file cannot be read.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as handle:
            head = handle.read(2000)
    except Exception:
        return False
    pattern = r"^#\+filetags:.*:noexport:"
    found = re.search(pattern, head, re.MULTILINE | re.IGNORECASE)
    return found is not None
Helper functions
Link counting
Counts backlinks and outlinks from the raw links query. Node IDs are stripped of their surrounding double quotes before counting.
def _count_links(links_raw):
    """Tally incoming and outgoing links per node id.

    Args:
        links_raw: Iterable of ``(source, dest)`` string pairs; surrounding
            double quotes are stripped from both before counting.

    Returns:
        A ``(backlink_counts, outlink_counts)`` tuple of dicts mapping a
        node id to the number of links pointing at it / leaving it.
    """
    incoming = {}
    outgoing = {}
    for raw_source, raw_dest in links_raw:
        src_id = raw_source.strip('"')
        dst_id = raw_dest.strip('"')
        incoming.setdefault(dst_id, 0)
        incoming[dst_id] += 1
        outgoing.setdefault(src_id, 0)
        outgoing[src_id] += 1
    return incoming, outgoing
Node filtering and building
Filters and builds the node list. Skips missing files, hugo_section nodes (non-blog content), and noexport tagged notes. Falls back to filesystem mtime when no #+date header is present.
def _build_nodes(nodes_raw, backlink_counts, outlink_counts):
    """Turn raw node rows into graph node dicts, dropping filtered notes.

    A row is dropped when its file does not exist, when the file declares a
    ``#+hugo_section:`` value, or when it is tagged ``noexport``.

    Args:
        nodes_raw: Iterable of ``(id, title, file)`` string triples;
            surrounding double quotes are stripped from each field.
        backlink_counts: Mapping of node id to incoming link count.
        outlink_counts: Mapping of node id to outgoing link count.

    Returns:
        A ``(nodes, node_ids)`` tuple: the list of node dicts and the set of
        ids of the nodes that were kept.
    """
    kept_ids = set()
    built = []
    for raw_id, raw_title, raw_path in nodes_raw:
        node_id = raw_id.strip('"')
        node_title = raw_title.strip('"')
        org_file = pathlib.Path(raw_path.strip('"'))

        # Filters are evaluated in order and short-circuit on the first hit.
        if (
            not org_file.exists()
            or extract_hugo_section(org_file) is not None
            or is_noexport(org_file)
        ):
            continue

        kept_ids.add(node_id)
        header_date = extract_date_from_org(org_file)
        # Filesystem modification time, used as the date fallback below.
        modified = datetime.fromtimestamp(org_file.stat().st_mtime).isoformat()

        entry = {
            "id": node_id,
            "label": node_title,
            "lnk": org_file.stem.lower(),
            "date": header_date or modified,
            # ALWAYS use actual file edit time for recency
            "lastmod": modified,
            "backlinks": backlink_counts.get(node_id, 0),
            "outlinks": outlink_counts.get(node_id, 0),
            "excerpt": extract_excerpt_from_org(org_file),
        }
        built.append(entry)
    return built, kept_ids
Layout computation
Computes ForceAtlas2 spatial layout and Louvain community detection. Self-loops are excluded before edge construction to avoid FA2 warnings. Node positions and community assignments are written directly into the nodes list in-place.
def _compute_layout(nodes, links):
    """Compute layout positions and community ids and store them on *nodes*.

    Builds an undirected graph from *nodes* and *links* (self-loops are left
    out), runs the ForceAtlas2 layout and Louvain partitioning on it, and
    writes ``x``, ``y`` (rounded to 4 decimals) and ``community`` into each
    node dict in place.

    Args:
        nodes: List of node dicts, each with an ``"id"`` key. Mutated in place.
        links: List of link dicts with ``"source"``, ``"target"`` and
            ``"bidirectional"`` keys.

    Returns:
        The number of distinct communities found; ``0`` when *nodes* is empty.
    """
    # Nothing to position: return early instead of handing an empty graph to
    # the layout / partitioning libraries.
    if not nodes:
        return 0

    graph = nx.Graph()
    graph.add_nodes_from(node["id"] for node in nodes)
    for link in links:
        # Self-loops are excluded before edge construction.
        if link["source"] != link["target"]:
            graph.add_edge(
                link["source"],
                link["target"],
                weight=1.2 if link["bidirectional"] else 0.8,
            )

    fa2 = ForceAtlas2(
        outboundAttractionDistribution=False,
        edgeWeightInfluence=0.2,
        jitterTolerance=1.0,
        barnesHutOptimize=True,
        barnesHutTheta=1.2,
        scalingRatio=3.5,
        strongGravityMode=True,
        gravity=0.1,
        verbose=False,
    )
    # NOTE(review): with pos=None the library presumably picks its own
    # starting positions, so coordinates may differ between runs - confirm.
    positions = fa2.forceatlas2_networkx_layout(graph, pos=None, iterations=200)
    partition = community_louvain.best_partition(graph)
    community_count = len(set(partition.values()))

    for node in nodes:
        x, y = positions.get(node["id"], (0.0, 0.0))
        node["x"] = round(x, 4)
        node["y"] = round(y, 4)
        node["community"] = partition.get(node["id"], 0)
    return community_count
Build the json
Entry point. Checks db mtime to skip unchanged runs, queries org-roam sqlite in read-only mode, builds links filtered to known node_ids, then calls helpers for counting, node building, and layout. Writes graph.json only when content has changed.
def build_json():
    """Regenerate graph.json from the Org Roam database when it has changed.

    Steps:
      1. Return early if the database is missing, or if the recorded
         database mtime is not older than the current one and graph.json
         still exists.
      2. Read file-level nodes and id links from the database (read-only).
      3. Build nodes, keep only links between kept nodes, and compute the
         layout.
      4. Write graph.json only if its nodes/links differ from the existing
         file, then record the database mtime next to it.

    Returns:
        None. Results are written to ``OUTPUT_PATH`` and a sibling
        ``.mtime`` marker file; progress is reported through ``log``.
    """
    if not DB_PATH.exists():
        log(f"❌ Database not found at {DB_PATH}")
        return

    db_mtime = DB_PATH.stat().st_mtime
    last_mtime_file = OUTPUT_PATH.with_suffix(".mtime")
    # Only trust the marker while the output it describes still exists;
    # otherwise a deleted graph.json would never be regenerated.
    if last_mtime_file.exists() and OUTPUT_PATH.exists():
        try:
            # Use >= to prevent precision-loss skips
            if float(last_mtime_file.read_text()) >= db_mtime:
                log("✓ Database unchanged, skipping")
                return
        except (ValueError, OSError):
            # Unreadable or corrupt marker: fall through and rebuild.
            pass

    # as_uri() percent-encodes the path, so characters such as "?", "#" or
    # spaces in it cannot be misread as part of the URI syntax.
    db_uri = f"{DB_PATH.resolve().as_uri()}?mode=ro"
    conn = sqlite3.connect(db_uri, uri=True)
    try:
        nodes_raw = conn.execute(
            "SELECT id, title, file FROM nodes WHERE level = 0"
        ).fetchall()
        links_raw = conn.execute(
            "SELECT source, dest FROM links WHERE type = '\"id\"'"
        ).fetchall()
    finally:
        # Close the connection even when a query fails.
        conn.close()

    backlink_counts, outlink_counts = _count_links(links_raw)
    nodes, node_ids = _build_nodes(nodes_raw, backlink_counts, outlink_counts)

    # De-duplicate links and keep only those whose both ends survived filtering.
    link_set = {(s.strip('"'), d.strip('"')) for s, d in links_raw}
    links = [
        {
            "source": source,
            "target": target,
            "bidirectional": (target, source) in link_set,
        }
        for source, target in link_set
        if source in node_ids and target in node_ids
    ]

    # Sort for a stable output order.
    nodes.sort(key=lambda x: x["id"])
    links.sort(key=lambda x: (x["source"], x["target"]))

    # Compute layout
    community_count = _compute_layout(nodes, links)
    log(f"✓ Layout computed: {len(nodes)} nodes")

    unchanged = False
    if OUTPUT_PATH.exists():
        try:
            with open(OUTPUT_PATH, "r", encoding="utf-8") as f:
                existing = json.load(f)
        except (OSError, ValueError):
            # Unreadable or invalid JSON (JSONDecodeError is a ValueError):
            # treat as changed and rewrite.
            existing = None
        unchanged = (
            isinstance(existing, dict)
            and existing.get("nodes") == nodes
            and existing.get("links") == links
        )

    if not unchanged:
        with open(OUTPUT_PATH, "w", encoding="utf-8") as f:
            json.dump(
                {"nodes": nodes, "links": links, "communityCount": community_count},
                f,
                indent=2,
            )

    # Record the processed database mtime in both cases, so an unchanged
    # result does not trigger a full recompute on every later run.
    with open(last_mtime_file, "w") as f:
        f.write(str(db_mtime))

    if unchanged:
        log(f"✓ No changes detected ({len(nodes)} nodes)")
    else:
        log(f"✓ Graph updated: {len(nodes)} nodes")
Main function
# Run the graph export only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    build_json()
© Prabu Anand K 2020-2026