Consume a real-time Media Stream using WebSockets, Python, and Flask



Meet Media Streams

With Twilio's Media Streams, you can access real-time voice data from a Twilio call. Media Streams will stream the audio from the call for its entire duration to a location of your choice.

In this tutorial, you will learn how to stream audio from a live phone call using Twilio, Python, and Flask. You might want to stream audio to provide real-time sentiment analysis for all calls happening within a call center. While we will dial a specific number in this tutorial, you can imagine this number being populated dynamically from the call center software.

Info: Want to see the Flask portion of this project in its entirety? Head over to the GitHub repository, where you can clone the project and run it locally.


Twilio Media Streams uses WebSockets to deliver your audio.

A WebSocket connection starts as an ordinary HTTP request that is upgraded to the WebSocket protocol. WebSockets are intended for long-running connections and are ideal for real-time applications. A handshake is made, a connection is created, and, unlike HTTP, multiple messages are expected to be sent over the socket until it is closed. This removes the need for long polling.
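
To make the handshake step more concrete, here is a minimal sketch of how a server derives the Sec-WebSocket-Accept response header from the Sec-WebSocket-Key header sent by the client, as described in RFC 6455. The function name is made up for illustration. You will not need to write this yourself, since the WebSocket library performs the handshake for you.

```python
import base64
import hashlib

# Fixed GUID that RFC 6455 defines for the opening handshake.
WEBSOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"


def compute_accept_key(client_key: str) -> str:
    """Return the Sec-WebSocket-Accept value for a client's Sec-WebSocket-Key."""
    digest = hashlib.sha1((client_key + WEBSOCKET_GUID).encode("ascii")).digest()
    return base64.b64encode(digest).decode("ascii")


# Sample key from the handshake example in RFC 6455.
print(compute_accept_key("dGhlIHNhbXBsZSBub25jZQ=="))
```

Once the client has verified this value, the connection stays open and both sides can send messages until one of them closes it.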

The WebSocket interface is included natively in nearly all client-side web browser implementations.

There are numerous WebSocket server implementations available for just about every web framework. We'll use Flask-Sockets in this tutorial.


Set up your Python environment

In this tutorial, we're going to use the web framework Flask and the WebSocket package Flask-Sockets. Create a virtual environment and install both packages from your terminal:


    python3 -m venv venv
    source ./venv/bin/activate
    pip install flask flask-sockets

Now that the package is installed, we can spin up a Flask web server.


Build your WebSocket server

The Sockets object from Flask-Sockets provides a decorator, @sockets.route, for creating WebSocket routes.

Create a @sockets.route decorator

This allows you to respond to named WebSocket paths (e.g., /media).


    import base64
    import json
    import logging

    from flask import Flask
    from flask_sockets import Sockets

    app = Flask(__name__)
    sockets = Sockets(app)

    HTTP_SERVER_PORT = 5000

    @sockets.route('/media')
    def echo(ws):
        app.logger.info("Connection accepted")
        # A lot of messages will be sent rapidly. We'll stop showing after the first one.
        has_seen_media = False
        message_count = 0
        while not ws.closed:
            message = ws.receive()
            if message is None:
                app.logger.info("No message received...")
                continue

            # Messages are a JSON encoded string
            data = json.loads(message)

            # Using the event type you can determine what type of message you are receiving
            if data['event'] == "connected":
                app.logger.info("Connected Message received: {}".format(message))
            if data['event'] == "start":
                app.logger.info("Start Message received: {}".format(message))
            if data['event'] == "media":
                if not has_seen_media:
                    app.logger.info("Media message: {}".format(message))
                    payload = data['media']['payload']
                    app.logger.info("Payload is: {}".format(payload))
                    chunk = base64.b64decode(payload)
                    app.logger.info("That's {} bytes".format(len(chunk)))
                    app.logger.info("Additional media messages from WebSocket are being suppressed....")
                    has_seen_media = True
            # The final message type is "stop" (see the list of message types below)
            if data['event'] == "stop":
                app.logger.info("Stop Message received: {}".format(message))
                break
            message_count += 1

        app.logger.info("Connection closed. Received a total of {} messages".format(message_count))


    if __name__ == '__main__':
        app.logger.setLevel(logging.DEBUG)
        from gevent import pywsgi
        from geventwebsocket.handler import WebSocketHandler

        server = pywsgi.WSGIServer(('', HTTP_SERVER_PORT), app, handler_class=WebSocketHandler)
        print("Server listening on: http://localhost:" + str(HTTP_SERVER_PORT))
        server.serve_forever()

Flask-Sockets relies on gevent for concurrency, so this server startup looks a little more detailed than a typical Flask server setup.

Start your server using a WebSocket handler

    if __name__ == '__main__':
        app.logger.setLevel(logging.DEBUG)
        from gevent import pywsgi
        from geventwebsocket.handler import WebSocketHandler

        server = pywsgi.WSGIServer(('', HTTP_SERVER_PORT), app, handler_class=WebSocketHandler)
        print("Server listening on: http://localhost:" + str(HTTP_SERVER_PORT))
        server.serve_forever()

A typical pattern in most WebSocket server implementations is to continue reading until the WebSocket connection closes:

Read from the connected WebSocket until it is closed

    while not ws.closed:
        message = ws.receive()
        if message is None:
            app.logger.info("No message received...")
            continue

All messages that are passed over Media Streams WebSockets are in JSON format.

Python provides a straightforward way to decode JSON:


    # Messages are a JSON encoded string
    data = json.loads(message)

There are four different message types that you will encounter:

  • connected
  • start
  • media
  • stop

The Start message will contain important information about the stream, like the type of audio, its name, the originating call and any other custom parameters you might have sent.

This information will likely come in handy for whatever service you plan to use with your real-time audio.
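
As an illustration, the sketch below pulls a few of those fields out of a start message. The sample message is hypothetical: the identifiers are placeholders, and the field names (start, streamSid, callSid, mediaFormat, customParameters) are assumptions based on Twilio's Media Streams message reference, so check them against the start message your own server logs.

```python
import json

# Hypothetical start message with placeholder identifiers.
# Field names are assumed, not taken from this tutorial.
sample_start = json.dumps({
    "event": "start",
    "sequenceNumber": "1",
    "start": {
        "streamSid": "MZ00000000000000000000000000000000",
        "callSid": "CA00000000000000000000000000000000",
        "tracks": ["inbound"],
        "mediaFormat": {"encoding": "audio/x-mulaw", "sampleRate": 8000, "channels": 1},
        "customParameters": {"agent": "example"},
    },
    "streamSid": "MZ00000000000000000000000000000000",
})


def describe_start(message: str) -> dict:
    """Extract stream details from a start message, tolerating missing fields."""
    data = json.loads(message)
    start = data.get("start", {})
    media_format = start.get("mediaFormat", {})
    return {
        "stream_sid": start.get("streamSid"),
        "call_sid": start.get("callSid"),
        "encoding": media_format.get("encoding"),
        "sample_rate": media_format.get("sampleRate"),
        "custom_parameters": start.get("customParameters", {}),
    }


print(describe_start(sample_start))
```

Using dict.get with defaults keeps the handler from raising if a field is absent.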

You can handle each type by looking at the message's event property.

Handle each message type by using the event property

    # Using the event type you can determine what type of message you are receiving
    if data['event'] == "connected":
        app.logger.info("Connected Message received: {}".format(message))
    if data['event'] == "start":
        app.logger.info("Start Message received: {}".format(message))
    if data['event'] == "media":
        if not has_seen_media:
            app.logger.info("Media message: {}".format(message))
            payload = data['media']['payload']
            app.logger.info("Payload is: {}".format(payload))
            chunk = base64.b64decode(payload)
            app.logger.info("That's {} bytes".format(len(chunk)))
            app.logger.info("Additional media messages from WebSocket are being suppressed....")
            has_seen_media = True
    # The final message type is "stop", matching the list of message types above
    if data['event'] == "stop":
        app.logger.info("Stop Message received: {}".format(message))
        break
    message_count += 1
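
If the chain of if statements grows, one possible alternative is a small dispatch table keyed on the event property. This is only a sketch of that design choice, not part of the tutorial's project; the handler names and return values are made up for illustration.

```python
import json


def on_connected(data):
    return "connected"


def on_start(data):
    return "start"


def on_media(data):
    # Report the length of the still-encoded payload string.
    return "media:{}".format(len(data["media"]["payload"]))


def on_stop(data):
    return "stop"


# Map each of the four message types to its (hypothetical) handler.
HANDLERS = {
    "connected": on_connected,
    "start": on_start,
    "media": on_media,
    "stop": on_stop,
}


def dispatch(message: str):
    """Decode a JSON message and route it by its event property."""
    data = json.loads(message)
    handler = HANDLERS.get(data.get("event"))
    if handler is None:
        return None  # Ignore event types this sketch does not know about
    return handler(data)


print(dispatch('{"event": "media", "media": {"payload": "AAAA"}}'))
```

Unknown event types fall through to None instead of raising, which keeps the read loop running if new message types appear.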

The media payload is encoded in base64. Use the b64decode function from Python's standard base64 module to decode it to bytes.

Decode the base64 encoded payload

    payload = data['media']['payload']
    app.logger.info("Payload is: {}".format(payload))
    chunk = base64.b64decode(payload)
    app.logger.info("That's {} bytes".format(len(chunk)))
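
Once the payload is decoded, the byte count tells you how much audio each message carries. The sketch below assumes 8-bit mu-law audio sampled at 8000 Hz, which is the format Twilio's documentation describes for Media Streams but is not stated in this tutorial; the start message reports the actual format. The payload here is fabricated filler, not real call audio.

```python
import base64

# Assumed audio format: one byte per sample at 8000 samples per second.
SAMPLE_RATE_HZ = 8000
BYTES_PER_SAMPLE = 1


def chunk_duration_ms(payload_b64: str) -> float:
    """Decode a base64 media payload and return its duration in milliseconds."""
    chunk = base64.b64decode(payload_b64)
    samples = len(chunk) // BYTES_PER_SAMPLE
    return samples * 1000 / SAMPLE_RATE_HZ


# Fabricated payload: 160 filler bytes standing in for one media message.
fake_payload = base64.b64encode(b"\xff" * 160).decode("ascii")
print(chunk_duration_ms(fake_payload))
```

Under these assumptions, 160 bytes corresponds to 20 milliseconds of audio.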

Once your code is all in place, start your Flask server by running this command in your terminal:


    python app.py

Now your server should be running on your localhost port 5000. Congratulations! Only one thing left to do here: make sure that Twilio can reach your local web server.

We recommend using a tunneling service like ngrok, which supports the wss scheme. Install ngrok if you haven't already.

Since our server is running on port 5000, we'll start a tunnel using:


    ngrok http 5000

This will generate a random ngrok subdomain. Copy that URL - you'll need it in the next section.
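
ngrok prints an https forwarding address, while the stream needs a wss URL that ends in the /media route defined earlier. A minimal sketch of that conversion, with a made-up helper name and the placeholder domain yourdomain.ngrok.io, might look like this:

```python
from urllib.parse import urlsplit, urlunsplit


def to_stream_url(forwarding_url: str, path: str = "/media") -> str:
    """Turn an http(s) forwarding address into a wss URL for the given route."""
    parts = urlsplit(forwarding_url)
    # Keep only the host; swap the scheme for wss and point at the WebSocket route.
    return urlunsplit(("wss", parts.netloc, path, "", ""))


# Placeholder domain; use the address shown in your own ngrok terminal.
print(to_stream_url("https://yourdomain.ngrok.io"))
```

Copying the address by hand works just as well; the helper only makes the scheme and path change explicit.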


To begin streaming your call's audio with Twilio, you can use the <Stream> TwiML verb.

Create a new TwiML Bin with the following TwiML:


    <?xml version="1.0" encoding="UTF-8"?>
    <Response>
      <Start>
        <Stream url="wss://yourdomain.ngrok.io/media" />
      </Start>
      <Dial>+15550123456</Dial>
    </Response>

You'll need to update the above sample in two key ways:

  1. Replace the phone number nested in the <Dial> tag with your personal phone number, or the number of a friend or family member who can help you see this in action.
  2. Replace the Stream url with your new ngrok subdomain; you can find it in the terminal where ngrok is running. The url attribute must use the wss scheme (WebSocket Secure), which works here because ngrok's secure forwarding address also accepts wss connections.

The <Start> tag will asynchronously fork your media and immediately continue on to the next TwiML statement. Streaming will continue for the entire duration of the call unless a <Stop><Stream> instruction is encountered.
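
If you would rather generate this TwiML from code than paste it into a TwiML Bin, the sketch below builds the start and stop documents with Python's standard xml.etree.ElementTree module. The function names and the stream name my_stream are made up for illustration, and the use of a name attribute to identify which stream to stop is an assumption based on Twilio's <Stream> documentation rather than something shown in this tutorial.

```python
import xml.etree.ElementTree as ET


def start_stream_twiml(url: str, name: str, dial_number: str) -> str:
    """Build TwiML that forks the call audio to `url`, then dials a number."""
    response = ET.Element("Response")
    start = ET.SubElement(response, "Start")
    ET.SubElement(start, "Stream", {"name": name, "url": url})
    ET.SubElement(response, "Dial").text = dial_number
    return ET.tostring(response, encoding="unicode")


def stop_stream_twiml(name: str) -> str:
    """Build TwiML that stops the stream previously started under `name`."""
    response = ET.Element("Response")
    stop = ET.SubElement(response, "Stop")
    ET.SubElement(stop, "Stream", {"name": name})
    return ET.tostring(response, encoding="unicode")


print(start_stream_twiml("wss://yourdomain.ngrok.io/media", "my_stream", "+15550123456"))
print(stop_stream_twiml("my_stream"))
```

Building the document with an XML library rather than string formatting ensures attribute values are escaped correctly.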

Save your new TwiML Bin, then wire it up to one of your incoming phone numbers by selecting TwiML Bin in the A Call Comes In section and then selecting your bin from the dropdown. Now, when a call comes into that number, Twilio will stream the real-time data straight to your web server!

Info: By default, Twilio will stream the incoming track (in our case, the incoming phone call). You can always change this by using the track attribute.
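
As a rough sketch, the helper below builds a <Stream> element with an explicit track attribute. The accepted values (inbound_track, outbound_track, both_tracks) are assumed from Twilio's <Stream> documentation and are not listed in this tutorial, and the helper name is hypothetical.

```python
import xml.etree.ElementTree as ET

# Track values assumed from Twilio's <Stream> documentation.
VALID_TRACKS = ("inbound_track", "outbound_track", "both_tracks")


def stream_element(url: str, track: str = "inbound_track") -> ET.Element:
    """Return a <Stream> element, rejecting track values outside the assumed set."""
    if track not in VALID_TRACKS:
        raise ValueError("track must be one of: " + ", ".join(VALID_TRACKS))
    return ET.Element("Stream", {"url": url, "track": track})


element = stream_element("wss://yourdomain.ngrok.io/media", "both_tracks")
print(ET.tostring(element, encoding="unicode"))
```

Validating the value up front surfaces a typo in your own code instead of in a failed call.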


Find a friend or family member willing to help you test your streaming web server (or use a second phone that is different than the one you listed in your TwiML bin).

One of you should call your Twilio phone number, which will then connect the call to the number you specified in your TwiML bin. Keep an eye on your console output and start talking. You should see the connected and start messages logged, followed by the first media message and the size of its decoded payload.


Real-time access to your audio data opens up new doors of innovation for you. From real-time visual effects to bioinformatics, you are certain to benefit from this access to live data.

There are several services that you can connect with to provide live speech-to-text transcriptions. Once you have the text in real time, you can perform all sorts of text-based operations like translations, sentiment analysis, and keyword detection.

You might want to pipe your real-time data into a number of external providers. You can use Google's Cloud Speech to Text, Amazon's Transcribe, or IBM Watson Speech to Text. All of these providers have a language translation service available as well.

Our community has created a starter set for many languages and services. Please check it out for inspiration in building your real-time applications, and consider contributing.

We can't wait to see what you build!

