Music production session manager https://www.laborejo.org
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

1841 lines
84 KiB

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Copyright 2021, Nils Hilbricht, Germany ( https://www.hilbricht.net )
The Non-Session-Manager by Jonathan Moore Liles <male@tuxfamily.org>: http://non.tuxfamily.org/nsm/
New Session Manager, by LinuxAudio.org: https://github.com/linuxaudio/new-session-manager
With help from code fragments from https://github.com/attwad/python-osc ( DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE v2 )
This file is part of the Laborejo Software Suite ( https://www.laborejo.org ).
This application is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import logging; logger = logging.getLogger(__name__); logger.info("import")
#Standard Library
import struct
import socket
from os import getenv #to get NSM env var
from shutil import rmtree as shutilrmtree
from shutil import copytree as shutilcopytree
from multiprocessing import Process
from urllib.parse import urlparse #to convert NSM env var
import subprocess
import atexit
import pathlib
import json
from uuid import uuid4
from datetime import datetime
from sys import exit as sysexit
from time import sleep
#Our files
from .comparedirectories import md5_dir
def nothing(*args, **kwargs):
pass
class _IncomingMessage(object):
"""Representation of a parsed datagram representing an OSC message.
An OSC message consists of an OSC Address Pattern followed by an OSC
Type Tag String followed by zero or more OSC Arguments.
"""
def __init__(self, dgram):
#NSM Broadcasts are bundles, but very simple ones. We only need to care about the single message it contains.
#Therefore we can strip the bundle prefix and handle it as normal message.
if b"#bundle" in dgram:
bundlePrefix, singleMessage = dgram.split(b"/", maxsplit=1)
dgram = b"/" + singleMessage # / eaten by split
self.LENGTH = 4 #32 bit
self._dgram = dgram
self._parameters = []
self.parse_datagram()
def get_int(self, dgram, start_index):
"""Get a 32-bit big-endian two's complement integer from the datagram.
Args:
dgram: A datagram packet.
start_index: An index where the integer starts in the datagram.
Returns:
A tuple containing the integer and the new end index.
Raises:
ValueError if the datagram could not be parsed.
"""
try:
if len(dgram[start_index:]) < self.LENGTH:
raise ValueError('Datagram is too short')
return (
struct.unpack('>i', dgram[start_index:start_index + self.LENGTH])[0], start_index + self.LENGTH)
except (struct.error, TypeError) as e:
raise ValueError('Could not parse datagram %s' % e)
def get_string(self, dgram, start_index):
"""Get a python string from the datagram, starting at pos start_index.
We receive always the full string, but handle only the part from the start_index internally.
In the end return the offset so it can be added to the index for the next parameter.
Each subsequent call handles less of the same string, starting further to the right.
According to the specifications, a string is:
"A sequence of non-null ASCII characters followed by a null,
followed by 0-3 additional null characters to make the total number
of bits a multiple of 32".
Args:
dgram: A datagram packet.
start_index: An index where the string starts in the datagram.
Returns:
A tuple containing the string and the new end index.
Raises:
ValueError if the datagram could not be parsed.
"""
#First test for empty string, which is nothing, followed by a terminating \x00 padded by three additional \x00.
if dgram[start_index:].startswith(b"\x00\x00\x00\x00"):
return "", start_index + 4
#Otherwise we have a non-empty string that must follow the rules of the docstring.
offset = 0
try:
while dgram[start_index + offset] != 0:
offset += 1
if offset == 0:
raise ValueError('OSC string cannot begin with a null byte: %s' % dgram[start_index:])
# Align to a byte word.
if (offset) % self.LENGTH == 0:
offset += self.LENGTH
else:
offset += (-offset % self.LENGTH)
# Python slices do not raise an IndexError past the last index,
# do it ourselves.
if offset > len(dgram[start_index:]):
raise ValueError('Datagram is too short')
data_str = dgram[start_index:start_index + offset]
return data_str.replace(b'\x00', b'').decode('utf-8'), start_index + offset
except IndexError as ie:
raise ValueError('Could not parse datagram %s' % ie)
except TypeError as te:
raise ValueError('Could not parse datagram %s' % te)
def get_float(self, dgram, start_index):
"""Get a 32-bit big-endian IEEE 754 floating point number from the datagram.
Args:
dgram: A datagram packet.
start_index: An index where the float starts in the datagram.
Returns:
A tuple containing the float and the new end index.
Raises:
ValueError if the datagram could not be parsed.
"""
try:
return (struct.unpack('>f', dgram[start_index:start_index + self.LENGTH])[0], start_index + self.LENGTH)
except (struct.error, TypeError) as e:
raise ValueError('Could not parse datagram %s' % e)
def parse_datagram(self):
try:
self._address_regexp, index = self.get_string(self._dgram, 0)
if not self._dgram[index:]:
# No params is legit, just return now.
return
# Get the parameters types.
type_tag, index = self.get_string(self._dgram, index)
if type_tag.startswith(','):
type_tag = type_tag[1:]
# Parse each parameter given its type.
for param in type_tag:
if param == "i": # Integer.
val, index = self.get_int(self._dgram, index)
elif param == "f": # Float.
val, index = self.get_float(self._dgram, index)
elif param == "s": # String.
val, index = self.get_string(self._dgram, index)
else:
logger.warning("Unhandled parameter type: {0}".format(param))
continue
self._parameters.append(val)
except ValueError as pe:
#raise ValueError('Found incorrect datagram, ignoring it', pe)
# Raising an error is not ignoring it!
logger.warning("Found incorrect datagram, ignoring it. {}".format(pe))
@property
def oscpath(self):
"""Returns the OSC address regular expression."""
return self._address_regexp
@staticmethod
def dgram_is_message(dgram):
"""Returns whether this datagram starts as an OSC message."""
return dgram.startswith(b'/')
@property
def size(self):
"""Returns the length of the datagram for this message."""
return len(self._dgram)
@property
def dgram(self):
"""Returns the datagram from which this message was built."""
return self._dgram
@property
def params(self):
"""Convenience method for list(self) to get the list of parameters."""
return list(self)
def __iter__(self):
"""Returns an iterator over the parameters of this message."""
return iter(self._parameters)
class _OutgoingMessage(object):
def __init__(self, oscpath):
self.LENGTH = 4 #32 bit
self.oscpath = oscpath
self._args = []
def write_string(self, val):
dgram = val.encode('utf-8')
diff = self.LENGTH - (len(dgram) % self.LENGTH)
dgram += (b'\x00' * diff)
return dgram
def write_int(self, val):
return struct.pack('>i', val)
def write_float(self, val):
return struct.pack('>f', val)
def add_arg(self, argument):
t = {str:"s", int:"i", float:"f"}[type(argument)]
self._args.append((t, argument))
def build(self):
dgram = b''
#OSC Path
dgram += self.write_string(self.oscpath)
if not self._args:
dgram += self.write_string(',')
return dgram
# Write the parameters.
arg_types = "".join([arg[0] for arg in self._args])
dgram += self.write_string(',' + arg_types)
for arg_type, value in self._args:
f = {"s":self.write_string, "i":self.write_int, "f":self.write_float}[arg_type]
dgram += f(value)
return dgram
class NsmServerControl(object):
"""
The ServerControl can be started in three modes, regarding nsmd.
We expect that starting our own nsmd will be the majority of cases.
SessionRoot parameter is only honored if we start nsmd ourselves.
Ascending lookup priority:
1) Default is to start our own nsmd. A single-instance watcher will prevent multiple programs
on the same system.
2) When $NSM_URL is found as environment we will connect to that nsmd.
3) When hostname and portnumber are given explicitely as instance variables we will first test
if a server is running at that URL, if not we will start our own with these parameters.
This is not only a pure implemenation of the protocol.
It is extended by us reacting to and storing incoming data. This data can be interpreted
and enhanced by looking at the session dir ourselves. However, we don't do anything that
is not possible by the original nsmd + human interaction.
100% Compatibility is the highest priority.
The big problems are the async nature of communication, message come out of order or interleaved,
and nsm is not consistent in its usage of osc-paths. For example it starts listing sessions
with /nsm/gui/server/message, but sends the content with /reply [/nsm/server/list, nsmSessionName] and
then ends it with /nsm/server/list [0, Done] (no reply!). So three message types, three callbacks
for one logically connected process.
To update our internal session information we therefore need to split the functionality into
severall seemingly unconnected callbacks and you need to know how the protocol works to actually
know the order of operations. Switch logging to info to learn more.
We have a mix between NSM callbacks and our own functions.
Most important there is a watchdog that looks at the session directory and creates its own
callbacks if something changes.
A typical operation, say sessionDelete or sessionCopy looks like this:
* Ask (blocking) nsmd for a current list of sessions, update our internal state
* Perform a file operation, like copy or delete or lift a lock
* Let our watchdog discover the changes in the file system and trigger another (non-blocking)
request for a list of sessions to adjust our internal state to reality.
Granted, we could just call our blocking query again at the end, but we would still need to
let the watchdog run for operations that the user does with a filemanager, which would end up
in redundant calls. Bottom line: _updateSessionListBlocking is called at the beginning of a
function, but not at the end.
Docs:
http://non.tuxfamily.org/nsm/
http://non.tuxfamily.org/wiki/Non%20Session%20Manager
http://non.tuxfamily.org/wiki/ApplicationsSupportingNsm
http://non.tuxfamily.org/nsm/API.html
"""
def __init__(self, sessionOpenReadyHook, sessionOpenLoadingHook, sessionClosedHook, clientStatusHook, singleInstanceActivateWindowHook, dataClientNamesHook, dataClientDescriptionHook, dataClientTimelineMaximumDurationChangedHook,
parameterNsmOSCUrl=None, sessionRoot=None, startupSession=None, useCallbacks=True):
"""If useCallbacks is False you will see every message in the log.
This is just a development mode to see all messages, unfiltered.
Normally we have special hook functions that save and interpret data, so they don't
show in the logs"""
#Deactivate hooks for now. During init no hooks may be called,
#but some functions want to do that already. We setup the true hooks at the end of init
self.sessionOpenReadyHook= self.sessionOpenLoadingHook= self.sessionClosedHook= self.clientStatusHook= self.singleInstanceActivateWindowHook= self.dataClientNamesHook= self.dataClientDescriptionHook= nothing
self._queue = list() #Incoming OSC messages are buffered here.
#Status variables that are set by our callbacks
self.internalState = {
"sessions" : set(), #nsmSessionNames:str . We use set for unqiue, just in case. But we also clear that on /nsm/gui/server/message ['Listing sessions'] to handle deleted sessions
"currentSession" : None,
"port" : None, #Our GUI port
"serverPort" : None, #nsmd port
"nsmUrl" : None, #the environment variable
"clients" : {}, #clientId:dict see self._initializeEmptyClient . Gets replaced with a new dict instance on session changes.
"broadcasts" : [], #in the order they appeared
"datastorage" : None, #URL, if present in the session
}
self.dataStorage = None #Becomes DataStorage() every time a datastorage client does a broadcast announce.
self._addToNextSession = [] #A list of executables in PATH. Filled by new, waits for reply that session is created and then will send clientNew and clear the list.
if useCallbacks:
self.callbacks = {
"/nsm/gui/session/name" : self._reactCallback_activeSessionChanged,
#"/nsm/gui/session/root" #Session root is an active blocking call in init
"/nsm/gui/client/label" : self._reactCallback_ClientLabelChanged,
"/nsm/gui/client/new" : self._reactCallback_ClientNew,
"/nsm/gui/session/session" : self._reactCallback_SessionSession,
"/nsm/gui/client/status" : self._reactCallback_statusChanged, #handles multiple status keywords
"/reply" : self._reactCallback_reply, #handles multiple replies
"/error" : self._reactCallback_error,
"/nsm/gui/client/has_optional_gui" : self._reactCallback_clientHasOptionalGui,
"/nsm/gui/client/gui_visible" : self._reactCallback_clientGuiVisible,
"/nsm/gui/client/pid" : self._reactCallback_clientPid,
"/nsm/gui/client/dirty" : self._reactCallback_clientDirty,
"/nsm/gui/server/message" : self._reactCallback_serverMessage,
"/nsm/gui/gui_announce" : self._reactCallback_guiAnnounce, #we rarely receive that, especially not in init.
"/nsm/server/list" : self._reactCallback_serverList,
"/nsm/server/broadcast" : self._reactCallback_broadcast,
}
else: #This is just a development mode to see all messages, unfiltered
self.callbacks = set() #empty set is easiest to check
#Networking and Init for our control part, not for the server
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #internet, udp
self.sock.bind(('', 0)) #pick a free port on localhost.
self.sock.setblocking(False)
self.internalState["port"] = self.sock.getsockname()[1] #only happens once, ports don't change during runtime
#self.sock.close() Do not close, this runs until the end of the program
###Testing of existing servers, starting and connecting
#First handle the NSM URL, or generate on.
#self.nsmOSCUrl must be a tuple compatible to the result of urlparse. (hostname, port)
self.singleInstanceSocket = None
if parameterNsmOSCUrl:
o = urlparse(parameterNsmOSCUrl)
#self.nsmOSCUrl = (o.hostname, o.port) #this forces lowercase. in rare circumstances this is not correct and we must be case sensitive. fix:
self.nsmOSCUrl = o.netloc.split(":")[0], o.port
else:
envResult = self._getNsmOSCUrlFromEnvironment()
if envResult:
self.nsmOSCUrl = envResult
else:
#This is the default case. User just starts the GUI. The other modes are concious decisions to either start with URL as parameter or in an NSM environment.
#But now we need to test if the user accidentaly opened a second GUI, which would start a second server.
self._setupAndTestForSingleInstance() #This might quit the whole program and we will never see the next line.
self.nsmOSCUrl = self._generateFreeNsmOSCUrl()
assert self.nsmOSCUrl
self.internalState["serverPort"] = self.nsmOSCUrl[1] #only happens once, ports don't change during runtime
self.internalState["nsmUrl"] = f"osc.udp://{self.nsmOSCUrl[0]}:{self.nsmOSCUrl[1]}/" #only happens once, ports don't change during runtime
#Then check if a server is running there. If not start one.
self.ourOwnServer = None #Might become a subprocess handle
if self._isNsmdRunning(self.nsmOSCUrl):
#serverport = self.nsmOSCUrl[1]
#No further action required. GUI announce below this testing.
pass
else:
self._startNsmdOurselves(sessionRoot, startupSession) #Session root can be a commandline parameter we forward to the server if we start it ourselves. startupSession is an autoloader. Both are usually None.
assert type(self.ourOwnServer) is subprocess.Popen, (self.ourOwnServer, type(self.ourOwnServer))
#Wait for the server, or test if it is reacting.
self._waitForPingResponseBlocking()
logger.info("nsmd is ready @ {}".format(self.nsmOSCUrl))
#Tell nsmd that we are a GUI and want to receive general messages async, not only after we request something
self.sessionRoot = self._initial_announce() #Triggers "hi" and session root
self.internalState["sessionRoot"] = self.sessionRoot
self._forceProcessOnceToEmptyQueue() #process any leftover messages.
atexit.register(self.quit) #mostly does stuff when we started nsmd ourself
#Activate hooks for api callbacks, now that we are finished here.
#Otherwise the hooks will get called from our functions (e.g. new client) while we are still during init
self.sessionOpenReadyHook = sessionOpenReadyHook #self.sessionAsDict(nsmSessionName) as parameter
self.sessionOpenLoadingHook = sessionOpenLoadingHook #self.sessionAsDict(nsmSessionName) as parameter
self.sessionClosedHook = sessionClosedHook #no parameter. This is also "choose a session" mode
self.clientStatusHook = clientStatusHook #all client status is done via this single hook. GUIs need to check if they already know the client or not.
self.dataClientNamesHook = dataClientNamesHook
self.dataClientDescriptionHook = dataClientDescriptionHook
self.dataClientTimelineMaximumDurationChangedHook = dataClientTimelineMaximumDurationChangedHook
self.singleInstanceActivateWindowHook = singleInstanceActivateWindowHook #added to self.processSingleInstance() to listen for a message from another wannabe-instance
self._receiverActive = True
logger.info("nsmservercontrol init is complete. Ready for event loop")
#Now an external event loop can add self.process
#Internal Methods
def _setupAndTestForSingleInstance(self):
"""on program startup trigger this if there is already another instance of us running.
This socket is only
"""
self.singleInstanceSocket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
logger.info("Testing if another non-specific Agordejo is running.")
try:
## Create an abstract socket, by prefixing it with null.
# this relies on a feature only in linux, when current process quits, the
# socket will be deleted.
self.singleInstanceSocket.bind('\0' + "agordejo")
self.singleInstanceSocket.listen(1)
self.singleInstanceSocket.setblocking(False)
logger.info("No other non-specific Agordejo found. Starting GUI")
#Continue in self.processSingleInstance()
return True
except socket.error:
logger.error("GUI for this nsmd server already running. Informing the existing application to show itself.")
self.singleInstanceSocket.connect('\0' + "agordejo")
self.singleInstanceSocket.send("agordejoactivate".encode());
self.singleInstanceSocket.close()
sysexit(1) #triggers atexit
#print ("not executed")
return False
def processSingleInstance(self):
"""Tests our unix socket for an incoming signal.
if received forward to the engine->gui
Can be added to a slower event loop, so it is not in self.process"""
if self.singleInstanceSocket:
try:
connection, client_address = self.singleInstanceSocket.accept() #This blocks and waits for a message
incoming = connection.recv(1024)
if incoming and incoming == b"agordejoactivate":
self.singleInstanceActivateWindowHook()
except BlockingIOError: #happens while no data is received. Has nothing to do with blocking or not. In fact: this happens when in non-blocking mode.
pass
except socket.timeout:
pass
def _setPause(self, state:bool):
"""Set both the socket and the thread into waiting mode or not.
With this we can wait for answers until we resume async operation"""
if state:
self.sock.setblocking(True) #explicitly wait.
self.sock.settimeout(0.5)
self._receiverActive = False
logger.info("Suspending receiving async mode.")
else:
self.sock.setblocking(False)
self._receiverActive = True
logger.info("Resuming receiving async mode.")
def _forceProcessOnceToEmptyQueue(self):
"""Sometimes we want to make sure everything is processed until we continue. For example
in our init.
Initial usecase was connecting to a running nsmd with session. The api first callback to
export sessions to the GUI was freezing because listSession was chocking on leftover messages
from gui_announce to a running session, which sends the session name and a list of clients.
The latter is not happening when starting the server ourselves, so we weren't expecting this.
To be honest, this is really a patch to work around a design flaw and we hope this is a
one-off corner case."""
logger.info("Force processing queue")
#First gather all osc messages still in the pipe
while True:
try:
data, addr = self.sock.recvfrom(1024)
msg = _IncomingMessage(data)
if msg:
self._queue.append(msg)
except BlockingIOError: #happens while no data is received. Has nothing to do with blocking or not.
break
except socket.timeout:
break
#Now process them all. This is different than normal self.process().
for msg in self._queue:
if msg.oscpath in self.callbacks:
self.callbacks[msg.oscpath](msg.params)
else:
logger.warning(f"Unhandled message with path {msg.oscpath} and parameters {msg.params}")
self._queue.clear()
logger.info("Ended force processing queue")
def process(self):
"""Use this in an external event loop"""
if self._receiverActive:
while True:
try:
data, addr = self.sock.recvfrom(1024)
msg = _IncomingMessage(data)
if msg:
self._queue.append(msg)
except BlockingIOError: #happens while no data is received. Has nothing to do with blocking or not.
break
except socket.timeout:
break
for msg in self._queue:
if msg.oscpath in self.callbacks:
self.callbacks[msg.oscpath](msg.params)
else:
logger.warning(f"Unhandled message with path {msg.oscpath} and parameters {msg.params}")
self._queue.clear()
def _getNsmOSCUrlFromEnvironment(self):
"""Return the nsm osc url or None"""
nsmOSCUrl = getenv("NSM_URL")
if not nsmOSCUrl:
return None
else:
#osc.udp://hostname:portnumber/
o = urlparse(nsmOSCUrl)
return o.hostname, o.port
def _generateFreeNsmOSCUrl(self):
#Instead of reading out the NSM port we get a free port ourselves and set up nsmd with that
tempServerSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #internet, udp
tempServerSock.bind(('', 0)) #pick a free port on localhost.
address, tempServerSockPort = tempServerSock.getsockname()
tempServerSock.close() #We need to close it because nsmd will open it right away.
nsmOSCUrl = ("0.0.0.0", tempServerSockPort) #compatible to result of urlparse
logger.info("Generated our own free NSM_URL to start a server @ {}".format(nsmOSCUrl))
return nsmOSCUrl
def _isNsmdRunning(self, nsmOSCUrl):
"""Test if the port is open or not"""
logger.info(f"Testing if a server is running @ {nsmOSCUrl}")
hostname, port = nsmOSCUrl
tempServerSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #internet, udp
try:
tempServerSock.bind((hostname, port))
logger.info(f"No external nsmd found (we tested if port is closed) @ {nsmOSCUrl}")
return False
except:
logger.info(f"External nsmd found (we tested if port is closed) @ {nsmOSCUrl}")
return True
finally:
tempServerSock.close()
def _startNsmdOurselves(self, sessionRoot:str, startupSession:str):
assert self.nsmOSCUrl
hostname, port = self.nsmOSCUrl
arguments = ["nsmd","--osc-port", str(port)]
if sessionRoot:
arguments += ["--session-root", sessionRoot]
if startupSession:
logger.info(f"Got start-session as command line parameter. Fowarding to nsmd command line: {startupSession}")
arguments += ["--load-session", startupSession]
#nsmd allows all executables in $PATH. For technical reasons our GUI extends this PATH before we start the server.
#This is a convenience service for fellow developers, that does not belong in the server control.
#However, if you wonder why there are are more applications from unknown PATHs check qtgui/settings.py
self.ourOwnServer = subprocess.Popen(arguments)
def _blockingRequest(self, path:str, arguments:list, answerPath:str, answerArguments:list, repeat=False)->list:
"""During start-up we need to wait for replies. Also some operations only make sense
if we got data back. This is an abstraction that deals with messages that may come
out-of-order and keeps them for later, but at least prevents our side from sending
messages out-of-order itself.
Default is: send once, wait for answer. repeat=True sends multiple times until an answer arrives.
Returns list of arguments, can be empty.
"""
assert not self._queue, [(m.oscpath, m.params) for m in self._queue]
logger.info(f"[wait for answer]: Sending {path}: {arguments}")
self._setPause(True)
out_msg = _OutgoingMessage(path)
for arg in arguments:
out_msg.add_arg(arg)
if not repeat:
self.sock.sendto(out_msg.build(), self.nsmOSCUrl)
#Wait for answer
ready = False
while not ready:
if repeat: #we need to send multiple times.
self.sock.sendto(out_msg.build(), self.nsmOSCUrl)
try:
data, addr = self.sock.recvfrom(1024)
msg = _IncomingMessage(data)
if answerArguments and msg.oscpath == answerPath and msg.params == answerArguments:
result = msg.params
logger.info(f"[wait from {path}] Received {answerPath}: {result}")
ready = True
elif msg.oscpath == answerPath:
result = msg.params
logger.info(f"[wait from {path}] Received {answerPath}: {result}")
ready = True
else:
logger.warning(f"Waiting for {answerPath} from nsmd, but got: {msg.oscpath} with {msg.params}. Adding to queue for later.")
self._queue.append(msg)
except BlockingIOError: #happens while no data is received. Has nothing to do with blocking or not.
continue
except socket.timeout:
continue
self._setPause(False)
return result
def _waitForPingResponseBlocking(self):
self._blockingRequest(path="/osc/ping", arguments=[], answerPath="/reply", answerArguments=["/osc/ping",], repeat=True)
def _initial_announce(self)->pathlib.Path:
"""nsm/gui/gui_announce triggers a multi-stage reply. First we get "hi",
then we get the session root. We wait for session root and then clean 'hi' from the queue.
When we connect to a running nsmd we also receive /nsm/gui/session/name with the current
session (or empty string for no current).
If in a session we will receive a list of clients which ends the gui_announce stage.
Returns session root as pathlib-path."""
resultArguments = self._blockingRequest(path="/nsm/gui/gui_announce", arguments=[], answerPath="/nsm/gui/session/root", answerArguments=[])
if len(self._queue) == 1 and self._queue[0].oscpath == "/nsm/gui/gui_announce" and self._queue[0].params == ["hi"]:
logger.info("Got 'hi'. We are now the registered nsmd GUI as per our initial /nsm/gui/gui_announce")
self._queue.clear() #this is safe because we tested above that there is exactly the hi message in the queue.
else:
logging.error(f"For ValueError below: {[(m.oscpath, m.params) for m in self._queue]}")
raise ValueError("We were expecting a clean _queue with only 'hi' as leftover, but instead there were unhandled messages. see print above. Better abort than a wrong program state")
#all ok
return pathlib.Path(resultArguments[0])
#General Commands
def send(self, arg):
"""
Intended for a text input / command line interface.
Sends anything to nsmd, separated by semicolon. First part is the message address,
the rest are string-parameters."""
args = arg.split()
msg = _OutgoingMessage(args[0])
for p in args[1:]:
msg.add_arg(p)
self.sock.sendto(msg.build(), self.nsmOSCUrl)
def gui_announce(self):
"""This is just the announce without any answer. This is a last-resort method if another GUI
"stole" our slot. For our own initial announce we use self._initial_announce()"""
msg = _OutgoingMessage("/nsm/gui/gui_announce")
self.sock.sendto(msg.build(), self.nsmOSCUrl)
def ping(self):
msg = _OutgoingMessage("/osc/ping")
self.sock.sendto(msg.build(), self.nsmOSCUrl)
def list(self):
msg = _OutgoingMessage("/nsm/server/list")
self.sock.sendto(msg.build(), self.nsmOSCUrl)
def _updateSessionListBlocking(self):
"""To ensure correct data on session operations we manage ourselves,
like copy, rename and delete.
Ask nsmd for projects in session root and update our internal state.
This will return None without doing anything when we are already in a session.
This will wait for an answer and block all other operations.
First is /nsm/gui/server/message ['Listing sessions']
Then session names come one reply at a time such as /reply ['/nsm/server/list', 'test3']
Finally /nsm/server/list [0, 'Done.'] , not a reply
"""
#In the past we only regenerated if we are not in a session. However, that was overzealous.
#Some GUI functions did not work. Better regenerate that list as often as we want.
logger.info("Requesting project list from session server in blocking mode")
self._setPause(True)
msg = _OutgoingMessage("/nsm/server/list")
self.sock.sendto(msg.build(), self.nsmOSCUrl)
#Wait for /reply
ready = False
while True:
try:
data, addr = self.sock.recvfrom(1024)
except socket.timeout:
continue
msg = _IncomingMessage(data)
if not ready and msg.oscpath == "/nsm/gui/server/message" and msg.params == ["Listing sessions"]:
self.internalState["sessions"].clear() # new clients are added as /reply /nsm/server/list callbacks
ready = True
else:
if len(msg.params) != 2:
logger.warning(f"Expected project but got path {msg.oscpath} with {msg.params}. Adding to queue for later.")
self._queue.append(msg)
continue
#This is what we want:
elif msg.oscpath == "/reply" and msg.params[0] == "/nsm/server/list":
#/reply ['/nsm/server/list', 'test3'] for a real session or
#/reply ['/nsm/server/list', ''] as "list ended" marker
if msg.params[1]:
self.internalState["sessions"].add(msg.params[1])
logger.info(f"Received session name: {msg.params[1]}")
else: #empty string
break
elif msg.params[0] == 0 and msg.params[1] == "Done.": # legacy nsmd sent the wrong message. Fixed in new-session-manager june 2020
break
else:
logger.warning(f"Expected project but got path {msg.oscpath} with {msg.params}. Adding to queue for later.")
self._queue.append(msg)
continue
self._setPause(False)
def quit(self):
"""Called through atexit.
Thanks to start.py sys exception hook this will also trigger on PyQt crash"""
if self.ourOwnServer:
msg = _OutgoingMessage("/nsm/server/quit")
self.sock.sendto(msg.build(), self.nsmOSCUrl)
returncode = self.ourOwnServer.wait()
logger.info("Stopped our own server with return code {}".format(returncode))
def broadcast(self, path:str, arguments:list):
"""/nsm/server/broadcast s:path [arguments...]
http://non.tuxfamily.org/nsm/API.html 1.2.7.1
/nsm/server/broadcast s:path [arguments...]
/nsm/server/broadcast /tempomap/update "0,120,4/4:12351234,240,4/4"
All clients except the sender recive:
/tempomap/update "0,120,4/4:12351234,240,4/4"
"""
logger.info(f"Sending broadcast with path {path} and args {arguments}")
message = _OutgoingMessage("/nsm/server/broadcast")
message.add_arg(path)
for arg in arguments:
message.add_arg(arg) #type autodetect
self.sock.sendto(message.build(), self.nsmOSCUrl)
#Primarily Without Session
def open(self, nsmSessionName:str):
if nsmSessionName in self.internalState["sessions"]:
msg = _OutgoingMessage("/nsm/server/open")
msg.add_arg(nsmSessionName) #s:project_name
self.sock.sendto(msg.build(), self.nsmOSCUrl)
else:
logger.warning(f"Session {nsmSessionName} not found. Not forwarding to nsmd.")
def new(self, newName:str, startClients:list=[])->str:
"""Saves the current session and creates a new session.
Only works if dir does not exist yet.
"""
basePath = pathlib.Path(self.sessionRoot, newName)
if basePath.exists():
return None
self._addToNextSession = startClients
msg = _OutgoingMessage("/nsm/server/new")
msg.add_arg(newName) #s:project_name
self.sock.sendto(msg.build(), self.nsmOSCUrl)
#Only with Current Session
def save(self):
msg = _OutgoingMessage("/nsm/server/save")
self.sock.sendto(msg.build(), self.nsmOSCUrl)
def close(self, blocking=False):
if not blocking:
msg = _OutgoingMessage("/nsm/server/close")
self.sock.sendto(msg.build(), self.nsmOSCUrl)
else:
msg = _OutgoingMessage("/nsm/server/close")
self.sock.sendto(msg.build(), self.nsmOSCUrl)
#Drive the process loop ourselves. This will still trigger updates but the mainloop will wait.
while self.internalState["currentSession"]:
self.process()
def abort(self, blocking=False):
if not blocking:
msg = _OutgoingMessage("/nsm/server/abort")
self.sock.sendto(msg.build(), self.nsmOSCUrl)
else:
msg = _OutgoingMessage("/nsm/server/abort")
self.sock.sendto(msg.build(), self.nsmOSCUrl)
#Drive the process loop ourselves. This will still trigger updates but the mainloop will wait.
while self.internalState["currentSession"]:
self.process()
def duplicate(self, newName:str)->str:
"""Saves the current session and creates a new session.
Requires an open session and uses nsmd to do the work.
If you want to do copy of any session use our owns
self.sessionCopy"""
msg = _OutgoingMessage("/nsm/server/duplicate")
msg.add_arg(newName) #s:project_name
self.sock.sendto(msg.build(), self.nsmOSCUrl)
#Client Commands for Loaded Session
def clientAdd(self, executableName:str):
"""Adds a client to the current session.
executable must be in $PATH.
We do not trust NSM to perform the right checks. It will add an empty path or wrong
path.
"""
if not pathlib.Path(executableName).name == executableName:
logger.warning(f"{executableName} must be just an executable file in your $PATH. We expected: {pathlib.Path(executableName).name} . We will not ask nsmd to add it as client")
return False
allPaths = getenv("PATH")
assert allPaths, allPaths
binaryPaths = allPaths.split(":") #TODO: There is a corner case that NSMD runs in a different $PATH environment.
executableInPath = any(pathlib.Path(bp, executableName).is_file() for bp in binaryPaths)
if executableInPath:
msg = _OutgoingMessage("/nsm/server/add")
msg.add_arg(executableName) #s:executable_name
self.sock.sendto(msg.build(), self.nsmOSCUrl)
return True
else:
logger.warning("Executable {} not found. We will not ask nsmd to add it as client".format(executableName))
return False
def clientStop(self, clientId:str):
msg = _OutgoingMessage("/nsm/gui/client/stop")
msg.add_arg(clientId) #s:clientId
self.sock.sendto(msg.build(), self.nsmOSCUrl)
def clientResume(self, clientId:str):
"""Opposite of clientStop"""
msg = _OutgoingMessage("/nsm/gui/client/resume")
msg.add_arg(clientId) #s:clientId
self.sock.sendto(msg.build(), self.nsmOSCUrl)
def clientRemove(self, clientId:str):
"""Client needs to be stopped already. We will do that and wait for an answer.
Remove from the session. Will not delete the save-files, but make them inaccesible.
There is never a point in nsmservercontrol where self.internalState["clients"] is emptied.
nsmd actually sends a clientRemove for every client at session stop.
"""
#We have a blocking operation in here so we need to be extra cautios that the client exists.
if not clientId in self.internalState["clients"]:
return False
self.clientStop(clientId) #We need to wait for an answer.
#Drive the process loop ourselves. This will still trigger updates but the mainloop will wait.
logger.info(f"Waiting for {clientId} to be status 'stopped'")
while not self.internalState["clients"][clientId]["lastStatus"] == "stopped":
self.process()
msg = _OutgoingMessage("/nsm/gui/client/remove")
msg.add_arg(clientId) #s:clientId
self.sock.sendto(msg.build(), self.nsmOSCUrl)
#Flood lazy lagging nsmd until it removed the client.
#We will receive a few -10 "No such client." errors but that is ok.
while True:
if not clientId in self.internalState["clients"]:
break
if self.internalState["clients"][clientId]["lastStatus"] == "removed":
break
self.sock.sendto(msg.build(), self.nsmOSCUrl)
self.process()
def clientSave(self, clientId:str):
"""Saves only the given client"""
msg = _OutgoingMessage("/nsm/gui/client/save")
msg.add_arg(clientId) #s:clientId
self.sock.sendto(msg.build(), self.nsmOSCUrl)
def clientHide(self, clientId:str):
"""Hides the client. Works only if client announced itself with this feature"""
msg = _OutgoingMessage("/nsm/gui/client/hide_optional_gui")
msg.add_arg(clientId) #s:clientId
self.sock.sendto(msg.build(), self.nsmOSCUrl)
def clientShow(self, clientId:str):
"""Hides the client. Works only if client announced itself with this feature"""
msg = _OutgoingMessage("/nsm/gui/client/show_optional_gui")
msg.add_arg(clientId) #s:clientId
self.sock.sendto(msg.build(), self.nsmOSCUrl)
#Callbacks
def _reactCallback_guiAnnounce(self, parameters:list):
"""This should not happen, but let's keep it in in case of edge-case multi GUI scenarios"""
assert parameters == ["hi"], parameters
logger.info("We got an unexpected 'hi', as if requesting gui_announce. Our own initial GUI announce as received and processed silently earlier already.")
def _reactCallback_error(self, parameters:list):
logger.error(parameters)
def _reactCallback_reply(self, parameters:list):
"""This is a difficult function because replies arrive for many unrelated things, like
status. We do our best to send all replies on the right way"""
success = False
l = len(parameters)
if l == 2:
originalMessage, data = parameters
logger.info(f"Got reply for {originalMessage} with {data}")
reply = {
"/nsm/server/list" : self._reactReply_nsmServerList,
"/nsm/server/new" : self._reactReply_nsmServerNew,
"/nsm/server/close" : self._reactReply_nsmServerClose,
"/nsm/server/open" : self._reactReply_nsmServerOpen,
"/nsm/server/save" : self._reactReply_nsmServerSave,
"/nsm/server/abort" : self._reactReply_nsmServerAbort,
"/nsm/server/duplicate" : self._reactReply_nsmServerDuplicate,
"/nsm/server/add" : self._reactReply_nsmServerAdd,
}
if originalMessage in reply:
reply[originalMessage](data)
success = True
elif l == 3:
originalMessage, errorCode, answer = parameters
logger.info(f"Got reply for {originalMessage} with code {errorCode} saying {answer}")
if originalMessage == "/nsm/server/add":
assert errorCode == 0, parameters
self._reactReply_nsmServerAdd(answer)
success = True
elif l == 1:
singleMessage = parameters[0]
"""For unknown reasons these replies do not repeat the originalMessage"""
if singleMessage == "/osc/ping":
logger.info(singleMessage)
success = True
elif singleMessage == "Client removed.":
self._reactReply_nsmServerRemoved()
success = True
elif singleMessage == "Client stopped.":
self._reactReply_nsmServerStopped()
success = True
#After all these reactions and checks the function will eventually return here.
if not success:
raise NotImplementedError(parameters)
def _reactCallback_serverMessage(self, parameters:list):
"""Messages are normally harmless and uninteresting.
Howerver, we need to use some of them for actual tasks.
In opposite to reply and status this all go in our function for now, until refactoring"""
if parameters == ["Listing session"]:
#this feels bad! A simple message is not a reliable state token and could change in the future.
#we cannot put that into our own /list outgoing message because other actions like "new" also trigger this callback
self.internalState["sessions"].clear() # new clients are added as /reply /nsm/server/list callbacks
if parameters[0].startswith("Opening session"):
#This gets send only when an existing session starts loading. It will not trigger on new sessions, be it really new or duplicate.
#e.g. /nsm/gui/server/message ["Opening session FOO"]
nsmSessionName = parameters[0].replace("Opening session ", "")
logger.info(f"Starting to load clients of session: {nsmSessionName}")
self.sessionOpenLoadingHook(self.sessionAsDict(nsmSessionName)) #notify the api->UI
else:
logger.info("/nsm/gui/server/message " + repr(parameters))
def _reactCallback_broadcast(self, parameters:list):
"""We have nothing to do with broadcast. But we save them, so they can be shown on request
parameters[0] is an osc path:str without naming constraints
the rest is a list of arguments.
Attention: a broadcast is not saved by the server. You either are in the session to receive
it or you will miss it. If we run Agordejo as attached GUI (incl. --load-session) a broadcast
after the session was loaded, where programs announce themselves to all other clients,
will not be received here. Such is the case with our data-client.
"""
logger.info(f"Received broadcast. Saving in internal state: {parameters}")
self.internalState["broadcasts"].append(parameters)
#Our little trick. We know and like some clients better than others.
#If we detect our own data-storage we remember our friends.
#It is possible that another datastorage broadcasts, then we overwrite the URL.
if parameters and parameters[0] == "/agordejo/datastorage/announce":
path, clientId, messageSizeLimit, url = parameters
assert "osc.udp" in url
logger.info(f"Got announce from agordejo datastorage clientId {clientId} @ {url}")
o = urlparse(url)
self.dataStorage = DataStorage(self, clientId, messageSizeLimit, (o.hostname, o.port), self.sock)
def _reactCallback_serverList(self, parameters:list):
"""This finalizes a new session list. Here we send new data to the GUI etc."""
l = len(parameters)
if l == 2:
errorCode, message = parameters
assert errorCode == 0, errorCode
assert message == "Done.", message #don't miss the dot after Done
logger.info("/nsm/server/list is done and has transmitted all available sessions to us")
else:
raise NotImplementedError(parameters)
def _reactCallback_activeSessionChanged(self, parameters:list):
"""We receive this trough /nsm/gui/session/name
This is called when the session has already changed.
This also happens when you connect to a headless nsmd with a running session.
We expect two parameters: [session name, session path] both of which could be "".
If we start nsmd ourselves into an empty state we expect session name to be empty
Session path is the subdirectory relative to session root. The session root is not included.
!The unqiue name is the session path, not the name!
Shortly before we received /nsm/gui/session/session which indicates the attempt to create a
new one, I guess! :)
If you want to react to the attempt to open a session you need to use
/nsm/gui/server/message ["Opening session FOO"] OR creating a new session, after which nsmd
will open that session without a message.
Empty string is "No session" or "Choose A Session" mode.
"""
l = len(parameters)
if l == 2:
nsmSessionName, sessionPath = parameters
if not nsmSessionName and not sessionPath: #No session loaded. We are in session-choosing mode.
logger.info("Session closed or never started. Choose-A-Session mode.")
self.internalState["currentSession"] = None #sessionCloseHooked triggers rebuilding of the session list, which will not work when there is a current session.
self.sessionClosedHook()
else:
#Session path is the subdirectory relative to session root. The session root is not included.
sessionPath = sessionPath.lstrip("/") #we strip for technical reasons.
logger.info(f"Current Session changed. We are now {nsmSessionName} in {sessionPath}")
self.internalState["currentSession"] = sessionPath
#This is after the session, received after all programs have loaded.
#We have a counterpart-message reaction that signals the attempt to load.
self.sessionOpenReadyHook(self.sessionAsDict(sessionPath)) #notify the api->UI
for autoClientExecutableInPath in self._addToNextSession:
self.clientAdd(autoClientExecutableInPath)
self._addToNextSession = [] #reset
elif l == 0: #Another way of "no session".
self.internalState["currentSession"] = None #sessionCloseHooked triggers rebuilding of the session list, which will not work when there is a current session.
self.sessionClosedHook()
else:
raise NotImplementedError(parameters)
def _initializeEmptyClient(self, clientId:str):
"""NSM reuses signals. It is quite possible that this will be called multiple times,
e.g. after opening a session.
This is not a reaction callback, we call this ourselves only in _reactCallback_ClientNew
"""
#if not self.internalState["currentSession"]:
# logger.warning(f"We received a clientNew for ID {clientId} but no session open was received."
# "This would happen in an old nsmd version. If you see the GUI with an open session and a client list you can ignore this warning")
if clientId in self.internalState["clients"]:
return
logger.info(f"Creating new internal entry for client {clientId}")
client = {
"clientId":clientId, #for convenience, included internally as well
"dumbClient":True, #Bool. Real nsm or just any old program? status "Ready" switches this.
"executable":None, #Every client announces to the GUI with the exectuable name. True nsm clients later overwrite with a pretty name which we save as "reportedName"
"reportedName":None, #str . The reported name is first the executable name, for status started. But for NSM clients it gets replaced with a reported name.
"label":None, #str
"lastStatus":None, #str
"statusHistory":[], #list
"hasOptionalGUI": False, #bool
"visible": None, # bool
"dirty": None, # bool
}
self.internalState["clients"][clientId] = client
def _setClientData(self, clientId:str, parameter:str, value):
if clientId in self.internalState["clients"]:
self.internalState["clients"][clientId][parameter] = value
return True
else:
logger.warning(f"Client {clientId} not found in internal status storage. If the session was just closed this is most likely a known race condition. Everything is fine in this case.")
return False
def _reactCallback_ClientLabelChanged(self, parameters:list):
"""osc->add_method( "/nsm/gui/client/label", "ss", osc_handler, osc, "path,display_name" );
"""
l = len(parameters)
if l == 2:
clientId, label = parameters
logger.info(f"Label for client {clientId} changed to {label}")
self._setClientData(clientId, "label", label)
self.clientStatusHook(self.internalState["clients"][clientId])
else:
raise NotImplementedError(parameters)
def _reactCallback_clientPid(self, parameters:list):
clientId, pid = parameters
self._setClientData(clientId, "pid", pid)
def _reactCallback_SessionSession(self, parameters:list):
"""This is received only when a new session gets created and followed by
/nsm/gui/client/new and then a reply for
/reply /nsm/server/new Session created"""
#This is the counterpart to Message "Opening Session", but for really new or freshly duplicated session.
logger.info(f"Attempt to create session: {parameters}")
self.sessionOpenLoadingHook(self.sessionAsDict(parameters[0])) #notify the api->UI
def _reactCallback_ClientNew(self, parameters:list):
"""/nsm/gui/client/new ['nBAVO', 'jackpatch']
This is both add client or open.
The message comes twice. Once when you add a client, then parameters
will contain the executable name. If the client reports itself as NSM compatible through
announce we will also get the Open message through this function.
Then the name changes from executableName to a reportedName, which will remain for the rest
of the session. Executable name is still important to look up icons in the GUI.
This message is usually followed by /nsm/gui/client/status
"""
l = len(parameters)
if l == 2:
clientId, name = parameters
if not clientId in self.internalState["clients"]:
self._initializeEmptyClient(clientId)
self._setClientData(clientId, "executable", name)
logger.info(f"Client started {name}:{clientId}")
else:
self._setClientData(clientId, "reportedName", name)
logger.info(f"Client upgraded to NSM-compatible: {name}:{clientId}")
self.clientStatusHook(self.internalState["clients"][clientId])
else:
raise NotImplementedError(parameters)
def _reactCallback_clientDirty(self, parameters:list):
"""/nsm/gui/client/dirty ['nMAJH', 1]
"""
l = len(parameters)
if l == 2:
clientId, dirty = parameters
dirty = bool(dirty)
self._setClientData(clientId, "dirty", dirty)
logger.info(f"Client {clientId} save status dirty is now: {dirty}")
self.clientStatusHook(self.internalState["clients"][clientId])
else:
raise NotImplementedError(parameters)
def _reactCallback_clientGuiVisible(self, parameters:list):
"""/nsm/gui/client/gui_visible ['nMAJH', 0]
"""
l = len(parameters)
if l == 2:
clientId, visible = parameters
visible = bool(visible)
self._setClientData(clientId, "visible", visible)
logger.info(f"Client {clientId} visibility is now: {visible}")
self.clientStatusHook(self.internalState["clients"][clientId])
else:
raise NotImplementedError(parameters)
def _reactCallback_clientHasOptionalGui(self, parameters:list):
"""/nsm/gui/client/has_optional_gui ['nFDBK']
nsmd sends us this as reaction to a clients announce capabilities list
"""
l = len(parameters)
if l == 1:
clientId = parameters[0]
self._setClientData(clientId, "hasOptionalGUI", True)
logger.info(f"Client {clientId} supports optional GUI")
else:
raise NotImplementedError(parameters)
def _reactCallback_statusChanged(self, parameters:list):
"""
Handles all status messages.
Some changes, like removed and quit, are only available as status.
This means that status removed is the opposite of /nsm/gui/client/new, even if it doesn't
read like it.
/nsm/gui/client/status ['nFDBK', 'open']
/nsm/gui/client/status ['nMAJH', 'launch']
/nsm/gui/client/status ['nLUPX', 'ready']
/nsm/gui/client/status ['nLUPX', 'save']
/nsm/gui/client/status ['nFHLB', 'quit']
/nsm/gui/client/status ['nLUPX', 'removed']
/nsm/gui/client/status ['nLUPX', 'stopped']
/nsm/gui/client/status ['nLUPX', 'noop'] #For dumb clients! no nsm support!
/nsm/gui/client/status ['nLUPX', 'switch']
/nsm/gui/client/status ['nLUPX', 'error']
"""
l = len(parameters)
if l == 2:
clientId, status = parameters
logger.info(f"Client status {clientId} now {status}")
r = self._setClientData(clientId, "lastStatus", status)
if r: #a known race condition at quit may delete this in between calls
self.internalState["clients"][clientId]["statusHistory"].append(status)
if status == "ready": #we need to check for this now. Below in actions is after the statusHook and too late.
self._setClientData(clientId, "dumbClient", False)
self.clientStatusHook(self.internalState["clients"][clientId])
else:
raise NotImplementedError(parameters)
#Now handle our actions. For better readability in separate functions.
actions = {
"open": self._reactStatus_open,
"launch": self._reactStatus_launch,
"ready": self._reactStatus_ready,
"save": self._reactStatus_save,
"quit": self._reactStatus_quit,
"removed": self._reactStatus_removed,
"stopped": self._reactStatus_stopped,
"noop": self._reactStatus_noop,
"switch": self._reactStatus_switch,
"error": self._reactStatus_error,
}[status](clientId)
actions #pylint does not like temporary dicts for case-switch
def _reactStatus_removed(self, clientId:str):
"""Remove the client entry from our internal state.
This also covers crashes."""
if clientId in self.internalState["clients"]: #race condition at quit
del self.internalState["clients"][clientId]
if self.dataStorage and clientId == self.dataStorage.ourClientId: #We only care about the current data-storage, not another instance that was started before it.
self.dataClientNamesHook(None)
self.dataClientDescriptionHook(None)
self.dataClientTimelineMaximumDurationChangedHook(None)
self.dataStorage = None
def _reactStatus_stopped(self, clientId:str):
"""The client has stopped and can be restarted.
The status is not saved. NSM will try to open all clients on session open and end in "ready"
"""
if self.dataStorage and clientId == self.dataStorage.ourClientId: #We only care about the current data-storage, not another instance that was started before it.
self.dataClientNamesHook(None)
self.dataClientDescriptionHook(None)
self.dataClientTimelineMaximumDurationChangedHook(None)
self.dataStorage = None
def _reactStatus_launch(self, clientId:str):
"""
Launch is a transitional status for NSM clients but the terminal status for dumb clients
"""
pass
def _reactStatus_open(self, clientId:str):
"""
"""
pass
def _reactStatus_ready(self, clientId:str):
"""
This is sent after startup but also after every save.
It signals that the client can react to nsm signals, not that it is ready for something else.
Note that this is *After* the clientStatusHook, so any data changed here is not submitted to the
api/GUI yet. E.g. you can't change dumbClient to True here if that is needed directly
after start by the GUI.
"""
pass
def _reactStatus_save(self, clientId:str):
"""
"""
pass
def _reactStatus_quit(self, clientId:str):
"""
"""
pass
def _reactStatus_noop(self, clientId:str):
"""
Dumb clients, or rather nsmd, react with noop on signals they cannot understand, like
saving.
"""
pass
def _reactStatus_switch(self, clientId:str):
"""
"""
pass
def _reactStatus_error(self, clientId:str):
"""
"""
logger.error(f"{clientId} has error status!")
def _reactReply_nsmServerOpen(self, answer:str):
assert answer == "Loaded.", answer
def _reactReply_nsmServerSave(self, answer:str):
assert answer == "Saved.", answer
def _reactReply_nsmServerClose(self, answer:str):
assert answer == "Closed.", answer
def _reactReply_nsmServerAbort(self, answer:str):
assert answer == "Aborted.", answer
def _reactReply_nsmServerAdd(self, answer:str):
"""Reaction to add client"""
assert answer == "Launched.", answer
def _reactReply_nsmServerRemoved(self):
pass
def _reactReply_nsmServerStopped(self):
pass
def _reactReply_nsmServerDuplicate(self, answer:str):
"""There are a lot of errors possible here, reported through nsmd /error,
because we are dealing with the file system. Our own GUI and other safeguards should
protect us from most though
Positive answers are 'Duplicated.' when nsmd finished copying and 'Loaded.' when the new
session is loaded. Or so one would think... the messages arrive the other way around.
Anyway, both are needed to signify a succesful duplication.
"""
assert answer == "Loaded." or answer == "Duplicated.", answer
#We don't need any callbacks here, nsmd sends a session change on top of the duplicate replies.
def _reactReply_nsmServerNew(self, answer:str):
"""Created. arrives when a new session is created for the first time and directory is mkdir
Session created arrives when a session was opened and nsm created its internal "session".
We do not need to react to the new signal because we watch the dir for new sessions ourselves
and the currently active session is send through
"/nsm/gui/session/name" : self._reactCallback_activeSessionChanged,
"""
assert answer == 'Created.' or answer == "Session created", answer
def _reactReply_nsmServerList(self, nsmSessionName:str):
"""Session names come one reply at a time.
We reacted to the message /nsm/gui/server/message ['Listing sessions']
by clearing our internal session status and will save the new ones here
/reply ['/nsm/server/list', 'test3']
Do not confuse reply server list with the message /nsm/server/list [0, 'Done.']
The latter is a top level message :(
"""
self.internalState["sessions"].add(nsmSessionName)
#Our own functions
def allClientsHide(self):
for clientId, clientDict in self.internalState["clients"].items():
if clientDict["hasOptionalGUI"]:
self.clientHide(clientId)
def allClientsShow(self):
for clientId, clientDict in self.internalState["clients"].items():
if clientDict["hasOptionalGUI"]:
self.clientShow(clientId)
def clientToggleVisible(self, clientId:str):
if self.internalState["clients"][clientId]["hasOptionalGUI"]:
if self.internalState["clients"][clientId]["visible"]:
self.clientHide(clientId)
else:
self.clientShow(clientId)
#data-storage / nsm-data
def clientNameOverride(self, clientId:str, name:str):
"""An agordejo-specific function that requires the client nsm-data in the session.
If nsm-data is not present this function will write nothing, not touch any data.
It will still send a callback to revert any GUI changes back to the original name.
We accept empty string as a name to remove the name override
"""
if self.dataStorage:
assert clientId in self.internalState["clients"], self.internalState["clients"]
self.dataStorage.setClientOverrideName(clientId, name) #triggers callback
#data-storage / nsm-data
def setDescription(self, text:str):
if self.dataStorage:
self.dataStorage.setDescription(text)
#data-storage / nsm-data
def setTimelineMaximumDuration(self, minutes:int):
if self.dataStorage:
self.dataStorage.setTimelineMaximumDuration(minutes)
def _checkDirectoryForSymlinks(self, path)->bool:
for p in path.rglob("*"):
if p.is_symlink():
return True
return False
def _checkIfLocked(self, nsmSessionName:str)->bool:
basePath = pathlib.Path(self.sessionRoot, nsmSessionName)
assert basePath.exists()
lockFile = pathlib.Path(basePath, ".lock")
return lockFile.exists()
def forceLiftLock(self, nsmSessionName:str):
"""Removes lockfile, no matter if session is actually open or just a remainder
from a crash.
If no lock exist it does nothing."""
self._updateSessionListBlocking()
if self._checkIfLocked(nsmSessionName):
basePath = pathlib.Path(self.sessionRoot, nsmSessionName)
assert basePath.exists() #implied by _checkIfLocked
lockFile = pathlib.Path(basePath, ".lock")
lockFile.unlink(missing_ok=True)
logger.info(f"{nsmSessionName} was forced to unlock by us.")
else:
logger.info(f"Tried to unlock, but {nsmSessionName} is not locked")
def getSessionFiles(self, nsmSessionName:str)->list:
"""Return all session files, useful to present to the user, e.g. as warning
before deletion"""
self._updateSessionListBlocking()
basePath = pathlib.Path(self.sessionRoot, nsmSessionName)
assert basePath.exists()
return [f.as_posix() for f in basePath.rglob("*")] #Includes directories themselves
#Only files, no directories themselves.
#result = []
#for path, dirs, files in walk(basePath):
# for file in files:
# result.append(pathlib.Path(path, file).as_posix())
#return result
def deleteSession(self, nsmSessionName:str):
"""Delete project directory with all data. No undo.
Only if session is not locked"""
self._updateSessionListBlocking()
if not nsmSessionName in self.internalState["sessions"]:
logger.warning(f"{nsmSessionName} is not a session")
return False
basePath = pathlib.Path(self.sessionRoot, nsmSessionName)
assert basePath.exists()
if not self._checkIfLocked(nsmSessionName):
try:
logger.info(f"Deleting session {nsmSessionName}: {self.getSessionFiles(nsmSessionName)}")
shutilrmtree(basePath)
except PermissionError:
logger.warning(f"Tried to delete {basePath} but permission was denied.")
else:
logger.warning(f"Tried to delete {basePath} but it is locked")
self._updateSessionListBlocking() #if we don't update our internal representation the watchdog will go mad.
def renameSession(self, nsmSessionName:str, newName:str):
"""Only works if session is not locked and dir does not exist yet"""
self._updateSessionListBlocking()
newPath = pathlib.Path(self.sessionRoot, newName)
oldPath = pathlib.Path(self.sessionRoot, nsmSessionName)
assert oldPath.exists()
if self._checkIfLocked(nsmSessionName):
logger.warning(f"Can't rename {nsmSessionName} to {newName}. {nsmSessionName} is locked.")
return False
elif newPath.exists():
logger.warning(f"Can't rename {nsmSessionName} to {newName}. {newName} already exists.")
return False
else:
logger.info(f"Renaming {nsmSessionName} to {newName}.")
tmp = pathlib.Path(oldPath.name+str(uuid4())) #Can't move itself into a subdir in itself. move to temp first. We don't use tempdir because that could be on another partition. we already know we can write here.
oldPath.rename(tmp)
pathlib.Path(newPath).mkdir(parents=True, exist_ok=True)
tmp.rename(newPath)
assert newPath.exists()
def copySession(self, nsmSessionName:str, newName:str, progressHook=None):
"""Copy a whole tree. Keep symlinks as symlinks.
Lift lock.
If progressHook is provided (e.g. by a GUI) it will be called at regular intervals
to inform of the copy process, or at least that it is still running.
"""
self._updateSessionListBlocking()
source = pathlib.Path(self.sessionRoot, nsmSessionName)
destination = pathlib.Path(self.sessionRoot, newName)
if destination.exists():
logger.warning(f"Can't copy {nsmSessionName} to {newName}. {newName} already exists.")
return False
elif not nsmSessionName in self.internalState["sessions"]:
logger.warning(f"{nsmSessionName} is not a session")
return
elif not source.exists():
logger.warning(f"Can't copy {nsmSessionName} because it does not exist.")
return False
#All is well.
try:
def mycopy():
shutilcopytree(source, destination, symlinks=True, dirs_exist_ok=False) #raises an error if dir already exists. But we already test above.
if progressHook:
def waiter(copyProcess):
"""Compare the final size with the current size and generate a percentage
from it, which we send as progress"""
sourceDirectorySize = sum(f.stat().st_size for f in source.glob('**/*') if f.is_file()) - 2048 #padded so we don't create an infinite loop from a rounding error
destinationDirectorySize = sum(f.stat().st_size for f in destination.glob('**/*') if f.is_file())
#destinationDirectorySize does not start at 0. the copy() function might already by running before waiter() starts.
while destinationDirectorySize < sourceDirectorySize:
if not copyProcess.is_alive():
break
percentString = str( int((destinationDirectorySize / sourceDirectorySize) * 100)) + "%"
progressHook(percentString)
sleep(0.5) #don't send too much. two times a second is plenty.
#For next round
destinationDirectorySize = sum(f.stat().st_size for f in destination.glob('**/*') if f.is_file())
"""
#This moves both processes away from the main thread. It works, but Qt will not update anymore
#We need a way to just spawn one extra process and wait/processHook in the main process
processes = []
for function in (waiter, mycopy):
proc = Process(target=function)
proc.start()
processes.append(proc)
for proc in processes:
proc.join()
"""
proc = Process(target=mycopy)
proc.start()
waiter(proc) #has the while loop to wait and check proc
proc.join() #finish
#Do a check if both dirs are equal
progressHook("Veryfying file-integrity. This may take a while...") #string gets translated in qt gui mainwindow. Don't change just this here.
sourceHash = md5_dir(source)
desinationHash = md5_dir(destination)
if not sourceHash == desinationHash:
logger.error("ERROR! Copied session data is different from source session. Please check you data!")
progressHook("ERROR! Copied session data is different from source session. Please check you data!") #ERROR! is a keyword for the gui wait dialog to not switch away. This gets translated in the Qt GUI mainwindow. Don't change this string
else:
mycopy()
self.forceLiftLock(newName)
except Exception as e: #we don't want to crash if user tries to copy to /root or so.
logger.error(e)
return False
#Export to the User Interface
def sessionAsDict(self, nsmSessionName:str)->dict:
assert self.sessionRoot
entry = {}
entry["nsmSessionName"] = nsmSessionName
entry["name"] = pathlib.Path(nsmSessionName).name
basePath = pathlib.Path(self.sessionRoot, nsmSessionName)
sessionFile = pathlib.Path(basePath, "session.nsm")
if not sessionFile.exists():
#This is a reason to let the program exit.
logger.error("Got wrong session directory from nsmd. Race condition after delete? In any case a breaking error (please report). Quitting. Project was: " + repr(sessionFile))
sysexit() #return None switch to return None to let it crash and see the python traceback
timestamp = datetime.fromtimestamp(sessionFile.stat().st_mtime).isoformat(sep=" ", timespec='minutes')
entry["lastSavedDate"] = timestamp
entry["sessionFile"] = sessionFile
entry["lockFile"] = pathlib.Path(basePath, ".lock")
entry["fullPath"] = str(basePath)
entry["sizeInBytes"] = sum(f.stat().st_size for f in basePath.glob('**/*') if f.is_file() )
entry["numberOfClients"] = len(open(sessionFile).readlines())
entry["hasSymlinks"] = self._checkDirectoryForSymlinks(basePath)
entry["parents"] = basePath.relative_to(self.sessionRoot).parts[:-1] #tuple of each dir between NSM root and nsmSessionName/session.nsm, exluding the actual project name. This is the tree
entry["locked"] = self._checkIfLocked(nsmSessionName) #not for direct display
return entry
def exportSessionsAsDicts(self)->list:
"""Return a list of dicts of projects with additional information:
"""
logger.info("Exporting sessions to dict. Will call blocking list sessions next")
results = []
#assert not self.internalState["currentSession"], self.internalState["currentSession"] #Do not request session list while in active session
self._updateSessionListBlocking()
for nsmSessionName in self.internalState["sessions"]:
result = self.sessionAsDict(nsmSessionName)
results.append(result)
return results
class DataStorage(object):
"""Interface to handle the external datastorage client
url is pre-processed (host, port)
Our init is the same as announcing the nsm-data client in the session.
That means everytime nsm-data sends a new/open reply we get created.
Thus we will send all our data to parent and subsequently to GUI-callbacks in init.
Keys are strings,
While nsmd OSC support int, str and float we use json exclusively.
We send json string and parse the received data.
Try to use only ints, floats, strin
gs, lists and dicts.
Client pretty names are limited to 512 chars, depending on our OSC message size.
nsm-data will just cut to 512 chars. So a GUI should better protect that limit.
"""
def __init__(self, parent, ourClientId, messageSizeLimit:int, url:tuple, sock):
logger.info("Create new DataStorage instance")
self.parent = parent
self.messageSizeLimit = messageSizeLimit # e.g. 512
self.ourClientId = ourClientId
self.clients = parent.internalState["clients"] #shortcut. Mutable, persistent dict, until instance gets deleted.
self.url = url
self.sock = sock
self.ip, self.port = self.sock.getsockname()
#Get initial data. Directly send to the api->GUI.
self.data = self.getAll() #blocks. our local copy. = {"clientOverrideNames":{clientId:nameOverride}, "description":"str", "timelineMaximumDuration":"minutes in int"}
self.namesToParentAndCallbacks()
self.descriptionToParentAndCallbacks()
self.timelineMaximumDurationToParentAndCallbacks()
def namesToParentAndCallbacks(self):
self.parent.dataClientNamesHook(self.data["clientOverrideNames"])
def descriptionToParentAndCallbacks(self):
"""Every char!!!"""
self.parent.dataClientDescriptionHook(self.data["description"])
def timelineMaximumDurationToParentAndCallbacks(self):
self.parent.dataClientTimelineMaximumDurationChangedHook(self.data["timelineMaximumDuration"])
def _waitForMultipartMessage(self, pOscpath:str)->str:
"""Returns a json string, as if the message was sent as a single one.
Can consist of only one part as well."""
logger.info(f"Waiting for multi message {pOscpath} in blocking mode")
self.parent._setPause(True)
jsonString = ""
chunkNumberOfParts = float("+inf") #zero based
currentPartNumber = float("-inf") #zero based
while True:
if currentPartNumber >= chunkNumberOfParts:
break
try:
data, addr = self.sock.recvfrom(1024)
except socket.timeout:
break
msg = _IncomingMessage(data)
if msg.oscpath == pOscpath:
currentPartNumber, l, jsonChunk = msg.params
jsonString += jsonChunk
chunkNumberOfParts = l #overwrite infinity the first time and redundant afterwards.
else:
self.parent._queue.append(msg)
self.parent._setPause(False)
logger.info(f"Message complete with {chunkNumberOfParts} chunks.")
return jsonString
def getAll(self):
"""Mirror everything from nsm-data"""
msg = _OutgoingMessage("/agordejo/datastorage/getall")
msg.add_arg(self.ip)
msg.add_arg(self.port)
self.sock.sendto(msg.build(), self.url)
jsonString = self._waitForMultipartMessage("/agordejo/datastorage/reply/getall")
return json.loads(jsonString)
def setTimelineMaximumDuration(self, minutes:int):
msg = _OutgoingMessage("/agordejo/datastorage/settimelinemaximum")
msg.add_arg(json.dumps(minutes))
self.sock.sendto(msg.build(), self.url)
self.getTimelineMaximumDuration()
def getTimelineMaximumDuration(self):
msg = _OutgoingMessage("/agordejo/datastorage/gettimelinemaximum")
msg.add_arg(self.ip)
msg.add_arg(self.port)
self.sock.sendto(msg.build(), self.url)
#Wait in blocking mode
self.parent._setPause(True)
while True:
try:
data, addr = self.sock.recvfrom(1024)
except socket.timeout:
break
msg = _IncomingMessage(data)
if msg.oscpath == "/agordejo/datastorage/reply/gettimelinemaximum":
jsonMinutes = msg.params[0] #list of one
answerMinutes = json.loads(jsonMinutes)
break
else:
self.parent._queue.append(msg)
self.parent._setPause(False)
#Got answer
assert type(answerMinutes) is int, (answerMinutes, type(answerMinutes))
self.data["timelineMaximumDuration"] = answerMinutes
self.timelineMaximumDurationToParentAndCallbacks()
def setClientOverrideName(self, clientId:str, value):
"""We accept empty string as a name to remove the name override"""
assert clientId in self.clients, self.clients
msg = _OutgoingMessage("/agordejo/datastorage/setclientoverridename")
msg.add_arg(clientId)
msg.add_arg(json.dumps(value))
self.sock.sendto(msg.build(), self.url)
self.getClientOverrideName(clientId) #verifies data and triggers callback
def getClientOverrideName(self, clientId:str):
msg = _OutgoingMessage("/agordejo/datastorage/getclientoverridename")
msg.add_arg(clientId)
msg.add_arg(self.ip)
msg.add_arg(self.port)
self.sock.sendto(msg.build(), self.url)
#Wait in blocking mode
self.parent._setPause(True)
while True:
try:
data, addr = self.sock.recvfrom(1024)
except socket.timeout:
break
msg = _IncomingMessage(data)
if msg.oscpath == "/agordejo/datastorage/reply/getclient":
replyClientId, jsonName = msg.params
assert replyClientId == clientId, (replyClientId, clientId)
break
else:
self.parent._queue.append(msg)
self.parent._setPause(False)
#Got answer
answer = json.loads(jsonName)
if answer:
self.data["clientOverrideNames"][clientId] = answer
else:
#It is possible that a client not present in our storage will send an empty string. Protect.
if clientId in self.data["clientOverrideNames"]:
del self.data["clientOverrideNames"][clientId]
self.namesToParentAndCallbacks()
def _chunkstring(self, string):
return [string[0+i:self.messageSizeLimit+i] for i in range(0, len(string), self.messageSizeLimit)]
def setDescription(self, text:str):
"""This most likely arrives one char at time with the complete text"""
chunks = self._chunkstring(text)
descriptionId = str(id(text))[:8]
for index, chunk in enumerate(chunks):
msg = _OutgoingMessage("/agordejo/datastorage/setdescription")
msg.add_arg(descriptionId)
msg.add_arg(index)
msg.add_arg(chunk)
msg.add_arg(self.ip)
msg.add_arg(self.port)
self.sock.sendto(msg.build(), self.url)
#No echo answer.
#We cheat a bit and inform parents with the new text directly.
self.data["description"] = text
self.descriptionToParentAndCallbacks() #and back
#Generic Functions. Not in use and not ready.
def _test(self):
self.readAll()
self.setDescription("Ein Jäger aus Kurpfalz,\nDer reitet durch den grünen Wald,\nEr schießt das Wild daher,\nGleich wie es ihm gefällt.")
self.read("welt")
self.create("welt", "world")
self.read("welt")
self.create("str", "bar")
self.create("int", 1)
self.create("list", [1, 2, 3])
self.create("tuple", (1, 2, 3)) #no tuples, everything will be a list.
self.create("dict", {1:2, 3:4, 5:6})
self.update("str", "rolf")
self.delete("str")
def read(self, key:str):
"""Request one value"""
msg = _OutgoingMessage("/agordejo/datastorage/read")
msg.add_arg(key)
msg.add_arg(self.ip)
msg.add_arg(self.port)
self.sock.sendto(msg.build(), self.url)
def readAll(self):
"""Request all data"""
msg = _OutgoingMessage("/agordejo/datastorage/readall")
msg.add_arg(self.ip)
msg.add_arg(self.port)
self.sock.sendto(msg.build(), self.url)
def create(self, key:str, value):
"""Write/Create one value."""
msg = _OutgoingMessage("/agordejo/datastorage/create")
msg.add_arg(key)
msg.add_arg(json.dumps(value))
self.sock.sendto(msg.build(), self.url)
def update(self, key:str, value):
"""Update a value, but only if it exists"""
msg = _OutgoingMessage("/agordejo/datastorage/update")
msg.add_arg(key)
msg.add_arg(json.dumps(value))
self.sock.sendto(msg.build(), self.url)
def delete(self, key:str):
"""Delete a key/value completely"""
msg = _OutgoingMessage("/agordejo/datastorage/delete")
msg.add_arg(key)
self.sock.sendto(msg.build(), self.url)