Changed the network code to be more precise
This commit is contained in:
parent
471fe60108
commit
d5c3ea7028
@ -224,24 +224,24 @@ class SyncplayClient(object):
|
||||
madeChangeOnPlayer = self._serverPaused(setBy)
|
||||
return madeChangeOnPlayer
|
||||
|
||||
def _executePlaystateHooks(self, position, paused, doSeek, setBy, latency):
|
||||
def _executePlaystateHooks(self, position, paused, doSeek, setBy, messageAge):
|
||||
if(self.userlist.hasRoomStateChanged() and not paused):
|
||||
self._warnings.checkWarnings()
|
||||
self.userlist.roomStateConfirmed()
|
||||
self._malUpdater.playingHook(position, paused)
|
||||
|
||||
def updateGlobalState(self, position, paused, doSeek, setBy, latency):
|
||||
def updateGlobalState(self, position, paused, doSeek, setBy, messageAge):
|
||||
if(self.__getUserlistOnLogon):
|
||||
self.__getUserlistOnLogon = False
|
||||
self.getUserList()
|
||||
madeChangeOnPlayer = False
|
||||
if(not paused):
|
||||
position += latency
|
||||
position += messageAge
|
||||
if(self._player):
|
||||
madeChangeOnPlayer = self._changePlayerStateAccordingToGlobalState(position, paused, doSeek, setBy)
|
||||
if(madeChangeOnPlayer):
|
||||
self.askPlayer()
|
||||
self._executePlaystateHooks(position, paused, doSeek, setBy, latency)
|
||||
self._executePlaystateHooks(position, paused, doSeek, setBy, messageAge)
|
||||
|
||||
def getUserOffset(self):
|
||||
return self._userOffset
|
||||
|
||||
@ -56,6 +56,7 @@ class SyncClientProtocol(JSONCommandProtocol):
|
||||
self.clientIgnoringOnTheFly = 0
|
||||
self.serverIgnoringOnTheFly = 0
|
||||
self.logged = False
|
||||
self._pingService = PingService()
|
||||
|
||||
def connectionMade(self):
|
||||
self._client.initProtocol(self)
|
||||
@ -159,15 +160,18 @@ class SyncClientProtocol(JSONCommandProtocol):
|
||||
return position, paused, doSeek, setBy
|
||||
|
||||
def _handleStatePing(self, state):
|
||||
yourLatency = state["ping"]["yourLatency"] if state["ping"].has_key("yourLatency") else 0
|
||||
senderLatency = state["ping"]["senderLatency"] if state["ping"].has_key("senderLatency") else 0
|
||||
if (state["ping"].has_key("latencyCalculation")):
|
||||
latencyCalculation = state["ping"]["latencyCalculation"]
|
||||
return yourLatency, senderLatency, latencyCalculation
|
||||
if ("clientLatencyCalculation" in state["ping"]):
|
||||
timestamp = state["ping"]["clientLatencyCalculation"]
|
||||
senderRtt = state["ping"]["serverRtt"]
|
||||
self._pingService.receiveMessage(timestamp, senderRtt)
|
||||
messageAge = self._pingService.getLastForwardDelay()
|
||||
return messageAge, latencyCalculation
|
||||
|
||||
def handleState(self, state):
|
||||
position, paused, doSeek, setBy = None, None, None, None
|
||||
yourLatency, senderLatency = 0, 0
|
||||
messageAge = 0
|
||||
if(state.has_key("ignoringOnTheFly")):
|
||||
ignore = state["ignoringOnTheFly"]
|
||||
if(ignore.has_key("server")):
|
||||
@ -179,10 +183,9 @@ class SyncClientProtocol(JSONCommandProtocol):
|
||||
if(state.has_key("playstate")):
|
||||
position, paused, doSeek, setBy = self._extractStatePlaystateArguments(state)
|
||||
if(state.has_key("ping")):
|
||||
yourLatency, senderLatency, latencyCalculation = self._handleStatePing(state)
|
||||
messageAge, latencyCalculation = self._handleStatePing(state)
|
||||
if(position is not None and paused is not None and not self.clientIgnoringOnTheFly):
|
||||
latency = yourLatency + senderLatency
|
||||
self._client.updateGlobalState(position, paused, doSeek, setBy, latency)
|
||||
self._client.updateGlobalState(position, paused, doSeek, setBy, messageAge)
|
||||
position, paused, doSeek, stateChange = self._client.getLocalState()
|
||||
self.sendState(position, paused, doSeek, latencyCalculation, stateChange)
|
||||
|
||||
@ -198,8 +201,11 @@ class SyncClientProtocol(JSONCommandProtocol):
|
||||
state["playstate"]["position"] = position
|
||||
state["playstate"]["paused"] = paused
|
||||
if(doSeek): state["playstate"]["doSeek"] = doSeek
|
||||
state["ping"] = {}
|
||||
if(latencyCalculation):
|
||||
state["ping"] = {"latencyCalculation": latencyCalculation}
|
||||
state["ping"]["latencyCalculation"] = latencyCalculation
|
||||
state["ping"]["clientLatencyCalculation"] = self._pingService.newTimestamp()
|
||||
state["ping"]["clientRtt"] = self._pingService.getRtt()
|
||||
if(stateChange):
|
||||
self.clientIgnoringOnTheFly += 1
|
||||
if(self.serverIgnoringOnTheFly or self.clientIgnoringOnTheFly):
|
||||
@ -224,7 +230,10 @@ class SyncServerProtocol(JSONCommandProtocol):
|
||||
self._logged = False
|
||||
self.clientIgnoringOnTheFly = 0
|
||||
self.serverIgnoringOnTheFly = 0
|
||||
|
||||
self._pingService = PingService()
|
||||
self._clientLatencyCalculation = 0
|
||||
self._clientLatencyCalculationArrivalTime = 0
|
||||
|
||||
def __hash__(self):
|
||||
return hash('|'.join((
|
||||
self.transport.getPeer().host,
|
||||
@ -341,7 +350,11 @@ class SyncServerProtocol(JSONCommandProtocol):
|
||||
def handleList(self, _):
|
||||
self.sendList()
|
||||
|
||||
def sendState(self, position, paused, doSeek, setBy, senderLatency, watcherLatency, forced=False):
|
||||
def sendState(self, position, paused, doSeek, setBy, forced=False):
|
||||
if(self._clientLatencyCalculationArrivalTime):
|
||||
processingTime = time.time() - self._clientLatencyCalculationArrivalTime
|
||||
else:
|
||||
processingTime = 0
|
||||
playstate = {
|
||||
"position": position,
|
||||
"paused": paused,
|
||||
@ -349,10 +362,12 @@ class SyncServerProtocol(JSONCommandProtocol):
|
||||
"setBy": setBy
|
||||
}
|
||||
ping = {
|
||||
"yourLatency": watcherLatency,
|
||||
"senderLatency": senderLatency,
|
||||
"latencyCalculation": time.time()
|
||||
"latencyCalculation": self._pingService.newTimestamp(),
|
||||
"serverRtt": self._pingService.getRtt()
|
||||
}
|
||||
if(self._clientLatencyCalculation):
|
||||
ping["clientLatencyCalculation"] = self._clientLatencyCalculation + processingTime
|
||||
self._clientLatencyCalculation = 0
|
||||
state = {
|
||||
"ping": ping,
|
||||
"playstate": playstate,
|
||||
@ -389,9 +404,13 @@ class SyncServerProtocol(JSONCommandProtocol):
|
||||
if(state.has_key("playstate")):
|
||||
position, paused, doSeek = self._extractStatePlaystateArguments(state)
|
||||
if(state.has_key("ping")):
|
||||
latencyCalculation = state["ping"]["latencyCalculation"] if state["ping"].has_key("latencyCalculation") else None
|
||||
latencyCalculation = state["ping"]["latencyCalculation"] if state["ping"].has_key("latencyCalculation") else 0
|
||||
clientRtt = state["ping"]["clientRtt"] if state["ping"].has_key("clientRtt") else 0
|
||||
self._clientLatencyCalculation = state["ping"]["clientLatencyCalculation"] if state["ping"].has_key("clientLatencyCalculation") else 0
|
||||
self._clientLatencyCalculationArrivalTime = time.time()
|
||||
self._pingService.receiveMessage(latencyCalculation, clientRtt)
|
||||
if(self.serverIgnoringOnTheFly == 0):
|
||||
self._factory.updateWatcherState(self, position, paused, doSeek, latencyCalculation)
|
||||
self._factory.updateWatcherState(self, position, paused, doSeek, self._pingService.getLastForwardDelay())
|
||||
|
||||
def handleHttpRequest(self, request):
|
||||
self.sendLine(self._factory.gethttpRequestReply())
|
||||
@ -416,14 +435,16 @@ class PingService(object):
|
||||
|
||||
def receiveMessage(self, timestamp, senderRtt):
|
||||
prevRtt = self._rtt
|
||||
if(not timestamp):
|
||||
return
|
||||
self._rtt = time.time() - timestamp
|
||||
if(self._t0 == None):
|
||||
if(self._t0 == None and self._rtt > 0 and prevRtt):
|
||||
self._t0 = self._rtt / 2
|
||||
return
|
||||
if(senderRtt <= 0):
|
||||
if(senderRtt <= 0 or not prevRtt):
|
||||
return
|
||||
self._fdDiff = self._fdDiff + (prevRtt - senderRtt)
|
||||
self._fd = self._t0 - self._fdDiff
|
||||
self._fd = abs(self._t0 - self._fdDiff)
|
||||
|
||||
def getLastForwardDelay(self):
|
||||
return self._fd
|
||||
|
||||
@ -155,7 +155,7 @@ class SyncFactory(Factory):
|
||||
else:
|
||||
return getMessage("en", "server-default-http-reply")
|
||||
|
||||
def sendState(self, watcherProtocol, doSeek = False, senderLatency = 0, forcedUpdate = False):
|
||||
def sendState(self, watcherProtocol, doSeek = False, forcedUpdate = False):
|
||||
watcher = self.getWatcher(watcherProtocol)
|
||||
if(not watcher):
|
||||
return
|
||||
@ -164,18 +164,10 @@ class SyncFactory(Factory):
|
||||
setBy = self._roomStates[room]["setBy"]
|
||||
watcher.paused = paused
|
||||
watcher.position = position
|
||||
watcherProtocol.sendState(position, paused, doSeek, setBy, senderLatency, watcher.latency, forcedUpdate)
|
||||
watcherProtocol.sendState(position, paused, doSeek, setBy, forcedUpdate)
|
||||
if(time.time() - watcher.lastUpdate > constants.PROTOCOL_TIMEOUT):
|
||||
watcherProtocol.drop()
|
||||
self.removeWatcher(watcherProtocol)
|
||||
|
||||
def __updateWatcherPing(self, latencyCalculation, watcher):
|
||||
if (latencyCalculation):
|
||||
ping = (time.time() - latencyCalculation) / 2
|
||||
if (watcher.latency):
|
||||
watcher.latency = watcher.latency * (constants.PING_MOVING_AVERAGE_WEIGHT) + ping * (1-constants.PING_MOVING_AVERAGE_WEIGHT) #Exponential moving average
|
||||
else:
|
||||
watcher.latency = ping
|
||||
|
||||
def __shouldServerForceUpdateOnRoom(self, pauseChanged, doSeek):
|
||||
return doSeek or pauseChanged
|
||||
@ -210,9 +202,8 @@ class SyncFactory(Factory):
|
||||
if (doSeek and position):
|
||||
self.ircBot.sp_seek(watcher.name, oldPosition, position, watcher.room)
|
||||
|
||||
def updateWatcherState(self, watcherProtocol, position, paused, doSeek, latencyCalculation):
|
||||
def updateWatcherState(self, watcherProtocol, position, paused, doSeek, messageAge):
|
||||
watcher = self.getWatcher(watcherProtocol)
|
||||
self.__updateWatcherPing(latencyCalculation, watcher)
|
||||
watcher.lastUpdate = time.time()
|
||||
if(watcher.file):
|
||||
oldPosition = self._roomStates[watcher.room]["position"]
|
||||
@ -220,11 +211,13 @@ class SyncFactory(Factory):
|
||||
if(paused is not None):
|
||||
pauseChanged = self.__updatePausedState(paused, watcher)
|
||||
if(position is not None):
|
||||
if(not paused):
|
||||
position += messageAge
|
||||
self.__updatePositionState(position, doSeek or pauseChanged, watcher)
|
||||
forceUpdate = self.__shouldServerForceUpdateOnRoom(pauseChanged, doSeek)
|
||||
if(forceUpdate):
|
||||
self.__notifyIrcBot(position, paused, doSeek, watcher, oldPosition, pauseChanged)
|
||||
l = lambda w: self.sendState(w, doSeek, watcher.latency, forceUpdate)
|
||||
l = lambda w: self.sendState(w, doSeek, forceUpdate)
|
||||
self.broadcastRoom(watcher.watcherProtocol, l)
|
||||
|
||||
def removeWatcher(self, watcherProtocol):
|
||||
@ -299,7 +292,7 @@ class SyncFactory(Factory):
|
||||
self.ircBot.sp_paused("IRC: " + user.name, user.room)
|
||||
elif(not paused):
|
||||
self.ircBot.sp_unpaused("IRC: " + user.name, user.room)
|
||||
l = lambda w: self.sendState(w, False, user.latency, True)
|
||||
l = lambda w: self.sendState(w, False, 0, True)
|
||||
self.broadcastRoom(user.watcherProtocol, l)
|
||||
|
||||
|
||||
@ -320,7 +313,7 @@ class SyncFactory(Factory):
|
||||
self._roomStates[user.room]['paused'] = time
|
||||
self._roomStates[user.room]['setBy'] = "IRC: " + setBy
|
||||
self.ircBot.sp_seek(user.name, oldPosition, time, user.room)
|
||||
l = lambda w: self.sendState(w, True, user.latency, True)
|
||||
l = lambda w: self.sendState(w, True, 0, True)
|
||||
self.broadcastRoom(user.watcherProtocol, l)
|
||||
|
||||
|
||||
@ -368,7 +361,6 @@ class Watcher(object):
|
||||
self.file = None
|
||||
self._sendStateTimer = None
|
||||
self.position = None
|
||||
self.latency = 0
|
||||
self.lastUpdate = time.time()
|
||||
|
||||
def __lt__(self, b):
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user