其他
程序员奶爸用树莓派制作婴儿监护仪:哭声自动通知,还能分析何时喂奶
能够在任何带有廉价USB麦克风的廉价设备上运行,比如树莓派等。
能够检测到婴儿的哭声,并在宝宝开始哭或停止哭泣的时候通知我(最好通过手机);或者在宝宝哭的时候在仪表板上显示跟踪的数据点;或者执行任何我希望执行的任务。不仅仅是充当哑巴对讲机,将声音从源头传递到唯一的兼容设备上。
能够在任何设备上传输音频流,我自己的扬声器、我的智能手机、我的计算机等。
无论声源与扬声器之间的距离是多少,都能够正常工作,我不需要在房屋内移动扬声器。
配有摄像头,这样我就可以实时检查宝宝的状况;或者在宝宝哭的时候,我可以看到婴儿床的图片或简短的录像,以检查一切是否正常。
录制一些音频样本
[sudo] apt-get install ffmpeg lame libatlas-base-dev alsa-utils
[sudo] pip3 install tensorflow
**** List of CAPTURE Hardware Devices ****
card 1: Device [USB PnP Sound Device], device 0: USB Audio [USB Audio]
  Subdevices: 0/1
  Subdevice #0: subdevice #0
card 2: Device_1 [USB PnP Sound Device], device 0: USB Audio [USB Audio]
  Subdevices: 0/1
  Subdevice #0: subdevice #0
标记音频样本
~/datasets/sound-detect/audio
  -> sample_1
    -> audio.mp3
    -> labels.json
  -> sample_2
    -> audio.mp3
    -> labels.json
  ...
{
"00:00": "negative",
"02:13": "positive",
"04:57": "negative",
"15:41": "positive",
"18:24": "negative"
}
生成数据集
git clone git@github.com:/BlackLight/micmon.git
cd micmon
[sudo] pip3 install -r requirements.txt
[sudo] python3 setup.py build install
micmon-datagen \
    --low 250 --high 2500 --bins 100 \
    --sample-duration 2 --channels 1 \
    ~/datasets/sound-detect/audio ~/datasets/sound-detect/data
# Convert every labelled audio sample directory into a compressed .npz
# dataset file holding its frequency spectrum and labels.
import os

from micmon.audio import AudioDirectory, AudioPlayer, AudioFile
from micmon.dataset import DatasetWriter

basedir = os.path.expanduser('~/datasets/sound-detect')
audio_dir = os.path.join(basedir, 'audio')
datasets_dir = os.path.join(basedir, 'data')

# Lower and upper bound (Hz) passed to the writer as low_freq / high_freq.
cutoff_frequencies = [250, 2500]

# Scan the base audio_dir for labelled audio samples
audio_dirs = AudioDirectory.scan(audio_dir)

# Save the spectrum information and labels of the samples to a
# different compressed file for each audio file.
# The loop variable is deliberately NOT called `audio_dir`, so the base
# path defined above is not overwritten while iterating.
for sample_dir in audio_dirs:
    dataset_file = os.path.join(
        datasets_dir, os.path.basename(sample_dir.path) + '.npz'
    )
    print(f'Processing audio sample {sample_dir.path}')

    with AudioFile(sample_dir) as reader, \
            DatasetWriter(dataset_file,
                          low_freq=cutoff_frequencies[0],
                          high_freq=cutoff_frequencies[1]) as writer:
        for sample in reader:
            # The writer accumulates samples through in-place addition.
            writer += sample
训练模型
# Train a small dense network on the generated .npz datasets and save it.
import os

from tensorflow.keras import layers

from micmon.dataset import Dataset
from micmon.model import Model

# This is a directory that contains the saved .npz dataset files
datasets_dir = os.path.expanduser('~/datasets/sound-detect/data')

# This is the output directory where the model will be saved
model_dir = os.path.expanduser('~/models/sound-detect')

# This is the number of training epochs for each dataset sample
epochs = 2

# Load the datasets from the compressed files.
# 70% of the data points will be included in the training set,
# 30% of the data points will be included in the evaluation set
# and used to evaluate the performance of the model.
datasets = Dataset.scan(datasets_dir, validation_split=0.3)

# Everything below indexes datasets[0]; fail with a readable message
# instead of a bare IndexError when nothing was found.
if not datasets:
    raise SystemExit(f'No dataset files found in {datasets_dir}')

labels = ['negative', 'positive']
freq_bins = len(datasets[0].samples[0])

# Create a network with 4 layers (one input layer, two intermediate layers
# and one output layer). The first intermediate layer in this example will
# have twice the number of units as the number of input units, while the
# second intermediate layer will have 75% of the number of input units.
# We also specify the names for the labels and the low and high frequency
# range used when sampling.
model = Model(
    [
        layers.Input(shape=(freq_bins,)),
        layers.Dense(int(2 * freq_bins), activation='relu'),
        layers.Dense(int(0.75 * freq_bins), activation='relu'),
        layers.Dense(len(labels), activation='softmax'),
    ],
    labels=labels,
    low_freq=datasets[0].low_freq,
    high_freq=datasets[0].high_freq
)

