Details | Last modification | View Log | RSS feed
Rev | Author | Line No. | Line |
---|---|---|---|
6 | daniel-mar | 1 | #!/usr/bin/env python3 |
2 | |||
3 | # This script listens to the microphone and detects a doorbell that plays 2 sounds ("ding" "dong") |
||
4 | # Please configure and adjust the script to your needs |
||
5 | |||
6 | import pyaudio |
||
7 | import wave |
||
8 | import numpy as np |
||
9 | import time |
||
10 | import os |
||
11 | |||
# --- Audio capture ---------------------------------------------------------
CHUNK = 2 * 4096  # number of data points to read at a time
RATE = 48000      # time resolution of the recording device (Hz)
DEVICE = 1        # input device index (here: the webcam's microphone)

# --- Doorbell tones --------------------------------------------------------
# In Audacity, select "Analyze > Spectrum" to find out the frequencies of
# your own doorbell.
TARGET_FREQ_DING = 3232  # Hz
TARGET_FREQ_DONG = 2781  # Hz

# A tone counts as detected when its volume divided by the general volume
# exceeds the matching threshold.
DING_TRES = 1.5
DONG_TRES = 2.4

DING_DONG_INTERVAL = 4  # max seconds between "ding" and "dong"

# --- Testing / diagnostics -------------------------------------------------
# Set to the path of a wave file (e.g. "test/doorbell_test.wav") to analyse
# that file instead of the microphone; leave empty for live detection.
SIMULATE_WAVEFILE = ""

# 0 = only report complete ding-dongs, 1 = also report every single tone,
# 2 = additionally print the measured ratios for every chunk.
DEBUG = 2
||
24 | |||
25 | # ------------------------------------------------------------- |
||
26 | |||
# Number of recent per-chunk volumes kept by the peak filter.
HISTORY_LEN = 5

# numpy dtypes for the PCM sample widths (bytes per sample) a wave file can
# be analysed with. WAV data is little-endian; 8-bit WAV data is unsigned.
WAVE_DTYPES = {1: np.uint8, 2: "<i2", 4: "<i4"}


def smooth(history, value):
    """Filter single-chunk peaks out of a volume measurement.

    Appends `value` to `history` (a list, modified in place and trimmed to
    the newest HISTORY_LEN entries) and returns a weighted mix of the
    history and the current value.

    The divisor is always 15 (5 history slots + weight 10 for the current
    value), even while the history is still filling up. All volumes are
    filtered the same way, so this cancels out in the ratios.
    """
    history.append(value)
    del history[:-HISTORY_LEN]
    return (np.sum(history) + 10 * value) / 15


def tone_volume(magnitudes, freqs, target_freq, rate):
    """Return the volume measured around `target_freq`.

    magnitudes  -- absolute FFT values of bins 1 .. n/2-1 (bin 0 is dropped)
    freqs       -- np.fft.fftfreq() of the full FFT, in cycles per sample
    target_freq -- frequency to look for, in Hz
    rate        -- sample rate of the analysed audio, in Hz

    NOTE: this sums only two bins (the slice [idx-1:idx+1] of an array that
    is already shifted by one bin) but divides by 3. This is kept exactly as
    it always was, because the detection thresholds are presumably
    calibrated against it; correcting the average would require re-tuning
    the thresholds.
    """
    idx = int(np.abs(freqs - target_freq / rate).argmin())
    return np.sum(magnitudes[idx - 1:idx + 1]) / 3


