WIP fix stream management/lack of resume

This commit is contained in:
Xavier Del Campo Romero 2023-10-17 02:26:10 +02:00
parent 26844a2b70
commit 44c5ea24fa
Signed by: xavi
GPG Key ID: 84FF3612A9BF43F2
3 changed files with 55 additions and 20 deletions

View File

@ -74,6 +74,7 @@ public class Conversation : Object {
notify_setting = (NotifySetting) row[db.conversation.notification];
send_typing = (Setting) row[db.conversation.send_typing];
send_marker = (Setting) row[db.conversation.send_marker];
debug(@"account=$(account.display_name),encryption=$(encryption),resource=$(resource ?? "null")");
notify.connect(on_update);
}

View File

@ -20,6 +20,7 @@ public class ConnectionManager : Object {
private HashMap<Account, Connection> connections = new HashMap<Account, Connection>(Account.hash_func, Account.equals_func);
private HashMap<Account, ConnectionError> connection_errors = new HashMap<Account, ConnectionError>(Account.hash_func, Account.equals_func);
private HashMap<Account, string> session_ids = new HashMap<Account, string>(Account.hash_func, Account.equals_func);
private NetworkMonitor? network_monitor;
private ModuleManager module_manager;
public string? log_options;
@ -67,7 +68,7 @@ public class ConnectionManager : Object {
try {
yield stream.disconnect();
} catch (IOError e) {
warning(@"caught IOError $(e.code)");
warning(@"caught IOError: $(e.message)");
}
}
stream = null;
@ -163,7 +164,7 @@ public class ConnectionManager : Object {
public async void disconnect_account(Account account) {
if (connections.has_key(account)) {
make_offline(account);
connections[account].disconnect_account.begin();
yield connections[account].disconnect_account();
connections.unset(account);
}
}
@ -184,7 +185,6 @@ public class ConnectionManager : Object {
change_connection_state(account, ConnectionState.DISCONNECTED);
return on_invalid_certificate(account.domainpart, peer_cert, errors); }
);
connections[account].stream = stream_result.stream;
if (stream_result.stream == null) {
if (stream_result.tls_errors != null) {
@ -192,7 +192,7 @@ public class ConnectionManager : Object {
return;
}
debug("[%s] Could not connect", account.bare_jid.to_string());
warning("[%s] Could not connect", account.bare_jid.to_string());
change_connection_state(account, ConnectionState.DISCONNECTED);
@ -201,7 +201,18 @@ public class ConnectionManager : Object {
XmppStream stream = stream_result.stream;
debug("[%s] New connection with resource %s: %p", account.bare_jid.to_string(), resource, stream);
print("[%s] New connection with resource %s: %p\n", account.bare_jid.to_string(), resource, stream);
var? module = stream.get_module(Xep.StreamManagement.Module.IDENTITY);
if (module != null) {
if (session_ids[account] != null) {
module.session_id = session_ids[account];
}
module.got_session_id.connect((session_id) => {
session_ids[account] = session_id;
});
}
stream.attached_modules.connect((stream) => {
stream_attached_modules(account, stream);
@ -220,6 +231,7 @@ public class ConnectionManager : Object {
warning("Got node for outdated connection");
}
});
connections[account].stream = stream_result.stream;
stream_opened(account, stream);
try {
@ -256,13 +268,14 @@ public class ConnectionManager : Object {
}
}
private void check_ping_reconnect(Xmpp.Xep.Ping.Module identity,
XmppStream stream, Account account) {
identity.send_ping.begin(stream, account.bare_jid.domain_jid, () => {
if (connections[account].stream != stream) return;
connections[account].acked = true;
change_connection_state(account, ConnectionState.CONNECTED);
});
private async void check_ping_reconnect(Account account) {
yield connections[account].reset();
yield connect_stream(account);
XmppStream stream = connections[account].stream;
var? identity = stream.get_module(Xep.Ping.Module.IDENTITY);
if (identity == null) return;
Timeout.add_seconds(10, () => {
if (!connections.has_key(account)) return false;
@ -270,7 +283,7 @@ public class ConnectionManager : Object {
if (connections[account].acked) return false;
// Reconnect. Nothing gets through the stream.
debug("[%s %p] Ping timeouted. Reconnecting", account.bare_jid.to_string(), stream);
warning("[%s %p] Ping timeouted. Reconnecting", account.bare_jid.to_string(), stream);
change_connection_state(account, ConnectionState.DISCONNECTED);
connections[account].reset.begin((_, res) => {
@ -279,9 +292,20 @@ public class ConnectionManager : Object {
});
return false;
});
yield identity.send_ping(stream, account.bare_jid.domain_jid);
connections[account].acked = true;
change_connection_state(account, ConnectionState.CONNECTED);
var? module = stream.get_module(Xep.StreamManagement.Module.IDENTITY);
if (module != null) {
module.session_id = session_ids[account];
stream.require_setup();
}
}
private void check_reconnect(Account account, bool directly_reconnect = false) {
private void check_reconnect(Account account) {
if (!connections.has_key(account)) return;
var cancellable = new Cancellable();
@ -294,16 +318,14 @@ public class ConnectionManager : Object {
debug(@"can-reach: $(reachable)");
if (reachable) {
Xmpp.Xep.Ping.Module? identity = null;
if (connections[account].connection_state == ConnectionState.CONNECTING) return;
XmppStream? stream = connections[account].stream;
connections[account].acked = false;
if (stream != null
&& (identity = stream.get_module(Xep.Ping.Module.IDENTITY)) != null) {
if (stream != null) {
change_connection_state(account, ConnectionState.CONNECTING);
check_ping_reconnect(identity, stream, account);
check_ping_reconnect.begin(account);
}
else {
change_connection_state(account, ConnectionState.DISCONNECTED);
@ -316,7 +338,7 @@ public class ConnectionManager : Object {
}
} catch (Error e) {
print ("Error: %s\n", e.message);
warning(@"$(e.message)");
}
}

View File

@ -7,6 +7,8 @@ public const string NS_URI = "urn:xmpp:sm:3";
public class Module : XmppStreamNegotiationModule, WriteNodeFunc {
public static ModuleIdentity<Module> IDENTITY = new ModuleIdentity<Module>(NS_URI, "0198_stream_management");
public signal void got_session_id(string session_id);
public int h_inbound = 0;
public int h_outbound = 0;
@ -127,11 +129,12 @@ public class Module : XmppStreamNegotiationModule, WriteNodeFunc {
h_inbound++;
}
private void check_resume(XmppStream stream) {
public void check_resume(XmppStream stream) {
if (stream_has_sm_feature(stream) && session_id != null) {
StanzaNode node = new StanzaNode.build("resume", NS_URI).add_self_xmlns()
.put_attribute("h", h_inbound.to_string())
.put_attribute("previd", session_id);
session_id = null;
write_node.begin(stream, node);
stream.add_flag(new Flag());
}
@ -158,6 +161,8 @@ public class Module : XmppStreamNegotiationModule, WriteNodeFunc {
if (node.name == "enabled") {
h_inbound = 0;
session_id = node.get_attribute("id", NS_URI);
print(@"this=%p, new session_id=$(session_id)\n", this);
got_session_id(session_id);
flags = stream.flags;
((IoXmppStream)stream).write_obj = this;
} else if (node.name == "resumed") {
@ -219,6 +224,13 @@ public class Module : XmppStreamNegotiationModule, WriteNodeFunc {
}
private bool stream_has_sm_feature(XmppStream stream) {
print("stream=%p\n", stream);
if (session_id != null) {
print(@"session_id=$(session_id)\n");
}
if (stream.features.get_subnode("sm", NS_URI) != null) {
print(@"stream.features.get_subnode=$(stream.features.get_subnode("sm", NS_URI))\n");
}
return stream.features.get_subnode("sm", NS_URI) != null;
}
}