using Gee;

using Xmpp;
using Dino.Entities;

namespace Dino {

/**
 * Keeps one XMPP connection per managed account and tracks its state.
 *
 * For every account passed to {@link connect_account} the manager establishes
 * a stream, runs its read loop, records the last connection error and tries
 * to reconnect when the loop fails or the network monitor reports a change.
 * State transitions and errors are announced through the signals below.
 */
public class ConnectionManager : Object {

    /** Emitted after a stream was established, right before its loop is started. */
    public signal void stream_opened(Account account, XmppStream stream);
    /** Emitted when the stream reports that its modules were attached. */
    public signal void stream_attached_modules(Account account, XmppStream stream);
    /** Emitted whenever the stored state of a managed account is set. */
    public signal void connection_state_changed(Account account, ConnectionState state);
    /** Emitted whenever a connection error is recorded for an account. */
    public signal void connection_error(Account account, ConnectionError error);

    public enum ConnectionState {
        CONNECTED,
        CONNECTING,
        DISCONNECTED
    }

    /** Port probed by the reachability check before (re)connecting. */
    private const uint16 XMPP_CLIENT_PORT = 5222;
    /** Seconds to wait for a ping reply before the stream is replaced. */
    private const uint PING_TIMEOUT_SECONDS = 10;

    private HashMap<Account, Connection> connections = new HashMap<Account, Connection>(Account.hash_func, Account.equals_func);
    private HashMap<Account, ConnectionError> connection_errors = new HashMap<Account, ConnectionError>(Account.hash_func, Account.equals_func);

    private NetworkMonitor? network_monitor;
    private ModuleManager module_manager;
    /** Passed through unchanged to Xmpp.establish_stream(). */
    public string? log_options;

    /**
     * Describes why a connection failed and whether reconnecting is advisable.
     */
    public class ConnectionError {

        public enum Source {
            CONNECTION,
            SASL,
            TLS,
            STREAM_ERROR
        }

        public enum Reconnect {
            NOW,
            LATER,
            NEVER
        }

        public Source source;
        public string? identifier;
        // The spelling of this property is part of the public interface; do not "fix" it.
        public Reconnect reconnect_recomendation { get; set; default=Reconnect.NOW; }

        public ConnectionError(Source source, string? identifier) {
            this.source = source;
            this.identifier = identifier;
        }
    }

    /**
     * Per-account bookkeeping: the current stream, its state and a uuid that
     * changes on every reset so callbacks of older streams can be recognised.
     */
    private class Connection {
        public string uuid { get; set; }
        public XmppStream? stream { get; set; }
        public ConnectionState connection_state { get; set; default = ConnectionState.DISCONNECTED; }
        // Set to true once a ping sent by check_ping_reconnect() was answered.
        public bool acked;

        public Connection() {
            // No stream exists yet, so this runs to completion synchronously.
            reset.begin();
        }

        /**
         * Drops the current stream (if any) and assigns a fresh uuid.
         *
         * The stream is detached from this connection before the asynchronous
         * disconnect is awaited. Callers that continue right after starting
         * the reset therefore never observe the dead stream, and a stream
         * installed while the disconnect is still pending is not overwritten
         * when this method resumes.
         */
        public async void reset() {
            acked = false;

            XmppStream? old_stream = stream;
            stream = null;
            uuid = Xmpp.random_uuid();

            if (old_stream == null) return;

            old_stream.detach_modules();
            try {
                yield old_stream.disconnect();
            } catch (Error e) {
                debug("Error disconnecting stream: %s", e.message);
            }
        }

        /**
         * Sends an "unavailable" presence over the current stream, if there is one.
         */
        public void make_offline() {
            if (stream == null) return;

            var presence_module = stream.get_module(Presence.Module.IDENTITY);
            if (presence_module == null) return;

            Xmpp.Presence.Stanza presence = new Xmpp.Presence.Stanza();
            presence.type_ = Xmpp.Presence.Stanza.TYPE_UNAVAILABLE;
            presence_module.send_presence(stream, presence);
        }

        /**
         * Announces unavailability and closes the stream; disconnect errors are only logged.
         */
        public async void disconnect_account() {
            make_offline();

            if (stream != null) {
                try {
                    yield stream.disconnect();
                } catch (Error e) {
                    debug("Error disconnecting stream: %s", e.message);
                }
            }
        }
    }

    /**
     * Handler for NetworkMonitor.network_changed.
     *
     * @param state value delivered by the signal; true triggers reconnect
     *              checks, false marks every managed account as offline
     */
    private void on_network_changed(bool state) {
        debug("on network changed=%s", state.to_string());
        if (state) {
            check_reconnects();
        } else {
            make_offline_all();
        }
    }

    public ConnectionManager(ModuleManager module_manager) {
        this.module_manager = module_manager;
        network_monitor = GLib.NetworkMonitor.get_default();
        if (network_monitor != null) {
            network_monitor.network_changed.connect(on_network_changed);
        }
    }

