ansible-role-mumble/files/murmur/scripts/mice.py

346 lines
11 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/python3
import argparse
import logging
import os
import sys
from typing import List, Dict
import Ice
import yaml
import Murmur
# Directories searched, in order, for the default config file when no
# explicit path is given to Config.
CONFIG_SEARCH_PATH = [".", "/etc"]
# File name looked up inside each CONFIG_SEARCH_PATH directory.
CONFIG_DEFAULT_NAME = "mice.yml"
# Defaults for the ICE endpoint; also used as the CLI defaults in main().
ICE_DEFAULT_HOST = "127.0.0.1"
ICE_DEFAULT_PORT = 6502
# Passed as the "-t" option of the ICE proxy string; the CLI help describes
# it as a connection timeout in milliseconds.
ICE_DEFAULT_TIMEOUT = 1000
class Mice:
    """Connection to the ICE "Meta" interface of a Murmur instance.

    Usable as a context manager: leaving the ``with`` block destroys the
    underlying ICE communicator.

    Attributes:
        meta: The ``Murmur.MetaPrx`` proxy obtained from the endpoint.
        servers: One ``MurmurServer`` wrapper per entry returned by
            ``meta.getAllServers()``.
    """

    def __init__(self, host: str = ICE_DEFAULT_HOST,
                 port: int = ICE_DEFAULT_PORT, secret: str = None,
                 timeout: int = ICE_DEFAULT_TIMEOUT):
        """Open the ICE connection and list the available servers.

        Args:
            host: Host name or IP of the ICE endpoint.
            port: TCP port of the ICE endpoint.
            secret: Optional ICE secret; skipped when empty or ``None``.
            timeout: Value for the ``-t`` option of the proxy string.

        Raises:
            RuntimeError: If the proxy is invalid, the secret is rejected,
                or no server is returned.
        """
        logging.debug("Initializing ICE connection")
        properties = Ice.createProperties()
        # The secret is handed over through the communicator's implicit
        # context, hence this property.
        properties.setProperty("Ice.ImplicitContext", "Shared")
        init_data = Ice.InitializationData()
        init_data.properties = properties
        self._communicator = Ice.initialize(init_data)
        try:
            self._connect(host, port, secret, timeout)
        except BaseException:
            # Whatever went wrong (including errors raised by Ice itself),
            # never leave a live communicator behind; then re-raise as-is.
            self._communicator.destroy()
            raise

    def _connect(self, host: str, port: int, secret: str,
                 timeout: int) -> None:
        """Create the meta proxy and populate ``meta`` and ``servers``."""
        if secret:
            logging.debug("Providing ICE secret")
            self._communicator.getImplicitContext().put("secret", secret)
        logging.debug("Creating meta proxy")
        proxy_str = f"Meta: tcp -h {host} -p {port} -t {timeout}"
        proxy = self._communicator.stringToProxy(proxy_str)
        self.meta = Murmur.MetaPrx.checkedCast(proxy)
        if not self.meta:
            raise RuntimeError(f"Invalid Proxy: {proxy_str}")
        try:
            logging.debug("Retrieving the list of Mumble servers")
            servers = self.meta.getAllServers()
        except Murmur.InvalidSecretException:
            raise RuntimeError(f"Failed to connect to {host} "
                               f"on port {port}: Invalid secret.") from None
        self.servers = [MurmurServer(server) for server in servers]
        if not self.servers:
            raise RuntimeError("No Mumble server found")

    def __enter__(self):
        """Return the connection itself for use in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Destroy the ICE communicator; exceptions are not suppressed."""
        self._communicator.destroy()
class ChannelGroup:
    """Desired settings of one named group on a channel."""

    def __init__(self, name: str, inherit: bool = True,
                 inheritable: bool = True):
        """Store the group name and its two inheritance flags."""
        self.name, self.inherit, self.inheritable = name, inherit, inheritable

    def get_state(self) -> Murmur.Group:
        """Return a ``Murmur.Group`` built from this group's attributes."""
        state_fields = {
            "name": self.name,
            "inherit": self.inherit,
            "inheritable": self.inheritable,
        }
        return Murmur.Group(**state_fields)

    @staticmethod
    def from_dict(group_dict: Dict[str, object]) -> "ChannelGroup":
        """Build a group from a mapping.

        ``name`` is mandatory (``KeyError`` if absent); ``inherit`` and
        ``inheritable`` both default to ``True``.
        """
        group_name = group_dict["name"]
        inherit_flag = group_dict.get("inherit", True)
        inheritable_flag = group_dict.get("inheritable", True)
        return ChannelGroup(name=group_name, inherit=inherit_flag,
                            inheritable=inheritable_flag)
