From d0e23f9b0ec2a29c427be958ffed0d3cef7c5a63 Mon Sep 17 00:00:00 2001 From: Tomasz Fluxid Kowalczyk Date: Sat, 18 Feb 2012 16:07:12 +0100 Subject: [PATCH] Changed protocol, now we're sending timestamps. Now client gets more precise positions, and lagged packets/latency spikes shouldn't now cause synching problems. Syncing should be a bit more precise now in general, when seeking/joining to playing session. --- syncplay/client.py | 39 +++++++++++++++---------- syncplay/server.py | 72 +++++++++++++++++++++++++--------------------- syncplay/utils.py | 18 ++++++------ 3 files changed, 71 insertions(+), 58 deletions(-) diff --git a/syncplay/client.py b/syncplay/client.py index bff5da5..534cfd5 100644 --- a/syncplay/client.py +++ b/syncplay/client.py @@ -45,28 +45,30 @@ class SyncClientProtocol(CommandProtocol): else: print '%s is present' % who - @arg_count(3, 4) + @arg_count(4, 5) def handle_connected_state(self, args): args = parse_state(args) if not args: self.drop_with_error('Malformed state attributes') return - counter, paused, position, name = args + counter, ctime, paused, position, name = args - self.manager.update_global_state(counter, paused, position, name) + self.manager.update_global_state(counter, ctime, paused, position, name) - @arg_count(2) + @arg_count(3) def handle_connected_seek(self, args): - position, who = args + ctime, position, who = args try: + ctime = int(ctime) position = int(position) except ValueError: self.drop_with_error('Invalid arguments') + ctime /= 1000.0 position /= 1000.0 - self.manager.seek(position, who) + self.manager.seek(ctime, position, who) @arg_count(1) def handle_connected_ping(self, args): @@ -86,11 +88,11 @@ class SyncClientProtocol(CommandProtocol): print '%s left' % args[0] - def send_state(self, counter, paused, position): - self.send_message('state', counter, ('paused' if paused else 'playing'), int(position*1000)) + def send_state(self, counter, ctime, paused, position): + self.send_message('state', counter, int(ctime*1000), ('paused' if paused else 'playing'), int(position*1000)) - def send_seek(self, counter, position): - self.send_message('seek', counter, int(position*1000)) + def send_seek(self, counter, ctime, position): + self.send_message('seek', counter, int(ctime*1000), int(position*1000)) def send_playing(self, filename): self.send_message('playing', filename) @@ -251,15 +253,19 @@ class Manager(object): if not self.running: return self.counter += 1 + curtime = time.time() + position = self.player_position + if not self.player_paused: + position += curtime - self.last_player_update if self.protocol: - self.protocol.send_state(self.counter, self.player_paused, self.player_position) + self.protocol.send_state(self.counter, curtime, self.player_paused, self.player_position) self.schedule_send_status() def send_seek(self): if not (self.running and self.protocol): return self.counter += 1 - self.protocol.send_seek(self.counter, self.player_position) + self.protocol.send_seek(self.counter, time.time(), self.player_position) def send_filename(self): if self.protocol and self.player_filename: @@ -305,8 +311,10 @@ class Manager(object): self.player_filename = filename self.send_filename() - def update_global_state(self, counter, paused, position, name): + def update_global_state(self, counter, ctime, paused, position, name): curtime = time.time() + if not paused: + position += curtime - ctime self.global_paused = paused self.global_position = position self.global_who_changed = name @@ -344,12 +352,13 @@ class Manager(object): #else: # print 'Not pausing now' - self.global_noted_pause_change = paused self.seek_sent_wait = False + self.global_noted_pause_change = paused if changed: self.ask_player() - def seek(self, position, who): + def seek(self, ctime, position, who): + position += time.time() - ctime self.global_position = position if self.player: self.player.set_position(position) diff --git a/syncplay/server.py b/syncplay/server.py index 78d6dd2..3fa35dd 100644 --- a/syncplay/server.py +++ b/syncplay/server.py @@ -42,28 +42,31 @@ class SyncServerProtocol(CommandProtocol): self.factory.add_watcher(self, args[0]) self.change_state('connected') - @arg_count(3) + @arg_count(4) def handle_connected_state(self, args): args = parse_state(args) if not args: self.drop_with_error('Malformed state attributes') return - counter, paused, position, _ = args + counter, ctime, paused, position, _ = args - self.factory.update_state(self, counter, paused, position) + self.factory.update_state(self, counter, ctime, paused, position) - @arg_count(2) + @arg_count(3) def handle_connected_seek(self, args): + counter, ctime, position = args try: - counter = int(args[0]) - position = int(args[1]) + counter = int(counter) + ctime = int(ctime) + position = int(position) except ValueError: self.drop_with_error('Invalid arguments') + ctime /= 1000.0 position /= 1000.0 - self.factory.seek(self, counter, position) + self.factory.seek(self, counter, ctime, position) @arg_count(2) def handle_connected_pong(self, args): @@ -88,16 +91,17 @@ class SyncServerProtocol(CommandProtocol): ))) - def send_state(self, counter, paused, position, who_last_changed): + def send_state(self, counter, ctime, paused, position, who_last_changed): + ctime = int(ctime*1000) paused = 'paused' if paused else 'playing' position = int(position*1000) if who_last_changed is None: - self.send_message('state', counter, paused, position) + self.send_message('state', counter, ctime, paused, position) else: - self.send_message('state', counter, paused, position, who_last_changed) + self.send_message('state', counter, ctime, paused, position, who_last_changed) - def send_seek(self, position, who_seeked): - self.send_message('seek', int(position*1000), who_seeked) + def send_seek(self, ctime, position, who_seeked): + self.send_message('seek', int(ctime*1000), int(position*1000), who_seeked) def send_ping(self, value): self.send_message('ping', value) @@ -141,6 +145,7 @@ class WatcherInfo(object): self.name = name self.active = True + self.paused = True self.position = 0 self.filename = None self.max_position = 0 @@ -196,6 +201,8 @@ class SyncFactory(Factory): def remove_watcher(self, watcher_proto): watcher = self.watchers.pop(watcher_proto, None) + if not watcher: + return watcher.active = False for receiver in self.watchers.itervalues(): if receiver != watcher: @@ -207,21 +214,24 @@ class SyncFactory(Factory): self.paused = True # send info someone quit - def update_state(self, watcher_proto, counter, paused, position): + def update_state(self, watcher_proto, counter, ctime, paused, position): watcher = self.watchers.get(watcher_proto) if not watcher: return - if not paused and watcher.ping is not None: - position += watcher.ping + curtime = time.time() + ctime += watcher.time_offset + if not paused: + position += curtime - ctime + + watcher.paused = paused watcher.position = position watcher.max_position = max(position, watcher.max_position) - watcher.last_update = time.time() + watcher.last_update = curtime watcher.counter = counter - + pause_changed = paused != self.paused - curtime = time.time() if pause_changed and ( not self.pause_change_by or self.pause_change_by == watcher or @@ -242,26 +252,24 @@ class SyncFactory(Factory): ): self.send_state_to(receiver, position, curtime) - def seek(self, watcher_proto, counter, position): + def seek(self, watcher_proto, counter, ctime, position): watcher = self.watchers.get(watcher_proto) if not watcher: return #print watcher.name, 'seeked to', position - if not self.paused and watcher.ping is not None: - position += watcher.ping + curtime = time.time() + ctime += watcher.time_offset + position += curtime - ctime watcher.counter = counter for receiver in self.watchers.itervalues(): position2 = position - if not self.paused and receiver.ping is not None: - position2 += receiver.ping receiver.max_position = position2 if receiver == watcher: - # send_state_to modifies by ping already... - self.send_state_to(receiver, position) + self.send_state_to(receiver, position, curtime) else: - receiver.watcher_proto.send_seek(position2, watcher.name) + receiver.watcher_proto.send_seek(curtime-receiver.time_offset, position2, watcher.name) def send_state_to(self, watcher, position=None, curtime=None): if position is None: @@ -269,13 +277,12 @@ class SyncFactory(Factory): if curtime is None: curtime = time.time() - if not self.paused and watcher.ping is not None: - position += watcher.ping + ctime = curtime - watcher.time_offset if self.pause_change_by: - watcher.watcher_proto.send_state(watcher.counter, self.paused, position, self.pause_change_by.name) + watcher.watcher_proto.send_state(watcher.counter, ctime, self.paused, position, self.pause_change_by.name) else: - watcher.watcher_proto.send_state(watcher.counter, self.paused, position, None) + watcher.watcher_proto.send_state(watcher.counter, ctime, self.paused, position, None) watcher.last_update_sent = curtime @@ -331,9 +338,8 @@ class SyncFactory(Factory): while not chars or chars in watcher.pings_sent: chars = random_chars() - ctime = time.time() - print ctime - watcher.last_ping_received - if ctime - watcher.last_ping_received > 60: + curtime = time.time() + if curtime - watcher.last_ping_received > 60: watcher.watcher_proto.drop() return diff --git a/syncplay/utils.py b/syncplay/utils.py index e36b5bd..c29951c 100644 --- a/syncplay/utils.py +++ b/syncplay/utils.py @@ -46,32 +46,30 @@ def join_args(args): def parse_state(args): - if len(args) == 3: - counter, state, position = args + if len(args) == 4: + counter, ctime, state, position = args who_changed_state = None - elif len(args) == 4: - counter, state, position, who_changed_state = args + elif len(args) == 5: + counter, ctime, state, position, who_changed_state = args else: return - try: - counter = int(counter) - except ValueError: - return - if not state in ('paused', 'playing'): return paused = state == 'paused' try: + counter = int(counter) + ctime = int(ctime) position = int(position) except ValueError: return + ctime /= 1000.0 position /= 1000.0 - return counter, paused, position, who_changed_state + return counter, ctime, paused, position, who_changed_state def find_exec_path(name): if os.access(name, os.X_OK):