1 |
#!/usr/bin/env python3 |
2 |
|
3 |
# This script listens to the microphone and detects a doorbell that plays 2 sounds ("ding" "dong") |
4 |
# Please configure and adjust the script to your needings |
5 |
|
6 |
import pyaudio |
7 |
import wave |
8 |
import numpy as np |
9 |
import time |
10 |
import os |
11 |
|
12 |
CHUNK = 2*4096 # number of data points to read at a time |
13 |
RATE = 48000 # time resolution of the recording device (Hz) |
14 |
DEVICE = 1 # Webcam |
15 |
TARGET_FREQ_DING = 3232 # Hz, in AudaCity, select "Analyze > Spectrum" to find out the frequency of your doorbell |
16 |
TARGET_FREQ_DONG = 2781 # Hz |
17 |
DING_TRES = 1.5 |
18 |
DONG_TRES = 2.4 |
19 |
DING_DONG_INTERVAL = 4 # max seconds between "ding" and "dong" |
20 |
|
21 |
#SIMULATE_WAVEFILE = "test/doorbell_test.wav" |
22 |
SIMULATE_WAVEFILE = "" |
23 |
DEBUG = 2 |
24 |
|
25 |
# ------------------------------------------------------------- |
26 |
|
27 |
p = pyaudio.PyAudio() |
28 |
|
29 |
if SIMULATE_WAVEFILE == "": |
30 |
RATE = 48000 # TODO: read from hw-params |
31 |
DTYPE = np.int16 # TODO: read from hw-params? |
32 |
stream=p.open(format=pyaudio.paInt16,channels=1,rate=RATE,input=True, input_device_index=DEVICE, frames_per_buffer=CHUNK) |
33 |
else: |
34 |
wf = wave.open(SIMULATE_WAVEFILE, 'rb') |
35 |
DTYPE = np.int16 # TODO: read from wave file? |
36 |
RATE = wf.getframerate() |
37 |
|
38 |
col_target_ding = [] |
39 |
col_target_dong = [] |
40 |
col_other = [] |
41 |
|
42 |
last_detected_ding = 0 |
43 |
last_detected_dong = 0 |
44 |
max_intensity_ding = 0 |
45 |
max_intensity_dong = 0 |
46 |
|
47 |
while True: |
48 |
if SIMULATE_WAVEFILE == "": |
49 |
indata = np.fromstring(stream.read(CHUNK),dtype=DTYPE) |
50 |
else: |
51 |
d = wf.readframes(CHUNK) |
52 |
if len(d) == 0: |
53 |
break; |
54 |
indata = np.fromstring(d,dtype=DTYPE) |
55 |
|
56 |
fftData = np.fft.fft(indata) |
57 |
freqs = np.fft.fftfreq(fftData.size) |
58 |
magnitudes = fftData[1:int(fftData.size/2)] |
59 |
|
60 |
# Find out volume of target "ding" frequency |
61 |
|
62 |
idx = (np.abs(freqs-TARGET_FREQ_DING/RATE)).argmin() |
63 |
volume_target_ding = np.sum(np.abs(magnitudes[int(idx-1):int(idx+1)])) / 3 |
64 |
|
65 |
# Find out volume of target "dong" frequency |
66 |
|
67 |
idx = (np.abs(freqs-TARGET_FREQ_DONG/RATE)).argmin() |
68 |
volume_target_dong = np.sum(np.abs(magnitudes[int(idx-1):int(idx+1)])) / 3 |
69 |
|
70 |
# Find out the general volume |
71 |
|
72 |
volume_other = np.mean(np.abs(magnitudes)) |
73 |
#volume_other = np.median(np.abs(magnitudes)) |
74 |
|
75 |
# Filter peaks |
76 |
|
77 |
col_other.append(volume_other) |
78 |
while len(col_other) > 5: |
79 |
col_other.pop(0) |
80 |
volume_other = (np.sum(col_other) + 10*volume_other) / 15 |
81 |
|
82 |
col_target_ding.append(volume_target_ding) |
83 |
while len(col_target_ding) > 5: |
84 |
col_target_ding.pop(0) |
85 |
volume_target_ding = (np.sum(col_target_ding) + 10*volume_target_ding) / 15 |
86 |
|
87 |
col_target_dong.append(volume_target_dong) |
88 |
while len(col_target_dong) > 5: |
89 |
col_target_dong.pop(0) |
90 |
volume_target_dong = (np.sum(col_target_dong) + 10*volume_target_dong) / 15 |
91 |
|
92 |
# Debug |
93 |
if DEBUG > 1: |
94 |
print("Debug: DING", volume_target_ding/volume_other, "DONG", volume_target_dong/volume_other) |
95 |
|
96 |
# Check ratio |
97 |
if (volume_target_ding/volume_other > DING_TRES): |
98 |
if DEBUG > 0: |
99 |
print("Detected: DING with intensity {}>{}".format(volume_target_ding/volume_other,DING_TRES)) |
100 |
last_detected_ding = time.time() |
101 |
max_intensity_ding = max(max_intensity_ding, volume_target_ding/volume_other) |
102 |
|
103 |
if (volume_target_dong/volume_other > DONG_TRES): |
104 |
if DEBUG > 0: |
105 |
print("Detected: DONG with intensity {}>{}".format(volume_target_dong/volume_other,DONG_TRES)) |
106 |
last_detected_dong = time.time() |
107 |
max_intensity_dong = max(max_intensity_dong, volume_target_dong/volume_other) |
108 |
|
109 |
interval = last_detected_dong - last_detected_ding |
110 |
if (last_detected_ding > 0) and (last_detected_dong > 0) and (interval > 0) and (interval < DING_DONG_INTERVAL): |
111 |
if DEBUG > 0: |
112 |
print("Detected: DING DONG! with max intensity ding {} dong {}".format(max_intensity_ding, max_intensity_dong)) |
113 |
else: |
114 |
print("DING DONG!") |
115 |
os.system(os.path.dirname(os.path.abspath(__file__)) + "/detect.py") |
116 |
last_detected_ding = 0 |
117 |
last_detected_dong = 0 |
118 |
max_intensity_ding = 0 |
119 |
max_intensity_dong = 0 |
120 |
|
121 |
if SIMULATE_WAVEFILE == "": |
122 |
stream.close() |
123 |
p.terminate() |