#! /usr/bin/env python3 # -*- coding: utf-8 -*- """ Copyright 2020, Nils Hilbricht, Germany ( https://www.hilbricht.net ) The Non-Session-Manager by Jonathan Moore Liles : http://non.tuxfamily.org/nsm/ New Session Manager, by LinuxAudio.org: https://github.com/linuxaudio/new-session-manager With help from code fragments from https://github.com/attwad/python-osc ( DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE v2 ) This file is part of the Laborejo Software Suite ( https://www.laborejo.org ). This application is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . """ import logging; logger = logging.getLogger(__name__); logger.info("import") #Standard Library import struct import socket from os import getenv #to get NSM env var from shutil import rmtree as shutilrmtree from shutil import copytree as shutilcopytree from urllib.parse import urlparse #to convert NSM env var import subprocess import atexit import pathlib import json from uuid import uuid4 from datetime import datetime from sys import exit as sysexit def nothing(*args, **kwargs): pass class _IncomingMessage(object): """Representation of a parsed datagram representing an OSC message. An OSC message consists of an OSC Address Pattern followed by an OSC Type Tag String followed by zero or more OSC Arguments. """ def __init__(self, dgram): #NSM Broadcasts are bundles, but very simple ones. We only need to care about the single message it contains. #Therefore we can strip the bundle prefix and handle it as normal message. if b"#bundle" in dgram: bundlePrefix, singleMessage = dgram.split(b"/", maxsplit=1) dgram = b"/" + singleMessage # / eaten by split self.LENGTH = 4 #32 bit self._dgram = dgram self._parameters = [] self.parse_datagram() def get_int(self, dgram, start_index): """Get a 32-bit big-endian two's complement integer from the datagram. Args: dgram: A datagram packet. start_index: An index where the integer starts in the datagram. Returns: A tuple containing the integer and the new end index. Raises: ValueError if the datagram could not be parsed. """ try: if len(dgram[start_index:]) < self.LENGTH: raise ValueError('Datagram is too short') return ( struct.unpack('>i', dgram[start_index:start_index + self.LENGTH])[0], start_index + self.LENGTH) except (struct.error, TypeError) as e: raise ValueError('Could not parse datagram %s' % e) def get_string(self, dgram, start_index): """Get a python string from the datagram, starting at pos start_index. We receive always the full string, but handle only the part from the start_index internally. In the end return the offset so it can be added to the index for the next parameter. Each subsequent call handles less of the same string, starting further to the right. According to the specifications, a string is: "A sequence of non-null ASCII characters followed by a null, followed by 0-3 additional null characters to make the total number of bits a multiple of 32". Args: dgram: A datagram packet. start_index: An index where the string starts in the datagram. Returns: A tuple containing the string and the new end index. Raises: ValueError if the datagram could not be parsed. """ #First test for empty string, which is nothing, followed by a terminating \x00 padded by three additional \x00. if dgram[start_index:].startswith(b"\x00\x00\x00\x00"): return "", start_index + 4 #Otherwise we have a non-empty string that must follow the rules of the docstring. offset = 0 try: while dgram[start_index + offset] != 0: offset += 1 if offset == 0: raise ValueError('OSC string cannot begin with a null byte: %s' % dgram[start_index:]) # Align to a byte word. if (offset) % self.LENGTH == 0: offset += self.LENGTH else: offset += (-offset % self.LENGTH) # Python slices do not raise an IndexError past the last index, # do it ourselves. if offset > len(dgram[start_index:]): raise ValueError('Datagram is too short') data_str = dgram[start_index:start_index + offset] return data_str.replace(b'\x00', b'').decode('utf-8'), start_index + offset except IndexError as ie: raise ValueError('Could not parse datagram %s' % ie) except TypeError as te: raise ValueError('Could not parse datagram %s' % te) def get_float(self, dgram, start_index): """Get a 32-bit big-endian IEEE 754 floating point number from the datagram. Args: dgram: A datagram packet. start_index: An index where the float starts in the datagram. Returns: A tuple containing the float and the new end index. Raises: ValueError if the datagram could not be parsed. """ try: return (struct.unpack('>f', dgram[start_index:start_index + self.LENGTH])[0], start_index + self.LENGTH) except (struct.error, TypeError) as e: raise ValueError('Could not parse datagram %s' % e) def parse_datagram(self): try: self._address_regexp, index = self.get_string(self._dgram, 0) if not self._dgram[index:]: # No params is legit, just return now. return # Get the parameters types. type_tag, index = self.get_string(self._dgram, index) if type_tag.startswith(','): type_tag = type_tag[1:] # Parse each parameter given its type. for param in type_tag: if param == "i": # Integer. val, index = self.get_int(self._dgram, index) elif param == "f": # Float. val, index = self.get_float(self._dgram, index) elif param == "s": # String. val, index = self.get_string(self._dgram, index) else: logger.warning("Unhandled parameter type: {0}".format(param)) continue self._parameters.append(val) except ValueError as pe: #raise ValueError('Found incorrect datagram, ignoring it', pe) # Raising an error is not ignoring it! logger.warning("Found incorrect datagram, ignoring it. {}".format(pe)) @property def oscpath(self): """Returns the OSC address regular expression.""" return self._address_regexp @staticmethod def dgram_is_message(dgram): """Returns whether this datagram starts as an OSC message.""" return dgram.startswith(b'/') @property def size(self): """Returns the length of the datagram for this message.""" return len(self._dgram) @property def dgram(self): """Returns the datagram from which this message was built.""" return self._dgram @property def params(self): """Convenience method for list(self) to get the list of parameters.""" return list(self) def __iter__(self): """Returns an iterator over the parameters of this message.""" return iter(self._parameters) class _OutgoingMessage(object): def __init__(self, oscpath): self.LENGTH = 4 #32 bit self.oscpath = oscpath self._args = [] def write_string(self, val): dgram = val.encode('utf-8') diff = self.LENGTH - (len(dgram) % self.LENGTH) dgram += (b'\x00' * diff) return dgram def write_int(self, val): return struct.pack('>i', val) def write_float(self, val): return struct.pack('>f', val) def add_arg(self, argument): t = {str:"s", int:"i", float:"f"}[type(argument)] self._args.append((t, argument)) def build(self): dgram = b'' #OSC Path dgram += self.write_string(self.oscpath) if not self._args: dgram += self.write_string(',') return dgram # Write the parameters. arg_types = "".join([arg[0] for arg in self._args]) dgram += self.write_string(',' + arg_types) for arg_type, value in self._args: f = {"s":self.write_string, "i":self.write_int, "f":self.write_float}[arg_type] dgram += f(value) return dgram class NsmServerControl(object): """ The ServerControl can be started in three modes, regarding nsmd. We expect that starting our own nsmd will be the majority of cases. SessionRoot parameter is only honored if we start nsmd ourselves. Ascending lookup priority: 1) Default is to start our own nsmd. A single-instance watcher will prevent multiple programs on the same system. 2) When $NSM_URL is found as environment we will connect to that nsmd. 3) When hostname and portnumber are given explicitely as instance variables we will first test if a server is running at that URL, if not we will start our own with these parameters. This is not only a pure implemenation of the protocol. It is extended by us reacting to and storing incoming data. This data can be interpreted and enhanced by looking at the session dir ourselves. However, we don't do anything that is not possible by the original nsmd + human interaction. 100% Compatibility is the highest priority. The big problems are the async nature of communication, message come out of order or interleaved, and nsm is not consistent in its usage of osc-paths. For example it starts listing sessions with /nsm/gui/server/message, but sends the content with /reply [/nsm/server/list, nsmSessionName] and then ends it with /nsm/server/list [0, Done] (no reply!). So three message types, three callbacks for one logically connected process. To update our internal session information we therefore need to split the functionality into severall seemingly unconnected callbacks and you need to know how the protocol works to actually know the order of operations. Switch logging to info to learn more. We have a mix between NSM callbacks and our own functions. Most important there is a watchdog that looks at the session directory and creates its own callbacks if something changes. A typical operation, say sessionDelete or sessionCopy looks like this: * Ask (blocking) nsmd for a current list of sessions, update our internal state * Perform a file operation, like copy or delete or lift a lock * Let our watchdog discover the changes in the file system and trigger another (non-blocking) request for a list of sessions to adjust our internal state to reality. Granted, we could just call our blocking query again at the end, but we would still need to let the watchdog run for operations that the user does with a filemanager, which would end up in redundant calls. Bottom line: _updateSessionListBlocking is called at the beginning of a function, but not at the end. Docs: http://non.tuxfamily.org/nsm/ http://non.tuxfamily.org/wiki/Non%20Session%20Manager http://non.tuxfamily.org/wiki/ApplicationsSupportingNsm http://non.tuxfamily.org/nsm/API.html """ def __init__(self, sessionOpenReadyHook, sessionOpenLoadingHook, sessionClosedHook, clientStatusHook, singleInstanceActivateWindowHook, dataClientNamesHook, dataClientDescriptionHook, parameterNsmOSCUrl=None, sessionRoot=None, startupSession=None, useCallbacks=True): """If useCallbacks is False you will see every message in the log. This is just a development mode to see all messages, unfiltered. Normally we have special hook functions that save and interpret data, so they don't show in the logs""" #Deactivate hooks for now. During init no hooks may be called, #but some functions want to do that already. We setup the true hooks at the end of init self.sessionOpenReadyHook= self.sessionOpenLoadingHook= self.sessionClosedHook= self.clientStatusHook= self.singleInstanceActivateWindowHook= self.dataClientNamesHook= self.dataClientDescriptionHook= nothing self._queue = list() #Incoming OSC messages are buffered here. #Status variables that are set by our callbacks self.internalState = { "sessions" : set(), #nsmSessionNames:str . We use set for unqiue, just in case. But we also clear that on /nsm/gui/server/message ['Listing sessions'] to handle deleted sessions "currentSession" : None, "port" : None, #Our GUI port "serverPort" : None, #nsmd port "nsmUrl" : None, #the environment variable "clients" : {}, #clientId:dict see self._initializeEmptyClient . Gets replaced with a new dict instance on session changes. "broadcasts" : [], #in the order they appeared "datastorage" : None, #URL, if present in the session } self.dataStorage = None #Becomes DataStorage() every time a datastorage client does a broadcast announce. self._addToNextSession = [] #A list of executables in PATH. Filled by new, waits for reply that session is created and then will send clientNew and clear the list. if useCallbacks: self.callbacks = { "/nsm/gui/session/name" : self._reactCallback_activeSessionChanged, #"/nsm/gui/session/root" #Session root is an active blocking call in init "/nsm/gui/client/label" : self._reactCallback_ClientLabelChanged, "/nsm/gui/client/new" : self._reactCallback_ClientNew, "/nsm/gui/session/session" : self._reactCallback_SessionSession, "/nsm/gui/client/status" : self._reactCallback_statusChanged, #handles multiple status keywords "/reply" : self._reactCallback_reply, #handles multiple replies "/error" : self._reactCallback_error, "/nsm/gui/client/has_optional_gui" : self._reactCallback_clientHasOptionalGui, "/nsm/gui/client/gui_visible" : self._reactCallback_clientGuiVisible, "/nsm/gui/client/pid" : self._reactCallback_clientPid, "/nsm/gui/client/dirty" : self._reactCallback_clientDirty, "/nsm/gui/server/message" : self._reactCallback_serverMessage, "/nsm/gui/gui_announce" : self._reactCallback_guiAnnounce, #we rarely receive that, especially not in init. "/nsm/server/list" : self._reactCallback_serverList, "/nsm/server/broadcast" : self._reactCallback_broadcast, } else: #This is just a development mode to see all messages, unfiltered self.callbacks = set() #empty set is easiest to check #Networking and Init for our control part, not for the server self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #internet, udp self.sock.bind(('', 0)) #pick a free port on localhost. self.sock.setblocking(False) self.internalState["port"] = self.sock.getsockname()[1] #only happens once, ports don't change during runtime #self.sock.close() Do not close, this runs until the end of the program ###Testing of existing servers, starting and connecting #First handle the NSM URL, or generate on. #self.nsmOSCUrl must be a tuple compatible to the result of urlparse. (hostname, port) self.singleInstanceSocket = None if parameterNsmOSCUrl: o = urlparse(parameterNsmOSCUrl) self.nsmOSCUrl = (o.hostname, o.port) else: envResult = self._getNsmOSCUrlFromEnvironment() if envResult: self.nsmOSCUrl = envResult else: #This is the default case. User just starts the GUI. The other modes are concious decisions to either start with URL as parameter or in an NSM environment. #But now we need to test if the user accidentaly opened a second GUI, which would start a second server. self._setupAndTestForSingleInstance() #This might quit the whole program and we will never see the next line. self.nsmOSCUrl = self._generateFreeNsmOSCUrl() assert self.nsmOSCUrl self.internalState["serverPort"] = self.nsmOSCUrl[1] #only happens once, ports don't change during runtime self.internalState["nsmUrl"] = f"osc.udp://{self.nsmOSCUrl[0]}:{self.nsmOSCUrl[1]}/" #only happens once, ports don't change during runtime #Then check if a server is running there. If not start one. self.ourOwnServer = None #Might become a subprocess handle if self._isNsmdRunning(self.nsmOSCUrl): #serverport = self.nsmOSCUrl[1] #No further action required. GUI announce below this testing. pass else: self._startNsmdOurselves(sessionRoot, startupSession) #Session root can be a commandline parameter we forward to the server if we start it ourselves. startupSession is an autoloader. Both are usually None. assert type(self.ourOwnServer) is subprocess.Popen, (self.ourOwnServer, type(self.ourOwnServer)) #Wait for the server, or test if it is reacting. self._waitForPingResponseBlocking() logger.info("nsmd is ready @ {}".format(self.nsmOSCUrl)) #Tell nsmd that we are a GUI and want to receive general messages async, not only after we request something self.sessionRoot = self._initial_announce() #Triggers "hi" and session root self.internalState["sessionRoot"] = self.sessionRoot self._forceProcessOnceToEmptyQueue() #process any leftover messages. atexit.register(self.quit) #mostly does stuff when we started nsmd ourself #Activate hooks for api callbacks, now that we are finished here. #Otherwise the hooks will get called from our functions (e.g. new client) while we are still during init self.sessionOpenReadyHook = sessionOpenReadyHook #self.sessionAsDict(nsmSessionName) as parameter self.sessionOpenLoadingHook = sessionOpenLoadingHook #self.sessionAsDict(nsmSessionName) as parameter self.sessionClosedHook = sessionClosedHook #no parameter. This is also "choose a session" mode self.clientStatusHook = clientStatusHook #all client status is done via this single hook. GUIs need to check if they already know the client or not. self.dataClientNamesHook = dataClientNamesHook self.dataClientDescriptionHook = dataClientDescriptionHook self.singleInstanceActivateWindowHook = singleInstanceActivateWindowHook #added to self.processSingleInstance() to listen for a message from another wannabe-instance self._receiverActive = True logger.info("nsmservercontrol init is complete. Ready for event loop") #Now an external event loop can add self.process #Internal Methods def _setupAndTestForSingleInstance(self): """on program startup trigger this if there is already another instance of us running. This socket is only """ self.singleInstanceSocket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) logger.info("Testing if another non-specific Agordejo is running.") try: ## Create an abstract socket, by prefixing it with null. # this relies on a feature only in linux, when current process quits, the # socket will be deleted. self.singleInstanceSocket.bind('\0' + "agordejo") self.singleInstanceSocket.listen(1) self.singleInstanceSocket.setblocking(False) logger.info("No other non-specific Agordejo found. Starting GUI") #Continue in self.processSingleInstance() return True except socket.error: logger.error("GUI for this nsmd server already running. Informing the existing application to show itself.") self.singleInstanceSocket.connect('\0' + "agordejo") self.singleInstanceSocket.send("agordejoactivate".encode()); self.singleInstanceSocket.close() sysexit(1) #triggers atexit #print ("not executed") return False def processSingleInstance(self): """Tests our unix socket for an incoming signal. if received forward to the engine->gui Can be added to a slower event loop, so it is not in self.process""" if self.singleInstanceSocket: try: connection, client_address = self.singleInstanceSocket.accept() #This blocks and waits for a message incoming = connection.recv(1024) if incoming and incoming == b"agordejoactivate": self.singleInstanceActivateWindowHook() except BlockingIOError: #happens while no data is received. Has nothing to do with blocking or not. In fact: this happens when in non-blocking mode. pass except socket.timeout: pass def _setPause(self, state:bool): """Set both the socket and the thread into waiting mode or not. With this we can wait for answers until we resume async operation""" if state: self.sock.setblocking(True) #explicitly wait. self.sock.settimeout(0.5) self._receiverActive = False logger.info("Suspending receiving async mode.") else: self.sock.setblocking(False) self._receiverActive = True logger.info("Resuming receiving async mode.") def _forceProcessOnceToEmptyQueue(self): """Sometimes we want to make sure everything is processed until we continue. For example in our init. Initial usecase was connecting to a running nsmd with session. The api first callback to export sessions to the GUI was freezing because listSession was chocking on leftover messages from gui_announce to a running session, which sends the session name and a list of clients. The latter is not happening when starting the server ourselves, so we weren't expecting this. To be honest, this is really a patch to work around a design flaw and we hope this is a one-off corner case.""" logger.info("Force processing queue") #First gather all osc messages still in the pipe while True: try: data, addr = self.sock.recvfrom(1024) msg = _IncomingMessage(data) if msg: self._queue.append(msg) except BlockingIOError: #happens while no data is received. Has nothing to do with blocking or not. break except socket.timeout: break #Now process them all. This is different than normal self.process(). for msg in self._queue: if msg.oscpath in self.callbacks: self.callbacks[msg.oscpath](msg.params) else: logger.warning(f"Unhandled message with path {msg.oscpath} and parameters {msg.params}") self._queue.clear() logger.info("Ended force processing queue") def process(self): """Use this in an external event loop""" if self._receiverActive: while True: try: data, addr = self.sock.recvfrom(1024) msg = _IncomingMessage(data) if msg: self._queue.append(msg) except BlockingIOError: #happens while no data is received. Has nothing to do with blocking or not. break except socket.timeout: break for msg in self._queue: if msg.oscpath in self.callbacks: self.callbacks[msg.oscpath](msg.params) else: logger.warning(f"Unhandled message with path {msg.oscpath} and parameters {msg.params}") self._queue.clear() def _getNsmOSCUrlFromEnvironment(self): """Return the nsm osc url or None""" nsmOSCUrl = getenv("NSM_URL") if not nsmOSCUrl: return None else: #osc.udp://hostname:portnumber/ o = urlparse(nsmOSCUrl) return o.hostname, o.port def _generateFreeNsmOSCUrl(self): #Instead of reading out the NSM port we get a free port ourselves and set up nsmd with that tempServerSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #internet, udp tempServerSock.bind(('', 0)) #pick a free port on localhost. address, tempServerSockPort = tempServerSock.getsockname() tempServerSock.close() #We need to close it because nsmd will open it right away. nsmOSCUrl = ("0.0.0.0", tempServerSockPort) #compatible to result of urlparse logger.info("Generated our own free NSM_URL to start a server @ {}".format(nsmOSCUrl)) return nsmOSCUrl def _isNsmdRunning(self, nsmOSCUrl): """Test if the port is open or not""" logger.info(f"Testing if a server is running @ {nsmOSCUrl}") hostname, port = nsmOSCUrl tempServerSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #internet, udp try: tempServerSock.bind((hostname, port)) logger.info(f"No external nsmd found (we tested if port is closed) @ {nsmOSCUrl}") return False except: logger.info(f"External nsmd found (we tested if port is closed) @ {nsmOSCUrl}") return True finally: tempServerSock.close() def _startNsmdOurselves(self, sessionRoot:str, startupSession:str): assert self.nsmOSCUrl hostname, port = self.nsmOSCUrl arguments = ["nsmd","--osc-port", str(port)] if sessionRoot: arguments += ["--session-root", sessionRoot] if startupSession: logger.info(f"Got start-session as command line parameter. Fowarding to nsmd command line: {startupSession}") arguments += ["--load-session", startupSession] #nsmd allows all executables in $PATH. For technical reasons our GUI extends this PATH before we start the server. #This is a convenience service for fellow developers, that does not belong in the server control. #However, if you wonder why there are are more applications from unknown PATHs check qtgui/settings.py self.ourOwnServer = subprocess.Popen(arguments) def _blockingRequest(self, path:str, arguments:list, answerPath:str, answerArguments:list, repeat=False)->list: """During start-up we need to wait for replies. Also some operations only make sense if we got data back. This is an abstraction that deals with messages that may come out-of-order and keeps them for later, but at least prevents our side from sending messages out-of-order itself. Default is: send once, wait for answer. repeat=True sends multiple times until an answer arrives. Returns list of arguments, can be empty. """ assert not self._queue, [(m.oscpath, m.params) for m in self._queue] logger.info(f"[wait for answer]: Sending {path}: {arguments}") self._setPause(True) out_msg = _OutgoingMessage(path) for arg in arguments: out_msg.add_arg(arg) if not repeat: self.sock.sendto(out_msg.build(), self.nsmOSCUrl) #Wait for answer ready = False while not ready: if repeat: #we need to send multiple times. self.sock.sendto(out_msg.build(), self.nsmOSCUrl) try: data, addr = self.sock.recvfrom(1024) msg = _IncomingMessage(data) if answerArguments and msg.oscpath == answerPath and msg.params == answerArguments: result = msg.params logger.info(f"[wait from {path}] Received {answerPath}: {result}") ready = True elif msg.oscpath == answerPath: result = msg.params logger.info(f"[wait from {path}] Received {answerPath}: {result}") ready = True else: logger.warning(f"Waiting for {answerPath} from nsmd, but got: {msg.oscpath} with {msg.params}. Adding to queue for later.") self._queue.append(msg) except BlockingIOError: #happens while no data is received. Has nothing to do with blocking or not. continue except socket.timeout: continue self._setPause(False) return result def _waitForPingResponseBlocking(self): self._blockingRequest(path="/osc/ping", arguments=[], answerPath="/reply", answerArguments=["/osc/ping",], repeat=True) def _initial_announce(self)->pathlib.Path: """nsm/gui/gui_announce triggers a multi-stage reply. First we get "hi", then we get the session root. We wait for session root and then clean 'hi' from the queue. When we connect to a running nsmd we also receive /nsm/gui/session/name with the current session (or empty string for no current). If in a session we will receive a list of clients which ends the gui_announce stage. Returns session root as pathlib-path.""" resultArguments = self._blockingRequest(path="/nsm/gui/gui_announce", arguments=[], answerPath="/nsm/gui/session/root", answerArguments=[]) if len(self._queue) == 1 and self._queue[0].oscpath == "/nsm/gui/gui_announce" and self._queue[0].params == ["hi"]: logger.info("Got 'hi'. We are now the registered nsmd GUI as per our initial /nsm/gui/gui_announce") self._queue.clear() #this is safe because we tested above that there is exactly the hi message in the queue. else: logging.error(f"For ValueError below: {[(m.oscpath, m.params) for m in self._queue]}") raise ValueError("We were expecting a clean _queue with only 'hi' as leftover, but instead there were unhandled messages. see print above. Better abort than a wrong program state") #all ok return pathlib.Path(resultArguments[0]) #General Commands def send(self, arg): """ Intended for a text input / command line interface. Sends anything to nsmd, separated by semicolon. First part is the message address, the rest are string-parameters.""" args = arg.split() msg = _OutgoingMessage(args[0]) for p in args[1:]: msg.add_arg(p) self.sock.sendto(msg.build(), self.nsmOSCUrl) def gui_announce(self): """This is just the announce without any answer. This is a last-resort method if another GUI "stole" our slot. For our own initial announce we use self._initial_announce()""" msg = _OutgoingMessage("/nsm/gui/gui_announce") self.sock.sendto(msg.build(), self.nsmOSCUrl) def ping(self): msg = _OutgoingMessage("/osc/ping") self.sock.sendto(msg.build(), self.nsmOSCUrl) def list(self): msg = _OutgoingMessage("/nsm/server/list") self.sock.sendto(msg.build(), self.nsmOSCUrl) def _updateSessionListBlocking(self): """To ensure correct data on session operations we manage ourselves, like copy, rename and delete. Ask nsmd for projects in session root and update our internal state. This will return None without doing anything when we are already in a session. This will wait for an answer and block all other operations. First is /nsm/gui/server/message ['Listing sessions'] Then session names come one reply at a time such as /reply ['/nsm/server/list', 'test3'] Finally /nsm/server/list [0, 'Done.'] , not a reply """ #In the past we only regenerated if we are not in a session. However, that was overzealous. #Some GUI functions did not work. Better regenerate that list as often as we want. logger.info("Requesting project list from session server in blocking mode") self._setPause(True) msg = _OutgoingMessage("/nsm/server/list") self.sock.sendto(msg.build(), self.nsmOSCUrl) #Wait for /reply ready = False while True: try: data, addr = self.sock.recvfrom(1024) except socket.timeout: continue msg = _IncomingMessage(data) if not ready and msg.oscpath == "/nsm/gui/server/message" and msg.params == ["Listing sessions"]: self.internalState["sessions"].clear() # new clients are added as /reply /nsm/server/list callbacks ready = True else: if len(msg.params) != 2: logger.warning(f"Expected project but got path {msg.oscpath} with {msg.params}. Adding to queue for later.") self._queue.append(msg) continue #This is what we want: elif msg.oscpath == "/reply" and msg.params[0] == "/nsm/server/list": #/reply ['/nsm/server/list', 'test3'] for a real session or #/reply ['/nsm/server/list', ''] as "list ended" marker if msg.params[1]: self.internalState["sessions"].add(msg.params[1]) logger.info(f"Received session name: {msg.params[1]}") else: #empty string break elif msg.params[0] == 0 and msg.params[1] == "Done.": # legacy nsmd sent the wrong message. Fixed in new-session-manager june 2020 break else: logger.warning(f"Expected project but got path {msg.oscpath} with {msg.params}. Adding to queue for later.") self._queue.append(msg) continue self._setPause(False) def quit(self): """Called through atexit. Thanks to start.py sys exception hook this will also trigger on PyQt crash""" if self.ourOwnServer: msg = _OutgoingMessage("/nsm/server/quit") self.sock.sendto(msg.build(), self.nsmOSCUrl) returncode = self.ourOwnServer.wait() logger.info("Stopped our own server with return code {}".format(returncode)) def broadcast(self, path:str, arguments:list): """/nsm/server/broadcast s:path [arguments...] http://non.tuxfamily.org/nsm/API.html 1.2.7.1 /nsm/server/broadcast s:path [arguments...] /nsm/server/broadcast /tempomap/update "0,120,4/4:12351234,240,4/4" All clients except the sender recive: /tempomap/update "0,120,4/4:12351234,240,4/4" """ logger.info(f"Sending broadcast with path {path} and args {arguments}") message = _OutgoingMessage("/nsm/server/broadcast") message.add_arg(path) for arg in arguments: message.add_arg(arg) #type autodetect self.sock.sendto(message.build(), self.nsmOSCUrl) #Primarily Without Session def open(self, nsmSessionName:str): if nsmSessionName in self.internalState["sessions"]: msg = _OutgoingMessage("/nsm/server/open") msg.add_arg(nsmSessionName) #s:project_name self.sock.sendto(msg.build(), self.nsmOSCUrl) else: logger.warning(f"Session {nsmSessionName} not found. Not forwarding to nsmd.") def new(self, newName:str, startClients:list=[])->str: """Saves the current session and creates a new session. Only works if dir does not exist yet. """ basePath = pathlib.Path(self.sessionRoot, newName) if basePath.exists(): return None self._addToNextSession = startClients msg = _OutgoingMessage("/nsm/server/new") msg.add_arg(newName) #s:project_name self.sock.sendto(msg.build(), self.nsmOSCUrl) #Only with Current Session def save(self): msg = _OutgoingMessage("/nsm/server/save") self.sock.sendto(msg.build(), self.nsmOSCUrl) def close(self, blocking=False): if not blocking: msg = _OutgoingMessage("/nsm/server/close") self.sock.sendto(msg.build(), self.nsmOSCUrl) else: msg = _OutgoingMessage("/nsm/server/close") self.sock.sendto(msg.build(), self.nsmOSCUrl) #Drive the process loop ourselves. This will still trigger updates but the mainloop will wait. while self.internalState["currentSession"]: self.process() def abort(self, blocking=False): if not blocking: msg = _OutgoingMessage("/nsm/server/abort") self.sock.sendto(msg.build(), self.nsmOSCUrl) else: msg = _OutgoingMessage("/nsm/server/abort") self.sock.sendto(msg.build(), self.nsmOSCUrl) #Drive the process loop ourselves. This will still trigger updates but the mainloop will wait. while self.internalState["currentSession"]: self.process() def duplicate(self, newName:str)->str: """Saves the current session and creates a new session. Requires an open session and uses nsmd to do the work. If you want to do copy of any session use our owns self.sessionCopy""" msg = _OutgoingMessage("/nsm/server/duplicate") msg.add_arg(newName) #s:project_name self.sock.sendto(msg.build(), self.nsmOSCUrl) #Client Commands for Loaded Session def clientAdd(self, executableName:str): """Adds a client to the current session. executable must be in $PATH. We do not trust NSM to perform the right checks. It will add an empty path or wrong path. """ if not pathlib.Path(executableName).name == executableName: logger.warning(f"{executableName} must be just an executable file in your $PATH. We expected: {pathlib.Path(executableName).name} . We will not ask nsmd to add it as client") return False allPaths = getenv("PATH") assert allPaths, allPaths binaryPaths = allPaths.split(":") #TODO: There is a corner case that NSMD runs in a different $PATH environment. executableInPath = any(pathlib.Path(bp, executableName).is_file() for bp in binaryPaths) if executableInPath: msg = _OutgoingMessage("/nsm/server/add") msg.add_arg(executableName) #s:executable_name self.sock.sendto(msg.build(), self.nsmOSCUrl) return True else: logger.warning("Executable {} not found. We will not ask nsmd to add it as client".format(executableName)) return False def clientStop(self, clientId:str): msg = _OutgoingMessage("/nsm/gui/client/stop") msg.add_arg(clientId) #s:clientId self.sock.sendto(msg.build(), self.nsmOSCUrl) def clientResume(self, clientId:str): """Opposite of clientStop""" msg = _OutgoingMessage("/nsm/gui/client/resume") msg.add_arg(clientId) #s:clientId self.sock.sendto(msg.build(), self.nsmOSCUrl) def clientRemove(self, clientId:str): """Client needs to be stopped already. We will do that and wait for an answer. Remove from the session. Will not delete the save-files, but make them inaccesible. There is never a point in nsmservercontrol where self.internalState["clients"] is emptied. nsmd actually sends a clientRemove for every client at session stop. """ #We have a blocking operation in here so we need to be extra cautios that the client exists. if not clientId in self.internalState["clients"]: return False self.clientStop(clientId) #We need to wait for an answer. #Drive the process loop ourselves. This will still trigger updates but the mainloop will wait. logger.info(f"Waiting for {clientId} to be status 'stopped'") while not self.internalState["clients"][clientId]["lastStatus"] == "stopped": self.process() msg = _OutgoingMessage("/nsm/gui/client/remove") msg.add_arg(clientId) #s:clientId self.sock.sendto(msg.build(), self.nsmOSCUrl) #Flood lazy lagging nsmd until it removed the client. #We will receive a few -10 "No such client." errors but that is ok. while True: if not clientId in self.internalState["clients"]: break if self.internalState["clients"][clientId]["lastStatus"] == "removed": break self.sock.sendto(msg.build(), self.nsmOSCUrl) self.process() def clientSave(self, clientId:str): """Saves only the given client""" msg = _OutgoingMessage("/nsm/gui/client/save") msg.add_arg(clientId) #s:clientId self.sock.sendto(msg.build(), self.nsmOSCUrl) def clientHide(self, clientId:str): """Hides the client. Works only if client announced itself with this feature""" msg = _OutgoingMessage("/nsm/gui/client/hide_optional_gui") msg.add_arg(clientId) #s:clientId self.sock.sendto(msg.build(), self.nsmOSCUrl) def clientShow(self, clientId:str): """Hides the client. Works only if client announced itself with this feature""" msg = _OutgoingMessage("/nsm/gui/client/show_optional_gui") msg.add_arg(clientId) #s:clientId self.sock.sendto(msg.build(), self.nsmOSCUrl) #Callbacks def _reactCallback_guiAnnounce(self, parameters:list): """This should not happen, but let's keep it in in case of edge-case multi GUI scenarios""" assert parameters == ["hi"], parameters logger.info("We got an unexpected 'hi', as if requesting gui_announce. Our own initial GUI announce as received and processed silently earlier already.") def _reactCallback_error(self, parameters:list): logger.error(parameters) def _reactCallback_reply(self, parameters:list): """This is a difficult function because replies arrive for many unrelated things, like status. We do our best to send all replies on the right way""" success = False l = len(parameters) if l == 2: originalMessage, data = parameters logger.info(f"Got reply for {originalMessage} with {data}") reply = { "/nsm/server/list" : self._reactReply_nsmServerList, "/nsm/server/new" : self._reactReply_nsmServerNew, "/nsm/server/close" : self._reactReply_nsmServerClose, "/nsm/server/open" : self._reactReply_nsmServerOpen, "/nsm/server/save" : self._reactReply_nsmServerSave, "/nsm/server/abort" : self._reactReply_nsmServerAbort, "/nsm/server/duplicate" : self._reactReply_nsmServerDuplicate, "/nsm/server/add" : self._reactReply_nsmServerAdd, } if originalMessage in reply: reply[originalMessage](data) success = True elif l == 3: originalMessage, errorCode, answer = parameters logger.info(f"Got reply for {originalMessage} with code {errorCode} saying {answer}") if originalMessage == "/nsm/server/add": assert errorCode == 0, parameters self._reactReply_nsmServerAdd(answer) success = True elif l == 1: singleMessage = parameters[0] """For unknown reasons these replies do not repeat the originalMessage""" if singleMessage == "/osc/ping": logger.info(singleMessage) success = True elif singleMessage == "Client removed.": self._reactReply_nsmServerRemoved() success = True elif singleMessage == "Client stopped.": self._reactReply_nsmServerStopped() success = True #After all these reactions and checks the function will eventually return here. if not success: raise NotImplementedError(parameters) def _reactCallback_serverMessage(self, parameters:list): """Messages are normally harmless and uninteresting. Howerver, we need to use some of them for actual tasks. In opposite to reply and status this all go in our function for now, until refactoring""" if parameters == ["Listing session"]: #this feels bad! A simple message is not a reliable state token and could change in the future. #we cannot put that into our own /list outgoing message because other actions like "new" also trigger this callback self.internalState["sessions"].clear() # new clients are added as /reply /nsm/server/list callbacks if parameters[0].startswith("Opening session"): #This gets send only when an existing session starts loading. It will not trigger on new sessions, be it really new or duplicate. #e.g. /nsm/gui/server/message ["Opening session FOO"] nsmSessionName = parameters[0].replace("Opening session ", "") logger.info(f"Starting to load clients of session: {nsmSessionName}") self.sessionOpenLoadingHook(self.sessionAsDict(nsmSessionName)) #notify the api->UI else: logger.info("/nsm/gui/server/message " + repr(parameters)) def _reactCallback_broadcast(self, parameters:list): """We have nothing to do with broadcast. But we save them, so they can be shown on request parameters[0] is an osc path:str without naming constraints the rest is a list of arguments. Attention: a broadcast is not saved by the server. You either are in the session to receive it or you will miss it. If we run Agordejo as attached GUI (incl. --load-session) a broadcast after the session was loaded, where programs announce themselves to all other clients, will not be received here. Such is the case with our data-client. """ logger.info(f"Received broadcast. Saving in internal state: {parameters}") self.internalState["broadcasts"].append(parameters) #Our little trick. We know and like some clients better than others. #If we detect our own data-storage we remember our friends. #It is possible that another datastorage broadcasts, then we overwrite the URL. if parameters and parameters[0] == "/agordejo/datastorage/announce": path, clientId, messageSizeLimit, url = parameters assert "osc.udp" in url logger.info(f"Got announce from agordejo datastorage clientId {clientId} @ {url}") o = urlparse(url) self.dataStorage = DataStorage(self, clientId, messageSizeLimit, (o.hostname, o.port), self.sock) def _reactCallback_serverList(self, parameters:list): """This finalizes a new session list. Here we send new data to the GUI etc.""" l = len(parameters) if l == 2: errorCode, message = parameters assert errorCode == 0, errorCode assert message == "Done.", message #don't miss the dot after Done logger.info("/nsm/server/list is done and has transmitted all available sessions to us") else: raise NotImplementedError(parameters) def _reactCallback_activeSessionChanged(self, parameters:list): """We receive this trough /nsm/gui/session/name This is called when the session has already changed. This also happens when you connect to a headless nsmd with a running session. We expect two parameters: [session name, session path] both of which could be "". If we start nsmd ourselves into an empty state we expect session name to be empty Session path is the subdirectory relative to session root. The session root is not included. !The unqiue name is the session path, not the name! Shortly before we received /nsm/gui/session/session which indicates the attempt to create a new one, I guess! :) If you want to react to the attempt to open a session you need to use /nsm/gui/server/message ["Opening session FOO"] OR creating a new session, after which nsmd will open that session without a message. Empty string is "No session" or "Choose A Session" mode. """ l = len(parameters) if l == 2: nsmSessionName, sessionPath = parameters if not nsmSessionName and not sessionPath: #No session loaded. We are in session-choosing mode. logger.info("Session closed or never started. Choose-A-Session mode.") self.internalState["currentSession"] = None #sessionCloseHooked triggers rebuilding of the session list, which will not work when there is a current session. self.sessionClosedHook() else: #Session path is the subdirectory relative to session root. The session root is not included. sessionPath = sessionPath.lstrip("/") #we strip for technical reasons. logger.info(f"Current Session changed. We are now {nsmSessionName} in {sessionPath}") self.internalState["currentSession"] = sessionPath #This is after the session, received after all programs have loaded. #We have a counterpart-message reaction that signals the attempt to load. self.sessionOpenReadyHook(self.sessionAsDict(sessionPath)) #notify the api->UI for autoClientExecutableInPath in self._addToNextSession: self.clientAdd(autoClientExecutableInPath) self._addToNextSession = [] #reset elif l == 0: #Another way of "no session". self.internalState["currentSession"] = None #sessionCloseHooked triggers rebuilding of the session list, which will not work when there is a current session. self.sessionClosedHook() else: raise NotImplementedError(parameters) def _initializeEmptyClient(self, clientId:str): """NSM reuses signals. It is quite possible that this will be called multiple times, e.g. after opening a session. This is not a reaction callback, we call this ourselves only in _reactCallback_ClientNew """ #if not self.internalState["currentSession"]: # logger.warning(f"We received a clientNew for ID {clientId} but no session open was received." # "This would happen in an old nsmd version. If you see the GUI with an open session and a client list you can ignore this warning") if clientId in self.internalState["clients"]: return logger.info(f"Creating new internal entry for client {clientId}") client = { "clientId":clientId, #for convenience, included internally as well "dumbClient":True, #Bool. Real nsm or just any old program? status "Ready" switches this. "executable":None, #Every client announces to the GUI with the exectuable name. True nsm clients later overwrite with a pretty name which we save as "reportedName" "reportedName":None, #str . The reported name is first the executable name, for status started. But for NSM clients it gets replaced with a reported name. "label":None, #str "lastStatus":None, #str "statusHistory":[], #list "hasOptionalGUI": False, #bool "visible": None, # bool "dirty": None, # bool } self.internalState["clients"][clientId] = client def _setClientData(self, clientId:str, parameter:str, value): if clientId in self.internalState["clients"]: self.internalState["clients"][clientId][parameter] = value return True else: logger.warning(f"Client {clientId} not found in internal status storage. If the session was just closed this is most likely a known race condition. Everything is fine in this case.") return False def _reactCallback_ClientLabelChanged(self, parameters:list): """osc->add_method( "/nsm/gui/client/label", "ss", osc_handler, osc, "path,display_name" ); """ l = len(parameters) if l == 2: clientId, label = parameters logger.info(f"Label for client {clientId} changed to {label}") self._setClientData(clientId, "label", label) self.clientStatusHook(self.internalState["clients"][clientId]) else: raise NotImplementedError(parameters) def _reactCallback_clientPid(self, parameters:list): clientId, pid = parameters self._setClientData(clientId, "pid", pid) def _reactCallback_SessionSession(self, parameters:list): """This is received only when a new session gets created and followed by /nsm/gui/client/new and then a reply for /reply /nsm/server/new Session created""" #This is the counterpart to Message "Opening Session", but for really new or freshly duplicated session. logger.info(f"Attempt to create session: {parameters}") self.sessionOpenLoadingHook(self.sessionAsDict(parameters[0])) #notify the api->UI def _reactCallback_ClientNew(self, parameters:list): """/nsm/gui/client/new ['nBAVO', 'jackpatch'] This is both add client or open. The message comes twice. Once when you add a client, then parameters will contain the executable name. If the client reports itself as NSM compatible through announce we will also get the Open message through this function. Then the name changes from executableName to a reportedName, which will remain for the rest of the session. Executable name is still important to look up icons in the GUI. This message is usually followed by /nsm/gui/client/status """ l = len(parameters) if l == 2: clientId, name = parameters if not clientId in self.internalState["clients"]: self._initializeEmptyClient(clientId) self._setClientData(clientId, "executable", name) logger.info(f"Client started {name}:{clientId}") else: self._setClientData(clientId, "reportedName", name) logger.info(f"Client upgraded to NSM-compatible: {name}:{clientId}") self.clientStatusHook(self.internalState["clients"][clientId]) else: raise NotImplementedError(parameters) def _reactCallback_clientDirty(self, parameters:list): """/nsm/gui/client/dirty ['nMAJH', 1] """ l = len(parameters) if l == 2: clientId, dirty = parameters dirty = bool(dirty) self._setClientData(clientId, "dirty", dirty) logger.info(f"Client {clientId} save status dirty is now: {dirty}") self.clientStatusHook(self.internalState["clients"][clientId]) else: raise NotImplementedError(parameters) def _reactCallback_clientGuiVisible(self, parameters:list): """/nsm/gui/client/gui_visible ['nMAJH', 0] """ l = len(parameters) if l == 2: clientId, visible = parameters visible = bool(visible) self._setClientData(clientId, "visible", visible) logger.info(f"Client {clientId} visibility is now: {visible}") self.clientStatusHook(self.internalState["clients"][clientId]) else: raise NotImplementedError(parameters) def _reactCallback_clientHasOptionalGui(self, parameters:list): """/nsm/gui/client/has_optional_gui ['nFDBK'] nsmd sends us this as reaction to a clients announce capabilities list """ l = len(parameters) if l == 1: clientId = parameters[0] self._setClientData(clientId, "hasOptionalGUI", True) logger.info(f"Client {clientId} supports optional GUI") else: raise NotImplementedError(parameters) def _reactCallback_statusChanged(self, parameters:list): """ Handles all status messages. Some changes, like removed and quit, are only available as status. This means that status removed is the opposite of /nsm/gui/client/new, even if it doesn't read like it. /nsm/gui/client/status ['nFDBK', 'open'] /nsm/gui/client/status ['nMAJH', 'launch'] /nsm/gui/client/status ['nLUPX', 'ready'] /nsm/gui/client/status ['nLUPX', 'save'] /nsm/gui/client/status ['nFHLB', 'quit'] /nsm/gui/client/status ['nLUPX', 'removed'] /nsm/gui/client/status ['nLUPX', 'stopped'] /nsm/gui/client/status ['nLUPX', 'noop'] #For dumb clients! no nsm support! /nsm/gui/client/status ['nLUPX', 'switch'] /nsm/gui/client/status ['nLUPX', 'error'] """ l = len(parameters) if l == 2: clientId, status = parameters logger.info(f"Client status {clientId} now {status}") r = self._setClientData(clientId, "lastStatus", status) if r: #a known race condition at quit may delete this in between calls self.internalState["clients"][clientId]["statusHistory"].append(status) if status == "ready": #we need to check for this now. Below in actions is after the statusHook and too late. self._setClientData(clientId, "dumbClient", False) self.clientStatusHook(self.internalState["clients"][clientId]) else: raise NotImplementedError(parameters) #Now handle our actions. For better readability in separate functions. actions = { "open": self._reactStatus_open, "launch": self._reactStatus_launch, "ready": self._reactStatus_ready, "save": self._reactStatus_save, "quit": self._reactStatus_quit, "removed": self._reactStatus_removed, "stopped": self._reactStatus_stopped, "noop": self._reactStatus_noop, "switch": self._reactStatus_switch, "error": self._reactStatus_error, }[status](clientId) actions #pylint does not like temporary dicts for case-switch def _reactStatus_removed(self, clientId:str): """Remove the client entry from our internal state. This also covers crashes.""" if clientId in self.internalState["clients"]: #race condition at quit del self.internalState["clients"][clientId] if self.dataStorage and clientId == self.dataStorage.ourClientId: #We only care about the current data-storage, not another instance that was started before it. self.dataClientNamesHook(None) self.dataClientDescriptionHook(None) self.dataStorage = None def _reactStatus_stopped(self, clientId:str): """The client has stopped and can be restarted. The status is not saved. NSM will try to open all clients on session open and end in "ready" """ if self.dataStorage and clientId == self.dataStorage.ourClientId: #We only care about the current data-storage, not another instance that was started before it. self.dataClientNamesHook(None) self.dataClientDescriptionHook(None) self.dataStorage = None def _reactStatus_launch(self, clientId:str): """ Launch is a transitional status for NSM clients but the terminal status for dumb clients """ pass def _reactStatus_open(self, clientId:str): """ """ pass def _reactStatus_ready(self, clientId:str): """ This is sent after startup but also after every save. It signals that the client can react to nsm signals, not that it is ready for something else. Note that this is *After* the clientStatusHook, so any data changed here is not submitted to the api/GUI yet. E.g. you can't change dumbClient to True here if that is needed directly after start by the GUI. """ pass def _reactStatus_save(self, clientId:str): """ """ pass def _reactStatus_quit(self, clientId:str): """ """ pass def _reactStatus_noop(self, clientId:str): """ Dumb clients, or rather nsmd, react with noop on signals they cannot understand, like saving. """ pass def _reactStatus_switch(self, clientId:str): """ """ pass def _reactStatus_error(self, clientId:str): """ """ logger.error(f"{clientId} has error status!") def _reactReply_nsmServerOpen(self, answer:str): assert answer == "Loaded.", answer def _reactReply_nsmServerSave(self, answer:str): assert answer == "Saved.", answer def _reactReply_nsmServerClose(self, answer:str): assert answer == "Closed.", answer def _reactReply_nsmServerAbort(self, answer:str): assert answer == "Aborted.", answer def _reactReply_nsmServerAdd(self, answer:str): """Reaction to add client""" assert answer == "Launched.", answer def _reactReply_nsmServerRemoved(self): pass def _reactReply_nsmServerStopped(self): pass def _reactReply_nsmServerDuplicate(self, answer:str): """There are a lot of errors possible here, reported through nsmd /error, because we are dealing with the file system. Our own GUI and other safeguards should protect us from most though Positive answers are 'Duplicated.' when nsmd finished copying and 'Loaded.' when the new session is loaded. Or so one would think... the messages arrive the other way around. Anyway, both are needed to signify a succesful duplication. """ assert answer == "Loaded." or answer == "Duplicated.", answer #We don't need any callbacks here, nsmd sends a session change on top of the duplicate replies. def _reactReply_nsmServerNew(self, answer:str): """Created. arrives when a new session is created for the first time and directory is mkdir Session created arrives when a session was opened and nsm created its internal "session". We do not need to react to the new signal because we watch the dir for new sessions ourselves and the currently active session is send through "/nsm/gui/session/name" : self._reactCallback_activeSessionChanged, """ assert answer == 'Created.' or answer == "Session created", answer def _reactReply_nsmServerList(self, nsmSessionName:str): """Session names come one reply at a time. We reacted to the message /nsm/gui/server/message ['Listing sessions'] by clearing our internal session status and will save the new ones here /reply ['/nsm/server/list', 'test3'] Do not confuse reply server list with the message /nsm/server/list [0, 'Done.'] The latter is a top level message :( """ self.internalState["sessions"].add(nsmSessionName) #Our own functions def allClientsHide(self): for clientId, clientDict in self.internalState["clients"].items(): if clientDict["hasOptionalGUI"]: self.clientHide(clientId) def allClientsShow(self): for clientId, clientDict in self.internalState["clients"].items(): if clientDict["hasOptionalGUI"]: self.clientShow(clientId) def clientToggleVisible(self, clientId:str): if self.internalState["clients"][clientId]["hasOptionalGUI"]: if self.internalState["clients"][clientId]["visible"]: self.clientHide(clientId) else: self.clientShow(clientId) #data-storage / nsm-data def clientNameOverride(self, clientId:str, name:str): """An agordejo-specific function that requires the client nsm-data in the session. If nsm-data is not present this function will write nothing, not touch any data. It will still send a callback to revert any GUI changes back to the original name. We accept empty string as a name to remove the name override """ if self.dataStorage: assert clientId in self.internalState["clients"], self.internalState["clients"] self.dataStorage.setClientOverrideName(clientId, name) #triggers callback #data-storage / nsm-data def setDescription(self, text:str): if self.dataStorage: self.dataStorage.setDescription(text) def _checkDirectoryForSymlinks(self, path)->bool: for p in path.rglob("*"): if p.is_symlink(): return True return False def _checkIfLocked(self, nsmSessionName:str)->bool: basePath = pathlib.Path(self.sessionRoot, nsmSessionName) assert basePath.exists() lockFile = pathlib.Path(basePath, ".lock") return lockFile.exists() def forceLiftLock(self, nsmSessionName:str): """Removes lockfile, no matter if session is actually open or just a remainder from a crash. If no lock exist it does nothing.""" self._updateSessionListBlocking() if self._checkIfLocked(nsmSessionName): basePath = pathlib.Path(self.sessionRoot, nsmSessionName) assert basePath.exists() #implied by _checkIfLocked lockFile = pathlib.Path(basePath, ".lock") lockFile.unlink(missing_ok=True) logger.info(f"{nsmSessionName} was forced to unlock by us.") else: logger.info(f"Tried to unlock, but {nsmSessionName} is not locked") def getSessionFiles(self, nsmSessionName:str)->list: """Return all session files, useful to present to the user, e.g. as warning before deletion""" self._updateSessionListBlocking() basePath = pathlib.Path(self.sessionRoot, nsmSessionName) assert basePath.exists() return [f.as_posix() for f in basePath.rglob("*")] #Includes directories themselves #Only files, no directories themselves. #result = [] #for path, dirs, files in walk(basePath): # for file in files: # result.append(pathlib.Path(path, file).as_posix()) #return result def deleteSession(self, nsmSessionName:str): """Delete project directory with all data. No undo. Only if session is not locked""" self._updateSessionListBlocking() if not nsmSessionName in self.internalState["sessions"]: logger.warning(f"{nsmSessionName} is not a session") return False basePath = pathlib.Path(self.sessionRoot, nsmSessionName) assert basePath.exists() if not self._checkIfLocked(nsmSessionName): logger.info(f"Deleting session {nsmSessionName}: {self.getSessionFiles(nsmSessionName)}") shutilrmtree(basePath) else: logger.warning(f"Tried to delete {basePath} but it is locked") def renameSession(self, nsmSessionName:str, newName:str): """Only works if session is not locked and dir does not exist yet""" self._updateSessionListBlocking() newPath = pathlib.Path(self.sessionRoot, newName) oldPath = pathlib.Path(self.sessionRoot, nsmSessionName) assert oldPath.exists() if self._checkIfLocked(nsmSessionName): logger.warning(f"Can't rename {nsmSessionName} to {newName}. {nsmSessionName} is locked.") return False elif newPath.exists(): logger.warning(f"Can't rename {nsmSessionName} to {newName}. {newName} already exists.") return False else: logger.info(f"Renaming {nsmSessionName} to {newName}.") tmp = pathlib.Path(oldPath.name+str(uuid4())) #Can't move itself into a subdir in itself. move to temp first. We don't use tempdir because that could be on another partition. we already know we can write here. oldPath.rename(tmp) pathlib.Path(newPath).mkdir(parents=True, exist_ok=True) tmp.rename(newPath) assert newPath.exists() def copySession(self, nsmSessionName:str, newName:str): """Copy a whole tree. Keep symlinks as symlinks. Lift lock""" self._updateSessionListBlocking() source = pathlib.Path(self.sessionRoot, nsmSessionName) destination = pathlib.Path(self.sessionRoot, newName) if destination.exists(): logger.warning(f"Can't copy {nsmSessionName} to {newName}. {newName} already exists.") return False elif not nsmSessionName in self.internalState["sessions"]: logger.warning(f"{nsmSessionName} is not a session") return elif not source.exists(): logger.warning(f"Can't copy {nsmSessionName} because it does not exist.") return False #All is well. try: shutilcopytree(source, destination, symlinks=True, dirs_exist_ok=False) #raises an error if dir already exists. But we already test above. self.forceLiftLock(newName) except Exception as e: #we don't want to crash if user tries to copy to /root or so. logger.error(e) return False #Export to the User Interface def sessionAsDict(self, nsmSessionName:str)->dict: assert self.sessionRoot entry = {} entry["nsmSessionName"] = nsmSessionName entry["name"] = pathlib.Path(nsmSessionName).name basePath = pathlib.Path(self.sessionRoot, nsmSessionName) sessionFile = pathlib.Path(basePath, "session.nsm") if not sessionFile.exists(): #This is a reason to let the program exit. logger.error("Got wrong session directory from nsmd. Race condition after delete? In any case a breaking error (please report). Quitting. Project was: " + repr(sessionFile)) sysexit() #return None switch to return None to let it crash and see the python traceback timestamp = datetime.fromtimestamp(sessionFile.stat().st_mtime).isoformat(sep=" ", timespec='minutes') entry["lastSavedDate"] = timestamp entry["sessionFile"] = sessionFile entry["lockFile"] = pathlib.Path(basePath, ".lock") entry["fullPath"] = str(basePath) entry["sizeInBytes"] = sum(f.stat().st_size for f in basePath.glob('**/*') if f.is_file() ) entry["numberOfClients"] = len(open(sessionFile).readlines()) entry["hasSymlinks"] = self._checkDirectoryForSymlinks(basePath) entry["parents"] = basePath.relative_to(self.sessionRoot).parts[:-1] #tuple of each dir between NSM root and nsmSessionName/session.nsm, exluding the actual project name. This is the tree entry["locked"] = self._checkIfLocked(nsmSessionName) #not for direct display return entry def exportSessionsAsDicts(self)->list: """Return a list of dicts of projects with additional information: """ logger.info("Exporting sessions to dict. Will call blocking list sessions next") results = [] #assert not self.internalState["currentSession"], self.internalState["currentSession"] #Do not request session list while in active session self._updateSessionListBlocking() for nsmSessionName in self.internalState["sessions"]: result = self.sessionAsDict(nsmSessionName) results.append(result) return results class DataStorage(object): """Interface to handle the external datastorage client url is pre-processed (host, port) Our init is the same as announcing the nsm-data client in the session. That means everytime nsm-data sends a new/open reply we get created. Thus we will send all our data to parent and subsequently to GUI-callbacks in init. Keys are strings, While nsmd OSC support int, str and float we use json exclusively. We send json string and parse the received data. Try to use only ints, floats, strin gs, lists and dicts. Client pretty names are limited to 512 chars, depending on our OSC message size. nsm-data will just cut to 512 chars. So a GUI should better protect that limit. """ def __init__(self, parent, ourClientId, messageSizeLimit:int, url:tuple, sock): logger.info("Create new DataStorage instance") self.parent = parent self.messageSizeLimit = messageSizeLimit # e.g. 512 self.ourClientId = ourClientId self.clients = parent.internalState["clients"] #shortcut. Mutable, persistent dict, until instance gets deleted. self.url = url self.sock = sock self.ip, self.port = self.sock.getsockname() self.data = self.getAll() #blocks. our local copy. = {"clientOverrideNames":{clientId:nameOverride}, "description":"str"} self.namesToParentAndCallbacks() self.descriptionToParentAndCallbacks() def namesToParentAndCallbacks(self): self.parent.dataClientNamesHook(self.data["clientOverrideNames"]) def descriptionToParentAndCallbacks(self): """Every char!!!""" self.parent.dataClientDescriptionHook(self.data["description"]) def _waitForMultipartMessage(self, pOscpath:str)->str: """Returns a json string, as if the message was sent as a single one. Can consist of only one part as well.""" logger.info(f"Waiting for multi message {pOscpath} in blocking mode") self.parent._setPause(True) jsonString = "" chunkNumberOfParts = float("+inf") #zero based currentPartNumber = float("-inf") #zero based while True: if currentPartNumber >= chunkNumberOfParts: break try: data, addr = self.sock.recvfrom(1024) except socket.timeout: break msg = _IncomingMessage(data) if msg.oscpath == pOscpath: currentPartNumber, l, jsonChunk = msg.params jsonString += jsonChunk chunkNumberOfParts = l #overwrite infinity the first time and redundant afterwards. else: self.parent._queue.append(msg) self.parent._setPause(False) logger.info(f"Message complete with {chunkNumberOfParts} chunks.") return jsonString def getAll(self): """Mirror everything from nsm-data""" msg = _OutgoingMessage("/agordejo/datastorage/getall") msg.add_arg(self.ip) msg.add_arg(self.port) self.sock.sendto(msg.build(), self.url) jsonString = self._waitForMultipartMessage("/agordejo/datastorage/reply/getall") return json.loads(jsonString) def setClientOverrideName(self, clientId:str, value): """We accept empty string as a name to remove the name override""" assert clientId in self.clients, self.clients msg = _OutgoingMessage("/agordejo/datastorage/setclientoverridename") msg.add_arg(clientId) msg.add_arg(json.dumps(value)) self.sock.sendto(msg.build(), self.url) self.getClientOverrideName(clientId) #verifies data and triggers callback def getClientOverrideName(self, clientId:str): msg = _OutgoingMessage("/agordejo/datastorage/getclientoverridename") msg.add_arg(clientId) msg.add_arg(self.ip) msg.add_arg(self.port) self.sock.sendto(msg.build(), self.url) #Wait in blocking mode self.parent._setPause(True) while True: try: data, addr = self.sock.recvfrom(1024) except socket.timeout: break msg = _IncomingMessage(data) if msg.oscpath == "/agordejo/datastorage/reply/getclient": replyClientId, jsonName = msg.params assert replyClientId == clientId, (replyClientId, clientId) break else: self.parent._queue.append(msg) self.parent._setPause(False) #Got answer answer = json.loads(jsonName) if answer: self.data["clientOverrideNames"][clientId] = answer else: #It is possible that a client not present in our storage will send an empty string. Protect. if clientId in self.data["clientOverrideNames"]: del self.data["clientOverrideNames"][clientId] self.namesToParentAndCallbacks() def _chunkstring(self, string): return [string[0+i:self.messageSizeLimit+i] for i in range(0, len(string), self.messageSizeLimit)] def setDescription(self, text:str): """This most likely arrives one char at time with the complete text""" chunks = self._chunkstring(text) descriptionId = str(id(text))[:8] for index, chunk in enumerate(chunks): msg = _OutgoingMessage("/agordejo/datastorage/setdescription") msg.add_arg(descriptionId) msg.add_arg(index) msg.add_arg(chunk) msg.add_arg(self.ip) msg.add_arg(self.port) self.sock.sendto(msg.build(), self.url) #No echo answer. #We cheat a bit and inform parents with the new text directly. self.data["description"] = text self.descriptionToParentAndCallbacks() #and back #Generic Functions. Not in use and not ready. def _test(self): self.readAll() self.setDescription("Ein Jäger aus Kurpfalz,\nDer reitet durch den grünen Wald,\nEr schießt das Wild daher,\nGleich wie es ihm gefällt.") self.read("welt") self.create("welt", "world") self.read("welt") self.create("str", "bar") self.create("int", 1) self.create("list", [1, 2, 3]) self.create("tuple", (1, 2, 3)) #no tuples, everything will be a list. self.create("dict", {1:2, 3:4, 5:6}) self.update("str", "rolf") self.delete("str") def read(self, key:str): """Request one value""" msg = _OutgoingMessage("/agordejo/datastorage/read") msg.add_arg(key) msg.add_arg(self.ip) msg.add_arg(self.port) self.sock.sendto(msg.build(), self.url) def readAll(self): """Request all data""" msg = _OutgoingMessage("/agordejo/datastorage/readall") msg.add_arg(self.ip) msg.add_arg(self.port) self.sock.sendto(msg.build(), self.url) def create(self, key:str, value): """Write/Create one value.""" msg = _OutgoingMessage("/agordejo/datastorage/create") msg.add_arg(key) msg.add_arg(json.dumps(value)) self.sock.sendto(msg.build(), self.url) def update(self, key:str, value): """Update a value, but only if it exists""" msg = _OutgoingMessage("/agordejo/datastorage/update") msg.add_arg(key) msg.add_arg(json.dumps(value)) self.sock.sendto(msg.build(), self.url) def delete(self, key:str): """Delete a key/value completely""" msg = _OutgoingMessage("/agordejo/datastorage/delete") msg.add_arg(key) self.sock.sendto(msg.build(), self.url)