    /**
     * @return the account's stream while its state is CONNECTED, otherwise null
     */
    public XmppStream? get_stream(Account account) {
        if (get_state(account) == ConnectionState.CONNECTED) {
            return connections[account].stream;
        }
        return null;
    }

    /**
     * @return the stored state, or DISCONNECTED for accounts that are not managed
     */
    public ConnectionState get_state(Account account) {
        if (connections.has_key(account)) {
            return connections[account].connection_state;
        }
        return ConnectionState.DISCONNECTED;
    }

    /**
     * @return the last recorded error of the account, or null if there is none
     */
    public ConnectionError? get_error(Account account) {
        if (connection_errors.has_key(account)) {
            return connection_errors[account];
        }
        return null;
    }

    /**
     * @return the accounts that currently have a connection entry
     */
    public Collection<Account> get_managed_accounts() {
        return connections.keys;
    }

    /**
     * Starts managing the account, or re-checks the connection if it is already managed.
     */
    public void connect_account(Account account) {
        if (!connections.has_key(account)) {
            connections[account] = new Connection();
            connect_stream.begin(account);
        } else {
            check_reconnect(account);
        }
    }

    /**
     * Sends unavailable presence for, and marks as DISCONNECTED, every managed account.
     */
    public void make_offline_all() {
        foreach (Account account in connections.keys) {
            make_offline(account);
        }
    }

    private void make_offline(Account account) {
        connections[account].make_offline();
        change_connection_state(account, ConnectionState.DISCONNECTED);
    }

    /**
     * Stops managing the account: marks it offline, starts closing its stream
     * in the background and removes its connection entry.
     */
    public async void disconnect_account(Account account) {
        if (connections.has_key(account)) {
            make_offline(account);
            connections[account].disconnect_account.begin();
            connections.unset(account);
        }
    }

    /**
     * Establishes a stream for the account and runs its loop until it fails.
     *
     * On failure the connection is reset and, unless authentication failed or
     * the stream has meanwhile been replaced, a reconnect is attempted.
     *
     * @param resource resource to bind; defaults to the account's resourcepart
     */
    private async void connect_stream(Account account, string? resource = null) {
        if (!connections.has_key(account)) return;

        debug("[%s] (Maybe) Establishing a new connection", account.bare_jid.to_string());

        connection_errors.unset(account);
        if (resource == null) resource = account.resourcepart;

        change_connection_state(account, ConnectionState.CONNECTING);
        XmppStreamResult stream_result = yield Xmpp.establish_stream(account.bare_jid, module_manager.get_modules(account, resource), log_options,
                (peer_cert, errors) => {
                    change_connection_state(account, ConnectionState.DISCONNECTED);
                    return on_invalid_certificate(account.domainpart, peer_cert, errors);
                }
        );

        // The account may have been removed while the stream was being established.
        if (!connections.has_key(account)) {
            if (stream_result.stream != null) {
                stream_result.stream.disconnect.begin();
            }
            return;
        }

        connections[account].stream = stream_result.stream;

        if (stream_result.stream == null) {
            if (stream_result.tls_errors != null) {
                set_connection_error(account, new ConnectionError(ConnectionError.Source.TLS, null) { reconnect_recomendation=ConnectionError.Reconnect.NEVER });
                return;
            }

            debug("[%s] Could not connect", account.bare_jid.to_string());
            change_connection_state(account, ConnectionState.DISCONNECTED);
            return;
        }

        XmppStream stream = stream_result.stream;

        debug("[%s] New connection with resource %s: %p", account.bare_jid.to_string(), resource, stream);

        stream.attached_modules.connect((attached_stream) => {
            stream_attached_modules(account, attached_stream);
            change_connection_state(account, ConnectionState.CONNECTED);

            // stream.get_module(Xep.Muji.Module.IDENTITY).join_call(stream, new Jid("test@muc.poez.io"), true);
        });
        stream.get_module(Sasl.Module.IDENTITY).received_auth_failure.connect((sasl_stream, node) => {
            set_connection_error(account, new ConnectionError(ConnectionError.Source.SASL, null));
        });

        // Identifies this particular stream; Connection.reset() assigns a new uuid.
        string connection_uuid = connections[account].uuid;
        stream.received_node.connect(() => {
            if (!connections.has_key(account) || connections[account].uuid != connection_uuid) {
                warning("Got node for outdated connection");
            }
        });
        stream_opened(account, stream);

        try {
            yield stream.loop();
        } catch (Error e) {
            debug("[%s %p] Connection error: %s", account.bare_jid.to_string(), stream, e.message);

            // The account was removed while the loop was running: nothing left to manage.
            if (!connections.has_key(account)) return;
            // This stream was already replaced (e.g. after a ping timeout); its failure
            // must not tear down the connection that succeeded it.
            if (connections[account].uuid != connection_uuid) return;

            change_connection_state(account, ConnectionState.DISCONNECTED);
            connections[account].reset.begin();

            StreamError.Flag? flag = stream.get_flag(StreamError.Flag.IDENTITY);
            if (flag != null) {
                warning("[%s %p] Stream Error: %s", account.bare_jid.to_string(), stream, flag.error_type);
                set_connection_error(account, new ConnectionError(ConnectionError.Source.STREAM_ERROR, flag.error_type));

                if (flag.resource_rejected) {
                    // Retry with a resource that carries a random suffix.
                    connect_stream.begin(account, account.resourcepart + "-" + random_uuid());
                    return;
                }
            }

            // Do not retry automatically after an authentication failure.
            ConnectionError? error = connection_errors[account];
            if (error != null && error.source == ConnectionError.Source.SASL) {
                return;
            }

            check_reconnect(account);
        }
    }