class ChannelPermission:
    """One access rule of a channel: who it targets and which bits it sets."""

    def __init__(self, group: str = "all", userid: int = -1,
                 allow: int = 0, deny: int = 0,
                 recursive: bool = True, enabled: bool = True):
        """Store the rule target, its allow/deny bitmasks and its flags."""
        self.group, self.userid = group, userid
        self.allow, self.deny = allow, deny
        self.recursive, self.enabled = recursive, enabled

    def get_acl(self) -> Murmur.ACL:
        """Return the ``Murmur.ACL`` equivalent of this rule.

        ``enabled`` maps to ``applyHere`` and ``recursive`` to ``applySubs``.
        """
        acl_fields = {
            "applyHere": self.enabled,
            "applySubs": self.recursive,
            "userid": self.userid,
            "group": self.group,
            "allow": self.allow,
            "deny": self.deny,
        }
        return Murmur.ACL(**acl_fields)

    @staticmethod
    def from_dict(permission_dict: Dict[str, object]) -> "ChannelPermission":
        """Build a rule from a mapping.

        ``group`` is mandatory. ``allow`` and ``deny`` are lists of
        permission names (default: empty); ``recursive`` and ``enabled``
        default to ``True``.
        """
        # Both masks are resolved before "group" is read, so an unknown
        # permission name is reported ahead of a missing group.
        allow_mask = ChannelPermission._parse_rules(
            permission_dict.get("allow", []))
        deny_mask = ChannelPermission._parse_rules(
            permission_dict.get("deny", []))
        target_group = permission_dict["group"]
        applies_to_subs = permission_dict.get("recursive", True)
        applies_here = permission_dict.get("enabled", True)
        return ChannelPermission(group=target_group, allow=allow_mask,
                                 deny=deny_mask, recursive=applies_to_subs,
                                 enabled=applies_here)

    @staticmethod
    def _parse_rule(rule: str) -> int:
        """Look up the ``Murmur.Permission<rule>`` constant for a name."""
        constant_name = "Permission" + rule
        return getattr(Murmur, constant_name)

    @staticmethod
    def _parse_rules(rules: List[str]) -> int:
        """OR together the constants of every name in ``rules``."""
        mask = 0
        for rule_name in rules:
            mask = mask | ChannelPermission._parse_rule(rule_name)
        return mask
class Channel:
    """A node of the desired channel tree.

    ``id`` starts at -1; ``MurmurServer.create_channels`` overwrites it once
    the channel exists on the server. ``children`` is filled through
    ``add()``.
    """

    def __init__(self, name: str, description: str = "", position: int = 0,
                 groups: List[ChannelGroup] = None,
                 permissions: List[ChannelPermission] = None,
                 inherit: bool = True, isdefault: bool = False,
                 parent: "Channel" = None):
        """Create a channel description.

        Args:
            name: Channel name.
            description: Channel description text.
            position: Value forwarded as the channel's ``position``.
            groups: Groups to define on the channel (default: none).
            permissions: Access rules of the channel (default: none).
            inherit: Passed as the ``inherit`` argument when ACLs are set.
            isdefault: Whether this is the server's default channel.
            parent: Parent channel, or ``None`` for the root.
        """
        self.id = -1
        self.name = name
        self.description = description
        self.position = position
        # A fresh list per instance: list literals as default values would
        # be shared by every Channel created without these arguments.
        self.groups = [] if groups is None else groups
        self.permissions = [] if permissions is None else permissions
        self.inherit = inherit
        self.isdefault = isdefault
        self.parent = parent
        self.children = []

    def __str__(self) -> str:
        """Return the channel name."""
        return self.name

    def get_state(self) -> Murmur.Channel:
        """Return the ``Murmur.Channel`` state for this channel.

        A channel without parent reports 0 as its parent id.
        """
        parent_id = 0 if self.parent is None else self.parent.id
        return Murmur.Channel(
            id=self.id,
            name=self.name,
            description=self.description,
            position=self.position,
            parent=parent_id)

    def add(self, channel: "Channel") -> None:
        """Attach ``channel`` as a child and make this its parent."""
        self.children.append(channel)
        channel.parent = self

    @staticmethod
    def from_dict(channel_dict: Dict[str, object]) -> "Channel":
        """Build a channel from a mapping.

        ``name`` is mandatory. ``parent`` is a channel *name*: it is wrapped
        in a placeholder ``Channel`` that carries only that name. ``groups``
        maps group names to their settings and ``permissions`` is a list of
        rule mappings. The input mapping is left unmodified.
        """
        if "parent" in channel_dict:
            parent = Channel(channel_dict["parent"])
        else:
            parent = None
        groups = []
        # "or {}" / "or []" tolerate sections or groups whose body is null.
        for group_name, group_dict in (channel_dict.get("groups")
                                       or {}).items():
            # Work on a copy so the caller's data does not gain a "name" key.
            group_spec = dict(group_dict or {}, name=group_name)
            groups.append(ChannelGroup.from_dict(group_spec))
        permissions = [
            ChannelPermission.from_dict(permission_dict)
            for permission_dict in channel_dict.get("permissions") or []
        ]
        return Channel(
            name=channel_dict["name"],
            description=channel_dict.get("description", ''),
            position=channel_dict.get("position", 0),
            groups=groups,
            permissions=permissions,
            inherit=channel_dict.get("inherit", True),
            isdefault=channel_dict.get("isdefault", False),
            parent=parent)
