Commit 95478195 authored by Koen Martens's avatar Koen Martens
Browse files

Refactor script out into modules

parent a6dfb2f9
......@@ -30,11 +30,13 @@ class JackClient:
except queue.Empty:
pass
self.client.set_process_callback(process)
self.client.activate()
def play_audio_chunk(self, chunk):
self.audio_queue.put(chunk)
def play_audio_chunks(self, chunks):
self.silence_audio()
for chunk in chunks:
self.play_audio_chunk(chunk)
......
......@@ -13,6 +13,7 @@ class NoteJudge(Thread):
self._midi_queue = Queue()
self._midi_producer.add_listener(self._midi_queue)
self._listeners = set()
self._notes_on = set()
def _send_judgement(self, verdict):
for listener in self._listeners:
......@@ -21,7 +22,7 @@ class NoteJudge(Thread):
def _judge(self):
if len(self._notes_on) < 2:
verdict = None
if len(self._notes_on) > 2:
elif len(self._notes_on) > 2:
verdict = False
else:
verdict = ((max(self._notes_on) - min(self._notes_on)) == 4)
......@@ -37,12 +38,13 @@ class NoteJudge(Thread):
def stop(self):
self._running = False
self._midi_queue.put(None)
self.join()
def run(self):
while self._running:
message = self._midi_queue.get()
if message:
self._process_message()
self._process_message(message)
def add_listener(self, listener):
self._listeners.add(listener)
from queue import Queue
from threading import Thread
import mido
class MidiParser(Thread):
def __init__(self, jack_client):
......@@ -8,7 +10,7 @@ class MidiParser(Thread):
self._running = True
self._jack_client = jack_client
self._in_queue = Queue()
self._jack_client.add_midi_listener(self.in_queue)
self._jack_client.add_midi_listener(self._in_queue)
self._parser = mido.Parser()
self._listeners = set()
self._notes_on = set()
......@@ -16,6 +18,7 @@ class MidiParser(Thread):
def stop(self):
self._running = False
self._in_queue.put(None)
self.join()
def _process_message(self, message):
for listener in self._listeners:
......
......@@ -8,10 +8,11 @@ import numpy as np
class AudioReport(Thread):
def __init__(self, jack_client, judge):
super().__init__()
self._jack_client = jack_client
self._judge = judge
self._judgement_queue = Queue()
self.judge.add_listener(self._judgement_queue)
self._judge.add_listener(self._judgement_queue)
self._running = True
self._good_sound = self._load_sound('audio/411089__inspectorj__bell-candle-damper-a-h1.wav')
self._bad_sound = self._load_sound('audio/164090__hypocore__buzzer.wav')
......@@ -35,12 +36,18 @@ class AudioReport(Thread):
if judgement:
self._play_sound(judgement.verdict)
def stop(self):
self._running = False
self._judgement_queue.put(None)
self.join()
class PrintReport(Thread):
def __init__(self, judge):
super().__init__()
self._judge = judge
self._judgement_queue = Queue()
self.judge.add_listener(self._judgement_queue)
self._judge.add_listener(self._judgement_queue)
self._running = True
@staticmethod
......@@ -58,3 +65,8 @@ class PrintReport(Thread):
elif judgement.verdict is False:
verdict = "BAD : "
print(f'{verdict}{[self.describe_note(note) for note in judgement.notes]}')
def stop(self):
self._running = False
self._judgement_queue.put(None)
self.join()
#!/usr/bin/env python3
import jack
import binascii
import mido
import queue
import soundfile as sf
import numpy as np
client = jack.Client('MIDI Music Trainer')
midi_port = client.midi_inports.register('input')
audio_port = client.outports.register('verdict')
parser = mido.Parser()
message_queue = queue.Queue()
audio_queue = queue.Queue()
block_size = client.blocksize
@client.set_process_callback
def process(frames):
for offset, data in midi_port.incoming_midi_events():
parser.feed(data[:])
for message in parser:
message_queue.put(message)
try:
data = audio_queue.get_nowait()
audio_port.get_array()[:] = data
except queue.Empty:
pass
client.activate()
def describe_note(note):
notes = ['C', 'C♯/D♭', 'D', 'D♯/E♭', 'E', 'F', 'F♯/G♭', 'G', 'G♯/A♭', 'A', 'A♯/B♭', 'B']
return notes[note % len(notes)]
def judge_notes(notes):
if len(notes) < 2:
return None
if len(notes) > 2:
return False
return (max(notes) - min(notes)) == 4
def load_sound(file_name, blocksize):
sound_file = sf.SoundFile(file_name)
blocks = list(sound_file.blocks(blocksize=blocksize, dtype='float32', always_2d=True, fill_value=0))
blocks = [np.mean(block, axis=1) for block in blocks]
return blocks
def load_sounds(blocksize):
good = load_sound('audio/411089__inspectorj__bell-candle-damper-a-h1.wav', blocksize)
bad = load_sound('audio/164090__hypocore__buzzer.wav', blocksize)
return good, bad
def play_sound(sound):
while not audio_queue.empty():
try:
audio_queue.get_nowait()
except queue.Empty:
pass
for block in sound:
audio_queue.put(block)
good_sound, bad_sound = load_sounds(block_size)
with client:
notes_on = set()
while True:
message = message_queue.get()
print(f'get {message}')
if message.type == 'note_on':
notes_on.add(message.note)
elif message.type == 'note_off':
notes_on.remove(message.note)
else:
continue
print(notes_on)
print([describe_note(note) for note in notes_on])
verdict = judge_notes(notes_on)
if verdict is True:
print("GOOD")
play_sound(good_sound)
elif verdict is False:
print("BAD")
play_sound(bad_sound)
print('#' * 80)
print('press Return to quit')
print('#' * 80)
input()
from midimusictrainer.jack import JackClient
from midimusictrainer.judge import NoteJudge
from midimusictrainer.midi import MidiParser
from midimusictrainer.report import AudioReport, PrintReport
jack_client = JackClient()
midi_parser = MidiParser(jack_client)
note_judge = NoteJudge(midi_parser)
audio_reporter = AudioReport(jack_client, note_judge)
print_reporter = PrintReport(note_judge)
print_reporter.start()
audio_reporter.start()
note_judge.start()
midi_parser.start()
jack_client.start()
print('#' * 80)
print('press Return to quit')
print('#' * 80)
input()
audio_reporter.stop()
print_reporter.stop()
note_judge.stop()
midi_parser.stop()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment