#!/usr/bin/python3 import argparse import logging import os import sys from typing import List, Dict import Ice import yaml import Murmur CONFIG_SEARCH_PATH = [".", "/etc"] CONFIG_DEFAULT_NAME = "mice.yml" ICE_DEFAULT_HOST = "127.0.0.1" ICE_DEFAULT_PORT = 6502 ICE_DEFAULT_TIMEOUT = 1000 class Mice: def __init__(self, host: str = ICE_DEFAULT_HOST, port: int = ICE_DEFAULT_PORT, secret: str = None, timeout: int = ICE_DEFAULT_TIMEOUT): logging.debug("Initializing ICE connection") properties = Ice.createProperties() properties.setProperty("Ice.ImplicitContext", "Shared") init_data = Ice.InitializationData() init_data.properties = properties self._communicator = Ice.initialize(init_data) if secret: logging.debug("Providing ICE secret") self._communicator.getImplicitContext().put("secret", secret) logging.debug("Creating meta proxy") proxy_str = f"Meta: tcp -h {host} -p {port} -t {timeout}" proxy = self._communicator.stringToProxy(proxy_str) self.meta = Murmur.MetaPrx.checkedCast(proxy) if not self.meta: self._communicator.destroy() raise RuntimeError(f"Invalid Proxy: {proxy_str}") try: logging.debug("Retrieving the list of Mumble servers") servers = self.meta.getAllServers() except Murmur.InvalidSecretException: self._communicator.destroy() raise RuntimeError(f"Failed to connect to {host} " f"on port {port}: Invalid secret.") from None self.servers = [MurmurServer(server) for server in servers] if len(self.servers) == 0: self._communicator.destroy() raise RuntimeError("No Mumble server found") def __enter__(self): return self def __exit__(self, exc_type, exc_value, traceback): self._communicator.destroy() class ChannelGroup: def __init__(self, name: str, inherit: bool = True, inheritable: bool = True): self.name = name self.inherit = inherit self.inheritable = inheritable def get_state(self) -> Murmur.Group: return Murmur.Group( name=self.name, inherit=self.inherit, inheritable=self.inheritable ) @staticmethod def from_dict(group_dict: Dict[str, object]) -> "ChannelGroup": return ChannelGroup( name=group_dict["name"], inherit=group_dict.get("inherit", True), inheritable=group_dict.get("inheritable", True) ) class ChannelPermission: def __init__(self, group: str = "all", userid: int = -1, allow: int = 0, deny: int = 0, recursive: bool = True, enabled: bool = True): self.group = group self.userid = userid self.allow = allow self.deny = deny self.recursive = recursive self.enabled = enabled def get_acl(self) -> Murmur.ACL: return Murmur.ACL( applyHere=self.enabled, applySubs=self.recursive, userid=self.userid, group=self.group, allow=self.allow, deny=self.deny) @staticmethod def from_dict(permission_dict: Dict[str, object]) -> "ChannelPermission": allow = ChannelPermission._parse_rules( permission_dict.get("allow", [])) deny = ChannelPermission._parse_rules( permission_dict.get("deny", [])) return ChannelPermission( group=permission_dict["group"], allow=allow, deny=deny, recursive=permission_dict.get("recursive", True), enabled=permission_dict.get("enabled", True)) @staticmethod def _parse_rule(rule: str) -> int: return getattr(Murmur, "Permission" + rule) @staticmethod def _parse_rules(rules: List[str]) -> int: result = 0 for rule in rules: result |= ChannelPermission._parse_rule(rule) return result class Channel: def __init__(self, name: str, description: str = "", position: int = 0, groups: List[ChannelGroup] = [], permissions: List[ChannelPermission] = [], inherit: bool = True, isdefault: bool = False, parent: "Channel" = None): self.id = -1 self.name = name self.description = description self.position = position self.groups = groups self.permissions = permissions self.inherit = inherit self.isdefault = isdefault self.parent = parent self.children = [] def __str__(self) -> str: return self.name def get_state(self) -> Murmur.Channel: if self.parent is None: parent_id = 0 else: parent_id = self.parent.id return Murmur.Channel( id=self.id, name=self.name, description=self.description, position=self.position, parent=parent_id) def add(self, channel: "Channel") -> None: self.children.append(channel) channel.parent = self @staticmethod def from_dict(channel_dict: Dict[str, object]) -> "Channel": if "parent" in channel_dict: parent = Channel(channel_dict["parent"]) else: parent = None groups = [] for group_name, group_dict in channel_dict.get("groups", {}).items(): group_dict["name"] = group_name group = ChannelGroup.from_dict(group_dict) groups.append(group) permissions = [] for permission_dict in channel_dict.get("permissions", []): permission = ChannelPermission.from_dict(permission_dict) permissions.append(permission) return Channel( name=channel_dict["name"], description=channel_dict.get("description", ''), position=channel_dict.get("position", 0), groups=groups, permissions=permissions, inherit=channel_dict.get("inherit", True), isdefault=channel_dict.get("isdefault", False), parent=parent) class MurmurServer: def __init__(self, server: Murmur.Server): self._server = server def clear_acls(self, channel_id=0): _, groups, inherit = self._server.getACL(0) self._server.setACL(channel_id, [], groups, inherit) def clear_channels(self) -> None: for channel_state in self._server.getChannels().values(): if channel_state.id != 0 and channel_state.parent == 0: self._server.removeChannel(channel_state.id) self.clear_acls() def set_channel_acls(self, channel: Channel) -> None: acls = [] for permission in channel.permissions: acl = permission.get_acl() acls.append(acl) _, current_groups, _ = self._server.getACL(channel.id) current_group_names = {group.name: group for group in current_groups} groups = [] for group in channel.groups: group_state = group.get_state() if group.name in current_group_names: current_group_state = current_group_names[group.name] group_state.add = current_group_state.add group_state.remove = current_group_state.remove del current_group_names[group.name] groups.append(group_state) groups.extend(current_group_names.values()) self._server.setACL(channel.id, acls, groups, channel.inherit) def create_channels(self, channel: Channel) -> None: if channel.parent is None: channel.id = 0 else: channel.id = self._server.addChannel( channel.name, channel.parent.id) self._server.setChannelState(channel.get_state()) self.set_channel_acls(channel) if channel.isdefault: self._server.setConf("defaultchannel", str(channel.id)) for child in channel.children: self.create_channels(child) class Config: def __init__(self, filename: str = None): self.filename = filename self._data = {} self.load(filename) def load(self, filename: str = None): if not filename: for config_path in CONFIG_SEARCH_PATH: filename = os.path.join(config_path, CONFIG_DEFAULT_NAME) if os.path.exists(filename) and os.path.isfile(filename): break with open(filename, 'r') as ifd: self._data = yaml.safe_load(ifd) def parse_channels(self) -> Channel: channels = [] for channel_dict in self._data: channel = Channel.from_dict(channel_dict) channels.append(channel) channel_names = {channel.name: channel for channel in channels} for channel in channels: if channel.parent is None: continue if channel.parent.name not in channel_names: continue channel_names[channel.parent.name].add(channel) root_channels = [channel for channel in channels if channel.parent is None] if len(root_channels) == 0: raise RuntimeError("No root channel found.") if len(root_channels) > 1: root_names = root_channels.join(', ') raise RuntimeError(f"Multiple root channels found: {root_names}") default_channels = [channel for channel in channels if channel.isdefault] if len(default_channels) > 1: default_names = default_channels.join(', ') raise RuntimeError("Multiple default channels found: " f"{default_names}") return root_channels[0] def main(): parser = argparse.ArgumentParser( description="Manage Mumble server through ICE connection", formatter_class=argparse.ArgumentDefaultsHelpFormatter ) parser.add_argument("--host", "-H", default=ICE_DEFAULT_HOST, help="ICE host or ip") parser.add_argument("--port", "-p", default=ICE_DEFAULT_PORT, help="ICE port number") parser.add_argument("--timeout", "-t", default=ICE_DEFAULT_TIMEOUT, help="Connection timeout in milliseconds") parser.add_argument("--secret", "-s", default=os.environ.get("MICE_SECRET"), help="ICE write secret") parser.add_argument("--server-id", "-i", type=int, default=0, help="Server ID to manage") parser.add_argument("--config", "-c", help="Path to the config file.") args = parser.parse_args() config = Config(args.config) root_channel = config.parse_channels() server_id = args.server_id try: with Mice(args.host, args.port, args.secret, args.timeout) as mice: if not 0 <= server_id < len(mice.servers): raise RuntimeError(f"Cannot find server with ID {server_id}") server = mice.servers[server_id] server.clear_channels() server.create_channels(root_channel) except RuntimeError as e: logging.error(e, exc_info=e) return 1 if __name__ == "__main__": sys.exit(main())