#! /usr/bin/env python3
# -*- coding: utf-8 -*-
"""
PyNSMClient - A New Session Manager Client - Library in one file .
The Non - Session - Manager by Jonathan Moore Liles < male @tuxfamily . org > : http : / / non . tuxfamily . org / nsm /
New Session Manager , by LinuxAudio . org : https : / / github . com / linuxaudio / new - session - manager
With help from code fragments from https : / / github . com / attwad / python - osc ( DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE v2 )
MIT License
Copyright 2014 - 2020 Nils Hilbricht https : / / www . laborejo . org
Permission is hereby granted , free of charge , to any person obtaining a copy of this software and
associated documentation files ( the " Software " ) , to deal in the Software without restriction ,
including without limitation the rights to use , copy , modify , merge , publish , distribute ,
sublicense , and / or sell copies of the Software , and to permit persons to whom the Software is
furnished to do so , subject to the following conditions :
The above copyright notice and this permission notice shall be included in all copies or
substantial portions of the Software .
THE SOFTWARE IS PROVIDED " AS IS " , WITHOUT WARRANTY OF ANY KIND , EXPRESS OR IMPLIED , INCLUDING BUT
NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY , FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT . IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM ,
DAMAGES OR OTHER LIABILITY , WHETHER IN AN ACTION OF CONTRACT , TORT OR OTHERWISE , ARISING FROM , OUT
OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE .
"""
import logging ;
logger = None #filled by init with prettyName
import struct
import socket
from os import getenv , getpid , kill
import os
import os . path
import shutil
from uuid import uuid4
from sys import argv
from signal import signal , SIGTERM , SIGINT , SIGKILL #react to exit signals to close the client gracefully. Or kill if the client fails to do so.
from urllib . parse import urlparse
class _IncomingMessage ( object ) :
""" Representation of a parsed datagram representing an OSC message.
An OSC message consists of an OSC Address Pattern followed by an OSC
Type Tag String followed by zero or more OSC Arguments .
"""
def __init__ ( self , dgram ) :
#NSM Broadcasts are bundles, but very simple ones. We only need to care about the single message it contains.
#Therefore we can strip the bundle prefix and handle it as normal message.
if b " #bundle " in dgram :
bundlePrefix , singleMessage = dgram . split ( b " / " , maxsplit = 1 )
dgram = b " / " + singleMessage # / eaten by split
self . isBroadcast = True
else :
self . isBroadcast = False
self . LENGTH = 4 #32 bit
self . _dgram = dgram
self . _parameters = [ ]
self . parse_datagram ( )
def get_int ( self , dgram , start_index ) :
""" Get a 32-bit big-endian two ' s complement integer from the datagram.
Args :
dgram : A datagram packet .
start_index : An index where the integer starts in the datagram .
Returns :
A tuple containing the integer and the new end index .
Raises :
ValueError if the datagram could not be parsed .
"""
try :
if len ( dgram [ start_index : ] ) < self . LENGTH :
raise ValueError ( ' Datagram is too short ' )
return (
struct . unpack ( ' >i ' , dgram [ start_index : start_index + self . LENGTH ] ) [ 0 ] , start_index + self . LENGTH )
except ( struct . error , TypeError ) as e :
raise ValueError ( ' Could not parse datagram %s ' % e )
def get_string ( self , dgram , start_index ) :
""" Get a python string from the datagram, starting at pos start_index.
We receive always the full string , but handle only the part from the start_index internally .
In the end return the offset so it can be added to the index for the next parameter .
Each subsequent call handles less of the same string , starting further to the right .
According to the specifications , a string is :
" A sequence of non-null ASCII characters followed by a null,
followed by 0 - 3 additional null characters to make the total number
of bits a multiple of 32 " .
Args :
dgram : A datagram packet .
start_index : An index where the string starts in the datagram .
Returns :
A tuple containing the string and the new end index .
Raises :
ValueError if the datagram could not be parsed .
"""
#First test for empty string, which is nothing, followed by a terminating \x00 padded by three additional \x00.
if dgram [ start_index : ] . startswith ( b " \x00 \x00 \x00 \x00 " ) :
return " " , start_index + 4
#Otherwise we have a non-empty string that must follow the rules of the docstring.
offset = 0
try :
while dgram [ start_index + offset ] != 0 :
offset + = 1
if offset == 0 :
raise ValueError ( ' OSC string cannot begin with a null byte: %s ' % dgram [ start_index : ] )
# Align to a byte word.
if ( offset ) % self . LENGTH == 0 :
offset + = self . LENGTH
else :
offset + = ( - offset % self . LENGTH )
# Python slices do not raise an IndexError past the last index,
# do it ourselves.
if offset > len ( dgram [ start_index : ] ) :
raise ValueError ( ' Datagram is too short ' )
data_str = dgram [ start_index : start_index + offset ]
return data_str . replace ( b ' \x00 ' , b ' ' ) . decode ( ' utf-8 ' ) , start_index + offset
except IndexError as ie :
raise ValueError ( ' Could not parse datagram %s ' % ie )
except TypeError as te :
raise ValueError ( ' Could not parse datagram %s ' % te )
def get_float ( self , dgram , start_index ) :
""" Get a 32-bit big-endian IEEE 754 floating point number from the datagram.
Args :
dgram : A datagram packet .
start_index : An index where the float starts in the datagram .
Returns :
A tuple containing the float and the new end index .
Raises :
ValueError if the datagram could not be parsed .
"""
try :
return ( struct . unpack ( ' >f ' , dgram [ start_index : start_index + self . LENGTH ] ) [ 0 ] , start_index + self . LENGTH )
except ( struct . error , TypeError ) as e :
raise ValueError ( ' Could not parse datagram %s ' % e )
def parse_datagram ( self ) :
try :
self . _address_regexp , index = self . get_string ( self . _dgram , 0 )
if not self . _dgram [ index : ] :
# No params is legit, just return now.
return
# Get the parameters types.
type_tag , index = self . get_string ( self . _dgram , index )
if type_tag . startswith ( ' , ' ) :
type_tag = type_tag [ 1 : ]
# Parse each parameter given its type.
for param in type_tag :
if param == " i " : # Integer.
val , index = self . get_int ( self . _dgram , index )
elif param == " f " : # Float.
val , index = self . get_float ( self . _dgram , index )
elif param == " s " : # String.
val , index = self . get_string ( self . _dgram , index )
else :
logger . warning ( " Unhandled parameter type: {0} " . format ( param ) )
continue
self . _parameters . append ( val )
except ValueError as pe :
#raise ValueError('Found incorrect datagram, ignoring it', pe)
# Raising an error is not ignoring it!
logger . warning ( " Found incorrect datagram, ignoring it. {} " . format ( pe ) )
@property
def oscpath ( self ) :
""" Returns the OSC address regular expression. """
return self . _address_regexp
@staticmethod
def dgram_is_message ( dgram ) :
""" Returns whether this datagram starts as an OSC message. """
return dgram . startswith ( b ' / ' )
@property
def size ( self ) :
""" Returns the length of the datagram for this message. """
return len ( self . _dgram )
@property
def dgram ( self ) :
""" Returns the datagram from which this message was built. """
return self . _dgram
@property
def params ( self ) :
""" Convenience method for list(self) to get the list of parameters. """
return list ( self )
def __iter__ ( self ) :
""" Returns an iterator over the parameters of this message. """
return iter ( self . _parameters )
class _OutgoingMessage ( object ) :
def __init__ ( self , oscpath ) :
self . LENGTH = 4 #32 bit
self . oscpath = oscpath
self . _args = [ ]
def write_string ( self , val ) :
dgram = val . encode ( ' utf-8 ' )
diff = self . LENGTH - ( len ( dgram ) % self . LENGTH )
dgram + = ( b ' \x00 ' * diff )
return dgram
def write_int ( self , val ) :
return struct . pack ( ' >i ' , val )
def write_float ( self , val ) :
return struct . pack ( ' >f ' , val )
def add_arg ( self , argument ) :
t = { str : " s " , int : " i " , float : " f " } [ type ( argument ) ]
self . _args . append ( ( t , argument ) )
def build ( self ) :
dgram = b ' '
#OSC Path
dgram + = self . write_string ( self . oscpath )
if not self . _args :
dgram + = self . write_string ( ' , ' )
return dgram
# Write the parameters.
arg_types = " " . join ( [ arg [ 0 ] for arg in self . _args ] )
dgram + = self . write_string ( ' , ' + arg_types )
for arg_type , value in self . _args :
f = { " s " : self . write_string , " i " : self . write_int , " f " : self . write_float } [ arg_type ]
dgram + = f ( value )
return dgram
class NSMNotRunningError ( Exception ) :
""" Error raised when environment variable $NSM_URL was not found. """
class NSMClient ( object ) :
""" The representation of the host programs as NSM sees it.
Technically consists of an udp server and a udp client .
Does not run an event loop itself and depends on the host loop .
E . g . a Qt timer or just a simple while True : sleep ( 0.1 ) in Python . """
def __init__ ( self , prettyName , supportsSaveStatus , saveCallback , openOrNewCallback , exitProgramCallback , hideGUICallback = None , showGUICallback = None , broadcastCallback = None , sessionIsLoadedCallback = None , loggingLevel = " info " ) :
self . nsmOSCUrl = self . getNsmOSCUrl ( ) #this fails and raises NSMNotRunningError if NSM is not available. Host programs can ignore it or exit their program.
self . realClient = True
self . cachedSaveStatus = None #save status checks for this.
global logger
logger = logging . getLogger ( prettyName )
logger . info ( " import " )
if loggingLevel == " info " or loggingLevel == 20 :
logging . basicConfig ( level = logging . INFO ) #development
logger . info ( " Starting PyNSM2 Client with logging level INFO. Switch to ' error ' for a release! " ) #the NSM name is not ready yet so we just use the pretty name
elif loggingLevel == " error " or loggingLevel == 40 :
logging . basicConfig ( level = logging . ERROR ) #production
else :
logging . warning ( " Unknown logging level: {} . Choose ' info ' or ' error ' " . format ( loggingLevel ) )
logging . basicConfig ( level = logging . INFO ) #development
#given parameters,
self . prettyName = prettyName #keep this consistent! Settle for one name.
self . supportsSaveStatus = supportsSaveStatus
self . saveCallback = saveCallback
self . exitProgramCallback = exitProgramCallback
self . openOrNewCallback = openOrNewCallback #The host needs to: Create a jack client with ourClientNameUnderNSM - Open the saved file and all its resources
self . broadcastCallback = broadcastCallback
self . hideGUICallback = hideGUICallback
self . showGUICallback = showGUICallback
self . sessionIsLoadedCallback = sessionIsLoadedCallback
#Reactions get the raw _IncomingMessage OSC object
#A client can add to reactions.
self . reactions = {
" /nsm/client/save " : self . _saveCallback ,
" /nsm/client/show_optional_gui " : lambda msg : self . showGUICallback ( ) ,
" /nsm/client/hide_optional_gui " : lambda msg : self . hideGUICallback ( ) ,
" /nsm/client/session_is_loaded " : self . _sessionIsLoadedCallback ,
#Hello source-code reader. You can add your own reactions here by nsmClient.reactions[oscpath]=func, where func gets the raw _IncomingMessage OSC object as argument.
#broadcast is handled directly by the function because it has more parameters
}
#self.discardReactions = set(["/nsm/client/session_is_loaded"])
self . discardReactions = set ( )
#Networking and Init
self . sock = socket . socket ( socket . AF_INET , socket . SOCK_DGRAM ) #internet, udp
self . sock . bind ( ( ' ' , 0 ) ) #pick a free port on localhost.
ip , port = self . sock . getsockname ( )
self . ourOscUrl = f " osc.udp:// { ip } : { port } / "
self . executableName = self . getExecutableName ( )
#UNIX Signals. Used for quit.
signal ( SIGTERM , self . sigtermHandler ) #NSM sends only SIGTERM. #TODO: really? pynsm version 1 handled sigkill as well.
signal ( SIGINT , self . sigtermHandler )
#The following instance parameters are all set in announceOurselves
self . serverFeatures = None
self . sessionName = None
self . ourPath = None
self . ourClientNameUnderNSM = None
self . ourClientId = None # the "file extension" of ourClientNameUnderNSM
self . isVisible = None #set in announceGuiVisibility
self . saveStatus = True # true is clean. false means we need saving.
self . announceOurselves ( )
assert self . serverFeatures , self . serverFeatures
assert self . sessionName , self . sessionName
assert self . ourPath , self . ourPath
assert self . ourClientNameUnderNSM , self . ourClientNameUnderNSM
self . sock . setblocking ( False ) #We have waited for tha handshake. Now switch blocking off because we expect sock.recvfrom to be empty in 99.99...% of the time so we shouldn't wait for the answer.
#After this point the host must include self.reactToMessage in its event loop
#We assume we are save at startup.
self . announceSaveStatus ( isClean = True )
logger . info ( " NSMClient client init complete. Going into listening mode. " )
def reactToMessage ( self ) :
""" This is the main loop message. It is added to the clients event loop. """
try :
data , addr = self . sock . recvfrom ( 4096 ) #4096 is quite big. We don't expect nsm messages this big. Better safe than sorry. However, messages will crash the program if they are bigger than 4096.
except BlockingIOError : #happens while no data is received. Has nothing to do with blocking or not.
return None
msg = _IncomingMessage ( data )
if msg . oscpath in self . reactions :
self . reactions [ msg . oscpath ] ( msg )
elif msg . oscpath in self . discardReactions :
pass
elif msg . oscpath == " /reply " and msg . params == [ " /nsm/server/open " , " Loaded. " ] : #NSM sends that all programs of the session were loaded.
logger . info ( " Got /reply Loaded from NSM Server " )
elif msg . oscpath == " /reply " and msg . params == [ " /nsm/server/save " , " Saved. " ] : #NSM sends that all program-states are saved. Does only happen from the general save instruction, not when saving our client individually
logger . info ( " Got /reply Saved from NSM Server " )
elif msg . isBroadcast :
if self . broadcastCallback :
logger . info ( f " Got broadcast with messagePath { msg . oscpath } and listOfArguments { msg . params } " )
self . broadcastCallback ( self . ourPath , self . sessionName , self . ourClientNameUnderNSM , msg . oscpath , msg . params )
else :
logger . info ( f " No callback for broadcast! Got messagePath { msg . oscpath } and listOfArguments { msg . params } " )
elif msg . oscpath == " /error " :
logger . warning ( " Got /error from NSM Server. Path: {} , Parameter: {} " . format ( msg . oscpath , msg . params ) )
else :
logger . warning ( " Reaction not implemented:. Path: {} , Parameter: {} " . format ( msg . oscpath , msg . params ) )
def send ( self , path : str , listOfParameters : list , host = None , port = None ) :
""" Send any osc message. Defaults to nsmd URL.
Will not wait for an answer but return None . """
if host and port :
url = ( host , port )
else :
url = self . nsmOSCUrl
msg = _OutgoingMessage ( path )
for arg in listOfParameters :
msg . add_arg ( arg ) #type is auto-determined by outgoing message
self . sock . sendto ( msg . build ( ) , url )
def getNsmOSCUrl ( self ) :
""" Return and save the nsm osc url or raise an error """
nsmOSCUrl = getenv ( " NSM_URL " )
if not nsmOSCUrl :
raise NSMNotRunningError ( " New-Session-Manager environment variable $NSM_URL not found. " )
else :
#osc.udp://hostname:portnumber/
o = urlparse ( nsmOSCUrl )
return o . hostname , o . port
def getExecutableName ( self ) :
""" Finding the actual executable name can be a bit hard
in Python . NSM wants the real starting point , even if
it was a bash script .
"""
#TODO: I really don't know how to find out the name of the bash script
fullPath = argv [ 0 ]
assert os . path . dirname ( fullPath ) in os . environ [ " PATH " ] , ( fullPath , os . path . dirname ( fullPath ) , os . environ [ " PATH " ] ) #NSM requires the executable to be in the path. No excuses. This will never happen since the reference NSM server-GUI already checks for this.
executableName = os . path . basename ( fullPath )
assert not " / " in executableName , executableName #see above.
return executableName
def announceOurselves ( self ) :
""" Say hello to NSM and tell it we are ready to receive
instructions
/ nsm / server / announce s : application_name s : capabilities s : executable_name i : api_version_major i : api_version_minor i : pid """
def buildClientFeaturesString ( ) :
#:dirty:switch:progress:
result = [ ]
if self . supportsSaveStatus :
result . append ( " dirty " )
if self . hideGUICallback and self . showGUICallback :
result . append ( " optional-gui " )
if result :
return " : " . join ( [ " " ] + result + [ " " ] )
else :
return " "
logger . info ( " Sending our NSM-announce message " )
announce = _OutgoingMessage ( " /nsm/server/announce " )
announce . add_arg ( self . prettyName ) #s:application_name
announce . add_arg ( buildClientFeaturesString ( ) ) #s:capabilities
announce . add_arg ( self . executableName ) #s:executable_name
announce . add_arg ( 1 ) #i:api_version_major
announce . add_arg ( 2 ) #i:api_version_minor
announce . add_arg ( int ( getpid ( ) ) ) #i:pid
hostname , port = self . nsmOSCUrl
assert hostname , self . nsmOSCUrl
assert port , self . nsmOSCUrl
self . sock . sendto ( announce . build ( ) , self . nsmOSCUrl )
#Wait for /reply (aka 'Howdy, what took you so long?)
data , addr = self . sock . recvfrom ( 1024 )
msg = _IncomingMessage ( data )
if msg . oscpath == " /error " :
originalMessage , errorCode , reason = msg . params
logger . error ( " Code {} : {} " . format ( errorCode , reason ) )
quit ( )
elif msg . oscpath == " /reply " :
nsmAnnouncePath , welcomeMessage , managerName , self . serverFeatures = msg . params
assert nsmAnnouncePath == " /nsm/server/announce " , nsmAnnouncePath
logger . info ( " Got /reply " + welcomeMessage )
#Wait for /nsm/client/open
data , addr = self . sock . recvfrom ( 1024 )
msg = _IncomingMessage ( data )
assert msg . oscpath == " /nsm/client/open " , msg . oscpath
self . ourPath , self . sessionName , self . ourClientNameUnderNSM = msg . params
self . ourClientId = os . path . splitext ( self . ourClientNameUnderNSM ) [ 1 ] [ 1 : ]
logger . info ( " Got ' /nsm/client/open ' from NSM. Telling our client to load or create a file with name {} " . format ( self . ourPath ) )
self . openOrNewCallback ( self . ourPath , self . sessionName , self . ourClientNameUnderNSM ) #Host function to either load an existing session or create a new one.
logger . info ( " Our client should be done loading or creating the file {} " . format ( self . ourPath ) )
replyToOpen = _OutgoingMessage ( " /reply " )
replyToOpen . add_arg ( " /nsm/client/open " )
replyToOpen . add_arg ( " {} is opened or created " . format ( self . prettyName ) )
self . sock . sendto ( replyToOpen . build ( ) , self . nsmOSCUrl )
else :
raise ValueError ( " Unexpected message path after announce: {} " . format ( ( msg . oscpath , msg . params ) ) )
def announceGuiVisibility ( self , isVisible ) :
message = " /nsm/client/gui_is_shown " if isVisible else " /nsm/client/gui_is_hidden "
self . isVisible = isVisible
guiVisibility = _OutgoingMessage ( message )
logger . info ( " Telling NSM that our clients switched GUI visibility to: {} " . format ( message ) )
self . sock . sendto ( guiVisibility . build ( ) , self . nsmOSCUrl )
def announceSaveStatus ( self , isClean ) :
""" Only send to the NSM Server if there was really a change """
if not self . supportsSaveStatus :
return
if not isClean == self . cachedSaveStatus :
message = " /nsm/client/is_clean " if isClean else " /nsm/client/is_dirty "
self . cachedSaveStatus = isClean
saveStatus = _OutgoingMessage ( message )
logger . info ( " Telling NSM that our clients save state is now: {} " . format ( message ) )
self . sock . sendto ( saveStatus . build ( ) , self . nsmOSCUrl )
def _saveCallback ( self , msg ) :
logger . info ( " Telling our client to save as {} " . format ( self . ourPath ) )
self . saveCallback ( self . ourPath , self . sessionName , self . ourClientNameUnderNSM )
replyToSave = _OutgoingMessage ( " /reply " )
replyToSave . add_arg ( " /nsm/client/save " )
replyToSave . add_arg ( " {} saved " . format ( self . prettyName ) )
self . sock . sendto ( replyToSave . build ( ) , self . nsmOSCUrl )
#it is assumed that after saving the state is clear
self . announceSaveStatus ( isClean = True )
def _sessionIsLoadedCallback ( self , msg ) :
if self . sessionIsLoadedCallback :
logger . info ( " Received ' Session is Loaded ' . Our client supports it. Forwarding message... " )
self . sessionIsLoadedCallback ( )
else :
logger . info ( " Received ' Session is Loaded ' . Our client does not support it, which is the default. Discarding message... " )
def sigtermHandler ( self , signal , frame ) :
""" Wait for the user to quit the program
The user function does not need to exit itself .
Just shutdown audio engines etc .
It is possible , that the client does not implement quit
properly . In that case NSM protocol demands that we quit anyway .
No excuses .
Achtung GDB ! If you run your program with
gdb - - args python foo . py
the Python signal handler will not work . This has nothing to do with this library .
"""
logger . info ( " Telling our client to quit. " )
self . exitProgramCallback ( self . ourPath , self . sessionName , self . ourClientNameUnderNSM )
#There is a chance that exitProgramCallback will hang and the program won't quit. However, this is broken design and bad programming. We COULD place a timeout here and just kill after 10s or so, but that would make quitting our responsibility and fixing a broken thing.
#If we reach this point we have reached the point of no return. Say goodbye.
logger . warning ( " Client did not quit on its own. Sending SIGKILL. " )
kill ( getpid ( ) , SIGKILL )
logger . error ( " SIGKILL did nothing. Do it manually. " )
def debugResetDataAndExit ( self ) :
""" This is solely meant for debugging and testing. The user way of action should be to
remove the client from the session and add a new instance , which will get a different
NSM - ID .
Afterwards we perform a clean exit . """
logger . warning ( " debugResetDataAndExit will now delete {} and then request an exit. " . format ( self . ourPath ) )
if os . path . exists ( self . ourPath ) :
if os . path . isfile ( self . ourPath ) :
try :
os . remove ( self . ourPath )
except Exception as e :
logger . info ( e )
elif os . path . isdir ( self . ourPath ) :
try :
shutil . rmtree ( self . ourPath )
except Exception as e :
logger . info ( e )
else :
logger . info ( " {} does not exist. " . format ( self . ourPath ) )
self . serverSendExitToSelf ( )
def serverSendExitToSelf ( self ) :
""" If you want a very strict client you can block any non-NSM quit-attempts, like ignoring a
qt closeEvent , and instead send the NSM Server a request to close this client .
This method is a shortcut to do just that .
"""
logger . info ( " Sending SIGTERM to ourselves to trigger the exit callback. " )
#if "server-control" in self.serverFeatures:
# message = _OutgoingMessage("/nsm/server/stop")
# message.add_arg("{}".format(self.ourClientId))
# self.sock.sendto(message.build(), self.nsmOSCUrl)
#else:
kill ( getpid ( ) , SIGTERM ) #this calls the exit callback
def serverSendSaveToSelf ( self ) :
""" Some clients want to offer a manual Save function, mostly for psychological reasons.
We offer a clean solution in calling this function which will trigger a round trip over the
NSM server so our client thinks it received a Save instruction . This leads to a clean
state with a good saveStatus and no required extra functionality in the client . """
logger . info ( " instructing the NSM-Server to send Save to ourselves. " )
if " server-control " in self . serverFeatures :
#message = _OutgoingMessage("/nsm/server/save") # "Save All" Command.
message = _OutgoingMessage ( " /nsm/gui/client/save " )
message . add_arg ( " {} " . format ( self . ourClientId ) )
self . sock . sendto ( message . build ( ) , self . nsmOSCUrl )
else :
logger . warning ( " ...but the NSM-Server does not support server control. Server only supports: {} " . format ( self . serverFeatures ) )
def changeLabel ( self , label : str ) :
""" This function is implemented because it is provided by NSM. However, it does not much.
The message gets received but is not saved .
The official NSM GUI uses it but then does not save it .
We would have to send it every startup ourselves .
This is fine for us as clients , but you need to provide a GUI field to enter that label . """
logger . info ( " Telling the NSM-Server that our label is now " + label )
message = _OutgoingMessage ( " /nsm/client/label " )
message . add_arg ( label ) #s:label
self . sock . sendto ( message . build ( ) , self . nsmOSCUrl )
def broadcast ( self , path : str , arguments : list ) :
""" /nsm/server/broadcast s:path [arguments...]
We , as sender , will not receive the broadcast back .
Broadcasts starting with / nsm are not allowed and will get discarded by the server
"""
if path . startswith ( " /nsm " ) :
logger . warning ( " Attempted broadbast starting with /nsm. Not allwoed " )
else :
logger . info ( " Sending broadcast " + path + repr ( arguments ) )
message = _OutgoingMessage ( " /nsm/server/broadcast " )
message . add_arg ( path )
for arg in arguments :
message . add_arg ( arg ) #type autodetect
self . sock . sendto ( message . build ( ) , self . nsmOSCUrl )
def importResource ( self , filePath ) :
""" aka. import into session
ATTENTION ! You will still receive an absolute path from this function . You need to make
sure yourself that this path will not be saved in your save file , but rather use a place -
holder that gets replaced by the actual session path each time . A good point is after
serialisation . search & replace for the session prefix ( " ourPath " ) and replace it with a tag
e . g . < sessionDirectory > . The opposite during load .
Only such a behaviour will make your session portable .
Do not use the following pattern : An alternative that comes to mind is to only work with
relative paths and force your programs workdir to the session directory . Better work with
absolute paths internally .
Symlinks given path into session dir and returns the linked path relative to the ourPath .
It can handles single files as well as whole directories .
if filePath is already a symlink we do not follow it . os . path . realpath or os . readlink will
not be used .
Multilayer links may indicate a users ordering system that depends on
abstractions . e . g . with mounted drives under different names which get symlinked to a
reliable path .
Basically do not question the type of our input filePath .
tar with the follow symlink option has os . path . realpath behaviour and therefore is able
to follow multiple levels of links anyway .
A hardlink does not count as a link and will be detected and treated as real file .
Cleaning up a session directory is either responsibility of the user
or of our client program . We do not provide any means to unlink or delete files from the
session directory .
"""
#Even if the project was not saved yet now it is time to make our directory in the NSM dir.
if not os . path . exists ( self . ourPath ) :
os . makedirs ( self . ourPath )
filePath = os . path . abspath ( filePath ) #includes normalisation
if not os . path . exists ( self . ourPath ) : raise FileNotFoundError ( self . ourPath )
if not os . path . isdir ( self . ourPath ) : raise NotADirectoryError ( self . ourPath )
if not os . access ( self . ourPath , os . W_OK ) : raise PermissionError ( " not writable " , self . ourPath )
if not os . path . exists ( filePath ) : raise FileNotFoundError ( filePath )
if os . path . isdir ( filePath ) : raise IsADirectoryError ( filePath )
if not os . access ( filePath , os . R_OK ) : raise PermissionError ( " not readable " , filePath )
filePathInOurSession = os . path . commonprefix ( [ filePath , self . ourPath ] ) == self . ourPath
linkedPath = os . path . join ( self . ourPath , os . path . basename ( filePath ) )
linkedPathAlreadyExists = os . path . exists ( linkedPath )
if not os . access ( os . path . dirname ( linkedPath ) , os . W_OK ) : raise PermissionError ( " not writable " , os . path . dirname ( linkedPath ) )
if filePathInOurSession :
#loadResource from our session dir. Portable session, manually copied beforehand or just loading a link again.
linkedPath = filePath #we could return here, but we continue to get the tests below.
logger . info ( f " tried to import external resource { filePath } but this is already in our session directory. We use this file directly instead. " )
elif linkedPathAlreadyExists and os . readlink ( linkedPath ) == filePath :
#the imported file already exists as link in our session dir. We do not link it again but simply report the existing link.
#We only check for the first target of the existing link and do not follow it through to a real file.
#This way all user abstractions and file structures will be honored.
linkedPath = linkedPath
logger . info ( f " tried to import external resource { filePath } but this was already linked to our session directory before. We use the old link: { linkedPath } " )
elif linkedPathAlreadyExists :
#A new file shall be imported but it would create a linked name which already exists in our session dir.
#Because we already checked for a new link to the same file above this means actually linking a different file so we need to differentiate with a unique name
firstpart , extension = os . path . splitext ( linkedPath )
uniqueLinkedPath = firstpart + " . " + uuid4 ( ) . hex + extension
assert not os . path . exists ( uniqueLinkedPath )
os . symlink ( filePath , uniqueLinkedPath )
logger . info ( self . ourClientNameUnderNSM + f " :pysm2: tried to import external resource { filePath } but potential target link { linkedPath } already exists. Linked to { uniqueLinkedPath } instead. " )
linkedPath = uniqueLinkedPath
else : #this is the "normal" case. External resources will be linked.
assert not os . path . exists ( linkedPath )
os . symlink ( filePath , linkedPath )
logger . info ( f " imported external resource { filePath } as link { linkedPath } " )
assert os . path . exists ( linkedPath ) , linkedPath
return linkedPath
class NullClient ( object ) :
""" Use this as a drop-in replacement if your program has a mode without NSM but you don ' t want
to change the code itself .
This was originally written for programs that have a core - engine and normal mode of operations
is a GUI with NSM but they also support commandline - scripts and batch processing .
For these you don ' t want NSM. " " "
def __init__ ( self , * args , * * kwargs ) :
self . realClient = False
self . ourClientNameUnderNSM = " NSM Null Client "
def announceSaveStatus ( self , * args ) :
pass
def announceGuiVisibility ( self , * args ) :
pass
def reactToMessage ( self ) :
pass
def importResource ( self ) :
return " "
def serverSendExitToSelf ( self ) :
quit ( )