Pubsub batch autocommitting. #2966
Conversation
This PR adds some functionality to the Batch object:
* The ability to specify `max_messages` and have the batch
automatically call `commit` when the number of messages
gets that high.
* The ability to specify `max_interval` and have the batch
automatically commit when a publish occurs and the batch
is at least as old as the specified interval.
This is one of two changes requested by the PubSub team.
pubsub/google/cloud/pubsub/topic.py
Outdated
| def __init__(self, topic, client): | ||
| INFINITY = float('inf') | ||
|
|
||
| def __init__(self, topic, client, max_interval=INFINITY, |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
dhermes
left a comment
There was a problem hiding this comment.
Mostly LG, ping me once addressed / we discuss?
pubsub/google/cloud/pubsub/topic.py
Outdated
| :type max_messages: float | ||
| """ | ||
| def __init__(self, topic, client): | ||
| INFINITY = float('inf') |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
pubsub/google/cloud/pubsub/topic.py
Outdated
| self._max_messages = max_messages | ||
|
|
||
| # Set the initial starting timestamp (used against the interval). | ||
| self._start_timestamp = float(time.time()) |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
pubsub/google/cloud/pubsub/topic.py
Outdated
| :type attrs: dict (string -> string) | ||
| :param attrs: key-value pairs to send as message attributes | ||
|
|
||
| :rtype: None |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
pubsub/google/cloud/pubsub/topic.py
Outdated
| if self._max_interval < self.INFINITY: | ||
| if float(time.time()) - self._start_timestamp > self._max_interval: | ||
| self._start_timestamp = float(time.time()) | ||
| return self.commit() |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
pubsub/google/cloud/pubsub/topic.py
Outdated
| # If the number of messages on the list is greater than the | ||
| # maximum allowed, autocommit (with the batch's client). | ||
| if len(self.messages) >= self._max_messages: | ||
| return self.commit() |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| def test_message_count_autocommit(self): | ||
| """Establish that if the batch is assigned to take a maximum | ||
| number of messages, that it commits when it reaches that maximum. | ||
| """ |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| topic = _Topic(name='TOPIC') | ||
|
|
||
| # Track commits, but do not perform them. | ||
| Batch = self._get_target_class() |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
pubsub/unit_tests/test_topic.py
Outdated
| for i in range(0, 4): | ||
| batch.publish({ | ||
| 'attributes': {}, | ||
| 'data': 'Batch message %d.' % i, |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
pubsub/unit_tests/test_topic.py
Outdated
| Batch = self._get_target_class() | ||
| with mock.patch.object(Batch, 'commit') as commit: | ||
| with self._make_one(topic, client=client, max_messages=5) as batch: | ||
| self.assertIsInstance(batch, self._get_target_class()) |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| # Track commits, but do not perform them. | ||
| Batch = self._get_target_class() | ||
| with mock.patch.object(Batch, 'commit') as commit: | ||
| mock_time.return_value = 0.0 |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
pubsub/google/cloud/pubsub/topic.py
Outdated
|
|
||
| # If too much time has elapsed since the first message | ||
| # was added, autocommit. | ||
| if self._max_interval < self.INFINITY: |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
|
||
| # If the number of messages on the list is greater than the | ||
| # maximum allowed, autocommit (with the batch's client). | ||
| if len(self.messages) >= self._max_messages: |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| # maximum allowed, autocommit (with the batch's client). | ||
| if len(self.messages) >= self._max_messages: | ||
| self.commit() | ||
| return |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| topic = _Topic(name='TOPIC') | ||
|
|
||
| # Track commits, but do not perform them. | ||
| Batch = self._get_target_class() |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| # Track commits, but do not perform them. | ||
| Batch = self._get_target_class() | ||
| with mock.patch.object(Batch, 'commit') as commit: | ||
| mock_time.return_value = 0.0 |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
* Pubsub batch autocommitting.
This PR adds some functionality to the Batch object:
* The ability to specify `max_messages` and have the batch
automatically call `commit` when the number of messages
gets that high.
* The ability to specify `max_interval` and have the batch
automatically commit when a publish occurs and the batch
is at least as old as the specified interval.
This is one of two changes requested by the PubSub team.
* Addressing comments from @dhermes.
* Remove unneeded -lt check @dhermes.
* Make INFINITY have a leading underscore. @dhermes
This PR adds some functionality to the Batch object:
* The ability to specify `max_messages` and have the batch
automatically call `commit` when the number of messages
gets that high.
* The ability to specify `max_interval` and have the batch
automatically commit when a publish occurs and the batch
is at least as old as the specified interval.
This is one of two changes requested by the PubSub team.