Changed protocol, now we're sending timestamps.

Now client gets more precise positions, and lagged packets/latency
spikes shouldn't now cause synching problems. Syncing should be a bit
more precise now in general, when seeking/joining to playing session.
This commit is contained in:
Tomasz Fluxid Kowalczyk 2012-02-18 16:07:12 +01:00
parent 34eb082eef
commit d0e23f9b0e
3 changed files with 71 additions and 58 deletions

View File

@ -45,28 +45,30 @@ class SyncClientProtocol(CommandProtocol):
else: else:
print '%s is present' % who print '%s is present' % who
@arg_count(3, 4) @arg_count(4, 5)
def handle_connected_state(self, args): def handle_connected_state(self, args):
args = parse_state(args) args = parse_state(args)
if not args: if not args:
self.drop_with_error('Malformed state attributes') self.drop_with_error('Malformed state attributes')
return return
counter, paused, position, name = args counter, ctime, paused, position, name = args
self.manager.update_global_state(counter, paused, position, name) self.manager.update_global_state(counter, ctime, paused, position, name)
@arg_count(2) @arg_count(3)
def handle_connected_seek(self, args): def handle_connected_seek(self, args):
position, who = args ctime, position, who = args
try: try:
ctime = int(ctime)
position = int(position) position = int(position)
except ValueError: except ValueError:
self.drop_with_error('Invalid arguments') self.drop_with_error('Invalid arguments')
ctime /= 1000.0
position /= 1000.0 position /= 1000.0
self.manager.seek(position, who) self.manager.seek(ctime, position, who)
@arg_count(1) @arg_count(1)
def handle_connected_ping(self, args): def handle_connected_ping(self, args):
@ -86,11 +88,11 @@ class SyncClientProtocol(CommandProtocol):
print '%s left' % args[0] print '%s left' % args[0]
def send_state(self, counter, paused, position): def send_state(self, counter, ctime, paused, position):
self.send_message('state', counter, ('paused' if paused else 'playing'), int(position*1000)) self.send_message('state', counter, int(ctime*1000), ('paused' if paused else 'playing'), int(position*1000))
def send_seek(self, counter, position): def send_seek(self, counter, ctime, position):
self.send_message('seek', counter, int(position*1000)) self.send_message('seek', counter, int(ctime*1000), int(position*1000))
def send_playing(self, filename): def send_playing(self, filename):
self.send_message('playing', filename) self.send_message('playing', filename)
@ -251,15 +253,19 @@ class Manager(object):
if not self.running: if not self.running:
return return
self.counter += 1 self.counter += 1
curtime = time.time()
position = self.player_position
if not self.player_paused:
position += curtime - self.last_player_update
if self.protocol: if self.protocol:
self.protocol.send_state(self.counter, self.player_paused, self.player_position) self.protocol.send_state(self.counter, curtime, self.player_paused, self.player_position)
self.schedule_send_status() self.schedule_send_status()
def send_seek(self): def send_seek(self):
if not (self.running and self.protocol): if not (self.running and self.protocol):
return return
self.counter += 1 self.counter += 1
self.protocol.send_seek(self.counter, self.player_position) self.protocol.send_seek(self.counter, time.time(), self.player_position)
def send_filename(self): def send_filename(self):
if self.protocol and self.player_filename: if self.protocol and self.player_filename:
@ -305,8 +311,10 @@ class Manager(object):
self.player_filename = filename self.player_filename = filename
self.send_filename() self.send_filename()
def update_global_state(self, counter, paused, position, name): def update_global_state(self, counter, ctime, paused, position, name):
curtime = time.time() curtime = time.time()
if not paused:
position += curtime - ctime
self.global_paused = paused self.global_paused = paused
self.global_position = position self.global_position = position
self.global_who_changed = name self.global_who_changed = name
@ -344,12 +352,13 @@ class Manager(object):
#else: #else:
# print 'Not pausing now' # print 'Not pausing now'
self.global_noted_pause_change = paused
self.seek_sent_wait = False self.seek_sent_wait = False
self.global_noted_pause_change = paused
if changed: if changed:
self.ask_player() self.ask_player()
def seek(self, position, who): def seek(self, ctime, position, who):
position += time.time() - ctime
self.global_position = position self.global_position = position
if self.player: if self.player:
self.player.set_position(position) self.player.set_position(position)

