Merge branch 'spikeDrops'

This commit is contained in:
Uriziel 2013-11-07 14:20:54 +01:00
commit a919cfe49a
3 changed files with 131 additions and 93 deletions

View File

@ -224,24 +224,24 @@ class SyncplayClient(object):
madeChangeOnPlayer = self._serverPaused(setBy) madeChangeOnPlayer = self._serverPaused(setBy)
return madeChangeOnPlayer return madeChangeOnPlayer
def _executePlaystateHooks(self, position, paused, doSeek, setBy, latency): def _executePlaystateHooks(self, position, paused, doSeek, setBy, messageAge):
if(self.userlist.hasRoomStateChanged() and not paused): if(self.userlist.hasRoomStateChanged() and not paused):
self._warnings.checkWarnings() self._warnings.checkWarnings()
self.userlist.roomStateConfirmed() self.userlist.roomStateConfirmed()
self._malUpdater.playingHook(position, paused) self._malUpdater.playingHook(position, paused)
def updateGlobalState(self, position, paused, doSeek, setBy, latency): def updateGlobalState(self, position, paused, doSeek, setBy, messageAge):
if(self.__getUserlistOnLogon): if(self.__getUserlistOnLogon):
self.__getUserlistOnLogon = False self.__getUserlistOnLogon = False
self.getUserList() self.getUserList()
madeChangeOnPlayer = False madeChangeOnPlayer = False
if(not paused): if(not paused):
position += latency position += messageAge
if(self._player): if(self._player):
madeChangeOnPlayer = self._changePlayerStateAccordingToGlobalState(position, paused, doSeek, setBy) madeChangeOnPlayer = self._changePlayerStateAccordingToGlobalState(position, paused, doSeek, setBy)
if(madeChangeOnPlayer): if(madeChangeOnPlayer):
self.askPlayer() self.askPlayer()
self._executePlaystateHooks(position, paused, doSeek, setBy, latency) self._executePlaystateHooks(position, paused, doSeek, setBy, messageAge)
def getUserOffset(self): def getUserOffset(self):
return self._userOffset return self._userOffset

View File