class ToneAnalyzer:
    """Measures how much the two doorbell tones stand out in each chunk."""

    def __init__(self, rate, ding_freq, dong_freq):
        self.rate = rate
        self.ding_freq = ding_freq
        self.dong_freq = dong_freq
        self.ding_history = []
        self.dong_history = []
        self.other_history = []

    def ratios(self, samples):
        """Return (ding_ratio, dong_ratio) for one chunk of samples.

        Each ratio is the peak-filtered volume of the tone divided by the
        peak-filtered general volume (mean over the whole spectrum).
        """
        spectrum = np.fft.fft(samples)
        freqs = np.fft.fftfreq(spectrum.size)
        magnitudes = np.abs(spectrum[1:spectrum.size // 2])

        ding = smooth(self.ding_history,
                      tone_volume(magnitudes, freqs, self.ding_freq, self.rate))
        dong = smooth(self.dong_history,
                      tone_volume(magnitudes, freqs, self.dong_freq, self.rate))
        other = smooth(self.other_history, np.mean(magnitudes))

        # Total silence would give 0/0 (NaN plus a RuntimeWarning for every
        # chunk). Nothing can be detected then, so report "no tone" instead.
        if not other > 0:
            return 0.0, 0.0
        return ding / other, dong / other


class DingDongDetector:
    """Tracks single tone detections and recognises "ding" followed by "dong".

    ding_threshold / dong_threshold -- minimum ratio for a tone to count
    max_interval -- max seconds between "ding" and "dong"
    clock        -- callable returning the current time in seconds
    debug        -- if > 0, every single tone detection is printed as well
    """

    def __init__(self, ding_threshold, dong_threshold, max_interval,
                 clock=time.time, debug=0):
        self.ding_threshold = ding_threshold
        self.dong_threshold = dong_threshold
        self.max_interval = max_interval
        self.clock = clock
        self.debug = debug
        self.reset()

    def reset(self):
        """Forget all detections so far."""
        self.last_ding = 0
        self.last_dong = 0
        self.max_ding = 0
        self.max_dong = 0

    def update(self, ding_ratio, dong_ratio):
        """Feed the ratios of one chunk; return True on a complete ding-dong.

        The detector resets itself after a complete ding-dong.
        """
        if ding_ratio > self.ding_threshold:
            if self.debug > 0:
                print("Detected: DING with intensity {}>{}".format(ding_ratio, self.ding_threshold))
            self.last_ding = self.clock()
            self.max_ding = max(self.max_ding, ding_ratio)

        # The clock is read again on purpose: if both tones show up in the
        # same chunk, "dong" still gets the later timestamp.
        if dong_ratio > self.dong_threshold:
            if self.debug > 0:
                print("Detected: DONG with intensity {}>{}".format(dong_ratio, self.dong_threshold))
            self.last_dong = self.clock()
            self.max_dong = max(self.max_dong, dong_ratio)

        interval = self.last_dong - self.last_ding
        if not (self.last_ding > 0 and self.last_dong > 0
                and 0 < interval < self.max_interval):
            return False

        if self.debug > 0:
            print("Detected: DING DONG! with max intensity ding {} dong {}".format(self.max_ding, self.max_dong))
        else:
            print("DING DONG!")
        self.reset()
        return True


def microphone_chunks(stream, chunk, dtype=np.int16):
    """Yield chunks of `chunk` samples read from an open input stream, forever."""
    while True:
        # frombuffer replaces the deprecated binary mode of np.fromstring
        yield np.frombuffer(stream.read(chunk), dtype=dtype)


def wave_chunks(wf, chunk):
    """Yield chunks of up to `chunk` frames from an open wave file until EOF.

    The sample format is taken from the file header. Of multi-channel files
    only the first channel is analysed, so that interleaved channels do not
    shift the measured frequencies.

    Raises ValueError if the sample width of the file is not supported.
    """
    width = wf.getsampwidth()
    if width not in WAVE_DTYPES:
        raise ValueError("Unsupported sample width: {} bytes".format(width))
    dtype = WAVE_DTYPES[width]
    channels = wf.getnchannels()
    while True:
        data = wf.readframes(chunk)
        if not data:
            return
        yield np.frombuffer(data, dtype=dtype)[::channels]


def run_detect_hook():
    """Run the "detect.py" script located next to this script (best effort)."""
    import subprocess

    script = os.path.join(os.path.dirname(os.path.abspath(__file__)), "detect.py")
    try:
        # Argument list instead of a shell string: also works if the path
        # contains spaces or other shell meta characters.
        subprocess.run([script], check=False)
    except OSError as err:
        # A missing or non-executable hook must not stop the detection.
        print("Could not run {}: {}".format(script, err))


def main():
    """Listen to the microphone (or the simulation file) and detect ding-dongs."""
    audio = pyaudio.PyAudio()
    stream = None
    wf = None
    try:
        if SIMULATE_WAVEFILE == "":
            rate = RATE  # TODO: read from hw-params
            stream = audio.open(format=pyaudio.paInt16, channels=1, rate=rate,
                                input=True, input_device_index=DEVICE,
                                frames_per_buffer=CHUNK)
            chunks = microphone_chunks(stream, CHUNK)
        else:
            wf = wave.open(SIMULATE_WAVEFILE, 'rb')
            rate = wf.getframerate()
            chunks = wave_chunks(wf, CHUNK)

        analyzer = ToneAnalyzer(rate, TARGET_FREQ_DING, TARGET_FREQ_DONG)
        detector = DingDongDetector(DING_TRES, DONG_TRES, DING_DONG_INTERVAL,
                                    debug=DEBUG)

        for samples in chunks:
            ding_ratio, dong_ratio = analyzer.ratios(samples)
            if DEBUG > 1:
                print("Debug: DING", ding_ratio, "DONG", dong_ratio)
            if detector.update(ding_ratio, dong_ratio):
                run_detect_hook()
    finally:
        # Also reached on Ctrl+C or on an error, so the audio device and the
        # wave file are always released.
        if stream is not None:
            stream.close()
        if wf is not None:
            wf.close()
        audio.terminate()


if __name__ == "__main__":
    main()