You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

449 lines
20 KiB

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Copyright 2021, Nils Hilbricht, Germany ( https://www.hilbricht.net )
This file is part of the Laborejo Software Suite ( https://www.laborejo.org ),
This is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import logging; logger = logging.getLogger(__name__); logger.info("import")
#Python Standard Library
from collections import defaultdict
from statistics import median
#Third Party Modules
from calfbox import cbox
#Template Modules
import template.engine.sequencer
import template.engine.pitch as pitch
from template.engine.duration import DB, DL, D1, D2, D4, D8, D16, D32, D64, D128, D256, D512, D1024
#Client Modules
class Data(template.engine.sequencer.Score):
    """There must always be a Data class in a file main.py.

    Simply inheriting from engine.data.Data is easiest.

    You need to match the init parameters of your parent class. They vary from class to class
    of course. Simply copy and paste them from your Data parent class

    About Vico:
    We have a track that has 10 layers. Each layer holds events.
    Events are of type Event and are ONLY created in layer.newEvent
    (plus the copies made in getCopyBufferCopy).
    """

    def __init__(self, parentSession):
        logger.info("Creating empty Main Data instance")
        super().__init__(parentSession)

        #This needs to be in init and instanceFromSerializedData to be ready for the loading stage.
        #All events in all layers, keyed by id(event). Value is the actual event instance.
        #Does contain deleted items for undo, but who cares. Just don't use this to iterate over
        #all actual events. This is a quick reference to find items without searching through all layers.
        self.allEventsById = {}

        #Add the Singleton Track instance
        self.track = self.addTrack("out")
        self.parentSession.recordingEnabled = True #saved. If this is a new file set to true
        self._processAfterInit()

    def _processAfterInit(self):
        """Shared tail of __init__ and instanceFromSerializedData.
        Sets up the runtime-only recording state and the copy buffer."""
        #self.tempoMap.isTransportMaster = False # no own tempo map. This is already the default.

        #Recording Buffer. Every noteOn inserts its pitch. noteOff removes.
        #Keeping track per layer is only future proofing. The standard mode of operation does not allow recording to multiple layers at once.
        #However, explicit is better than implicit.
        self.noteOnsInProgress = {i:set() for i in range(10)}
        self.track.sequencerInterface.insertEmptyClip() #needed for recording. Creates a maximum length empty clip
        self.eventCollectorForUndo = None #The api switches this between a list and None.
        logger.info("Main Data instance complete")
        self.copyBuffer = []

    def getCopyBufferCopy(self, layerIndex):
        """Prepare the copy buffer to get pasted onto a new layer.

        Returns a list of fresh Event copies that point to layer `layerIndex`.
        Each copy is registered in self.allEventsById but NOT inserted into the layer.
        """
        targetLayer = self.track.layers[layerIndex]
        result = []
        for event in self.copyBuffer:
            ev = event.copy()
            ev.layer = layerIndex
            ev.parentLayer = targetLayer
            #allEventsById is keyed by id(event), so the id is what must be tested.
            #Testing the event object itself could never find anything.
            assert id(ev) not in self.allEventsById
            self.allEventsById[id(ev)] = ev
            result.append(ev)
        return result

    def liveNoteOn(self, tickindex:int, note:int, velocity:int):
        """Connected via the api, which sends further callbacks to the GUI.

        Returns the new note-on event, or None if nothing was recorded
        (recording disabled or tickindex is None).
        """
        if self.parentSession.recordingEnabled and tickindex is not None:
            assert self.eventCollectorForUndo is not None
            self.noteOnsInProgress[self.track.activeLayer].add(note) #remember the open note so it can be closed later
            resultEvent = self.track.layers[self.track.activeLayer].newEvent(tickindex, 0x90, note, velocity, "")
            self.eventCollectorForUndo.append(resultEvent)
            return resultEvent
        return None

    def liveNoteOff(self, tickindex:int, note:int, velocity:int):
        """Connected via the api, which sends further callbacks to the GUI.

        Returns the new note-off event, or None if nothing was recorded."""
        if note not in self.noteOnsInProgress[self.track.activeLayer]:
            logger.info("Note Off without Note On")
            return None #It is possible that recording started with a physical midi key already down, thus we received a note-off without a preceding note-on. Ignore.

        if self.parentSession.recordingEnabled and tickindex is not None:
            assert self.eventCollectorForUndo is not None
            self.noteOnsInProgress[self.track.activeLayer].remove(note)
            resultEvent = self.track.layers[self.track.activeLayer].newEvent(tickindex, 0x80, note, velocity, "")
            self.eventCollectorForUndo.append(resultEvent)
            return resultEvent
        return None

    def livePolyphonicAftertouch(self, tickindex:int, note:int, value:int):
        """0xA0 Connected via the api, which sends further callbacks to the GUI.

        Returns the new event, or None if nothing was recorded."""
        if self.parentSession.recordingEnabled and tickindex is not None:
            assert self.eventCollectorForUndo is not None
            resultEvent = self.track.layers[self.track.activeLayer].newEvent(tickindex, 0xA0, note, value, "")
            self.eventCollectorForUndo.append(resultEvent)
            return resultEvent
        return None

    def liveCC(self, tickindex:int, type:int, value:int):
        """0xB0 Connected via the api, which sends further callbacks to the GUI.

        Returns the new event, or None if nothing was recorded."""
        if self.parentSession.recordingEnabled and tickindex is not None:
            assert self.eventCollectorForUndo is not None
            resultEvent = self.track.layers[self.track.activeLayer].newEvent(tickindex, 0xB0, type, value, "")
            self.eventCollectorForUndo.append(resultEvent)
            return resultEvent
        return None

    def liveProgramChange(self, tickindex:int, value:int):
        """0xC0 Connected via the api, which sends further callbacks to the GUI.

        Returns the new event, or None if nothing was recorded."""
        if self.parentSession.recordingEnabled and tickindex is not None:
            assert self.eventCollectorForUndo is not None
            resultEvent = self.track.layers[self.track.activeLayer].newEvent(tickindex, 0xC0, value, 0, "") #Byte2 gets ignored
            self.eventCollectorForUndo.append(resultEvent)
            return resultEvent
        return None

    def liveChannelPressure(self, tickindex:int, value:int):
        """0xD0 Connected via the api, which sends further callbacks to the GUI.

        Returns the new event, or None if nothing was recorded."""
        if self.parentSession.recordingEnabled and tickindex is not None:
            assert self.eventCollectorForUndo is not None
            resultEvent = self.track.layers[self.track.activeLayer].newEvent(tickindex, 0xD0, value, 0, "") #Byte2 gets ignored
            self.eventCollectorForUndo.append(resultEvent)
            return resultEvent
        return None

    def livePitchBend(self, tickindex:int, fine:int, coarse:int):
        """Pitchbend 0xE0 is a 14 bit value. Byte2 is coarse/MSB, byte1 is fine/LSB.
        Many keyboards leave byte1 as 0 and use only byte2, making PitchBend 128steps.
        There is also a CC to specify pitchbend musical range and sensitivity,
        but that is for a synth, we just send the CC without knowing its implications

        Returns the new event, or None if nothing was recorded."""
        if self.parentSession.recordingEnabled and tickindex is not None:
            assert self.eventCollectorForUndo is not None
            resultEvent = self.track.layers[self.track.activeLayer].newEvent(tickindex, 0xE0, fine, coarse, "")
            self.eventCollectorForUndo.append(resultEvent)
            return resultEvent
        return None

    #def playbackStatusChanged(self, state:bool):
    #    """auto-triggered via api callback"""
    #    if not state and self.parentSession.recordingEnabled:
    #        self._finalizeRecording()

    def finalizeRecording(self, state:bool): #state comes from the api callback
        """When recording stops each dangling noteOn can be matched with an artificial noteOff.
        We send out the note off to jack midi via the non-rt message

        This function basically has two return values. One is immediate, a list of all forced note
        offs. A GUI uses this to stop drawing rectangles with the playback cursor.

        Then it fills data.eventCollectorForUndo , but with all events since the last recording
        started, including note on and already sent note-off.

        Returns an empty list if `state` is truthy or recording is disabled.
        """
        #assert not self.eventCollectorForUndo is None #Can't do that here because finalizeRecording is called right at the start of the program. Nothing happens because all data structures are empty but eventCollectorForUndo is still None.
        if state or not self.parentSession.recordingEnabled:
            return [] #no force note offs

        forcedNoteOffs = [] #flat list to return after the layers noteOnBuffer have been cleared
        tickindex = cbox.Transport.status().pos_ppqn
        for layerIndex, noteOnBuffer in self.noteOnsInProgress.items():
            for noteOnPitch in noteOnBuffer:
                cbox.send_midi_event(0x80, noteOnPitch, 0, output=self.track.sequencerInterface.cboxMidiOutUuid)
                #The note-off belongs to the layer that holds the dangling note-on,
                #which is not necessarily the currently active layer.
                resultEvent = self.track.layers[layerIndex].newEvent(tickindex, 0x80, noteOnPitch, 0, "")
                forcedNoteOffs.append(resultEvent)
                self.eventCollectorForUndo.append(resultEvent)
            noteOnBuffer.clear()
        return forcedNoteOffs

    #def serialize(self)->dict:
        #dictionary = super().serialize()
        #dictionary.update( { #update in place
        #    "track" : self.track.serialize(),
        #})
        #return dictionary

    @classmethod
    def instanceFromSerializedData(cls, parentSession, serializedData):
        """Alternative constructor used when loading a file. Bypasses __init__."""
        logger.info("Loading Main Data instance from file")
        self = cls.__new__(cls)
        #This needs to be in init and instanceFromSerializedData to be ready for the loading stage.
        #See __init__ for what this dict holds.
        self.allEventsById = {}
        #Tracks depend on the rest of the data already in place because they create a cache on creation.
        #self.track = Track.instanceFromSerializedData(parentData=self, serializedData=serializedData["track"])
        super().copyFromSerializedData(parentSession, serializedData, self) #Tracks, parentSession and tempoMap
        self.track = self.tracks[0]
        self._processAfterInit()
        return self
class Track(object):
    """The single hardcoded track of a Vico instance. It owns ten layers (subtracks),
    each of which can carry notes or CC data.
    """

    def __init__(self, parentData, name=None):
        logger.info("Creating empty Track instance")
        self.parentData = parentData
        self.sequencerInterface = template.engine.sequencer.SequencerInterface(parentTrack=self, name=name)

        freshLayers = {}
        for layerIndex in range(10):
            freshLayers[layerIndex] = Layer(self, layerIndex)
        self.layers = freshLayers

        #We order 0 before 1 but the keyboard layout trumps programming logic.
        #First key is 1, therefore default layer is 1
        self.activeLayer = 1
        self._processAfterInit()

    def _processAfterInit(self):
        """Common final step of __init__ and instanceFromSerializedData"""
        logger.info("Track instance complete")

    def lastEventPosition(self):
        """Highest cached last-event position of all layers. Same as duration"""
        positions = [layer.cachedLastEventPosition for layer in self.layers.values()]
        return max(positions)

    def firstEventPosition(self):
        """Lowest cached first-event position of all layers."""
        positions = [layer.cachedFirstEventPosition for layer in self.layers.values()]
        return min(positions)

    def insertEvent(self, event):
        """Put an already existing Event object back into the layer it names.
        Creating new instances is the job of layer.newEvent instead."""
        targetLayer = self.layers[event.layer]
        targetLayer.insertEvent(event)

    def deleteEvent(self, event):
        """Remove an event from its layer.
        Events know their layer, so no search across layers is needed.
        """
        targetLayer = self.layers[event.layer]
        targetLayer.deleteEvent(event)

    #Save / Load / Export
    def serialize(self)->dict:
        """Return the save-file representation of this track including all layers."""
        result = {}
        result["sequencerInterface"] = self.sequencerInterface.serialize()
        result["activeLayer"] = self.activeLayer
        result["layers"] = [layer.serialize() for layer in self.layers.values()]
        return result

    def generateCalfboxMidi(self):
        """Ask every layer to rebuild its midi data. Layers only build if their data is dirty."""
        for layerIndex in self.layers:
            self.layers[layerIndex].generateCalfboxMidi()

    @classmethod
    def instanceFromSerializedData(cls, parentData, serializedData):
        """Alternative constructor used when loading a file. Bypasses __init__."""
        track = cls.__new__(cls)
        track.parentData = parentData
        track.sequencerInterface = template.engine.sequencer.SequencerInterface.instanceFromSerializedData(track, serializedData["sequencerInterface"])
        track.activeLayer = serializedData["activeLayer"]
        track.layers = {}
        for layerData in serializedData["layers"]:
            loadedLayer = Layer.instanceFromSerializedData(track, layerData)
            track.layers[layerData["index"]] = loadedLayer
        track._processAfterInit()
        return track

    def export(self)->dict:
        """Return the track description that is handed out of the engine (no layers included)."""
        exported = {}
        exported["sequencerInterface"] = self.sequencerInterface.export()
        exported["activeLayer"] = self.activeLayer
        return exported
#Dependency Injections.
#Assign our Track class to the TrackClass attribute of the template's Score class.
#NOTE(review): presumably Score instantiates TrackClass when it creates or loads tracks - confirm in template.engine.sequencer.
template.engine.sequencer.Score.TrackClass = Track #Score will look for Track in its module.
class Layer(object):
    """One of the subtracks of a Track.

    Main data structure is self.events, a defaultdict that maps a status type
    (e.g. 0x90) to the list of Event instances with that status.
    """

    def __init__(self, parentTrack, index:int):
        self.parentTrack = parentTrack
        self.index = index
        self.color = "cyan"
        self.events = defaultdict(list) # statusType: [Event]
        self.dirty = False # indicates whether this needs re-export. Obviously not saved.
        self.midiChannel = 1 #1-16 inclusive.
        self._processAfterInit()

    def _processAfterInit(self):
        """Call this after either init or instanceFromSerializedData"""
        self.cachedMedianVelocity = 64
        self.cachedLastEventPosition = 0 #set in generateCalfboxMidi. Used by the track
        self.cachedFirstEventPosition = 0 #set in generateCalfboxMidi. Used by the track

    def newEvent(self, tickindex:int, statusType:int, byte1:int, byte2:int, freeText:str):
        """The only place in the program where events get created, except Score.getCopyBufferCopy
        and where self.parentTrack.parentData.allEventsById gets populated.

        Returns the new Event, which is already stored in this layer."""
        ev = Event(tickindex, statusType, byte1, byte2, self.index, freeText)
        ev.parentLayer = self
        self.parentTrack.parentData.allEventsById[id(ev)] = ev
        self.events[statusType].append(ev)
        self.dirty = True
        return ev

    def insertEvent(self, event):
        """Insert an existing Event object, that was at one time created through newEvent"""
        assert event not in self.events[event.status]
        self.events[event.status].append(event)
        event.parentLayer = self
        self.dirty = True

    def deleteEvent(self, event):
        """Don't be surprised. This gets called by api.createNote to temporarily remove
        notes and later add them with insertEvent again.

        A missing event is logged as an error, not raised."""
        try:
            self.events[event.status].remove(event)
        except ValueError:
            logger.error(f"Event was not in our list. Layer{self.index}. Event: {event}")
        self.dirty = True

    #Save / Load / Export
    def serialize(self)->dict:
        """Return the save-file representation of this layer and all its events."""
        events = [event.serialize() for eventlist in self.events.values() for event in eventlist]
        return {
            "index" : self.index,
            "color" : self.color,
            "events": events,
            "midiChannel": self.midiChannel,
            }

    @classmethod
    def instanceFromSerializedData(cls, parentTrack, serializedData):
        """Alternative constructor used when loading a file. Bypasses __init__."""
        self = cls.__new__(cls)
        self.parentTrack = parentTrack
        self.color = serializedData["color"]
        self.index = serializedData["index"]
        self.midiChannel = serializedData["midiChannel"]
        self.events = defaultdict(list) # statusType: [Event]
        for seriEvent in serializedData["events"]: #tuples or list, in the order Event.serialize wrote them
            self.newEvent(*seriEvent)
        self.dirty = True
        self._processAfterInit()
        return self

    def generateCalfboxMidi(self):
        """We use subtracks, therefore they do not change the cachedDuration of the song.
        Instead we save our own version of cached ticks of the actual song length.
        Useful for the GUI so it does not have to draw MAX_DURATION

        Does nothing unless the layer is dirty. Otherwise it rebuilds the midi blob, hands it
        to the sequencer interface and refreshes the cached median velocity and the
        cached first/last event positions."""
        if not self.dirty:
            return

        velocities = []
        chunks = [] #joined once at the end. Repeated bytes += would copy the whole blob for every event
        maxPos = 0
        minPos = template.engine.sequencer.MAXIMUM_TICK_DURATION
        for eventlist in self.events.values():
            for event in eventlist:
                if event.position > maxPos:
                    maxPos = event.position
                if event.position < minPos:
                    minPos = event.position
                if event.status == 0x90: #only note-ons count for the median velocity
                    velocities.append(event.byte2)
                chunks.append(event.toCboxBytes())
        blob = b"".join(chunks)

        if velocities: #otherwise keep the previous value
            self.cachedMedianVelocity = int(median(velocities))

        self.parentTrack.sequencerInterface.setSubtrack(self.index, [(blob, 0, maxPos+1)]) #(bytes-blob, position, length)
        self.dirty = False
        self.cachedLastEventPosition = maxPos
        self.cachedFirstEventPosition = minPos

    def _exportEvents(self) -> list:
        """Includes generating midi.
        This is not called after every change!
        In fact it is only triggered by api._layerChanged which happens once on program start
        and on very extensive operations on the whole layer (like transpose)

        Returns the exported events of all status types, sorted by position.
        """
        result = [event.export() for eventlist in self.events.values() for event in eventlist]
        self.generateCalfboxMidi()
        return sorted(result, key=lambda e: e["position"])

    def export(self)->dict:
        """Return the layer description that is handed out of the engine."""
        return {
            "index" : self.index,
            "color" : self.color,
            "events" : self._exportEvents(), #side effect: generate midi
            }
class Event:
    """A single midi event without a channel and without a duration.

    [0x90, 60, 100] for a loud middle c on channel 0
    [0xE0, 4, 85] pitchbend to 85*128 + 4 = 10884 steps

    There is no duration value. Connections, e.g. note rectangles in the GUI, have to be calculated.
    Byte2 can be None, for Program Change or Channel Pressure.

    Vico events have no channel. The recorded channel is converted to channel 0.
    """

    def __init__(self, position:int, status:int, byte1:int, byte2:int, layer:int, freeText:str=""):
        #Tick position of the event
        self.position = position
        #Midi status, e.g. 0x90 for note-on or 0xE0 for pitchbend
        self.status = status
        #First data byte, e.g. 60 for middle c
        self.byte1 = byte1
        #Second data byte, eg. 100 for a rather loud note. Can be None for Program Changes.
        self.byte2 = byte2
        #0-9 incl. . Events need to know their layer for undo.
        self.layer = layer
        self.freeText = freeText
        #self.parentLayer = Injected and changed by the Layer itself.

    def serialize(self)->tuple:
        """Return the save-file tuple. The layer index is not part of it."""
        savedPosition = int(self.position)
        return (savedPosition, self.status, self.byte1, self.byte2, self.freeText)

    def export(self)->dict:
        """Return a dict description of this event, including id(self) as "id"."""
        exported = {"id": id(self)}
        for attributeName in ("position", "status", "byte1", "byte2", "layer", "freeText"):
            exported[attributeName] = getattr(self, attributeName)
        return exported

    def toCboxBytes(self)->bytes:
        """Return this event serialized through cbox.Pattern.serialize_event.
        The status gets the midi channel of the parent layer added.
        A position below 0 is logged as a warning and sent as position 0."""
        limitedByte1 = pitch.midiPitchLimiter(self.byte1, 0)
        limitedByte2 = pitch.midiPitchLimiter(self.byte2, 0)
        #we index channels from 1 to 16, so -1 here because status is itself already chan 1
        channelStatus = self.status + self.parentLayer.midiChannel - 1

        if not self.position >= 0:
            logger.warning(f"Event {self.byte1},{self.byte2} has position less than 0. Limiting to 0 in midi output. Please fix manually")
            return cbox.Pattern.serialize_event(0, channelStatus, limitedByte1, limitedByte2)
        return cbox.Pattern.serialize_event(self.position, channelStatus, limitedByte1, limitedByte2)

    def __repr__(self):
        exported = self.export()
        return str(exported)

    def copy(self):
        """Return an independent Event with the same data but no layer (layer=None).
        The copy still needs to be inserted into a layer."""
        #return Event(*self.serialize(), layer=None)
        return Event(self.position, self.status, self.byte1, self.byte2, None, self.freeText)