Consume a real-time Media Stream using WebSockets, Python, and Flask
With Twilio's Media Streams, you can access real-time voice data from a Twilio call. Media Streams will stream the audio from the call for its entire duration to a location of your choice.
In this tutorial, you will learn how to stream audio from a live phone call using Twilio, Python, and Flask. You might want to stream audio to provide real-time sentiment analysis for all calls happening within a call center. While we will dial a specific number in this tutorial, you can imagine this number being populated dynamically from the call center software.
Info
Want to see the Flask portion of this project in its entirety? Head over to the GitHub repository, where you can clone the project and run it locally.
Twilio Media Streams uses WebSockets to deliver your audio.
A WebSocket is an upgraded HTTP protocol. WebSockets are intended to be used for long-running connections and are ideal for real-time applications. A handshake is made, a connection is created, and, unlike HTTP, multiple messages are expected to be sent over the socket until it is closed. This helps to remove the need for long-polling applications.
The WebSocket interface is included natively in nearly all client-side web browser implementations.
There are numerous WebSocket Server implementations available for just about every web framework. We'll use the Flask-Sockets to help us through this tutorial.
In this tutorial, we're going to use the web framework Flask and the WebSocket package Flask Sockets. Create a virtual environment and install flask-sockets in your terminal:
1python3 -m venv venv2source ./venv/bin/activate3pip install flask flask-sockets
Now that the package is installed, we can spin up a Flask web server.
The sockets decorator helps you create a WebSocket route with @socket.route.
This allows you to respond to named WebSocket paths (e.g., /media)
1import base642import json3import logging45from flask import Flask6from flask_sockets import Sockets78app = Flask(__name__)9sockets = Sockets(app)1011HTTP_SERVER_PORT = 50001213@sockets.route('/media')14def echo(ws):15app.logger.info("Connection accepted")16# A lot of messages will be sent rapidly. We'll stop showing after the first one.17has_seen_media = False18message_count = 019while not ws.closed:20message = ws.receive()21if message is None:22app.logger.info("No message received...")23continue2425# Messages are a JSON encoded string26data = json.loads(message)2728# Using the event type you can determine what type of message you are receiving29if data['event'] == "connected":30app.logger.info("Connected Message received: {}".format(message))31if data['event'] == "start":32app.logger.info("Start Message received: {}".format(message))33if data['event'] == "media":34if not has_seen_media:35app.logger.info("Media message: {}".format(message))36payload = data['media']['payload']37app.logger.info("Payload is: {}".format(payload))38chunk = base64.b64decode(payload)39app.logger.info("That's {} bytes".format(len(chunk)))40app.logger.info("Additional media messages from WebSocket are being suppressed....")41has_seen_media = True42if data['event'] == "closed":43app.logger.info("Closed Message received: {}".format(message))44break45message_count += 14647app.logger.info("Connection closed. Received a total of {} messages".format(message_count))484950if __name__ == '__main__':51app.logger.setLevel(logging.DEBUG)52from gevent import pywsgi53from geventwebsocket.handler import WebSocketHandler5455server = pywsgi.WSGIServer(('', HTTP_SERVER_PORT), app, handler_class=WebSocketHandler)56print("Server listening on: http://localhost:" + str(HTTP_SERVER_PORT))57server.serve_forever()
Flask Sockets relies on gevent for multithreading, so this server startup looks a little more detailed than a typical Flask server setup.
1import base642import json3import logging45from flask import Flask6from flask_sockets import Sockets78app = Flask(__name__)9sockets = Sockets(app)1011HTTP_SERVER_PORT = 50001213@sockets.route('/media')14def echo(ws):15app.logger.info("Connection accepted")16# A lot of messages will be sent rapidly. We'll stop showing after the first one.17has_seen_media = False18message_count = 019while not ws.closed:20message = ws.receive()21if message is None:22app.logger.info("No message received...")23continue2425# Messages are a JSON encoded string26data = json.loads(message)2728# Using the event type you can determine what type of message you are receiving29if data['event'] == "connected":30app.logger.info("Connected Message received: {}".format(message))31if data['event'] == "start":32app.logger.info("Start Message received: {}".format(message))33if data['event'] == "media":34if not has_seen_media:35app.logger.info("Media message: {}".format(message))36payload = data['media']['payload']37app.logger.info("Payload is: {}".format(payload))38chunk = base64.b64decode(payload)39app.logger.info("That's {} bytes".format(len(chunk)))40app.logger.info("Additional media messages from WebSocket are being suppressed....")41has_seen_media = True42if data['event'] == "closed":43app.logger.info("Closed Message received: {}".format(message))44break45message_count += 14647app.logger.info("Connection closed. Received a total of {} messages".format(message_count))484950if __name__ == '__main__':51app.logger.setLevel(logging.DEBUG)52from gevent import pywsgi53from geventwebsocket.handler import WebSocketHandler5455server = pywsgi.WSGIServer(('', HTTP_SERVER_PORT), app, handler_class=WebSocketHandler)56print("Server listening on: http://localhost:" + str(HTTP_SERVER_PORT))57server.serve_forever()
A typical pattern in most WebSocket server implementations is to continue reading until the WebSocket connection closes:
1import base642import json3import logging45from flask import Flask6from flask_sockets import Sockets78app = Flask(__name__)9sockets = Sockets(app)1011HTTP_SERVER_PORT = 50001213@sockets.route('/media')14def echo(ws):15app.logger.info("Connection accepted")16# A lot of messages will be sent rapidly. We'll stop showing after the first one.17has_seen_media = False18message_count = 019while not ws.closed:20message = ws.receive()21if message is None:22app.logger.info("No message received...")23continue2425# Messages are a JSON encoded string26data = json.loads(message)2728# Using the event type you can determine what type of message you are receiving29if data['event'] == "connected":30app.logger.info("Connected Message received: {}".format(message))31if data['event'] == "start":32app.logger.info("Start Message received: {}".format(message))33if data['event'] == "media":34if not has_seen_media:35app.logger.info("Media message: {}".format(message))36payload = data['media']['payload']37app.logger.info("Payload is: {}".format(payload))38chunk = base64.b64decode(payload)39app.logger.info("That's {} bytes".format(len(chunk)))40app.logger.info("Additional media messages from WebSocket are being suppressed....")41has_seen_media = True42if data['event'] == "closed":43app.logger.info("Closed Message received: {}".format(message))44break45message_count += 14647app.logger.info("Connection closed. Received a total of {} messages".format(message_count))484950if __name__ == '__main__':51app.logger.setLevel(logging.DEBUG)52from gevent import pywsgi53from geventwebsocket.handler import WebSocketHandler5455server = pywsgi.WSGIServer(('', HTTP_SERVER_PORT), app, handler_class=WebSocketHandler)56print("Server listening on: http://localhost:" + str(HTTP_SERVER_PORT))57server.serve_forever()
All messages that are passed over MediaStreams WebSockets are in JSON format.
Python provides a straightforward way to decode JSON:
1import base642import json3import logging45from flask import Flask6from flask_sockets import Sockets78app = Flask(__name__)9sockets = Sockets(app)1011HTTP_SERVER_PORT = 50001213@sockets.route('/media')14def echo(ws):15app.logger.info("Connection accepted")16# A lot of messages will be sent rapidly. We'll stop showing after the first one.17has_seen_media = False18message_count = 019while not ws.closed:20message = ws.receive()21if message is None:22app.logger.info("No message received...")23continue2425# Messages are a JSON encoded string26data = json.loads(message)2728# Using the event type you can determine what type of message you are receiving29if data['event'] == "connected":30app.logger.info("Connected Message received: {}".format(message))31if data['event'] == "start":32app.logger.info("Start Message received: {}".format(message))33if data['event'] == "media":34if not has_seen_media:35app.logger.info("Media message: {}".format(message))36payload = data['media']['payload']37app.logger.info("Payload is: {}".format(payload))38chunk = base64.b64decode(payload)39app.logger.info("That's {} bytes".format(len(chunk)))40app.logger.info("Additional media messages from WebSocket are being suppressed....")41has_seen_media = True42if data['event'] == "closed":43app.logger.info("Closed Message received: {}".format(message))44break45message_count += 14647app.logger.info("Connection closed. Received a total of {} messages".format(message_count))484950if __name__ == '__main__':51app.logger.setLevel(logging.DEBUG)52from gevent import pywsgi53from geventwebsocket.handler import WebSocketHandler5455server = pywsgi.WSGIServer(('', HTTP_SERVER_PORT), app, handler_class=WebSocketHandler)56print("Server listening on: http://localhost:" + str(HTTP_SERVER_PORT))57server.serve_forever()
There are four different message types that you will encounter:
The Start message will contain important information about the stream, like the type of audio, its name, the originating call and any other custom parameters you might have sent.
This information will likely come in handy for whatever service you plan to use with your real-time audio.
You can handle each type by looking at the messages event property.
1import base642import json3import logging45from flask import Flask6from flask_sockets import Sockets78app = Flask(__name__)9sockets = Sockets(app)1011HTTP_SERVER_PORT = 50001213@sockets.route('/media')14def echo(ws):15app.logger.info("Connection accepted")16# A lot of messages will be sent rapidly. We'll stop showing after the first one.17has_seen_media = False18message_count = 019while not ws.closed:20message = ws.receive()21if message is None:22app.logger.info("No message received...")23continue2425# Messages are a JSON encoded string26data = json.loads(message)2728# Using the event type you can determine what type of message you are receiving29if data['event'] == "connected":30app.logger.info("Connected Message received: {}".format(message))31if data['event'] == "start":32app.logger.info("Start Message received: {}".format(message))33if data['event'] == "media":34if not has_seen_media:35app.logger.info("Media message: {}".format(message))36payload = data['media']['payload']37app.logger.info("Payload is: {}".format(payload))38chunk = base64.b64decode(payload)39app.logger.info("That's {} bytes".format(len(chunk)))40app.logger.info("Additional media messages from WebSocket are being suppressed....")41has_seen_media = True42if data['event'] == "closed":43app.logger.info("Closed Message received: {}".format(message))44break45message_count += 14647app.logger.info("Connection closed. Received a total of {} messages".format(message_count))484950if __name__ == '__main__':51app.logger.setLevel(logging.DEBUG)52from gevent import pywsgi53from geventwebsocket.handler import WebSocketHandler5455server = pywsgi.WSGIServer(('', HTTP_SERVER_PORT), app, handler_class=WebSocketHandler)56print("Server listening on: http://localhost:" + str(HTTP_SERVER_PORT))57server.serve_forever()
The media payload is encoded in base64. Use the built-in Python function b64decode to decode it to bytes.
1import base642import json3import logging45from flask import Flask6from flask_sockets import Sockets78app = Flask(__name__)9sockets = Sockets(app)1011HTTP_SERVER_PORT = 50001213@sockets.route('/media')14def echo(ws):15app.logger.info("Connection accepted")16# A lot of messages will be sent rapidly. We'll stop showing after the first one.17has_seen_media = False18message_count = 019while not ws.closed:20message = ws.receive()21if message is None:22app.logger.info("No message received...")23continue2425# Messages are a JSON encoded string26data = json.loads(message)2728# Using the event type you can determine what type of message you are receiving29if data['event'] == "connected":30app.logger.info("Connected Message received: {}".format(message))31if data['event'] == "start":32app.logger.info("Start Message received: {}".format(message))33if data['event'] == "media":34if not has_seen_media:35app.logger.info("Media message: {}".format(message))36payload = data['media']['payload']37app.logger.info("Payload is: {}".format(payload))38chunk = base64.b64decode(payload)39app.logger.info("That's {} bytes".format(len(chunk)))40app.logger.info("Additional media messages from WebSocket are being suppressed....")41has_seen_media = True42if data['event'] == "closed":43app.logger.info("Closed Message received: {}".format(message))44break45message_count += 14647app.logger.info("Connection closed. Received a total of {} messages".format(message_count))484950if __name__ == '__main__':51app.logger.setLevel(logging.DEBUG)52from gevent import pywsgi53from geventwebsocket.handler import WebSocketHandler5455server = pywsgi.WSGIServer(('', HTTP_SERVER_PORT), app, handler_class=WebSocketHandler)56print("Server listening on: http://localhost:" + str(HTTP_SERVER_PORT))57server.serve_forever()
Once your code is all in place, start your Flask server by running this command in your terminal:
python app.py
Now your server should be running on your localhost port 5000. Congratulations! Only one thing left to do here: make sure that Twilio can reach your local web server.
We recommend that you make use of an ssh tunnel service like ngrok, which supports the wss scheme. We highly recommend installing ngrok if you haven't already.
Since our server is running on port 5000, we'll start a tunnel using:
ngrok http 5000
This will generate a random ngrok subdomain. Copy that URL - you'll need it in the next section.
To begin streaming your call's audio with Twilio, you can use the <Stream> TwiML verb.
Create a new TwiML Bin with the following TwiML:
1<?xml version="1.0" encoding="UTF-8"?>2<Response>3<Start>4<Stream url="wss://yourdomain.ngrok.io/media" />5</Start>6<Dial>+15550123456</Dial>7</Response>8
You'll need to update the above sample in two key ways:
- Replace the phone number nested in the <Dial>tag with your personal phone number, or the number of a friend or family member who can help you see this in action.
- Replace the Stream urlwith your new ngrok subdomain - you can find this in the terminal if ngrok is running. Theurlattribute must use thewssscheme (WebSockets Secure), but we're in the clear since ngrok itself uses thewssscheme.
The <Start> tag will asynchronously fork your media and immediately continue onto the next TwiML statement. Streaming will continue for the entire duration of the call unless <Stop><Stream> is encountered.
Save your new TwiML Bin, then wire it up to one of your incoming phone numbers by selecting TwiML Bin in the A Call Comes In section and then selecting your bin from the dropdown. Now, when a call comes into that number, Twilio will stream the real-time data straight to your web server!
Info
By default, Twilio will stream the incoming track - in our case, the incoming phone call. You can always change this by using the track attribute.
Find a friend or family member willing to help you test your streaming web server (or use a second phone that is different than the one you listed in your TwiML bin).
One of you should call your Twilio phone number, which will then connect the call to the number you specified in your TwiML bin. Keep an eye on your console output and start talking - you should see your conversation appear in the console as you talk!
Real-time access to your audio data opens up new doors of innovation for you. From real-time visual effects to bioinformatics, you are certain to benefit from this access to live data.
There are several services that you can connect with to provide live speech to text transcriptions. Now that you have the text in real-time, you can perform all sorts of text-based operations like translations, sentiment analysis, and keyword detection.
You might want to pipe your real-time data into a number of external providers. You can use Google's Cloud Speech to Text, Amazon's Transcribe, or IBM Watson Speech to Text. All of these providers have a language translation service available as well.
Our community has created a starter set for many languages and services. Check it out for inspiration in building your real-time applications, and consider contributing.
Let's build something amazing.