Use WeakMap for message caching

This commit is contained in:
fiaxh 2020-11-14 16:59:21 +01:00
parent b8d216a057
commit d0488401ce
3 changed files with 130 additions and 74 deletions

View File

@ -49,24 +49,12 @@ public class ContentItemStore : StreamInteractionModule, Object {
DateTime local_time = new DateTime.from_unix_utc(row[db.content_item.local_time]);
switch (provider) {
case 1:
RowOption row_option = db.message.select().with(db.message.id, "=", foreign_id)
.outer_join_with(db.message_correction, db.message_correction.message_id, db.message.id)
.row();
if (row_option.is_present()) {
Message? message = stream_interactor.get_module(MessageStorage.IDENTITY).get_message_by_id(foreign_id, conversation);
if (message == null) {
try {
message = new Message.from_row(db, row_option.inner);
} catch (InvalidJidError e) {
warning("Ignoring message with invalid Jid: %s", e.message);
}
}
if (message != null) {
var message_item = new MessageItem(message, conversation, row[db.content_item.id]);
message_item.display_time = time;
message_item.sort_time = local_time;
items.add(message_item);
}
Message? message = stream_interactor.get_module(MessageStorage.IDENTITY).get_message_by_id(foreign_id, conversation);
if (message != null) {
var message_item = new MessageItem(message, conversation, row[db.content_item.id]);
message_item.display_time = time;
message_item.sort_time = local_time;
items.add(message_item);
}
break;
case 2:

View File

@ -1,3 +1,4 @@
using Xmpp;
using Gee;
using Qlite;
@ -12,7 +13,12 @@ public class MessageStorage : StreamInteractionModule, Object {
private StreamInteractor stream_interactor;
private Database db;
private HashMap<Conversation, Gee.TreeSet<Message>> messages = new HashMap<Conversation, Gee.TreeSet<Message>>(Conversation.hash_func, Conversation.equals_func);
private WeakMap<int, Message> messages_by_db_id = new WeakMap<int, Message>();
private HashMap<Conversation, WeakMap<string, Message>> messages_by_stanza_id = new HashMap<Conversation, WeakMap<string, Message>>(Conversation.hash_func, Conversation.equals_func);
private HashMap<Conversation, WeakMap<string, Message>> messages_by_server_id = new HashMap<Conversation, WeakMap<string, Message>>(Conversation.hash_func, Conversation.equals_func);
// This is to keep the last 300 messages such that we don't have to recreate the newest ones all the time
private LinkedList<Message> message_refs = new LinkedList<Message>();
public static void start(StreamInteractor stream_interactor, Database db) {
MessageStorage m = new MessageStorage(stream_interactor, db);
@ -26,53 +32,48 @@ public class MessageStorage : StreamInteractionModule, Object {
public void add_message(Message message, Conversation conversation) {
message.persist(db);
init_conversation(conversation);
messages[conversation].add(message);
cache_message(message, conversation);
}
public Gee.List<Message> get_messages(Conversation conversation, int count = 50) {
init_conversation(conversation);
Gee.List<Message> ret = new ArrayList<Message>(Message.equals_func);
BidirIterator<Message> iter = messages[conversation].bidir_iterator();
iter.last();
if (messages[conversation].size > 0) {
do {
ret.insert(0, iter.get());
iter.previous();
} while (iter.has_previous() && ret.size < count);
var query = db.message.select()
.with(db.message.account_id, "=", conversation.account.id)
.with(db.message.counterpart_id, "=", db.get_jid_id(conversation.counterpart))
.with(db.message.type_, "=", (int) Util.get_message_type_for_conversation(conversation))
.order_by(db.message.local_time, "DESC")
.order_by(db.message.time, "DESC")
.outer_join_with(db.message_correction, db.message_correction.message_id, db.message.id)
.limit(count);
Gee.List<Message> ret = new LinkedList<Message>(Message.equals_func);
foreach (Row row in query) {
Message? message = messages_by_db_id[row[db.message.id]];
if (message == null) {
message = create_message_from_row(row, conversation);
}
ret.insert(0, message);
}
return ret;
}
public Message? get_last_message(Conversation conversation) {
init_conversation(conversation);
if (messages[conversation].size > 0) {
return messages[conversation].last();
Gee.List<Message> messages = get_messages(conversation, 1);
if (messages.size > 0) {
return messages[0];
}
return null;
}
public Gee.List<MessageItem> get_messages_before_message(Conversation? conversation, DateTime before, int id, int count = 20) {
// SortedSet<Message>? before = messages[conversation].head_set(message);
// if (before != null && before.size >= count) {
// Gee.List<Message> ret = new ArrayList<Message>(Message.equals_func);
// Iterator<Message> iter = before.iterator();
// iter.next();
// for (int from_index = before.size - count; iter.has_next() && from_index > 0; from_index--) iter.next();
// while(iter.has_next()) {
// Message m = iter.get();
// ret.add(m);
// iter.next();
// }
// return ret;
// } else {
Gee.List<Message> db_messages = db.get_messages(conversation.counterpart, conversation.account, Util.get_message_type_for_conversation(conversation), count, before, null, id);
Gee.List<MessageItem> ret = new ArrayList<MessageItem>();
foreach (Message message in db_messages) {
ret.add(new MessageItem(message, conversation, -1));
}
return ret;
// }
}
public Gee.List<MessageItem> get_messages_after_message(Conversation? conversation, DateTime after, int id, int count = 20) {
@ -85,43 +86,110 @@ public class MessageStorage : StreamInteractionModule, Object {
}
public Message? get_message_by_id(int id, Conversation conversation) {
init_conversation(conversation);
foreach (Message message in messages[conversation]) {
if (message.id == id) return message;
Message? message = messages_by_db_id[id];
if (message != null) {
return message;
}
return null;
RowOption row_option = db.message.select().with(db.message.id, "=", id)
.outer_join_with(db.message_correction, db.message_correction.message_id, db.message.id)
.row();
return create_message_from_row_opt(row_option, conversation);
}
public Message? get_message_by_stanza_id(string stanza_id, Conversation conversation) {
init_conversation(conversation);
foreach (Message message in messages[conversation]) {
if (message.stanza_id == stanza_id) return message;
if (messages_by_stanza_id.has_key(conversation)) {
Message? message = messages_by_stanza_id[conversation][stanza_id];
if (message != null) {
return message;
}
}
return null;
var query = db.message.select()
.with(db.message.account_id, "=", conversation.account.id)
.with(db.message.counterpart_id, "=", db.get_jid_id(conversation.counterpart))
.with(db.message.type_, "=", (int) Util.get_message_type_for_conversation(conversation))
.with(db.message.stanza_id, "=", stanza_id)
.order_by(db.message.local_time, "DESC")
.order_by(db.message.time, "DESC")
.outer_join_with(db.message_correction, db.message_correction.message_id, db.message.id);
if (conversation.counterpart.resourcepart == null) {
query.with_null(db.message.counterpart_resource);
} else {
query.with(db.message.counterpart_resource, "=", conversation.counterpart.resourcepart);
}
RowOption row_option = query.single().row();
return create_message_from_row_opt(row_option, conversation);
}
public Message? get_message_by_server_id(string server_id, Conversation conversation) {
init_conversation(conversation);
foreach (Message message in messages[conversation]) {
if (message.server_id == server_id) return message;
if (messages_by_server_id.has_key(conversation)) {
Message? message = messages_by_server_id[conversation][server_id];
if (message != null) {
return message;
}
}
var query = db.message.select()
.with(db.message.account_id, "=", conversation.account.id)
.with(db.message.counterpart_id, "=", db.get_jid_id(conversation.counterpart))
.with(db.message.type_, "=", (int) Util.get_message_type_for_conversation(conversation))
.with(db.message.server_id, "=", server_id)
.order_by(db.message.local_time, "DESC")
.order_by(db.message.time, "DESC")
.outer_join_with(db.message_correction, db.message_correction.message_id, db.message.id);
if (conversation.counterpart.resourcepart == null) {
query.with_null(db.message.counterpart_resource);
} else {
query.with(db.message.counterpart_resource, "=", conversation.counterpart.resourcepart);
}
RowOption row_option = query.single().row();
return create_message_from_row_opt(row_option, conversation);
}
private Message? create_message_from_row_opt(RowOption row_option, Conversation conversation) {
if (!row_option.is_present()) return null;
return create_message_from_row(row_option.inner, conversation);
}
private Message? create_message_from_row(Row row, Conversation conversation) {
try {
Message message = new Message.from_row(db, row);
cache_message(message, conversation);
return message;
} catch (InvalidJidError e) {
warning("Got message with invalid Jid: %s", e.message);
}
return null;
}
private void init_conversation(Conversation conversation) {
if (!messages.has_key(conversation)) {
messages[conversation] = new Gee.TreeSet<Message>((a, b) => {
int res = a.local_time.compare(b.local_time);
if (res == 0) {
res = a.time.compare(b.time);
}
if (res == 0) {
res = a.id - b.id > 0 ? 1 : -1;
}
return res;
});
Gee.List<Message> db_messages = db.get_messages(conversation.counterpart, conversation.account, Util.get_message_type_for_conversation(conversation), 50, null, null, -1);
messages[conversation].add_all(db_messages);
private void cache_message(Message message, Conversation conversation) {
messages_by_db_id[message.id] = message;
if (message.stanza_id != null) {
if (!messages_by_stanza_id.has_key(conversation)) {
messages_by_stanza_id[conversation] = new WeakMap<string, Message>();
}
messages_by_stanza_id[conversation][message.stanza_id] = message;
}
if (message.server_id != null) {
if (!messages_by_server_id.has_key(conversation)) {
messages_by_server_id[conversation] = new WeakMap<string, Message>();
}
messages_by_server_id[conversation][message.server_id] = message;
}
message_refs.insert(0, message);
if (message_refs.size > 300) {
message_refs.remove_at(message_refs.size - 1);
}
}
}

View File

@ -114,7 +114,7 @@ public class FileProvider : Dino.FileProvider, Object {
Conversation? conversation = stream_interactor.get_module(ConversationManager.IDENTITY).get_conversation(file_transfer.counterpart.bare_jid, file_transfer.account);
if (conversation == null) throw new FileReceiveError.GET_METADATA_FAILED("No conversation");
Message? message = dino_db.get_message_by_id(int.parse(file_transfer.info));
Message? message = stream_interactor.get_module(MessageStorage.IDENTITY).get_message_by_id(int.parse(file_transfer.info), conversation);
if (message == null) throw new FileReceiveError.GET_METADATA_FAILED("No message");
var file_meta = new HttpFileMeta();
@ -132,7 +132,7 @@ public class FileProvider : Dino.FileProvider, Object {
Conversation? conversation = stream_interactor.get_module(ConversationManager.IDENTITY).get_conversation(file_transfer.counterpart.bare_jid, file_transfer.account);
if (conversation == null) return null;
Message? message = dino_db.get_message_by_id(int.parse(file_transfer.info));
Message? message = stream_interactor.get_module(MessageStorage.IDENTITY).get_message_by_id(int.parse(file_transfer.info), conversation);
if (message == null) return null;
var receive_data = new HttpFileReceiveData();