From 1705647ddd6bc3e99ed88cc722ceb1c3a58823b4 Mon Sep 17 00:00:00 2001 From: Tomasz Fluxid Kowalczyk Date: Sat, 18 Feb 2012 22:08:38 +0100 Subject: [PATCH] Fixed race conditions between clients on high latency connections This works, but code craves for a huge cleanup and refactoring. --- syncplay/client.py | 51 +++++++++++++++++++++++---------------- syncplay/network_utils.py | 4 +-- syncplay/server.py | 1 - syncplay/utils.py | 8 +++++- 4 files changed, 39 insertions(+), 25 deletions(-) diff --git a/syncplay/client.py b/syncplay/client.py index 534cfd5..d53659b 100644 --- a/syncplay/client.py +++ b/syncplay/client.py @@ -9,7 +9,10 @@ from .network_utils import ( arg_count, CommandProtocol, ) -from .utils import parse_state +from .utils import ( + format_time, + parse_state, +) class SyncClientProtocol(CommandProtocol): @@ -156,6 +159,7 @@ class Manager(object): self.global_noted_pause_change = None self.last_global_update = None self.counter = 0 + self.counter_recv = 0 self.player = None self.ask_delayed = None @@ -249,8 +253,11 @@ class Manager(object): else: self.send_delayed = reactor.callLater(when, self.send_status) - def send_status(self): - if not self.running: + def send_status(self, force = False): + if not (self.running and self.protocol): + return + self.schedule_send_status() + if self.counter > self.counter_recv and not force: return self.counter += 1 curtime = time.time() @@ -259,12 +266,12 @@ class Manager(object): position += curtime - self.last_player_update if self.protocol: self.protocol.send_state(self.counter, curtime, self.player_paused, self.player_position) - self.schedule_send_status() def send_seek(self): if not (self.running and self.protocol): return self.counter += 1 + self.protocol.send_seek(self.counter, time.time(), self.player_position) def send_filename(self): @@ -285,7 +292,7 @@ class Manager(object): if old_paused and not paused: self.player_paused_at = None if old_paused != paused and self.global_paused != paused: - self.send_status() + self.send_status(True) if not (self.global_paused or self.seek_sent_wait): diff = position - self.get_global_position() @@ -301,6 +308,7 @@ class Manager(object): self.player_speed_fix = False if abs(diff) > 8: self.send_seek() + self.seek_sent_wait = True if not paused and self.player_paused_at is not None and position >= self.player_paused_at: #print 'Pausing %0.2fs after pause point' % (position - self.player_paused_at) @@ -312,27 +320,35 @@ class Manager(object): self.send_filename() def update_global_state(self, counter, ctime, paused, position, name): + self.counter_recv = max(self.counter_recv, counter) + counter_valid = self.counter and counter >= self.counter + curtime = time.time() + updated_before = bool(self.last_global_update) + + if updated_before and not counter_valid: + return + if not paused: position += curtime - ctime self.global_paused = paused self.global_position = position self.global_who_changed = name - updated_before = bool(self.last_global_update) self.last_global_update = curtime if not self.player: return changed = False + self.seek_sent_wait = False + if not updated_before: self.player.set_position(position) self.player.set_paused(paused) changed = True - elif not (self.counter and counter < self.counter): + + if counter_valid: diff = self.get_player_position() - position - if self.last_player_update is not None: - diff += curtime - self.last_player_update if abs(diff) > 4: self.player.set_position(position) changed = True @@ -349,27 +365,20 @@ class Manager(object): if diff < 0: self.player.set_paused(True) changed = True - #else: - # print 'Not pausing now' + self.global_noted_pause_change = paused - self.seek_sent_wait = False - self.global_noted_pause_change = paused if changed: self.ask_player() def seek(self, ctime, position, who): - position += time.time() - ctime + curtime = time.time() + position += curtime - ctime self.global_position = position + self.last_global_update = curtime if self.player: self.player.set_position(position) self.ask_player() - position = int(position*1000) - seconds, mseconds = divmod(position, 1000) - minutes, seconds = divmod(seconds, 60) - hours, minutes = divmod(minutes, 60) - print '%s seeked to %02d:%02d:%02d.%03d' % ( - who, hours, minutes, seconds, mseconds - ) + print who, 'seeked to', format_time(position) diff --git a/syncplay/network_utils.py b/syncplay/network_utils.py index 014cd23..08da0f7 100644 --- a/syncplay/network_utils.py +++ b/syncplay/network_utils.py @@ -1,12 +1,11 @@ #coding:utf8 +from functools import wraps try: from cStringIO import StringIO except ImportError: from StringIO import StringIO -from functools import wraps - from twisted.internet.defer import succeed from twisted.internet.protocol import ( ProcessProtocol, @@ -22,6 +21,7 @@ from .utils import ( split_args, ) + def arg_count(minimum, maximum=None): def decorator(f): @wraps(f) diff --git a/syncplay/server.py b/syncplay/server.py index 3fa35dd..9ebef26 100644 --- a/syncplay/server.py +++ b/syncplay/server.py @@ -212,7 +212,6 @@ class SyncFactory(Factory): self.pause_change_by = None if not self.watchers: self.paused = True - # send info someone quit def update_state(self, watcher_proto, counter, ctime, paused, position): watcher = self.watchers.get(watcher_proto) diff --git a/syncplay/utils.py b/syncplay/utils.py index c29951c..c804929 100644 --- a/syncplay/utils.py +++ b/syncplay/utils.py @@ -44,7 +44,6 @@ def split_args(args): def join_args(args): return ' '.join(quote_arg(arg) for arg in args) - def parse_state(args): if len(args) == 4: counter, ctime, state, position = args @@ -79,3 +78,10 @@ def find_exec_path(name): if os.access(path, os.X_OK): return path +def format_time(value): + value = int(value*100) + seconds, mseconds = divmod(value, 100) + minutes, seconds = divmod(seconds, 60) + hours, minutes = divmod(minutes, 60) + return '%02d:%02d:%02d.%02d' % (hours, minutes, seconds, mseconds) +