class MurmurServer:
    """Channel and ACL operations on one ``Murmur.Server`` proxy."""

    def __init__(self, server: Murmur.Server):
        """Wrap the given server proxy."""
        self._server = server

    def clear_acls(self, channel_id=0):
        """Remove every ACL entry of a channel, keeping groups and inherit.

        Args:
            channel_id: Channel to clear (default: 0, the id used for the
                root channel throughout this file).
        """
        # Read the groups/inherit flag of the channel being cleared so they
        # are written back unchanged to that same channel.
        _, groups, inherit = self._server.getACL(channel_id)
        self._server.setACL(channel_id, [], groups, inherit)

    def clear_channels(self) -> None:
        """Remove the channels directly under channel 0, then clear its ACLs.

        Only channels whose parent is 0 are removed explicitly.
        """
        for channel_state in self._server.getChannels().values():
            if channel_state.id != 0 and channel_state.parent == 0:
                self._server.removeChannel(channel_state.id)
        self.clear_acls()

    def set_channel_acls(self, channel: Channel) -> None:
        """Apply the channel's permissions and groups on the server.

        Groups that already exist on the channel keep their current
        ``add``/``remove`` lists; existing groups not mentioned by
        ``channel.groups`` are written back unchanged.
        """
        acls = [permission.get_acl() for permission in channel.permissions]
        _, current_groups, _ = self._server.getACL(channel.id)
        current_group_names = {group.name: group for group in current_groups}
        groups = []
        for group in channel.groups:
            group_state = group.get_state()
            # pop() both fetches the existing group and marks it as handled.
            current_group_state = current_group_names.pop(group.name, None)
            if current_group_state is not None:
                group_state.add = current_group_state.add
                group_state.remove = current_group_state.remove
            groups.append(group_state)
        # Whatever is left was not configured: keep it untouched.
        groups.extend(current_group_names.values())
        self._server.setACL(channel.id, acls, groups, channel.inherit)

    def create_channels(self, channel: Channel) -> None:
        """Create ``channel`` and, recursively, all of its children.

        A channel without parent is mapped onto the existing channel 0
        instead of being created. ``channel.id`` is updated in place.
        """
        if channel.parent is None:
            channel.id = 0
        else:
            channel.id = self._server.addChannel(
                channel.name,
                channel.parent.id)
        self._server.setChannelState(channel.get_state())
        self.set_channel_acls(channel)
        if channel.isdefault:
            self._server.setConf("defaultchannel", str(channel.id))
        for child in channel.children:
            self.create_channels(child)
