feat: initial commit

This commit is contained in:
Rudra Saraswat 2026-04-01 21:26:27 +01:00
commit 8085c747cb
14 changed files with 926 additions and 0 deletions

View file

View file

@ -0,0 +1,43 @@
import json
import os
from classes.rootfs import RootFS
from . import output
def run_script_rootfs(rootfs: RootFS, input: str, args: list):
"""Run a script within the rootfs."""
rootfs.exec(["bash", "-s", *args], text=True, input=input)
def gen_rootfs(system_config: dict, rootfs_path: str) -> RootFS:
"""Generates a rootfs for a given system configuration."""
rootfs = RootFS(rootfs_path, system_config["distro-config"])
rootfs.init()
modules = {}
if isinstance(system_config.get("modules"), list):
for module in system_config["modules"]:
modules[module["name"]] = module["run"]
if isinstance(system_config.get("stages"), list):
for stage in system_config["stages"]:
if stage.get("module") not in modules.keys():
output.error(f"{stage.get('module')} not found within module list.")
exit(1)
inputs = stage["inputs"] if isinstance(stage.get("inputs"), list) else []
run_script_rootfs(rootfs, modules[stage["module"]], inputs)
if isinstance(system_config["distro-config"].get("after-stages"), str):
run_script_rootfs(rootfs, system_config["distro-config"]["after-stages"], [])
with open(os.path.join(rootfs_path, "usr/system.json"), "w") as system_json_file:
json.dump(system_config, system_json_file, ensure_ascii=False)
pass
return rootfs

View file

@ -0,0 +1,129 @@
import json
import os
import subprocess
import requests
import yaml
from . import output
def resolve_config(system_config: dict) -> dict:
if system_config["track"] == "base":
try:
return {
"track": "base",
"modules": system_config["modules"]
if isinstance(system_config.get("modules"), list)
else [],
"stages": system_config["stages"]
if isinstance(system_config.get("stages"), list)
else [],
"distro-config": system_config["distro-config"],
"auto-update": system_config["auto-update"]
if isinstance(system_config.get("auto-update"), dict)
else {"enabled": False},
}
except IndexError:
output.error("base track must include distro-config")
exit(1)
else:
base_track_src = system_config["track"]
if os.path.exists(base_track_src):
base_config = resolve_config(yaml.safe_load(base_track_src))
elif base_track_src.startswith("http:") or base_track_src.startswith("https:"):
base_config = resolve_config(
yaml.safe_load(
requests.get(base_track_src, allow_redirects=True).content.decode()
)
)
pass
else:
output.error(f"could not resolve parent track - {system_config['track']}")
exit(1)
base_config["modules"] += (
system_config["modules"]
if isinstance(system_config.get("modules"), list)
else []
)
base_config["stages"] += (
system_config["stages"]
if isinstance(system_config.get("stages"), list)
else []
)
base_config["auto-update"] = (
system_config["auto-update"]
if isinstance(system_config.get("auto-update"), dict)
else base_config["auto-update"]
)
return base_config
def get_system_config():
if os.path.exists("/system.yaml"):
with open("/system.yaml") as system_yaml_file:
system_config = yaml.safe_load(system_yaml_file.read())
elif os.path.exists("/system.yml"):
with open("/system.yml") as system_yml_file:
system_config = yaml.safe_load(system_yml_file.read())
elif os.path.exists("/system.json"):
with open("/system.json") as system_json_file:
system_config = yaml.safe_load(system_json_file.read())
else:
output.error("no system config file found")
exit(1)
return resolve_config(system_config)
def is_already_latest() -> bool:
"""Checks if system is already up-to-date.
Returns:
True if up-to-date, else False.
"""
if os.path.isfile("/usr/system.json"):
with open("/usr/system.json") as system_json_file:
contents = system_json_file.read().strip()
if (
json.dumps(get_system_config(), ensure_ascii=False).strip()
== contents.strip()
):
return True
else:
return False
else:
return False
def notify_prompt(title: str, body: str, actions: dict) -> str:
"""Display a notification prompting the user.
Args:
title: Title of the notification.
body: Body of the notification.
actions: Dict containing action key-value pairs.
Returns:
String containing selected action.
"""
return (
subprocess.run(
[
"notify-send",
"--app-name=System",
"--urgency=critical",
title,
body,
*[f"--action={action}={actions[action]}" for action in actions.keys()],
],
stdout=subprocess.PIPE,
)
.stdout.decode()
.strip()
)

