You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
449 lines
20 KiB
449 lines
20 KiB
#! /usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
Copyright 2021, Nils Hilbricht, Germany ( https://www.hilbricht.net )
|
|
|
|
This file is part of the Laborejo Software Suite ( https://www.laborejo.org ),
|
|
|
|
This is free software: you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation, either version 3 of the License, or
|
|
(at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
"""
|
|
import logging; logger = logging.getLogger(__name__); logger.info("import")
|
|
|
|
#Python Standard Library
|
|
from collections import defaultdict
|
|
from statistics import median
|
|
|
|
#Third Party Modules
|
|
from calfbox import cbox
|
|
|
|
#Template Modules
|
|
import template.engine.sequencer
|
|
import template.engine.pitch as pitch
|
|
from template.engine.duration import DB, DL, D1, D2, D4, D8, D16, D32, D64, D128, D256, D512, D1024
|
|
|
|
#Client Modules
|
|
|
|
class Data(template.engine.sequencer.Score):
    """There must always be a Data class in a file main.py.

    Simply inheriting from engine.data.Data is easiest.

    You need to match the init parameters of your parent class. They vary from class to class
    of course. Simply copy and paste them from your Data parent class

    About Vico:

    We have a track that has 10 layers. Each layer holds events.
    Events are instances of the Event class. They are ONLY created in layer.newEvent,
    with the exception of the copies produced by getCopyBufferCopy.
    """

    def __init__(self, parentSession):
        """Create an empty score that holds the single track named "out".

        parentSession is handed to the parent class constructor unchanged."""
        logger.info("Creating empty Main Data instance")
        super().__init__(parentSession)
        #allEventsById needs to be in init and instanceFromSerializedData to be ready for the
        #loading stage. All events in all layers, keyed by id(event). Value is the actual event
        #instance. Does contain deleted items for undo, so don't use this to iterate over all
        #actual events. This is a quick reference to find items without searching through all layers.
        self.allEventsById = {}
        self.track = self.addTrack("out") #Add the Singleton Track instance
        self.parentSession.recordingEnabled = True #saved. If this is a new file set to true
        self._processAfterInit()

    def _processAfterInit(self):
        """Shared tail of __init__ and instanceFromSerializedData.

        Sets up the runtime-only recording state. Requires self.track to exist."""
        #self.tempoMap.isTransportMaster = False # no own tempo map. This is already the default.

        #Recording Buffer. Every noteOn inserts its pitch. noteOff removes.
        #Keeping track per layer is only future proofing. The standard mode of operation does not
        #allow recording to multiple layers at once.
        #However, explicit is better than implicit.
        self.noteOnsInProgress = {i:set() for i in range(10)}
        self.track.sequencerInterface.insertEmptyClip() #needed for recording. Creates a maximum length empty clip
        #The api switches this between a list and None. It starts as None; the live* methods
        #assert that it has been switched to a list before they record anything.
        self.eventCollectorForUndo = None
        logger.info("Main Data instance complete")
        self.copyBuffer = []

    def getCopyBufferCopy(self, layerIndex):
        """Prepare the copy buffer to get pasted onto a new layer.

        Returns a list of fresh Event copies that point to layer layerIndex. The copies are
        registered in self.allEventsById but are not yet inserted into the layer itself."""
        result = []
        targetLayer = self.track.layers[layerIndex]
        for event in self.copyBuffer:
            ev = event.copy()
            ev.layer = layerIndex
            ev.parentLayer = targetLayer
            #allEventsById is keyed by id(event), not by the event object, so test the id.
            assert id(ev) not in self.allEventsById
            self.allEventsById[id(ev)] = ev
            result.append(ev)
        return result

    def _recordLiveEvent(self, tickindex, statusType:int, byte1:int, byte2:int):
        """Common body of all live* methods.

        Creates an event in the active layer and appends it to self.eventCollectorForUndo.
        Returns the new event, or None if recording is disabled or tickindex is None."""
        if not self.parentSession.recordingEnabled or tickindex is None:
            return None
        assert self.eventCollectorForUndo is not None
        resultEvent = self.track.layers[self.track.activeLayer].newEvent(tickindex, statusType, byte1, byte2, "")
        self.eventCollectorForUndo.append(resultEvent)
        return resultEvent

    def liveNoteOn(self, tickindex:int, note:int, velocity:int):
        """0x90 Connected via the api, which sends further callbacks to the GUI.

        Remembers the pitch in the active layer's note-on buffer so that a later note-off
        (real or forced by finalizeRecording) can be matched to it.
        Returns the new event or None if nothing was recorded."""
        resultEvent = self._recordLiveEvent(tickindex, 0x90, note, velocity)
        if resultEvent is not None:
            self.noteOnsInProgress[self.track.activeLayer].add(note)
        return resultEvent

    def liveNoteOff(self, tickindex:int, note:int, velocity:int):
        """0x80 Connected via the api, which sends further callbacks to the GUI.

        Returns the new event or None if nothing was recorded."""
        if note not in self.noteOnsInProgress[self.track.activeLayer]:
            logger.info("Note Off without Note On")
            #It is possible that recording started with a physical midi key already down, thus we
            #received a note-off without a preceding note-on. Ignore.
            return None

        resultEvent = self._recordLiveEvent(tickindex, 0x80, note, velocity)
        if resultEvent is not None:
            self.noteOnsInProgress[self.track.activeLayer].remove(note)
        return resultEvent

    def livePolyphonicAftertouch(self, tickindex:int, note:int, value:int):
        """0xA0 Connected via the api, which sends further callbacks to the GUI"""
        return self._recordLiveEvent(tickindex, 0xA0, note, value)

    def liveCC(self, tickindex:int, type:int, value:int):
        """0xB0 Connected via the api, which sends further callbacks to the GUI"""
        return self._recordLiveEvent(tickindex, 0xB0, type, value)

    def liveProgramChange(self, tickindex:int, value:int):
        """0xC0 Connected via the api, which sends further callbacks to the GUI"""
        return self._recordLiveEvent(tickindex, 0xC0, value, 0) #Byte2 gets ignored

    def liveChannelPressure(self, tickindex:int, value:int):
        """0xD0 Connected via the api, which sends further callbacks to the GUI"""
        return self._recordLiveEvent(tickindex, 0xD0, value, 0) #Byte2 gets ignored

    def livePitchBend(self, tickindex:int, fine:int, coarse:int):
        """Pitchbend 0xE0 is a 14 bit value. Byte2 is coarse/MSB, byte1 is fine/LSB.
        Many keyboards leave byte1 as 0 and use only byte2, making PitchBend 128steps.

        There is also a CC to specify pitchbend musical range and sensitivity,
        but that is for a synth, we just send the CC without knowing its implications"""
        return self._recordLiveEvent(tickindex, 0xE0, fine, coarse)

    #def playbackStatusChanged(self, state:bool):
    #    """auto-triggered via api callback"""
    #    if not state and self.parentSession.recordingEnabled:
    #        self._finalizeRecording()

    def finalizeRecording(self, state:bool): #state comes from the api callback
        """When recording stops each dangling noteOn can be matched with an artificial noteOff.
        We send out the note off to jack midi via the non-rt message

        This function basically has two return values. One is immediate, a list of all forced note
        offs. A GUI uses this to stop drawing rectangles with the playback cursor.

        Then it fills data.eventCollectorForUndo , but with all events since the last recording
        started, including note on and already sent note-off.

        Nothing happens, and an empty list is returned, if state is True or recording is disabled.
        """
        #assert not self.eventCollectorForUndo is None #Can't do that here because finalizeRecording is called right at the start of the program. Nothing happens because all data structures are empty but eventCollectorForUndo is still None.
        if state or not self.parentSession.recordingEnabled:
            return [] #no forced note offs

        forcedNoteOffs = [] #flat list to return after the layers noteOnBuffer have been cleared
        tickindex = cbox.Transport.status().pos_ppqn
        for layerIndex, noteOnBuffer in self.noteOnsInProgress.items():
            for noteOnPitch in noteOnBuffer:
                cbox.send_midi_event(0x80, noteOnPitch, 0, output=self.track.sequencerInterface.cboxMidiOutUuid)
                #The note-off belongs to the layer that buffered the note-on, which is not
                #necessarily the layer that is active right now.
                resultEvent = self.track.layers[layerIndex].newEvent(tickindex, 0x80, noteOnPitch, 0, "")
                forcedNoteOffs.append(resultEvent)
                self.eventCollectorForUndo.append(resultEvent)
            noteOnBuffer.clear()
        return forcedNoteOffs

    #def serialize(self)->dict:
        #dictionary = super().serialize()
        #dictionary.update( { #update in place
        #    "track" : self.track.serialize(),
        #})
        #return dictionary

    @classmethod
    def instanceFromSerializedData(cls, parentSession, serializedData):
        """Alternative constructor used when loading from file.

        Bypasses __init__, lets the parent class restore its part of the data into the new
        instance and then uses the first restored track as self.track."""
        logger.info("Loading Main Data instance from file")
        self = cls.__new__(cls)
        #allEventsById needs to be in init and instanceFromSerializedData to be ready for the
        #loading stage. See __init__ for what it holds.
        self.allEventsById = {}
        #Tracks depend on the rest of the data already in place because they create a cache on creation.
        #self.track = Track.instanceFromSerializedData(parentData=self, serializedData=serializedData["track"])
        super().copyFromSerializedData(parentSession, serializedData, self) #Tracks, parentSession and tempoMap
        self.track = self.tracks[0]
        self._processAfterInit()
        return self
|
|
|
|
|
|
class Track(object):
    """The one hardcoded track of a Vico instance. It owns ten layers ("subtracks"),
    keyed 0-9 in self.layers.

    You can use these for notes or CC data.
    """

    def __init__(self, parentData, name=None):
        """parentData is the owning Data instance. name is forwarded to the SequencerInterface."""
        logger.info("Creating empty Track instance")
        self.parentData = parentData
        self.sequencerInterface = template.engine.sequencer.SequencerInterface(parentTrack=self, name=name)
        self.layers = {}
        for layerIndex in range(10):
            self.layers[layerIndex] = Layer(self, layerIndex)
        #We order 0 before 1 but the keyboard layout trumps programming logic.
        #First key is 1, therefore default layer is 1
        self.activeLayer = 1
        self._processAfterInit()

    def _processAfterInit(self):
        """Call this after either init or instanceFromSerializedData"""
        logger.info("Track instance complete")

    def lastEventPosition(self):
        """Largest cachedLastEventPosition of all layers. Same as duration"""
        lastPositions = [layer.cachedLastEventPosition for layer in self.layers.values()]
        return max(lastPositions)

    def firstEventPosition(self):
        """Smallest cachedFirstEventPosition of all layers."""
        firstPositions = [layer.cachedFirstEventPosition for layer in self.layers.values()]
        return min(firstPositions)

    def insertEvent(self, event):
        """Hand an already existing Event object to the layer stored in event.layer.
        layer.newEvent, in contrast, creates the instance."""
        targetLayer = self.layers[event.layer]
        targetLayer.insertEvent(event)

    def deleteEvent(self, event):
        """Remove an event from its layer.
        No search over layers is involved because events know their layer.
        """
        targetLayer = self.layers[event.layer]
        targetLayer.deleteEvent(event)

    #Save / Load / Export
    def serialize(self)->dict:
        """Return the save-file representation: sequencer interface, active layer, all layers."""
        serializedLayers = []
        for layer in self.layers.values():
            serializedLayers.append(layer.serialize())
        result = {}
        result["sequencerInterface"] = self.sequencerInterface.serialize()
        result["activeLayer"] = self.activeLayer
        result["layers"] = serializedLayers
        return result

    def generateCalfboxMidi(self):
        """Ask every layer to rebuild its midi data."""
        for layerIndex in self.layers:
            self.layers[layerIndex].generateCalfboxMidi() #only builds if dirty data

    @classmethod
    def instanceFromSerializedData(cls, parentData, serializedData):
        """Alternative constructor used when loading. Mirrors serialize() and bypasses __init__."""
        self = cls.__new__(cls)
        self.parentData = parentData
        self.sequencerInterface = template.engine.sequencer.SequencerInterface.instanceFromSerializedData(self, serializedData["sequencerInterface"])
        self.activeLayer = serializedData["activeLayer"]
        #Layers are saved as a list. Each one carries its own index, which becomes the key again.
        self.layers = {}
        for serializedLayer in serializedData["layers"]:
            layerIndex = serializedLayer["index"]
            self.layers[layerIndex] = Layer.instanceFromSerializedData(self, serializedLayer)
        self._processAfterInit()
        return self

    def export(self)->dict:
        """Return the track-level export data. Layers are not included here."""
        exportedInterface = self.sequencerInterface.export()
        return {"sequencerInterface" : exportedInterface, "activeLayer" : self.activeLayer}
|
|
|
|
#Dependency Injections.
|
|
template.engine.sequencer.Score.TrackClass = Track #Register our Track on the template Score class. Original note: Score will look for Track in its module. (How Score uses TrackClass is not visible in this file.)
|
|
|
|
class Layer(object):
    """One of the ten subtracks of a Track.

    Main data structure is self.events, a defaultdict that maps a status type (e.g. 0x90)
    to the list of Event instances with that status. The lists are kept in insertion order,
    not sorted by position.
    """

    def __init__(self, parentTrack, index:int):
        """parentTrack is the owning Track, index the key of this layer in parentTrack.layers."""
        self.parentTrack = parentTrack
        self.index = index
        self.color = "cyan"
        self.events = defaultdict(list) # statusType: [Event]
        self.dirty = False # indicates whether this needs re-export. Obviously not saved.
        self.midiChannel = 1 #1-16 inclusive.
        self._processAfterInit()

    def _processAfterInit(self):
        """Call this after either init or instanceFromSerializedData.
        Resets the cached values to their defaults."""
        self.cachedMedianVelocity = 64
        self.cachedLastEventPosition = 0 #set in generateCalfboxMidi. Used by the track
        self.cachedFirstEventPosition = 0 #set in generateCalfboxMidi. Used by the track

    def newEvent(self, tickindex:int, statusType:int, byte1:int, byte2:int, freeText:str):
        """The only place in the program where events get created, except Score.getCopyBufferCopy
        and where self.parentTrack.parentData.allEventsById gets populated.

        Marks the layer dirty and returns the new Event."""
        ev = Event(tickindex, statusType, byte1, byte2, self.index, freeText)
        ev.parentLayer = self
        self.parentTrack.parentData.allEventsById[id(ev)] = ev
        self.events[statusType].append(ev)
        self.dirty = True
        return ev

    def insertEvent(self, event):
        """Insert an existing Event object, that was at one time created through newEvent.
        The event must not already be in this layer. Marks the layer dirty."""
        sameStatusEvents = self.events[event.status]
        assert event not in sameStatusEvents
        sameStatusEvents.append(event)
        event.parentLayer = self
        self.dirty = True

    def deleteEvent(self, event):
        """Remove the event from this layer. A missing event is logged, not raised.

        Don't be surprised. This gets called by api.createNote to temporarily remove
        notes and later add them with insertEvent again"""
        try:
            self.events[event.status].remove(event)
        except ValueError:
            logger.error(f"Event was not in our list. Layer{self.index}. Event: {event}")
        self.dirty = True #set in both cases, removed or not

    #Save / Load / Export
    def serialize(self)->dict:
        """Return the save-file representation. Events are flattened into one list,
        grouped by status type in the order of self.events."""
        events = [event.serialize() for eventlist in self.events.values() for event in eventlist]
        return {
            "index" : self.index,
            "color" : self.color,
            "events": events,
            "midiChannel": self.midiChannel,
        }

    @classmethod
    def instanceFromSerializedData(cls, parentTrack, serializedData):
        """Alternative constructor used when loading. Mirrors serialize() and bypasses __init__.

        Events are re-created through newEvent, so they are registered in allEventsById again."""
        self = cls.__new__(cls)
        self.parentTrack = parentTrack
        self.color = serializedData["color"]
        self.index = serializedData["index"]
        self.midiChannel = serializedData["midiChannel"]
        self.events = defaultdict(list) # statusType: [Event]
        for seriEvent in serializedData["events"]: #tuples or list, in the order of Event.serialize
            self.newEvent(*seriEvent)
        self.dirty = True #also for a layer without events, so the first generateCalfboxMidi runs
        self._processAfterInit()
        return self

    def generateCalfboxMidi(self):
        """Rebuild the midi data of this layer, but only if it is dirty.

        We use subtracks, therefore they do not change the cachedDuration of the song.
        Instead we save our own version of cached ticks of the actual song length.
        Useful for the GUI so it does not have to draw MAX_DURATION

        Updates cachedMedianVelocity (only if there are note-ons), cachedLastEventPosition
        and cachedFirstEventPosition, then clears the dirty flag."""
        if not self.dirty:
            return

        velocities = []
        chunks = [] #one bytes object per event, joined once after the loop instead of repeated +=
        maxPos = 0
        minPos = template.engine.sequencer.MAXIMUM_TICK_DURATION
        for eventlist in self.events.values():
            for event in eventlist:
                if event.position > maxPos:
                    maxPos = event.position
                if event.position < minPos:
                    minPos = event.position
                if event.status == 0x90:
                    velocities.append(event.byte2)
                chunks.append(event.toCboxBytes())
        blob = b"".join(chunks)

        if velocities:
            self.cachedMedianVelocity = int(median(velocities))

        self.parentTrack.sequencerInterface.setSubtrack(self.index, [(blob, 0, maxPos+1)]) #(bytes-blob, position, length)
        self.dirty = False
        self.cachedLastEventPosition = maxPos
        self.cachedFirstEventPosition = minPos

    def _exportEvents(self) -> list:
        """Return the exported events sorted by position. Includes generating midi.

        This is not called after every change!
        In fact it is only triggered by api._layerChanged which happens once on program start
        and on very extensive operations on the whole layer (like transpose)
        """
        result = [event.export() for eventlist in self.events.values() for event in eventlist]
        self.generateCalfboxMidi()
        return sorted(result, key=lambda e: e["position"])

    def export(self)->dict:
        """Return index, color and the position-sorted exported events of this layer."""
        return {
            "index" : self.index,
            "color" : self.color,
            "events" : self._exportEvents(), #side effect: generate midi
        }
|
|
|
|
|
|
|
|
class Event:
    """A single midi event: a position plus status, byte1 and byte2.

    [0x90, 60, 100] for a loud middle c on channel 0
    [0xE0, 4, 85] pitchbend to 85*128 + 4 = 10884 steps

    There is no duration value. Connections, e.g. note rectangles in the GUI, have to be calculated.

    Byte2 can be None, for Program Change or Channel Pressure.

    Vico events have no channel. The recorded channel is converted to channel 0.
    """

    def __init__(self, position:int, status:int, byte1:int, byte2:int, layer:int, freeText:str=""):
        self.position = position
        self.status, self.byte1, self.byte2 = status, byte1, byte2 #e.g. 0x90, 60, 100: note-on, middle c, rather loud. byte2 can be None for Program Changes.
        self.layer = layer #0-9 incl. . Events need to know their layer for undo.
        self.freeText = freeText
        #self.parentLayer = Injected and changed by the Layer itself.

    def serialize(self)->tuple:
        """Save-file form (position, status, byte1, byte2, freeText), position cast to int.
        The layer is not part of it."""
        position = int(self.position)
        return (position, self.status, self.byte1, self.byte2, self.freeText)

    def export(self)->dict:
        """Dictionary form of all fields, with id(self) under the key "id"."""
        result = {"id": id(self)}
        result["position"] = self.position
        result["status"] = self.status
        result["byte1"] = self.byte1
        result["byte2"] = self.byte2
        result["layer"] = self.layer
        result["freeText"] = self.freeText
        return result

    def toCboxBytes(self)->bytes:
        """Serialize this event for calfbox through cbox.Pattern.serialize_event.

        Both data bytes are passed through pitch.midiPitchLimiter first (defined outside this
        file; presumably it limits them to the valid midi range). The midi channel of the
        parent layer is added to the status byte. A negative position is sent as 0, with a warning."""
        limitedByte1 = pitch.midiPitchLimiter(self.byte1, 0)
        limitedByte2 = pitch.midiPitchLimiter(self.byte2, 0)
        #we index channels from 1 to 16, so -1 here because status is itself already chan 1
        statusWithChannel = self.status + self.parentLayer.midiChannel - 1
        if self.position < 0:
            logger.warning(f"Event {self.byte1},{self.byte2} has position less than 0. Limiting to 0 in midi output. Please fix manually")
            outputPosition = 0
        else:
            outputPosition = self.position
        return cbox.Pattern.serialize_event(outputPosition, statusWithChannel, limitedByte1, limitedByte2)

    def __repr__(self):
        """The export dictionary as string."""
        exported = self.export()
        return str(exported)

    def copy(self):
        """Returns a standalone copy of self. Needs to be inserted into a layer.
        The copy has layer None and no parentLayer."""
        #return Event(*self.serialize(), layer=None)
        return Event(self.position, self.status, self.byte1, self.byte2, None, self.freeText)
|
|
|