From 22b426a5ea839488f9e1b2d07a1fb6ee5ae8ffc8 Mon Sep 17 00:00:00 2001 From: Uriziel Date: Sun, 13 Oct 2013 14:21:33 +0200 Subject: [PATCH 1/5] Remmoved prints from protocol classes --- syncplay/protocols.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/syncplay/protocols.py b/syncplay/protocols.py index 171a4ef..14d2a18 100644 --- a/syncplay/protocols.py +++ b/syncplay/protocols.py @@ -23,19 +23,10 @@ class JSONCommandProtocol(LineReceiver): else: self.dropWithError(getMessage("en", "unknown-command-server-error").format(message[1])) #TODO: log, not drop - def printReceived(self, line): #TODO: remove - #print ">>i", line - pass - - def printSent(self, line): - #print "o<<", line - pass - def lineReceived(self, line): line = line.strip() if not line: return - self.printReceived(line) try: messages = json.loads(line) except: @@ -49,7 +40,6 @@ class JSONCommandProtocol(LineReceiver): def sendMessage(self, dict_): line = json.dumps(dict_) - self.printSent(line) self.sendLine(line) def drop(self): From 68486f1e43491e443b08585f9d1dfd782f005d4a Mon Sep 17 00:00:00 2001 From: Uriziel Date: Sun, 13 Oct 2013 18:01:34 +0200 Subject: [PATCH 2/5] Added some extra formating --- syncplay/protocols.py | 106 +++++++++++++++++++++--------------------- 1 file changed, 54 insertions(+), 52 deletions(-) diff --git a/syncplay/protocols.py b/syncplay/protocols.py index 14d2a18..3f058d9 100644 --- a/syncplay/protocols.py +++ b/syncplay/protocols.py @@ -1,4 +1,4 @@ -#coding:utf8 +# coding:utf8 from twisted.protocols.basic import LineReceiver import json import syncplay @@ -6,7 +6,8 @@ from functools import wraps import time from syncplay.messages import getMessage -class JSONCommandProtocol(LineReceiver): + +class JSONCommandProtocol(LineReceiver): def handleMessages(self, messages): for message in messages.iteritems(): command = message[0] @@ -21,7 +22,7 @@ class JSONCommandProtocol(LineReceiver): elif command == "Error": self.handleError(message[1]) else: - self.dropWithError(getMessage("en", "unknown-command-server-error").format(message[1])) #TODO: log, not drop + self.dropWithError(getMessage("en", "unknown-command-server-error").format(message[1])) # TODO: log, not drop def lineReceived(self, line): line = line.strip() @@ -36,8 +37,8 @@ class JSONCommandProtocol(LineReceiver): else: self.dropWithError(getMessage("en", "not-json-server-error").format(line)) return - self.handleMessages(messages) - + self.handleMessages(messages) + def sendMessage(self, dict_): line = json.dumps(dict_) self.sendLine(line) @@ -48,25 +49,26 @@ class JSONCommandProtocol(LineReceiver): def dropWithError(self, error): raise NotImplementedError() + class SyncClientProtocol(JSONCommandProtocol): def __init__(self, client): self._client = client self.clientIgnoringOnTheFly = 0 self.serverIgnoringOnTheFly = 0 self.logged = False - + def connectionMade(self): self._client.initProtocol(self) self.sendHello() def connectionLost(self, reason): self._client.destroyProtocol() - + def dropWithError(self, error): self._client.ui.showErrorMessage(error) self._client.protocolFactory.stopRetrying() self.drop() - + def _extractHelloArguments(self, hello): username = hello["username"] if hello.has_key("username") else None roomName = hello["room"]["name"] if hello.has_key("room") else None @@ -80,7 +82,7 @@ class SyncClientProtocol(JSONCommandProtocol): self.dropWithError(getMessage("en", "hello-server-error").format(hello)) elif(version.split(".")[0:2] != syncplay.version.split(".")[0:2]): self.dropWithError(getMessage("en", "version-mismatch-server-error".format(hello))) - else: + else: self._client.setUsername(username) self._client.setRoom(roomName) self.logged = True @@ -88,22 +90,22 @@ class SyncClientProtocol(JSONCommandProtocol): self._client.ui.showMessage(motd, True, True) self._client.ui.showMessage(getMessage("en", "connected-successful-notification")) self._client.sendFile() - + def sendHello(self): hello = {} - hello["username"] = self._client.getUsername() + hello["username"] = self._client.getUsername() password = self._client.getPassword() if(password): hello["password"] = password room = self._client.getRoom() if(room): hello["room"] = {"name" :room} hello["version"] = syncplay.version self.sendMessage({"Hello": hello}) - + def _SetUser(self, users): for user in users.iteritems(): username = user[0] settings = user[1] - room = settings["room"]["name"] if settings.has_key("room") else None + room = settings["room"]["name"] if settings.has_key("room") else None file_ = settings["file"] if settings.has_key("file") else None if(settings.has_key("event")): if(settings["event"].has_key("joined")): @@ -112,7 +114,7 @@ class SyncClientProtocol(JSONCommandProtocol): self._client.removeUser(username) else: self._client.userlist.modUser(username, room, file_) - + def handleSet(self, settings): for set_ in settings.iteritems(): command = set_[0] @@ -130,7 +132,7 @@ class SyncClientProtocol(JSONCommandProtocol): setting["name"] = roomName if(password): setting["password"] = password self.sendSet({"room": setting}) - + def sendFileSetting(self, file_): self.sendSet({"file": file_}) self.sendList() @@ -145,10 +147,10 @@ class SyncClientProtocol(JSONCommandProtocol): position = user[1]['position'] self._client.userlist.addUser(userName, roomName, file_, position, noMessage=True) self._client.userlist.showUserList() - - def sendList(self): + + def sendList(self): self.sendMessage({"List": None}) - + def _extractStatePlaystateArguments(self, state): position = state["playstate"]["position"] if state["playstate"].has_key("position") else 0 paused = state["playstate"]["paused"] if state["playstate"].has_key("paused") else None @@ -171,7 +173,7 @@ class SyncClientProtocol(JSONCommandProtocol): if(ignore.has_key("server")): self.serverIgnoringOnTheFly = ignore["server"] self.clientIgnoringOnTheFly = 0 - elif(ignore.has_key("client")): + elif(ignore.has_key("client")): if(ignore['client']) == self.clientIgnoringOnTheFly: self.clientIgnoringOnTheFly = 0 if(state.has_key("playstate")): @@ -179,18 +181,18 @@ class SyncClientProtocol(JSONCommandProtocol): if(state.has_key("ping")): yourLatency, senderLatency, latencyCalculation = self._handleStatePing(state) if(position is not None and paused is not None and not self.clientIgnoringOnTheFly): - latency = yourLatency + senderLatency + latency = yourLatency + senderLatency self._client.updateGlobalState(position, paused, doSeek, setBy, latency) position, paused, doSeek, stateChange = self._client.getLocalState() self.sendState(position, paused, doSeek, latencyCalculation, stateChange) def handleHttpRequest(self, request): pass - - def sendState(self, position, paused, doSeek, latencyCalculation, stateChange = False): + + def sendState(self, position, paused, doSeek, latencyCalculation, stateChange=False): state = {} positionAndPausedIsSet = position is not None and paused is not None - clientIgnoreIsNotSet = self.clientIgnoringOnTheFly == 0 or self.serverIgnoringOnTheFly != 0 + clientIgnoreIsNotSet = self.clientIgnoringOnTheFly == 0 or self.serverIgnoringOnTheFly != 0 if(clientIgnoreIsNotSet and positionAndPausedIsSet): state["playstate"] = {} state["playstate"]["position"] = position @@ -199,7 +201,7 @@ class SyncClientProtocol(JSONCommandProtocol): if(latencyCalculation): state["ping"] = {"latencyCalculation": latencyCalculation} if(stateChange): - self.clientIgnoringOnTheFly += 1 + self.clientIgnoringOnTheFly += 1 if(self.serverIgnoringOnTheFly or self.clientIgnoringOnTheFly): state["ignoringOnTheFly"] = {} if(self.serverIgnoringOnTheFly): @@ -208,42 +210,42 @@ class SyncClientProtocol(JSONCommandProtocol): if(self.clientIgnoringOnTheFly): state["ignoringOnTheFly"]["client"] = self.clientIgnoringOnTheFly self.sendMessage({"State": state}) - + def handleError(self, error): - self.dropWithError(error["message"]) #TODO: more processing and fallbacking - + self.dropWithError(error["message"]) # TODO: more processing and fallbacking + def sendError(self, message): self.sendMessage({"Error": {"message": message}}) - - + + class SyncServerProtocol(JSONCommandProtocol): def __init__(self, factory): self._factory = factory self._logged = False self.clientIgnoringOnTheFly = 0 self.serverIgnoringOnTheFly = 0 - + def __hash__(self): return hash('|'.join(( self.transport.getPeer().host, str(id(self)), ))) - - def requireLogged(f): #@NoSelf + + def requireLogged(f): # @NoSelf @wraps(f) def wrapper(self, *args, **kwds): if(not self._logged): self.dropWithError(getMessage("en", "not-known-server-error")) return f(self, *args, **kwds) return wrapper - + def dropWithError(self, error): print getMessage("en", "client-drop-server-error").format(self.transport.getPeer().host, error) self.sendError(error) self.drop() - + def connectionLost(self, reason): - self._factory.removeWatcher(self) + self._factory.removeWatcher(self) def _extractHelloArguments(self, hello): roomName, roomPassword = None, None @@ -267,7 +269,7 @@ class SyncServerProtocol(JSONCommandProtocol): self.dropWithError(getMessage("en", "wrong-password-server-error")) return False return True - + def handleHello(self, hello): username, serverPassword, roomName, roomPassword, version = self._extractHelloArguments(hello) if(not username or not roomName or not version): @@ -291,7 +293,7 @@ class SyncServerProtocol(JSONCommandProtocol): hello["version"] = syncplay.version hello["motd"] = self._factory.getMotd(userIp, username, room) self.sendMessage({"Hello": hello}) - + @requireLogged def handleSet(self, settings): for set_ in settings.iteritems(): @@ -304,10 +306,10 @@ class SyncServerProtocol(JSONCommandProtocol): def sendSet(self, setting): self.sendMessage({"Set": setting}) - + def sendRoomSetting(self, roomName): self.sendSet({"room": {"name": roomName}}) - + def sendUserSetting(self, username, roomName, file_, event): room = {"name": roomName} user = {} @@ -318,7 +320,7 @@ class SyncServerProtocol(JSONCommandProtocol): if(event): user[username]["event"] = event self.sendSet({"user": user}) - + def _addUserOnList(self, userlist, roomPositions, watcher): if (not userlist.has_key(watcher.room)): userlist[watcher.room] = {} @@ -326,7 +328,7 @@ class SyncServerProtocol(JSONCommandProtocol): userlist[watcher.room][watcher.name] = { "file": watcher.file if watcher.file else {}, "position": roomPositions[watcher.room] if roomPositions[watcher.room] else 0 - } + } def sendList(self): userlist = {} roomPositions = {} @@ -334,12 +336,12 @@ class SyncServerProtocol(JSONCommandProtocol): for watcher in watchers.itervalues(): self._addUserOnList(userlist, roomPositions, watcher) self.sendMessage({"List": userlist}) - + @requireLogged def handleList(self, _): self.sendList() - - def sendState(self, position, paused, doSeek, setBy, senderLatency, watcherLatency, forced = False): + + def sendState(self, position, paused, doSeek, setBy, senderLatency, watcherLatency, forced=False): playstate = { "position": position, "paused": paused, @@ -366,8 +368,8 @@ class SyncServerProtocol(JSONCommandProtocol): self.clientIgnoringOnTheFly = 0 if(self.serverIgnoringOnTheFly == 0 or forced): self.sendMessage({"State": state}) - - + + def _extractStatePlaystateArguments(self, state): position = state["playstate"]["position"] if state["playstate"].has_key("position") else 0 paused = state["playstate"]["paused"] if state["playstate"].has_key("paused") else None @@ -383,20 +385,20 @@ class SyncServerProtocol(JSONCommandProtocol): if(self.serverIgnoringOnTheFly == ignore["server"]): self.serverIgnoringOnTheFly = 0 if(ignore.has_key("client")): - self.clientIgnoringOnTheFly = ignore["client"] + self.clientIgnoringOnTheFly = ignore["client"] if(state.has_key("playstate")): position, paused, doSeek = self._extractStatePlaystateArguments(state) if(state.has_key("ping")): latencyCalculation = state["ping"]["latencyCalculation"] if state["ping"].has_key("latencyCalculation") else None if(self.serverIgnoringOnTheFly == 0): self._factory.updateWatcherState(self, position, paused, doSeek, latencyCalculation) - + def handleHttpRequest(self, request): self.sendLine(self._factory.gethttpRequestReply()) - + def handleError(self, error): - self.dropWithError(error["message"]) #TODO: more processing and fallbacking - + self.dropWithError(error["message"]) # TODO: more processing and fallbacking + def sendError(self, message): self.sendMessage({"Error": {"message": message}}) - + From 471fe60108f1b3fa72b7cb8439884f0df57b475e Mon Sep 17 00:00:00 2001 From: Uriziel Date: Sun, 13 Oct 2013 23:32:17 +0200 Subject: [PATCH 3/5] Added ping service --- syncplay/protocols.py | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/syncplay/protocols.py b/syncplay/protocols.py index 3f058d9..f6b01d5 100644 --- a/syncplay/protocols.py +++ b/syncplay/protocols.py @@ -402,3 +402,32 @@ class SyncServerProtocol(JSONCommandProtocol): def sendError(self, message): self.sendMessage({"Error": {"message": message}}) + +class PingService(object): + + def __init__(self): + self._rtt = 0 + self._t0 = None + self._fdDiff = 0 + self._fd = 0 + + def newTimestamp(self): + return time.time() + + def receiveMessage(self, timestamp, senderRtt): + prevRtt = self._rtt + self._rtt = time.time() - timestamp + if(self._t0 == None): + self._t0 = self._rtt / 2 + return + if(senderRtt <= 0): + return + self._fdDiff = self._fdDiff + (prevRtt - senderRtt) + self._fd = self._t0 - self._fdDiff + + def getLastForwardDelay(self): + return self._fd + + def getRtt(self): + return self._rtt + From d5c3ea7028ba312dd1a0d58743cea1218175a16c Mon Sep 17 00:00:00 2001 From: Uriziel Date: Tue, 15 Oct 2013 18:24:27 +0200 Subject: [PATCH 4/5] Changed the network code to be more precise --- syncplay/client.py | 8 +++--- syncplay/protocols.py | 57 +++++++++++++++++++++++++++++-------------- syncplay/server.py | 24 ++++++------------ 3 files changed, 51 insertions(+), 38 deletions(-) diff --git a/syncplay/client.py b/syncplay/client.py index 83907e0..d5894de 100644 --- a/syncplay/client.py +++ b/syncplay/client.py @@ -224,24 +224,24 @@ class SyncplayClient(object): madeChangeOnPlayer = self._serverPaused(setBy) return madeChangeOnPlayer - def _executePlaystateHooks(self, position, paused, doSeek, setBy, latency): + def _executePlaystateHooks(self, position, paused, doSeek, setBy, messageAge): if(self.userlist.hasRoomStateChanged() and not paused): self._warnings.checkWarnings() self.userlist.roomStateConfirmed() self._malUpdater.playingHook(position, paused) - def updateGlobalState(self, position, paused, doSeek, setBy, latency): + def updateGlobalState(self, position, paused, doSeek, setBy, messageAge): if(self.__getUserlistOnLogon): self.__getUserlistOnLogon = False self.getUserList() madeChangeOnPlayer = False if(not paused): - position += latency + position += messageAge if(self._player): madeChangeOnPlayer = self._changePlayerStateAccordingToGlobalState(position, paused, doSeek, setBy) if(madeChangeOnPlayer): self.askPlayer() - self._executePlaystateHooks(position, paused, doSeek, setBy, latency) + self._executePlaystateHooks(position, paused, doSeek, setBy, messageAge) def getUserOffset(self): return self._userOffset diff --git a/syncplay/protocols.py b/syncplay/protocols.py index f6b01d5..cd8eeda 100644 --- a/syncplay/protocols.py +++ b/syncplay/protocols.py @@ -56,6 +56,7 @@ class SyncClientProtocol(JSONCommandProtocol): self.clientIgnoringOnTheFly = 0 self.serverIgnoringOnTheFly = 0 self.logged = False + self._pingService = PingService() def connectionMade(self): self._client.initProtocol(self) @@ -159,15 +160,18 @@ class SyncClientProtocol(JSONCommandProtocol): return position, paused, doSeek, setBy def _handleStatePing(self, state): - yourLatency = state["ping"]["yourLatency"] if state["ping"].has_key("yourLatency") else 0 - senderLatency = state["ping"]["senderLatency"] if state["ping"].has_key("senderLatency") else 0 if (state["ping"].has_key("latencyCalculation")): latencyCalculation = state["ping"]["latencyCalculation"] - return yourLatency, senderLatency, latencyCalculation + if ("clientLatencyCalculation" in state["ping"]): + timestamp = state["ping"]["clientLatencyCalculation"] + senderRtt = state["ping"]["serverRtt"] + self._pingService.receiveMessage(timestamp, senderRtt) + messageAge = self._pingService.getLastForwardDelay() + return messageAge, latencyCalculation def handleState(self, state): position, paused, doSeek, setBy = None, None, None, None - yourLatency, senderLatency = 0, 0 + messageAge = 0 if(state.has_key("ignoringOnTheFly")): ignore = state["ignoringOnTheFly"] if(ignore.has_key("server")): @@ -179,10 +183,9 @@ class SyncClientProtocol(JSONCommandProtocol): if(state.has_key("playstate")): position, paused, doSeek, setBy = self._extractStatePlaystateArguments(state) if(state.has_key("ping")): - yourLatency, senderLatency, latencyCalculation = self._handleStatePing(state) + messageAge, latencyCalculation = self._handleStatePing(state) if(position is not None and paused is not None and not self.clientIgnoringOnTheFly): - latency = yourLatency + senderLatency - self._client.updateGlobalState(position, paused, doSeek, setBy, latency) + self._client.updateGlobalState(position, paused, doSeek, setBy, messageAge) position, paused, doSeek, stateChange = self._client.getLocalState() self.sendState(position, paused, doSeek, latencyCalculation, stateChange) @@ -198,8 +201,11 @@ class SyncClientProtocol(JSONCommandProtocol): state["playstate"]["position"] = position state["playstate"]["paused"] = paused if(doSeek): state["playstate"]["doSeek"] = doSeek + state["ping"] = {} if(latencyCalculation): - state["ping"] = {"latencyCalculation": latencyCalculation} + state["ping"]["latencyCalculation"] = latencyCalculation + state["ping"]["clientLatencyCalculation"] = self._pingService.newTimestamp() + state["ping"]["clientRtt"] = self._pingService.getRtt() if(stateChange): self.clientIgnoringOnTheFly += 1 if(self.serverIgnoringOnTheFly or self.clientIgnoringOnTheFly): @@ -224,7 +230,10 @@ class SyncServerProtocol(JSONCommandProtocol): self._logged = False self.clientIgnoringOnTheFly = 0 self.serverIgnoringOnTheFly = 0 - + self._pingService = PingService() + self._clientLatencyCalculation = 0 + self._clientLatencyCalculationArrivalTime = 0 + def __hash__(self): return hash('|'.join(( self.transport.getPeer().host, @@ -341,7 +350,11 @@ class SyncServerProtocol(JSONCommandProtocol): def handleList(self, _): self.sendList() - def sendState(self, position, paused, doSeek, setBy, senderLatency, watcherLatency, forced=False): + def sendState(self, position, paused, doSeek, setBy, forced=False): + if(self._clientLatencyCalculationArrivalTime): + processingTime = time.time() - self._clientLatencyCalculationArrivalTime + else: + processingTime = 0 playstate = { "position": position, "paused": paused, @@ -349,10 +362,12 @@ class SyncServerProtocol(JSONCommandProtocol): "setBy": setBy } ping = { - "yourLatency": watcherLatency, - "senderLatency": senderLatency, - "latencyCalculation": time.time() + "latencyCalculation": self._pingService.newTimestamp(), + "serverRtt": self._pingService.getRtt() } + if(self._clientLatencyCalculation): + ping["clientLatencyCalculation"] = self._clientLatencyCalculation + processingTime + self._clientLatencyCalculation = 0 state = { "ping": ping, "playstate": playstate, @@ -389,9 +404,13 @@ class SyncServerProtocol(JSONCommandProtocol): if(state.has_key("playstate")): position, paused, doSeek = self._extractStatePlaystateArguments(state) if(state.has_key("ping")): - latencyCalculation = state["ping"]["latencyCalculation"] if state["ping"].has_key("latencyCalculation") else None + latencyCalculation = state["ping"]["latencyCalculation"] if state["ping"].has_key("latencyCalculation") else 0 + clientRtt = state["ping"]["clientRtt"] if state["ping"].has_key("clientRtt") else 0 + self._clientLatencyCalculation = state["ping"]["clientLatencyCalculation"] if state["ping"].has_key("clientLatencyCalculation") else 0 + self._clientLatencyCalculationArrivalTime = time.time() + self._pingService.receiveMessage(latencyCalculation, clientRtt) if(self.serverIgnoringOnTheFly == 0): - self._factory.updateWatcherState(self, position, paused, doSeek, latencyCalculation) + self._factory.updateWatcherState(self, position, paused, doSeek, self._pingService.getLastForwardDelay()) def handleHttpRequest(self, request): self.sendLine(self._factory.gethttpRequestReply()) @@ -416,14 +435,16 @@ class PingService(object): def receiveMessage(self, timestamp, senderRtt): prevRtt = self._rtt + if(not timestamp): + return self._rtt = time.time() - timestamp - if(self._t0 == None): + if(self._t0 == None and self._rtt > 0 and prevRtt): self._t0 = self._rtt / 2 return - if(senderRtt <= 0): + if(senderRtt <= 0 or not prevRtt): return self._fdDiff = self._fdDiff + (prevRtt - senderRtt) - self._fd = self._t0 - self._fdDiff + self._fd = abs(self._t0 - self._fdDiff) def getLastForwardDelay(self): return self._fd diff --git a/syncplay/server.py b/syncplay/server.py index 2583e9b..541f9a1 100644 --- a/syncplay/server.py +++ b/syncplay/server.py @@ -155,7 +155,7 @@ class SyncFactory(Factory): else: return getMessage("en", "server-default-http-reply") - def sendState(self, watcherProtocol, doSeek = False, senderLatency = 0, forcedUpdate = False): + def sendState(self, watcherProtocol, doSeek = False, forcedUpdate = False): watcher = self.getWatcher(watcherProtocol) if(not watcher): return @@ -164,18 +164,10 @@ class SyncFactory(Factory): setBy = self._roomStates[room]["setBy"] watcher.paused = paused watcher.position = position - watcherProtocol.sendState(position, paused, doSeek, setBy, senderLatency, watcher.latency, forcedUpdate) + watcherProtocol.sendState(position, paused, doSeek, setBy, forcedUpdate) if(time.time() - watcher.lastUpdate > constants.PROTOCOL_TIMEOUT): watcherProtocol.drop() self.removeWatcher(watcherProtocol) - - def __updateWatcherPing(self, latencyCalculation, watcher): - if (latencyCalculation): - ping = (time.time() - latencyCalculation) / 2 - if (watcher.latency): - watcher.latency = watcher.latency * (constants.PING_MOVING_AVERAGE_WEIGHT) + ping * (1-constants.PING_MOVING_AVERAGE_WEIGHT) #Exponential moving average - else: - watcher.latency = ping def __shouldServerForceUpdateOnRoom(self, pauseChanged, doSeek): return doSeek or pauseChanged @@ -210,9 +202,8 @@ class SyncFactory(Factory): if (doSeek and position): self.ircBot.sp_seek(watcher.name, oldPosition, position, watcher.room) - def updateWatcherState(self, watcherProtocol, position, paused, doSeek, latencyCalculation): + def updateWatcherState(self, watcherProtocol, position, paused, doSeek, messageAge): watcher = self.getWatcher(watcherProtocol) - self.__updateWatcherPing(latencyCalculation, watcher) watcher.lastUpdate = time.time() if(watcher.file): oldPosition = self._roomStates[watcher.room]["position"] @@ -220,11 +211,13 @@ class SyncFactory(Factory): if(paused is not None): pauseChanged = self.__updatePausedState(paused, watcher) if(position is not None): + if(not paused): + position += messageAge self.__updatePositionState(position, doSeek or pauseChanged, watcher) forceUpdate = self.__shouldServerForceUpdateOnRoom(pauseChanged, doSeek) if(forceUpdate): self.__notifyIrcBot(position, paused, doSeek, watcher, oldPosition, pauseChanged) - l = lambda w: self.sendState(w, doSeek, watcher.latency, forceUpdate) + l = lambda w: self.sendState(w, doSeek, forceUpdate) self.broadcastRoom(watcher.watcherProtocol, l) def removeWatcher(self, watcherProtocol): @@ -299,7 +292,7 @@ class SyncFactory(Factory): self.ircBot.sp_paused("IRC: " + user.name, user.room) elif(not paused): self.ircBot.sp_unpaused("IRC: " + user.name, user.room) - l = lambda w: self.sendState(w, False, user.latency, True) + l = lambda w: self.sendState(w, False, 0, True) self.broadcastRoom(user.watcherProtocol, l) @@ -320,7 +313,7 @@ class SyncFactory(Factory): self._roomStates[user.room]['paused'] = time self._roomStates[user.room]['setBy'] = "IRC: " + setBy self.ircBot.sp_seek(user.name, oldPosition, time, user.room) - l = lambda w: self.sendState(w, True, user.latency, True) + l = lambda w: self.sendState(w, True, 0, True) self.broadcastRoom(user.watcherProtocol, l) @@ -368,7 +361,6 @@ class Watcher(object): self.file = None self._sendStateTimer = None self.position = None - self.latency = 0 self.lastUpdate = time.time() def __lt__(self, b): From 45b56ea92489441f591cb27be5c1cbaf9f542446 Mon Sep 17 00:00:00 2001 From: Uriziel Date: Thu, 7 Nov 2013 13:21:25 +0100 Subject: [PATCH 5/5] Improved ping service --- syncplay/protocols.py | 20 ++++++++++---------- syncplay/server.py | 4 ++++ 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/syncplay/protocols.py b/syncplay/protocols.py index cd8eeda..c8efaf5 100644 --- a/syncplay/protocols.py +++ b/syncplay/protocols.py @@ -5,6 +5,7 @@ import syncplay from functools import wraps import time from syncplay.messages import getMessage +from syncplay.constants import PING_MOVING_AVERAGE_WEIGHT class JSONCommandProtocol(LineReceiver): @@ -426,29 +427,28 @@ class PingService(object): def __init__(self): self._rtt = 0 - self._t0 = None - self._fdDiff = 0 self._fd = 0 + self._avrRtt = 0 def newTimestamp(self): return time.time() def receiveMessage(self, timestamp, senderRtt): - prevRtt = self._rtt if(not timestamp): return self._rtt = time.time() - timestamp - if(self._t0 == None and self._rtt > 0 and prevRtt): - self._t0 = self._rtt / 2 + if(self._rtt < 0 or senderRtt < 0): return - if(senderRtt <= 0 or not prevRtt): - return - self._fdDiff = self._fdDiff + (prevRtt - senderRtt) - self._fd = abs(self._t0 - self._fdDiff) + if(not self._avrRtt): + self._avrRtt = self._rtt + self._avrRtt = self._avrRtt * PING_MOVING_AVERAGE_WEIGHT + self._rtt * (1 - PING_MOVING_AVERAGE_WEIGHT) + if(senderRtt < self._rtt): + self._fd = self._avrRtt/2 + (self._rtt - senderRtt) + else: + self._fd = self._avrRtt/2 def getLastForwardDelay(self): return self._fd def getRtt(self): return self._rtt - diff --git a/syncplay/server.py b/syncplay/server.py index 541f9a1..8df7ba4 100644 --- a/syncplay/server.py +++ b/syncplay/server.py @@ -204,6 +204,8 @@ class SyncFactory(Factory): def updateWatcherState(self, watcherProtocol, position, paused, doSeek, messageAge): watcher = self.getWatcher(watcherProtocol) + if(not watcher): + return watcher.lastUpdate = time.time() if(watcher.file): oldPosition = self._roomStates[watcher.room]["position"] @@ -255,6 +257,8 @@ class SyncFactory(Factory): def watcherSetFile(self, watcherProtocol, file_): watcher = self.getWatcher(watcherProtocol) + if(not watcher): + return watcher.file = file_ l = lambda w: w.sendUserSetting(watcher.name, watcher.room, watcher.file, None) self.broadcast(watcherProtocol, l)