Skip to contentSkip to navigationSkip to topbar
Rate this page:

A Script to Get a Kinesis Stream Test Message


Here is a Bash script you can ue to retrieve a test message from a Kinesis Stream. The script can be modified to read from either the oldest message (TRIM_HORIZON), the latest message (LATEST), or go back over the last hour (AT_TIMESTAMP). The default is LATEST.


_57
#!/bin/bash
_57
_57
JQ_CHECK=$(which jq)
_57
if [ -z "$JQ_CHECK" ]; then
_57
echo
_57
echo "This script requires the jq JSON processor. Please install for your OS from https://stedolan.github.io/jq/download/"
_57
echo
_57
exit 1
_57
fi
_57
_57
if [ $# -ne 1 ]; then
_57
echo
_57
echo "usage: $0 <stream_name>"
_57
echo
_57
exit 1
_57
fi
_57
_57
# Set the stream name
_57
STREAM_NAME=$1
_57
_57
# Choose the iterator type:
_57
# TRIM HORIZON is for starting at the begining of the Kinesis Stream.
_57
# This can take a while if you have a lot of records.
_57
# To use TRIM HORIZON, uncomment the following line:
_57
# TYPE=TRIM_HORIZON
_57
_57
# AT_TIMESTAMP allows you to go back to a point in time. This is set for going back one hour
_57
# To use AT_TIMESTAMP, uncomment the following two lines:
_57
# TIMESTAMP=$(($(date +%s) - 3600)).000
_57
# TYPE="AT_TIMESTAMP --timestamp $TIMESTAMP"
_57
_57
# LATEST means start at the most current point in the stream and read forward
_57
TYPE=LATEST
_57
_57
# Get a list of shards
_57
SHARDS=$(aws kinesis list-shards --stream-name $STREAM_NAME | jq -r .Shards[].ShardId)
_57
_57
# Get all the starting points
_57
SHARD_ITERATOR=()
_57
i=0
_57
for shard in $SHARDS ; do
_57
SHARD_ITERATOR[$i]=$(aws kinesis get-shard-iterator --shard-id $shard --shard-iterator-type $TYPE --stream-name $STREAM_NAME --query 'ShardIterator')
_57
i=$((i+1))
_57
done
_57
_57
# Start getting events from all shards and display them
_57
while [ 1 ] ; do
_57
len=${#SHARD_ITERATOR[@]}
_57
for (( i=0; i < $len; i++ )); do
_57
DATA=$(aws kinesis get-records --limit 50 --shard-iterator ${SHARD_ITERATOR[$i]})
_57
SHARD_ITERATOR[$i]=$(echo $DATA | jq -r .NextShardIterator)
_57
ROWS=$(echo $DATA | jq -r .Records[].Data?)
_57
for row in $ROWS; do
_57
echo $row | base64 -d | jq .
_57
done
_57
done
_57
done


Rate this page: