mirror of
				https://github.com/element-hq/synapse.git
				synced 2025-10-31 00:02:00 -04:00 
			
		
		
		
	Add basic implementation of local device list changes
This commit is contained in:
		
							parent
							
								
									ba8e144554
								
							
						
					
					
						commit
						2367c5568c
					
				| @ -100,6 +100,7 @@ class TransactionQueue(object): | ||||
|         self.pending_failures_by_dest = {} | ||||
| 
 | ||||
|         self.last_device_stream_id_by_dest = {} | ||||
|         self.last_device_list_stream_id_by_dest = {} | ||||
| 
 | ||||
|         # HACK to get unique tx id | ||||
|         self._next_txn_id = int(self.clock.time_msec()) | ||||
| @ -356,7 +357,7 @@ class TransactionQueue(object): | ||||
|                     success = yield self._send_new_transaction( | ||||
|                         destination, pending_pdus, pending_edus, pending_failures, | ||||
|                         device_stream_id, | ||||
|                         should_delete_from_device_stream=bool(device_message_edus), | ||||
|                         includes_device_messages=bool(device_message_edus), | ||||
|                         limiter=limiter, | ||||
|                     ) | ||||
|                     if not success: | ||||
| @ -373,6 +374,8 @@ class TransactionQueue(object): | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def _get_new_device_messages(self, destination): | ||||
|         # TODO: Send appropriate device list messages | ||||
| 
 | ||||
|         last_device_stream_id = self.last_device_stream_id_by_dest.get(destination, 0) | ||||
|         to_device_stream_id = self.store.get_to_device_stream_token() | ||||
|         contents, stream_id = yield self.store.get_new_device_msgs_for_remote( | ||||
| @ -387,13 +390,27 @@ class TransactionQueue(object): | ||||
|             ) | ||||
|             for content in contents | ||||
|         ] | ||||
| 
 | ||||
|         last_device_list = self.last_device_list_stream_id_by_dest.get(destination, 0) | ||||
|         now_stream_id, results = yield self.store.get_devices_by_remote( | ||||
|             destination, last_device_list | ||||
|         ) | ||||
|         edus.extend( | ||||
|             Edu( | ||||
|                 origin=self.server_name, | ||||
|                 destination=destination, | ||||
|                 edu_type="m.device_list_update", | ||||
|                 content=content, | ||||
|             ) | ||||
|             for content in results | ||||
|         ) | ||||
|         defer.returnValue((edus, stream_id)) | ||||
| 
 | ||||
|     @measure_func("_send_new_transaction") | ||||
|     @defer.inlineCallbacks | ||||
|     def _send_new_transaction(self, destination, pending_pdus, pending_edus, | ||||
|                               pending_failures, device_stream_id, | ||||
|                               should_delete_from_device_stream, limiter): | ||||
|                               includes_device_messages, limiter): | ||||
| 
 | ||||
|         # Sort based on the order field | ||||
|         pending_pdus.sort(key=lambda t: t[1]) | ||||
| @ -506,7 +523,8 @@ class TransactionQueue(object): | ||||
|                 success = False | ||||
|             else: | ||||
|                 # Remove the acknowledged device messages from the database | ||||
|                 if should_delete_from_device_stream: | ||||
|                 # Only bother if we actually sent some device messages | ||||
|                 if includes_device_messages: | ||||
|                     yield self.store.delete_device_msgs_for_remote( | ||||
|                         destination, device_stream_id | ||||
|                     ) | ||||
|  | ||||
| @ -15,6 +15,7 @@ | ||||
| 
 | ||||
| from synapse.api import errors | ||||
| from synapse.util import stringutils | ||||
| from synapse.types import get_domain_from_id | ||||
| from twisted.internet import defer | ||||
| from ._base import BaseHandler | ||||
| 
 | ||||
| @ -27,6 +28,8 @@ class DeviceHandler(BaseHandler): | ||||
|     def __init__(self, hs): | ||||
|         super(DeviceHandler, self).__init__(hs) | ||||
| 
 | ||||
|         self.state = hs.get_state_handler() | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def check_device_registered(self, user_id, device_id, | ||||
|                                 initial_device_display_name=None): | ||||
| @ -45,29 +48,29 @@ class DeviceHandler(BaseHandler): | ||||
|             str: device id (generated if none was supplied) | ||||
|         """ | ||||
|         if device_id is not None: | ||||
|             yield self.store.store_device( | ||||
|             new_device = yield self.store.store_device( | ||||
|                 user_id=user_id, | ||||
|                 device_id=device_id, | ||||
|                 initial_device_display_name=initial_device_display_name, | ||||
|                 ignore_if_known=True, | ||||
|             ) | ||||
|             if new_device: | ||||
|                 yield self.notify_device_update(user_id, device_id) | ||||
|             defer.returnValue(device_id) | ||||
| 
 | ||||
|         # if the device id is not specified, we'll autogen one, but loop a few | ||||
|         # times in case of a clash. | ||||
|         attempts = 0 | ||||
|         while attempts < 5: | ||||
|             try: | ||||
|                 device_id = stringutils.random_string(10).upper() | ||||
|                 yield self.store.store_device( | ||||
|                     user_id=user_id, | ||||
|                     device_id=device_id, | ||||
|                     initial_device_display_name=initial_device_display_name, | ||||
|                     ignore_if_known=False, | ||||
|                 ) | ||||
|             device_id = stringutils.random_string(10).upper() | ||||
|             new_device = yield self.store.store_device( | ||||
|                 user_id=user_id, | ||||
|                 device_id=device_id, | ||||
|                 initial_device_display_name=initial_device_display_name, | ||||
|             ) | ||||
|             if new_device: | ||||
|                 yield self.notify_device_update(user_id, device_id) | ||||
|                 defer.returnValue(device_id) | ||||
|             except errors.StoreError: | ||||
|                 attempts += 1 | ||||
|             attempts += 1 | ||||
| 
 | ||||
|         raise errors.StoreError(500, "Couldn't generate a device ID.") | ||||
| 
 | ||||
| @ -147,6 +150,8 @@ class DeviceHandler(BaseHandler): | ||||
|             user_id=user_id, device_id=device_id | ||||
|         ) | ||||
| 
 | ||||
|         yield self.notify_device_update(user_id, device_id) | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def update_device(self, user_id, device_id, content): | ||||
|         """ Update the given device | ||||
| @ -166,12 +171,48 @@ class DeviceHandler(BaseHandler): | ||||
|                 device_id, | ||||
|                 new_display_name=content.get("display_name") | ||||
|             ) | ||||
|             yield self.notify_device_update(user_id, device_id) | ||||
|         except errors.StoreError, e: | ||||
|             if e.code == 404: | ||||
|                 raise errors.NotFoundError() | ||||
|             else: | ||||
|                 raise | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def notify_device_update(self, user_id, device_id): | ||||
|         rooms = yield self.store.get_rooms_for_user(user_id) | ||||
|         room_ids = [r.room_id for r in rooms] | ||||
| 
 | ||||
|         hosts = set() | ||||
|         for room_id in room_ids: | ||||
|             users = yield self.state.get_current_user_in_room(room_id) | ||||
|             hosts.update(get_domain_from_id(u) for u in users) | ||||
|         hosts.discard(self.server_name) | ||||
| 
 | ||||
|         position = yield self.store.add_device_change_to_streams( | ||||
|             user_id, device_id, list(hosts) | ||||
|         ) | ||||
| 
 | ||||
|         yield self.notifier.on_new_event( | ||||
|             "device_list_key", position, rooms=room_ids, | ||||
|         ) | ||||
| 
 | ||||
|         for host in hosts: | ||||
|             self.federation.send_device_messages(host) | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def get_device_list_changes(self, user_id, room_ids, from_key): | ||||
|         room_ids = frozenset(room_ids) | ||||
| 
 | ||||
|         user_ids_changed = set() | ||||
|         changed = yield self.store.get_user_whose_devices_changed(from_key) | ||||
|         for other_user_id in changed: | ||||
|             other_rooms = yield self.store.get_rooms_for_user(other_user_id) | ||||
|             if room_ids.intersection(e.room_id for e in other_rooms): | ||||
|                 user_ids_changed.add(other_user_id) | ||||
| 
 | ||||
|         defer.returnValue(user_ids_changed) | ||||
| 
 | ||||
| 
 | ||||
