From 5c7ba50e5534935043565f2677e0f2f83046d78e Mon Sep 17 00:00:00 2001 From: Tomasz Fluxid Kowalczyk Date: Sat, 28 Jan 2012 19:40:51 +0100 Subject: [PATCH] Client works, mplayer wrapper works, but synchronization doesn't yet --- mplayer.py | 27 ++++++++++ syncplay/client.py | 100 ++++++++++++++++++++++++++++++++++-- syncplay/network_utils.py | 20 ++++---- syncplay/players/mplayer.py | 69 ++++++++++++++++++++++--- syncplay/utils.py | 11 ++++ 5 files changed, 205 insertions(+), 22 deletions(-) create mode 100644 mplayer.py diff --git a/mplayer.py b/mplayer.py new file mode 100644 index 0000000..b1d5756 --- /dev/null +++ b/mplayer.py @@ -0,0 +1,27 @@ +#coding:utf8 + +import sys + +from twisted.internet import reactor + +from syncplay import client +from syncplay.players import mplayer + +if __name__ == '__main__': + args = sys.argv[1:] + host = args.pop(0) + name = args.pop(0) + if ':' in host: + host, port = host.split(':', 1) + port = int(port) + else: + port = 8999 + + args.append('-slave') + args.append('-quiet') + + manager = client.Manager(host, port, name) + manager.start() + mplayer.run_mplayer(manager, 'mplayer', args) + reactor.run() + diff --git a/syncplay/client.py b/syncplay/client.py index 6a1a5e3..4596e1c 100644 --- a/syncplay/client.py +++ b/syncplay/client.py @@ -2,20 +2,25 @@ import time -from twisted.internet.protocol import Factory +from twisted.internet import reactor +from twisted.internet.protocol import ReconnectingClientFactory from .network_utils import CommandProtocol from .utils import parse_state class SyncClientProtocol(CommandProtocol): - def __init__(self, factory): + def __init__(self, manager): CommandProtocol.__init__(self) - self.factory = factory + self.manager = manager def connectionMade(self): - self.send_message('iam', self.factory.name) + self.manager.protocol = self + self.send_message('iam', self.manager.name) + + def connectionLost(self, reason): + self.manager.protocol = None def handle_connected_state(self, arg): arg = parse_state(arg) @@ -25,7 +30,10 @@ class SyncClientProtocol(CommandProtocol): paused, position, name = arg - self.factory.update_state(self, paused, position, name) + self.manager.update_global_state(paused, position, name) + + def send_state(self, paused, position): + self.send_message('state', ('paused' if paused else 'playing'), int(position*100)) states = dict( @@ -37,3 +45,85 @@ class SyncClientProtocol(CommandProtocol): ) initial_state = 'connected' +class SyncClientFactory(ReconnectingClientFactory): + def __init__(self, manager): + self.manager = manager + + def buildProtocol(self, addr): + return SyncClientProtocol(self.manager) + + +class Manager(object): + def __init__(self, host, port, name): + self.host = host + self.port = port + self.name = name + + self.player = None + self.protocol = None + + self.ask_delayed = None + self.send_delayed = None + + self.global_paused = True + self.global_position = 0.0 + + self.player_paused = True + self.player_position = 0.0 + + def start(self): + factory = SyncClientFactory(self) + reactor.connectTCP(self.host, self.port, factory) + self.running = True + self.schedule_ask_player() + self.schedule_send_status() + + def stop(self): + if self.protocol: + self.protocol.drop() + if self.player: + self.player.drop() + self.running = False + + def schedule_ask_player(self, when=0.2): + if self.ask_delayed and self.ask_delayed.active(): + self.ask_delayed.reset(when) + else: + self.ask_delayed = reactor.callLater(when, self.ask_player) + + def ask_player(self): + if not self.running: + return + if self.player: + self.player.send_get_position() + self.player.send_get_paused() + self.schedule_ask_player() + + def schedule_send_status(self, when=1): + if self.send_delayed and self.send_delayed.active(): + self.send_delayed.reset(when) + else: + self.send_delayed = reactor.callLater(when, self.send_status) + + def send_status(self): + if not self.running: + return + if self.protocol: + self.protocol.send_state(self.player_paused, self.player_position) + self.schedule_send_status() + + + def update_player_position(self, value): + print value + self.player_position = value + + def update_player_paused(self, value): + old = self.player_paused + self.player_paused = value + if old != value and self.global_paused != value: + self.send_status() + + def update_global_state(self, paused, position, name): + self.global_paused = paused + self.global_position = position + diff --git a/syncplay/network_utils.py b/syncplay/network_utils.py index 11d90ce..04252e6 100644 --- a/syncplay/network_utils.py +++ b/syncplay/network_utils.py @@ -1,9 +1,5 @@ #coding:utf8 -import re - -RE_NL_SPLIT = re.compile(r'(?:\r\n|\n|\r)') - from twisted.internet.protocol import ProcessProtocol from twisted.protocols.basic import LineReceiver @@ -55,21 +51,27 @@ class CommandProtocol(LineReceiver): def parse_lines(leftovers, data): data = leftovers+data - lines = RE_NL.split(data) + lines = data.split('\n') leftovers = lines.pop(-1) return leftovers, lines class LineProcessProtocol(ProcessProtocol): - __leftover_out = '' - __leftover_err = '' + _leftover_out = '' + _leftover_err = '' + + def errLineReceived(self, line): + pass + + def outLineReceived(self, line): + pass def outReceived(self, data): - self.__leftover_out, lines = parse_lines(__leftover_out, data) + self._leftover_out, lines = parse_lines(self._leftover_out, data) for line in lines: self.outLineReceived(line) def errReceived(self, data): - self.__leftover_err, lines = parse_lines(__leftover_err, data) + self._leftover_err, lines = parse_lines(self._leftover_err, data) for line in lines: self.errLineReceived(line) diff --git a/syncplay/players/mplayer.py b/syncplay/players/mplayer.py index 72348cd..b196b25 100644 --- a/syncplay/players/mplayer.py +++ b/syncplay/players/mplayer.py @@ -1,37 +1,64 @@ #coding:utf8 import re +import sys from twisted.internet import reactor -from ..network_utils import network_utils +from ..network_utils import LineProcessProtocol +from ..utils import find_exec_path -RE_ANSWER = re.compile('^ANS_([a-zA-Z_])=(.+)$') +RE_ANSWER = re.compile('^ANS_([a-zA-Z_]+)=(.+)$') -class MplayerProtocol(ProcessProtocol): +class MplayerProtocol(LineProcessProtocol): + def __init__(self, manager): + self.manager = manager + + def connectionMade(self): + self.manager.player = self + self.send_set_paused(True) + self.send_set_position(0) + + def processExited(self, reason): + self.manager.player = None + + def errLineReceived(self, line): + sys.stderr.write(line+'\n') + def outLineReceived(self, line): - if not line.starts_with('ANS_'): + if not line.startswith('ANS_'): + sys.stdout.write(line+'\n') return + print line m = RE_ANSWER.match(line) if not m: return - name, value = m.group(1).lower, m.group(2) + name, value = m.group(1).lower(), m.group(2) handler = getattr(self, 'answer_' + name, None) + print value if handler: handler(value) + + def set_property(self, name, value): + self.writeLines('%s %s %s' % ('set_property', name, value)) + + def get_property(self, name): + self.writeLines('%s %s' % ('get_property', name)) + def send_set_paused(self, value): # docs say i can't set "pause" property, but it works... - self.set_property('paused', 'yes' if value else 'no') + self.set_property('pause', 'yes' if value else 'no') def send_get_paused(self): - self.get_property('paused') + self.get_property('pause') def answer_pause(self, value): value = value == 'yes' + self.manager.update_player_paused(value) def send_set_position(self, value): @@ -42,6 +69,7 @@ class MplayerProtocol(ProcessProtocol): def answer_time_pos(self, value): value = float(value) + self.manager.update_player_position(value) def send_set_speed(self, value): @@ -52,7 +80,32 @@ class MplayerProtocol(ProcessProtocol): def answer_speed(self, value): value = float(value) + self.manager.update_player_speed(value) + + + def drop(self): + self.transport.loseConnection() + reactor.callLater(1, self.graceful_kill) + + def gracefull_kill(self): + if self.transport.pid: + self.transport.signalProcess('TERM') + reactor.callLater(2, self.try_kill) + + def kill(self): + if self.transport.pid: + self.transport.signalProcess('KILL') def run_mplayer(manager, mplayer_path, args): - pass + exec_path = find_exec_path(mplayer_path) + print 'Running', exec_path + if not exec_path: + raise Exception('Mplayer executable not found') + + args = list(args) + args.insert(0, mplayer_path) + + process_protocol = MplayerProtocol(manager) + reactor.spawnProcess(process_protocol, exec_path, args=args, env=None) + diff --git a/syncplay/utils.py b/syncplay/utils.py index 3563aa9..40848f8 100644 --- a/syncplay/utils.py +++ b/syncplay/utils.py @@ -1,5 +1,8 @@ #coding:utf8 +import os + + def split_args(args, number): # FIXME Make argument format smarter return args.split(None, number-1) @@ -29,3 +32,11 @@ def parse_state(args): return paused, position, who_changed_state +def find_exec_path(name): + if os.access(name, os.X_OK): + return name + for path in os.environ['PATH'].split(':'): + path = os.path.join(os.path.realpath(path), name) + if os.access(path, os.X_OK): + return path +