@ -5,6 +5,8 @@ import syncplay
from functools import wraps from functools import wraps
import time import time
from syncplay.messages import getMessage from syncplay.messages import getMessage
from syncplay.constants import PING_MOVING_AVERAGE_WEIGHT
class JSONCommandProtocol(LineReceiver): class JSONCommandProtocol(LineReceiver):
def handleMessages(self, messages): def handleMessages(self, messages):
@ -23,19 +25,10 @@ class JSONCommandProtocol(LineReceiver):
else: else:
self.dropWithError(getMessage("en", "unknown-command-server-error").format(message[1])) # TODO: log, not drop self.dropWithError(getMessage("en", "unknown-command-server-error").format(message[1])) # TODO: log, not drop
def printReceived(self, line): #TODO: remove
#print ">>i", line
pass
def printSent(self, line):
#print "o<<", line
pass
def lineReceived(self, line): def lineReceived(self, line):
line = line.strip() line = line.strip()
if not line: if not line:
return return
self.printReceived(line)
try: try:
messages = json.loads(line) messages = json.loads(line)
except: except:
@ -49,7 +42,6 @@ class JSONCommandProtocol(LineReceiver):
def sendMessage(self, dict_): def sendMessage(self, dict_):
line = json.dumps(dict_) line = json.dumps(dict_)
self.printSent(line)
self.sendLine(line) self.sendLine(line)
def drop(self): def drop(self):
@ -58,12 +50,14 @@ class JSONCommandProtocol(LineReceiver):
def dropWithError(self, error): def dropWithError(self, error):
raise NotImplementedError() raise NotImplementedError()
class SyncClientProtocol(JSONCommandProtocol): class SyncClientProtocol(JSONCommandProtocol):
def __init__(self, client): def __init__(self, client):
self._client = client self._client = client
self.clientIgnoringOnTheFly = 0 self.clientIgnoringOnTheFly = 0
self.serverIgnoringOnTheFly = 0 self.serverIgnoringOnTheFly = 0
self.logged = False self.logged = False
self._pingService = PingService()
def connectionMade(self): def connectionMade(self):
self._client.initProtocol(self) self._client.initProtocol(self)
@ -167,15 +161,18 @@ class SyncClientProtocol(JSONCommandProtocol):
return position, paused, doSeek, setBy return position, paused, doSeek, setBy
def _handleStatePing(self, state): def _handleStatePing(self, state):
yourLatency = state["ping"]["yourLatency"] if state["ping"].has_key("yourLatency") else 0
senderLatency = state["ping"]["senderLatency"] if state["ping"].has_key("senderLatency") else 0
if (state["ping"].has_key("latencyCalculation")): if (state["ping"].has_key("latencyCalculation")):
latencyCalculation = state["ping"]["latencyCalculation"] latencyCalculation = state["ping"]["latencyCalculation"]
return yourLatency, senderLatency, latencyCalculation if ("clientLatencyCalculation" in state["ping"]):
timestamp = state["ping"]["clientLatencyCalculation"]
senderRtt = state["ping"]["serverRtt"]
self._pingService.receiveMessage(timestamp, senderRtt)
messageAge = self._pingService.getLastForwardDelay()
return messageAge, latencyCalculation
def handleState(self, state): def handleState(self, state):
position, paused, doSeek, setBy = None, None, None, None position, paused, doSeek, setBy = None, None, None, None
yourLatency, senderLatency = 0, 0 messageAge = 0
if(state.has_key("ignoringOnTheFly")): if(state.has_key("ignoringOnTheFly")):
ignore = state["ignoringOnTheFly"] ignore = state["ignoringOnTheFly"]
if(ignore.has_key("server")): if(ignore.has_key("server")):
@ -187,10 +184,9 @@ class SyncClientProtocol(JSONCommandProtocol):
if(state.has_key("playstate")): if(state.has_key("playstate")):
position, paused, doSeek, setBy = self._extractStatePlaystateArguments(state) position, paused, doSeek, setBy = self._extractStatePlaystateArguments(state)
if(state.has_key("ping")): if(state.has_key("ping")):
yourLatency, senderLatency, latencyCalculation = self._handleStatePing(state) messageAge, latencyCalculation = self._handleStatePing(state)
if(position is not None and paused is not None and not self.clientIgnoringOnTheFly): if(position is not None and paused is not None and not self.clientIgnoringOnTheFly):
latency = yourLatency + senderLatency self._client.updateGlobalState(position, paused, doSeek, setBy, messageAge)
self._client.updateGlobalState(position, paused, doSeek, setBy, latency)
position, paused, doSeek, stateChange = self._client.getLocalState() position, paused, doSeek, stateChange = self._client.getLocalState()
self.sendState(position, paused, doSeek, latencyCalculation, stateChange) self.sendState(position, paused, doSeek, latencyCalculation, stateChange)
@ -206,8 +202,11 @@ class SyncClientProtocol(JSONCommandProtocol):
state["playstate"]["position"] = position state["playstate"]["position"] = position
state["playstate"]["paused"] = paused state["playstate"]["paused"] = paused
if(doSeek): state["playstate"]["doSeek"] = doSeek if(doSeek): state["playstate"]["doSeek"] = doSeek
state["ping"] = {}
if(latencyCalculation): if(latencyCalculation):
state["ping"] = {"latencyCalculation": latencyCalculation} state["ping"]["latencyCalculation"] = latencyCalculation
state["ping"]["clientLatencyCalculation"] = self._pingService.newTimestamp()
state["ping"]["clientRtt"] = self._pingService.getRtt()
if(stateChange): if(stateChange):
self.clientIgnoringOnTheFly += 1 self.clientIgnoringOnTheFly += 1
if(self.serverIgnoringOnTheFly or self.clientIgnoringOnTheFly): if(self.serverIgnoringOnTheFly or self.clientIgnoringOnTheFly):
@ -232,6 +231,9 @@ class SyncServerProtocol(JSONCommandProtocol):
self._logged = False self._logged = False
self.clientIgnoringOnTheFly = 0 self.clientIgnoringOnTheFly = 0
self.serverIgnoringOnTheFly = 0 self.serverIgnoringOnTheFly = 0
self._pingService = PingService()
self._clientLatencyCalculation = 0
self._clientLatencyCalculationArrivalTime = 0
def __hash__(self): def __hash__(self):
return hash('|'.join(( return hash('|'.join((
@ -349,7 +351,11 @@ class SyncServerProtocol(JSONCommandProtocol):
def handleList(self, _): def handleList(self, _):
self.sendList() self.sendList()
def sendState(self, position, paused, doSeek, setBy, senderLatency, watcherLatency, forced = False): def sendState(self, position, paused, doSeek, setBy, forced=False):
if(self._clientLatencyCalculationArrivalTime):
processingTime = time.time() - self._clientLatencyCalculationArrivalTime
else:
processingTime = 0
playstate = { playstate = {
"position": position, "position": position,
"paused": paused, "paused": paused,
@ -357,10 +363,12 @@ class SyncServerProtocol(JSONCommandProtocol):
"setBy": setBy "setBy": setBy
} }
ping = { ping = {
"yourLatency": watcherLatency, "latencyCalculation": self._pingService.newTimestamp(),
"senderLatency": senderLatency, "serverRtt": self._pingService.getRtt()
"latencyCalculation": time.time()
} }
if(self._clientLatencyCalculation):
ping["clientLatencyCalculation"] = self._clientLatencyCalculation + processingTime
self._clientLatencyCalculation = 0
state = { state = {
"ping": ping, "ping": ping,
"playstate": playstate, "playstate": playstate,
@ -397,9 +405,13 @@ class SyncServerProtocol(JSONCommandProtocol):
if(state.has_key("playstate")): if(state.has_key("playstate")):
position, paused, doSeek = self._extractStatePlaystateArguments(state) position, paused, doSeek = self._extractStatePlaystateArguments(state)
if(state.has_key("ping")): if(state.has_key("ping")):
latencyCalculation = state["ping"]["latencyCalculation"] if state["ping"].has_key("latencyCalculation") else None latencyCalculation = state["ping"]["latencyCalculation"] if state["ping"].has_key("latencyCalculation") else 0
clientRtt = state["ping"]["clientRtt"] if state["ping"].has_key("clientRtt") else 0
self._clientLatencyCalculation = state["ping"]["clientLatencyCalculation"] if state["ping"].has_key("clientLatencyCalculation") else 0
self._clientLatencyCalculationArrivalTime = time.time()
self._pingService.receiveMessage(latencyCalculation, clientRtt)
if(self.serverIgnoringOnTheFly == 0): if(self.serverIgnoringOnTheFly == 0):
self._factory.updateWatcherState(self, position, paused, doSeek, latencyCalculation) self._factory.updateWatcherState(self, position, paused, doSeek, self._pingService.getLastForwardDelay())
def handleHttpRequest(self, request): def handleHttpRequest(self, request):
self.sendLine(self._factory.gethttpRequestReply()) self.sendLine(self._factory.gethttpRequestReply())
@ -410,3 +422,33 @@ class SyncServerProtocol(JSONCommandProtocol):
def sendError(self, message): def sendError(self, message):
self.sendMessage({"Error": {"message": message}}) self.sendMessage({"Error": {"message": message}})
class PingService(object):
def __init__(self):
self._rtt = 0
self._fd = 0
self._avrRtt = 0
def newTimestamp(self):
return time.time()
def receiveMessage(self, timestamp, senderRtt):
if(not timestamp):
return
self._rtt = time.time() - timestamp
if(self._rtt < 0 or senderRtt < 0):
return
if(not self._avrRtt):
self._avrRtt = self._rtt
self._avrRtt = self._avrRtt * PING_MOVING_AVERAGE_WEIGHT + self._rtt * (1 - PING_MOVING_AVERAGE_WEIGHT)
if(senderRtt < self._rtt):
self._fd = self._avrRtt/2 + (self._rtt - senderRtt)
else:
self._fd = self._avrRtt/2
def getLastForwardDelay(self):
return self._fd
def getRtt(self):
return self._rtt

View File

@ -155,7 +155,7 @@ class SyncFactory(Factory):
else: else:
return getMessage("en", "server-default-http-reply") return getMessage("en", "server-default-http-reply")
def sendState(self, watcherProtocol, doSeek = False, senderLatency = 0, forcedUpdate = False): def sendState(self, watcherProtocol, doSeek = False, forcedUpdate = False):
watcher = self.getWatcher(watcherProtocol) watcher = self.getWatcher(watcherProtocol)
if(not watcher): if(not watcher):
return return
@ -164,19 +164,11 @@ class SyncFactory(Factory):
setBy = self._roomStates[room]["setBy"] setBy = self._roomStates[room]["setBy"]
watcher.paused = paused watcher.paused = paused
watcher.position = position watcher.position = position
watcherProtocol.sendState(position, paused, doSeek, setBy, senderLatency, watcher.latency, forcedUpdate) watcherProtocol.sendState(position, paused, doSeek, setBy, forcedUpdate)
if(time.time() - watcher.lastUpdate > constants.PROTOCOL_TIMEOUT): if(time.time() - watcher.lastUpdate > constants.PROTOCOL_TIMEOUT):
watcherProtocol.drop() watcherProtocol.drop()
self.removeWatcher(watcherProtocol) self.removeWatcher(watcherProtocol)
def __updateWatcherPing(self, latencyCalculation, watcher):
if (latencyCalculation):
ping = (time.time() - latencyCalculation) / 2
if (watcher.latency):
watcher.latency = watcher.latency * (constants.PING_MOVING_AVERAGE_WEIGHT) + ping * (1-constants.PING_MOVING_AVERAGE_WEIGHT) #Exponential moving average
else:
watcher.latency = ping
def __shouldServerForceUpdateOnRoom(self, pauseChanged, doSeek): def __shouldServerForceUpdateOnRoom(self, pauseChanged, doSeek):
return doSeek or pauseChanged return doSeek or pauseChanged
@ -210,9 +202,10 @@ class SyncFactory(Factory):
if (doSeek and position): if (doSeek and position):
self.ircBot.sp_seek(watcher.name, oldPosition, position, watcher.room) self.ircBot.sp_seek(watcher.name, oldPosition, position, watcher.room)
def updateWatcherState(self, watcherProtocol, position, paused, doSeek, latencyCalculation): def updateWatcherState(self, watcherProtocol, position, paused, doSeek, messageAge):
watcher = self.getWatcher(watcherProtocol) watcher = self.getWatcher(watcherProtocol)
self.__updateWatcherPing(latencyCalculation, watcher) if(not watcher):
return
watcher.lastUpdate = time.time() watcher.lastUpdate = time.time()
if(watcher.file): if(watcher.file):
oldPosition = self._roomStates[watcher.room]["position"] oldPosition = self._roomStates[watcher.room]["position"]
@ -220,11 +213,13 @@ class SyncFactory(Factory):
if(paused is not None): if(paused is not None):
pauseChanged = self.__updatePausedState(paused, watcher) pauseChanged = self.__updatePausedState(paused, watcher)
if(position is not None): if(position is not None):
if(not paused):
position += messageAge
self.__updatePositionState(position, doSeek or pauseChanged, watcher) self.__updatePositionState(position, doSeek or pauseChanged, watcher)
forceUpdate = self.__shouldServerForceUpdateOnRoom(pauseChanged, doSeek) forceUpdate = self.__shouldServerForceUpdateOnRoom(pauseChanged, doSeek)
if(forceUpdate): if(forceUpdate):
self.__notifyIrcBot(position, paused, doSeek, watcher, oldPosition, pauseChanged) self.__notifyIrcBot(position, paused, doSeek, watcher, oldPosition, pauseChanged)
l = lambda w: self.sendState(w, doSeek, watcher.latency, forceUpdate) l = lambda w: self.sendState(w, doSeek, forceUpdate)
self.broadcastRoom(watcher.watcherProtocol, l) self.broadcastRoom(watcher.watcherProtocol, l)
def removeWatcher(self, watcherProtocol): def removeWatcher(self, watcherProtocol):
@ -262,6 +257,8 @@ class SyncFactory(Factory):
def watcherSetFile(self, watcherProtocol, file_): def watcherSetFile(self, watcherProtocol, file_):
watcher = self.getWatcher(watcherProtocol) watcher = self.getWatcher(watcherProtocol)
if(not watcher):
return
watcher.file = file_ watcher.file = file_
l = lambda w: w.sendUserSetting(watcher.name, watcher.room, watcher.file, None) l = lambda w: w.sendUserSetting(watcher.name, watcher.room, watcher.file, None)
self.broadcast(watcherProtocol, l) self.broadcast(watcherProtocol, l)
@ -299,7 +296,7 @@ class SyncFactory(Factory):
self.ircBot.sp_paused("IRC: " + user.name, user.room) self.ircBot.sp_paused("IRC: " + user.name, user.room)
elif(not paused): elif(not paused):
self.ircBot.sp_unpaused("IRC: " + user.name, user.room) self.ircBot.sp_unpaused("IRC: " + user.name, user.room)
l = lambda w: self.sendState(w, False, user.latency, True) l = lambda w: self.sendState(w, False, 0, True)
self.broadcastRoom(user.watcherProtocol, l) self.broadcastRoom(user.watcherProtocol, l)
@ -320,7 +317,7 @@ class SyncFactory(Factory):
self._roomStates[user.room]['paused'] = time self._roomStates[user.room]['paused'] = time
self._roomStates[user.room]['setBy'] = "IRC: " + setBy self._roomStates[user.room]['setBy'] = "IRC: " + setBy
self.ircBot.sp_seek(user.name, oldPosition, time, user.room) self.ircBot.sp_seek(user.name, oldPosition, time, user.room)
l = lambda w: self.sendState(w, True, user.latency, True) l = lambda w: self.sendState(w, True, 0, True)
self.broadcastRoom(user.watcherProtocol, l) self.broadcastRoom(user.watcherProtocol, l)
@ -368,7 +365,6 @@ class Watcher(object):
self.file = None self.file = None
self._sendStateTimer = None self._sendStateTimer = None
self.position = None self.position = None
self.latency = 0
self.lastUpdate = time.time() self.lastUpdate = time.time()
def __lt__(self, b): def __lt__(self, b):