Rewrite MAM logic and add MUC MAM

This commit is contained in:
fiaxh 2022-07-08 16:33:40 +02:00 committed by Xavier Del Campo Romero
parent b731e62e40
commit b0534dcf07
Signed by: xavi
GPG Key ID: 84FF3612A9BF43F2
19 changed files with 836 additions and 358 deletions

View File

@ -42,6 +42,7 @@ SOURCES
src/service/entity_info.vala
src/service/file_manager.vala
src/service/file_transfer_storage.vala
src/service/history_sync.vala
src/service/jingle_file_transfers.vala
src/service/message_correction.vala
src/service/message_processor.vala

View File

@ -22,6 +22,7 @@ public class Conversation : Object {
public Jid counterpart { get; private set; }
public string? nickname { get; set; }
public bool active { get; set; default = false; }
public DateTime active_last_changed { get; private set; }
private DateTime? _last_active;
public DateTime? last_active {
get { return _last_active; }
@ -63,6 +64,7 @@ public class Conversation : Object {
if (type_ == Conversation.Type.GROUPCHAT_PM) counterpart = counterpart.with_resource(resource);
nickname = type_ == Conversation.Type.GROUPCHAT ? resource : null;
active = row[db.conversation.active];
active_last_changed = new DateTime.from_unix_utc(row[db.conversation.active_last_changed]);
int64? last_active = row[db.conversation.last_active];
if (last_active != null) this.last_active = new DateTime.from_unix_utc(last_active);
encryption = (Encryption) row[db.conversation.encryption];
@ -78,12 +80,15 @@ public class Conversation : Object {
public void persist(Database db) {
this.db = db;
this.active_last_changed = new DateTime.now_utc();
var insert = db.conversation.insert()
.value(db.conversation.account_id, account.id)
.value(db.conversation.jid_id, db.get_jid_id(counterpart))
.value(db.conversation.type_, type_)
.value(db.conversation.encryption, encryption)
.value(db.conversation.active, active)
.value(db.conversation.active_last_changed, (long) active_last_changed.to_unix())
.value(db.conversation.notification, notify_setting)
.value(db.conversation.send_typing, send_typing)
.value(db.conversation.send_marker, send_marker);
@ -176,7 +181,9 @@ public class Conversation : Object {
case "nickname":
update.set(db.conversation.resource, nickname); break;
case "active":
update.set(db.conversation.active, active); break;
update.set(db.conversation.active, active);
update.set(db.conversation.active_last_changed, (long) new DateTime.now_utc().to_unix());
break;
case "last-active":
if (last_active != null) {
update.set(db.conversation.last_active, (long) last_active.to_unix());

View File

@ -188,7 +188,7 @@ public class ChatInteraction : StreamInteractionModule, Object {
}
public override async bool run(Entities.Message message, Xmpp.MessageStanza stanza, Conversation conversation) {
if (Xep.MessageArchiveManagement.MessageFlag.get_flag(stanza) != null) return false;
if (Xmpp.MessageArchiveManagement.MessageFlag.get_flag(stanza) != null) return false;
ChatInteraction outer = stream_interactor.get_module(ChatInteraction.IDENTITY);
outer.send_delivery_receipt(message, stanza, conversation);

View File

@ -176,7 +176,7 @@ public class ConversationManager : StreamInteractionModule, Object {
conversation.last_active = message.time;
if (stanza != null) {
bool is_mam_message = Xep.MessageArchiveManagement.MessageFlag.get_flag(stanza) != null;
bool is_mam_message = Xmpp.MessageArchiveManagement.MessageFlag.get_flag(stanza) != null;
bool is_recent = message.time.compare(new DateTime.now_utc().add_days(-3)) > 0;
if (is_mam_message && !is_recent) return false;
}

View File

@ -7,7 +7,7 @@ using Dino.Entities;
namespace Dino {
public class Database : Qlite.Database {
private const int VERSION = 22;
private const int VERSION = 23;
public class AccountTable : Table {
public Column<int> id = new Column.Integer("id") { primary_key = true, auto_increment = true };
@ -193,6 +193,7 @@ public class Database : Qlite.Database {
public Column<int> jid_id = new Column.Integer("jid_id") { not_null = true };
public Column<string> resource = new Column.Text("resource") { min_version=1 };
public Column<bool> active = new Column.BoolInt("active");
public Column<long> active_last_changed = new Column.Integer("active_last_changed") { not_null=true, default="0", min_version=23 };
public Column<long> last_active = new Column.Long("last_active");
public Column<int> type_ = new Column.Integer("type");
public Column<int> encryption = new Column.Integer("encryption");
@ -204,7 +205,7 @@ public class Database : Qlite.Database {
internal ConversationTable(Database db) {
base(db, "conversation");
init({id, account_id, jid_id, resource, active, last_active, type_, encryption, read_up_to, read_up_to_item, notification, send_typing, send_marker});
init({id, account_id, jid_id, resource, active, active_last_changed, last_active, type_, encryption, read_up_to, read_up_to_item, notification, send_typing, send_marker});
}
}
@ -263,15 +264,16 @@ public class Database : Qlite.Database {
public class MamCatchupTable : Table {
public Column<int> id = new Column.Integer("id") { primary_key = true, auto_increment = true };
public Column<int> account_id = new Column.Integer("account_id") { not_null = true };
public Column<bool> from_end = new Column.BoolInt("from_end");
public Column<string> from_id = new Column.Text("from_id");
public Column<string> server_jid = new Column.Text("server_jid") { not_null = true };
public Column<string> from_id = new Column.Text("from_id") { not_null = true };
public Column<long> from_time = new Column.Long("from_time") { not_null = true };
public Column<string> to_id = new Column.Text("to_id");
public Column<bool> from_end = new Column.BoolInt("from_end") { not_null = true };
public Column<string> to_id = new Column.Text("to_id") { not_null = true };
public Column<long> to_time = new Column.Long("to_time") { not_null = true };
internal MamCatchupTable(Database db) {
base(db, "mam_catchup");
init({id, account_id, from_end, from_id, from_time, to_id, to_time});
init({id, account_id, server_jid, from_end, from_id, from_time, to_id, to_time});
}
}
@ -474,6 +476,25 @@ public class Database : Qlite.Database {
// FROM call2");
// exec("DROP TABLE call2");
}
if (oldVersion < 23) {
try {
exec("ALTER TABLE mam_catchup RENAME TO mam_catchup2");
mam_catchup.create_table_at_version(VERSION);
exec("""INSERT INTO mam_catchup (id, account_id, server_jid, from_id, from_time, from_end, to_id, to_time)
SELECT mam_catchup2.id, account_id, bare_jid, ifnull(from_id, ""), from_time, ifnull(from_end, 0), ifnull(to_id, ""), to_time
FROM mam_catchup2 JOIN account ON mam_catchup2.account_id=account.id""");
exec("DROP TABLE mam_catchup2");
} catch (Error e) {
error("Failed to upgrade to database version 23 (mam_catchup): %s", e.message);
}
try {
long active_last_updated = (long) new DateTime.now_utc().to_unix();
exec(@"UPDATE conversation SET active_last_changed=$active_last_updated WHERE active_last_changed=0");
} catch (Error e) {
error("Failed to upgrade to database version 23 (conversation): %s", e.message);
}
}
}
public ArrayList<Account> get_accounts() {

View File

@ -0,0 +1,557 @@
using Gee;
using Xmpp;
using Xmpp.Xep;
using Dino.Entities;
using Qlite;
public class Dino.HistorySync {
private StreamInteractor stream_interactor;
private Database db;
public HashMap<Account, HashMap<Jid, int>> current_catchup_id = new HashMap<Account, HashMap<Jid, int>>(Account.hash_func, Account.equals_func);
public HashMap<Account, HashMap<string, DateTime>> mam_times = new HashMap<Account, HashMap<string, DateTime>>();
public HashMap<string, int> hitted_range = new HashMap<string, int>();
// Server ID of the latest message of the previous segment
public HashMap<Account, string> catchup_until_id = new HashMap<Account, string>(Account.hash_func, Account.equals_func);
// Time of the latest message of the previous segment
public HashMap<Account, DateTime> catchup_until_time = new HashMap<Account, DateTime>(Account.hash_func, Account.equals_func);
private HashMap<string, Gee.List<Xmpp.MessageStanza>> stanzas = new HashMap<string, Gee.List<Xmpp.MessageStanza>>();
public class HistorySync(Database db, StreamInteractor stream_interactor) {
this.stream_interactor = stream_interactor;
this.db = db;
stream_interactor.account_added.connect(on_account_added);
stream_interactor.connection_manager.stream_opened.connect((account, stream) => {
debug("MAM: [%s] Reset catchup_id", account.bare_jid.to_string());
current_catchup_id.unset(account);
});
}
public bool process(Account account, Xmpp.MessageStanza message_stanza) {
var mam_flag = Xmpp.MessageArchiveManagement.MessageFlag.get_flag(message_stanza);
if (mam_flag != null) {
process_mam_message(account, message_stanza, mam_flag);
return true;
} else {
update_latest_db_range(account, message_stanza);
return false;
}
}
public void update_latest_db_range(Account account, Xmpp.MessageStanza message_stanza) {
Jid mam_server = stream_interactor.get_module(MucManager.IDENTITY).might_be_groupchat(message_stanza.from, account) ? message_stanza.from.bare_jid : account.bare_jid;
if (!current_catchup_id.has_key(account) || !current_catchup_id[account].has_key(mam_server)) return;
string? stanza_id = UniqueStableStanzaIDs.get_stanza_id(message_stanza, mam_server);
if (stanza_id == null) return;
db.mam_catchup.update()
.with(db.mam_catchup.id, "=", current_catchup_id[account][mam_server])
.set(db.mam_catchup.to_time, (long)new DateTime.now_utc().to_unix())
.set(db.mam_catchup.to_id, stanza_id)
.perform();
}
public void process_mam_message(Account account, Xmpp.MessageStanza message_stanza, Xmpp.MessageArchiveManagement.MessageFlag mam_flag) {
Jid mam_server = mam_flag.sender_jid;
Jid message_author = message_stanza.from;
// MUC servers may only send MAM messages from that MUC
bool is_muc_mam = stream_interactor.get_module(MucManager.IDENTITY).might_be_groupchat(mam_server, account) &&
message_author.equals_bare(mam_server);
bool from_our_server = mam_server.equals_bare(account.bare_jid);
if (!is_muc_mam && !from_our_server) {
warning("Received alleged MAM message from %s, ignoring", mam_server.to_string());
return;
}
if (!stanzas.has_key(mam_flag.query_id)) stanzas[mam_flag.query_id] = new ArrayList<Xmpp.MessageStanza>();
stanzas[mam_flag.query_id].add(message_stanza);
print(@"[$(message_stanza.from)] qid $(mam_flag.query_id) time $(mam_flag.server_time) $(mam_flag.mam_id) $(message_stanza.body ?? "[none]")\n");
}
private void on_unprocessed_message(Account account, XmppStream stream, MessageStanza message) {
// Check that it's a legit MAM server
bool is_muc_mam = stream_interactor.get_module(MucManager.IDENTITY).might_be_groupchat(message.from, account);
bool from_our_server = message.from.equals_bare(account.bare_jid);
if (!is_muc_mam && !from_our_server) return;
// Get the server time of the message and store it in `mam_times`
Xmpp.MessageArchiveManagement.Flag? mam_flag = stream != null ? stream.get_flag(Xmpp.MessageArchiveManagement.Flag.IDENTITY) : null;
if (mam_flag == null) return;
string? id = message.stanza.get_deep_attribute(mam_flag.ns_ver + ":result", "id");
if (id == null) return;
StanzaNode? delay_node = message.stanza.get_deep_subnode(mam_flag.ns_ver + ":result", StanzaForwarding.NS_URI + ":forwarded", DelayedDelivery.NS_URI + ":delay");
if (delay_node == null) {
warning("MAM result did not contain delayed time %s", message.stanza.to_string());
return;
}
DateTime? time = DelayedDelivery.get_time_for_node(delay_node);
if (time == null) return;
mam_times[account][id] = time;
// Check if this is the target message
string? query_id = message.stanza.get_deep_attribute(mam_flag.ns_ver + ":result", mam_flag.ns_ver + ":queryid");
if (query_id != null && id == catchup_until_id[account]) {
debug("MAM: [%s] Hitted range (id) %s", account.bare_jid.to_string(), id);
hitted_range[query_id] = -2;
}
}
public void on_server_id_duplicate(Account account, Xmpp.MessageStanza message_stanza, Entities.Message message) {
Xmpp.MessageArchiveManagement.MessageFlag? mam_flag = Xmpp.MessageArchiveManagement.MessageFlag.get_flag(message_stanza);
if (mam_flag == null) return;
// debug(@"MAM: [%s] Hitted range duplicate server id. id %s qid %s", account.bare_jid.to_string(), message.server_id, mam_flag.query_id);
if (catchup_until_time.has_key(account) && mam_flag.server_time.compare(catchup_until_time[account]) < 0) {
hitted_range[mam_flag.query_id] = -1;
// debug(@"MAM: [%s] In range (time) %s < %s", account.bare_jid.to_string(), mam_flag.server_time.to_string(), catchup_until_time[account].to_string());
}
}
public async void fetch_everything(Account account, Jid mam_server, DateTime until_earliest_time = new DateTime.from_unix_utc(0)) {
print(@"Fetch everything for $(mam_server) %s\n".printf(until_earliest_time != null ? @"(until $until_earliest_time)" : ""));
RowOption latest_row_opt = db.mam_catchup.select()
.with(db.mam_catchup.account_id, "=", account.id)
.with(db.mam_catchup.server_jid, "=", mam_server.to_string())
.with(db.mam_catchup.to_time, ">=", (long) until_earliest_time.to_unix())
.order_by(db.mam_catchup.to_time, "DESC")
.single().row();
Row? latest_row = latest_row_opt.is_present() ? latest_row_opt.inner : null;
Row? new_row = yield fetch_latest_page(account, mam_server, latest_row, until_earliest_time);
if (new_row != null) {
current_catchup_id[account][mam_server] = new_row[db.mam_catchup.id];
} else if (latest_row != null) {
current_catchup_id[account][mam_server] = latest_row[db.mam_catchup.id];
}
// Set the previous and current row
print(@"$(new_row == null) $(latest_row == null)\n");
Row? previous_row = null;
Row? current_row = null;
if (new_row != null) {
print(@"Fetch everything $(mam_server) a\n");
current_row = new_row;
previous_row = latest_row;
} else if (latest_row != null) {
print(@"Fetch everything $(mam_server) b\n");
current_row = latest_row;
RowOption previous_row_opt = db.mam_catchup.select()
.with(db.mam_catchup.account_id, "=", account.id)
.with(db.mam_catchup.server_jid, "=", mam_server.to_string())
.with(db.mam_catchup.to_time, "<", current_row[db.mam_catchup.from_time])
.with(db.mam_catchup.to_time, ">=", (long) until_earliest_time.to_unix())
.order_by(db.mam_catchup.to_time, "DESC")
.single().row();
previous_row = previous_row_opt.is_present() ? previous_row_opt.inner : null;
}
print(@"Fetch everything $(mam_server) c $(current_row == null) $(previous_row == null)\n");
// Fetch messages between two db ranges and merge them
while (current_row != null && previous_row != null) {
if (current_row[db.mam_catchup.from_end]) return;
print("FETCH BETWEEN RANGES\n");
current_row = yield fetch_between_ranges(account, mam_server, previous_row, current_row);
if (current_row == null) return;
RowOption previous_row_opt = db.mam_catchup.select()
.with(db.mam_catchup.account_id, "=", account.id)
.with(db.mam_catchup.server_jid, "=", mam_server.to_string())
.with(db.mam_catchup.to_time, "<", current_row[db.mam_catchup.from_time])
.with(db.mam_catchup.to_time, ">=", (long) until_earliest_time.to_unix())
.order_by(db.mam_catchup.to_time, "DESC")
.single().row();
previous_row = previous_row_opt.is_present() ? previous_row_opt.inner : null;
}
// We're at the earliest range. Try to expand it even further back.
if (current_row == null || current_row[db.mam_catchup.from_end]) return;
// We don't want to fetch before the earliest range over and over again in MUCs if it's after until_earliest_time.
// For now, don't query if we are within a week of until_earliest_time
if (until_earliest_time != null &&
current_row[db.mam_catchup.from_time] > until_earliest_time.add(-TimeSpan.DAY * 7).to_unix()) return;
print("FETCH BEFORE RANGE\n");
yield fetch_before_range(account, mam_server, current_row, until_earliest_time);
}
// Fetches the latest page (up to previous db row). Extends the previous db row if it was reached, creates a new row otherwise.
public async Row? fetch_latest_page(Account account, Jid mam_server, Row? latest_row, DateTime? until_earliest_time) {
debug("MAM: [%s | %s] Fetching latest page", mam_server.to_string(), mam_server.to_string());
int latest_row_id = -1;
DateTime latest_message_time = until_earliest_time;
string? latest_message_id = null;
if (latest_row != null) {
latest_row_id = latest_row[db.mam_catchup.id];
latest_message_time = (new DateTime.from_unix_utc(latest_row[db.mam_catchup.to_time])).add_minutes(-5);
print(@"latest msg time $latest_message_time\n");
latest_message_id = latest_row[db.mam_catchup.to_id];
// Make sure we only fetch to until_earliest_time if latest_message_time is further back
if (until_earliest_time != null && latest_message_time.compare(until_earliest_time) < 0) {
latest_message_time = until_earliest_time.add_minutes(-5);
latest_message_id = null;
}
}
var query_params = new Xmpp.MessageArchiveManagement.V2.MamQueryParams.query_latest(mam_server, latest_message_time, latest_message_id);
PageRequestResult page_result = yield get_mam_page(account, query_params, null);
if (page_result.page_result == PageResult.Error || page_result.page_result == PageResult.Duplicate) {
debug("MAM [%s | %s] Failed fetching latest page %s", mam_server.to_string(), mam_server.to_string(), page_result.page_result.to_string());
return null;
}
print(@"MAM result: $(page_result.page_result))\n");
// Catchup finished within first page. Update latest db entry.
if (page_result.page_result in new PageResult[] { PageResult.TargetReached, PageResult.NoMoreMessages } && latest_row_id != -1) {
if (page_result.stanzas == null || page_result.stanzas.is_empty) return null;
string first_mam_id = page_result.query_result.first;
long first_mam_time = (long) mam_times[account][first_mam_id].to_unix();
print(@"Updating $mam_server to $first_mam_time, $first_mam_id\n");
var query = db.mam_catchup.update()
.with(db.mam_catchup.id, "=", latest_row_id)
.set(db.mam_catchup.to_time, first_mam_time)
.set(db.mam_catchup.to_id, first_mam_id);
if (page_result.page_result == PageResult.NoMoreMessages) {
// If the server doesn't have more messages, store that this range is at its end.
query.set(db.mam_catchup.from_end, true);
}
query.perform();
return null;
}
if (page_result.query_result.first == null || page_result.query_result.last == null) {
print(@"from/to id null\n");
return null;
}
// Either we need to fetch more pages or this is the first db entry ever
debug("MAM: [%s | %s] Creating new db range for latest page", mam_server.to_string(), mam_server.to_string());
string from_id = page_result.query_result.first;
string to_id = page_result.query_result.last;
if (!mam_times[account].has_key(from_id) || !mam_times[account].has_key(to_id)) {
print(@"Missing from/to id $from_id $to_id\n");
return null;
}
long from_time = (long) mam_times[account][from_id].to_unix();
long to_time = (long) mam_times[account][to_id].to_unix();
int new_row_id = (int) db.mam_catchup.insert()
.value(db.mam_catchup.account_id, account.id)
.value(db.mam_catchup.server_jid, mam_server.to_string())
.value(db.mam_catchup.from_id, from_id)
.value(db.mam_catchup.from_time, from_time)
.value(db.mam_catchup.from_end, false)
.value(db.mam_catchup.to_id, to_id)
.value(db.mam_catchup.to_time, to_time)
.perform();
return db.mam_catchup.select().with(db.mam_catchup.id, "=", new_row_id).single().row().inner;
}
/** Fetches messages between the end of `earlier_range` and start of `later_range`
** Merges the `earlier_range` db row into the `later_range` db row.
** @return The resulting range comprising `earlier_range`, `later_rage`, and everything in between. null if fetching/merge failed.
**/
private async Row? fetch_between_ranges(Account account, Jid mam_server, Row earlier_range, Row later_range) {
int later_range_id = (int) later_range[db.mam_catchup.id];
DateTime earliest_time = new DateTime.from_unix_utc(earlier_range[db.mam_catchup.to_time]);
DateTime latest_time = new DateTime.from_unix_utc(later_range[db.mam_catchup.from_time]);
debug("MAM [%s | %s] Fetching between %s (%s) and %s (%s)", mam_server.to_string(), mam_server.to_string(), earliest_time.to_string(), earlier_range[db.mam_catchup.to_id], latest_time.to_string(), later_range[db.mam_catchup.from_id]);
var query_params = new Xmpp.MessageArchiveManagement.V2.MamQueryParams.query_between(mam_server,
earliest_time, earlier_range[db.mam_catchup.to_id],
latest_time, later_range[db.mam_catchup.from_id]);
print("fetch between ranges\n");
PageRequestResult page_result = yield fetch_query(account, query_params, later_range_id);
print(@"page result null? $(page_result == null)\n");
if (page_result.page_result == PageResult.TargetReached) {
debug("MAM [%s | %s] Merging range %i into %i", mam_server.to_string(), mam_server.to_string(), earlier_range[db.mam_catchup.id], later_range_id);
// Merge earlier range into later one.
db.mam_catchup.update()
.with(db.mam_catchup.id, "=", later_range_id)
.set(db.mam_catchup.from_time, earlier_range[db.mam_catchup.from_time])
.set(db.mam_catchup.from_id, earlier_range[db.mam_catchup.from_id])
.set(db.mam_catchup.from_end, earlier_range[db.mam_catchup.from_end])
.perform();
db.mam_catchup.delete().with(db.mam_catchup.id, "=", earlier_range[db.mam_catchup.id]).perform();
// Return the updated version of the later range
return db.mam_catchup.select().with(db.mam_catchup.id, "=", later_range_id).single().row().inner;
}
return null;
}
private async void fetch_before_range(Account account, Jid mam_server, Row range, DateTime? until_earliest_time) {
DateTime latest_time = new DateTime.from_unix_utc(range[db.mam_catchup.from_time]);
string latest_id = range[db.mam_catchup.from_id];
Xmpp.MessageArchiveManagement.V2.MamQueryParams query_params;
if (until_earliest_time == null) {
query_params = new Xmpp.MessageArchiveManagement.V2.MamQueryParams.query_before(mam_server, latest_time, latest_id);
} else {
query_params = new Xmpp.MessageArchiveManagement.V2.MamQueryParams.query_between(
mam_server,
until_earliest_time, null,
latest_time, latest_id
);
}
PageRequestResult page_result = yield fetch_query(account, query_params, range[db.mam_catchup.id]);
}
/**
* Iteratively fetches all pages returned for a query (until a PageResult other than MorePagesAvailable is returned)
* @return The last PageRequestResult result
**/
private async PageRequestResult fetch_query(Account account, Xmpp.MessageArchiveManagement.V2.MamQueryParams query_params, int db_id) {
print("fetch query\n");
PageRequestResult? page_result = null;
do {
page_result = yield get_mam_page(account, query_params, page_result);
print(@"page result $(page_result.page_result) $(page_result.stanzas == null)\n");
if (page_result.page_result == PageResult.Error || page_result.stanzas == null) return page_result;
string last_mam_id = page_result.query_result.last;
long last_mam_time = (long)mam_times[account][last_mam_id].to_unix();
print(@"Updating $(query_params.mam_server) to $last_mam_time, $last_mam_id\n");
var query = db.mam_catchup.update()
.with(db.mam_catchup.id, "=", db_id)
.set(db.mam_catchup.from_time, last_mam_time)
.set(db.mam_catchup.from_id, last_mam_id);
if (page_result.page_result == PageResult.NoMoreMessages) {
// If the server doesn't have more messages, store that this range is at its end.
print("no more message\n");
query.set(db.mam_catchup.from_end, true);
}
query.perform();
} while (page_result.page_result == PageResult.MorePagesAvailable);
print(@"page result 2 $(page_result.page_result)\n");
return page_result;
}
enum PageResult {
MorePagesAvailable,
TargetReached,
NoMoreMessages,
Duplicate, // TODO additional boolean
Error
}
/**
* prev_page_result: null if this is the first page request
**/
private async PageRequestResult get_mam_page(Account account, Xmpp.MessageArchiveManagement.V2.MamQueryParams query_params, PageRequestResult? prev_page_result) {
XmppStream stream = stream_interactor.get_stream(account);
Xmpp.MessageArchiveManagement.QueryResult query_result = null;
if (prev_page_result == null) {
query_result = yield Xmpp.MessageArchiveManagement.V2.query_archive(stream, query_params);
} else {
query_result = yield Xmpp.MessageArchiveManagement.V2.page_through_results(stream, query_params, prev_page_result.query_result);
}
return yield process_query_result(account, query_result, query_params.query_id, query_params.start_id);
}
private async PageRequestResult process_query_result(Account account, Xmpp.MessageArchiveManagement.QueryResult query_result, string query_id, string? after_id) {
PageResult page_result = PageResult.MorePagesAvailable;
if (query_result.malformed || query_result.error) {
print(@"$(query_result.malformed) $(query_result.error)\n");
page_result = PageResult.Error;
}
// We wait until all the messages from the page are processed (and we got the `mam_times` from them)
Idle.add(process_query_result.callback, Priority.LOW);
yield;
// We might have successfully reached the target or the server doesn't have all messages stored anymore
// If it's the former, we'll overwrite the value with PageResult.MorePagesAvailable below.
if (query_result.complete) {
page_result = PageResult.NoMoreMessages;
}
string selection = null;
string[] selection_args = {};
// Check the server id of all returned messages. Check if we've hit our target (from_id) or got a duplicate.
if (stanzas.has_key(query_id) && !stanzas[query_id].is_empty) {
print(@"$(stanzas.has_key(query_id)) $(!stanzas[query_id].is_empty) looking for $(after_id ?? "")\n");
foreach (Xmpp.MessageStanza message in stanzas[query_id]) {
Xmpp.MessageArchiveManagement.MessageFlag? mam_message_flag = Xmpp.MessageArchiveManagement.MessageFlag.get_flag(message);
if (mam_message_flag != null && mam_message_flag.mam_id != null) {
if (after_id != null && mam_message_flag.mam_id == after_id) {
// Successfully fetched the whole range
page_result = PageResult.TargetReached;
}
if (selection != null) selection += " OR ";
selection = @"$(db.message.server_id) = ?";
}
}
if (hitted_range.has_key(query_id)) {
// Message got filtered out by xmpp-vala, but succesfull range fetch nevertheless
page_result = PageResult.TargetReached;
}
int64 duplicates_found = db.message.select().where(selection, selection_args).count();
if (duplicates_found > 0) {
// We got a duplicate although we thought we have to catch up.
// There was a server bug where prosody would send all messages if it didn't know the after ID that was given
page_result = PageResult.Duplicate;
}
}
var res = new PageRequestResult() { stanzas=stanzas[query_id], page_result=page_result, query_result=query_result };
send_messages_back_into_pipeline(account, query_id);
return res;
}
private void send_messages_back_into_pipeline(Account account, string query_id) {
print(@"send_messages_back_into_pipeline $query_id\n");
if (!stanzas.has_key(query_id)) return;
foreach (Xmpp.MessageStanza message in stanzas[query_id]) {
stream_interactor.get_module(MessageProcessor.IDENTITY).run_pipeline_announce.begin(account, message);
}
stanzas.unset(query_id);
print(@"send_messages_back_into_pipeline $query_id done\n");
}
private void on_account_added(Account account) {
cleanup_db_ranges(db, account);
mam_times[account] = new HashMap<string, DateTime>();
XmppStream? stream_bak = null;
stream_interactor.module_manager.get_module(account, Xmpp.MessageArchiveManagement.Module.IDENTITY).feature_available.connect( (stream) => {
if (stream == stream_bak) return;
current_catchup_id[account] = new HashMap<Jid, int>(Jid.hash_func, Jid.equals_func);
stream_bak = stream;
debug("MAM: [%s] MAM available", account.bare_jid.to_string());
fetch_everything.begin(account, account.bare_jid);
});
stream_interactor.module_manager.get_module(account, Xmpp.MessageModule.IDENTITY).received_message_unprocessed.connect((stream, message) => {
on_unprocessed_message(account, stream, message);
});
}
public static void cleanup_db_ranges(Database db, Account account) {
var ranges = new HashMap<Jid, ArrayList<MamRange>>(Jid.hash_func, Jid.equals_func);
foreach (Row row in db.mam_catchup.select().with(db.mam_catchup.account_id, "=", account.id)) {
var mam_range = new MamRange();
mam_range.id = row[db.mam_catchup.id];
mam_range.server_jid = new Jid(row[db.mam_catchup.server_jid]);
mam_range.from_time = row[db.mam_catchup.from_time];
mam_range.from_id = row[db.mam_catchup.from_id];
mam_range.from_end = row[db.mam_catchup.from_end];
mam_range.to_time = row[db.mam_catchup.to_time];
mam_range.to_id = row[db.mam_catchup.to_id];
if (!ranges.has_key(mam_range.server_jid)) ranges[mam_range.server_jid] = new ArrayList<MamRange>();
ranges[mam_range.server_jid].add(mam_range);
}
var to_delete = new ArrayList<MamRange>();
foreach (Jid server_jid in ranges.keys) {
foreach (var range1 in ranges[server_jid]) {
if (to_delete.contains(range1)) continue;
foreach (MamRange range2 in ranges[server_jid]) {
print(@"$(account.bare_jid) | $(server_jid) | $(range1.from_time) - $(range1.to_time) vs $(range2.from_time) - $(range2.to_time)\n");
if (range1 == range2 || to_delete.contains(range2)) continue;
// Check if range2 is a subset of range1
// range1: #####################
// range2: ######
if (range1.from_time <= range2.from_time && range1.to_time >= range2.to_time) {
critical(@"MAM: Removing db range which is a subset of another one");
to_delete.add(range2);
continue;
}
// Check if range2 is an extension of range1 (towards earlier)
// range1: #####################
// range2: ###############
if (range1.from_time <= range2.from_time <= range1.to_time && range1.to_time < range2.to_time) {
critical(@"MAM: Removing db range that overlapped another one (towards earlier)");
db.mam_catchup.update()
.with(db.mam_catchup.id, "=", range1.id)
.set(db.mam_catchup.from_id, range2.to_id)
.set(db.mam_catchup.from_time, range2.to_time)
.set(db.mam_catchup.from_end, range2.from_end)
.perform();
to_delete.add(range2);
continue;
}
// Check if range2 is an extension of range1 (towards more current)
// range1: #####################
// range2: ###############
if (range1.from_time <= range2.from_time <= range1.to_time && range1.to_time < range2.to_time) {
critical(@"MAM: Removing db range that overlapped another one (towards more current)");
db.mam_catchup.update()
.with(db.mam_catchup.id, "=", range1.id)
.set(db.mam_catchup.to_id, range2.to_id)
.set(db.mam_catchup.to_time, range2.to_time)
.perform();
to_delete.add(range2);
continue;
}
}
}
}
foreach (MamRange row in to_delete) {
db.mam_catchup.delete().with(db.mam_catchup.id, "=", row.id).perform();
}
}
class MamRange {
public int id;
public Jid server_jid;
public long from_time;
public string from_id;
public bool from_end;
public long to_time;
public string to_id;
}
class PageRequestResult {
public Gee.List<MessageStanza> stanzas { get; set; }
public PageResult page_result { get; set; }
public Xmpp.MessageArchiveManagement.QueryResult query_result { get; set; }
}
}

View File

@ -18,15 +18,11 @@ public class MessageProcessor : StreamInteractionModule, Object {
public signal void message_sent_or_received(Entities.Message message, Conversation conversation);
public signal void history_synced(Account account);
public HistorySync history_sync;
public MessageListenerHolder received_pipeline = new MessageListenerHolder();
private StreamInteractor stream_interactor;
private Database db;
private HashMap<Account, int> current_catchup_id = new HashMap<Account, int>(Account.hash_func, Account.equals_func);
private HashMap<Account, HashMap<string, DateTime>> mam_times = new HashMap<Account, HashMap<string, DateTime>>();
public HashMap<string, int> hitted_range = new HashMap<string, int>();
public HashMap<Account, string> catchup_until_id = new HashMap<Account, string>(Account.hash_func, Account.equals_func);
public HashMap<Account, DateTime> catchup_until_time = new HashMap<Account, DateTime>(Account.hash_func, Account.equals_func);
public static void start(StreamInteractor stream_interactor, Database db) {
MessageProcessor m = new MessageProcessor(stream_interactor, db);
@ -36,6 +32,7 @@ public class MessageProcessor : StreamInteractionModule, Object {
private MessageProcessor(StreamInteractor stream_interactor, Database db) {
this.stream_interactor = stream_interactor;
this.db = db;
this.history_sync = new HistorySync(db, stream_interactor);
received_pipeline.connect(new DeduplicateMessageListener(this, db));
received_pipeline.connect(new FilterMessageListener());
@ -47,11 +44,6 @@ public class MessageProcessor : StreamInteractionModule, Object {
stream_interactor.stream_negotiated.connect(send_unsent_chat_messages);
stream_interactor.stream_resumed.connect(send_unsent_chat_messages);
stream_interactor.connection_manager.stream_opened.connect((account, stream) => {
debug("MAM: [%s] Reset catchup_id", account.bare_jid.to_string());
current_catchup_id.unset(account);
});
}
public Entities.Message send_text(string text, Conversation conversation) {
@ -106,43 +98,10 @@ public class MessageProcessor : StreamInteractionModule, Object {
}
private void on_account_added(Account account) {
mam_times[account] = new HashMap<string, DateTime>();
stream_interactor.module_manager.get_module(account, Xmpp.MessageModule.IDENTITY).received_message.connect( (stream, message) => {
on_message_received.begin(account, message);
});
XmppStream? stream_bak = null;
stream_interactor.module_manager.get_module(account, Xmpp.Xep.MessageArchiveManagement.Module.IDENTITY).feature_available.connect( (stream) => {
if (stream == stream_bak) return;
current_catchup_id.unset(account);
stream_bak = stream;
debug("MAM: [%s] MAM available", account.bare_jid.to_string());
do_mam_catchup.begin(account);
});
stream_interactor.module_manager.get_module(account, Xmpp.MessageModule.IDENTITY).received_message_unprocessed.connect((stream, message) => {
if (!message.from.equals(account.bare_jid)) return;
Xep.MessageArchiveManagement.Flag? mam_flag = stream != null ? stream.get_flag(Xep.MessageArchiveManagement.Flag.IDENTITY) : null;
if (mam_flag == null) return;
string? id = message.stanza.get_deep_attribute(mam_flag.ns_ver + ":result", "id");
if (id == null) return;
StanzaNode? delay_node = message.stanza.get_deep_subnode(mam_flag.ns_ver + ":result", "urn:xmpp:forward:0:forwarded", "urn:xmpp:delay:delay");
if (delay_node == null) {
warning("MAM result did not contain delayed time %s", message.stanza.to_string());
return;
}
DateTime? time = DelayedDelivery.get_time_for_node(delay_node);
if (time == null) return;
mam_times[account][id] = time;
string? query_id = message.stanza.get_deep_attribute(mam_flag.ns_ver + ":result", mam_flag.ns_ver + ":queryid");
if (query_id != null && id == catchup_until_id[account]) {
debug("MAM: [%s] Hitted range (id) %s", account.bare_jid.to_string(), id);
hitted_range[query_id] = -2;
}
});
stream_interactor.module_manager.get_module(account, Xmpp.MessageModule.IDENTITY).received_error.connect((stream, message_stanza, error_stanza) => {
Message? message = null;
@ -164,203 +123,20 @@ public class MessageProcessor : StreamInteractionModule, Object {
convert_sending_to_unsent_msgs(account);
}
private async void do_mam_catchup(Account account) {
debug("MAM: [%s] Start catchup", account.bare_jid.to_string());
string? earliest_id = null;
DateTime? earliest_time = null;
bool continue_sync = true;
while (continue_sync) {
continue_sync = false;
// Get previous row
var previous_qry = db.mam_catchup.select().with(db.mam_catchup.account_id, "=", account.id).order_by(db.mam_catchup.to_time, "DESC");
if (current_catchup_id.has_key(account)) {
previous_qry.with(db.mam_catchup.id, "!=", current_catchup_id[account]);
}
RowOption previous_row = previous_qry.single().row();
if (previous_row.is_present()) {
catchup_until_id[account] = previous_row[db.mam_catchup.to_id];
catchup_until_time[account] = (new DateTime.from_unix_utc(previous_row[db.mam_catchup.to_time])).add_minutes(-5);
debug("MAM: [%s] Previous entry exists", account.bare_jid.to_string());
} else {
catchup_until_id.unset(account);
catchup_until_time.unset(account);
}
string query_id = Xmpp.random_uuid();
yield get_mam_range(account, query_id, null, null, earliest_time, earliest_id);
if (!hitted_range.has_key(query_id)) {
debug("MAM: [%s] Set catchup end reached", account.bare_jid.to_string());
db.mam_catchup.update()
.set(db.mam_catchup.from_end, true)
.with(db.mam_catchup.id, "=", current_catchup_id[account])
.perform();
}
if (hitted_range.has_key(query_id)) {
if (merge_ranges(account, null)) {
RowOption current_row = db.mam_catchup.row_with(db.mam_catchup.id, current_catchup_id[account]);
bool range_from_complete = current_row[db.mam_catchup.from_end];
if (!range_from_complete) {
continue_sync = true;
earliest_id = current_row[db.mam_catchup.from_id];
earliest_time = (new DateTime.from_unix_utc(current_row[db.mam_catchup.from_time])).add_seconds(1);
}
}
}
}
}
/*
* Merges the row with `current_catchup_id` with the previous range (optional: with `earlier_id`)
* Changes `current_catchup_id` to the previous range
*/
private bool merge_ranges(Account account, int? earlier_id) {
RowOption current_row = db.mam_catchup.row_with(db.mam_catchup.id, current_catchup_id[account]);
RowOption previous_row = null;
if (earlier_id != null) {
previous_row = db.mam_catchup.row_with(db.mam_catchup.id, earlier_id);
} else {
previous_row = db.mam_catchup.select()
.with(db.mam_catchup.account_id, "=", account.id)
.with(db.mam_catchup.id, "!=", current_catchup_id[account])
.order_by(db.mam_catchup.to_time, "DESC").single().row();
}
if (!previous_row.is_present()) {
debug("MAM: [%s] Merging: No previous row", account.bare_jid.to_string());
return false;
}
var qry = db.mam_catchup.update().with(db.mam_catchup.id, "=", previous_row[db.mam_catchup.id]);
debug("MAM: [%s] Merging %ld-%ld with %ld- %ld", account.bare_jid.to_string(), previous_row[db.mam_catchup.from_time], previous_row[db.mam_catchup.to_time], current_row[db.mam_catchup.from_time], current_row[db.mam_catchup.to_time]);
if (current_row[db.mam_catchup.from_time] < previous_row[db.mam_catchup.from_time]) {
qry.set(db.mam_catchup.from_id, current_row[db.mam_catchup.from_id])
.set(db.mam_catchup.from_time, current_row[db.mam_catchup.from_time]);
}
if (current_row[db.mam_catchup.to_time] > previous_row[db.mam_catchup.to_time]) {
qry.set(db.mam_catchup.to_id, current_row[db.mam_catchup.to_id])
.set(db.mam_catchup.to_time, current_row[db.mam_catchup.to_time]);
}
qry.perform();
current_catchup_id[account] = previous_row[db.mam_catchup.id];
db.mam_catchup.delete().with(db.mam_catchup.id, "=", current_row[db.mam_catchup.id]).perform();
return true;
}
private async bool get_mam_range(Account account, string? query_id, DateTime? from_time, string? from_id, DateTime? to_time, string? to_id) {
debug("MAM: [%s] Get range %s - %s", account.bare_jid.to_string(), from_time != null ? from_time.to_string() : "", to_time != null ? to_time.to_string() : "");
XmppStream stream = stream_interactor.get_stream(account);
Iq.Stanza? iq = yield stream.get_module(Xep.MessageArchiveManagement.Module.IDENTITY).query_archive(stream, null, query_id, from_time, from_id, to_time, to_id);
if (iq == null) {
debug(@"MAM: [%s] IQ null", account.bare_jid.to_string());
return true;
}
if (iq.stanza.get_deep_string_content("urn:xmpp:mam:2:fin", "http://jabber.org/protocol/rsm" + ":set", "first") == null) {
return true;
}
while (iq != null) {
string? earliest_id = iq.stanza.get_deep_string_content("urn:xmpp:mam:2:fin", "http://jabber.org/protocol/rsm" + ":set", "first");
if (earliest_id == null) return true;
string? latest_id = iq.stanza.get_deep_string_content("urn:xmpp:mam:2:fin", "http://jabber.org/protocol/rsm" + ":set", "last");
// We wait until all the messages from the page are processed (and we got the `mam_times` from them)
Idle.add(get_mam_range.callback, Priority.LOW);
yield;
int wait_ms = 1000;
if (mam_times[account].has_key(earliest_id) && (current_catchup_id.has_key(account) || mam_times[account].has_key(latest_id))) {
debug("MAM: [%s] Update from_id %s", account.bare_jid.to_string(), earliest_id);
if (!current_catchup_id.has_key(account)) {
debug("MAM: [%s] We get our first MAM page", account.bare_jid.to_string());
current_catchup_id[account] = (int) db.mam_catchup.insert()
.value(db.mam_catchup.account_id, account.id)
.value(db.mam_catchup.from_id, earliest_id)
.value(db.mam_catchup.from_time, (long)mam_times[account][earliest_id].to_unix())
.value(db.mam_catchup.to_id, latest_id)
.value(db.mam_catchup.to_time, (long)mam_times[account][latest_id].to_unix())
.perform();
} else {
// Update existing id
db.mam_catchup.update()
.set(db.mam_catchup.from_id, earliest_id)
.set(db.mam_catchup.from_time, (long)mam_times[account][earliest_id].to_unix())
.with(db.mam_catchup.id, "=", current_catchup_id[account])
.perform();
}
TimeSpan catchup_time_ago = (new DateTime.now_utc()).difference(mam_times[account][earliest_id]);
if (catchup_time_ago > 14 * TimeSpan.DAY) {
wait_ms = 2000;
} else if (catchup_time_ago > 5 * TimeSpan.DAY) {
wait_ms = 1000;
} else if (catchup_time_ago > 2 * TimeSpan.DAY) {
wait_ms = 200;
} else if (catchup_time_ago > TimeSpan.DAY) {
wait_ms = 50;
} else {
wait_ms = 10;
}
} else {
warning("Didn't have time for MAM id; earliest_id:%s latest_id:%s", mam_times[account].has_key(earliest_id).to_string(), mam_times[account].has_key(latest_id).to_string());
}
mam_times[account] = new HashMap<string, DateTime>();
Timeout.add(wait_ms, () => {
if (hitted_range.has_key(query_id)) {
debug(@"MAM: [%s] Hitted contains key %s", account.bare_jid.to_string(), query_id);
iq = null;
Idle.add(get_mam_range.callback);
return false;
}
stream.get_module(Xep.MessageArchiveManagement.Module.IDENTITY).page_through_results.begin(stream, null, query_id, from_time, to_time, iq, (_, res) => {
iq = stream.get_module(Xep.MessageArchiveManagement.Module.IDENTITY).page_through_results.end(res);
Idle.add(get_mam_range.callback);
});
return false;
});
yield;
}
return false;
}
private async void on_message_received(Account account, Xmpp.MessageStanza message_stanza) {
// If it's a message from MAM, it's going to be processed by HistorySync which calls run_pipeline_announce later.
if (history_sync.process(account, message_stanza)) return;
run_pipeline_announce(account, message_stanza);
}
public async void run_pipeline_announce(Account account, Xmpp.MessageStanza message_stanza) {
Entities.Message message = yield parse_message_stanza(account, message_stanza);
Conversation? conversation = stream_interactor.get_module(ConversationManager.IDENTITY).get_conversation_for_message(message);
if (conversation == null) return;
// MAM state database update
Xep.MessageArchiveManagement.MessageFlag? mam_flag = Xep.MessageArchiveManagement.MessageFlag.get_flag(message_stanza);
if (mam_flag == null) {
if (current_catchup_id.has_key(account)) {
string? stanza_id = UniqueStableStanzaIDs.get_stanza_id(message_stanza, account.bare_jid);
if (stanza_id != null) {
db.mam_catchup.update()
.with(db.mam_catchup.id, "=", current_catchup_id[account])
.set(db.mam_catchup.to_time, (long)message.local_time.to_unix())
.set(db.mam_catchup.to_id, stanza_id)
.perform();
}
}
}
bool abort = yield received_pipeline.run(message, message_stanza, conversation);
if (abort) return;
@ -373,7 +149,7 @@ public class MessageProcessor : StreamInteractionModule, Object {
message_sent_or_received(message, conversation);
}
private async Entities.Message parse_message_stanza(Account account, Xmpp.MessageStanza message) {
public async Entities.Message parse_message_stanza(Account account, Xmpp.MessageStanza message) {
string? body = message.body;
if (body != null) body = body.strip();
Entities.Message new_message = new Entities.Message(body);
@ -393,20 +169,20 @@ public class MessageProcessor : StreamInteractionModule, Object {
new_message.ourpart = new_message.direction == Entities.Message.DIRECTION_SENT ? message.from : message.to;
XmppStream? stream = stream_interactor.get_stream(account);
Xep.MessageArchiveManagement.MessageFlag? mam_message_flag = Xep.MessageArchiveManagement.MessageFlag.get_flag(message);
Xep.MessageArchiveManagement.Flag? mam_flag = stream != null ? stream.get_flag(Xep.MessageArchiveManagement.Flag.IDENTITY) : null;
Xmpp.MessageArchiveManagement.MessageFlag? mam_message_flag = Xmpp.MessageArchiveManagement.MessageFlag.get_flag(message);
Xmpp.MessageArchiveManagement.Flag? mam_flag = stream != null ? stream.get_flag(Xmpp.MessageArchiveManagement.Flag.IDENTITY) : null;
EntityInfo entity_info = stream_interactor.get_module(EntityInfo.IDENTITY);
if (mam_message_flag != null && mam_flag != null && mam_flag.ns_ver == Xep.MessageArchiveManagement.NS_URI_2 && mam_message_flag.mam_id != null) {
if (mam_message_flag != null && mam_flag != null && mam_flag.ns_ver == Xmpp.MessageArchiveManagement.NS_URI_2 && mam_message_flag.mam_id != null) {
new_message.server_id = mam_message_flag.mam_id;
} else if (message.type_ == Xmpp.MessageStanza.TYPE_GROUPCHAT) {
bool server_supports_sid = (yield entity_info.has_feature(account, new_message.counterpart.bare_jid, Xep.UniqueStableStanzaIDs.NS_URI)) ||
(yield entity_info.has_feature(account, new_message.counterpart.bare_jid, Xep.MessageArchiveManagement.NS_URI_2));
(yield entity_info.has_feature(account, new_message.counterpart.bare_jid, Xmpp.MessageArchiveManagement.NS_URI_2));
if (server_supports_sid) {
new_message.server_id = Xep.UniqueStableStanzaIDs.get_stanza_id(message, new_message.counterpart.bare_jid);
}
} else if (message.type_ == Xmpp.MessageStanza.TYPE_CHAT) {
bool server_supports_sid = (yield entity_info.has_feature(account, account.bare_jid, Xep.UniqueStableStanzaIDs.NS_URI)) ||
(yield entity_info.has_feature(account, account.bare_jid, Xep.MessageArchiveManagement.NS_URI_2));
(yield entity_info.has_feature(account, account.bare_jid, Xmpp.MessageArchiveManagement.NS_URI_2));
if (server_supports_sid) {
new_message.server_id = Xep.UniqueStableStanzaIDs.get_stanza_id(message, account.bare_jid);
}
@ -474,7 +250,6 @@ public class MessageProcessor : StreamInteractionModule, Object {
public override async bool run(Entities.Message message, Xmpp.MessageStanza stanza, Conversation conversation) {
Account account = conversation.account;
Xep.MessageArchiveManagement.MessageFlag? mam_flag = Xep.MessageArchiveManagement.MessageFlag.get_flag(stanza);
// Deduplicate by server_id
if (message.server_id != null) {
@ -482,16 +257,12 @@ public class MessageProcessor : StreamInteractionModule, Object {
.with(db.message.server_id, "=", message.server_id)
.with(db.message.counterpart_id, "=", db.get_jid_id(message.counterpart))
.with(db.message.account_id, "=", account.id);
bool duplicate = builder.count() > 0;
if (duplicate && mam_flag != null) {
debug(@"MAM: [%s] Hitted range duplicate server id. id %s qid %s", account.bare_jid.to_string(), message.server_id, mam_flag.query_id);
if (outer.catchup_until_time.has_key(account) && mam_flag.server_time.compare(outer.catchup_until_time[account]) < 0) {
outer.hitted_range[mam_flag.query_id] = -1;
debug(@"MAM: [%s] In range (time) %s < %s", account.bare_jid.to_string(), mam_flag.server_time.to_string(), outer.catchup_until_time[account].to_string());
}
// If the message is a duplicate
if (builder.count() > 0) {
outer.history_sync.on_server_id_duplicate(account, stanza, message);
return true;
}
if (duplicate) return true;
}
// Deduplicate messages by uuid
@ -514,14 +285,7 @@ public class MessageProcessor : StreamInteractionModule, Object {
builder.with_null(db.message.our_resource);
}
}
RowOption row_opt = builder.single().row();
bool duplicate = row_opt.is_present();
if (duplicate && mam_flag != null && row_opt[db.message.server_id] == null &&
outer.catchup_until_time.has_key(account) && mam_flag.server_time.compare(outer.catchup_until_time[account]) > 0) {
outer.hitted_range[mam_flag.query_id] = -1;
debug(@"MAM: [%s] Hitted range duplicate message id. id %s qid %s", account.bare_jid.to_string(), message.stanza_id, mam_flag.query_id);
}
bool duplicate = builder.single().row().is_present();
return duplicate;
}
@ -608,9 +372,9 @@ public class MessageProcessor : StreamInteractionModule, Object {
}
public override async bool run(Entities.Message message, Xmpp.MessageStanza stanza, Conversation conversation) {
bool is_mam_message = Xep.MessageArchiveManagement.MessageFlag.get_flag(stanza) != null;
bool is_mam_message = Xmpp.MessageArchiveManagement.MessageFlag.get_flag(stanza) != null;
XmppStream? stream = stream_interactor.get_stream(conversation.account);
Xep.MessageArchiveManagement.Flag? mam_flag = stream != null ? stream.get_flag(Xep.MessageArchiveManagement.Flag.IDENTITY) : null;
Xmpp.MessageArchiveManagement.Flag? mam_flag = stream != null ? stream.get_flag(Xmpp.MessageArchiveManagement.Flag.IDENTITY) : null;
if (is_mam_message || (mam_flag != null && mam_flag.cought_up == true)) {
conversation.account.mam_earliest_synced = message.local_time;
}

View File

@ -57,7 +57,7 @@ public class ModuleManager {
module_map[account].add(new Xep.Bookmarks2.Module());
module_map[account].add(new Presence.Module());
module_map[account].add(new Xmpp.MessageModule());
module_map[account].add(new Xep.MessageArchiveManagement.Module());
module_map[account].add(new Xmpp.MessageArchiveManagement.Module());
module_map[account].add(new Xep.MessageCarbons.Module());
module_map[account].add(new Xep.Muc.Module());
module_map[account].add(new Xep.Pubsub.Module());

View File

@ -68,6 +68,15 @@ public class MucManager : StreamInteractionModule, Object {
if (last_message != null) history_since = last_message.time;
}
bool receive_history = true;
EntityInfo entity_info = stream_interactor.get_module(EntityInfo.IDENTITY);
bool can_do_mam = yield entity_info.has_feature(account, jid, Xmpp.MessageArchiveManagement.NS_URI_2);
print(@"$(jid) $can_do_mam\n");
if (can_do_mam) {
receive_history = false;
history_since = null;
}
if (!mucs_joining.has_key(account)) {
mucs_joining[account] = new HashSet<Jid>(Jid.hash_bare_func, Jid.equals_bare_func);
}
@ -78,7 +87,7 @@ public class MucManager : StreamInteractionModule, Object {
}
mucs_todo[account].add(jid.with_resource(nick_));
Muc.JoinResult? res = yield stream.get_module(Xep.Muc.Module.IDENTITY).enter(stream, jid.bare_jid, nick_, password, history_since, null);
Muc.JoinResult? res = yield stream.get_module(Xep.Muc.Module.IDENTITY).enter(stream, jid.bare_jid, nick_, password, history_since, receive_history, null);
mucs_joining[account].remove(jid);
@ -91,6 +100,18 @@ public class MucManager : StreamInteractionModule, Object {
Conversation joined_conversation = stream_interactor.get_module(ConversationManager.IDENTITY).create_conversation(jid, account, Conversation.Type.GROUPCHAT);
joined_conversation.nickname = nick;
stream_interactor.get_module(ConversationManager.IDENTITY).start_conversation(joined_conversation);
if (can_do_mam) {
if (conversation == null) {
// We never joined the conversation before, just fetch the latest MAM page
yield stream_interactor.get_module(MessageProcessor.IDENTITY).history_sync
.fetch_latest_page(account, jid.bare_jid, null, new DateTime.from_unix_utc(0));
} else {
// Fetch everything up to the last time the user actively joined
stream_interactor.get_module(MessageProcessor.IDENTITY).history_sync
.fetch_everything.begin(account, jid.bare_jid, conversation.active_last_changed);
}
}
} else if (res.muc_error != null) {
// Join failed
enter_errors[jid] = res.muc_error;

View File

@ -75,6 +75,7 @@ SOURCES
"src/module/xep/0047_in_band_bytestreams.vala"
"src/module/xep/0049_private_xml_storage.vala"
"src/module/xep/0059_result_set_management.vala"
"src/module/xep/0054_vcard/module.vala"
"src/module/xep/0060_pubsub.vala"
"src/module/xep/0065_socks5_bytestreams.vala"
@ -126,9 +127,11 @@ SOURCES
"src/module/xep/0261_jingle_in_band_bytestreams.vala"
"src/module/xep/0272_muji.vala"
"src/module/xep/0280_message_carbons.vala"
"src/module/xep/0297_stanza_forwarding.vala"
"src/module/xep/0298_coin.vala"
"src/module/xep/0308_last_message_correction.vala"
"src/module/xep/0313_message_archive_management.vala"
"src/module/xep/0313_2_message_archive_management.vala"
"src/module/xep/0333_chat_markers.vala"
"src/module/xep/0334_message_processing_hints.vala"
"src/module/xep/0353_jingle_message_initiation.vala"

View File

@ -81,7 +81,7 @@ public class Module : XmppStreamModule {
received_pipeline_listener = new ReceivedPipelineListener(this);
}
public async JoinResult? enter(XmppStream stream, Jid bare_jid, string nick, string? password, DateTime? history_since, StanzaNode? additional_node) {
public async JoinResult? enter(XmppStream stream, Jid bare_jid, string nick, string? password, DateTime? history_since, bool receive_history, StanzaNode? additional_node) {
try {
Presence.Stanza presence = new Presence.Stanza();
presence.to = bare_jid.with_resource(nick);
@ -90,10 +90,15 @@ public class Module : XmppStreamModule {
if (password != null) {
x_node.put_node(new StanzaNode.build("password", NS_URI).put_node(new StanzaNode.text(password)));
}
if (history_since != null) {
if (history_since != null || !receive_history) {
StanzaNode history_node = new StanzaNode.build("history", NS_URI);
history_node.set_attribute("since", DateTimeProfiles.to_datetime(history_since));
x_node.put_node(history_node);
if (history_since != null) {
history_node.set_attribute("since", DateTimeProfiles.to_datetime(history_since));
} else if (!receive_history) {
history_node.set_attribute("maxchars", "0");
}
}
presence.stanza.put_node(x_node);
@ -561,7 +566,7 @@ public class ReceivedPipelineListener : StanzaListener<MessageStanza> {
StanzaNode? reason_node = invite_node.get_subnode("reason", NS_URI_USER);
string? reason = null;
if (reason_node != null) reason = reason_node.get_string_content();
bool is_mam_message = Xep.MessageArchiveManagement.MessageFlag.get_flag(message) != null; // TODO
bool is_mam_message = Xmpp.MessageArchiveManagement.MessageFlag.get_flag(message) != null; // TODO
if (!is_mam_message) outer.invite_received(stream, message.from, from_jid, password, reason);
return true;
}

View File

@ -0,0 +1,30 @@
namespace Xmpp.ResultSetManagement {
public const string NS_URI = "http://jabber.org/protocol/rsm";
public class ResultSetParameters {
string? before { get; set; }
string? after { get; set; }
int? max { get; set; }
}
public StanzaNode create_set_rsm_node_before(string? before_id) {
var max_node = (new StanzaNode.build("max", Xmpp.ResultSetManagement.NS_URI)).put_node(new StanzaNode.text("20"));
var node = (new StanzaNode.build("set", Xmpp.ResultSetManagement.NS_URI)).add_self_xmlns()
.put_node(max_node);
var before_node = new StanzaNode.build("before", Xmpp.ResultSetManagement.NS_URI);
if (before_id != null) before_node.put_node(new StanzaNode.text(before_id));
node.put_node(before_node);
return node;
}
public StanzaNode create_set_rsm_node_after(string after_id) {
var max_node = (new StanzaNode.build("max", Xmpp.ResultSetManagement.NS_URI)).put_node(new StanzaNode.text("20"));
var node = (new StanzaNode.build("set", Xmpp.ResultSetManagement.NS_URI)).add_self_xmlns()
.put_node(max_node);
var after_node = new StanzaNode.build("after", Xmpp.ResultSetManagement.NS_URI)
.put_node(new StanzaNode.text(after_id));
node.put_node(after_node);
return node;
}
}

View File

@ -1,6 +1,6 @@
namespace Xmpp.Xep.DelayedDelivery {
private const string NS_URI = "urn:xmpp:delay";
public const string NS_URI = "urn:xmpp:delay";
public static DateTime? get_time_for_node(StanzaNode node) {
string? time = node.get_attribute("stamp");

View File

@ -15,7 +15,7 @@ namespace Xmpp.Xep.Muji {
group_call.our_nick = "%08x".printf(Random.next_int());
debug(@"[%s] MUJI joining as %s", stream.get_flag(Bind.Flag.IDENTITY).my_jid.to_string(), group_call.our_nick);
Xep.Muc.JoinResult? result = yield stream.get_module(Muc.Module.IDENTITY).enter(stream, muc_jid, group_call.our_nick, null, null, initial_muji_node);
Xep.Muc.JoinResult? result = yield stream.get_module(Muc.Module.IDENTITY).enter(stream, muc_jid, group_call.our_nick, null, null, false, initial_muji_node);
if (result == null || result.nick == null) return null;
debug(@"[%s] MUJI joining as %s done", stream.get_flag(Bind.Flag.IDENTITY).my_jid.to_string(), group_call.our_nick);

View File

@ -0,0 +1,3 @@
namespace Xmpp.StanzaForwarding {
public const string NS_URI = "urn:xmpp:forward:0";
}

View File

@ -0,0 +1,80 @@
using Gee;
using Xmpp.Xep;
namespace Xmpp.MessageArchiveManagement.V2 {
public class MamQueryParams {
public bool use_ns2_extended = false;
public string query_id = Xmpp.random_uuid();
public Jid mam_server { get; set; }
public Jid? with { get; set; }
// "The 'start' field is used to filter out messages before a certain date/time."
public DateTime? start { get; set; }
// "the 'end' field is used to exclude from the results messages after a certain point in time"
public DateTime? end { get; set; }
public string? start_id { get; set; }
public string? end_id { get; set; }
public MamQueryParams.query_latest(Jid mam_server, DateTime? latest_known_time, string? latest_known_id) {
this.mam_server = mam_server;
this.start = latest_known_time;
this.start_id = latest_known_id;
}
public MamQueryParams.query_between(Jid mam_server,
DateTime? earliest_time, string? earliest_id,
DateTime? latest_time, string? latest_id) {
this.mam_server = mam_server;
this.start = earliest_time;
this.start_id = earliest_id;
this.end = latest_time;
this.end_id = latest_id;
}
public MamQueryParams.query_before(Jid mam_server, DateTime? earliest_time, string? earliest_id) {
this.mam_server = mam_server;
this.end = earliest_time;
this.end_id = earliest_id;
}
}
private StanzaNode create_base_query(XmppStream stream, MamQueryParams mam_params) {
var fields = new ArrayList<DataForms.DataForm.Field>();
if (mam_params.with != null) {
DataForms.DataForm.Field field = new DataForms.DataForm.Field() { var="with" };
field.set_value_string(mam_params.with.to_string());
fields.add(field);
}
if (mam_params.start != null) {
DataForms.DataForm.Field field = new DataForms.DataForm.Field() { var="start" };
field.set_value_string(DateTimeProfiles.to_datetime(mam_params.start));
fields.add(field);
}
if (mam_params.end != null) {
DataForms.DataForm.Field field = new DataForms.DataForm.Field() { var="end" };
field.set_value_string(DateTimeProfiles.to_datetime(mam_params.end));
fields.add(field);
}
return MessageArchiveManagement.create_base_query(stream, MessageArchiveManagement.NS_URI_2, mam_params.query_id, fields);
}
public async QueryResult query_archive(XmppStream stream, MamQueryParams mam_params) {
var query_node = create_base_query(stream, mam_params);
if (!mam_params.use_ns2_extended) {
query_node.put_node(ResultSetManagement.create_set_rsm_node_before(mam_params.end_id));
}
return yield MessageArchiveManagement.query_archive(stream, MessageArchiveManagement.NS_URI_2, mam_params.mam_server, query_node);
}
public async QueryResult page_through_results(XmppStream stream, MamQueryParams mam_params, QueryResult prev_result) {
var query_node = create_base_query(stream, mam_params);
query_node.put_node(ResultSetManagement.create_set_rsm_node_before(prev_result.first));
return yield MessageArchiveManagement.query_archive(stream, MessageArchiveManagement.NS_URI_2, mam_params.mam_server, query_node);
}
}

View File

@ -1,11 +1,18 @@
namespace Xmpp.Xep.MessageArchiveManagement {
using Gee;
using Xmpp.Xep;
namespace Xmpp.MessageArchiveManagement {
public const string NS_URI = "urn:xmpp:mam:2";
public const string NS_URI_2 = "urn:xmpp:mam:2";
public const string NS_URI_1 = "urn:xmpp:mam:1";
private static string NS_VER(XmppStream stream) {
return stream.get_flag(Flag.IDENTITY).ns_ver;
public class QueryResult {
public bool error { get; set; default=false; }
public bool malformed { get; set; default=false; }
public bool complete { get; set; default=false; }
public string first { get; set; }
public string last { get; set; }
}
public class Module : XmppStreamModule {
@ -15,54 +22,6 @@ public class Module : XmppStreamModule {
private ReceivedPipelineListener received_pipeline_listener = new ReceivedPipelineListener();
private StanzaNode crate_base_query(XmppStream stream, string? jid, string? queryid, DateTime? start, DateTime? end) {
DataForms.DataForm data_form = new DataForms.DataForm();
DataForms.DataForm.HiddenField form_type_field = new DataForms.DataForm.HiddenField() { var="FORM_TYPE" };
form_type_field.set_value_string(NS_VER(stream));
data_form.add_field(form_type_field);
if (jid != null) {
DataForms.DataForm.Field field = new DataForms.DataForm.Field() { var="with" };
field.set_value_string(jid);
data_form.add_field(field);
}
if (start != null) {
DataForms.DataForm.Field field = new DataForms.DataForm.Field() { var="start" };
field.set_value_string(DateTimeProfiles.to_datetime(start));
data_form.add_field(field);
}
if (end != null) {
DataForms.DataForm.Field field = new DataForms.DataForm.Field() { var="end" };
field.set_value_string(DateTimeProfiles.to_datetime(end));
data_form.add_field(field);
}
StanzaNode query_node = new StanzaNode.build("query", NS_VER(stream)).add_self_xmlns().put_node(data_form.get_submit_node());
if (queryid != null) {
query_node.put_attribute("queryid", queryid);
}
return query_node;
}
private StanzaNode create_set_rsm_node(string? before_id) {
var before_node = new StanzaNode.build("before", "http://jabber.org/protocol/rsm");
if (before_id != null) {
before_node.put_node(new StanzaNode.text(before_id));
}
var max_node = (new StanzaNode.build("max", "http://jabber.org/protocol/rsm")).put_node(new StanzaNode.text("20"));
return (new StanzaNode.build("set", "http://jabber.org/protocol/rsm")).add_self_xmlns()
.put_node(before_node)
.put_node(max_node);
}
public async Iq.Stanza? query_archive(XmppStream stream, string? jid, string? query_id, DateTime? start_time, string? start_id, DateTime? end_time, string? end_id) {
if (stream.get_flag(Flag.IDENTITY) == null) return null;
var query_node = crate_base_query(stream, jid, query_id, start_time, end_time);
query_node.put_node(create_set_rsm_node(end_id));
Iq.Stanza iq = new Iq.Stanza.set(query_node);
return yield stream.get_module(Iq.Module.IDENTITY).send_iq_async(stream, iq);
}
public override void attach(XmppStream stream) {
stream.get_module(MessageModule.IDENTITY).received_pipeline.connect(received_pipeline_listener);
stream.stream_negotiated.connect(query_availability);
@ -75,25 +34,6 @@ public class Module : XmppStreamModule {
public override string get_ns() { return NS_URI; }
public override string get_id() { return IDENTITY.id; }
public async Iq.Stanza? page_through_results(XmppStream stream, string? jid, string? query_id, DateTime? start_time, DateTime? end_time, Iq.Stanza iq) {
string? complete = iq.stanza.get_deep_attribute("urn:xmpp:mam:2:fin", "complete");
if (complete == "true") {
return null;
}
string? first = iq.stanza.get_deep_string_content(NS_VER(stream) + ":fin", "http://jabber.org/protocol/rsm" + ":set", "first");
if (first == null) {
return null;
}
var query_node = crate_base_query(stream, jid, query_id, start_time, end_time);
query_node.put_node(create_set_rsm_node(first));
Iq.Stanza paging_iq = new Iq.Stanza.set(query_node);
return yield stream.get_module(Iq.Module.IDENTITY).send_iq_async(stream, paging_iq);
}
private async void query_availability(XmppStream stream) {
Jid own_jid = stream.get_flag(Bind.Flag.IDENTITY).my_jid.bare_jid;
@ -113,6 +53,52 @@ public class Module : XmppStreamModule {
}
}
internal StanzaNode create_base_query(XmppStream stream, string ns, string? queryid, Gee.List<DataForms.DataForm.Field> fields) {
DataForms.DataForm data_form = new DataForms.DataForm();
DataForms.DataForm.HiddenField form_type_field = new DataForms.DataForm.HiddenField() { var="FORM_TYPE" };
form_type_field.set_value_string(NS_VER(stream));
data_form.add_field(form_type_field);
foreach (var field in fields) {
data_form.add_field(field);
}
StanzaNode query_node = new StanzaNode.build("query", NS_VER(stream)).add_self_xmlns().put_node(data_form.get_submit_node());
if (queryid != null) {
query_node.put_attribute("queryid", queryid);
}
return query_node;
}
internal async QueryResult query_archive(XmppStream stream, string ns, Jid? mam_server, StanzaNode query_node) {
var res = new QueryResult();
if (stream.get_flag(Flag.IDENTITY) == null) { res.error = true; return res; }
// Build and send query
Iq.Stanza iq = new Iq.Stanza.set(query_node) { to=mam_server };
print(@"OUT:\n$(iq.stanza.to_string())\n");
Iq.Stanza result_iq = yield stream.get_module(Iq.Module.IDENTITY).send_iq_async(stream, iq);
print(result_iq.stanza.to_string() + "\n");
// Parse the response IQ into a QueryResult.
StanzaNode? fin_node = result_iq.stanza.get_subnode("fin", ns);
if (fin_node == null) { print(@"$ns a1\n"); res.malformed = true; return res; }
StanzaNode? rsm_node = fin_node.get_subnode("set", Xmpp.ResultSetManagement.NS_URI);
if (rsm_node == null) { print("a2\n"); res.malformed = true; return res; }
res.first = rsm_node.get_deep_string_content("first");
res.last = rsm_node.get_deep_string_content("last");
if ((res.first == null) != (res.last == null)) { print("a3\n"); res.malformed = true; }
res.complete = fin_node.get_attribute_bool("complete", false, ns);
return res;
}
public class ReceivedPipelineListener : StanzaListener<MessageStanza> {
private string[] after_actions_const = {};
@ -123,19 +109,13 @@ public class ReceivedPipelineListener : StanzaListener<MessageStanza> {
public override async bool run(XmppStream stream, MessageStanza message) {
if (stream.get_flag(Flag.IDENTITY) == null) return false;
StanzaNode? message_node = message.stanza.get_deep_subnode(NS_VER(stream) + ":result", "urn:xmpp:forward:0:forwarded", Xmpp.NS_URI + ":message");
StanzaNode? message_node = message.stanza.get_deep_subnode(NS_VER(stream) + ":result", StanzaForwarding.NS_URI + ":forwarded", Xmpp.NS_URI + ":message");
if (message_node != null) {
// MAM messages must come from our server // TODO or a MUC server
if (!message.from.equals(stream.get_flag(Bind.Flag.IDENTITY).my_jid.bare_jid)) {
warning("Received alleged MAM message from %s, ignoring", message.from.to_string());
return true;
}
StanzaNode? forward_node = message.stanza.get_deep_subnode(NS_VER(stream) + ":result", "urn:xmpp:forward:0:forwarded", DelayedDelivery.NS_URI + ":delay");
StanzaNode? forward_node = message.stanza.get_deep_subnode(NS_VER(stream) + ":result", StanzaForwarding.NS_URI + ":forwarded", DelayedDelivery.NS_URI + ":delay");
DateTime? datetime = DelayedDelivery.get_time_for_node(forward_node);
string? mam_id = message.stanza.get_deep_attribute(NS_VER(stream) + ":result", NS_VER(stream) + ":id");
string? query_id = message.stanza.get_deep_attribute(NS_VER(stream) + ":result", NS_VER(stream) + ":queryid");
message.add_flag(new MessageFlag(datetime, mam_id, query_id));
message.add_flag(new MessageFlag(message.from, datetime, mam_id, query_id));
message.stanza = message_node;
message.rerun_parsing = true;
@ -160,11 +140,13 @@ public class Flag : XmppStreamFlag {
public class MessageFlag : Xmpp.MessageFlag {
public const string ID = "message_archive_management";
public Jid sender_jid { get; private set; }
public DateTime? server_time { get; private set; }
public string? mam_id { get; private set; }
public string? query_id { get; private set; }
public MessageFlag(DateTime? server_time, string? mam_id, string? query_id) {
public MessageFlag(Jid sender_jid, DateTime? server_time, string? mam_id, string? query_id) {
this.sender_jid = sender_jid;
this.server_time = server_time;
this.mam_id = mam_id;
this.query_id = query_id;
@ -176,4 +158,8 @@ public class MessageFlag : Xmpp.MessageFlag {
public override string get_id() { return ID; }
}
private static string NS_VER(XmppStream stream) {
return stream.get_flag(Flag.IDENTITY).ns_ver;
}
}

View File

@ -68,7 +68,7 @@ namespace Xmpp.Xep.CallInvites {
}
private void on_received_message(XmppStream stream, MessageStanza message) {
Xep.MessageArchiveManagement.MessageFlag? mam_flag = Xep.MessageArchiveManagement.MessageFlag.get_flag(message);
Xmpp.MessageArchiveManagement.MessageFlag? mam_flag = Xmpp.MessageArchiveManagement.MessageFlag.get_flag(message);
if (mam_flag != null) return;
StanzaNode? relevant_node = null;

View File

@ -53,7 +53,7 @@ namespace Xmpp.Xep.JingleMessageInitiation {
private void on_received_message(XmppStream stream, MessageStanza message) {
if (message.type_ == MessageStanza.TYPE_GROUPCHAT) return;
Xep.MessageArchiveManagement.MessageFlag? mam_flag = Xep.MessageArchiveManagement.MessageFlag.get_flag(message);
Xmpp.MessageArchiveManagement.MessageFlag? mam_flag = Xmpp.MessageArchiveManagement.MessageFlag.get_flag(message);
if (mam_flag != null) return;
StanzaNode? mi_node = null;