gen_graph.py

This Python script, gen_graph.py, reads the Org Roam database and extracts the node and edge information. This metadata, provided in JSON format, is used by graph.js to present the graphical view in myzettel. This Org Mode file tangles gen_graph.py; the complete tangled version can be found here↗.

To further improve usability and performance, the graph layout is precomputed: the ForceAtlas2 layout and Louvain community detection work has been moved from JavaScript in the browser into this Python script.

Imports

import json
import pathlib
import random
import re
import sqlite3
from datetime import datetime

import community as community_louvain
import networkx as nx
from fa2 import ForceAtlas2

Configuration

# org-roam SQLite database, located under the current user's home directory.
DB_PATH = pathlib.Path.home().joinpath(".config", "emacs", "org-roam.db")
# Destination of the generated graph data inside the Hugo site's static dir.
OUTPUT_PATH = pathlib.Path(
    "/data/myhome/prabu/org/my_hugo_site/static/graph.json"
)


def log(msg):
    """Print *msg* to stdout prefixed with the current local time.

    The prefix has the form ``[YYYY-MM-DD HH:MM:SS]``.
    """
    stamp = f"{datetime.now():%Y-%m-%d %H:%M:%S}"
    print(f"[{stamp}] {msg}")

Org File Data Extraction

def extract_date_from_org(file_path):
    """Return the value of the ``#+date:`` keyword of an Org file, or None.

    Only the first 2000 characters of the file are inspected. The keyword is
    matched case-insensitively at the start of a line.

    Args:
        file_path: Path of the Org file to read (decoded as UTF-8).

    Returns:
        The stripped keyword value, or None when the keyword is absent, its
        value is empty, or the file cannot be read or decoded.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            content = f.read(2000)
    except (OSError, ValueError):
        # Best effort: unreadable or undecodable files simply have no date.
        return None

    # ``[ \t]*`` rather than ``\s*``: ``\s`` also matches a newline, so an
    # empty ``#+date:`` line would otherwise capture the *following* line.
    match = re.search(
        r"^#\+date:[ \t]*(.+)$", content, re.MULTILINE | re.IGNORECASE
    )
    if match is None:
        return None
    return match.group(1).strip() or None


# An Org heading: one or more stars followed by whitespace or end of line.
# A line that merely starts with ``*bold*`` text is NOT a heading.
_ORG_HEADING_RE = re.compile(r"\*+(?:\s|$)")

# Paired Org emphasis (``*b*``, ``/i/``, ``_u_``, ``=v=``, ``~c~``). The marker
# must be preceded by start-of-text, whitespace or one of ``- ( { ' "`` and the
# closing marker followed by end-of-text, whitespace or punctuation, so
# slashes in URLs and underscores inside identifiers are left untouched.
_ORG_EMPHASIS_RE = re.compile(
    r"""(?<![^\s\-({'"])([*/_=~])(\S|\S.*?\S)\1(?![^\s\-.,;:!?'")}\[\\])"""
)


def extract_excerpt_from_org(file_path, max_chars=200):
    """Return a short plain-text excerpt of an Org file's first paragraph.

    Only the first 3000 characters of the file are inspected. Leading
    property-drawer lines (starting with ``:``), ``#+`` keyword lines and
    blank lines are skipped, heading lines are ignored, and collection stops
    at the first blank line after some text has been gathered. Org links are
    reduced to their description (or target when there is no description)
    and paired emphasis markers are removed.

    Args:
        file_path: Path of the Org file to read (decoded as UTF-8).
        max_chars: Maximum length of the returned excerpt.

    Returns:
        The excerpt, or None when no text is found or the file cannot be
        read or decoded.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            content = f.read(3000)
    except (OSError, ValueError):
        # Best effort: unreadable or undecodable files have no excerpt.
        return None

    in_header = True
    excerpt_lines = []
    for line in content.splitlines():
        stripped = line.strip()
        # Skip properties drawer and metadata headers
        if in_header:
            if not stripped or stripped.startswith((":", "#+")):
                continue
            in_header = False

        # Skip org headings
        if _ORG_HEADING_RE.match(stripped):
            continue
        if not stripped:
            if excerpt_lines:
                # Stop at first blank line after content
                break
            continue

        excerpt_lines.append(stripped)

    excerpt = " ".join(excerpt_lines)
    # Strip org link syntax [[url][text]] → text, [[url]] → url
    excerpt = re.sub(r"\[\[([^\]]+)\]\[([^\]]+)\]\]", r"\2", excerpt)
    excerpt = re.sub(r"\[\[([^\]]+)\]\]", r"\1", excerpt)

    # Strip paired emphasis markers; repeat so nested markup such as
    # ``*/text/*`` is fully unwrapped. Each pass that changes the text
    # shortens it, so the loop terminates.
    previous = None
    while previous != excerpt:
        previous = excerpt
        excerpt = _ORG_EMPHASIS_RE.sub(r"\2", excerpt)

    return excerpt[:max_chars].strip() or None

Org File Filters

def extract_hugo_section(file_path):
    """Return the ``#+hugo_section:`` keyword value of an Org file, or None.

    Only the first 2000 characters are inspected; the keyword is matched
    case-insensitively at the start of a line. Any error while opening,
    reading or matching yields None.

    NOTE(review): ``\\s*`` in the pattern also matches newlines, so a keyword
    line with an empty value captures the text of the following line.
    """
    flags = re.MULTILINE | re.IGNORECASE
    try:
        with open(file_path, "r", encoding="utf-8") as handle:
            head = handle.read(2000)
            found = re.search(r"^#\+hugo_section:\s*(.+)$", head, flags)
            if found is not None:
                return found.group(1).strip()
    except Exception:
        pass
    return None


def is_noexport(file_path):
    """Tell whether an Org file's ``#+filetags:`` line carries ``:noexport:``.

    Only the first 2000 characters are inspected and the keyword is matched
    case-insensitively. Returns False when the file cannot be read.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as handle:
            head = handle.read(2000)
            tagged = re.search(
                r"^#\+filetags:.*:noexport:", head, re.MULTILINE | re.IGNORECASE
            )
            return tagged is not None
    except Exception:
        return False

Helper functions

Counts backlinks and outlinks from the raw links query. Node IDs are stripped of their org-id quote wrapping before counting.

def _count_links(links_raw):
    backlink_counts = {}
    outlink_counts = {}
    for s, d in links_raw:
        source, dest = s.strip('"'), d.strip('"')
        backlink_counts[dest] = backlink_counts.get(dest, 0) + 1
        outlink_counts[source] = outlink_counts.get(source, 0) + 1
    return backlink_counts, outlink_counts

Node filtering and building

Filters and builds the node list. Skips missing files, hugo_section nodes (non-blog content), and noexport tagged notes. Falls back to filesystem mtime when no #+date header is present.

def _build_nodes(nodes_raw, backlink_counts, outlink_counts):
    nodes = []
    node_ids = set()

    for nid, title, path in nodes_raw:
        clean_id, clean_title, clean_path = (
            nid.strip('"'),
            title.strip('"'),
            path.strip('"'),
        )
        file_path = pathlib.Path(clean_path)

        if not file_path.exists():
            continue

        hugo_section = extract_hugo_section(file_path)
        if hugo_section is not None:
            continue

        if is_noexport(file_path):
            continue

        node_ids.add(clean_id)
        date_str = extract_date_from_org(file_path)
        # Fallback to filesystem mtime
        fs_mtime = datetime.fromtimestamp(file_path.stat().st_mtime).isoformat()

        nodes.append(
            {
                "id": clean_id,
                "label": clean_title,
                "lnk": file_path.stem.lower(),
                "date": date_str or fs_mtime,
                "lastmod": fs_mtime,  # ALWAYS use actual file edit time for recency
                "backlinks": backlink_counts.get(clean_id, 0),
                "outlinks": outlink_counts.get(clean_id, 0),
                "excerpt": extract_excerpt_from_org(file_path),
            }
        )
    return nodes, node_ids

Layout computation

Computes ForceAtlas2 spatial layout and Louvain community detection. Self-loops are excluded before edge construction to avoid FA2 warnings. Node positions and community assignments are written directly into the nodes list in-place.

def _compute_layout(nodes, links):
    G = nx.Graph()
    for n in nodes:
        G.add_node(n["id"])
    for l in links:
        if l["source"] != l["target"]:
            G.add_edge(l["source"], l["target"], weight=1.2 if l["bidirectional"] else 0.8)
    fa2 = ForceAtlas2(
        outboundAttractionDistribution=False,
        edgeWeightInfluence=0.2,
        jitterTolerance=1.0,
        barnesHutOptimize=True,
        barnesHutTheta=1.2,
        scalingRatio=3.5,
        strongGravityMode=True,
        gravity=0.1,
        verbose=False,
    )
    positions = fa2.forceatlas2_networkx_layout(G, pos=None, iterations=200)
    partition = community_louvain.best_partition(G)
    community_count = len(set(partition.values()))
    for n in nodes:
        pos = positions.get(n["id"], (0.0, 0.0))
        n["x"] = round(pos[0], 4)
        n["y"] = round(pos[1], 4)
        n["community"] = partition.get(n["id"], 0)
    return community_count

Build the json

Entry point. Checks db mtime to skip unchanged runs, queries org-roam sqlite in read-only mode, builds links filtered to known node_ids, then calls helpers for counting, node building, and layout. Writes graph.json only when content has changed.


def build_json():
    """Regenerate graph.json from the org-roam database when it has changed.

    Steps:
      1. Return early when the database is missing, or when the modification
         time recorded next to the output is not older than the database's.
      2. Read level-0 nodes and id-type links from the database, opened
         read-only.
      3. Build the node and link lists (links restricted to kept nodes),
         sort them, and compute layout and communities.
      4. Write OUTPUT_PATH only when nodes or links differ from the existing
         file, then record the database modification time.
    """
    if not DB_PATH.exists():
        log(f"❌ Database not found at {DB_PATH}")
        return

    db_mtime = DB_PATH.stat().st_mtime
    last_mtime_file = OUTPUT_PATH.with_suffix(".mtime")

    if last_mtime_file.exists():
        try:
            # Use >= to prevent precision-loss skips
            if float(last_mtime_file.read_text()) >= db_mtime:
                log("✓ Database unchanged, skipping")
                return
        except (ValueError, OSError):
            # Unreadable or corrupt marker: fall through and rebuild.
            pass

    conn = sqlite3.connect(f"file:{DB_PATH}?mode=ro", uri=True)
    try:
        nodes_raw = conn.execute(
            "SELECT id, title, file FROM nodes WHERE level = 0"
        ).fetchall()
        links_raw = conn.execute(
            "SELECT source, dest FROM links WHERE type = '\"id\"'"
        ).fetchall()
    finally:
        # Release the connection even when a query fails.
        conn.close()

    backlink_counts, outlink_counts = _count_links(links_raw)
    nodes, node_ids = _build_nodes(nodes_raw, backlink_counts, outlink_counts)

    # De-duplicate links, keeping only those between exported nodes.
    link_set = {(s.strip('"'), d.strip('"')) for s, d in links_raw}
    links = [
        {
            "source": source,
            "target": target,
            "bidirectional": (target, source) in link_set,
        }
        for source, target in link_set
        if source in node_ids and target in node_ids
    ]

    # Stable ordering keeps the output comparable between runs.
    nodes.sort(key=lambda x: x["id"])
    links.sort(key=lambda x: (x["source"], x["target"]))

    # Compute layout
    community_count = _compute_layout(nodes, links)

    log(f"✓ Layout computed: {len(nodes)} nodes")

    should_write = True
    if OUTPUT_PATH.exists():
        try:
            with open(OUTPUT_PATH, "r", encoding="utf-8") as f:
                existing = json.load(f)
        except (OSError, ValueError):
            # Unreadable or invalid JSON: rewrite the file.
            existing = None
        if (
            isinstance(existing, dict)
            and existing.get("nodes") == nodes
            and existing.get("links") == links
        ):
            log(f"✓ No changes detected ({len(nodes)} nodes)")
            should_write = False

    if should_write:
        with open(OUTPUT_PATH, "w", encoding="utf-8") as f:
            json.dump(
                {"nodes": nodes, "links": links, "communityCount": community_count},
                f,
                indent=2,
            )
        log(f"✓ Graph updated: {len(nodes)} nodes")

    # Record the processed database mtime even when the content was
    # unchanged; otherwise every later run would redo the layout.
    last_mtime_file.write_text(str(db_mtime))

Main function


# Generate the graph only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    build_json()

© Prabu Anand K 2020-2026