    private void check_reconnects() {
        foreach (Account account in connections.keys) {
            check_reconnect(account);
        }
    }

    /**
     * Pings the server over the given stream. A reply marks the account as
     * CONNECTED; if none arrived after PING_TIMEOUT_SECONDS and the stream is
     * still the current one, the connection is reset and re-established.
     */
    private void check_ping_reconnect(Xmpp.Xep.Ping.Module identity, XmppStream stream, Account account) {
        identity.send_ping.begin(stream, account.bare_jid.domain_jid, () => {
            if (!connections.has_key(account)) return;
            if (connections[account].stream != stream) return;

            connections[account].acked = true;
            change_connection_state(account, ConnectionState.CONNECTED);
        });

        // One-shot timer: every path returns false so the source is removed.
        Timeout.add_seconds(PING_TIMEOUT_SECONDS, () => {
            if (!connections.has_key(account)) return false;
            if (connections[account].stream != stream) return false;
            if (connections[account].acked) return false;

            // Reconnect. Nothing gets through the stream.
            debug("[%s %p] Ping timeouted. Reconnecting", account.bare_jid.to_string(), stream);
            change_connection_state(account, ConnectionState.DISCONNECTED);

            connections[account].reset.begin();
            connect_stream.begin(account);
            return false;
        });
    }

    /**
     * Verifies the connection of a managed account and repairs it if needed.
     *
     * Nothing happens when the account's host is reported unreachable or a
     * connection attempt is already in progress. An existing stream with a
     * ping module is probed with a ping; otherwise a new stream is established.
     *
     * @param directly_reconnect currently unused; kept for interface compatibility
     */
    private void check_reconnect(Account account, bool directly_reconnect = false) {
        if (!connections.has_key(account)) return;

        debug("account.domainpart=%s", account.domainpart);

        // Without a network monitor reachability cannot be probed; assume the host is reachable.
        bool reachable = true;
        if (network_monitor != null) {
            try {
                var address = new GLib.NetworkAddress(account.domainpart, XMPP_CLIENT_PORT);
                reachable = network_monitor.can_reach(address);
            } catch (Error e) {
                debug("[%s] Reachability check failed: %s", account.bare_jid.to_string(), e.message);
                return;
            }
        }
        debug("can-reach: %s", reachable.to_string());
        if (!reachable) return;

        if (connections[account].connection_state == ConnectionState.CONNECTING) return;

        XmppStream? stream = connections[account].stream;
        connections[account].acked = false;

        Xmpp.Xep.Ping.Module? identity = null;
        if (stream != null) {
            identity = stream.get_module(Xep.Ping.Module.IDENTITY);
        }

        if (identity != null) {
            change_connection_state(account, ConnectionState.CONNECTING);
            check_ping_reconnect(identity, stream, account);
        } else {
            change_connection_state(account, ConnectionState.DISCONNECTED);
            connections[account].reset.begin();
            connect_stream.begin(account);
        }
    }

    /**
     * Stores the new state and emits connection_state_changed; ignored for unmanaged accounts.
     */
    private void change_connection_state(Account account, ConnectionState state) {
        if (connections.has_key(account)) {
            connections[account].connection_state = state;
            connection_state_changed(account, state);
        }
    }

    /**
     * Records the error for the account and emits connection_error.
     */
    private void set_connection_error(Account account, ConnectionError error) {
        connection_errors[account] = error;
        connection_error(account, error);
    }

    /**
     * Decides whether a certificate that failed validation is accepted anyway.
     *
     * @param domain    domain the connection is made to
     * @param peer_cert certificate presented by the peer (not inspected here)
     * @param errors    validation failures reported for the certificate
     * @return true only for a ".onion" domain whose sole failure is UNKNOWN_CA
     */
    public static bool on_invalid_certificate(string domain, TlsCertificate peer_cert, TlsCertificateFlags errors) {
        if (domain.has_suffix(".onion") && errors == TlsCertificateFlags.UNKNOWN_CA) {
            // It's barely possible for .onion servers to provide a non-self-signed cert.
            // But that's fine because encryption is provided independently through Tor.
            warning("Accepting TLS certificate from unknown CA from .onion address %s", domain);
            return true;
        }
        return false;
    }
}

}