View file

@ -0,0 +1,27 @@
import sys
def info(msg: str) -> None:
"""Print an informative message to stdout.
Args:
msg: String containing the message.
"""
print(f"I: {msg}")
def warn(msg: str) -> None:
"""Print a warning message to stderr.
Args:
msg: String containing the message."""
print(f"W: {msg}", file=sys.stderr)
def error(msg: str) -> None:
"""Print an error message to stderr.
Args:
msg: String containing the message.
"""
print(f"E: {msg}", file=sys.stderr)

View file

@ -0,0 +1,224 @@
import filecmp
import os
import subprocess
import sys
from pathlib import Path
from classes.rootfs import RootFS
from utils import output, users
from . import helpers
from .gen_rootfs import gen_rootfs
def update_cleanup() -> None:
"""Clean-up from previous rebase/update."""
subprocess.run(
["umount", "-l", "/var/cache/akshara/rootfs/var/cache/blendOS"],
stdout=subprocess.DEVNULL,
)
subprocess.run(
[
"rm",
"-rf",
"/var/cache/akshara/rootfs",
"/.update_rootfs",
]
)
def merge_etc(new_rootfs: RootFS, overrides_keep_new: dict) -> None:
"""Merges /etc trees.
Args:
new_rootfs: New RootFS instance.
overrides_keep_new: Dictionary comprising overrides and whether to keep new.
"""
subprocess.run(["cp", "-ax", f"{new_rootfs}/etc", f"{new_rootfs}/usr/etc"])
if not os.path.isdir("/usr/etc"):
subprocess.run(["rm", "-rf", "/usr/etc"])
subprocess.run(["cp", "-ax", "/etc", "/usr/etc"])
etc_diff = filecmp.dircmp("/etc/", "/usr/etc/")
def handle_diff_etc_files(dcmp):
dir_name = dcmp.left.replace("/etc/", f"{new_rootfs}/etc/", 1)
subprocess.run(["mkdir", "-p", dir_name])
for name in dcmp.left_only:
subprocess.run(["cp", "-ax", "--", os.path.join(dcmp.left, name), dir_name])
for name in dcmp.diff_files:
subprocess.run(["cp", "-ax", "--", os.path.join(dcmp.left, name), dir_name])
for sub_dcmp in dcmp.subdirs.values():
handle_diff_etc_files(sub_dcmp)
handle_diff_etc_files(etc_diff)
for override, keep_new in overrides_keep_new.items():
if (
override.startswith("/etc/")
and os.path.exists(override)
and new_rootfs.exists(override)
):
subprocess.run(["rm", "-rf", f"{new_rootfs}/{override}"])
if keep_new:
subprocess.run(
[
"cp",
"-ax",
f"{new_rootfs}/usr/{override}",
f"{new_rootfs}/{override}",
]
)
else:
subprocess.run(
[
"cp",
"-ax",
override,
f"{new_rootfs}/{override}",
]
)
def merge_var(new_rootfs: RootFS, overrides_keep_new: dict) -> None:
"""Merges /var trees.
Args:
new_rootfs: Path to rootfs.
overrides_keep_new: Dictionary comprising overrides and whether to keep new.
"""
subprocess.run(["mv", f"{new_rootfs}/var", f"{new_rootfs}/usr/var"])
subprocess.run(["cp", "-ax", "/var", f"{new_rootfs}/var"])
var_lib_diff = filecmp.dircmp(
f"{new_rootfs}/usr/var/lib/", f"{new_rootfs}/var/lib/"
)
dir_name = f"{new_rootfs}/var/lib/"
for name in var_lib_diff.left_only:
if os.path.isdir(os.path.join(var_lib_diff.left, name)):
subprocess.run(
["cp", "-ax", os.path.join(var_lib_diff.left, name), dir_name]
)
for override, keep_new in overrides_keep_new.items():
if (
override.startswith("/var/")
and os.path.exists(override)
and new_rootfs.exists(override)
):
subprocess.run(["rm", "-rf", f"{new_rootfs}/{override}"])
if keep_new:
subprocess.run(
[
"cp",
"-ax",
f"{new_rootfs}/usr/{override}",
f"{new_rootfs}/{override}",
]
)
else:
subprocess.run(
[
"cp",
"-ax",
override,
f"{new_rootfs}/{override}",
]
)
def replace_boot_files() -> None:
"""Replace files in /boot with those from new rootfs."""
new_boot_files = []
for f in os.listdir("/.update_rootfs/boot"):
if not os.path.isdir(f"/.update_rootfs/boot/{f}"):
subprocess.run(["mv", f"/.update_rootfs/boot/{f}", "/boot"])
new_boot_files.append(f)
for f in os.listdir("/boot"):
if not os.path.isdir(f"/boot/{f}"):
if f not in new_boot_files:
subprocess.run(["rm", "-f", f"/boot/{f}"])
subprocess.run(["grub-mkconfig", "-o", "/boot/grub/grub.cfg"])
def update() -> None:
"""Update system.
Args:
image_name: Name of image to rebase to.
"""
update_cleanup()
system_config = helpers.get_system_config()
output.info("generating new rootfs")
subprocess.run(["rm", "-rf", "/var/cache/akshara"])
Path("/var/cache/akshara/rootfs").mkdir(parents=True, exist_ok=True)
new_rootfs = gen_rootfs(system_config, "/var/cache/akshara/rootfs")
overrides_keep_new = (
{
override["path"]: override["keep"] == "new"
for override in system_config["override"]
}
if isinstance(system_config.get("override"), list)
else {}
)
merge_etc(new_rootfs, overrides_keep_new)
try:
# Store new_passwd_entries for users.merge_group() call
new_passwd_entries = users.merge_passwd(new_rootfs)
except Exception:
output.error("malformed /etc/passwd")
sys.exit(1)
try:
users.merge_shadow(new_rootfs)
except Exception:
output.error("malformed /etc/shadow")
sys.exit(1)
try:
users.merge_group(new_rootfs, new_passwd_entries)
except Exception:
output.error("malformed /etc/group")
sys.exit(1)
try:
users.merge_gshadow(new_rootfs, new_passwd_entries)
except Exception:
output.error("malformed /etc/shadow")
sys.exit(1)
merge_var(new_rootfs, overrides_keep_new)
if (
len(
[
kernel
for kernel in os.listdir(f"{new_rootfs}/boot")
if kernel.startswith("vmlinuz")
]
)
== 0
):
output.error("new rootfs contains no kernel")
output.error("refusing to proceed with applying update")
exit(1)
subprocess.run(["cp", "-ax", str(new_rootfs), "/.update_rootfs"])
replace_boot_files()
print()

