From d5c3ea7028ba312dd1a0d58743cea1218175a16c Mon Sep 17 00:00:00 2001 From: Uriziel Date: Tue, 15 Oct 2013 18:24:27 +0200 Subject: [PATCH] Changed the network code to be more precise --- syncplay/client.py | 8 +++--- syncplay/protocols.py | 57 +++++++++++++++++++++++++++++-------------- syncplay/server.py | 24 ++++++------------ 3 files changed, 51 insertions(+), 38 deletions(-) diff --git a/syncplay/client.py b/syncplay/client.py index 83907e0..d5894de 100644 --- a/syncplay/client.py +++ b/syncplay/client.py @@ -224,24 +224,24 @@ class SyncplayClient(object): madeChangeOnPlayer = self._serverPaused(setBy) return madeChangeOnPlayer - def _executePlaystateHooks(self, position, paused, doSeek, setBy, latency): + def _executePlaystateHooks(self, position, paused, doSeek, setBy, messageAge): if(self.userlist.hasRoomStateChanged() and not paused): self._warnings.checkWarnings() self.userlist.roomStateConfirmed() self._malUpdater.playingHook(position, paused) - def updateGlobalState(self, position, paused, doSeek, setBy, latency): + def updateGlobalState(self, position, paused, doSeek, setBy, messageAge): if(self.__getUserlistOnLogon): self.__getUserlistOnLogon = False self.getUserList() madeChangeOnPlayer = False if(not paused): - position += latency + position += messageAge if(self._player): madeChangeOnPlayer = self._changePlayerStateAccordingToGlobalState(position, paused, doSeek, setBy) if(madeChangeOnPlayer): self.askPlayer() - self._executePlaystateHooks(position, paused, doSeek, setBy, latency) + self._executePlaystateHooks(position, paused, doSeek, setBy, messageAge) def getUserOffset(self): return self._userOffset diff --git a/syncplay/protocols.py b/syncplay/protocols.py index f6b01d5..cd8eeda 100644 --- a/syncplay/protocols.py +++ b/syncplay/protocols.py @@ -56,6 +56,7 @@ class SyncClientProtocol(JSONCommandProtocol): self.clientIgnoringOnTheFly = 0 self.serverIgnoringOnTheFly = 0 self.logged = False + self._pingService = PingService() def connectionMade(self): self._client.initProtocol(self) @@ -159,15 +160,18 @@ class SyncClientProtocol(JSONCommandProtocol): return position, paused, doSeek, setBy def _handleStatePing(self, state): - yourLatency = state["ping"]["yourLatency"] if state["ping"].has_key("yourLatency") else 0 - senderLatency = state["ping"]["senderLatency"] if state["ping"].has_key("senderLatency") else 0 if (state["ping"].has_key("latencyCalculation")): latencyCalculation = state["ping"]["latencyCalculation"] - return yourLatency, senderLatency, latencyCalculation + if ("clientLatencyCalculation" in state["ping"]): + timestamp = state["ping"]["clientLatencyCalculation"] + senderRtt = state["ping"]["serverRtt"] + self._pingService.receiveMessage(timestamp, senderRtt) + messageAge = self._pingService.getLastForwardDelay() + return messageAge, latencyCalculation def handleState(self, state): position, paused, doSeek, setBy = None, None, None, None - yourLatency, senderLatency = 0, 0 + messageAge = 0 if(state.has_key("ignoringOnTheFly")): ignore = state["ignoringOnTheFly"] if(ignore.has_key("server")): @@ -179,10 +183,9 @@ class SyncClientProtocol(JSONCommandProtocol): if(state.has_key("playstate")): position, paused, doSeek, setBy = self._extractStatePlaystateArguments(state) if(state.has_key("ping")): - yourLatency, senderLatency, latencyCalculation = self._handleStatePing(state) + messageAge, latencyCalculation = self._handleStatePing(state) if(position is not None and paused is not None and not self.clientIgnoringOnTheFly): - latency = yourLatency + senderLatency - self._client.updateGlobalState(position, paused, doSeek, setBy, latency) + self._client.updateGlobalState(position, paused, doSeek, setBy, messageAge) position, paused, doSeek, stateChange = self._client.getLocalState() self.sendState(position, paused, doSeek, latencyCalculation, stateChange) @@ -198,8 +201,11 @@ class SyncClientProtocol(JSONCommandProtocol): state["playstate"]["position"] = position state["playstate"]["paused"] = paused if(doSeek): state["playstate"]["doSeek"] = doSeek + state["ping"] = {} if(latencyCalculation): - state["ping"] = {"latencyCalculation": latencyCalculation} + state["ping"]["latencyCalculation"] = latencyCalculation + state["ping"]["clientLatencyCalculation"] = self._pingService.newTimestamp() + state["ping"]["clientRtt"] = self._pingService.getRtt() if(stateChange): self.clientIgnoringOnTheFly += 1 if(self.serverIgnoringOnTheFly or self.clientIgnoringOnTheFly): @@ -224,7 +230,10 @@ class SyncServerProtocol(JSONCommandProtocol): self._logged = False self.clientIgnoringOnTheFly = 0 self.serverIgnoringOnTheFly = 0 - + self._pingService = PingService() + self._clientLatencyCalculation = 0 + self._clientLatencyCalculationArrivalTime = 0 + def __hash__(self): return hash('|'.join(( self.transport.getPeer().host, @@ -341,7 +350,11 @@ class SyncServerProtocol(JSONCommandProtocol): def handleList(self, _): self.sendList() - def sendState(self, position, paused, doSeek, setBy, senderLatency, watcherLatency, forced=False): + def sendState(self, position, paused, doSeek, setBy, forced=False): + if(self._clientLatencyCalculationArrivalTime): + processingTime = time.time() - self._clientLatencyCalculationArrivalTime + else: + processingTime = 0 playstate = { "position": position, "paused": paused, @@ -349,10 +362,12 @@ class SyncServerProtocol(JSONCommandProtocol): "setBy": setBy } ping = { - "yourLatency": watcherLatency, - "senderLatency": senderLatency, - "latencyCalculation": time.time() + "latencyCalculation": self._pingService.newTimestamp(), + "serverRtt": self._pingService.getRtt() } + if(self._clientLatencyCalculation): + ping["clientLatencyCalculation"] = self._clientLatencyCalculation + processingTime + self._clientLatencyCalculation = 0 state = { "ping": ping, "playstate": playstate, @@ -389,9 +404,13 @@ class SyncServerProtocol(JSONCommandProtocol): if(state.has_key("playstate")): position, paused, doSeek = self._extractStatePlaystateArguments(state) if(state.has_key("ping")): - latencyCalculation = state["ping"]["latencyCalculation"] if state["ping"].has_key("latencyCalculation") else None + latencyCalculation = state["ping"]["latencyCalculation"] if state["ping"].has_key("latencyCalculation") else 0 + clientRtt = state["ping"]["clientRtt"] if state["ping"].has_key("clientRtt") else 0 + self._clientLatencyCalculation = state["ping"]["clientLatencyCalculation"] if state["ping"].has_key("clientLatencyCalculation") else 0 + self._clientLatencyCalculationArrivalTime = time.time() + self._pingService.receiveMessage(latencyCalculation, clientRtt) if(self.serverIgnoringOnTheFly == 0): - self._factory.updateWatcherState(self, position, paused, doSeek, latencyCalculation) + self._factory.updateWatcherState(self, position, paused, doSeek, self._pingService.getLastForwardDelay()) def handleHttpRequest(self, request): self.sendLine(self._factory.gethttpRequestReply()) @@ -416,14 +435,16 @@ class PingService(object): def receiveMessage(self, timestamp, senderRtt): prevRtt = self._rtt + if(not timestamp): + return self._rtt = time.time() - timestamp - if(self._t0 == None): + if(self._t0 == None and self._rtt > 0 and prevRtt): self._t0 = self._rtt / 2 return - if(senderRtt <= 0): + if(senderRtt <= 0 or not prevRtt): return self._fdDiff = self._fdDiff + (prevRtt - senderRtt) - self._fd = self._t0 - self._fdDiff + self._fd = abs(self._t0 - self._fdDiff) def getLastForwardDelay(self): return self._fd diff --git a/syncplay/server.py b/syncplay/server.py index 2583e9b..541f9a1 100644 --- a/syncplay/server.py +++ b/syncplay/server.py @@ -155,7 +155,7 @@ class SyncFactory(Factory): else: return getMessage("en", "server-default-http-reply") - def sendState(self, watcherProtocol, doSeek = False, senderLatency = 0, forcedUpdate = False): + def sendState(self, watcherProtocol, doSeek = False, forcedUpdate = False): watcher = self.getWatcher(watcherProtocol) if(not watcher): return @@ -164,18 +164,10 @@ class SyncFactory(Factory): setBy = self._roomStates[room]["setBy"] watcher.paused = paused watcher.position = position - watcherProtocol.sendState(position, paused, doSeek, setBy, senderLatency, watcher.latency, forcedUpdate) + watcherProtocol.sendState(position, paused, doSeek, setBy, forcedUpdate) if(time.time() - watcher.lastUpdate > constants.PROTOCOL_TIMEOUT): watcherProtocol.drop() self.removeWatcher(watcherProtocol) - - def __updateWatcherPing(self, latencyCalculation, watcher): - if (latencyCalculation): - ping = (time.time() - latencyCalculation) / 2 - if (watcher.latency): - watcher.latency = watcher.latency * (constants.PING_MOVING_AVERAGE_WEIGHT) + ping * (1-constants.PING_MOVING_AVERAGE_WEIGHT) #Exponential moving average - else: - watcher.latency = ping def __shouldServerForceUpdateOnRoom(self, pauseChanged, doSeek): return doSeek or pauseChanged @@ -210,9 +202,8 @@ class SyncFactory(Factory): if (doSeek and position): self.ircBot.sp_seek(watcher.name, oldPosition, position, watcher.room) - def updateWatcherState(self, watcherProtocol, position, paused, doSeek, latencyCalculation): + def updateWatcherState(self, watcherProtocol, position, paused, doSeek, messageAge): watcher = self.getWatcher(watcherProtocol) - self.__updateWatcherPing(latencyCalculation, watcher) watcher.lastUpdate = time.time() if(watcher.file): oldPosition = self._roomStates[watcher.room]["position"] @@ -220,11 +211,13 @@ class SyncFactory(Factory): if(paused is not None): pauseChanged = self.__updatePausedState(paused, watcher) if(position is not None): + if(not paused): + position += messageAge self.__updatePositionState(position, doSeek or pauseChanged, watcher) forceUpdate = self.__shouldServerForceUpdateOnRoom(pauseChanged, doSeek) if(forceUpdate): self.__notifyIrcBot(position, paused, doSeek, watcher, oldPosition, pauseChanged) - l = lambda w: self.sendState(w, doSeek, watcher.latency, forceUpdate) + l = lambda w: self.sendState(w, doSeek, forceUpdate) self.broadcastRoom(watcher.watcherProtocol, l) def removeWatcher(self, watcherProtocol): @@ -299,7 +292,7 @@ class SyncFactory(Factory): self.ircBot.sp_paused("IRC: " + user.name, user.room) elif(not paused): self.ircBot.sp_unpaused("IRC: " + user.name, user.room) - l = lambda w: self.sendState(w, False, user.latency, True) + l = lambda w: self.sendState(w, False, 0, True) self.broadcastRoom(user.watcherProtocol, l) @@ -320,7 +313,7 @@ class SyncFactory(Factory): self._roomStates[user.room]['paused'] = time self._roomStates[user.room]['setBy'] = "IRC: " + setBy self.ircBot.sp_seek(user.name, oldPosition, time, user.room) - l = lambda w: self.sendState(w, True, user.latency, True) + l = lambda w: self.sendState(w, True, 0, True) self.broadcastRoom(user.watcherProtocol, l) @@ -368,7 +361,6 @@ class Watcher(object): self.file = None self._sendStateTimer = None self.position = None - self.latency = 0 self.lastUpdate = time.time() def __lt__(self, b):