# Train the model
for epoch in range(epochs):
    for i, dataset in enumerate(datasets):
        print(f'[epoch {epoch+1}/{epochs}] [audio sample {i+1}/{len(datasets)}]')
        model.fit(dataset)
        evaluation = model.evaluate(dataset)
        print(f'Validation set loss and accuracy: {evaluation}')

# Save the model
model.save(model_dir, overwrite=True)
使用模型进行预测
# Load the trained model and print a prediction for every audio sample
# captured from the input device.
import os

from micmon.audio import AudioDevice
from micmon.model import Model

model_dir = os.path.expanduser('~/models/sound-detect')
model = Model.load(model_dir)
audio_system = 'alsa'        # Supported: alsa and pulse
audio_device = 'plughw:2,0'  # Get list of recognized input devices with arecord -l

with AudioDevice(audio_system, device=audio_device) as source:
    for sample in source:
        source.pause()   # Pause recording while we process the frame
        prediction = model.predict(sample)
        print(prediction)
        source.resume()  # Resume recording
[sudo] apt-get install redis-server
[sudo] systemctl start redis-server.service
[sudo] systemctl enable redis-server.service
[sudo] pip3 install 'platypush[http,pushbullet]'
backend.http:
    enabled: True

pushbullet:
    token: YOUR_TOKEN
#!/usr/bin/python3
import argparse
import logging
import os
import sys

from platypush import RedisBus
from platypush.message.event.custom import CustomEvent

from micmon.audio import AudioDevice
from micmon.model import Model

