
OCI Streams in OIC (pitfall)

Using OCI Streams in Oracle Integration Cloud (OIC)

In one of my projects, there was a requirement to write messages from OIC to an OCI stream and then, at regular intervals, read those messages from the stream back into OIC. Publishing is not constant: it happens a couple of times per day, for a couple of minutes at a time. During those bursts, publishing can reach 200 messages per minute, but consuming can only be done at a rate of 100 messages per minute because of a limitation in a third-party system. So the OCI stream was used as a throttling mechanism.
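To make the throttling arithmetic concrete, here is a small Python sketch that simulates the backlog on the stream minute by minute. It is a simplified model, not OIC or OCI behaviour: the function name `simulate_backlog` is hypothetical, and it assumes the producer writes its messages at the start of each minute and the consumer polls once at the end of the minute, reading at most 100 messages.

```python
def simulate_backlog(burst_minutes: int, produce_per_min: int = 200,
                     consume_per_min: int = 100) -> tuple[int, int]:
    """Return (minutes until the stream is empty, peak backlog).

    Hypothetical model: each minute the producer first appends its messages
    (only during the burst), then the consumer polls once at the end of the
    minute and reads at most consume_per_min messages.
    """
    if consume_per_min <= 0:
        raise ValueError("consume_per_min must be positive")
    backlog = 0
    peak = 0
    minute = 0
    while minute < burst_minutes or backlog > 0:
        minute += 1
        if minute <= burst_minutes:
            backlog += produce_per_min      # burst of published messages
        peak = max(peak, backlog)           # largest backlog seen before a poll
        backlog -= min(backlog, consume_per_min)  # throttled consumer poll
    return minute, peak


print(simulate_backlog(1))  # one-minute burst of 200 messages
print(simulate_backlog(3))  # three-minute burst
```

With a one-minute burst, `simulate_backlog(1)` returns `(2, 200)`: the stream holds 200 messages and is empty after two minutes. A three-minute burst gives `(6, 400)`. In this ideal model the backlog always drains at exactly the consumer's rate.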

Clear requirement, easy to implement, right? Well, it proved to be a bit more complicated. In this blog post I will explain how I initially implemented this, why it did not work, and how I was able to solve it.

The initial implementation was as follows:

  • 1 Stream with 1 partition
  • 1 Producer writing 200 messages per minute to the stream
  • 1 Consumer polling the stream every 60 seconds and reading 100 messages per poll

You would expect all messages to be read within 2 minutes, but this is not what happened. I observed huge delays, sometimes even up to 15 or 20 minutes.

I tried several other setups and configurations to speed things up, such as:

  • 1 Stream with multiple partitions
  • Publishing to and reading from a dedicated partition

None of these helped. Sometimes it looked as if things sped up, but overall the results were inconsistent and therefore not reliable. The slowness was reproducible and unacceptable for the use case.

To understand the behavior, we must understand some OCI Stream specifics, such as consumer groups and partitions. Let’s investigate these, straight from the documentation:

How do consumer groups work?

Consumers can be configured to consume messages as part of a group. Stream partitions are distributed among members of a group so that messages from any single partition are sent only to a single consumer. Partition assignments are rebalanced as consumers join or leave the group. Multiple consumer groups can read from a single stream. Each consumer group receives all messages in the stream at least once.

What triggers a rebalance activity within a consumer group?

When an instance of a consumer group becomes inactive either because it fails to send a heartbeat for more than 30 seconds or the process is terminated, a rebalance activity is triggered within the consumer group. This is done to handle the partitions previously consumed by the inactive instance and reassign it to an active instance. Similarly, when an instance of a consumer group that was previously inactive joins the group, a rebalance is triggered to assign a partition to start consuming from. The Streaming service provides no guarantee in reassigning the instance to the same partition when it rejoins the group.

  • Instance: If an instance stops consuming messages for more than 30 seconds, it’s removed from the consumer group and its partition is reassigned to another instance. This is referred to as rebalancing.
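The effect of the 30-second timeout can be illustrated with a toy model. This Python sketch is hypothetical and heavily simplified: it assumes that an OIC consumer only signals it is alive when its scheduled poll runs, so the time between two polls equals the time between two heartbeats. It does not model how OCI actually assigns partitions, and `count_rejoins` is an illustrative name only.

```python
HEARTBEAT_TIMEOUT_S = 30  # 30 seconds, as stated in the OCI Streaming documentation quoted above


def count_rejoins(poll_interval_s: int, run_for_s: int,
                  timeout_s: int = HEARTBEAT_TIMEOUT_S) -> int:
    """Count the polls at which the instance had already been evicted.

    Assumption (hypothetical): the consumer only sends a heartbeat when it
    polls. A gap of more than timeout_s between two polls means the instance
    was removed from the consumer group and has to rejoin, which triggers a
    rebalance.
    """
    if poll_interval_s <= 0:
        raise ValueError("poll_interval_s must be positive")
    rejoins = 0
    last_heartbeat = 0  # the instance joins the group at t = 0
    for t in range(poll_interval_s, run_for_s + 1, poll_interval_s):
        if t - last_heartbeat > timeout_s:
            rejoins += 1  # evicted before this poll: rejoin and rebalance
        last_heartbeat = t
    return rejoins


print(count_rejoins(12, 180))  # polling every 12 seconds for 3 minutes
print(count_rejoins(60, 180))  # polling every 60 seconds for 3 minutes
```

Under this assumption, `count_rejoins(12, 180)` returns 0, while `count_rejoins(60, 180)` returns 3: with a 60-second interval, the instance has already been evicted every time a poll fires, so each poll would trigger a rebalance. An interval of exactly 30 seconds sits on the boundary of this model, which is one reason to stay clearly below it.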

To reproduce the behaviour, I decided to scale the numbers down and work with 21 produced messages, which then had to be consumed by the integrations.

I created a Producer and 2 Consumers. The consumer integrations have the same name but a different major version, so they can both be active at the same time in OIC.

One integration (version 1.0, consumer group OIC-V3) would poll every 60 seconds and fetch a maximum of 10 records, whereas the other integration (version 2.0, consumer group OIC-V4) would poll every 12 seconds and fetch a maximum of 2 records. Both integrations are identical except for the polling interval and the consumer group. The separate consumer groups make sure that both integrations can read the same messages from the stream.
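Why two consumer groups? Each group tracks its own position in the stream, so both integrations receive every message. The in-memory Python sketch below illustrates that idea; `ToyStream` and `drain` are hypothetical helpers, not the OCI Streaming API, and the model ignores partitions, timing and rebalancing.

```python
from collections import defaultdict


class ToyStream:
    """Single-partition stream where each consumer group keeps its own offset.

    Hypothetical in-memory model, not the OCI Streaming API.
    """

    def __init__(self) -> None:
        self.messages: list[str] = []
        self.offsets: dict[str, int] = defaultdict(int)  # group name -> next offset

    def put(self, message: str) -> None:
        self.messages.append(message)

    def get(self, group: str, limit: int) -> list[str]:
        start = self.offsets[group]
        batch = self.messages[start:start + limit]
        self.offsets[group] = start + len(batch)  # commit what this group has read
        return batch


stream = ToyStream()
for i in range(21):
    stream.put(f"msg-{i}")


def drain(group: str, limit: int) -> list[str]:
    """Poll until the group has read everything currently in the stream."""
    received: list[str] = []
    while batch := stream.get(group, limit):
        received.extend(batch)
    return received


v3 = drain("OIC-V3", 10)  # batches of 10, 10 and 1
v4 = drain("OIC-V4", 2)   # ten batches of 2 and one of 1
print(len(v3), len(v4), v3 == v4)
```

Because the offsets are stored per group, both groups end up with the same 21 messages; if both integrations shared one group, each message would be delivered to only one of them.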

This is what we expect to happen within 3 minutes after the messages have been produced to the stream:

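| Consumer group | Number of polls | Records per poll | Total records        |
|----------------|-----------------|------------------|----------------------|
| OIC-V3         | 3               | 10               | 21 (duration 3m)     |
| OIC-V4         | 11              | 2                | 21 (duration 2m12s)  |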
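The expected numbers in the table follow from a simple formula: the number of polls is the record count divided by the batch size, rounded up, and the duration is that number of polls times the polling interval. This Python sketch (function names hypothetical) reproduces the table, assuming the first poll fires one full interval after the records are produced and every poll returns a full batch while records remain.

```python
import math


def expected_consumption(total: int, batch_size: int, interval_s: int) -> tuple[int, int]:
    """Return (number of polls, seconds until the last record is read)."""
    polls = math.ceil(total / batch_size)
    return polls, polls * interval_s


def fmt(seconds: int) -> str:
    """Format seconds like the table above, e.g. 132 -> '2m12s'."""
    minutes, secs = divmod(seconds, 60)
    return f"{minutes}m{secs}s" if secs else f"{minutes}m"


for name, batch, interval in [("OIC-V3", 10, 60), ("OIC-V4", 2, 12)]:
    polls, secs = expected_consumption(21, batch, interval)
    print(name, polls, fmt(secs))
# OIC-V3 3 3m
# OIC-V4 11 2m12s
```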

And that is when everything became clear…

I started the Producer, which put 21 records on the stream.

After the 3 minutes expired, this is what I observed in OIC:

Consumer 1.0 (the one with the 60-second polling interval) was never started, whereas Consumer 2.0 (12-second interval) was started 11 times, exactly as I expected, and read all produced messages from the stream.

It took some time for Consumer 1.0 to kick in. As a matter of fact, the last message was consumed 16 minutes after it was published.

When we look at the 3 successful instances of Consumer 1.0, we see that the first consumption happened 5 minutes after the messages were produced, and the second batch of 10 was consumed 5 minutes after that. The final message was consumed no less than 16 minutes after being produced.

So this is what happens when your polling interval exceeds 30 seconds and rebalancing kicks in: you can never be sure it works as expected, and you will probably see delays.

A service request (SR) was raised with Oracle to investigate this behaviour, and the behaviour was reproduced during a live session with Oracle Support. So be careful not to set your polling interval to 30 seconds or more.
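One way to apply this advice when designing a throttled consumer is to fix the polling interval below the 30-second limit first and then derive the batch size from the allowed rate. The Python sketch below is a hypothetical helper, not part of OIC; it assumes the 30-second timeout from the documentation quoted above and that the adapter accepts the resulting batch size.

```python
HEARTBEAT_TIMEOUT_S = 30  # assumed limit, taken from the OCI Streaming documentation


def plan_polling(max_per_minute: int, interval_s: int,
                 timeout_s: int = HEARTBEAT_TIMEOUT_S) -> int:
    """Return the largest batch size that keeps consumption at or below
    max_per_minute for the given polling interval.

    Hypothetical helper: rejects any interval that is not strictly below
    the timeout, so the consumer is never idle long enough to be evicted.
    """
    if not 0 < interval_s < timeout_s:
        raise ValueError(
            f"polling interval {interval_s}s must be between 0 and {timeout_s}s (exclusive)"
        )
    # Rounding down keeps batch * (60 / interval_s) <= max_per_minute.
    return max_per_minute * interval_s // 60


print(plan_polling(100, 12))  # 5 polls per minute
print(plan_polling(100, 20))  # 3 polls per minute
```

For the 100-messages-per-minute limit in this project, `plan_polling(100, 12)` returns 20 (5 polls of 20 records per minute), and `plan_polling(100, 20)` returns 33 (3 polls, 99 records per minute). Rounding the batch size down keeps the average rate at or below the limit, while an interval such as 60 seconds is rejected outright.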