diff --git a/homeassistant/components/aws_s3/backup.py b/homeassistant/components/aws_s3/backup.py
index 46585ea7e97..97e2baeec8d 100644
--- a/homeassistant/components/aws_s3/backup.py
+++ b/homeassistant/components/aws_s3/backup.py
@@ -5,7 +5,7 @@ import functools
 import json
 import logging
 from time import time
-from typing import Any
+from typing import Any, cast
 
 from botocore.exceptions import BotoCoreError
 
@@ -189,48 +189,68 @@ class S3BackupAgent(BackupAgent):
         )
         upload_id = multipart_upload["UploadId"]
         try:
-            parts = []
+            parts: list[dict[str, Any]] = []
             part_number = 1
-            buffer_size = 0  # bytes
-            buffer: list[bytes] = []
+            buffer = bytearray()  # bytes buffer to store the data
+            offset = 0  # start index of unread data inside buffer
             stream = await open_stream()
             async for chunk in stream:
-                buffer_size += len(chunk)
-                buffer.append(chunk)
+                buffer.extend(chunk)
 
-                # If buffer size meets minimum part size, upload it as a part
-                if buffer_size >= MULTIPART_MIN_PART_SIZE_BYTES:
-                    _LOGGER.debug(
-                        "Uploading part number %d, size %d", part_number, buffer_size
-                    )
-                    part = await self._client.upload_part(
-                        Bucket=self._bucket,
-                        Key=tar_filename,
-                        PartNumber=part_number,
-                        UploadId=upload_id,
-                        Body=b"".join(buffer),
-                    )
-                    parts.append({"PartNumber": part_number, "ETag": part["ETag"]})
-                    part_number += 1
-                    buffer_size = 0
-                    buffer = []
+                # Upload parts of exactly MULTIPART_MIN_PART_SIZE_BYTES to ensure
+                # all non-trailing parts have the same size (defensive implementation)
+                view = memoryview(buffer)
+                try:
+                    while len(buffer) - offset >= MULTIPART_MIN_PART_SIZE_BYTES:
+                        start = offset
+                        end = offset + MULTIPART_MIN_PART_SIZE_BYTES
+                        part_data = view[start:end]
+                        offset = end
+
+                        _LOGGER.debug(
+                            "Uploading part number %d, size %d",
+                            part_number,
+                            len(part_data),
+                        )
+                        part = await cast(Any, self._client).upload_part(
+                            Bucket=self._bucket,
+                            Key=tar_filename,
+                            PartNumber=part_number,
+                            UploadId=upload_id,
+                            Body=part_data.tobytes(),
+                        )
+                        parts.append({"PartNumber": part_number, "ETag": part["ETag"]})
+                        part_number += 1
+                finally:
+                    view.release()
+
+                # Compact the buffer if the consumed offset has grown large enough. This
+                # avoids unnecessary memory copies when compacting after every part upload.
+                if offset and offset >= MULTIPART_MIN_PART_SIZE_BYTES:
+                    buffer = bytearray(buffer[offset:])
+                    offset = 0
 
             # Upload the final buffer as the last part (no minimum size requirement)
-            if buffer:
+            # Offset should be 0 after the last compaction, but we use it as the start
+            # index to be defensive in case the buffer was not compacted.
+            if offset < len(buffer):
+                remaining_data = memoryview(buffer)[offset:]
                 _LOGGER.debug(
-                    "Uploading final part number %d, size %d", part_number, buffer_size
+                    "Uploading final part number %d, size %d",
+                    part_number,
+                    len(remaining_data),
                 )
-                part = await self._client.upload_part(
+                part = await cast(Any, self._client).upload_part(
                     Bucket=self._bucket,
                     Key=tar_filename,
                     PartNumber=part_number,
                     UploadId=upload_id,
-                    Body=b"".join(buffer),
+                    Body=remaining_data.tobytes(),
                 )
                 parts.append({"PartNumber": part_number, "ETag": part["ETag"]})
 
-            await self._client.complete_multipart_upload(
+            await cast(Any, self._client).complete_multipart_upload(
                 Bucket=self._bucket,
                 Key=tar_filename,
                 UploadId=upload_id,
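As a reviewer aid (not part of the patch), here is a minimal standalone sketch of the buffering strategy the new code uses: accumulate chunks in a bytearray, cut fixed-size parts through a memoryview tracked by an offset, compact the buffer only once the consumed prefix spans at least one full part, and emit the remainder last. The helper name iter_fixed_parts, the synchronous iterator, and the tiny part size are illustrative assumptions; the real code consumes an async stream and uploads each part to S3.

# Illustrative sketch only -- mirrors the bytearray/memoryview/offset slicing above.
from collections.abc import Iterable, Iterator


def iter_fixed_parts(chunks: Iterable[bytes], part_size: int) -> Iterator[bytes]:
    """Yield parts of exactly part_size bytes, then whatever remains."""
    buffer = bytearray()
    offset = 0  # start index of unread data inside buffer
    for chunk in chunks:
        buffer.extend(chunk)
        view = memoryview(buffer)
        try:
            # Slice full parts out of the buffer without copying it wholesale
            while len(buffer) - offset >= part_size:
                yield bytes(view[offset : offset + part_size])
                offset += part_size
        finally:
            view.release()
        # Compact only after at least one full part was consumed, so the buffer
        # is not copied after every single part
        if offset >= part_size:
            buffer = bytearray(buffer[offset:])
            offset = 0
    # Trailing data smaller than part_size becomes the final part
    if offset < len(buffer):
        yield bytes(buffer[offset:])


# Example: 5-byte parts from uneven chunks -> [b"aaaaa", b"bbbbb", b"cc"]
print(list(iter_fixed_parts([b"aaa", b"aabbb", b"bbcc"], 5)))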