# Module-level logger; main() raises its level to DEBUG in verbose mode.
logger = logging.getLogger('micmon')
def get_args():
    """Parse the command-line options of the detection script.

    Arguments are read from ``sys.argv[1:]``. Unrecognized options are
    tolerated and discarded, because ``parse_known_args`` is used.

    Returns:
        argparse.Namespace with the attributes ``model_path``,
        ``sound_device``, ``event_type``, ``sound_server``,
        ``positive_label``, ``negative_label``, ``sample_duration``,
        ``sample_rate``, ``channels``, ``ffmpeg_bin``, ``debug``,
        ``window_length`` and ``positive_samples``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        'model_path',
        help='Path to the file/directory containing the saved Tensorflow model')
    parser.add_argument(
        '-i', help='Input sound device (e.g. hw:0,1 or default)',
        required=True, dest='sound_device')
    parser.add_argument(
        '-e',
        help='Name of the event that should be raised when a positive event occurs',
        required=True, dest='event_type')
    parser.add_argument(
        '-s', '--sound-server',
        help='Sound server to be used (available: alsa, pulse)',
        required=False, default='alsa', dest='sound_server')
    parser.add_argument(
        '-P', '--positive-label',
        help='Model output label name/index to indicate a positive sample '
             '(default: positive)',
        required=False, default='positive', dest='positive_label')
    parser.add_argument(
        '-N', '--negative-label',
        help='Model output label name/index to indicate a negative sample '
             '(default: negative)',
        required=False, default='negative', dest='negative_label')
    parser.add_argument(
        '-l', '--sample-duration',
        help='Length of the FFT audio samples (default: 2 seconds)',
        required=False, type=float, default=2., dest='sample_duration')
    parser.add_argument(
        '-r', '--sample-rate', help='Sample rate (default: 44100 Hz)',
        required=False, type=int, default=44100, dest='sample_rate')
    parser.add_argument(
        '-c', '--channels',
        help='Number of audio recording channels (default: 1)',
        required=False, type=int, default=1, dest='channels')
    parser.add_argument(
        '-f', '--ffmpeg-bin',
        help='FFmpeg executable path (default: ffmpeg)',
        required=False, default='ffmpeg', dest='ffmpeg_bin')
    parser.add_argument(
        '-v', '--verbose', help='Verbose/debug mode',
        required=False, action='store_true', dest='debug')
    parser.add_argument(
        '-w', '--window-duration',
        help='Duration of the look-back window (default: 10 seconds)',
        required=False, type=float, default=10., dest='window_length')
    parser.add_argument(
        '-n', '--positive-samples',
        help='Number of positive samples detected over the window duration '
             'to trigger the event (default: 1)',
        required=False, type=int, default=1, dest='positive_samples')

    # Only the parsed namespace is returned; leftover unknown arguments
    # are intentionally ignored.
    opts, _ = parser.parse_known_args(sys.argv[1:])
    return opts
def main():
    """Run the detection loop and post an event on each state change.

    Loads the model from ``model_path``, then reads samples from the audio
    device and classifies each one. Recent predictions are kept in a
    look-back window. When the number of positive predictions in the window
    crosses the ``positive_samples`` threshold and the current state
    differs, a ``CustomEvent`` whose subtype is ``event_type`` and whose
    state is the latest prediction is posted on the Redis bus.
    """
    args = get_args()
    if args.debug:
        logger.setLevel(logging.DEBUG)

    model_dir = os.path.abspath(os.path.expanduser(args.model_path))
    model = Model.load(model_dir)
    window = []
    cur_prediction = args.negative_label
    bus = RedisBus()

    with AudioDevice(system=args.sound_server,
                     device=args.sound_device,
                     sample_duration=args.sample_duration,
                     sample_rate=args.sample_rate,
                     channels=args.channels,
                     ffmpeg_bin=args.ffmpeg_bin,
                     debug=args.debug) as source:
        for sample in source:
            source.pause()  # Pause recording while we process the frame
            prediction = model.predict(sample)
            logger.debug(f'Sample prediction: {prediction}')
            has_change = False

            # NOTE(review): window_length is documented as a duration in
            # seconds but is compared against a number of samples here.
            # Left unchanged so existing -w/-n tunings keep their meaning;
            # confirm the intended unit.
            if len(window) < args.window_length:
                window.append(prediction)
            else:
                # Window is full: drop the oldest prediction.
                window = window[1:] + [prediction]

            positive_samples = sum(
                1 for pred in window if pred == args.positive_label
            )

            # Messages go through the module logger (not the root logging
            # module) so they follow the same logger that -v configures.
            if args.positive_samples <= positive_samples and \
                    prediction == args.positive_label and \
                    cur_prediction != args.positive_label:
                cur_prediction = args.positive_label
                has_change = True
                logger.info(f'Positive sample threshold detected ({positive_samples}/{len(window)})')
            elif args.positive_samples > positive_samples and \
                    prediction == args.negative_label and \
                    cur_prediction != args.negative_label:
                cur_prediction = args.negative_label
                has_change = True
                logger.info(f'Negative sample threshold detected ({len(window)-positive_samples}/{len(window)})')

            if has_change:
                evt = CustomEvent(subtype=args.event_type, state=prediction)
                bus.post(evt)

            source.resume()  # Resume recording
# Run the detection loop only when executed as a script.
if __name__ == '__main__':
    main()
mkdir -p ~/.config/platypush/scripts
cd ~/.config/platypush/scripts
# Define the directory as a module
touch __init__.py
# Create a script for the baby-cry events
vi babymonitor.py
babymonitor.py的内容如下:
from platypush.context import get_plugin
from platypush.event.hook import hook
from platypush.message.event.custom import CustomEvent
@hook(CustomEvent, subtype='baby-cry', state='positive')
def on_baby_cry_start(event, **_):
    """Send a Pushbullet note when a 'baby-cry' event with state 'positive' is received."""
    pb = get_plugin('pushbullet')
    pb.send_note(title='Baby cry status', body='The baby is crying!')
@hook(CustomEvent, subtype='baby-cry', state='negative')
def on_baby_cry_stop(event, **_):
    """Send a Pushbullet note when a 'baby-cry' event with state 'negative' is received."""
    pb = get_plugin('pushbullet')
    pb.send_note(title='Baby cry status', body='The baby stopped crying - good job!')
mkdir -p ~/.config/systemd/user
wget -O ~/.config/systemd/user/platypush.service \
    https://raw.githubusercontent.com/BlackLight/platypush/master/examples/systemd/platypush.service
systemctl --user start platypush.service
systemctl --user enable platypush.service
~/.config/systemd/user/babymonitor.service
[Unit]
Description=Monitor to detect my baby's cries
After=network.target sound.target

[Service]
ExecStart=/home/pi/bin/micmon_detect.py -i plughw:2,0 -e baby-cry -w 10 -n 2 ~/models/sound-detect
Restart=always
RestartSec=10

[Install]
WantedBy=default.target
systemctl --user start babymonitor.service
systemctl --user enable babymonitor.service
婴儿摄像头
[sudo] pip3 install 'platypush[http,camera,picamera]'
camera.pi:
    listen_port: 5001
wget http://raspberry-pi:8008/camera/pi/photo.jpg
http://raspberry-pi:8008/camera/pi/video.mjpg
mkdir -p ~/.config/platypush/scripts
cd ~/.config/platypush/scripts
touch __init__.py
vi camera.py
from platypush.context import get_plugin
from platypush.event.hook import hook
from platypush.message.event.application import ApplicationStartedEvent
@hook(ApplicationStartedEvent)
def on_application_started(event, **_):
    """Start the camera.pi stream when an ApplicationStartedEvent is received."""
    cam = get_plugin('camera.pi')
    cam.start_streaming()
vlc tcp/h264://raspberry-pi:5001