| def _update_device_from_client_ips(device, client_ips): | ||||
|     ip = client_ips.get((device["user_id"], device["device_id"]), {}) | ||||
|  | ||||
| @ -259,6 +259,7 @@ class E2eKeysHandler(object): | ||||
|                 user_id, device_id, time_now, | ||||
|                 encode_canonical_json(device_keys) | ||||
|             ) | ||||
|             yield self.device_handler.notify_device_update(user_id, device_id) | ||||
| 
 | ||||
|         one_time_keys = keys.get("one_time_keys", None) | ||||
|         if one_time_keys: | ||||
|  | ||||
| @ -115,6 +115,7 @@ class SyncResult(collections.namedtuple("SyncResult", [ | ||||
|     "invited",  # InvitedSyncResult for each invited room. | ||||
|     "archived",  # ArchivedSyncResult for each archived room. | ||||
|     "to_device",  # List of direct messages for the device. | ||||
|     "device_lists",  # List of user_ids whose devices have chanegd | ||||
| ])): | ||||
|     __slots__ = [] | ||||
| 
 | ||||
| @ -143,6 +144,7 @@ class SyncHandler(object): | ||||
|         self.clock = hs.get_clock() | ||||
|         self.response_cache = ResponseCache(hs) | ||||
|         self.state = hs.get_state_handler() | ||||
|         self.device_handler = hs.get_device_handler() | ||||
| 
 | ||||
|     def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0, | ||||
|                                full_state=False): | ||||
| @ -544,6 +546,16 @@ class SyncHandler(object): | ||||
| 
 | ||||
|         yield self._generate_sync_entry_for_to_device(sync_result_builder) | ||||
| 
 | ||||
|         if since_token and since_token.device_list_key: | ||||
|             user_id = sync_config.user.to_string() | ||||
|             rooms = yield self.store.get_rooms_for_user(user_id) | ||||
|             joined_room_ids = set(r.room_id for r in rooms) | ||||
|             device_lists = yield self.device_handler.get_device_list_changes( | ||||
|                 user_id, joined_room_ids, since_token.device_list_key | ||||
|             ) | ||||
|         else: | ||||
|             device_lists = [] | ||||
| 
 | ||||
|         defer.returnValue(SyncResult( | ||||
|             presence=sync_result_builder.presence, | ||||
|             account_data=sync_result_builder.account_data, | ||||
| @ -551,6 +563,7 @@ class SyncHandler(object): | ||||
|             invited=sync_result_builder.invited, | ||||
|             archived=sync_result_builder.archived, | ||||
|             to_device=sync_result_builder.to_device, | ||||
|             device_lists=device_lists, | ||||
|             next_batch=sync_result_builder.now_token, | ||||
|         )) | ||||
| 
 | ||||
|  | ||||
| @ -170,12 +170,16 @@ class SyncRestServlet(RestServlet): | ||||
|         ) | ||||
| 
 | ||||
|         archived = self.encode_archived( | ||||
|             sync_result.archived, time_now, requester.access_token_id, filter.event_fields | ||||
|             sync_result.archived, time_now, requester.access_token_id, | ||||
|             filter.event_fields, | ||||
|         ) | ||||
| 
 | ||||
