How to Propagate OpenTelemetry Trace Headers Over AWS Kinesis: Part 3

Continuing our series on tracing with OpenTelemetry in AWS Kinesis, we now explore a refined approach to overcome the challenges faced in our initial attempt with the PartitionKey parameter.

Second attempt and solution: Propagate the trace context through “PartitionKey” but partition with the “ExplicitHashKey” parameter

In our initial attempt, we were able to propagate the traceparent with the PartitionKey parameter, but while doing this, we also disrupted the correct partition assignment of the events. So, when propagating the traceparent with the PartitionKey parameter, how can we ensure that the events are assigned to the correct shard in accordance with the original PartitionKey value given by the user? The answer here is the ExplicitHashKey parameter. But before getting into the solution, let's take a look at key space partitioning in the AWS Kinesis service.  

Here is a statement about how PartitionKey is used while partitioning records into shards from the AWS Kinesis API Reference documentation:

The partition key is used by Kinesis Data Streams to distribute data across shards. Kinesis Data Streams segregates the data records that belong to a stream into multiple shards, using the partition key associated with each data record to determine the shard to which a given data record belongs.

Partition keys are Unicode strings, with a maximum length limit of 256 characters for each key. An MD5 hash function is used to map partition keys to 128-bit integer values and to map associated data records to shards using the hash key ranges of the shards. You can override hashing the partition key to determine the shard by explicitly specifying a hash value using the ExplicitHashKey parameter.

So, the hash key to be used while partitioning is calculated from PartitionKey as follows:

HashKey = Int128(MD5(PartitionKey))

This means the range of the hash key (and thus the Kinesis key space) is:

[0, 2^128) => [0, 340282366920938463463374607431768211455]
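
If you want to double-check that upper bound yourself, here is a quick sanity check in Java (purely illustrative, not part of the tracing code):

import java.math.BigInteger;

public class KeySpaceBound {
    public static void main(String[] args) {
        // The Kinesis key space spans unsigned 128-bit integers: [0, 2^128 - 1]
        BigInteger max = BigInteger.ONE.shiftLeft(128).subtract(BigInteger.ONE);
        System.out.println(max); // prints 340282366920938463463374607431768211455
    }
}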

How creating, splitting, and merging Kinesis shards affects key ranges

Let’s play a little bit with Kinesis shards by creating, splitting, and merging them to understand how key ranges of shards are affected:

#1 - Create events stream:

> aws kinesis create-stream --stream-name events --shard-count 1

#2 - Verify that the events stream has been created:

> aws kinesis describe-stream --stream-name events 

{ 
    "StreamDescription": { 
        "Shards": [ 
            { 
                "ShardId": "shardId-000000000000", 
                "HashKeyRange": { 
                    "StartingHashKey": "0", 
                    "EndingHashKey": "340282366920938463463374607431768211455" 
                }, 
                "SequenceNumberRange": { 
                    "StartingSequenceNumber": "49646421264737784557324905820411013094940000525450477570" 
                } 
            } 
        ], 
        "StreamARN": "arn:aws:kinesis:us-west-2:************:stream/events", 
        "StreamName": "events", 
        "StreamStatus": "ACTIVE", 
        "RetentionPeriodHours": 24, 
        "EnhancedMonitoring": [ 
            { 
                "ShardLevelMetrics": [] 
            } 
        ], 
        "EncryptionType": "NONE", 
        "KeyId": null, 
        "StreamCreationTimestamp": "2023-11-14T12:31:26+03:00" 
    } 
}

As you can see, the whole key space [0, 340282366920938463463374607431768211455] is assigned to the single shard shardId-000000000000.

#3 - Let’s increase the shard count to 2:

> aws kinesis update-shard-count --stream-name events --target-shard-count 2 --scaling-type UNIFORM_SCALING 

{ 
    "StreamName": "events", 
    "CurrentShardCount": 1, 
    "TargetShardCount": 2, 
    "StreamARN": "arn:aws:kinesis:us-west-2:************:stream/events" 
} 

Increasing the shard count triggers splitting shard shardId-000000000000 into two child shards (shardId-000000000001 and shardId-000000000002):

> aws kinesis describe-stream --stream-name events 

{ 
    "StreamDescription": { 
        "Shards": [ 
            { 
                "ShardId": "shardId-000000000000", 
                "HashKeyRange": { 
                    "StartingHashKey": "0", 
                    "EndingHashKey": "340282366920938463463374607431768211455" 
                }, 
                "SequenceNumberRange": { 
                    "StartingSequenceNumber": "49646421264737784557324905820411013094940000525450477570", 
                    "EndingSequenceNumber": "49646421264748934929924171131980572028256847859579617282" 
                } 
            }, 
            { 
                "ShardId": "shardId-000000000001", 
                "ParentShardId": "shardId-000000000000", 
                "HashKeyRange": { 
                    "StartingHashKey": "0", 
                    "EndingHashKey": "170141183460469231731687303715884105727" 
                }, 
                "SequenceNumberRange": { 
                    "StartingSequenceNumber": "49646421980524803194562316794282962492134597634174222354" 
                } 
            }, 
            { 
                "ShardId": "shardId-000000000002", 
                "ParentShardId": "shardId-000000000000", 
                "HashKeyRange": { 
                    "StartingHashKey": "170141183460469231731687303715884105728", 
                    "EndingHashKey": "340282366920938463463374607431768211455" 
                }, 
                "SequenceNumberRange": { 
                    "StartingSequenceNumber": "49646421980547103939760847417424498210407245995680202786" 
                } 
            } 
        ], 
        "StreamARN": "arn:aws:kinesis:us-west-2:************:stream/events", 
        "StreamName": "events", 
        "StreamStatus": "ACTIVE", 
        "RetentionPeriodHours": 24, 
        "EnhancedMonitoring": [ 
            { 
                "ShardLevelMetrics": [] 
            } 
        ], 
        "EncryptionType": "NONE", 
        "KeyId": null, 
        "StreamCreationTimestamp": "2023-11-14T12:31:26+03:00" 
    } 
} 

As seen from the response, the hash key range of a shard is immutable. When a shard is split, the parent shard (shardId-000000000000 here) is still listed, but it has switched to the CLOSED state, and its entire hash key range has been divided equally between its child shards shardId-000000000001 ([0, 170141183460469231731687303715884105727]) and shardId-000000000002 ([170141183460469231731687303715884105728, 340282366920938463463374607431768211455]).

The CLOSED state of a shard can be identified by the presence of an EndingSequenceNumber property in the description of that shard.
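
If you want to check this programmatically rather than eyeballing the describe-stream output, here is a minimal sketch using the AWS SDK for Java v2 (the class name and stream name are illustrative):

import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.awssdk.services.kinesis.model.Shard;

public class ClosedShardChecker {
    public static void main(String[] args) {
        try (KinesisClient kinesis = KinesisClient.create()) {
            ListShardsRequest request = ListShardsRequest.builder().streamName("events").build();
            for (Shard shard : kinesis.listShards(request).shards()) {
                // A shard is CLOSED when its sequence number range has an ending value
                boolean closed = shard.sequenceNumberRange().endingSequenceNumber() != null;
                System.out.println(shard.shardId() + " closed=" + closed);
            }
        }
    }
}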

#4 - Now, let’s see what happens when we decrease the shard count back to 1:

> aws kinesis update-shard-count --stream-name events --target-shard-count 1 --scaling-type UNIFORM_SCALING 

{ 
    "StreamName": "events", 
    "CurrentShardCount": 2, 
    "TargetShardCount": 1, 
    "StreamARN": "arn:aws:kinesis:us-west-2:************:stream/events" 
} 

Decreasing the shard count triggers merging shards shardId-000000000001 and shardId-000000000002 into a new shard (shardId-000000000003):

> aws kinesis describe-stream --stream-name events  

{ 
    "StreamDescription": { 
        "Shards": [ 
            { 
                "ShardId": "shardId-000000000000", 
                "HashKeyRange": { 
                    "StartingHashKey": "0", 
                    "EndingHashKey": "340282366920938463463374607431768211455" 
                }, 
                "SequenceNumberRange": { 
                    "StartingSequenceNumber": "49646421264737784557324905820411013094940000525450477570", 
                    "EndingSequenceNumber": "49646421264748934929924171131980572028256847859579617282" 
                } 
            }, 
            { 
                "ShardId": "shardId-000000000001", 
                "ParentShardId": "shardId-000000000000", 
                "HashKeyRange": { 
                    "StartingHashKey": "0", 
                    "EndingHashKey": "170141183460469231731687303715884105727" 
                }, 
                "SequenceNumberRange": { 
                    "StartingSequenceNumber": "49646421980524803194562316794282962492134597634174222354", 
                    "EndingSequenceNumber": "49646421980535953567161582105852521425451370064073719826" 
                } 
            }, 
            { 
                "ShardId": "shardId-000000000002", 
                "ParentShardId": "shardId-000000000000", 
                "HashKeyRange": { 
                    "StartingHashKey": "170141183460469231731687303715884105728", 
                    "EndingHashKey": "340282366920938463463374607431768211455" 
                }, 
                "SequenceNumberRange": { 
                    "StartingSequenceNumber": "49646421980547103939760847417424498210407245995680202786", 
                    "EndingSequenceNumber": "49646421980558254312360112728994057143724018425579700258" 
                } 
            }, 
            { 
                "ShardId": "shardId-000000000003", 
                "ParentShardId": "shardId-000000000001", 
                "AdjacentParentShardId": "shardId-000000000002", 
                "HashKeyRange": { 
                    "StartingHashKey": "0", 
                    "EndingHashKey": "340282366920938463463374607431768211455" 
                }, 
                "SequenceNumberRange": { 
                    "StartingSequenceNumber": "49646422307409126314624190802913520932614343535875850290" 
                } 
            } 
        ], 
        "StreamARN": "arn:aws:kinesis:us-west-2:************:stream/events", 
        "StreamName": "events", 
        "StreamStatus": "ACTIVE", 
        "RetentionPeriodHours": 24, 
        "EnhancedMonitoring": [ 
            { 
                "ShardLevelMetrics": [] 
            } 
        ], 
        "EncryptionType": "NONE", 
        "KeyId": null, 
        "StreamCreationTimestamp": "2023-11-14T12:31:26+03:00" 
    } 
} 

As in shard splitting, shards shardId-000000000001 and shardId-000000000002 still exist, but they are now in the CLOSED state, as indicated by the presence of the EndingSequenceNumber property.

And the newly created shard shardId-000000000003 inherits its history from its parent shards shardId-000000000001 and shardId-000000000002: it points back to shardId-000000000001 as its ParentShardId, and to shardId-000000000002 as its AdjacentParentShardId.

Unlike splitting, after merging, the newly created shard gets its own hash key range [0, 340282366920938463463374607431768211455] as the union of its parents’ (shardId-000000000001 and shardId-000000000002) hash key ranges ([0, 170141183460469231731687303715884105727] and [170141183460469231731687303715884105728, 340282366920938463463374607431768211455]).
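
The parent/child relationships can also be inspected programmatically; here is a minimal sketch using the AWS SDK for Java v2 (again, the class name and stream name are just placeholders):

import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.awssdk.services.kinesis.model.Shard;

public class ShardLineagePrinter {
    public static void main(String[] args) {
        try (KinesisClient kinesis = KinesisClient.create()) {
            ListShardsRequest request = ListShardsRequest.builder().streamName("events").build();
            for (Shard shard : kinesis.listShards(request).shards()) {
                // parentShardId() and adjacentParentShardId() are null for original shards
                System.out.println(shard.shardId()
                        + " parent=" + shard.parentShardId()
                        + " adjacentParent=" + shard.adjacentParentShardId());
            }
        }
    }
}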

How to use the ExplicitHashKey parameter

Ok, let's get back to the original problem.

We are using the PartitionKey parameter to propagate the traceparent header, so now we have to find a way to specify the hash key directly, based on the original PartitionKey value given by the user, so that AWS Kinesis still puts the event into the correct shard.

As we mentioned in the beginning of this section, we can use the ExplicitHashKey parameter to specify the hash key explicitly.

Let’s remember the hash key calculation based on the PartitionKey:

HashKey = Int128(MD5(PartitionKey))

And here is its implementation:

private String toHashKey(String partitionKey) throws Exception {
    // Mirror Kinesis' hashing: MD5 over the bytes of the partition key (UTF-8 explicitly,
    // rather than the platform default charset)
    MessageDigest md = MessageDigest.getInstance("MD5");
    md.update(partitionKey.getBytes(StandardCharsets.UTF_8));
    byte[] digest = md.digest();
    // Interpret the 16-byte digest as an unsigned 128-bit integer
    BigInteger bi = new BigInteger(1, digest);
    // Kinesis expects the ExplicitHashKey value as a decimal string
    return bi.toString();
}

Ok, let’s test our custom hash key calculation algorithm and verify that AWS Kinesis puts records into the correct shards when the partition is specified through the ExplicitHashKey parameter with our custom generated hash key.

#1 - First, we will create a Kinesis stream with 5 shards:

> aws kinesis create-stream --stream-name events --shard-count 5

#2 - Then, let’s see how the hash keys are split across shards:

> aws kinesis describe-stream --stream-name events  

{     
    "StreamDescription": { 
        "Shards": [ 
            { 
                "ShardId": "shardId-000000000000", 
                "HashKeyRange": { 
                    "StartingHashKey": "0", 
                    "EndingHashKey": "68056473384187692692674921486353642290" 
                }, 
                "SequenceNumberRange": { 
                    "StartingSequenceNumber": "49646447835808079664180011482520906336196071965037953026" 
                } 
            }, 
            { 
                "ShardId": "shardId-000000000001", 
                "HashKeyRange": { 
                    "StartingHashKey": "68056473384187692692674921486353642291", 
                    "EndingHashKey": "136112946768375385385349842972707284581" 
                }, 
                "SequenceNumberRange": { 
                    "StartingSequenceNumber": "49646447835830380409378542105662442054468720326543933458" 
                } 
            }, 
            { 
                "ShardId": "shardId-000000000002", 
                "HashKeyRange": { 
                    "StartingHashKey": "136112946768375385385349842972707284582", 
                    "EndingHashKey": "204169420152563078078024764459060926872" 
                }, 
                "SequenceNumberRange": { 
                    "StartingSequenceNumber": "49646447835852681154577072728803977772741368688049913890" 
                } 
            }, 
            { 
                "ShardId": "shardId-000000000003", 
                "HashKeyRange": { 
                    "StartingHashKey": "204169420152563078078024764459060926873", 
                    "EndingHashKey": "272225893536750770770699685945414569163" 
                }, 
                "SequenceNumberRange": { 
                    "StartingSequenceNumber": "49646447835874981899775603351945513491014017049555894322" 
                } 
            }, 
            { 
                "ShardId": "shardId-000000000004", 
                "HashKeyRange": { 
                    "StartingHashKey": "272225893536750770770699685945414569164", 
                    "EndingHashKey": "340282366920938463463374607431768211455" 
                }, 
                "SequenceNumberRange": { 
                    "StartingSequenceNumber": "49646447835897282644974133975087049209286665411061874754" 
                } 
            } 
        ], 
        "StreamARN": "arn:aws:kinesis:us-west-2:************:stream/test", 
        "StreamName": "test", 
        "StreamStatus": "ACTIVE", 
        "RetentionPeriodHours": 24, 
        "EnhancedMonitoring": [ 
            { 
                "ShardLevelMetrics": [] 
            } 
        ], 
        "EncryptionType": "NONE", 
        "KeyId": null, 
        "StreamCreationTimestamp": "2023-11-15T09:12:34+03:00" 
    } 
}

Here are the hash key ranges for all 5 shards:

Table 1 - Hash Key Ranges of All Shards

| Shard ID | Starting Hash Key | Ending Hash Key |
| --- | --- | --- |
| 000000000000 | 0 | 68056473384187692692674921486353642290 |
| 000000000001 | 68056473384187692692674921486353642291 | 136112946768375385385349842972707284581 |
| 000000000002 | 136112946768375385385349842972707284582 | 204169420152563078078024764459060926872 |
| 000000000003 | 204169420152563078078024764459060926873 | 272225893536750770770699685945414569163 |
| 000000000004 | 272225893536750770770699685945414569164 | 340282366920938463463374607431768211455 |

#3 - Then, send 20 events from the producer application with different event group IDs (group-1, …, group-20), using the group ID as the partition key and without any trace context propagation.
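
A minimal producer sketch for this step might look like the following, assuming the AWS SDK for Java v2 and default credentials (the payload shape and class name are illustrative):

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;

public class EventProducer {
    public static void main(String[] args) {
        try (KinesisClient kinesis = KinesisClient.create()) {
            for (int i = 1; i <= 20; i++) {
                String groupId = "group-" + i;
                PutRecordRequest request = PutRecordRequest.builder()
                        .streamName("events")
                        .partitionKey(groupId) // group ID as the partition key, no trace context
                        .data(SdkBytes.fromUtf8String("{\"groupId\":\"" + groupId + "\"}"))
                        .build();
                // The PutRecord response tells us which shard the record was written to
                System.out.println(groupId + " -> " + kinesis.putRecord(request).shardId());
            }
        }
    }
}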

Here are the partition assignment results we collected from the consumer function logs:

Table 2 - Event Group ID Shard Assignments with PartitionKey

| Group ID | Assigned Shard ID |
| --- | --- |
| group-1 | 000000000000 |
| group-2 | 000000000004 |
| group-3 | 000000000004 |
| group-4 | 000000000003 |
| group-5 | 000000000001 |
| group-6 | 000000000002 |
| group-7 | 000000000001 |
| group-8 | 000000000001 |
| group-9 | 000000000001 |
| group-10 | 000000000002 |
| group-11 | 000000000001 |
| group-12 | 000000000004 |
| group-13 | 000000000001 |
| group-14 | 000000000003 |
| group-15 | 000000000001 |
| group-16 | 000000000001 |
| group-17 | 000000000003 |
| group-18 | 000000000004 |
| group-19 | 000000000004 |
| group-20 | 000000000002 |

#4 - Then, to verify our hash key generation algorithm based on the partition key (the event group ID here), run it for each of the event group IDs (group-1, …, group-20) to generate the corresponding hash keys:

Table 3 - Event Group ID Generated Hash Keys

| Group ID | Generated Hash Key |
| --- | --- |
| group-1 | 36878702945378520736626047775679136663 |
| group-2 | 306061958545308461565701106111834939294 |
| group-3 | 292735753390589125910426864564403662374 |
| group-4 | 269007284811237496684139904908027348900 |
| group-5 | 134599845387778953616504356620047892735 |
| group-6 | 172404274464367626337742804044690822722 |
| group-7 | 120051614284874321986263574254488428932 |
| group-8 | 96003601987456478459892371035583429633 |
| group-9 | 124399773740621873695985890065916563322 |
| group-10 | 148110075274167556626459053393330135135 |
| group-11 | 114645606660698920883266061759514048378 |
| group-12 | 278195397729494683269018512556856246017 |
| group-13 | 103647572796091141221952419811970549173 |
| group-14 | 238058499068402564349796027478307801963 |
| group-15 | 135187142084058751121981517202764523229 |
| group-16 | 126372876827453074963320105050964021236 |
| group-17 | 250808695372119527102005148552638263311 |
| group-18 | 294194029414536733417445446143685926310 |
| group-19 | 303879463638450793897400745877701295675 |
| group-20 | 194160077621386137990572866333304120589 |

According to the hash key ranges of the 5 shards shown in Table 1, and the custom generated hash keys shown in Table 3, the events will be assigned to the following shards:

Table 4 - Event Group ID Shard Assignments with Generated Hash Keys

| Group ID | Generated Hash Key | Assigned Shard ID |
| --- | --- | --- |
| group-1 | 36878702945378520736626047775679136663 | 000000000000 |
| group-2 | 306061958545308461565701106111834939294 | 000000000004 |
| group-3 | 292735753390589125910426864564403662374 | 000000000004 |
| group-4 | 269007284811237496684139904908027348900 | 000000000003 |
| group-5 | 134599845387778953616504356620047892735 | 000000000001 |
| group-6 | 172404274464367626337742804044690822722 | 000000000002 |
| group-7 | 120051614284874321986263574254488428932 | 000000000001 |
| group-8 | 96003601987456478459892371035583429633 | 000000000001 |
| group-9 | 124399773740621873695985890065916563322 | 000000000001 |
| group-10 | 148110075274167556626459053393330135135 | 000000000002 |
| group-11 | 114645606660698920883266061759514048378 | 000000000001 |
| group-12 | 278195397729494683269018512556856246017 | 000000000004 |
| group-13 | 103647572796091141221952419811970549173 | 000000000001 |
| group-14 | 238058499068402564349796027478307801963 | 000000000003 |
| group-15 | 135187142084058751121981517202764523229 | 000000000001 |
| group-16 | 126372876827453074963320105050964021236 | 000000000001 |
| group-17 | 250808695372119527102005148552638263311 | 000000000003 |
| group-18 | 294194029414536733417445446143685926310 | 000000000004 |
| group-19 | 303879463638450793897400745877701295675 | 000000000004 |
| group-20 | 194160077621386137990572866333304120589 | 000000000002 |

As you can see, the Kinesis shard assignments of events based on group IDs are identical for both the standard PartitionKey method and the custom generated hash key method.
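
If you want to reproduce this shard resolution yourself, here is a minimal sketch that looks up the owning shard for a generated hash key by comparing it against the hash key ranges returned by DescribeStream (AWS SDK for Java v2; the class name and sample hash key are illustrative):

import java.math.BigInteger;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.Shard;

public class ShardResolver {
    public static void main(String[] args) {
        // Hash key generated from "group-1" (see Table 3)
        BigInteger hashKey = new BigInteger("36878702945378520736626047775679136663");
        try (KinesisClient kinesis = KinesisClient.create()) {
            DescribeStreamRequest request = DescribeStreamRequest.builder().streamName("events").build();
            for (Shard shard : kinesis.describeStream(request).streamDescription().shards()) {
                BigInteger start = new BigInteger(shard.hashKeyRange().startingHashKey());
                BigInteger end = new BigInteger(shard.hashKeyRange().endingHashKey());
                // The matching shard's range contains the hash key
                // (all shards are open in this freshly created 5-shard stream)
                if (hashKey.compareTo(start) >= 0 && hashKey.compareTo(end) <= 0) {
                    System.out.println(shard.shardId());
                }
            }
        }
    }
}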

It now seems safe to conclude that by generating a custom hash key for the ExplicitHashKey parameter, we can maintain accurate shard assignments. This allows us to use the PartitionKey parameter effectively to propagate the trace context.

Propagating the trace context through “PartitionKey” while partitioning with the “ExplicitHashKey” parameter

Let’s see our new approach in action with the following implementation:

private PutRecordRequest injectTraceHeader(PutRecordRequest request) throws Exception {
    if (!TRACE_CONTEXT_PROPAGATION_ENABLED) {
        return request;
    }
    Span currentSpan = Span.current();
    if (currentSpan == null) {
        return request;
    }
    SpanContext currentSpanContext = currentSpan.getSpanContext();
    // When there is no active span, Span.current() returns a no-op span whose
    // span context is invalid (all-zero IDs), so also check validity here
    if (currentSpanContext == null || !currentSpanContext.isValid()) {
        return request;
    }

    String partitionKey = request.partitionKey();
    PutRecordRequest.Builder requestBuilder = request.toBuilder();
    // Build a W3C traceparent value: version-traceId-spanId-flags
    String traceParent = String.format("00-%s-%s-%s",
            currentSpanContext.getTraceId(),
            currentSpanContext.getSpanId(),
            currentSpanContext.getTraceFlags().asHex());
    // Carry the trace context in the PartitionKey parameter ...
    requestBuilder.partitionKey(traceParent);
    if (partitionKey != null) {
        // ... and preserve the original partitioning by setting the ExplicitHashKey
        // to the hash Kinesis would have computed from the original partition key
        requestBuilder.explicitHashKey(toHashKey(partitionKey));
    }

    return requestBuilder.build();
}

private String toHashKey(String partitionKey) throws Exception {
    // Mirror Kinesis' hashing: MD5 over the bytes of the partition key (UTF-8 explicitly,
    // rather than the platform default charset)
    MessageDigest md = MessageDigest.getInstance("MD5");
    md.update(partitionKey.getBytes(StandardCharsets.UTF_8));
    byte[] digest = md.digest();
    // Interpret the 16-byte digest as an unsigned 128-bit integer
    BigInteger bi = new BigInteger(1, digest);
    // Kinesis expects the ExplicitHashKey value as a decimal string
    return bi.toString();
}
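
A hypothetical call site could then look like the following (the sendEvent method, stream name, and kinesisClient parameter are illustrative and not part of the original code above):

// Wrap the original request before sending it: the original partition key is preserved
// as an ExplicitHashKey hash, and PartitionKey is replaced with the traceparent value.
private void sendEvent(KinesisClient kinesisClient, String groupId, String payload) throws Exception {
    PutRecordRequest original = PutRecordRequest.builder()
            .streamName("events")
            .partitionKey(groupId) // user-provided partition key (event group ID)
            .data(SdkBytes.fromUtf8String(payload))
            .build();
    kinesisClient.putRecord(injectTraceHeader(original));
}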

When we run our test scenario again, we can see that the event-producer and event-consumer applications are now included in the same trace, because the trace ID is propagated via the PartitionKey parameter of the PutRecord request. At the same time, we didn’t break the events’ shard assignment, because we set the hash key through the ExplicitHashKey parameter to put them into the correct partitions.
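
On the consumer side, the traceparent can be recovered from the record’s partition key. The following is a minimal sketch of the idea (not the exact code of our event-consumer), assuming the aws-lambda-java-events and OpenTelemetry API libraries:

import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.context.Context;

private Context extractTraceContext(KinesisEvent.KinesisEventRecord record) {
    // The partition key now carries the traceparent value: 00-<traceId>-<spanId>-<flags>
    String partitionKey = record.getKinesis().getPartitionKey();
    String[] parts = partitionKey.split("-");
    if (parts.length != 4) {
        return Context.current();
    }
    SpanContext remoteContext = SpanContext.createFromRemoteParent(
            parts[1], parts[2], TraceFlags.fromHex(parts[3], 0), TraceState.getDefault());
    // Use the remote span context as the parent of the consumer-side spans
    return Context.current().with(Span.wrap(remoteContext));
}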

Image 6 - Trace of the event-producer and event-consumer applications with the same trace ID (correct partitioning over generated hash key)

You can verify from the logs that events are properly partitioned by event group ID, rather than effectively randomly by trace ID as in our first attempt over the PartitionKey parameter.

Image 7 - event-consumer function invocation logs for processing of the first event with group ID group-1 with custom hash key

Image 8 - event-consumer function invocation logs for processing of the second event with group ID group-1 with custom hash key

Image 9 - event-consumer function invocation logs for processing of the third event with group ID group-1 with custom hash key

Image 10 - event-consumer function invocation logs for processing of the fourth event with group ID group-1 with custom hash key

Image 11 - event-consumer function invocation logs for processing of the fifth event with group ID group-1 with custom hash key  

Conclusion

This 3-part series has demonstrated a practical approach to propagating the OTEL trace context through AWS Kinesis, leveraging the PartitionKey and ExplicitHashKey parameters without the need to modify the record data. By integrating custom code within the application, we have navigated the complexities of trace context propagation in a cloud-native environment.

Go back and read part 1 and part 2 of this series if you missed them.
