Fixed race conditions between clients on high latency connections

This works, but code craves for a huge cleanup and refactoring.
This commit is contained in:
Tomasz Fluxid Kowalczyk 2012-02-18 22:08:38 +01:00
parent d0e23f9b0e
commit 1705647ddd
4 changed files with 39 additions and 25 deletions

View File

@ -9,7 +9,10 @@ from .network_utils import (
arg_count,
CommandProtocol,
)
from .utils import parse_state
from .utils import (
format_time,
parse_state,
)
class SyncClientProtocol(CommandProtocol):
@ -156,6 +159,7 @@ class Manager(object):
self.global_noted_pause_change = None
self.last_global_update = None
self.counter = 0
self.counter_recv = 0
self.player = None
self.ask_delayed = None
@ -249,8 +253,11 @@ class Manager(object):
else:
self.send_delayed = reactor.callLater(when, self.send_status)
def send_status(self):
if not self.running:
def send_status(self, force = False):
if not (self.running and self.protocol):
return
self.schedule_send_status()
if self.counter > self.counter_recv and not force:
return
self.counter += 1
curtime = time.time()
@ -259,12 +266,12 @@ class Manager(object):
position += curtime - self.last_player_update
if self.protocol:
self.protocol.send_state(self.counter, curtime, self.player_paused, self.player_position)
self.schedule_send_status()
def send_seek(self):
if not (self.running and self.protocol):
return
self.counter += 1
self.protocol.send_seek(self.counter, time.time(), self.player_position)
def send_filename(self):
@ -285,7 +292,7 @@ class Manager(object):
if old_paused and not paused:
self.player_paused_at = None
if old_paused != paused and self.global_paused != paused:
self.send_status()
self.send_status(True)
if not (self.global_paused or self.seek_sent_wait):
diff = position - self.get_global_position()
@ -301,6 +308,7 @@ class Manager(object):
self.player_speed_fix = False
if abs(diff) > 8:
self.send_seek()
self.seek_sent_wait = True
if not paused and self.player_paused_at is not None and position >= self.player_paused_at:
#print 'Pausing %0.2fs after pause point' % (position - self.player_paused_at)
@ -312,27 +320,35 @@ class Manager(object):
self.send_filename()
def update_global_state(self, counter, ctime, paused, position, name):
self.counter_recv = max(self.counter_recv, counter)
counter_valid = self.counter and counter >= self.counter
curtime = time.time()
updated_before = bool(self.last_global_update)
if updated_before and not counter_valid:
return
if not paused:
position += curtime - ctime
self.global_paused = paused
self.global_position = position
self.global_who_changed = name
updated_before = bool(self.last_global_update)
self.last_global_update = curtime
if not self.player:
return
changed = False
self.seek_sent_wait = False
if not updated_before:
self.player.set_position(position)
self.player.set_paused(paused)
changed = True
elif not (self.counter and counter < self.counter):
if counter_valid:
diff = self.get_player_position() - position
if self.last_player_update is not None:
diff += curtime - self.last_player_update
if abs(diff) > 4:
self.player.set_position(position)
changed = True
@ -349,27 +365,20 @@ class Manager(object):
if diff < 0:
self.player.set_paused(True)
changed = True
#else:
# print 'Not pausing now'
self.global_noted_pause_change = paused
self.seek_sent_wait = False
self.global_noted_pause_change = paused
if changed:
self.ask_player()
def seek(self, ctime, position, who):
position += time.time() - ctime
curtime = time.time()
position += curtime - ctime
self.global_position = position
self.last_global_update = curtime
if self.player:
self.player.set_position(position)
self.ask_player()
position = int(position*1000)
seconds, mseconds = divmod(position, 1000)
minutes, seconds = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
print '%s seeked to %02d:%02d:%02d.%03d' % (
who, hours, minutes, seconds, mseconds
)
print who, 'seeked to', format_time(position)

View File

@ -1,12 +1,11 @@
#coding:utf8
from functools import wraps
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
from functools import wraps
from twisted.internet.defer import succeed
from twisted.internet.protocol import (
ProcessProtocol,
@ -22,6 +21,7 @@ from .utils import (
split_args,
)
def arg_count(minimum, maximum=None):
def decorator(f):
@wraps(f)

View File

@ -212,7 +212,6 @@ class SyncFactory(Factory):
self.pause_change_by = None
if not self.watchers:
self.paused = True
# send info someone quit
def update_state(self, watcher_proto, counter, ctime, paused, position):
watcher = self.watchers.get(watcher_proto)

View File

@ -44,7 +44,6 @@ def split_args(args):
def join_args(args):
return ' '.join(quote_arg(arg) for arg in args)
def parse_state(args):
if len(args) == 4:
counter, ctime, state, position = args
@ -79,3 +78,10 @@ def find_exec_path(name):
if os.access(path, os.X_OK):
return path
def format_time(value):
value = int(value*100)
seconds, mseconds = divmod(value, 100)
minutes, seconds = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
return '%02d:%02d:%02d.%02d' % (hours, minutes, seconds, mseconds)