346 lines
11 KiB
Python
346 lines
11 KiB
Python
#!/usr/bin/python3
|
|
|
|
import argparse
|
|
import logging
|
|
import os
|
|
import sys
|
|
from typing import List, Dict
|
|
|
|
import Ice
|
|
import yaml
|
|
|
|
import Murmur
|
|
|
|
CONFIG_SEARCH_PATH = [".", "/etc"]
|
|
CONFIG_DEFAULT_NAME = "mice.yml"
|
|
|
|
ICE_DEFAULT_HOST = "127.0.0.1"
|
|
ICE_DEFAULT_PORT = 6502
|
|
ICE_DEFAULT_TIMEOUT = 1000
|
|
|
|
|
|
class Mice:
|
|
def __init__(self, host: str = ICE_DEFAULT_HOST,
|
|
port: int = ICE_DEFAULT_PORT, secret: str = None,
|
|
timeout: int = ICE_DEFAULT_TIMEOUT):
|
|
logging.debug("Initializing ICE connection")
|
|
properties = Ice.createProperties()
|
|
properties.setProperty("Ice.ImplicitContext", "Shared")
|
|
init_data = Ice.InitializationData()
|
|
init_data.properties = properties
|
|
self._communicator = Ice.initialize(init_data)
|
|
|
|
if secret:
|
|
logging.debug("Providing ICE secret")
|
|
self._communicator.getImplicitContext().put("secret", secret)
|
|
|
|
logging.debug("Creating meta proxy")
|
|
proxy_str = f"Meta: tcp -h {host} -p {port} -t {timeout}"
|
|
proxy = self._communicator.stringToProxy(proxy_str)
|
|
self.meta = Murmur.MetaPrx.checkedCast(proxy)
|
|
|
|
if not self.meta:
|
|
self._communicator.destroy()
|
|
raise RuntimeError(f"Invalid Proxy: {proxy_str}")
|
|
|
|
try:
|
|
logging.debug("Retrieving the list of Mumble servers")
|
|
servers = self.meta.getAllServers()
|
|
except Murmur.InvalidSecretException:
|
|
self._communicator.destroy()
|
|
raise RuntimeError(f"Failed to connect to {host} "
|
|
f"on port {port}: Invalid secret.") from None
|
|
self.servers = [MurmurServer(server) for server in servers]
|
|
|
|
if len(self.servers) == 0:
|
|
self._communicator.destroy()
|
|
raise RuntimeError("No Mumble server found")
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_value, traceback):
|
|
self._communicator.destroy()
|
|
|
|
|
|
class ChannelGroup:
|
|
def __init__(self, name: str, inherit: bool = True,
|
|
inheritable: bool = True):
|
|
self.name = name
|
|
self.inherit = inherit
|
|
self.inheritable = inheritable
|
|
|
|
def get_state(self) -> Murmur.Group:
|
|
return Murmur.Group(
|
|
name=self.name,
|
|
inherit=self.inherit,
|
|
inheritable=self.inheritable
|
|
)
|
|
|
|
@staticmethod
|
|
def from_dict(group_dict: Dict[str, object]) -> "ChannelGroup":
|
|
return ChannelGroup(
|
|
name=group_dict["name"],
|
|
inherit=group_dict.get("inherit", True),
|
|
inheritable=group_dict.get("inheritable", True)
|
|
)
|
|
|
|
|
|
class ChannelPermission:
|
|
def __init__(self, group: str = "all", userid: int = -1,
|
|
allow: int = 0, deny: int = 0,
|
|
recursive: bool = True, enabled: bool = True):
|
|
self.group = group
|
|
self.userid = userid
|
|
self.allow = allow
|
|
self.deny = deny
|
|
self.recursive = recursive
|
|
self.enabled = enabled
|
|
|
|
def get_acl(self) -> Murmur.ACL:
|
|
return Murmur.ACL(
|
|
applyHere=self.enabled,
|
|
applySubs=self.recursive,
|
|
userid=self.userid,
|
|
group=self.group,
|
|
allow=self.allow,
|
|
deny=self.deny)
|
|
|
|
@staticmethod
|
|
def from_dict(permission_dict: Dict[str, object]) -> "ChannelPermission":
|
|
allow = ChannelPermission._parse_rules(
|
|
permission_dict.get("allow", []))
|
|
deny = ChannelPermission._parse_rules(
|
|
permission_dict.get("deny", []))
|
|
|
|
return ChannelPermission(
|
|
group=permission_dict["group"],
|
|
allow=allow,
|
|
deny=deny,
|
|
recursive=permission_dict.get("recursive", True),
|
|
enabled=permission_dict.get("enabled", True))
|
|
|
|
@staticmethod
|
|
def _parse_rule(rule: str) -> int:
|
|
return getattr(Murmur, "Permission" + rule)
|
|
|
|
@staticmethod
|
|
def _parse_rules(rules: List[str]) -> int:
|
|
result = 0
|
|
for rule in rules:
|
|
result |= ChannelPermission._parse_rule(rule)
|
|
return result
|
|
|
|
|
|
class Channel:
|
|
def __init__(self, name: str, description: str = "", position: int = 0,
|
|
groups: List[ChannelGroup] = [],
|
|
permissions: List[ChannelPermission] = [],
|
|
inherit: bool = True, isdefault: bool = False,
|
|
parent: "Channel" = None):
|
|
self.id = -1
|
|
self.name = name
|
|
self.description = description
|
|
self.position = position
|
|
self.groups = groups
|
|
self.permissions = permissions
|
|
self.inherit = inherit
|
|
self.isdefault = isdefault
|
|
self.parent = parent
|
|
self.children = []
|
|
|
|
def __str__(self) -> str:
|
|
return self.name
|
|
|
|
def get_state(self) -> Murmur.Channel:
|
|
if self.parent is None:
|
|
parent_id = 0
|
|
else:
|
|
parent_id = self.parent.id
|
|
|
|
return Murmur.Channel(
|
|
id=self.id,
|
|
name=self.name,
|
|
description=self.description,
|
|
position=self.position,
|
|
parent=parent_id)
|
|
|
|
def add(self, channel: "Channel") -> None:
|
|
self.children.append(channel)
|
|
channel.parent = self
|
|
|
|
@staticmethod
|
|
def from_dict(channel_dict: Dict[str, object]) -> "Channel":
|
|
if "parent" in channel_dict:
|
|
parent = Channel(channel_dict["parent"])
|
|
else:
|
|
parent = None
|
|
|
|
groups = []
|
|
for group_name, group_dict in channel_dict.get("groups", {}).items():
|
|
group_dict["name"] = group_name
|
|
group = ChannelGroup.from_dict(group_dict)
|
|
groups.append(group)
|
|
|
|
permissions = []
|
|
for permission_dict in channel_dict.get("permissions", []):
|
|
permission = ChannelPermission.from_dict(permission_dict)
|
|
permissions.append(permission)
|
|
|
|
return Channel(
|
|
name=channel_dict["name"],
|
|
description=channel_dict.get("description", ''),
|
|
position=channel_dict.get("position", 0),
|
|
groups=groups,
|
|
permissions=permissions,
|
|
inherit=channel_dict.get("inherit", True),
|
|
isdefault=channel_dict.get("isdefault", False),
|
|
parent=parent)
|
|
|
|
|
|
class MurmurServer:
|
|
def __init__(self, server: Murmur.Server):
|
|
self._server = server
|
|
|
|
def clear_acls(self, channel_id=0):
|
|
_, groups, inherit = self._server.getACL(0)
|
|
self._server.setACL(channel_id, [], groups, inherit)
|
|
|
|
def clear_channels(self) -> None:
|
|
for channel_state in self._server.getChannels().values():
|
|
if channel_state.id != 0 and channel_state.parent == 0:
|
|
self._server.removeChannel(channel_state.id)
|
|
self.clear_acls()
|
|
|
|
def set_channel_acls(self, channel: Channel) -> None:
|
|
acls = []
|
|
for permission in channel.permissions:
|
|
acl = permission.get_acl()
|
|
acls.append(acl)
|
|
|
|
_, current_groups, _ = self._server.getACL(channel.id)
|
|
current_group_names = {group.name: group for group in current_groups}
|
|
|
|
groups = []
|
|
for group in channel.groups:
|
|
group_state = group.get_state()
|
|
if group.name in current_group_names:
|
|
current_group_state = current_group_names[group.name]
|
|
group_state.add = current_group_state.add
|
|
group_state.remove = current_group_state.remove
|
|
del current_group_names[group.name]
|
|
groups.append(group_state)
|
|
groups.extend(current_group_names.values())
|
|
|
|
self._server.setACL(channel.id, acls, groups, channel.inherit)
|
|
|
|
def create_channels(self, channel: Channel) -> None:
|
|
if channel.parent is None:
|
|
channel.id = 0
|
|
else:
|
|
channel.id = self._server.addChannel(
|
|
channel.name,
|
|
channel.parent.id)
|
|
|
|
self._server.setChannelState(channel.get_state())
|
|
self.set_channel_acls(channel)
|
|
|
|
if channel.isdefault:
|
|
self._server.setConf("defaultchannel", str(channel.id))
|
|
|
|
for child in channel.children:
|
|
self.create_channels(child)
|
|
|
|
|
|
class Config:
|
|
def __init__(self, filename: str = None):
|
|
self.filename = filename
|
|
self._data = {}
|
|
|
|
self.load(filename)
|
|
|
|
def load(self, filename: str = None):
|
|
if not filename:
|
|
for config_path in CONFIG_SEARCH_PATH:
|
|
filename = os.path.join(config_path, CONFIG_DEFAULT_NAME)
|
|
if os.path.exists(filename) and os.path.isfile(filename):
|
|
break
|
|
|
|
with open(filename, 'r') as ifd:
|
|
self._data = yaml.safe_load(ifd)
|
|
|
|
def parse_channels(self) -> Channel:
|
|
channels = []
|
|
for channel_dict in self._data["channels"]:
|
|
channel = Channel.from_dict(channel_dict)
|
|
channels.append(channel)
|
|
|
|
channel_names = {channel.name: channel for channel in channels}
|
|
for channel in channels:
|
|
if channel.parent is None:
|
|
continue
|
|
if channel.parent.name not in channel_names:
|
|
continue
|
|
channel_names[channel.parent.name].add(channel)
|
|
|
|
root_channels = [channel for channel in channels
|
|
if channel.parent is None]
|
|
|
|
if len(root_channels) == 0:
|
|
raise RuntimeError("No root channel found.")
|
|
|
|
if len(root_channels) > 1:
|
|
root_names = root_channels.join(', ')
|
|
raise RuntimeError(f"Multiple root channels found: {root_names}")
|
|
|
|
default_channels = [channel for channel in channels
|
|
if channel.isdefault]
|
|
|
|
if len(default_channels) > 1:
|
|
default_names = default_channels.join(', ')
|
|
raise RuntimeError("Multiple default channels found: "
|
|
f"{default_names}")
|
|
|
|
return root_channels[0]
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description="Manage Mumble server through ICE connection",
|
|
formatter_class=argparse.ArgumentDefaultsHelpFormatter
|
|
)
|
|
parser.add_argument("--host", "-H", default=ICE_DEFAULT_HOST,
|
|
help="ICE host or ip")
|
|
parser.add_argument("--port", "-p", default=ICE_DEFAULT_PORT,
|
|
help="ICE port number")
|
|
parser.add_argument("--timeout", "-t", default=ICE_DEFAULT_TIMEOUT,
|
|
help="Connection timeout in milliseconds")
|
|
parser.add_argument("--secret", "-s",
|
|
default=os.environ.get("MICE_SECRET"),
|
|
help="ICE write secret")
|
|
parser.add_argument("--server-id", "-i", type=int, default=0,
|
|
help="Server ID to manage")
|
|
parser.add_argument("--config", "-c",
|
|
help="Path to the config file.")
|
|
args = parser.parse_args()
|
|
|
|
config = Config(args.config)
|
|
root_channel = config.parse_channels()
|
|
|
|
server_id = args.server_id
|
|
try:
|
|
with Mice(args.host, args.port, args.secret, args.timeout) as mice:
|
|
if not 0 <= server_id < len(mice.servers):
|
|
raise RuntimeError(f"Cannot find server with ID {server_id}")
|
|
server = mice.servers[server_id]
|
|
|
|
server.clear_channels()
|
|
server.create_channels(root_channel)
|
|
except RuntimeError as e:
|
|
logging.error(e, exc_info=e)
|
|
return 1
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|