class Config:
    """YAML configuration describing the desired channel tree."""

    def __init__(self, filename: str = None):
        """Load ``filename``, or the default config file when omitted."""
        self.filename = filename
        self._data = {}
        self.load(filename)

    def load(self, filename: str = None):
        """Read the YAML file into memory.

        Args:
            filename: Path to the config file. When empty, the first
                existing ``CONFIG_DEFAULT_NAME`` file found in
                ``CONFIG_SEARCH_PATH`` is used.

        Raises:
            FileNotFoundError: If no config file can be found.
        """
        if not filename:
            candidates = [os.path.join(config_path, CONFIG_DEFAULT_NAME)
                          for config_path in CONFIG_SEARCH_PATH]
            filename = next(
                (path for path in candidates if os.path.isfile(path)), None)
            if filename is None:
                # Name every location tried rather than only the last one.
                raise FileNotFoundError(
                    "No config file found; looked for: "
                    + ", ".join(candidates))
        with open(filename, 'r') as ifd:
            # An empty YAML document loads as None; keep a mapping instead.
            self._data = yaml.safe_load(ifd) or {}

    def parse_channels(self) -> Channel:
        """Build the channel tree from the ``channels`` list.

        Each channel is attached to the channel named by its ``parent``
        entry. Channels naming an unknown parent are left out of the tree
        (a warning is logged).

        Returns:
            The single root channel (the one without parent).

        Raises:
            RuntimeError: If there is no root channel, more than one root
                channel, or more than one default channel.
        """
        channels = [Channel.from_dict(channel_dict)
                    for channel_dict in self._data["channels"]]
        channel_names = {channel.name: channel for channel in channels}
        for channel in channels:
            if channel.parent is None:
                continue
            if channel.parent.name not in channel_names:
                logging.warning("Ignoring channel %s: unknown parent %s",
                                channel.name, channel.parent.name)
                continue
            # Replaces the name-only placeholder parent by the real channel.
            channel_names[channel.parent.name].add(channel)
        root_channels = [channel for channel in channels
                         if channel.parent is None]
        if not root_channels:
            raise RuntimeError("No root channel found.")
        if len(root_channels) > 1:
            root_names = ', '.join(str(channel) for channel in root_channels)
            raise RuntimeError(f"Multiple root channels found: {root_names}")
        default_channels = [channel for channel in channels
                            if channel.isdefault]
        if len(default_channels) > 1:
            default_names = ', '.join(
                str(channel) for channel in default_channels)
            raise RuntimeError("Multiple default channels found: "
                               f"{default_names}")
        return root_channels[0]
def main():
    """Rebuild the channel tree of a Mumble server from the config file.

    Parses the command line, loads the configuration, connects through ICE,
    then clears and recreates the channels of the selected server.

    Returns:
        0 on success, 1 when a ``RuntimeError`` was reported.
    """
    parser = argparse.ArgumentParser(
        description="Manage Mumble server through ICE connection",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
    parser.add_argument("--host", "-H", default=ICE_DEFAULT_HOST,
                        help="ICE host or ip")
    # type=int makes argparse reject non-numeric values up front and keeps
    # CLI-provided values the same type as the defaults.
    parser.add_argument("--port", "-p", type=int, default=ICE_DEFAULT_PORT,
                        help="ICE port number")
    parser.add_argument("--timeout", "-t", type=int,
                        default=ICE_DEFAULT_TIMEOUT,
                        help="Connection timeout in milliseconds")
    parser.add_argument("--secret", "-s",
                        default=os.environ.get("MICE_SECRET"),
                        help="ICE write secret")
    parser.add_argument("--server-id", "-i", type=int, default=0,
                        help="Server ID to manage")
    parser.add_argument("--config", "-c",
                        help="Path to the config file.")
    args = parser.parse_args()
    server_id = args.server_id
    try:
        # Parsed inside the try so that configuration errors are reported
        # the same way as connection errors. The config is still validated
        # before anything is sent to the server.
        config = Config(args.config)
        root_channel = config.parse_channels()
        with Mice(args.host, args.port, args.secret, args.timeout) as mice:
            # server_id is a position in the list of servers, not a lookup
            # by the server's own identifier.
            if not 0 <= server_id < len(mice.servers):
                raise RuntimeError(f"Cannot find server with ID {server_id}")
            server = mice.servers[server_id]
            server.clear_channels()
            server.create_channels(root_channel)
    except RuntimeError as e:
        logging.error(e, exc_info=e)
        return 1
    return 0
# Run main() only when executed as a script; its return value becomes the
# process exit status.
if __name__ == "__main__":
    sys.exit(main())