|         response_content = { | ||||
|             "account_data": {"events": sync_result.account_data}, | ||||
|             "to_device": {"events": sync_result.to_device}, | ||||
|             "device_lists": { | ||||
|                 "changed": list(sync_result.device_lists), | ||||
|             }, | ||||
|             "presence": self.encode_presence( | ||||
|                 sync_result.presence, time_now | ||||
|             ), | ||||
|  | ||||
| @ -116,6 +116,9 @@ class DataStore(RoomMemberStore, RoomStore, | ||||
|         self._public_room_id_gen = StreamIdGenerator( | ||||
|             db_conn, "public_room_list_stream", "stream_id" | ||||
|         ) | ||||
|         self._device_list_id_gen = StreamIdGenerator( | ||||
|             db_conn, "device_lists_stream", "stream_id", | ||||
|         ) | ||||
| 
 | ||||
|         self._transaction_id_gen = IdGenerator(db_conn, "sent_transactions", "id") | ||||
|         self._state_groups_id_gen = IdGenerator(db_conn, "state_groups", "id") | ||||
| @ -210,6 +213,14 @@ class DataStore(RoomMemberStore, RoomStore, | ||||
|             prefilled_cache=device_outbox_prefill, | ||||
|         ) | ||||
| 
 | ||||
|         device_list_max = self._device_list_id_gen.get_current_token() | ||||
|         self._device_list_stream_cache = StreamChangeCache( | ||||
|             "DeviceListStreamChangeCache", device_list_max, | ||||
|         ) | ||||
|         self._device_list_federation_stream_cache = StreamChangeCache( | ||||
|             "DeviceListFederationStreamChangeCache", device_list_max, | ||||
|         ) | ||||
| 
 | ||||
|         cur = LoggingTransaction( | ||||
|             db_conn.cursor(), | ||||
|             name="_find_stream_orderings_for_times_txn", | ||||
|  | ||||
| @ -387,6 +387,10 @@ class SQLBaseStore(object): | ||||
|         Args: | ||||
|             table : string giving the table name | ||||
|             values : dict of new column names and values for them | ||||
| 
 | ||||
|         Returns: | ||||
|             bool: Whether the row was inserted or not. Only useful when | ||||
|             `or_ignore` is True | ||||
|         """ | ||||
|         try: | ||||
|             yield self.runInteraction( | ||||
| @ -398,6 +402,8 @@ class SQLBaseStore(object): | ||||
|             # a cursor after we receive an error from the db. | ||||
|             if not or_ignore: | ||||
|                 raise | ||||
|             defer.returnValue(False) | ||||
|         defer.returnValue(True) | ||||
| 
 | ||||
|     @staticmethod | ||||
|     def _simple_insert_txn(txn, table, values): | ||||
|  | ||||
| @ -13,6 +13,7 @@ | ||||
| # See the License for the specific language governing permissions and | ||||
| # limitations under the License. | ||||
| import logging | ||||
| import ujson as json | ||||
| 
 | ||||
| from twisted.internet import defer | ||||
| 
 | ||||
| @ -33,17 +34,13 @@ class DeviceStore(SQLBaseStore): | ||||
|             user_id (str): id of user associated with the device | ||||
|             device_id (str): id of device | ||||
|             initial_device_display_name (str): initial displayname of the | ||||
|                device | ||||
|             ignore_if_known (bool): ignore integrity errors which mean the | ||||
|                device is already known | ||||
|                device. Ignored if device exists. | ||||
|         Returns: | ||||
|             defer.Deferred | ||||
|         Raises: | ||||
|             StoreError: if ignore_if_known is False and the device was already | ||||
|                known | ||||
|             defer.Deferred: boolean whether the device was inserted or an | ||||
|                 existing device existed with that ID. | ||||
|         """ | ||||
|         try: | ||||
|             yield self._simple_insert( | ||||
|             inserted = yield self._simple_insert( | ||||
|                 "devices", | ||||
|                 values={ | ||||
|                     "user_id": user_id, | ||||
| @ -51,8 +48,9 @@ class DeviceStore(SQLBaseStore): | ||||
|                     "display_name": initial_device_display_name | ||||
|                 }, | ||||
|                 desc="store_device", | ||||
|                 or_ignore=ignore_if_known, | ||||
|                 or_ignore=True, | ||||
|             ) | ||||
|             defer.returnValue(inserted) | ||||
|         except Exception as e: | ||||
|             logger.error("store_device with device_id=%s(%r) user_id=%s(%r)" | ||||
|                          " display_name=%s(%r) failed: %s", | ||||
| @ -139,3 +137,156 @@ class DeviceStore(SQLBaseStore): | ||||
|         ) | ||||
| 
 | ||||
|         defer.returnValue({d["device_id"]: d for d in devices}) | ||||
| 
 | ||||
|     def get_devices_by_remote(self, destination, from_stream_id): | ||||
|         now_stream_id = self._device_list_id_gen.get_current_token() | ||||
| 
 | ||||
|         has_changed = self._device_list_stream_cache.has_entity_changed( | ||||
|             destination, int(from_stream_id) | ||||
|         ) | ||||
|         if not has_changed: | ||||
|             defer.returnValue((now_stream_id, [])) | ||||
| 
 | ||||
|         return self.runInteraction( | ||||
|             "get_devices_by_remote", self._get_devices_by_remote_txn, | ||||
|             destination, from_stream_id, now_stream_id, | ||||
|         ) | ||||
| 
 | ||||
|     def _get_devices_by_remote_txn(self, txn, destination, from_stream_id, | ||||
|                                    now_stream_id): | ||||
|         sql = """ | ||||
|             SELECT user_id, device_id, max(stream_id) FROM device_lists_outbound_pokes | ||||
|             WHERE destination = ? AND stream_id > ? AND stream_id <= ? AND sent = ? | ||||
|             GROUP BY user_id, device_id | ||||
|         """ | ||||
|         txn.execute( | ||||
|             sql, (destination, from_stream_id, now_stream_id, False) | ||||
|         ) | ||||
|         rows = txn.fetchall() | ||||
| 
 | ||||
|         if not rows: | ||||
|             return now_stream_id, [] | ||||
| 
 | ||||
|         # maps (user_id, device_id) -> stream_id | ||||
|         query_map = {(r[0], r[1]): r[2] for r in rows} | ||||
|         devices = self._get_e2e_device_keys_txn( | ||||
|             txn, query_map.keys(), include_all_devices=True | ||||
|         ) | ||||
| 
 | ||||
|         prev_sent_id_sql = """ | ||||
|             SELECT coalesce(max(stream_id), 0) as stream_id | ||||
|             FROM device_lists_outbound_pokes | ||||
|             WHERE destination = ? AND user_id = ? AND sent = ? | ||||
|         """ | ||||
| 
 | ||||
|         results = [] | ||||
|         for user_id, user_devices in devices.iteritems(): | ||||
|             txn.execute(prev_sent_id_sql, (destination, user_id, True)) | ||||
|             rows = txn.fetchall() | ||||
|             prev_id = rows[0][0] | ||||
|             for device_id, result in user_devices.iteritems(): | ||||
|                 stream_id = query_map[(user_id, device_id)] | ||||
|                 result = { | ||||
|                     "user_id": user_id, | ||||
|                     "device_id": device_id, | ||||
|                     "prev_id": prev_id, | ||||
|                     "stream_id": stream_id, | ||||
|                 } | ||||
| 
 | ||||
|                 prev_id = stream_id | ||||
| 
 | ||||
|                 key_json = result.get("key_json", None) | ||||
|                 if key_json: | ||||
|                     result["keys"] = json.loads(key_json) | ||||
|                 device_display_name = result.get("device_display_name", None) | ||||
|                 if device_display_name: | ||||
|                     result["device_display_name"] = device_display_name | ||||
| 
 | ||||
|                 results.setdefault(user_id, {})[device_id] = result | ||||
| 
 | ||||
|         return now_stream_id, results | ||||
| 
 | ||||
|     def mark_as_sent_devices_by_remote(self, destination, stream_id): | ||||
|         return self.runInteraction( | ||||
|             "mark_as_sent_devices_by_remote", self._mark_as_sent_devices_by_remote_txn, | ||||
|             destination, stream_id, | ||||
|         ) | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def get_user_whose_devices_changed(self, from_key): | ||||
|         from_key = int(from_key) | ||||
|         changed = self._device_list_stream_cache.get_all_entities_changed(from_key) | ||||
|         if changed is not None: | ||||
|             defer.returnValue(set(changed)) | ||||
| 
 | ||||
|         sql = """ | ||||
|             SELECT user_id FROM device_lists_stream WHERE stream_id > ? | ||||
|         """ | ||||
|         rows = yield self._execute("get_user_whose_devices_changed", None, sql, from_key) | ||||
|         defer.returnValue(set(row["user_id"] for row in rows)) | ||||
| 
 | ||||
|     def _mark_as_sent_devices_by_remote_txn(self, txn, destination, stream_id): | ||||
|         sql = """ | ||||
|             DELETE FROM device_lists_outbound_pokes | ||||
|             WHERE destination = ? AND stream_id < ( | ||||
|                 SELECT coalesce(max(stream_id), 0) FROM device_lists_outbound_pokes | ||||
|                 WHERE destination = ? AND stream_id <= ? | ||||
|             ) | ||||
|         """ | ||||
|         txn.execute(sql, (destination, destination, stream_id,)) | ||||
| 
 | ||||
|         sql = """ | ||||
|             UPDATE device_lists_outbound_pokes SET sent = ? | ||||
|             WHERE destination = ? AND stream_id <= ? | ||||
|         """ | ||||
|         txn.execute(sql, (destination, True,)) | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def add_device_change_to_streams(self, user_id, device_id, hosts): | ||||
|         # device_lists_stream | ||||
|         # device_lists_outbound_pokes | ||||
|         with self._device_list_id_gen.get_next() as stream_id: | ||||
|             yield self.runInteraction( | ||||
|                 "add_device_change_to_streams", self._add_device_change_txn, | ||||
|                 user_id, device_id, hosts, stream_id, | ||||
|             ) | ||||
|         defer.returnValue(stream_id) | ||||
| 
 | ||||
|     def _add_device_change_txn(self, txn, user_id, device_id, hosts, stream_id): | ||||
|         txn.call_after( | ||||
|             self._device_list_stream_cache.entity_has_changed, | ||||
|             user_id, stream_id, | ||||
|         ) | ||||
|         for host in hosts: | ||||
|             txn.call_after( | ||||
|                 self._device_list_federation_stream_cache.entity_has_changed, | ||||
|                 host, stream_id, | ||||
|             ) | ||||
| 
 | ||||
|         self._simple_insert_txn( | ||||
|             txn, | ||||
|             table="device_lists_stream", | ||||
|             values={ | ||||
|                 "stream_id": stream_id, | ||||
|                 "user_id": user_id, | ||||
|                 "device_id": device_id, | ||||
|             } | ||||
|         ) | ||||
| 
 | ||||
|         self._simple_insert_many_txn( | ||||
|             txn, | ||||
|             table="device_lists_outbound_pokes", | ||||
|             values=[ | ||||
|                 { | ||||
|                     "destination": destination, | ||||
|                     "stream_id": stream_id, | ||||
|                     "user_id": user_id, | ||||
|                     "device_id": device_id, | ||||
|                     "sent": False, | ||||
|                 } | ||||
|                 for destination in hosts | ||||
|             ] | ||||
|         ) | ||||
| 
 | ||||
|     def get_device_stream_token(self): | ||||
|         return self._device_list_id_gen.get_current_token() | ||||
|  | ||||
| @ -12,9 +12,7 @@ | ||||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| # See the License for the specific language governing permissions and | ||||
| # limitations under the License. | ||||
| import collections | ||||
| 
 | ||||
| import twisted.internet.defer | ||||
| from twisted.internet import defer | ||||
| 
 | ||||
| from ._base import SQLBaseStore | ||||
| 
 | ||||
| @ -33,7 +31,7 @@ class EndToEndKeyStore(SQLBaseStore): | ||||
|             } | ||||
|         ) | ||||
| 
 | ||||
|     def get_e2e_device_keys(self, query_list): | ||||
|     def get_e2e_device_keys(self, query_list, include_all_devices=False): | ||||
|         """Fetch a list of device keys. | ||||
|         Args: | ||||
|             query_list(list): List of pairs of user_ids and device_ids. | ||||
| @ -45,10 +43,11 @@ class EndToEndKeyStore(SQLBaseStore): | ||||
|             return {} | ||||
| 
 | ||||
|         return self.runInteraction( | ||||
|             "get_e2e_device_keys", self._get_e2e_device_keys_txn, query_list | ||||
|             "get_e2e_device_keys", self._get_e2e_device_keys_txn, | ||||
|             query_list, include_all_devices, | ||||
|         ) | ||||
| 
 | ||||
|     def _get_e2e_device_keys_txn(self, txn, query_list): | ||||
|     def _get_e2e_device_keys_txn(self, txn, query_list, include_all_devices): | ||||
|         query_clauses = [] | ||||
|         query_params = [] | ||||
| 
 | ||||
| @ -63,23 +62,23 @@ class EndToEndKeyStore(SQLBaseStore): | ||||
|             query_clauses.append(query_clause) | ||||
| 
 | ||||
|         sql = ( | ||||
|             "SELECT k.user_id, k.device_id, " | ||||
|             "SELECT user_id, device_id, " | ||||
|             "    d.display_name AS device_display_name, " | ||||
|             "    k.key_json" | ||||
|             " FROM e2e_device_keys_json k" | ||||
|             "    LEFT JOIN devices d ON d.user_id = k.user_id" | ||||
|             "      AND d.device_id = k.device_id" | ||||
|             "    %s JOIN devices d USING (user_id, device_id)" | ||||
|             " WHERE %s" | ||||
|         ) % ( | ||||
|             "FULL OUTER" if include_all_devices else "LEFT", | ||||
|             " OR ".join("(" + q + ")" for q in query_clauses) | ||||
|         ) | ||||
| 
 | ||||
|         txn.execute(sql, query_params) | ||||
|         rows = self.cursor_to_dict(txn) | ||||
| 
 | ||||
|         result = collections.defaultdict(dict) | ||||
|         result = {} | ||||
|         for row in rows: | ||||
|             result[row["user_id"]][row["device_id"]] = row | ||||
|             result.setdefault(row["user_id"], {})[row["device_id"]] = row | ||||
| 
 | ||||
|         return result | ||||
| 
 | ||||
| @ -152,7 +151,7 @@ class EndToEndKeyStore(SQLBaseStore): | ||||
|             "claim_e2e_one_time_keys", _claim_e2e_one_time_keys | ||||
|         ) | ||||
| 
 | ||||
|     @twisted.internet.defer.inlineCallbacks | ||||
|     @defer.inlineCallbacks | ||||
|     def delete_e2e_keys_by_device(self, user_id, device_id): | ||||
|         yield self._simple_delete( | ||||
|             table="e2e_device_keys_json", | ||||
|  | ||||
							
								
								
									
										56
									
								
								synapse/storage/schema/delta/40/device_list_streams.sql
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										56
									
								
								synapse/storage/schema/delta/40/device_list_streams.sql
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,56 @@ | ||||
| /* Copyright 2017 OpenMarket Ltd | ||||
|  * | ||||
|  * Licensed under the Apache License, Version 2.0 (the "License"); | ||||
|  * you may not use this file except in compliance with the License. | ||||
|  * You may obtain a copy of the License at | ||||
|  * | ||||
|  *    http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  * | ||||
|  * Unless required by applicable law or agreed to in writing, software | ||||
|  * distributed under the License is distributed on an "AS IS" BASIS, | ||||
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|  * See the License for the specific language governing permissions and | ||||
|  * limitations under the License. | ||||
|  */ | ||||
| 
 | ||||
| CREATE TABLE device_list_streams_remote ( | ||||
|     list_id TEXT NOT NULL, | ||||
|     origin TEXT NOT NULL, | ||||
|     user_id TEXT NOT NULL, | ||||
|     is_full BOOLEAN NOT NULL, | ||||
|     ts BIGINT NOT NULL | ||||
| ); | ||||
| 
 | ||||
| CREATE INDEX device_list_streams_remote_id_origin ON device_list_streams_remote( | ||||
|     origin, list_id, user_id | ||||
| ); | ||||
| 
 | ||||
| 
 | ||||
| CREATE TABLE device_lists_remote_cache ( | ||||
|     user_id TEXT NOT NULL, | ||||
|     device_id TEXT NOT NULL, | ||||
|     content TEXT NOT NULL | ||||
| ); | ||||
| 
 | ||||
| CREATE INDEX device_lists_remote_cache_id ON device_lists_remote_cache(user_id, device_id); | ||||
| 
 | ||||
| 
 | ||||
| CREATE TABLE device_lists_stream ( | ||||
|     stream_id BIGINT NOT NULL, | ||||
|     user_id TEXT NOT NULL, | ||||
|     device_id TEXT NOT NULL | ||||
| ); | ||||
| 
 | ||||
| CREATE INDEX device_lists_stream_id ON device_lists_stream(stream_id, user_id); | ||||
| 
 | ||||
| 
 | ||||
| CREATE TABLE device_lists_outbound_pokes ( | ||||
|     destination TEXT NOT NULL, | ||||
|     stream_id BIGINT NOT NULL, | ||||
|     user_id TEXT NOT NULL, | ||||
|     device_id TEXT NOT NULL, | ||||
|     sent BOOLEAN NOT NULL | ||||
| ); | ||||
| 
 | ||||
| CREATE INDEX device_lists_outbound_pokes_id ON device_lists_outbound_pokes(destination, stream_id); | ||||
| CREATE INDEX device_lists_outbound_pokes_user ON device_lists_outbound_pokes(destination, user_id); | ||||
| @ -44,6 +44,7 @@ class EventSources(object): | ||||
|     def get_current_token(self): | ||||
|         push_rules_key, _ = self.store.get_push_rules_stream_token() | ||||
|         to_device_key = self.store.get_to_device_stream_token() | ||||
|         device_list_key = self.store.get_device_stream_token() | ||||
| 
 | ||||
|         token = StreamToken( | ||||
|             room_key=( | ||||
| @ -63,6 +64,7 @@ class EventSources(object): | ||||
|             ), | ||||
|             push_rules_key=push_rules_key, | ||||
|             to_device_key=to_device_key, | ||||
|             device_list_key=device_list_key, | ||||
|         ) | ||||
|         defer.returnValue(token) | ||||
| 
 | ||||
| @ -70,6 +72,7 @@ class EventSources(object): | ||||
|     def get_current_token_for_room(self, room_id): | ||||
|         push_rules_key, _ = self.store.get_push_rules_stream_token() | ||||
|         to_device_key = self.store.get_to_device_stream_token() | ||||
|         device_list_key = self.store.get_device_stream_token() | ||||
| 
 | ||||
|         token = StreamToken( | ||||
|             room_key=( | ||||
| @ -89,5 +92,6 @@ class EventSources(object): | ||||
|             ), | ||||
|             push_rules_key=push_rules_key, | ||||
|             to_device_key=to_device_key, | ||||
|             device_list_key=device_list_key, | ||||
|         ) | ||||
|         defer.returnValue(token) | ||||
|  | ||||
| @ -158,6 +158,7 @@ class StreamToken( | ||||
|         "account_data_key", | ||||
|         "push_rules_key", | ||||
|         "to_device_key", | ||||
|         "device_list_key", | ||||
|     )) | ||||
| ): | ||||
|     _SEPARATOR = "_" | ||||
| @ -195,6 +196,7 @@ class StreamToken( | ||||
|             or (int(other.account_data_key) < int(self.account_data_key)) | ||||
|             or (int(other.push_rules_key) < int(self.push_rules_key)) | ||||
|             or (int(other.to_device_key) < int(self.to_device_key)) | ||||
|             or (int(other.device_list_key) < int(self.device_list_key)) | ||||
|         ) | ||||
| 
 | ||||
|     def copy_and_advance(self, key, new_value): | ||||
|  | ||||
| @ -75,6 +75,7 @@ class TypingNotificationsTestCase(unittest.TestCase): | ||||
|                 "get_received_txn_response", | ||||
|                 "set_received_txn_response", | ||||
|                 "get_destination_retry_timings", | ||||
|                 "get_devices_by_remote", | ||||
|             ]), | ||||
|             state_handler=self.state_handler, | ||||
|             handlers=None, | ||||
| @ -99,6 +100,8 @@ class TypingNotificationsTestCase(unittest.TestCase): | ||||
|             defer.succeed(retry_timings_res) | ||||
|         ) | ||||
| 
 | ||||
|         self.datastore.get_devices_by_remote.return_value = (0, []) | ||||
| 
 | ||||
|         def get_received_txn_response(*args): | ||||
|             return defer.succeed(None) | ||||
|         self.datastore.get_received_txn_response = get_received_txn_response | ||||
|  | ||||
| @ -1032,7 +1032,7 @@ class RoomMessageListTestCase(RestTestCase): | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def test_topo_token_is_accepted(self): | ||||
|         token = "t1-0_0_0_0_0_0_0" | ||||
|         token = "t1-0_0_0_0_0_0_0_0" | ||||
|         (code, response) = yield self.mock_resource.trigger_get( | ||||
|             "/rooms/%s/messages?access_token=x&from=%s" % | ||||
|             (self.room_id, token)) | ||||
| @ -1044,7 +1044,7 @@ class RoomMessageListTestCase(RestTestCase): | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def test_stream_token_is_accepted_for_fwd_pagianation(self): | ||||
|         token = "s0_0_0_0_0_0_0" | ||||
|         token = "s0_0_0_0_0_0_0_0" | ||||
|         (code, response) = yield self.mock_resource.trigger_get( | ||||
|             "/rooms/%s/messages?access_token=x&from=%s" % | ||||
|             (self.room_id, token)) | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user