from flask import Flask, render_template, request, redirect, url_for, jsonify, flash, abort, make_response
import sqlite3
import time # for small backoff on transient SQLite locks
import re, os, json, hashlib
from functools import lru_cache
from pathlib import Path
from datetime import datetime, date, timezone
_ARTICLES_LIST: list[dict] = []
_ARTICLES_INDEX_MTIME: float | None = None
try:
import yaml # front-matter parser (PyYAML); optional but recommended
except Exception:
yaml = None
try:
import markdown # Markdown renderer; optional
except Exception:
markdown = None
try:
import bleach # sanitizer; optional
except Exception:
bleach = None
try:
_load_articles_index(force=True) # load initial
except Exception:
pass
app = Flask(__name__)
app.config['TINYMCE_API_KEY'] = os.environ.get('TINYMCE_API_KEY', 'jbqzydjme8x40fw6hc8b39yn9ruxk6bs7ot1m0w977vx1ql0')
app.config['SECRET_KEY'] = os.environ.get('FLASK_SECRET_KEY', 'dev-change-me')
# Set simple Cache-Control on static files and a short max-age on hot APIs
@app.after_request
def add_cache_headers(resp):
try:
p = request.path or ""
# Far-future cache for versioned assets (you'll point to .min files)
if p.startswith("/static/"):
resp.headers.setdefault("Cache-Control", "public, max-age=31536000, immutable")
# Short cache for identical queries within a burst window (optional)
elif p.startswith("/api/search"):
resp.headers.setdefault("Cache-Control", "public, max-age=30")
elif p.startswith("/api/commands/") and p.endswith("/detail"):
resp.headers.setdefault("Cache-Control", "public, max-age=30")
except Exception:
pass
return resp
DB_PATH = os.getenv("PENTEST_DB", "pentest.db")
# Optional response compression (safe if not installed)
try:
from flask_compress import Compress
Compress(app)
except Exception:
pass
# ------------------ DB utils ------------------
def get_conn():
"""Return a SQLite connection with FK enforcement enabled."""
conn = sqlite3.connect(DB_PATH, timeout=30)
conn.row_factory = sqlite3.Row
# Fast, safe defaults for mixed R/W endpoints
try:
conn.execute("PRAGMA foreign_keys = ON")
conn.execute("PRAGMA temp_store = MEMORY")
conn.execute("PRAGMA cache_size = -20000") # ~20MB page cache
conn.execute("PRAGMA journal_mode = WAL") # idempotent; DB remembers it
conn.execute("PRAGMA synchronous = NORMAL") # good balance for WAL
except sqlite3.DatabaseError:
pass
return conn
def get_ro_conn():
"""
Open a read-only connection for hot read paths (search endpoints).
Uses SQLite URI mode to avoid taking write locks and to honor WAL efficiently.
Falls back to normal get_conn() if the platform doesn't support URI ro mode.
"""
try:
uri = f"file:{DB_PATH}?mode=ro&cache=shared"
conn = sqlite3.connect(uri, uri=True, timeout=30)
conn.row_factory = sqlite3.Row
# Read-only safe pragmas (ignore failures gracefully)
try:
conn.execute("PRAGMA foreign_keys = ON")
conn.execute("PRAGMA temp_store = MEMORY")
conn.execute("PRAGMA cache_size = -20000") # ~20MB page cache
conn.execute("PRAGMA mmap_size = 268435456") # 256MB if supported
except sqlite3.DatabaseError:
pass
return conn
except Exception:
# Fallback (keeps behavior unchanged if URI ro is unavailable)
return get_conn()
def canon(s: str) -> str:
"""Canonical form: trim, collapse internal spaces, lowercase."""
return " ".join((s or "").strip().split()).lower()
def split_tags(s: str):
"""Split comma-separated tags -> list of unique canonical-preserving tuples (canon, display)."""
out = []
seen = set()
for raw in (s or "").split(","):
disp = raw.strip()
if not disp:
continue
c = canon(disp)
if not c or c in seen:
continue
seen.add(c)
out.append((c, disp))
return out
def norm_command(cmd: str) -> str:
"""
Normalize command text for uniqueness and dedupe:
- strip leading/trailing whitespace
- collapse ALL whitespace (including newlines/tabs) to a single space
- lowercase
"""
s = (cmd or "").strip()
s = re.sub(r"\s+", " ", s)
return s.lower()
# ------------------ Schema initialization ------------------
def _commands_has_mitre_fk(cur) -> bool:
cur.execute("PRAGMA foreign_key_list(commands)")
for fk in cur.fetchall():
if fk[2] == "mitre" and fk[3] == "mitre_id" and fk[4] == "id":
return True
return False
def _rebuild_commands_fk_if_needed(cur):
"""
Ensure commands.mitre_id actually references mitre(id) ON DELETE SET NULL.
SQLite can't ALTER TABLE to add FK constraints retroactively; rebuild when missing.
Uses a SAVEPOINT to play nice with implicit transactions.
"""
if _commands_has_mitre_fk(cur):
return
# If commands table doesn't exist yet, nothing to rebuild.
cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='commands'")
if cur.fetchone() is None:
return
cur.execute("PRAGMA table_info(commands)")
cols = {r[1] for r in cur.fetchall()}
cur.execute("PRAGMA foreign_keys = OFF")
cur.execute("SAVEPOINT rebuild_cmds")
try:
cur.execute("""
CREATE TABLE commands_new (
id INTEGER PRIMARY KEY AUTOINCREMENT,
tool_id INTEGER NOT NULL REFERENCES tools(id) ON DELETE CASCADE,
command TEXT NOT NULL,
tactic TEXT,
tags TEXT,
explanation TEXT,
explanation_id INTEGER,
tactic_id INTEGER,
mitre_id INTEGER REFERENCES mitre(id) ON DELETE SET NULL,
command_norm TEXT
)
""")
copy_cols = [c for c in [
"id","tool_id","command","tactic","tags","explanation",
"explanation_id","tactic_id","mitre_id","command_norm"
] if c in cols]
if copy_cols:
cur.execute(
f"INSERT INTO commands_new ({', '.join(copy_cols)}) "
f"SELECT {', '.join(copy_cols)} FROM commands"
)
cur.execute("DROP TABLE commands")
cur.execute("ALTER TABLE commands_new RENAME TO commands")
cur.execute("RELEASE SAVEPOINT rebuild_cmds")
except Exception:
cur.execute("ROLLBACK TO SAVEPOINT rebuild_cmds")
cur.execute("RELEASE SAVEPOINT rebuild_cmds")
raise
finally:
cur.execute("PRAGMA foreign_keys = ON")
def _ensure_fts(cur):
"""
Ensure FTS5 table and triggers exist and are populated.
Safe to re-run. Contentless FTS with custom triggers.
"""
fts_ok = False
try:
cur.execute("""
CREATE VIRTUAL TABLE IF NOT EXISTS fts_commands
USING fts5(rowid UNINDEXED, tool_name, command, tags, tactic, tokenize='porter');
""")
cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='fts_commands'")
fts_ok = cur.fetchone() is not None
except sqlite3.OperationalError:
fts_ok = False
except Exception:
fts_ok = False
if fts_ok:
# Triggers to keep FTS in sync
cur.execute("""
CREATE TRIGGER IF NOT EXISTS trg_commands_ai AFTER INSERT ON commands BEGIN
INSERT INTO fts_commands(rowid, tool_name, command, tags, tactic)
SELECT NEW.id, (SELECT name FROM tools WHERE id = NEW.tool_id), NEW.command, COALESCE(NEW.tags,''), COALESCE(NEW.tactic,'');
END;
""")
cur.execute("""
CREATE TRIGGER IF NOT EXISTS trg_commands_au AFTER UPDATE ON commands BEGIN
DELETE FROM fts_commands WHERE rowid = NEW.id;
INSERT INTO fts_commands(rowid, tool_name, command, tags, tactic)
SELECT NEW.id, (SELECT name FROM tools WHERE id = NEW.tool_id), NEW.command, COALESCE(NEW.tags,''), COALESCE(NEW.tactic,'');
END;
""")
cur.execute("""
CREATE TRIGGER IF NOT EXISTS trg_commands_ad AFTER DELETE ON commands BEGIN
DELETE FROM fts_commands WHERE rowid = OLD.id;
END;
""")
cur.execute("""
CREATE TRIGGER IF NOT EXISTS trg_tools_uu AFTER UPDATE OF name ON tools BEGIN
UPDATE fts_commands SET tool_name = NEW.name
WHERE rowid IN (SELECT id FROM commands WHERE tool_id = NEW.id);
END;
""")
# Initial populate (idempotent)
try:
cur.execute("SELECT 1 FROM fts_commands LIMIT 1")
has_rows = cur.fetchone() is not None
except Exception:
has_rows = False
if not has_rows:
cur.execute("""
INSERT INTO fts_commands(rowid, tool_name, command, tags, tactic)
SELECT c.id, t.name, c.command, COALESCE(c.tags,''), COALESCE(c.tactic,'')
FROM commands c JOIN tools t ON t.id = c.tool_id;
""")
else:
# Clean up any leftover triggers referencing missing fts_commands table
for trg in ("trg_commands_ai", "trg_commands_au", "trg_commands_ad", "trg_tools_uu"):
try:
cur.execute(f"DROP TRIGGER IF EXISTS {trg}")
except Exception:
pass
def init_db():
"""Ensure schema exists and is up to date (including FTS, command_norm, and uniqueness)."""
conn = get_conn()
if conn is None: # belt-and-suspenders: shouldn't happen now
raise RuntimeError("get_conn() returned None")
cur = conn.cursor()
# Enable WAL so reads don't block writes (persistent setting on the DB file)
try:
cur.execute("PRAGMA journal_mode=WAL")
cur.execute("PRAGMA synchronous=NORMAL")
except sqlite3.DatabaseError:
pass
# Core tables
cur.execute("""
CREATE TABLE IF NOT EXISTS tools (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
description TEXT,
url TEXT
)
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS commands (
id INTEGER PRIMARY KEY AUTOINCREMENT,
tool_id INTEGER NOT NULL,
command TEXT NOT NULL,
tactic TEXT, -- renamed from 'category'
tags TEXT,
explanation TEXT,
explanation_id INTEGER,
tactic_id INTEGER, -- renamed from 'category_id'
mitre_id INTEGER, -- FK to mitre(id); enforced below if needed
command_norm TEXT,
FOREIGN KEY (tool_id) REFERENCES tools(id) ON DELETE CASCADE
)
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS explanations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
tool_id INTEGER NOT NULL,
text TEXT,
FOREIGN KEY (tool_id) REFERENCES tools(id) ON DELETE CASCADE
)
""")
# NEW: Variables key/value store
cur.execute("""
CREATE TABLE IF NOT EXISTS variables (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
value TEXT,
description TEXT
)
""")
# Normalized taxonomy (table names kept for now)
cur.execute("""
CREATE TABLE IF NOT EXISTS categories (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE, -- canonical (lowercased/trimmed)
display_name TEXT -- pretty for UI
)
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS tags (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE, -- canonical
display_name TEXT -- pretty
)
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS command_tags (
command_id INTEGER NOT NULL REFERENCES commands(id) ON DELETE CASCADE,
tag_id INTEGER NOT NULL REFERENCES tags(id) ON DELETE CASCADE,
PRIMARY KEY (command_id, tag_id)
)
""")
# Lightweight migrations / column renames
cur.execute("PRAGMA table_info(commands)")
cols = {row[1] for row in cur.fetchall()}
if "explanation_id" not in cols:
cur.execute("ALTER TABLE commands ADD COLUMN explanation_id INTEGER")
cur.execute("PRAGMA table_info(commands)")
cols = {row[1] for row in cur.fetchall()}
if "tactic_id" not in cols and "category_id" in cols:
cur.execute("ALTER TABLE commands ADD COLUMN tactic_id INTEGER")
cur.execute("PRAGMA table_info(commands)")
cols = {row[1] for row in cur.fetchall()}
if "category" in cols and "tactic" not in cols:
cur.execute("ALTER TABLE commands RENAME COLUMN category TO tactic")
cur.execute("PRAGMA table_info(commands)")
cols = {row[1] for row in cur.fetchall()}
if "category_id" in cols and "tactic_id" not in cols:
# Some SQLite builds don't support RENAME COLUMN prior to 3.25
try:
cur.execute("ALTER TABLE commands RENAME COLUMN category_id TO tactic_id")
except sqlite3.OperationalError:
cur.execute("ALTER TABLE commands ADD COLUMN tactic_id INTEGER")
cur.execute("PRAGMA table_info(commands)")
cols = {row[1] for row in cur.fetchall()}
if "mitre_id" not in cols:
cur.execute("ALTER TABLE commands ADD COLUMN mitre_id INTEGER")
cur.execute("PRAGMA table_info(commands)")
cols = {row[1] for row in cur.fetchall()}
if "command_norm" not in cols:
cur.execute("ALTER TABLE commands ADD COLUMN command_norm TEXT")
# Ensure mitre table exists (and helpful indexes) for FK + lookups
cur.execute("""
CREATE TABLE IF NOT EXISTS mitre (
id INTEGER PRIMARY KEY AUTOINCREMENT,
tactics TEXT,
tac_def TEXT,
techniques TEXT,
tech_def TEXT,
sub_techniques TEXT,
sub_tech_def TEXT
)
""")
cur.execute("CREATE INDEX IF NOT EXISTS idx_mitre_tactics ON mitre(tactics)")
cur.execute("CREATE INDEX IF NOT EXISTS idx_mitre_techniques ON mitre(techniques)")
cur.execute("CREATE INDEX IF NOT EXISTS idx_mitre_subtech ON mitre(sub_techniques)")
# Backfill command_norm for any existing rows
cur.execute("SELECT id, command FROM commands")
for r in cur.fetchall():
cur.execute("UPDATE commands SET command_norm=? WHERE id=?", (norm_command(r["command"]), r["id"]))
# Unique index on (tool_id, command_norm) to prevent dupes at the DB level
try:
cur.execute("DROP INDEX IF EXISTS uq_tool_command") # in case an older index exists
except Exception:
pass
try:
cur.execute("CREATE UNIQUE INDEX IF NOT EXISTS uq_tool_command_norm ON commands(tool_id, command_norm)")
except sqlite3.OperationalError:
# If legacy duplicates exist, this will fail; user can dedupe and re-run.
pass
# Helpful app indexes
cur.execute("CREATE INDEX IF NOT EXISTS idx_commands_tool_cmd ON commands(tool_id, command)")
cur.execute("CREATE INDEX IF NOT EXISTS idx_commands_tactic ON commands(tactic)")
cur.execute("CREATE INDEX IF NOT EXISTS idx_commands_tags ON commands(tags)")
# Rebuild commands table to add FK (mitre_id -> mitre.id) if missing
_rebuild_commands_fk_if_needed(cur)
# FTS and triggers + initial populate
_ensure_fts(cur)
conn.commit()
conn.close()
if __name__ == "__main__":
init_db()
# You can start the server below if you normally do that here:
# app.run(debug=True)
# ------------------ Helpers (queries) ------------------
# --------------------------------------------------------------------------------------------- Articles: config ---------------------------------------
# --------------------------------------------------------------------------------------------- Articles: config ---------------------------------------
def _ensure_article_dirs():
ARTICLES_DIR.mkdir(parents=True, exist_ok=True)
ARTICLES_STATIC_DIR.mkdir(parents=True, exist_ok=True)
if not ARTICLES_INDEX_PATH.exists():
ARTICLES_INDEX_PATH.write_text("[]", encoding="utf-8")
def _read_text(p: Path):
return p.read_text(encoding="utf-8", errors="ignore")
_WORD_RX = re.compile(r"[A-Za-z0-9_]+")
def _tokens(s: str):
return [t.lower() for t in _WORD_RX.findall(s or "")]
def _first_para(md_body: str):
for block in re.split(r"\n{2,}", md_body.strip()):
if block.strip():
# strip md headings/inline code minimally
return re.sub(r"[*_`#>]", "", block.strip())
return ""
def _etag_for(text: str):
return hashlib.sha256(text.encode("utf-8", errors="ignore")).hexdigest()[:16]
def _strip_html_tags(html: str):
# lightweight summary extractor
txt = re.sub(r"(?is)<(script|style)\b.*?", "", html)
txt = re.sub(r"(?is)<[^>]+>", "", txt)
return re.sub(r"\s+", " ", txt).strip()
def _first_para_text(html: str):
m = re.search(r"(?is)]*>(.*?)
", html)
return _strip_html_tags(m.group(1))[:ARTICLES_MAX_SUMMARY] if m else _strip_html_tags(html)[:ARTICLES_MAX_SUMMARY]
def _sanitize_html(html: str) -> str:
if not bleach:
return html
return bleach.clean(
html,
tags=["p","a","ul","ol","li","strong","em","code","pre","h1","h2","h3","h4","blockquote","table","thead","tbody","tr","th","td","img","hr","br","span"],
attributes={
"a": ["href","title","target","rel"],
"img": ["src","alt","title","width","height"],
"span": ["style"],
"*": ["class", "id"]
},
protocols=["http","https","data"],
strip=True
)
def _etag_for(text: str):
return hashlib.sha256(text.encode("utf-8", errors="ignore")).hexdigest()[:16]
_FRONT_RE = re.compile(r"(?is)^\s*\s*")
def _parse_html_front_matter(text: str):
"""
Front-matter is an HTML comment at the very top:
Returns (meta_dict, body_html_without_comment)
"""
meta = {}
m = _FRONT_RE.match(text)
if m:
blob = m.group(1)
for line in blob.splitlines():
if ":" in line:
k, v = line.split(":", 1)
meta[k.strip().lower()] = v.strip()
text = text[m.end():]
return meta, text
def _serialize_front_matter(meta: dict) -> str:
lines = ["")
return "\n".join(lines) + "\n"
@app.template_filter('humandate')
def humandate(value: str) -> str:
from datetime import datetime
try:
iso = (value or "").replace("Z", "+00:00")
dt = datetime.fromisoformat(iso)
return dt.strftime("%b %d, %Y") # e.g., Oct 04, 2025
except Exception:
return value or ""
# --------------------------------------------------------------------------------------------- Articles: config ---------------------------------------
# --------------------------------------------------------------------------------------------- Articles: config ---------------------------------------
def upsert_tactic(cur, display: str):
display = (display or "").strip() or "Uncategorized"
c = canon(display)
try:
cur.execute("INSERT INTO categories(name, display_name) VALUES (?,?)", (c, display))
tact_id = cur.lastrowid
return tact_id, display
except sqlite3.IntegrityError:
cur.execute("SELECT id, display_name FROM categories WHERE name= ?", (c,))
row = cur.fetchone()
if row:
return row[0], row[1]
cur.execute("INSERT INTO categories(name, display_name) VALUES (?,?)", (c, display))
return cur.lastrowid, display
def upsert_tags(cur, tags_text: str):
pairs = split_tags(tags_text)
tag_ids, pretty = [], []
for c, disp in pairs:
try:
cur.execute("INSERT INTO tags(name, display_name) VALUES (?,?)", (c, disp))
tag_ids.append(cur.lastrowid)
pretty.append(disp)
except sqlite3.IntegrityError:
cur.execute("SELECT id, display_name FROM tags WHERE name=?", (c,))
row = cur.fetchone()
if row:
tag_ids.append(row[0])
pretty.append(row[1] or disp)
return tag_ids, ",".join(pretty)
def rewrite_command_tags(cur, command_id: int, tag_ids):
cur.execute("DELETE FROM command_tags WHERE command_id=?", (command_id,))
for tid in tag_ids:
try:
cur.execute("INSERT INTO command_tags(command_id, tag_id) VALUES (?,?)", (command_id, tid))
except sqlite3.IntegrityError:
pass
def normalize_command(cmd: str) -> str:
return (cmd or "").strip()
def find_mitre_id(cur, tactic: str, technique: str = "", sub: str = ""):
"""Resolve mitre.id from (tactic[, technique[, sub-technique]]). Case/space-insensitive."""
t = (tactic or "").strip()
te = (technique or "").strip()
su = (sub or "").strip()
if not t:
return None
if t and te and su:
cur.execute("""
SELECT id FROM mitre
WHERE LOWER(TRIM(tactics)) = LOWER(TRIM(?))
AND LOWER(TRIM(techniques)) = LOWER(TRIM(?))
AND LOWER(TRIM(sub_techniques)) = LOWER(TRIM(?))
ORDER BY id LIMIT 1
""", (t, te, su))
r = cur.fetchone()
if r: return r[0]
if t and te:
cur.execute("""
SELECT id FROM mitre
WHERE LOWER(TRIM(tactics)) = LOWER(TRIM(?))
AND LOWER(TRIM(techniques)) = LOWER(TRIM(?))
ORDER BY
CASE WHEN IFNULL(TRIM(sub_techniques),'') = '' THEN 0 ELSE 1 END ASC,
id ASC
LIMIT 1
""", (t, te))
r = cur.fetchone()
if r: return r[0]
cur.execute("""
SELECT id FROM mitre
WHERE LOWER(TRIM(tactics)) = LOWER(TRIM(?))
ORDER BY
CASE WHEN (IFNULL(TRIM(techniques),'')='' AND IFNULL(TRIM(sub_techniques),'')='') THEN 0 ELSE 1 END ASC,
id ASC
LIMIT 1
""", (t,))
r = cur.fetchone()
return r[0] if r else None
@lru_cache(maxsize=4096)
def find_mitre_id_cached(tactic: str, technique: str = "", sub: str = ""):
"""
Cached resolver that reuses the existing SQL logic via a short-lived connection.
Only use this in read paths (e.g., APIs) where we don't already have a cursor.
"""
conn = get_conn()
try:
cur = conn.cursor()
return find_mitre_id(cur, tactic, technique, sub)
finally:
conn.close()
@lru_cache(maxsize=4096)
def tool_brief_by_id(tool_id: int):
conn = get_ro_conn()
try:
cur = conn.cursor()
cur.execute("SELECT id, name, COALESCE(url,''), COALESCE(description,'') FROM tools WHERE id=? LIMIT 1", (tool_id,))
r = cur.fetchone()
return None if not r else {"id": r[0], "name": r[1], "url": r[2], "description": r[3]}
finally:
conn.close()
@lru_cache(maxsize=4096)
def mitre_full_by_id(mitre_id: int):
if not mitre_id:
return None
conn = get_ro_conn()
try:
cur = conn.cursor()
cur.execute("""
SELECT id, COALESCE(tactics,''), COALESCE(tac_def,''), COALESCE(techniques,''), COALESCE(tech_def,''),
COALESCE(sub_techniques,''), COALESCE(sub_tech_def,'')
FROM mitre WHERE id=? LIMIT 1
""", (mitre_id,))
r = cur.fetchone()
if not r: return None
return {
"id": r[0], "tactics": r[1], "tac_def": r[2],
"techniques": r[3], "tech_def": r[4],
"sub_techniques": r[5], "sub_tech_def": r[6],
}
finally:
conn.close()
# ------------------ Public Index ------------------
def search_db(keyword):
"""
FTS-first search with AND-of-terms across (tool_name, command, tactic, tags, mitre text).
Builds SQL and params in a way that keeps placeholder counts exact.
"""
keyword = (keyword or "").strip()
terms = [t for t in re.split(r"\s+", keyword) if t]
# ---------- 1) FTS shortlist (rowid = commands.id) ----------
fts_cmd_ids = []
try:
if keyword:
conn_fts = get_conn()
cur_fts = conn_fts.cursor()
# ALL terms: t1* AND t2* AND ...
fts_query = " AND ".join(f"{t}*" for t in terms)
cur_fts.execute("""
SELECT rowid FROM fts_commands
WHERE fts_commands MATCH ?
LIMIT 250
""", (fts_query,))
fts_cmd_ids = [r[0] for r in cur_fts.fetchall()]
conn_fts.close()
except Exception:
fts_cmd_ids = []
conn = get_conn()
cur = conn.cursor()
# ---------- 2) WHERE (AND of terms) ----------
where_groups = []
where_params = []
for t in terms:
t = t.lower()
like_t = f"%{t}%"
# exact tag: ",t," anywhere
tags_exact = f"%,{t},%"
# prefix tag: starts with t after a comma, before the next comma
tags_prefix = f"%,{t}%," # e.g. ",arti%," matches "artifact-collection"
where_groups.append("""
(
t.name LIKE ?
OR c.command LIKE ?
OR c.tactic LIKE ?
OR (
(',' || REPLACE(LOWER(c.tags),' ','') || ',') LIKE ? -- exact word
OR (',' || REPLACE(LOWER(c.tags),' ','') || ',') LIKE ? -- prefix at start of tag
)
)
""")
# tool_name, command, tactic, tags_exact, tags_prefix
where_params.extend([like_t, like_t, like_t, tags_exact, tags_prefix])
base_where = ("WHERE " + " AND ".join(where_groups)) if where_groups else ""
# ---------- 3) FTS filter ----------
fts_filter = ""
if fts_cmd_ids:
fts_filter = " AND c.id IN (" + ",".join("?" * len(fts_cmd_ids)) + ")"
# One “whole-keyword” LIKE used by the match-flags/scalars blocks
like_kw = f"%{keyword}%" if keyword else "%"
sql = f"""
SELECT
t.id, t.name, t.description, t.url,
c.id, c.command, c.tactic, c.tags,
COALESCE(e.text, c.explanation, '') AS exp_text,
c.explanation_id,
m.id, m.tactics, m.tac_def, m.techniques, m.tech_def, m.sub_techniques, m.sub_tech_def,
-- match flags (tactic / technique / sub-technique)
CASE
WHEN (? <> '')
THEN CASE
WHEN m.id IS NOT NULL AND m.tactics LIKE ? THEN 1
WHEN m.id IS NULL AND EXISTS (
SELECT 1 FROM mitre mm
WHERE LOWER(TRIM(mm.tactics)) = LOWER(TRIM(c.tactic))
AND mm.tactics LIKE ?
) THEN 1
ELSE 0
END
ELSE 0
END AS match_tactic,
CASE
WHEN (? <> '')
THEN CASE
WHEN m.id IS NOT NULL AND m.techniques LIKE ? THEN 1
WHEN m.id IS NULL AND EXISTS (
SELECT 1 FROM mitre mm
WHERE LOWER(TRIM(mm.tactics)) = LOWER(TRIM(c.tactic))
AND mm.techniques LIKE ?
) THEN 1
ELSE 0
END
ELSE 0
END AS match_tech,
CASE
WHEN (? <> '')
THEN CASE
WHEN m.id IS NOT NULL AND m.sub_techniques LIKE ? THEN 1
WHEN m.id IS NULL AND EXISTS (
SELECT 1 FROM mitre mm
WHERE LOWER(TRIM(mm.tactics)) = LOWER(TRIM(c.tactic))
AND mm.sub_techniques LIKE ?
) THEN 1
ELSE 0
END
ELSE 0
END AS match_sub,
-- definition / first match scalars (do not affect filtering)
CASE
WHEN m.id IS NOT NULL THEN m.tac_def
ELSE (
SELECT m4.tac_def FROM mitre m4
WHERE LOWER(TRIM(m4.tactics)) = LOWER(TRIM(c.tactic))
AND IFNULL(TRIM(m4.tac_def),'') <> ''
ORDER BY
CASE
WHEN (IFNULL(TRIM(m4.techniques),'')='' AND IFNULL(TRIM(m4.sub_techniques),'')='') THEN 0 ELSE 1
END ASC, m4.id ASC
LIMIT 1
)
END AS tac_def,
CASE
WHEN m.id IS NOT NULL AND (? <> '') AND m.techniques LIKE ?
THEN m.techniques
WHEN m.id IS NULL AND (? <> '')
THEN (SELECT m5.techniques FROM mitre m5
WHERE LOWER(TRIM(m5.tactics)) = LOWER(TRIM(c.tactic))
AND m5.techniques LIKE ?
LIMIT 1)
ELSE NULL
END AS match_technique,
CASE
WHEN m.id IS NOT NULL AND (? <> '') AND m.techniques LIKE ?
THEN m.tech_def
WHEN m.id IS NULL AND (? <> '')
THEN (SELECT m6.tech_def FROM mitre m6
WHERE LOWER(TRIM(m6.tactics)) = LOWER(TRIM(c.tactic))
AND m6.techniques LIKE ?
LIMIT 1)
ELSE NULL
END AS match_tech_def,
CASE
WHEN m.id IS NOT NULL AND (? <> '') AND m.sub_techniques LIKE ?
THEN m.sub_techniques
WHEN m.id IS NULL AND (? <> '')
THEN (SELECT m7.sub_techniques FROM mitre m7
WHERE LOWER(TRIM(m7.tactics)) = LOWER(TRIM(c.tactic))
AND m7.sub_techniques LIKE ?
LIMIT 1)
ELSE NULL
END AS match_subtech,
CASE
WHEN m.id IS NOT NULL AND (? <> '') AND m.sub_techniques LIKE ?
THEN m.sub_tech_def
WHEN m.id IS NULL AND (? <> '')
THEN (SELECT m8.sub_tech_def FROM mitre m8
WHERE LOWER(TRIM(m8.tactics)) = LOWER(TRIM(c.tactic))
AND m8.sub_tech_def LIKE ?
LIMIT 1)
ELSE NULL
END AS match_subtech_def
FROM tools t
JOIN commands c ON c.tool_id = t.id
LEFT JOIN explanations e ON e.id = c.explanation_id
LEFT JOIN mitre m ON m.id = c.mitre_id
{base_where}{('' if not fts_filter else fts_filter)}
ORDER BY t.name COLLATE NOCASE, c.id;
"""
# ---------- 4) Params in exact order ----------
params = [
# match flags (3 x 3)
keyword, like_kw, like_kw,
keyword, like_kw, like_kw,
keyword, like_kw, like_kw,
# technique scalar (4)
keyword, like_kw, keyword, like_kw,
# technique def scalar (4)
keyword, like_kw, keyword, like_kw,
# sub-tech scalar (4)
keyword, like_kw, keyword, like_kw,
# sub-tech def scalar (4)
keyword, like_kw, keyword, like_kw,
]
# WHERE (6 per term)
params.extend(where_params)
# FTS ids (once)
params.extend(fts_cmd_ids)
cur.execute(sql, params)
rows = cur.fetchall()
conn.close()
# ---------- 5) Group results by tool (unchanged) ----------
grouped = {}
for row in rows:
tool_id = row[0]
tool_name = row[1]
tool_desc = row[2]
tool_url = row[3]
cmd_id = row[4]
cmd_text = row[5]
cmd_tactic = row[6]
cmd_tags = row[7]
exp_text = row[8]
exp_id = row[9]
m_id = row[10]
m_tactics = row[11]
m_tac_def = row[12]
m_techniques = row[13]
m_tech_def = row[14]
m_sub_techniques= row[15]
m_sub_tech_def = row[16]
match_tactic = bool(row[17])
match_tech = bool(row[18])
match_sub = bool(row[19])
tac_def = row[20] or ""
match_technique = row[21] or ""
match_tech_def = row[22] or ""
match_subtech = row[23] or ""
match_subtech_def = row[24] or ""
g = grouped.setdefault(
tool_name,
{
"id": tool_id,
"name": tool_name,
"description": tool_desc,
"url": tool_url,
"commands": [],
},
)
technique_val = (m_techniques or "") if (m_techniques or "").strip() else (match_technique or "")
technique_def_val = (m_tech_def or "") if (m_techniques or "").strip() else (match_tech_def or "")
subtech_val = (m_sub_techniques or "") if (m_sub_techniques or "").strip() else (match_subtech or "")
subtech_def_val = (m_sub_tech_def or "") if (m_sub_techniques or "").strip() else (match_subtech_def or "")
mitre_obj = {
"tactic": (cmd_tactic or ""),
"tac_def": tac_def,
"technique": technique_val,
"tech_def": technique_def_val,
"sub_technique": subtech_val,
"sub_tech_def": subtech_def_val,
"match_tactic": match_tactic,
"match_tech": match_tech,
"match_sub": match_sub,
}
g["commands"].append(
{
"id": cmd_id,
"command": cmd_text,
"tactic": (cmd_tactic or ""),
"tags": [t.strip() for t in (cmd_tags or '').split(',') if t.strip()],
"explanation": exp_text or "",
"explanation_id": exp_id or None,
"mitre": mitre_obj,
}
)
return list(grouped.values())
def get_tools():
conn = get_conn()
cur = conn.cursor()
cur.execute("SELECT id, name, description, url FROM tools ORDER BY name COLLATE NOCASE")
tools = [dict(id=r[0], name=r[1], description=r[2], url=r[3]) for r in cur.fetchall()]
conn.close()
return tools
def _fts_shortlist(keyword: str, limit: int = 250):
"""
Returns a list of command rowids (== commands.id) from the FTS table,
capped by 'limit'. If no terms or no matches, returns [] quickly.
"""
keyword = (keyword or "").strip()
terms = [t for t in re.split(r"\s+", keyword) if t]
if len("".join(terms)) == 0:
return []
fts_query = " AND ".join(f"{t}*" for t in terms)
try:
conn_fts = get_conn()
cur_fts = conn_fts.cursor()
cur_fts.execute(
"SELECT rowid FROM fts_commands WHERE fts_commands MATCH ? LIMIT ?",
(fts_query, limit)
)
ids = [r[0] for r in cur_fts.fetchall()]
conn_fts.close()
return ids
except Exception:
return []
# --------------------------------------------------------------------------------------------- Articles: config ---------------------------------------
# --------------------------------------------------------------------------------------------- Articles: config ---------------------------------------
# --------------------------------------------------------------------------------------------- Articles: config ---------------------------------------
ARTICLES_DIR = Path("content/articles")
ARTICLES_STATIC_DIR = Path("static/articles")
ARTICLES_INDEX_PATH = Path("content/articles_index.json")
ARTICLES_MAX_SUMMARY = 360 # chars used in snippets
ARTICLES_RESULT_CAP = 50 # show up to N matches in left rail
# in-memory cache
_ARTICLES_BY_SLUG = {} # slug -> record dict
_ARTICLES_LIST = [] # list of records
_ARTICLES_ETAG_BY_SLUG = {} # slug -> etag
def _load_articles_index(force: bool = False) -> None:
"""Load/refresh the in-memory articles index if needed.
When force=True, always reload from disk."""
global _ARTICLES_LIST, _ARTICLES_INDEX_MTIME, _ARTICLES_BY_SLUG, _ARTICLES_ETAG_BY_SLUG
p = Path(ARTICLES_INDEX_PATH) if not isinstance(ARTICLES_INDEX_PATH, Path) else ARTICLES_INDEX_PATH
if not p.exists():
_ARTICLES_LIST = []
_ARTICLES_INDEX_MTIME = None
_ARTICLES_BY_SLUG = {}
_ARTICLES_ETAG_BY_SLUG = {}
return
mtime = p.stat().st_mtime
if not force and _ARTICLES_INDEX_MTIME is not None and _ARTICLES_INDEX_MTIME >= mtime:
# Already up-to-date
return
data = json.loads(p.read_text(encoding="utf-8"))
# Normalize each record (tags to list; path to forward slashes)
norm_rows = []
for rec in data:
rec = dict(rec) # shallow copy
tags = rec.get("tags") or []
if isinstance(tags, str):
tags = [t.strip() for t in tags.split(",") if t.strip()]
rec["tags"] = tags
if "content_path" in rec:
rec["content_path"] = Path(rec["content_path"]).as_posix()
norm_rows.append(rec)
_ARTICLES_LIST = norm_rows
_ARTICLES_BY_SLUG = {r.get("slug",""): r for r in _ARTICLES_LIST if r.get("slug")}
# Clear etag cache; it will be lazily recomputed on next view
_ARTICLES_ETAG_BY_SLUG = {}
_ARTICLES_INDEX_MTIME = mtime
_SUMMARY_MAX = 200 # target length for meta/snippet (tunable)
_SENT_SPLIT = re.compile(r'(?<=[.!?])\s+(?=[A-Z0-9])')
def _clean_for_summary(html: str) -> str:
"""
Strip non-contenty blocks (code/pre/table/nav/aside/footer), tags, and collapse whitespace.
"""
if not html:
return ""
# remove obvious non-summary blocks
html = re.sub(r"(?is)<(script|style|nav|aside|footer)\b.*?", " ", html)
html = re.sub(r"(?is)<(pre|code|table|thead|tbody|tr|th|td|figure)\b.*?", " ", html)
# keep only paragraph-ish, lists, and inline text
text = _strip_html_tags(html)
# collapse
return re.sub(r"\s+", " ", (text or "")).strip()
def _best_summary(html: str, max_chars: int = _SUMMARY_MAX) -> str:
"""
Build a sentence-aware, search-friendly summary:
- Prefer the first meaningful paragraph
- If too long, trim at sentence boundary near max_chars
- Fallback to _first_para_text (existing behavior)
"""
cleaned = _clean_for_summary(html)
if not cleaned:
return _first_para_text(html) # fallback to your current logic
# Split into paragraphs (double newline or heavy gaps)
paras = re.split(r"\n{2,}| {2,}", cleaned)
paras = [p.strip() for p in paras if p.strip()]
if not paras:
return _first_para_text(html)
# pick first meaningful paragraph (≥ 60 chars or has at least 2 words)
para = next((p for p in paras if len(p) >= 60 or len(p.split()) >= 8), paras[0])
# If short enough, use as-is (up to max_chars)
if len(para) <= max_chars:
return para
# Otherwise trim on sentence boundaries near max_chars
sentences = _SENT_SPLIT.split(para)
out, total = [], 0
for s in sentences:
s = s.strip()
if not s:
continue
if total + len(s) + (1 if out else 0) > max_chars:
break
out.append(s)
total += len(s) + (1 if out else 0)
if out:
return " ".join(out)
# Hard fallback if no sentence fits nicely
return para[:max_chars].rstrip() + "…"
def reindex_articles():
_ensure_article_dirs()
rows = []
for p in sorted(ARTICLES_DIR.glob("*.html")):
raw = _read_text(p)
meta, body = _parse_html_front_matter(raw)
slug = (meta.get("slug") or p.stem).strip()
title = (meta.get("title") or slug.replace("-", " ").title()).strip()
# tags: accept "a,b,c" or ["a","b","c"]
tags_meta = meta.get("tags") or []
if isinstance(tags_meta, str):
tags_list = [t.strip() for t in tags_meta.split(",") if t.strip()]
elif isinstance(tags_meta, (list, tuple)):
tags_list = [str(t).strip() for t in tags_meta if str(t).strip()]
else:
tags_list = []
date_str = (meta.get("date") or "").strip()
# summary: explicit override in front-matter OR smart extract
summary_meta = (meta.get("summary") or "").strip()
if summary_meta:
summary = summary_meta[:_SUMMARY_MAX]
else:
summary = _best_summary(body, max_chars=_SUMMARY_MAX)
rows.append(dict(
slug=slug,
title=title,
tags=tags_list,
summary=summary,
date=date_str,
content_path=Path(p).as_posix(), # normalize slashes
modified=int(p.stat().st_mtime),
word_count=len(_strip_html_tags(body).split()),
))
Path(ARTICLES_INDEX_PATH).write_text(json.dumps(rows, ensure_ascii=False, indent=2), encoding="utf-8")
_load_articles_index(force=True) # now valid
def search_articles(query: str, limit=ARTICLES_RESULT_CAP, tags_only: bool = False):
if not query or len(query.strip()) < 2:
return []
q = query.lower()
out = []
tokens = set(re.findall(r"[a-z0-9_]+", q))
for rec in _ARTICLES_LIST:
score = 0
tags = [t.lower() for t in rec.get("tags", [])]
title = (rec.get("title") or "").lower()
# Preload body once (best-effort)
try:
raw = _read_text(Path(rec["content_path"]))
_, body = _parse_html_front_matter(raw)
body_l = body.lower()
except Exception:
body_l = ""
tag_hit = False
for tok in tokens:
if tok in tags:
score += 3
tag_hit = True
if not tags_only and tok in title:
score += 2
if not tags_only and tok and tok in body_l:
score += 1
# Only include items that match the chosen policy
if (tags_only and tag_hit) or (not tags_only and score > 0):
out.append((score, rec))
# Sort by score (desc) to keep "best" first, even in tags_only mode
out.sort(key=lambda x: x[0], reverse=True)
return [r for _, r in out[:limit]]
# --- Article linking helpers ---
def related_articles_for(slug: str, tags: list[str], limit: int = 5):
"""
Return up to N other articles, scored by tag overlap (desc) then recency.
"""
if not tags:
return []
tagset = {t.lower() for t in tags}
scored = []
for rec in _ARTICLES_LIST:
if rec.get("slug") == slug:
continue
other_tags = {t.lower() for t in rec.get("tags", [])}
score = len(tagset & other_tags)
if score > 0:
scored.append((score, rec.get("modified", 0), rec))
# sort: score desc, modified desc
scored.sort(key=lambda tup: (tup[0], tup[1]), reverse=True)
return [r for _, __, r in scored[:limit]]
def prev_next_articles_for(slug: str):
"""
Return (prev, next) by date (ISO string) fallback to modified time.
"""
# Normalize list by chronological order
def _key(rec):
# prefer ISO date if present, else modified timestamp
d = (rec.get("date") or "").replace("Z", "+00:00")
try:
from datetime import datetime
dt = datetime.fromisoformat(d)
return dt.timestamp()
except Exception:
return float(rec.get("modified") or 0)
ordered = sorted(_ARTICLES_LIST, key=_key)
idx = next((i for i, r in enumerate(ordered) if r.get("slug") == slug), -1)
prev_rec = ordered[idx - 1] if idx > 0 else None
next_rec = ordered[idx + 1] if 0 <= idx < len(ordered) - 1 else None
return prev_rec, next_rec
# --- Articles: load index on startup (won't crash if empty) ---
try:
_load_articles_index(force=True)
except Exception:
pass
@app.get("/admin/section/articles")
def admin_section_articles():
# Partial for Admin panel left-nav (like your other sections)
_load_articles_index() # ensure present
return render_template("admin_section_articles.html", articles=_ARTICLES_LIST)
@app.post("/admin/articles/reindex")
def admin_articles_reindex():
reindex_articles()
flash("Articles reindexed.", "success")
return ("OK", 200)
@app.get("/articles")
def articles_browse():
# Always ensure the in-memory index is up to date
_load_articles_index(force=False)
# Optional browse page with basic filter by ?q=
q = request.args.get("q", "") or ""
matches = search_articles(q, limit=999) if q else list(_ARTICLES_LIST) # copy
# Backfill summaries for any rows missing or with empty summary
for r in matches:
s = (r.get("summary") or "").strip()
if not s:
try:
raw = _read_text(Path(r["content_path"]))
_, body = _parse_html_front_matter(raw)
r["summary"] = _best_summary(body, max_chars=_SUMMARY_MAX)
except Exception:
r["summary"] = ""
return render_template("articles_browse.html", articles=matches, keyword=q)
@app.get("/articles/")
def article_view(slug):
rec = _ARTICLES_BY_SLUG.get(slug)
if not rec:
abort(404)
raw = _read_text(Path(rec["content_path"]))
_, body = _parse_html_front_matter(raw)
html = _sanitize_html(body)
if request.args.get("partial"):
return render_template("article_partial.html", article=rec, html=html, keyword=request.args.get("q",""))
# Compute internal linking sets
related = related_articles_for(rec["slug"], rec.get("tags", []), limit=5)
prev_rec, next_rec = prev_next_articles_for(rec["slug"])
etag = _ARTICLES_ETAG_BY_SLUG.get(slug) or _etag_for(html)
# existing ETag/headers remain the same — just pass new vars into the template
resp = make_response(render_template(
"article_view.html",
article=rec,
html=html,
keyword=request.args.get("q",""),
related=related,
prev_rec=prev_rec,
next_rec=next_rec
))
resp.headers["ETag"] = etag
resp.headers["Cache-Control"] = "public, max-age=300"
resp.headers["Last-Modified"] = datetime.utcfromtimestamp(rec.get("modified", int(time.time()))).strftime("%a, %d %b %Y %H:%M:%S GMT")
return resp
@app.get("/admin/articles/new")
def admin_articles_new():
# empty editor
return render_template("admin_articles_editor.html", mode="new", article=None)
@app.get("/admin/articles/edit/")
def admin_articles_edit(slug):
rec = _ARTICLES_BY_SLUG.get(slug)
if not rec:
abort(404)
raw = _read_text(Path(rec["content_path"]))
meta, body = _parse_html_front_matter(raw)
article = {
"slug": slug,
"title": meta.get("title",""),
"tags": meta.get("tags",""),
"date": meta.get("date",""),
"summary": meta.get("summary",""),
"body": body
}
return render_template("admin_articles_editor.html", mode="edit", article=article)
@app.post("/admin/articles/save")
def admin_articles_save():
title = (request.form.get("title") or "").strip()
slug = (request.form.get("slug") or "").strip().lower()
tags = (request.form.get("tags") or "").strip()
body = request.form.get("body") or ""
if not slug:
# derive slug from title
base = re.sub(r"[^a-z0-9\-]+", "-", (title or "article").lower()).strip("-") or "article"
slug = base
i = 2
while (ARTICLES_DIR / f"{slug}.html").exists():
slug = f"{base}-{i}"
i += 1
# upsert file
ARTICLES_DIR.mkdir(parents=True, exist_ok=True)
dest = ARTICLES_DIR / f"{slug}.html"
now = datetime.now(timezone.utc).isoformat()
summary_in = (request.form.get("summary") or "").strip()
fm = {
"title": title or slug.replace("-", " ").title(),
"tags": tags,
"date": now,
"summary": summary_in[:_SUMMARY_MAX] if summary_in else ""
}
content = _serialize_front_matter(fm) + body
dest.write_text(content, encoding="utf-8")
reindex_articles()
return redirect(url_for("admin_articles_edit", slug=slug))
@app.post("/admin/articles/upload-asset")
def admin_articles_upload_asset():
slug = (request.args.get("slug") or request.form.get("slug") or "").strip().lower()
if not slug:
return jsonify({"error": "missing slug"}), 400
f = request.files.get("file")
if not f:
return jsonify({"error": "no file"}), 400
# validate basic mime
mime = (f.mimetype or "").lower()
if not (mime.startswith("image/") or mime in ("application/octet-stream",)):
return jsonify({"error": "unsupported type"}), 400
# save
folder = ARTICLES_STATIC_DIR / slug
folder.mkdir(parents=True, exist_ok=True)
ext = os.path.splitext(f.filename or "")[1].lower() or ".bin"
name = f"{int(time.time()*1000)}{ext}"
path = folder / name
f.save(path)
url = f"/static/articles/{slug}/{name}"
reindex_articles()
return jsonify({"location": url})
# --------------------------------------------------------------------------------------------- Articles: config ---------------------------------------
# --------------------------------------------------------------------------------------------- Articles: config ---------------------------------------
# --------------------------------------------------------------------------------------------- Articles: config ---------------------------------------
# ------------------ Public Index ------------------
# app.py — inside index()
@app.route('/', methods=['GET', 'POST'])
def index():
keyword = (request.values.get('q') or request.values.get('keyword') or '').strip()
_load_articles_index(force=False)
# Inline-article view (?article=) — HTML-only
article_slug = (request.args.get('article') or '').strip()
article = None
article_html = None
if article_slug:
rec = _ARTICLES_BY_SLUG.get(article_slug)
if rec:
raw = _read_text(Path(rec["content_path"])) # read .html file
_, body = _parse_html_front_matter(raw) # strip front-matter comment
article = rec
article_html = _sanitize_html(body) # sanitize for safe rendering
# Articles matched to the search term (left rail)
article_hits = search_articles(keyword, limit=ARTICLES_RESULT_CAP, tags_only=True) if keyword else []
# Tool results (center cards) — keep as you had it
results = []
if len(keyword) >= 2:
results = search_db(keyword)
tactic_map = {}
for item in results:
for cmd in item.get('commands', []):
tact = cmd.get('tactic') or 'Uncategorized'
tactic_map.setdefault(tact, set()).add(item['name'])
categories = {t: sorted(list(tools)) for t, tools in sorted(tactic_map.items())}
else:
categories = []
return render_template(
'index.html',
results=results,
articles=article_hits,
keyword=keyword,
categories=categories,
article=article,
article_html=article_html
)
# ==================================================
# Admin Control Panel (single page) + Section partials
# ==================================================
@app.route('/admin', methods=['GET'])
def admin_panel():
return render_template('admin_panel.html')
@app.route('/admin/section/commands', methods=['GET'])
def admin_section_commands():
conn = get_conn()
cur = conn.cursor()
cur.execute("""
SELECT c.id, t.id, t.name, t.description, t.url,
c.command, c.tactic, c.tags, c.explanation,
c.mitre_id,
m.tactics, m.techniques, m.sub_techniques
FROM commands c
JOIN tools t ON c.tool_id = t.id
LEFT JOIN mitre m ON m.id = c.mitre_id
ORDER BY t.name COLLATE NOCASE, c.id
""")
rows = cur.fetchall()
commands = []
for (
cmd_id,
tool_id,
tool_name,
tool_desc,
tool_url,
command,
tactic,
tags,
explanation,
mitre_id,
m_tactic,
m_technique,
m_subtech,
) in rows:
commands.append(
{
"cmd_id": cmd_id,
"tool_id": tool_id,
"tool_name": tool_name,
"tool_description": tool_desc,
"tool_url": tool_url,
"command": command,
"tactic": tactic,
"tags": tags,
"explanation": explanation,
"mitre_id": mitre_id,
"mitre_tactic": m_tactic or "",
"mitre_technique": m_technique or "",
"mitre_sub_technique": m_subtech or "",
}
)
# Tactic dropdown populated from MITRE (distinct, non-empty, sorted)
cur.execute("""
SELECT DISTINCT TRIM(tactics)
FROM mitre
WHERE tactics IS NOT NULL AND TRIM(tactics) <> ''
ORDER BY LOWER(TRIM(tactics))
""")
mitre_tactics = [r[0] for r in cur.fetchall()]
# Normalized hints if used elsewhere
cur.execute("SELECT COALESCE(display_name, name) FROM categories ORDER BY LOWER(COALESCE(display_name, name))")
categories_norm = [r[0] for r in cur.fetchall()]
cur.execute("SELECT COALESCE(display_name, name) FROM tags ORDER BY LOWER(COALESCE(display_name, name))")
tags_norm = [r[0] for r in cur.fetchall()]
conn.close()
return render_template(
'admin_section_commands.html',
commands=commands,
categories=mitre_tactics, # tactic
@app.route("/admin/section/mitre")
def admin_section_mitre():
conn = get_conn()
cur = conn.cursor()
rows = cur.execute("""
SELECT
COALESCE(NULLIF(tactics, ''), 'Unmapped') AS tactic,
NULLIF(techniques, '') AS technique,
NULLIF(sub_techniques, '') AS sub_technique
FROM mitre
-- only show entries that actually have a technique or a sub-technique
WHERE (NULLIF(techniques, '') IS NOT NULL OR NULLIF(sub_techniques, '') IS NOT NULL)
GROUP BY tactic, technique, sub_technique
ORDER BY
tactic COLLATE NOCASE,
technique COLLATE NOCASE,
sub_technique COLLATE NOCASE
""").fetchall()
conn.close()
items = [dict(tactic=r[0], technique=r[1] or '', sub_technique=r[2] or '') for r in rows]
return render_template("admin_section_mitre.html", items=items)
@app.route('/admin/section/tools', methods=['GET'])
def admin_section_tools():
conn = get_conn()
cur = conn.cursor()
cur.execute("SELECT id, name, description, url FROM tools ORDER BY name COLLATE NOCASE")
tools = [
{"id": r[0], "name": r[1], "description": r[2] or "", "url": r[3] or ""}
for r in cur.fetchall()
]
cur.execute("SELECT tool_id, COUNT(*) FROM commands GROUP BY tool_id")
counts = dict(cur.fetchall())
conn.close()
return render_template('admin_section_tools.html', tools=tools, counts=counts)
# ------------------ NEW: Variables Admin Section ------------------
@app.route('/admin/section/variables', methods=['GET'])
def admin_section_variables():
"""
Render the Variables management section.
"""
conn = get_conn()
cur = conn.cursor()
cur.execute("SELECT id, name, description, value FROM variables ORDER BY LOWER(name)")
rows = cur.fetchall()
variables = [
{"id": r["id"], "name": r["name"], "description": r["description"] or "", "value": r["value"] or ""}
for r in rows
]
conn.close()
return render_template("admin_section_variables.html", variables=variables)
@app.route('/admin/variables/add', methods=['POST'])
def add_variable():
"""
Create a new variable. Enforces unique name at DB level.
Behavior:
- If called via AJAX/fetch (X-Requested-With: fetch/XMLHttpRequest) or ajax=1 -> return JSON
- Otherwise -> redirect back to Admin panel (#variables)
"""
name = (request.form.get("name") or "").strip()
description = (request.form.get("description") or "").strip()
value = (request.form.get("value") or "").strip()
# Detect AJAX/fetch explicitly
is_ajax = (
(request.headers.get("X-Requested-With") in ("fetch", "XMLHttpRequest"))
or (request.args.get("ajax") == "1")
or (request.form.get("ajax") == "1")
)
if not name:
if is_ajax:
return jsonify({"status": "error", "message": "Name is required."}), 400
return redirect(url_for('admin_panel') + '#variables')
conn = get_conn()
cur = conn.cursor()
try:
cur.execute("INSERT INTO variables (name, description, value) VALUES (?,?,?)",
(name, description, value))
conn.commit()
new_id = cur.lastrowid
if is_ajax:
return jsonify({"status": "success", "id": new_id})
# non-AJAX: go back to variables section
return redirect(url_for('admin_panel') + '#variables')
except sqlite3.IntegrityError:
if is_ajax:
return jsonify({"status": "error", "message": "A variable with this name already exists."}), 409
return redirect(url_for('admin_panel') + '#variables')
finally:
conn.close()
@app.route('/admin/variables//edit', methods=['POST'])
def edit_variable(var_id: int):
"""
Update variable fields. Name remains unique.
Returns JSON {status} or 204 for non-AJAX callers.
"""
name = (request.form.get("name") or "").strip()
description = (request.form.get("description") or "").strip()
value = (request.form.get("value") or "").strip()
if not name:
return jsonify({"status": "error", "message": "Name is required."}), 400
conn = get_conn()
cur = conn.cursor()
try:
# Enforce uniqueness of name (excluding current row)
cur.execute("SELECT id FROM variables WHERE LOWER(name)=LOWER(?) AND id<>?", (name, var_id))
dup = cur.fetchone()
if dup:
return jsonify({"status": "error", "message": "Another variable with this name already exists."}), 409
cur.execute("UPDATE variables SET name=?, description=?, value=? WHERE id=?",
(name, description, value, var_id))
conn.commit()
return jsonify({"status": "success"})
finally:
conn.close()
@app.route('/admin/variables//delete', methods=['POST'])
def delete_variable(var_id: int):
"""
Delete a variable.
"""
conn = get_conn()
cur = conn.cursor()
cur.execute("DELETE FROM variables WHERE id=?", (var_id,))
conn.commit()
conn.close()
# For AJAX callers:
if request.accept_mimetypes.accept_json:
return jsonify({"status": "success"})
# Fallback:
return redirect(url_for('admin_panel') + '#variables')
# ------------------ Add/Edit/Delete endpoints (tools/commands) ------------------
@app.route('/admin/commands/add', methods=['POST'])
def add_commands():
tool_id = (request.form.get('tool_id') or '').strip()
tactic_text = (
(request.form.get('cmd_tactic') or request.form.get('existing_tactic') or '').strip()
or (request.form.get('cmd_category') or request.form.get('existing_category') or '').strip()
)
tags_text = (request.form.get('cmd_tags') or '').strip()
explanation = (request.form.get('explanation') or '').strip()
commands_text = (request.form.get('command') or '').strip()
mitre_id_raw = (request.form.get('mitre_id') or '').strip()
mitre_id = int(mitre_id_raw) if mitre_id_raw.isdigit() else None
commands_list = [ln.strip() for ln in commands_text.splitlines() if ln.strip()]
if not tool_id or not commands_list:
if request.accept_mimetypes.accept_json:
return jsonify({"status": "error", "message": "Tool and at least one command are required."}), 400
return redirect(url_for('admin_panel'))
conn = get_conn()
cur = conn.cursor()
# Tool name for auto-tagging
cur.execute("SELECT name FROM tools WHERE id=?", (tool_id,))
tool_row = cur.fetchone()
tool_name = (tool_row[0] if tool_row else "").strip()
# Shared explanation (optional, per tool)
eid = None
if explanation:
cur.execute("SELECT id FROM explanations WHERE tool_id=? AND text=?", (tool_id, explanation))
row = cur.fetchone()
if row:
eid = row[0]
else:
cur.execute("INSERT INTO explanations (tool_id, text) VALUES (?,?)", (tool_id, explanation))
eid = cur.lastrowid
tact_id, tact_display = (None, '')
if tactic_text:
tact_id, tact_display = upsert_tactic(cur, tactic_text)
# Merge incoming tags + auto (tool name + tactic)
merged_tags_text = ", ".join(
[t for t in [tags_text, tool_name, tact_display] if t]
)
tag_ids, tags_display = ([], '')
if merged_tags_text:
tag_ids, tags_display = upsert_tags(cur, merged_tags_text)
inserted = 0
duplicates = []
for raw_cmd in commands_list:
cmd = raw_cmd.strip()
if not cmd:
continue
cmd_norm = norm_command(cmd)
# quick UX-level check against normalized uniqueness
cur.execute(
"SELECT 1 FROM commands WHERE tool_id=? AND command_norm=?",
(tool_id, cmd_norm),
)
if cur.fetchone():
duplicates.append(cmd)
continue
try:
cur.execute("""
INSERT INTO commands (tool_id, command, command_norm, tactic, tags, explanation, explanation_id, tactic_id, mitre_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (tool_id, cmd, cmd_norm, tact_display, tags_display, explanation, eid, tact_id, mitre_id))
new_cmd_id = cur.lastrowid
if tag_ids:
rewrite_command_tags(cur, new_cmd_id, tag_ids)
inserted += 1
except sqlite3.IntegrityError:
# Unique index on (tool_id, command_norm) is authoritative
duplicates.append(cmd)
continue
conn.commit()
conn.close()
if request.accept_mimetypes.accept_json:
return jsonify({
"status": "success",
"inserted": inserted,
"skipped_duplicates": len(duplicates),
"duplicates": duplicates
})
else:
return redirect(url_for('admin_panel') + '#commands')
@app.route('/admin/tools/add', methods=['POST'])
def add_tool():
name = (request.form.get('name') or '').strip()
description = (request.form.get('description') or '').strip()
url = (request.form.get('url') or '').strip()
if not name:
return redirect(url_for('admin_panel') + '#tools')
conn = get_conn()
cur = conn.cursor()
try:
cur.execute("INSERT INTO tools (name, description, url) VALUES (?, ?, ?)", (name, description, url))
conn.commit()
finally:
conn.close()
return redirect(url_for('admin_panel') + '#tools')
@app.route('/admin/tools//edit', methods=['POST'])
def edit_tool(tool_id):
"""Update tool URL/description. Name is intentionally ignored (read-only in UI)."""
description = (request.form.get('description') or '').strip()
url = (request.form.get('url') or '').strip()
conn = get_conn()
cur = conn.cursor()
cur.execute("UPDATE tools SET description=?, url=? WHERE id=?", (description, url, tool_id))
conn.commit()
conn.close()
return ("", 204)
@app.route('/admin/commands//edit', methods=['POST'])
def edit_command(command_id):
"""
Update a single command (and normalized tactic/tags) + shared explanation handling.
Now also handles MITRE (tactic/technique/sub-technique/mitre_id).
"""
payload = request.form
new_command = (payload.get('command') or '').strip()
new_tactic_text = (payload.get('tactic') or payload.get('category') or '').strip()
new_technique = (payload.get('technique') or '').strip()
new_subtech = (payload.get('sub_technique') or '').strip()
new_mitre_raw = (payload.get('mitre_id') or '').strip()
new_tags_text = (payload.get('tags') or '').strip()
new_expl = (payload.get('explanation') or '').strip()
if not new_command:
return jsonify({"status": "error", "message": "Command text cannot be empty."}), 400
conn = get_conn()
cur = conn.cursor()
cur.execute("SELECT tool_id, explanation_id FROM commands WHERE id = ?", (command_id,))
row = cur.fetchone()
if not row:
conn.close()
return jsonify({"status": "error", "message": "Command not found."}), 404
tool_id, eid = row[0], row[1]
# Explanation
if new_expl:
if eid:
cur.execute("UPDATE explanations SET text=? WHERE id=?", (new_expl, eid))
cur.execute("UPDATE commands SET explanation=? WHERE id=?", (new_expl, command_id))
else:
cur.execute("INSERT INTO explanations (tool_id, text) VALUES (?,?)", (tool_id, new_expl))
new_eid = cur.lastrowid
cur.execute(
"UPDATE commands SET explanation_id=?, explanation=? WHERE id=?",
(new_eid, new_expl, command_id),
)
else:
cur.execute("UPDATE commands SET explanation='' WHERE id=?", (command_id,))
if eid:
cur.execute("UPDATE explanations SET text='' WHERE id=?", (eid,))
# Tactic (normalized categories table)
tact_id, tact_display = (None, '')
if new_tactic_text:
tact_id, tact_display = upsert_tactic(cur, new_tactic_text)
# Tags (replace with whatever user passed)
tag_ids, tags_display = ([], '')
if new_tags_text:
tag_ids, tags_display = upsert_tags(cur, new_tags_text)
# MITRE resolution:
# - If mitre_id provided, trust it.
# - Else, try to resolve from (tactic, technique, sub-technique); may be None if mismatch.
if new_mitre_raw.isdigit():
mitre_id = int(new_mitre_raw)
else:
mitre_id = find_mitre_id(cur, tact_display, new_technique, new_subtech)
cur.execute(
"UPDATE commands SET command=?, command_norm=?, tactic=?, tags=?, tactic_id=?, mitre_id=? WHERE id=?",
(new_command, norm_command(new_command), tact_display, tags_display, tact_id, mitre_id, command_id),
)
rewrite_command_tags(cur, command_id, tag_ids)
conn.commit()
conn.close()
return jsonify({"status": "success"})
@app.route('/admin/tools//delete', methods=['POST'])
def delete_tool(tool_id):
conn = None
try:
conn = get_conn()
cur = conn.cursor()
cur.execute("BEGIN")
cur.execute("DELETE FROM commands WHERE tool_id=?", (tool_id,))
cur.execute("DELETE FROM explanations WHERE tool_id=?", (tool_id,))
cur.execute("DELETE FROM tools WHERE id=?", (tool_id,))
conn.commit()
return redirect(url_for('admin_panel') + '#tools')
except sqlite3.IntegrityError as e:
if conn:
conn.rollback()
print("Delete tool failed:", e)
return ("", 500)
except Exception as e:
if conn:
conn.rollback()
print("Unexpected error deleting tool:", e)
return ("", 500)
finally:
if conn:
conn.close()
@app.route('/admin/delete/command//remove', methods=['POST'])
def remove_command(command_id):
"""
Delete a command; if it was the last command for its tool, delete the tool too.
Adds a tiny retry loop to ride out transient 'database is locked' situations.
"""
attempts = 0
while True:
conn = None
try:
conn = get_conn()
conn.execute("BEGIN IMMEDIATE")
cur = conn.cursor()
cur.execute("SELECT tool_id FROM commands WHERE id=?", (command_id,))
row = cur.fetchone()
if not row:
conn.rollback()
conn.close()
return jsonify({"status": "error", "message": "Command not found"}), 404
tool_id = row[0]
cur.execute("SELECT COUNT(*) FROM commands WHERE tool_id=?", (tool_id,))
cnt = cur.fetchone()[0]
cur.execute("DELETE FROM commands WHERE id=?", (command_id,))
if cnt == 1:
cur.execute("DELETE FROM tools WHERE id=?", (tool_id,))
conn.commit()
conn.close()
return jsonify({"status": "success"})
except sqlite3.OperationalError as e:
if 'locked' in str(e).lower() and attempts < 5:
attempts += 1
time.sleep(0.2 * attempts)
try:
if conn:
conn.rollback()
conn.close()
except Exception:
pass
continue
print("Error deleting command:", e)
try:
if conn:
conn.rollback()
conn.close()
except Exception:
pass
return jsonify({"status": "error", "message": "Error deleting command"}), 500
@app.route('/api/search', methods=['GET'])
def api_search():
"""
Fast, minimal search API.
Query params:
- q (or keyword): the search text (min 2 chars)
- offset (default 0), limit (default 20, max 100)
- show_all=1 to increase the FTS shortlist cap beyond 250 (discouraged)
Returns:
{ total:int, items:[{tool_id,tool_name,tool_url,command_id,command,tactic,tags,has_explanation,mitre_id}] }
"""
q = (request.args.get('q') or request.args.get('keyword') or '').strip()
if len(q) < 2:
# Guardrail: avoid heavy work on trivial queries
return jsonify({"total": 0, "items": [], "message": "Type at least 2 characters"}), 400
try:
offset = max(0, int(request.args.get('offset', 0)))
except Exception:
offset = 0
try:
limit = int(request.args.get('limit', 20))
limit = 20 if limit <= 0 else min(limit, 100)
except Exception:
limit = 20
show_all = request.args.get('show_all', '') in ('1', 'true', 'yes')
fts_cap = 250 if not show_all else 1000 # cap raised only if explicitly requested
# 1) FTS shortlist (fast-fail if no matches)
ids = _fts_shortlist(q, limit=fts_cap)
total = len(ids)
if total == 0:
return jsonify({"total": 0, "items": []})
# Optional cursor-based pagination over the shortlist
cursor_raw = (request.args.get('cursor_id') or '').strip()
if cursor_raw.isdigit():
try:
cur_idx = ids.index(int(cursor_raw)) + 1
# Cursor takes precedence over offset
offset = cur_idx
except ValueError:
# If cursor isn't in the current shortlist, fall back to provided offset
pass
# 2) Slice the shortlist in Python (stable & cheap), then select only minimal columns
page_ids = ids[offset: offset + limit]
if not page_ids:
return jsonify({"total": total, "items": []})
# Build IN (?) list safely
placeholders = ",".join("?" * len(page_ids))
sql = f"""
SELECT
t.id AS tool_id,
t.name AS tool_name,
t.url AS tool_url,
c.id AS command_id,
c.command AS command,
COALESCE(c.tactic, '') AS tactic,
COALESCE(c.tags, '') AS tags_csv,
CASE WHEN LENGTH(COALESCE(c.explanation,'')) > 0 THEN 1 ELSE 0 END AS has_explanation,
c.mitre_id AS mitre_id,
-- NEW: bring just the names, no definitions
COALESCE(m.techniques, '') AS technique_name,
COALESCE(m.sub_techniques, '') AS sub_technique_name
FROM commands c
JOIN tools t ON t.id = c.tool_id
LEFT JOIN mitre m ON m.id = c.mitre_id -- <<< add this line
WHERE c.id IN ({placeholders})
ORDER BY t.name COLLATE NOCASE, c.id
"""
conn = get_ro_conn()
cur = conn.cursor()
cur.execute(sql, page_ids)
rows = cur.fetchall()
conn.close()
items = []
for r in rows:
tags = [s.strip() for s in (r["tags_csv"] or "").split(",") if s.strip()]
items.append({
"tool_id": r["tool_id"],
"tool_name": r["tool_name"],
"tool_url": r["tool_url"] or "",
"command_id": r["command_id"],
"command": r["command"],
"tactic": r["tactic"],
"tags": tags,
"has_explanation": bool(r["has_explanation"]),
"mitre_id": r["mitre_id"],
# NEW:
"technique": r["technique_name"] or "",
"sub_technique": r["sub_technique_name"] or "",
})
return jsonify({"total": total, "items": items})
# -------------- Lightweight APIs --------------
@app.route('/api/tools', methods=['GET'])
def api_tools():
return jsonify(get_tools())
@app.route('/api/commands//detail', methods=['GET'])
def api_command_detail(command_id: int):
"""
Returns the full detail payload for a command card modal.
Does not mutate anything; safe to call repeatedly from the UI.
"""
conn = get_ro_conn()
try:
cur = conn.cursor()
cur.execute("""
SELECT c.id, c.tool_id, c.command, COALESCE(c.tactic,''), COALESCE(c.tags,''), COALESCE(c.explanation,''), c.mitre_id
FROM commands c
WHERE c.id=? LIMIT 1
""", (command_id,))
r = cur.fetchone()
if not r:
return jsonify({"error": "Not found"}), 404
tool = tool_brief_by_id(r["tool_id"]) or {"id": r["tool_id"], "name": "", "url": "", "description": ""}
mitre = mitre_full_by_id(r["mitre_id"]) if r["mitre_id"] else None
tags = [s.strip() for s in (r["tags"] or "").split(",") if s.strip()]
return jsonify({
"tool": tool,
"command": {
"id": r["id"],
"text": r["command"],
"tactic": r[3],
"tags": tags,
"explanation": r["explanation"],
"mitre_id": r["mitre_id"],
},
"mitre": mitre
})
finally:
conn.close()
@app.route('/api/tools//commands', methods=['GET'])
def api_tool_commands(tool_id):
"""
Return a single tool's details + (optionally paginated) commands for the modal.
Query params:
- offset (default 0)
- limit (no limit by default for back-compat; if provided, max 500)
Shape (unchanged):
{ "tool": {...}, "items": [...] , "total": int, "offset": int, "limit": int|null }
"""
conn = get_conn()
cur = conn.cursor()
cur.execute("SELECT id, name, description, url FROM tools WHERE id=?", (tool_id,))
t = cur.fetchone()
if not t:
conn.close()
return jsonify({"error": "Not found"}), 404
# Optional pagination
use_paging = False
try:
offset = max(0, int(request.args.get('offset', 0)))
except Exception:
offset = 0
try:
limit_raw = request.args.get('limit', '').strip()
if limit_raw != '':
use_paging = True
limit = min(max(1, int(limit_raw)), 500)
else:
limit = None
except Exception:
limit = None
# Total count for this tool (for UI “Load more”)
cur.execute("SELECT COUNT(*) FROM commands WHERE tool_id=?", (tool_id,))
total = cur.fetchone()[0]
if use_paging:
cur.execute("""
SELECT id, command, tactic, tags, explanation, mitre_id
FROM commands
WHERE tool_id=?
ORDER BY id
LIMIT ? OFFSET ?
""", (tool_id, limit, offset))
else:
cur.execute("""
SELECT id, command, tactic, tags, explanation, mitre_id
FROM commands
WHERE tool_id=?
ORDER BY id
""", (tool_id,))
rows = cur.fetchall()
conn.close()
items = [{
"id": r["id"],
"command": r["command"],
"tactic": r["tactic"] or "",
"tags": r["tags"] or "",
"explanation": r["explanation"] or "",
"mitre_id": r["mitre_id"]
} for r in rows]
return jsonify({
"tool": {"id": t[0], "name": t[1], "description": t[2] or "", "url": t[3] or ""},
"items": items,
"total": total,
"offset": offset if use_paging else 0,
"limit": limit if use_paging else None
})
@app.route('/api/mitre//detail', methods=['GET'])
def api_mitre_detail(mitre_id: int):
"""
Returns full MITRE definitions for a given mitre_id.
Use this when a user opens the details modal.
"""
conn = get_conn()
cur = conn.cursor()
cur.execute("""
SELECT id, tactics, tac_def, techniques, tech_def, sub_techniques, sub_tech_def
FROM mitre
WHERE id = ?
LIMIT 1
""", (mitre_id,))
row = cur.fetchone()
conn.close()
if not row:
return jsonify({"error": "Not found"}), 404
return jsonify({
"id": row["id"],
"tactics": row["tactics"] or "",
"tac_def": row["tac_def"] or "",
"techniques": row["techniques"] or "",
"tech_def": row["tech_def"] or "",
"sub_techniques": row["sub_techniques"] or "",
"sub_tech_def": row["sub_tech_def"] or "",
})
# ---- MITRE endpoints for cascading dropdowns ----
# app.py
@app.get("/api/mitre/tactics")
def api_mitre_tactics():
"""
Return a simple list of tactic names for the left-rail dropdown.
Prefer the MITRE table; fall back to distinct tactics present in commands.
"""
conn = get_ro_conn()
try:
cur = conn.cursor()
cur.execute("""
SELECT DISTINCT TRIM(tactics)
FROM mitre
WHERE tactics IS NOT NULL AND TRIM(tactics) <> ''
ORDER BY LOWER(TRIM(tactics))
""")
names = [r[0] for r in cur.fetchall()]
if not names:
# Fallback if MITRE is empty in this DB
cur.execute("""
SELECT DISTINCT TRIM(tactic)
FROM commands
WHERE tactic IS NOT NULL AND TRIM(tactic) <> ''
ORDER BY LOWER(TRIM(tactic))
""")
names = [r[0] for r in cur.fetchall()]
return jsonify(names)
finally:
conn.close()
@app.route('/api/mitre/techniques', methods=['GET'])
def api_mitre_techniques():
tactic = (request.args.get('tactic') or '').strip()
if not tactic:
return jsonify([])
conn = get_conn()
cur = conn.cursor()
cur.execute("""
SELECT DISTINCT techniques
FROM mitre
WHERE LOWER(TRIM(tactics)) = LOWER(TRIM(?))
AND IFNULL(TRIM(techniques),'') <> ''
ORDER BY LOWER(techniques)
""", (tactic,))
out = [{"technique": r[0]} for r in cur.fetchall()]
conn.close()
return jsonify(out)
@app.route('/api/mitre/subtechniques', methods=['GET'])
def api_mitre_subtechniques():
tactic = (request.args.get('tactic') or '').strip()
technique = (request.args.get('technique') or '').strip()
if not tactic or not technique:
return jsonify([])
conn = get_conn()
cur = conn.cursor()
cur.execute("""
SELECT id, sub_techniques
FROM mitre
WHERE LOWER(TRIM(tactics)) = LOWER(TRIM(?))
AND LOWER(TRIM(techniques)) = LOWER(TRIM(?))
AND IFNULL(TRIM(sub_techniques),'') <> ''
ORDER BY LOWER(sub_techniques)
""", (tactic, technique))
out = [{"id": r[0], "sub_technique": r[1]} for r in cur.fetchall()]
conn.close()
return jsonify(out)
@app.route('/api/mitre/resolve', methods=['GET'])
def api_mitre_resolve():
tac = (request.args.get('tactic') or '').strip()
tech = (request.args.get('technique') or '').strip()
# accept either 'sub' or 'sub_technique'
sub = (request.args.get('sub') or request.args.get('sub_technique') or '').strip()
conn = get_conn()
cur = conn.cursor()
row = None
# 1) tactic + technique + sub
if tac and tech and sub:
cur.execute("""
SELECT id FROM mitre
WHERE LOWER(TRIM(tactics)) = LOWER(TRIM(?))
AND LOWER(TRIM(techniques)) = LOWER(TRIM(?))
AND LOWER(TRIM(sub_techniques)) = LOWER(TRIM(?))
ORDER BY id
LIMIT 1
""", (tac, tech, sub))
row = cur.fetchone()
# 2) technique + sub (no tactic)
if not row and tech and sub:
cur.execute("""
SELECT id FROM mitre
WHERE LOWER(TRIM(techniques)) = LOWER(TRIM(?))
AND LOWER(TRIM(sub_techniques)) = LOWER(TRIM(?))
ORDER BY id
LIMIT 1
""", (tech, sub))
row = cur.fetchone()
# 3) tactic + sub (no technique)
if not row and tac and sub:
cur.execute("""
SELECT id FROM mitre
WHERE LOWER(TRIM(tactics)) = LOWER(TRIM(?))
AND LOWER(TRIM(sub_techniques)) = LOWER(TRIM(?))
ORDER BY id
LIMIT 1
""", (tac, sub))
row = cur.fetchone()
# 4) tactic + technique
if not row and tac and tech:
cur.execute("""
SELECT id FROM mitre
WHERE LOWER(TRIM(tactics)) = LOWER(TRIM(?))
AND LOWER(TRIM(techniques)) = LOWER(TRIM(?))
ORDER BY
CASE WHEN IFNULL(TRIM(sub_techniques),'') = '' THEN 0 ELSE 1 END ASC,
id ASC
LIMIT 1
""", (tac, tech))
row = cur.fetchone()
# 5) technique only
if not row and tech:
cur.execute("""
SELECT id FROM mitre
WHERE LOWER(TRIM(techniques)) = LOWER(TRIM(?))
ORDER BY
CASE WHEN IFNULL(TRIM(sub_techniques),'') = '' THEN 0 ELSE 1 END ASC,
id ASC
LIMIT 1
""", (tech,))
row = cur.fetchone()
# 6) sub only
if not row and sub:
cur.execute("""
SELECT id FROM mitre
WHERE LOWER(TRIM(sub_techniques)) = LOWER(TRIM(?))
ORDER BY id
LIMIT 1
""", (sub,))
row = cur.fetchone()
# 7) tactic only (last resort; preserves your previous fallback)
if not row and tac:
cur.execute("""
SELECT id FROM mitre
WHERE LOWER(TRIM(tactics)) = LOWER(TRIM(?))
ORDER BY
CASE WHEN (IFNULL(TRIM(techniques),'')='' AND IFNULL(TRIM(sub_techniques),'')='') THEN 0 ELSE 1 END ASC,
id ASC
LIMIT 1
""", (tac,))
row = cur.fetchone()
conn.close()
return jsonify({"id": (row[0] if row else None)})
@app.post("/api/commands/exists/bulk")
def api_bulk_exists():
data = request.get_json(force=True, silent=True) or {}
tool_id = str(data.get("tool_id") or "").strip()
commands = [(" ".join((c or "").split())).lower()
for c in (data.get("commands") or []) if str(c).strip()]
if not tool_id or not commands:
return jsonify({"dups": []})
qmarks = ",".join("?"*len(commands))
conn = get_conn(); cur = conn.cursor()
cur.execute("""
SELECT command FROM commands
WHERE tool_id=? AND LOWER(REPLACE(command, ' ', ' ')) IN ({})
""".format(qmarks), (tool_id, *commands))
dups = [r[0] for r in cur.fetchall()]
conn.close()
return jsonify({"dups": dups})
# -------- Variables API for Search Page config --------
@app.route('/api/variables', methods=['GET'])
def api_variables():
"""Return all variables as JSON: [{id, name, description, value}, ...]."""
"""Return all variables as JSON: [{id, name, description, value}, ...]."""
conn = get_conn()
cur = conn.cursor()
cur.execute("""
SELECT id, name, description, value
FROM variables
ORDER BY LOWER(name)
""")
rows = cur.fetchall()
conn.close()
return jsonify([
{
"id": r[0],
"name": r[1],
"description": r[2] or "",
"value": r[3] or ""
} for r in rows
])
@app.post("/admin/commands/import_json")
def import_commands_json():
"""
Append tools/commands from a pasted JSON payload in the admin modal.
- Creates tool if missing (matched by name, case-insensitive)
- Appends commands; skips duplicates per (tool_id, command) exact text
- Never overwrites/deletes existing
"""
# ---- read & parse payload
raw = (request.form.get("json_payload") or "").strip()
if not raw:
return ("Missing json_payload", 400)
try:
payload = json.loads(raw)
except Exception as e:
return (f"Invalid JSON: {e}", 400)
if not isinstance(payload, list):
return ("Top-level JSON must be a list of tools", 400)
# ---- db connection (reuse your app's helper if present)
try:
get_db_fn = globals().get("get_db")
if callable(get_db_fn):
conn = get_db_fn()
close_after = False
else:
db_path = app.config.get("DATABASE") or app.config.get("DB_PATH") or "pentest.db"
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
close_after = True
except Exception as e:
return (f"DB connection error: {e}", 500)
cur = conn.cursor()
def normalize_tags(v):
if v is None:
return None
if isinstance(v, list):
parts = [str(x).strip() for x in v if str(x).strip()]
return ",".join(parts) if parts else None
# allow comma-separated string
parts = [t.strip() for t in str(v).split(",") if t.strip()]
return ",".join(parts) if parts else None
tools_created = 0
cmds_inserted = 0
cmds_skipped = 0
try:
for tool in payload:
if not isinstance(tool, dict):
continue
name = (tool.get("name") or "").strip()
if not name:
continue
# find tool by name (case-insensitive)
cur.execute("SELECT id, description, url FROM tools WHERE LOWER(name)=LOWER(?)", (name,))
row = cur.fetchone()
if row:
tool_id = row["id"]
# Optionally fill missing description/url if new data provided
desc_in = (tool.get("description") or "").strip()
url_in = (tool.get("url") or "").strip()
if desc_in and not (row["description"] or "").strip():
cur.execute("UPDATE tools SET description=? WHERE id=?", (desc_in, tool_id))
if url_in and not (row["url"] or "").strip():
cur.execute("UPDATE tools SET url=? WHERE id=?", (url_in, tool_id))
else:
cur.execute(
"INSERT INTO tools (name, description, url) VALUES (?, ?, ?)",
(name, (tool.get("description") or "").strip(), (tool.get("url") or "").strip())
)
tool_id = cur.lastrowid
tools_created += 1
# commands
for c in (tool.get("commands") or []):
if not isinstance(c, dict):
continue
cmd_text = (c.get("command") or "").strip()
if not cmd_text:
continue
# duplicate check: same tool + same command
cur.execute(
"SELECT id FROM commands WHERE tool_id=? AND command=?",
(tool_id, cmd_text)
)
if cur.fetchone():
cmds_skipped += 1
continue
tags_csv = normalize_tags(c.get("tags"))
tactic = (c.get("tactic") or "").strip() or None
# If you don’t store technique/sub_technique in DB, ignore them:
explanation = (c.get("explanation") or "").strip() or None
# If you have a mitre_id column and a resolver, set it here; else None
mitre_id = None
cur.execute(
"""
INSERT INTO commands (tool_id, command, explanation, tags, tactic, mitre_id)
VALUES (?, ?, ?, ?, ?, ?)
""",
(tool_id, cmd_text, explanation, tags_csv, tactic, mitre_id)
)
cmds_inserted += 1
conn.commit()
except Exception as e:
conn.rollback()
if close_after:
conn.close()
return (f"Import failed: {e}", 500)
if close_after:
conn.close()
return jsonify({
"message": f"Import done. Tools created: {tools_created}, commands inserted: {cmds_inserted}, skipped: {cmds_skipped}",
"tools_created": tools_created,
"commands_inserted": cmds_inserted,
"commands_skipped": cmds_skipped
})
@app.route("/about")
def about():
return render_template("about.html")
if __name__ == '__main__':
app.run(debug=True)