View File

@ -42,28 +42,31 @@ class SyncServerProtocol(CommandProtocol):
self.factory.add_watcher(self, args[0]) self.factory.add_watcher(self, args[0])
self.change_state('connected') self.change_state('connected')
@arg_count(3) @arg_count(4)
def handle_connected_state(self, args): def handle_connected_state(self, args):
args = parse_state(args) args = parse_state(args)
if not args: if not args:
self.drop_with_error('Malformed state attributes') self.drop_with_error('Malformed state attributes')
return return
counter, paused, position, _ = args counter, ctime, paused, position, _ = args
self.factory.update_state(self, counter, paused, position) self.factory.update_state(self, counter, ctime, paused, position)
@arg_count(2) @arg_count(3)
def handle_connected_seek(self, args): def handle_connected_seek(self, args):
counter, ctime, position = args
try: try:
counter = int(args[0]) counter = int(counter)
position = int(args[1]) ctime = int(ctime)
position = int(position)
except ValueError: except ValueError:
self.drop_with_error('Invalid arguments') self.drop_with_error('Invalid arguments')
ctime /= 1000.0
position /= 1000.0 position /= 1000.0
self.factory.seek(self, counter, position) self.factory.seek(self, counter, ctime, position)
@arg_count(2) @arg_count(2)
def handle_connected_pong(self, args): def handle_connected_pong(self, args):
@ -88,16 +91,17 @@ class SyncServerProtocol(CommandProtocol):
))) )))
def send_state(self, counter, paused, position, who_last_changed): def send_state(self, counter, ctime, paused, position, who_last_changed):
ctime = int(ctime*1000)
paused = 'paused' if paused else 'playing' paused = 'paused' if paused else 'playing'
position = int(position*1000) position = int(position*1000)
if who_last_changed is None: if who_last_changed is None:
self.send_message('state', counter, paused, position) self.send_message('state', counter, ctime, paused, position)
else: else:
self.send_message('state', counter, paused, position, who_last_changed) self.send_message('state', counter, ctime, paused, position, who_last_changed)
def send_seek(self, position, who_seeked): def send_seek(self, ctime, position, who_seeked):
self.send_message('seek', int(position*1000), who_seeked) self.send_message('seek', int(ctime*1000), int(position*1000), who_seeked)
def send_ping(self, value): def send_ping(self, value):
self.send_message('ping', value) self.send_message('ping', value)
@ -141,6 +145,7 @@ class WatcherInfo(object):
self.name = name self.name = name
self.active = True self.active = True
self.paused = True
self.position = 0 self.position = 0
self.filename = None self.filename = None
self.max_position = 0 self.max_position = 0
@ -196,6 +201,8 @@ class SyncFactory(Factory):
def remove_watcher(self, watcher_proto): def remove_watcher(self, watcher_proto):
watcher = self.watchers.pop(watcher_proto, None) watcher = self.watchers.pop(watcher_proto, None)
if not watcher:
return
watcher.active = False watcher.active = False
for receiver in self.watchers.itervalues(): for receiver in self.watchers.itervalues():
if receiver != watcher: if receiver != watcher:
@ -207,21 +214,24 @@ class SyncFactory(Factory):
self.paused = True self.paused = True
# send info someone quit # send info someone quit
def update_state(self, watcher_proto, counter, paused, position): def update_state(self, watcher_proto, counter, ctime, paused, position):
watcher = self.watchers.get(watcher_proto) watcher = self.watchers.get(watcher_proto)
if not watcher: if not watcher:
return return
if not paused and watcher.ping is not None: curtime = time.time()
position += watcher.ping ctime += watcher.time_offset
if not paused:
position += curtime - ctime
watcher.paused = paused
watcher.position = position watcher.position = position
watcher.max_position = max(position, watcher.max_position) watcher.max_position = max(position, watcher.max_position)
watcher.last_update = time.time() watcher.last_update = curtime
watcher.counter = counter watcher.counter = counter
pause_changed = paused != self.paused pause_changed = paused != self.paused
curtime = time.time()
if pause_changed and ( if pause_changed and (
not self.pause_change_by or not self.pause_change_by or
self.pause_change_by == watcher or self.pause_change_by == watcher or
@ -242,26 +252,24 @@ class SyncFactory(Factory):
): ):
self.send_state_to(receiver, position, curtime) self.send_state_to(receiver, position, curtime)
def seek(self, watcher_proto, counter, position): def seek(self, watcher_proto, counter, ctime, position):
watcher = self.watchers.get(watcher_proto) watcher = self.watchers.get(watcher_proto)
if not watcher: if not watcher:
return return
#print watcher.name, 'seeked to', position #print watcher.name, 'seeked to', position
if not self.paused and watcher.ping is not None: curtime = time.time()
position += watcher.ping ctime += watcher.time_offset
position += curtime - ctime
watcher.counter = counter watcher.counter = counter
for receiver in self.watchers.itervalues(): for receiver in self.watchers.itervalues():
position2 = position position2 = position
if not self.paused and receiver.ping is not None:
position2 += receiver.ping
receiver.max_position = position2 receiver.max_position = position2
if receiver == watcher: if receiver == watcher:
# send_state_to modifies by ping already... self.send_state_to(receiver, position, curtime)
self.send_state_to(receiver, position)
else: else:
receiver.watcher_proto.send_seek(position2, watcher.name) receiver.watcher_proto.send_seek(curtime-receiver.time_offset, position2, watcher.name)
def send_state_to(self, watcher, position=None, curtime=None): def send_state_to(self, watcher, position=None, curtime=None):
if position is None: if position is None:
@ -269,13 +277,12 @@ class SyncFactory(Factory):
if curtime is None: if curtime is None:
curtime = time.time() curtime = time.time()
if not self.paused and watcher.ping is not None: ctime = curtime - watcher.time_offset
position += watcher.ping
if self.pause_change_by: if self.pause_change_by:
watcher.watcher_proto.send_state(watcher.counter, self.paused, position, self.pause_change_by.name) watcher.watcher_proto.send_state(watcher.counter, ctime, self.paused, position, self.pause_change_by.name)
else: else:
watcher.watcher_proto.send_state(watcher.counter, self.paused, position, None) watcher.watcher_proto.send_state(watcher.counter, ctime, self.paused, position, None)
watcher.last_update_sent = curtime watcher.last_update_sent = curtime
@ -331,9 +338,8 @@ class SyncFactory(Factory):
while not chars or chars in watcher.pings_sent: while not chars or chars in watcher.pings_sent:
chars = random_chars() chars = random_chars()
ctime = time.time() curtime = time.time()
print ctime - watcher.last_ping_received if curtime - watcher.last_ping_received > 60:
if ctime - watcher.last_ping_received > 60:
watcher.watcher_proto.drop() watcher.watcher_proto.drop()
return return

View File

@ -46,32 +46,30 @@ def join_args(args):
def parse_state(args): def parse_state(args):
if len(args) == 3: if len(args) == 4:
counter, state, position = args counter, ctime, state, position = args
who_changed_state = None who_changed_state = None
elif len(args) == 4: elif len(args) == 5:
counter, state, position, who_changed_state = args counter, ctime, state, position, who_changed_state = args
else: else:
return return
try:
counter = int(counter)
except ValueError:
return
if not state in ('paused', 'playing'): if not state in ('paused', 'playing'):
return return
paused = state == 'paused' paused = state == 'paused'
try: try:
counter = int(counter)
ctime = int(ctime)
position = int(position) position = int(position)
except ValueError: except ValueError:
return return
ctime /= 1000.0
position /= 1000.0 position /= 1000.0
return counter, paused, position, who_changed_state return counter, ctime, paused, position, who_changed_state
def find_exec_path(name): def find_exec_path(name):
if os.access(name, os.X_OK): if os.access(name, os.X_OK):