Music production session manager https://www.laborejo.org
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1622 lines
73 KiB

4 years ago
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Copyright 2020, Nils Hilbricht, Germany ( https://www.hilbricht.net )
The Non-Session-Manager by Jonathan Moore Liles <male@tuxfamily.org>: http://non.tuxfamily.org/nsm/
With help from code fragments from https://github.com/attwad/python-osc ( DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE v2 )
API documentation: http://non.tuxfamily.org/nsm/API.html
This file is part of the Laborejo Software Suite ( https://www.laborejo.org ).
This application is free software: you can redistribute it and/or modify
4 years ago
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import logging; logger = logging.getLogger(__name__); logger.info("import")
#Standard Library
import struct
import socket
from os import getenv #to get NSM env var
from shutil import rmtree as shutilrmtree
from shutil import copytree as shutilcopytree
from urllib.parse import urlparse #to convert NSM env var
import subprocess
import atexit
import pathlib
import json
from uuid import uuid4
4 years ago
from datetime import datetime
from sys import exit as sysexit
class _IncomingMessage(object):
    """Representation of a parsed datagram representing an OSC message.

    An OSC message consists of an OSC Address Pattern followed by an OSC
    Type Tag String followed by zero or more OSC Arguments.

    Parsing happens in the constructor and never raises: a datagram that can
    not be parsed is logged as a warning. If not even the address could be read
    ``oscpath`` is None, parameters parsed before an error are kept.
    """

    _BUNDLE_MARKER = b"#bundle\x00"
    _BUNDLE_HEADER_LENGTH = 16 #8 bytes marker + 8 bytes OSC time tag

    def __init__(self, dgram):
        self.LENGTH = 4 #32 bit
        self._address_regexp = None #stays None if the address can not be parsed
        self._parameters = []
        self._dgram = self._stripBundle(dgram)
        self.parse_datagram()

    def _stripBundle(self, dgram):
        """Return the first message contained in an OSC bundle, or dgram itself
        if it is not a bundle.

        NSM Broadcasts are bundles, but very simple ones. We only need to care about
        the single message it contains.
        A bundle is: "#bundle" + null, an 8 byte time tag and then, for each element,
        a 32 bit size followed by the element. We skip the header by its fixed length
        instead of searching for the first "/" because time tag and size are binary and
        may contain the byte 0x2f ("/") themselves.
        """
        if not dgram.startswith(self._BUNDLE_MARKER):
            return dgram
        elementStart = self._BUNDLE_HEADER_LENGTH + self.LENGTH
        if len(dgram) < elementStart:
            #Truncated bundle. Empty data fails in parse_datagram and gets logged there.
            return b""
        size = struct.unpack('>i', dgram[self._BUNDLE_HEADER_LENGTH:elementStart])[0]
        if 0 < size <= len(dgram) - elementStart:
            return dgram[elementStart:elementStart + size]
        #Implausible size field. Be lenient and try to parse everything after the header.
        return dgram[elementStart:]

    def get_int(self, dgram, start_index):
        """Get a 32-bit big-endian two's complement integer from the datagram.

        Args:
            dgram: A datagram packet.
            start_index: An index where the integer starts in the datagram.
        Returns:
            A tuple containing the integer and the new end index.
        Raises:
            ValueError if the datagram could not be parsed.
        """
        try:
            if len(dgram[start_index:]) < self.LENGTH:
                raise ValueError('Datagram is too short')
            return (
                struct.unpack('>i', dgram[start_index:start_index + self.LENGTH])[0], start_index + self.LENGTH)
        except (struct.error, TypeError) as e:
            raise ValueError('Could not parse datagram %s' % e)

    def get_string(self, dgram, start_index):
        """Get a python string from the datagram, starting at pos start_index.

        We receive always the full string, but handle only the part from the start_index internally.
        In the end return the offset so it can be added to the index for the next parameter.
        Each subsequent call handles less of the same string, starting further to the right.

        According to the specifications, a string is:
        "A sequence of non-null ASCII characters followed by a null,
        followed by 0-3 additional null characters to make the total number
        of bits a multiple of 32".

        Args:
            dgram: A datagram packet.
            start_index: An index where the string starts in the datagram.
        Returns:
            A tuple containing the string and the new end index.
        Raises:
            ValueError if the datagram could not be parsed.
        """
        #First test for empty string, which is nothing, followed by a terminating \x00 padded by three additional \x00.
        if dgram[start_index:].startswith(b"\x00\x00\x00\x00"):
            return "", start_index + 4

        #Otherwise we have a non-empty string that must follow the rules of the docstring.
        offset = 0
        try:
            while dgram[start_index + offset] != 0:
                offset += 1
            if offset == 0:
                raise ValueError('OSC string cannot begin with a null byte: %s' % dgram[start_index:])
            # Align to a byte word. A string that already fills whole words still needs
            # a full word of nulls as terminator.
            if offset % self.LENGTH == 0:
                offset += self.LENGTH
            else:
                offset += (-offset % self.LENGTH)
            # Python slices do not raise an IndexError past the last index,
            # do it ourselves.
            if offset > len(dgram[start_index:]):
                raise ValueError('Datagram is too short')
            data_str = dgram[start_index:start_index + offset]
            return data_str.replace(b'\x00', b'').decode('utf-8'), start_index + offset
        except (IndexError, TypeError) as e:
            raise ValueError('Could not parse datagram %s' % e)

    def get_float(self, dgram, start_index):
        """Get a 32-bit big-endian IEEE 754 floating point number from the datagram.

        Args:
            dgram: A datagram packet.
            start_index: An index where the float starts in the datagram.
        Returns:
            A tuple containing the float and the new end index.
        Raises:
            ValueError if the datagram could not be parsed.
        """
        try:
            return (struct.unpack('>f', dgram[start_index:start_index + self.LENGTH])[0], start_index + self.LENGTH)
        except (struct.error, TypeError) as e:
            raise ValueError('Could not parse datagram %s' % e)

    def parse_datagram(self):
        """Fill self._address_regexp and self._parameters from self._dgram.

        Only the type tags i, f and s are read. Other tags are logged and skipped
        without consuming data. Parse errors are logged, not raised."""
        try:
            self._address_regexp, index = self.get_string(self._dgram, 0)
            if not self._dgram[index:]:
                # No params is legit, just return now.
                return

            # Get the parameters types.
            type_tag, index = self.get_string(self._dgram, index)
            if type_tag.startswith(','):
                type_tag = type_tag[1:]

            # Parse each parameter given its type.
            for param in type_tag:
                if param == "i": # Integer.
                    val, index = self.get_int(self._dgram, index)
                elif param == "f": # Float.
                    val, index = self.get_float(self._dgram, index)
                elif param == "s": # String.
                    val, index = self.get_string(self._dgram, index)
                else:
                    logger.warning("Unhandled parameter type: {0}".format(param))
                    continue
                self._parameters.append(val)
        except ValueError as pe:
            # Raising an error is not ignoring it! (UnicodeDecodeError is a ValueError as well)
            logger.warning("Found incorrect datagram, ignoring it. {}".format(pe))

    @property
    def oscpath(self):
        """Returns the OSC address regular expression, or None if it could not be parsed."""
        return self._address_regexp

    @staticmethod
    def dgram_is_message(dgram):
        """Returns whether this datagram starts as an OSC message."""
        return dgram.startswith(b'/')

    @property
    def size(self):
        """Returns the length of the datagram for this message."""
        return len(self._dgram)

    @property
    def dgram(self):
        """Returns the datagram from which this message was built (without bundle header)."""
        return self._dgram

    @property
    def params(self):
        """Convenience method for list(self) to get the list of parameters."""
        return list(self)

    def __iter__(self):
        """Returns an iterator over the parameters of this message."""
        return iter(self._parameters)
class _OutgoingMessage(object):
    """Collects an OSC path plus arguments and serialises them to one OSC datagram.

    Supported argument types are exactly str, int and float. Anything else
    raises KeyError in add_arg."""

    def __init__(self, oscpath):
        self.LENGTH = 4 #32 bit
        self.oscpath = oscpath
        self._args = [] #tuples of (type tag, value) in the order they were added

    def write_string(self, val):
        """Return val as utf-8 bytes, null terminated and padded to a multiple of
        self.LENGTH. At least one null byte is always appended."""
        encoded = val.encode('utf-8')
        padding = self.LENGTH - len(encoded) % self.LENGTH
        return encoded + b'\x00' * padding

    def write_int(self, val):
        """Return val as 32 bit big-endian signed integer."""
        return struct.pack('>i', val)

    def write_float(self, val):
        """Return val as 32 bit big-endian float."""
        return struct.pack('>f', val)

    def add_arg(self, argument):
        """Append one argument. The OSC type tag is derived from the exact python type."""
        typeTags = {str: "s", int: "i", float: "f"}
        self._args.append((typeTags[type(argument)], argument))

    def build(self):
        """Return the complete datagram: path, type tag string, then all arguments."""
        encodedPath = self.write_string(self.oscpath)
        if not self._args:
            #Even without arguments we send an (empty) type tag string.
            return encodedPath + self.write_string(',')

        writers = {"s": self.write_string, "i": self.write_int, "f": self.write_float}
        typeTagString = ',' + "".join(tag for tag, _ in self._args)
        chunks = [encodedPath, self.write_string(typeTagString)]
        chunks.extend(writers[tag](value) for tag, value in self._args)
        return b"".join(chunks)
class NsmServerControl(object):
"""
The ServerControl can be started in three modes, regarding nsmd.
We expect that starting our own nsmd will be the majority of cases.
SessionRoot parameter is only honored if we start nsmd ourselves.
Ascending lookup priority:
1) Default is to start our own nsmd. A single-instance watcher will prevent multiple programs
on the same system.
2) When $NSM_URL is found as environment we will connect to that nsmd.
3) When hostname and port number are given explicitly as instance variables we will first test
if a server is running at that URL, if not we will start our own with these parameters.
This is not only a pure implementation of the protocol.
It is extended by us reacting to and storing incoming data. This data can be interpreted
and enhanced by looking at the session dir ourselves. However, we don't do anything that
is not possible by the original nsmd + human interaction.
100% Compatibility is the highest priority.
The big problems are the async nature of communication, message come out of order or interleaved,
and nsm is not consistent in its usage of osc-paths. For example it starts listing sessions
with /nsm/gui/server/message, but sends the content with /reply [/nsm/server/list, nsmSessionName] and
then ends it with /nsm/server/list [0, Done] (no reply!). So three message types, three callbacks
for one logically connected process.
To update our internal session information we therefore need to split the functionality into
several seemingly unconnected callbacks and you need to know how the protocol works to actually
know the order of operations. Switch logging to info to learn more.
We have a mix between NSM callbacks and our own functions.
Most important there is a watchdog that looks at the session directory and creates its own
callbacks if something changes.
A typical operation, say sessionDelete or sessionCopy looks like this:
* Ask (blocking) nsmd for a current list of sessions, update our internal state
* Perform a file operation, like copy or delete or lift a lock
* Let our watchdog discover the changes in the file system and trigger another (non-blocking)
request for a list of sessions to adjust our internal state to reality.
Granted, we could just call our blocking query again at the end, but we would still need to
let the watchdog run for operations that the user does with a filemanager, which would end up
in redundant calls. Bottom line: _updateSessionListBlocking is called at the beginning of a
function, but not at the end.
Docs:
http://non.tuxfamily.org/nsm/
http://non.tuxfamily.org/wiki/Non%20Session%20Manager
http://non.tuxfamily.org/wiki/ApplicationsSupportingNsm
http://non.tuxfamily.org/nsm/API.html
"""
def __init__(self, sessionOpenReadyHook, sessionOpenLoadingHook, sessionClosedHook, clientStatusHook, singleInstanceActivateWindowHook, dataClientNamesHook, dataClientDescriptionHook, parameterNsmOSCUrl=None, sessionRoot=None, useCallbacks=True):
    """Find or start an nsmd, wait until it answers and announce ourselves as its GUI.

    All *Hook parameters are callables that get stored on the instance.
    parameterNsmOSCUrl is an optional URL string of an nsmd to use.
    sessionRoot is only forwarded to nsmd if we start it ourselves.

    If useCallbacks is False no callback gets registered and you will see every
    message in the log. This is just a development mode to see all messages, unfiltered.
    Normally we have special hook functions that save and interpret data, so they don't
    show in the logs.

    This blocks until nsmd answered our ping and sent us the session root."""

    self._queue = [] #Incoming OSC messages are buffered here.

    #Status variables that are set by our callbacks
    self.internalState = {
        "sessions" : set(), #nsmSessionNames:str . We use set for unique, just in case. But we also clear that on /nsm/gui/server/message ['Listing sessions'] to handle deleted sessions
        "currentSession" : None,
        "port" : None, #Our GUI port
        "serverPort" : None, #nsmd port
        "nsmUrl" : None, #the environment variable
        "clients" : {}, #clientId:dict see self._initializeEmptyClient . Gets replaced with a new dict instance on session changes.
        "broadcasts" : [], #in the order they appeared
        "datastorage" : None, #URL, if present in the session
        }
    self.dataStorage = None #Becomes DataStorage() every time a datastorage client does a broadcast announce.
    self._addToNextSession = [] #A list of executables in PATH. Filled by new, waits for reply that session is created and then will send clientNew and clear the list.

    #Hooks for api callbacks
    self.sessionOpenReadyHook = sessionOpenReadyHook #nsmSessionName as parameter
    self.sessionOpenLoadingHook = sessionOpenLoadingHook #nsmSessionName as parameter
    self.sessionClosedHook = sessionClosedHook #no parameter. This is also "choose a session" mode
    self.clientStatusHook = clientStatusHook #all client status is done via this single hook. GUIs need to check if they already know the client or not.
    self.dataClientNamesHook = dataClientNamesHook
    self.dataClientDescriptionHook = dataClientDescriptionHook
    self.singleInstanceActivateWindowHook = singleInstanceActivateWindowHook #added to self.processSingleInstance() to listen for a message from another wannabe-instance

    if not useCallbacks:
        #This is just a development mode to see all messages, unfiltered
        self.callbacks = set() #empty set is easiest to check
    else:
        self.callbacks = {
            "/nsm/gui/session/name" : self._reactCallback_activeSessionChanged,
            #"/nsm/gui/session/root" #Session root is an active blocking call in init
            "/nsm/gui/client/label" : self._reactCallback_ClientLabelChanged,
            "/nsm/gui/client/new" : self._reactCallback_ClientNew,
            "/nsm/gui/session/session" : self._reactCallback_SessionSession,
            "/nsm/gui/client/status" : self._reactCallback_statusChanged, #handles multiple status keywords
            "/reply" : self._reactCallback_reply, #handles multiple replies
            "/error" : self._reactCallback_error,
            "/nsm/gui/client/has_optional_gui" : self._reactCallback_clientHasOptionalGui,
            "/nsm/gui/client/gui_visible" : self._reactCallback_clientGuiVisible,
            "/nsm/gui/client/pid" : self._reactCallback_clientPid,
            "/nsm/gui/client/dirty" : self._reactCallback_clientDirty,
            "/nsm/gui/server/message" : self._reactCallback_serverMessage,
            "/nsm/gui/gui_announce" : self._reactCallback_guiAnnounce,
            "/nsm/server/list" : self._reactCallback_serverList,
            "/nsm/server/broadcast" : self._reactCallback_broadcast,
            }

    #Networking and Init for our control part, not for the server
    self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #internet, udp
    self.sock.bind(('', 0)) #pick a free port on localhost.
    self.sock.setblocking(False)
    self.internalState["port"] = self.sock.getsockname()[1] #only happens once, ports don't change during runtime
    #self.sock.close() Do not close, this runs until the end of the program

    ###Testing of existing servers, starting and connecting
    #First handle the NSM URL, or generate one.
    #self.nsmOSCUrl must be a tuple compatible to the result of urlparse. (hostname, port)
    self.singleInstanceSocket = None
    if parameterNsmOSCUrl:
        parsedUrl = urlparse(parameterNsmOSCUrl)
        chosenUrl = (parsedUrl.hostname, parsedUrl.port)
    else:
        chosenUrl = self._getNsmOSCUrlFromEnvironment()
        if not chosenUrl:
            #This is the default case. User just starts the GUI. The other modes are conscious decisions to either start with URL as parameter or in an NSM environment.
            #But now we need to test if the user accidentally opened a second GUI, which would start a second server.
            self._setupAndTestForSingleInstance() #This might quit the whole program and we will never see the next line.
            chosenUrl = self._generateFreeNsmOSCUrl()
    self.nsmOSCUrl = chosenUrl
    assert self.nsmOSCUrl

    serverHost, serverPort = self.nsmOSCUrl[0], self.nsmOSCUrl[1]
    self.internalState["serverPort"] = serverPort #only happens once, ports don't change during runtime
    self.internalState["nsmUrl"] = f"osc.udp://{serverHost}:{serverPort}/" #only happens once, ports don't change during runtime

    #Then check if a server is running there. If not start one.
    #If one is running no further action is required. GUI announce below this testing.
    self.ourOwnServer = None #Might become a subprocess handle
    if not self._isNsmdRunning(self.nsmOSCUrl):
        self._startNsmdOurselves(sessionRoot) #Session root can be a commandline parameter we forward to the server if we start it ourselves.
        assert type(self.ourOwnServer) is subprocess.Popen, (self.ourOwnServer, type(self.ourOwnServer))

    #Wait for the server, or test if it is reacting.
    self._waitForPingResponseBlocking()
    logger.info("nsmd is ready @ {}".format(self.nsmOSCUrl))

    #Tell nsmd that we are a GUI and want to receive general messages async, not only after we request something
    self.sessionRoot = self._initial_announce() #Triggers "hi" and session root
    self.internalState["sessionRoot"] = self.sessionRoot

    atexit.register(self.quit) #mostly does stuff when we started nsmd ourself

    self._receiverActive = True
    #Now an external event loop can add self.process
#Internal Methods
def _setupAndTestForSingleInstance(self):
    """Trigger this on program startup to find out if another instance of us
    is already running.

    We try to bind an abstract unix socket. If that works we are the first instance,
    keep the socket open for self.processSingleInstance() and return True.
    If binding fails we tell the existing instance to show itself and exit the program.
    """
    abstractAddress = '\0' + "argodejo"
    self.singleInstanceSocket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    logger.info("Testing if another non-specific Argodejo is running.")
    try:
        ## Create an abstract socket, by prefixing it with null.
        # this relies on a feature only in linux, when current process quits, the
        # socket will be deleted.
        self.singleInstanceSocket.bind(abstractAddress)
        self.singleInstanceSocket.listen(1)
        self.singleInstanceSocket.setblocking(False)
        logger.info("No other non-specific Argodejo found. Starting GUI")
        #Continue in self.processSingleInstance()
        return True
    except socket.error:
        logger.error("GUI for this nsmd server already running. Informing the existing application to show itself.")
        self.singleInstanceSocket.connect(abstractAddress)
        self.singleInstanceSocket.send(b"argodejoactivate")
        self.singleInstanceSocket.close()
        sysexit(1) #triggers atexit
        #not executed
        return False
def processSingleInstance(self):
"""Tests our unix socket for an incoming signal.
if received forward to the engine->gui
Can be added to a slower event loop, so it is not in self.process"""
if self.singleInstanceSocket:
try:
connection, client_address = self.singleInstanceSocket.accept() #This blocks and waits for a message
incoming = connection.recv(1024)
if incoming and incoming == b"argodejoactivate":
self.singleInstanceActivateWindowHook()
except BlockingIOError: #happens while no data is received. Has nothing to do with blocking or not. In fact: this happens when in non-blocking mode.
pass
except socket.timeout:
pass
def _setPause(self, state:bool):
"""Set both the socket and the thread into waiting mode or not.
With this we can wait for answers until we resume async operation"""
if state:
self.sock.setblocking(True) #explicitly wait.
self.sock.settimeout(0.5)
self._receiverActive = False
logger.info("Suspending receiving async mode.")
else:
self.sock.setblocking(False)
self._receiverActive = True
logger.info("Resuming receiving async mode.")
def process(self):
"""Use this in an external event loop"""
if self._receiverActive:
while True:
try:
data, addr = self.sock.recvfrom(1024)
msg = _IncomingMessage(data)
if msg:
self._queue.append(msg)
except BlockingIOError: #happens while no data is received. Has nothing to do with blocking or not.
break
except socket.timeout:
break
for msg in self._queue:
if msg.oscpath in self.callbacks:
self.callbacks[msg.oscpath](msg.params)
else:
logger.warning(f"Unhandled message with path {msg.oscpath} and parameters {msg.params}")
self._queue.clear()
def _getNsmOSCUrlFromEnvironment(self):
"""Return the nsm osc url or None"""
nsmOSCUrl = getenv("NSM_URL")
if not nsmOSCUrl:
return None
else:
#osc.udp://hostname:portnumber/
o = urlparse(nsmOSCUrl)
return o.hostname, o.port
def _generateFreeNsmOSCUrl(self):
#Instead of reading out the NSM port we get a free port ourselves and set up nsmd with that
tempServerSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #internet, udp
tempServerSock.bind(('', 0)) #pick a free port on localhost.
address, tempServerSockPort = tempServerSock.getsockname()
tempServerSock.close() #We need to close it because nsmd will open it right away.
nsmOSCUrl = ("0.0.0.0", tempServerSockPort) #compatible to result of urlparse
logger.info("Generated our own free NSM_URL to start a server @ {}".format(nsmOSCUrl))
return nsmOSCUrl
def _isNsmdRunning(self, nsmOSCUrl):
"""Test if the port is open or not"""
logger.info(f"Testing if a server is running @ {nsmOSCUrl}")
hostname, port = nsmOSCUrl
tempServerSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #internet, udp
try:
tempServerSock.bind((hostname, port))
logger.info(f"No external nsmd found (we tested if port is closed) @ {nsmOSCUrl}")
return False
except:
logger.info(f"External nsmd found (we tested if port is closed) @ {nsmOSCUrl}")
return True
finally:
tempServerSock.close()
def _startNsmdOurselves(self, sessionRoot:str):
    """Start nsmd as subprocess on the port of self.nsmOSCUrl and remember the
    process handle in self.ourOwnServer.

    sessionRoot is handed to nsmd as --session-root, but only if it is not empty."""
    assert self.nsmOSCUrl
    _, port = self.nsmOSCUrl

    command = ["nsmd", "--osc-port", str(port)]
    if sessionRoot:
        command += ["--session-root", sessionRoot]
    self.ourOwnServer = subprocess.Popen(command)
def _blockingRequest(self, path:str, arguments:list, answerPath:str, answerArguments:list, repeat=False)->list:
    """During start-up we need to wait for replies. Also some operations only make sense
    if we got data back. This is an abstraction that deals with messages that may come
    out-of-order and keeps them for later, but at least prevents our side from sending
    messages out-of-order itself.

    Default is: send once, wait for answer. repeat=True sends multiple times until an answer arrives.

    The answer must arrive on answerPath. If answerArguments is not empty the parameters
    of the answer must be equal to answerArguments as well. Everything else that arrives
    meanwhile is put into self._queue for later.

    Returns list of arguments, can be empty.
    """
    assert not self._queue, [(m.oscpath, m.params) for m in self._queue]

    logger.info(f"[wait for answer]: Sending {path}: {arguments}")
    out_msg = _OutgoingMessage(path)
    for arg in arguments:
        out_msg.add_arg(arg)
    datagram = out_msg.build() #the message never changes, build it only once

    self._setPause(True)
    try:
        if not repeat:
            self.sock.sendto(datagram, self.nsmOSCUrl)

        #Wait for answer
        while True:
            if repeat: #we need to send multiple times.
                self.sock.sendto(datagram, self.nsmOSCUrl)

            try:
                data, addr = self.sock.recvfrom(1024)
            except (BlockingIOError, socket.timeout):
                #BlockingIOError happens while no data is received. Has nothing to do with blocking or not.
                continue

            msg = _IncomingMessage(data)
            oscpath = getattr(msg, "oscpath", None)
            if oscpath is None:
                #Not even the address could be parsed. Can't be our answer.
                logger.warning(f"Waiting for {answerPath} from nsmd, but got a message without OSC path. Ignoring it.")
                continue

            result = msg.params
            if oscpath == answerPath and (not answerArguments or result == answerArguments):
                logger.info(f"[wait from {path}] Received {answerPath}: {result}")
                return result

            logger.warning(f"Waiting for {answerPath} from nsmd, but got: {oscpath} with {result}. Adding to queue for later.")
            self._queue.append(msg)
    finally:
        #Always go back to async mode, otherwise self.process() would stay deaf after an error.
        self._setPause(False)
def _waitForPingResponseBlocking(self):
self._blockingRequest(path="/osc/ping", arguments=[], answerPath="/reply", answerArguments=["/osc/ping",], repeat=True)
def _initial_announce(self)->pathlib.Path:
"""nsm/gui/gui_announce triggers a multi-stage reply. First we get "hi",
then we get the session root. We wait for session root and then clean 'hi' from the queue.
Returns session root as pathlib-path."""
resultArguments = self._blockingRequest(path="/nsm/gui/gui_announce", arguments=[], answerPath="/nsm/gui/session/root", answerArguments=[])
if len(self._queue) == 1 and self._queue[0].oscpath == "/nsm/gui/gui_announce" and self._queue[0].params == ["hi"]:
self._queue.clear()
else:
4 years ago
logging.error(f"For ValueError below: {[(m.oscpath, m.params) for m in self._queue]}")
raise ValueError("We were expecting a clean _queue with only 'hi' as leftover, but instead there were unhandled messages. see print above. Better abort than a wrong program state")
#all ok
return pathlib.Path(resultArguments[0])
4 years ago
#General Commands
def send(self, arg):
"""
Intended for a text input / command line interface.
Sends anything to nsmd, separated by semicolon. First part is the message address,
the rest are string-parameters."""
args = arg.split()
msg = _OutgoingMessage(args[0])
for p in args[1:]:
msg.add_arg(p)
self.sock.sendto(msg.build(), self.nsmOSCUrl)
def gui_announce(self):
    """Send only the announce and do not wait for any answer. This is a last-resort
    method if another GUI "stole" our slot.
    Our own initial announce is done by self._initial_announce()"""
    announce = _OutgoingMessage("/nsm/gui/gui_announce")
    datagram = announce.build()
    self.sock.sendto(datagram, self.nsmOSCUrl)
def ping(self):
    """Send /osc/ping to nsmd. Does not wait for the answer."""
    datagram = _OutgoingMessage("/osc/ping").build()
    self.sock.sendto(datagram, self.nsmOSCUrl)
def list(self):
    """Send /nsm/server/list to nsmd. Does not wait for the answers."""
    datagram = _OutgoingMessage("/nsm/server/list").build()
    self.sock.sendto(datagram, self.nsmOSCUrl)
def _updateSessionListBlocking(self):
    """To ensure correct data on session operations we manage ourselves,
    like copy, rename and delete.
    Ask nsmd for projects in session root and update our internal state.
    This will wait for an answer and block all other operations.

    First is /nsm/gui/server/message ['Listing sessions']
    Then session names come one reply at a time such as /reply ['/nsm/server/list', 'test3']
    Finally /nsm/server/list [0, 'Done.'] , not a reply

    Unrelated messages that arrive meanwhile are put into self._queue for later.
    Async mode is restored even if an error happens while waiting.
    """
    logger.info("Requesting project list from session server in blocking mode")
    self._setPause(True)
    try:
        msg = _OutgoingMessage("/nsm/server/list")
        self.sock.sendto(msg.build(), self.nsmOSCUrl)

        #Wait for /reply
        ready = False
        while True:
            try:
                data, addr = self.sock.recvfrom(1024)
            except socket.timeout:
                continue

            msg = _IncomingMessage(data)
            oscpath = getattr(msg, "oscpath", None)
            if oscpath is None:
                #Not even the address could be parsed. Nothing we could use or queue.
                logger.warning("Expected project but got a message without OSC path. Ignoring it.")
                continue
            params = msg.params

            if not ready and oscpath == "/nsm/gui/server/message" and params == ["Listing sessions"]:
                self.internalState["sessions"].clear() # new clients are added as /reply /nsm/server/list callbacks
                ready = True
            elif len(params) == 2 and oscpath == "/reply" and params[0] == "/nsm/server/list":
                #This is what we want:
                #/reply ['/nsm/server/list', 'test3']
                self.internalState["sessions"].add(params[1])
                logger.info(f"Received session name: {params[1]}")
            elif len(params) == 2 and params[0] == 0 and params[1] == "Done.":
                break
            else:
                logger.warning(f"Expected project but got path {oscpath} with {params}. Adding to queue for later.")
                self._queue.append(msg)
    finally:
        #Always go back to async mode, otherwise self.process() would stay deaf after an error.
        self._setPause(False)
def quit(self):
"""Called through atexit.
Thanks to start.py sys exception hook this will also trigger on PyQt crash"""
if self.ourOwnServer:
msg = _OutgoingMessage("/nsm/server/quit")
self.sock.sendto(msg.build(), self.nsmOSCUrl)
returncode = self.ourOwnServer.wait()
logger.info("Stopped our own server with return code {}".format(returncode))
def broadcast(self, path:str, arguments:list):
    """/nsm/server/broadcast s:path [arguments...]
    http://non.tuxfamily.org/nsm/API.html 1.2.7.1

    Example:
    /nsm/server/broadcast /tempomap/update "0,120,4/4:12351234,240,4/4"
    All clients except the sender receive:
    /tempomap/update "0,120,4/4:12351234,240,4/4"

    The path is sent as first argument, followed by all arguments in order.
    """
    logger.info(f"Sending broadcast with path {path} and args {arguments}")
    message = _OutgoingMessage("/nsm/server/broadcast")
    for payload in (path, *arguments):
        message.add_arg(payload) #type autodetect
    self.sock.sendto(message.build(), self.nsmOSCUrl)
#Primarily Without Session
def open(self, nsmSessionName:str):
4 years ago
if nsmSessionName in self.internalState["sessions"]:
msg = _OutgoingMessage("/nsm/server/open")
msg.add_arg(nsmSessionName) #s:project_name
self.sock.sendto(msg.build(), self.nsmOSCUrl)
else:
logger.warning(f"Session {nsmSessionName} not found. Not forwarding to nsmd.")
def new(self, newName:str, startClients:list=[])->str:
"""Saves the current session and creates a new session.
Only works if dir does not exist yet.
"""
basePath = pathlib.Path(self.sessionRoot, newName)
if basePath.exists():
return None
self._addToNextSession = startClients
msg = _OutgoingMessage("/nsm/server/new")
msg.add_arg(newName) #s:project_name
self.sock.sendto(msg.build(), self.nsmOSCUrl)
#Only with Current Session
def save(self):
    """Send /nsm/server/save to nsmd. Does not wait for the answer."""
    datagram = _OutgoingMessage("/nsm/server/save").build()
    self.sock.sendto(datagram, self.nsmOSCUrl)
def close(self, blocking=False):
    """Send /nsm/server/close to nsmd.

    With blocking=True we do not return before our internal state has no
    current session anymore."""
    closeRequest = _OutgoingMessage("/nsm/server/close")
    self.sock.sendto(closeRequest.build(), self.nsmOSCUrl)

    if blocking:
        #Drive the process loop ourselves. This will still trigger updates but the mainloop will wait.
        while self.internalState["currentSession"]:
            self.process()
def abort(self, blocking=False):
    """Send /nsm/server/abort to nsmd.

    With blocking=True we do not return before our internal state has no
    current session anymore."""
    abortRequest = _OutgoingMessage("/nsm/server/abort")
    self.sock.sendto(abortRequest.build(), self.nsmOSCUrl)

    if blocking:
        #Drive the process loop ourselves. This will still trigger updates but the mainloop will wait.
        while self.internalState["currentSession"]:
            self.process()
def duplicate(self, newName:str)->str:
    """Saves the current session and creates a new session.
    Requires an open session and uses nsmd to do the work.

    If you want to copy any session use our own self.sessionCopy"""
    duplicateRequest = _OutgoingMessage("/nsm/server/duplicate")
    duplicateRequest.add_arg(newName) #s:project_name
    self.sock.sendto(duplicateRequest.build(), self.nsmOSCUrl)
#Client Commands for Loaded Session
def clientAdd(self, executableName:str):
"""Adds a client to the current session.
executable must be in $PATH.
We do not trust NSM to perform the right checks. It will add an empty path or wrong
path.
"""
if not pathlib.Path(executableName).name == executableName:
logger.warning(f"{executableName} must be just an executable file in your $PATH. We expected: {pathlib.Path(executableName).name} . We will not ask nsmd to add it as client")
return False
allPaths = getenv("PATH")
assert allPaths, allPaths
binaryPaths = allPaths.split(":") #TODO: There is a corner case that NSMD runs in a different $PATH environment.
executableInPath = any(pathlib.Path(bp, executableName).is_file() for bp in binaryPaths)
if executableInPath:
msg = _OutgoingMessage("/nsm/server/add")
msg.add_arg(executableName) #s:executable_name
self.sock.sendto(msg.build(), self.nsmOSCUrl)
return True
else:
logger.warning("Executable {} not found. We will not ask nsmd to add it as client".format(executableName))
return False
def clientStop(self, clientId:str):
    """Send /nsm/gui/client/stop for the given client. Does not wait for the answer."""
    stopRequest = _OutgoingMessage("/nsm/gui/client/stop")
    stopRequest.add_arg(clientId) #s:clientId
    self.sock.sendto(stopRequest.build(), self.nsmOSCUrl)
def clientResume(self, clientId:str):
    """Opposite of clientStop. Sends /nsm/gui/client/resume for the given client."""
    resumeRequest = _OutgoingMessage("/nsm/gui/client/resume")
    resumeRequest.add_arg(clientId) #s:clientId
    self.sock.sendto(resumeRequest.build(), self.nsmOSCUrl)
def clientRemove(self, clientId:str):
"""Client needs to be stopped already. We will do that and wait for an answer.
Remove from the session. Will not delete the save-files, but make them inaccesible"""
#We have a blocking operation in here so we need to be extra cautios that the client exists.
if not clientId in self.internalState["clients"]:
return False
self.clientStop(clientId) #We need to wait for an answer.
#Drive the process loop ourselves. This will still trigger updates but the mainloop will wait.
logger.info(f"Waiting for {clientId} to be status 'stopped'")
while not self.internalState["clients"][clientId]["lastStatus"] == "stopped":
self.process()
msg = _OutgoingMessage("/nsm/gui/client/remove")
msg.add_arg(clientId) #s:clientId
self.sock.sendto(msg.build(), self.nsmOSCUrl)
#Flood lazy lagging nsmd until it removed the client.
#We will receive a few -10 "No such client." errors but that is ok.
while True:
if not clientId in self.internalState["clients"]:
break
if self.internalState["clients"][clientId]["lastStatus"] == "removed":
break
self.sock.sendto(msg.build(), self.nsmOSCUrl)
self.process()
def clientSave(self, clientId:str):
    """Saves only the given client. Sends /nsm/gui/client/save"""
    saveRequest = _OutgoingMessage("/nsm/gui/client/save")
    saveRequest.add_arg(clientId) #s:clientId
    self.sock.sendto(saveRequest.build(), self.nsmOSCUrl)
def clientHide(self, clientId:str):
    """Hides the optional GUI of the client.
    Works only if client announced itself with this feature"""
    hideRequest = _OutgoingMessage("/nsm/gui/client/hide_optional_gui")
    hideRequest.add_arg(clientId) #s:clientId
    self.sock.sendto(hideRequest.build(), self.nsmOSCUrl)
def clientShow(self, clientId:str):
    """Shows the GUI of the client. Works only if client announced itself with this feature.

    Sends the OSC request /nsm/gui/client/show_optional_gui to nsmd."""
    showRequest = _OutgoingMessage("/nsm/gui/client/show_optional_gui")
    showRequest.add_arg(clientId) #s:clientId
    datagram = showRequest.build()
    self.sock.sendto(datagram, self.nsmOSCUrl)
#Callbacks
def _reactCallback_guiAnnounce(self, parameters:list):
"""Acknowledge"""
assert parameters == ["hi"], parameters
logger.info("We got acknowledged as current nsmd GUI.")
def _reactCallback_error(self, parameters:list):
logger.error(parameters)
def _reactCallback_reply(self, parameters:list):
"""This is a difficult function because replies arrive for many unrelated things, like
status. We do our best to send all replies on the right way"""
success = False
l = len(parameters)
if l == 2:
originalMessage, data = parameters
logger.info(f"Got reply for {originalMessage} with {data}")
reply = {
"/nsm/server/list" : self._reactReply_nsmServerList,
"/nsm/server/new" : self._reactReply_nsmServerNew,
"/nsm/server/close" : self._reactReply_nsmServerClose,
"/nsm/server/open" : self._reactReply_nsmServerOpen,
"/nsm/server/save" : self._reactReply_nsmServerSave,
"/nsm/server/abort" : self._reactReply_nsmServerAbort,
"/nsm/server/duplicate" : self._reactReply_nsmServerDuplicate,
}
if originalMessage in reply:
reply[originalMessage](data)
success = True
elif l == 3:
originalMessage, errorCode, answer = parameters
logger.info(f"Got reply for {originalMessage} with code {errorCode} saying {answer}")
if originalMessage == "/nsm/server/add":
assert errorCode == 0, parameters
self._reactReply_nsmServerAdd(answer)
success = True
elif l == 1:
singleMessage = parameters[0]
"""For unknown reasons these replies do not repeat the originalMessage"""
if singleMessage == "/osc/ping":
logger.info(singleMessage)
success = True
elif singleMessage == "Client removed.":
self._reactReply_nsmServerRemoved()
success = True
elif singleMessage == "Client stopped.":
self._reactReply_nsmServerStopped()
success = True
#After all these reactions and checks the function will eventually return here.
if not success:
raise NotImplementedError(parameters)
def _reactCallback_serverMessage(self, parameters:list):
"""Messages are normally harmless and uninteresting.
Howerver, we need to use some of them for actual tasks.
In opposite to reply and status this all go in our function for now, until refactoring"""
if parameters == ["Listing session"]:
#this feels bad! A simple message is not a reliable state token and could change in the future.
#we cannot put that into our own /list outgoing message because other actions like "new" also trigger this callback
self.internalState["sessions"].clear() # new clients are added as /reply /nsm/server/list callbacks
if parameters[0].startswith("Opening session"):
#This gets send only when an existing session starts loading. It will not trigger on new sessions, be it really new or duplicate.
#e.g. /nsm/gui/server/message ["Opening session FOO"]
nsmSessionName = parameters[0].replace("Opening session ", "")
logger.info(f"Starting to load clients of session: {nsmSessionName}")
self.sessionOpenLoadingHook(self.sessionAsDict(nsmSessionName)) #notify the api->UI
else:
logger.info("/nsm/gui/server/message " + repr(parameters))
def _reactCallback_broadcast(self, parameters:list):
"""We have nothing to do with broadcast. But we save them, so they can be shown on request
parameters[0] is an osc path:str without naming constraints
the rest is a list of arguments.
"""
logger.info(f"Received broadcast. Saving in internal state: {parameters}")
self.internalState["broadcasts"].append(parameters)
#Our little trick. We know and like some clients better than others.
#If we detect our own data-storage we remember our friends.
#It is possible that another datastorage broadcasts, then we overwrite the URL.
if parameters and parameters[0] == "/argodejo/datastorage/announce":
path, clientId, messageSizeLimit, url = parameters
assert "osc.udp" in url
logger.info(f"Got announce from argodejo datastorage clientId {clientId} @ {url}")
o = urlparse(url)
self.dataStorage = DataStorage(self, clientId, messageSizeLimit, (o.hostname, o.port), self.sock)
def _reactCallback_serverList(self, parameters:list):
"""This finalizes a new session list. Here we send new data to the GUI etc."""
l = len(parameters)
if l == 2:
errorCode, message = parameters
assert errorCode == 0, errorCode
assert message == "Done.", message #don't miss the dot after Done
logger.info("/nsm/server/list is done and has transmitted all available sessions to us")
else:
raise NotImplementedError(parameters)
def _reactCallback_activeSessionChanged(self, parameters:list):
"""This is called when the session has already changed.
Shortly before we receive /nsm/gui/session/session which indicates the attempt to create,
I guess! :)
If you want to react to the attempt to open a session you need to use /nsm/gui/server/message ["Opening session FOO"]
OR creating a new session, after which nsmd will open that session without a message.
Empty string is "No session" or "Choose A Session" mode.
"""
l = len(parameters)
if l == 2:
shortName, nsmSessionName = parameters
if not shortName and not nsmSessionName: #No session loaded. We are in session-choosing mode.
self.internalState["currentSession"] = None
self.sessionClosedHook()
else:
nsmSessionName = nsmSessionName.lstrip("/")
logger.info(f"Current Session changed. We are now {shortName} under {nsmSessionName}")
self.internalState["currentSession"] = nsmSessionName
#This is after the session, received after all programs have loaded.
#We have a counterpart as message reaction that signals the attempt to load.
self.sessionOpenReadyHook(self.sessionAsDict(nsmSessionName)) #notify the api->UI
for autoClientExecutableInPath in self._addToNextSession:
self.clientAdd(autoClientExecutableInPath)
self._addToNextSession = [] #reset
elif l == 0: #Another way of "no session".
self.internalState["currentSession"] = None
self.sessionClosedHook()
else:
raise NotImplementedError(parameters)
def _initializeEmptyClient(self, clientId:str):
"""NSM reuses signals. It is quite possible that this will be called multiple times,
e.g. after opening a session"""
if clientId in self.internalState["clients"]:
return
logger.info(f"Creating new internal entry for client {clientId}")
client = {
"clientId":clientId, #for convenience, included internally as well
"executable":None, #For dumb clients this is the same as reportedName.
"dumbClient":True, #Bool. Real nsm or just any old program? status "Ready" switches this.
"reportedName":None, #str . The reported name is first the executable name, for status started. But for NSM clients it gets replaced with a reported name.
"label":None, #str
"lastStatus":None, #str
"statusHistory":[], #list
"hasOptionalGUI": False, #bool
"visible": None, # bool
"dirty": None, # bool
}
self.internalState["clients"][clientId] = client
def _setClientData(self, clientId:str, parameter:str, value):
if clientId in self.internalState["clients"]:
self.internalState["clients"][clientId][parameter] = value
return True
else:
logger.warning(f"Client {clientId} not found in internal status storage. If the session was just closed this is most likely a known race condition. Everything is fine in this case.")
return False
def _reactCallback_ClientLabelChanged(self, parameters:list):
"""osc->add_method( "/nsm/gui/client/label", "ss", osc_handler, osc, "path,display_name" );
"""
l = len(parameters)
if l == 2:
clientId, label = parameters
logger.info(f"Label for client {clientId} changed to {label}")
self._setClientData(clientId, "label", label)
self.clientStatusHook(self.internalState["clients"][clientId])
4 years ago
else:
raise NotImplementedError(parameters)
def _reactCallback_clientPid(self, parameters:list):
clientId, pid = parameters
self._setClientData(clientId, "pid", pid)
def _reactCallback_SessionSession(self, parameters:list):
"""This is received only when a new session gets created and followed by
/nsm/gui/client/new and then a reply for
/reply /nsm/server/new Session created"""
#This is the counterpart to Message "Opening Session", but for really new or freshly duplicated session.
logger.info(f"Attempt to create session: {parameters}")
self.sessionOpenLoadingHook(self.sessionAsDict(parameters[0])) #notify the api->UI
def _reactCallback_ClientNew(self, parameters:list):
"""/nsm/gui/client/new ['nBAVO', 'jackpatch']
This is both add client or open.
It appears in the session.
And the message actually comes twice. Once when you add a client, then parameters
will contain the executable name. If the client reports itself as NSM compatible through
announce we will also get the Open message through this function.
Then the name changes from executableName to a reportedName, which will remain for the rest
of the session.
A loaded session will directly start with the reported/announced name.
"""
l = len(parameters)
if l == 2:
clientId, executableName = parameters
if not clientId in self.internalState["clients"]:
self._initializeEmptyClient(clientId)
self._setClientData(clientId, "executable", executableName)
logger.info(f"Client started {executableName}:{clientId}")
else:
self._setClientData(clientId, "reportedName", executableName)
logger.info(f"Client upgraded to NSM-compatible: {executableName}:{clientId}")
self.clientStatusHook(self.internalState["clients"][clientId])
else:
raise NotImplementedError(parameters)
def _reactCallback_clientDirty(self, parameters:list):
"""/nsm/gui/client/dirty ['nMAJH', 1]
"""
l = len(parameters)
if l == 2:
clientId, dirty = parameters
dirty = bool(dirty)