View file

@ -0,0 +1,209 @@
def merge_passwd(new_rootfs) -> list:
"""Merge /etc/passwd from host and new rootfs.
Args:
new_rootfs: Path to new root filesystem.
Returns:
A list of the newly generated passwd entries.
"""
with open("/etc/passwd") as f:
current_passwd_entries = {
line.strip().split(":")[0]: line.strip() for line in f if line.strip()
}
with open("/usr/etc/passwd") as f:
current_system_passwd_entries = {
line.strip().split(":")[0]: line.strip() for line in f if line.strip()
}
with open(f"{new_rootfs}/etc/passwd") as f:
new_rootfs_passwd_entries = {
line.strip().split(":")[0]: line.strip() for line in f if line.strip()
}
new_passwd_entries = list(new_rootfs_passwd_entries.values())
for user in set(current_passwd_entries.keys()) - set(
current_system_passwd_entries.keys()
):
if (
int(current_passwd_entries[user].split(":")[2]) >= 1000
and user not in new_rootfs_passwd_entries.keys()
):
new_passwd_entries.append(current_passwd_entries[user])
with open("/.new.etc/passwd", "w") as f:
for user in new_passwd_entries:
f.write(user + "\n")
return new_passwd_entries
def merge_shadow(new_rootfs):
"""Merge /etc/shadow from host and new rootfs.
Args:
new_rootfs: Path to new root filesystem.
"""
with open("/etc/passwd") as f:
current_passwd_entries = {
line.strip().split(":")[0]: line.strip() for line in f if line.strip()
}
with open("/etc/shadow") as f:
current_shadow_entries = {
line.strip().split(":")[0]: line.strip() for line in f if line.strip()
}
with open("/usr/etc/shadow") as f:
current_system_shadow_entries = {
line.strip().split(":")[0]: line.strip() for line in f if line.strip()
}
with open(f"{new_rootfs}/etc/shadow") as f:
new_rootfs_shadow_entries = {
line.strip().split(":")[0]: line.strip() for line in f if line.strip()
}
new_shadow_entries = list(new_rootfs_shadow_entries.values())
for user in set(current_shadow_entries.keys()) - set(
current_system_shadow_entries.keys()
):
if (
int(current_passwd_entries[user].split(":")[2]) >= 1000
and user not in new_rootfs_shadow_entries.keys()
):
new_shadow_entries.append(current_shadow_entries[user])
with open("/.new.etc/shadow", "w") as f:
for user in new_shadow_entries:
f.write(user + "\n")
def merge_group(new_rootfs, new_passwd_entries):
"""Merge /etc/group from host and new rootfs.
Args:
new_rootfs: Path to new root filesystem.
"""
with open("/etc/group") as f:
current_group_entries = {
line.strip().split(":")[0]: line.strip() for line in f if line.strip()
}
with open("/usr/etc/group") as f:
current_system_group_entries = {
line.strip().split(":")[0]: line.strip() for line in f if line.strip()
}
with open(f"{new_rootfs}/etc/group") as f:
new_rootfs_group_entries = {
line.strip().split(":")[0]: line.strip() for line in f if line.strip()
}
new_group_entries_names = list(
set(new_rootfs_group_entries.keys()) - set(current_system_group_entries.keys())
)
new_group_entries = [
new_rootfs_group_entries[new_group_entries_name]
for new_group_entries_name in new_group_entries_names
]
for group in (
set(current_system_group_entries.keys()) & set(new_rootfs_group_entries.keys())
) & set(current_group_entries.keys()):
old_group_entry = current_group_entries[group]
new_group_entry = new_rootfs_group_entries[group]
for member_user in old_group_entry.split(":")[3].split(","):
if member_user in [
passwd_entry.split(":")[0] for passwd_entry in new_passwd_entries
] and member_user not in new_group_entry.split(":")[3].split(","):
if new_group_entry.split(":")[3] == "":
new_group_entry += member_user
else:
new_group_entry += "," + member_user
new_group_entries.append(new_group_entry)
for group in set(current_group_entries.keys()) - set(
current_system_group_entries.keys()
):
if (
int(current_group_entries[group].split(":")[2]) >= 1000
and group not in new_rootfs_group_entries.keys()
):
new_group_entries.append(current_group_entries[group])
with open("/.new.etc/group", "w") as f:
for group in new_group_entries:
f.write(group + "\n")
def merge_gshadow(new_rootfs, new_passwd_entries):
"""Merge /etc/gshadow from host and new rootfs.
Args:
new_rootfs: Path to new root filesystem.
"""
with open("/etc/group") as f:
current_group_entries = {
line.strip().split(":")[0]: line.strip() for line in f if line.strip()
}
with open("/etc/gshadow") as f:
current_gshadow_entries = {
line.strip().split(":")[0]: line.strip() for line in f if line.strip()
}
with open("/usr/etc/gshadow") as f:
current_system_gshadow_entries = {
line.strip().split(":")[0]: line.strip() for line in f if line.strip()
}
with open(f"{new_rootfs}/etc/gshadow") as f:
new_rootfs_gshadow_entries = {
line.strip().split(":")[0]: line.strip() for line in f if line.strip()
}
new_gshadow_entries_names = list(
set(new_rootfs_gshadow_entries.keys())
- set(current_system_gshadow_entries.keys())
)
new_gshadow_entries = [
new_rootfs_gshadow_entries[new_gshadow_entries_name]
for new_gshadow_entries_name in new_gshadow_entries_names
]
for group in (
set(current_system_gshadow_entries.keys())
& set(new_rootfs_gshadow_entries.keys())
) & set(current_gshadow_entries.keys()):
old_gshadow_entry = current_gshadow_entries[group]
new_gshadow_entry = new_rootfs_gshadow_entries[group]
for member_user in old_gshadow_entry.split(":")[3].split(","):
if member_user in [
passwd_entry.split(":")[0] for passwd_entry in new_passwd_entries
] and member_user not in new_gshadow_entry.split(":")[3].split(","):
if new_gshadow_entry.split(":")[3] == "":
new_gshadow_entry += member_user
else:
new_gshadow_entry += "," + member_user
new_gshadow_entries.append(new_gshadow_entry)
for group in set(current_gshadow_entries.keys()) - set(
current_system_gshadow_entries.keys()
):
if (
int(current_group_entries[group].split(":")[2]) >= 1000
and group not in new_rootfs_gshadow_entries.keys()
):
new_gshadow_entries.append(current_gshadow_entries[group])
with open("/.new.etc/gshadow", "w") as f:
for group in new_gshadow_entries:
f.write(group + "\n")