Skip to main content

Module airbyte.progress

A simple progress bar for the command line and IPython notebooks.

Note: Some runtimes (e.g. Dagger) may not support Rich Live views, and sometimes because they also use Rich, and Rich only supports one live view at a time. PyAirbyte will try to use smart defaults based on your execution environment.

If you experience issues, you can force plain text status reporting by setting the environment variable NO_LIVE_PROGRESS=1.

Logging is controlled by the AIRBYTE_LOGGING_ROOT and AIRBYTE_STRUCTURED_LOGGING environment variables, as described in airbyte.logs. If AIRBYTE_STRUCTURED_LOGGING is set, logs will be written in JSONL format. Otherwise, log files will be written as text.

Variables

DEFAULT_REFRESHES_PER_SECOND : The default number of times per second to refresh the progress view.

MAX_ITEMIZED_STREAMS : The maximum number of streams to itemize in the progress view.

MAX_UPDATE_FREQUENCY : The max number of records to read before updating the progress bar.

TIME_TO_FIRST_RECORD_THRESHOLD_SECONDS : Threshold for time_to_first_record above which adjusted metrics are calculated.

Classes

ProgressStyle(*args, **kwds) : An enum of progress bar styles.

Ancestors (in MRO)

  • enum.Enum

Class variables

AUTO : Automatically select the best style for the environment.

IPYTHON : Use IPython display methods.

NONE : Skip progress prints.

PLAIN : A plain text progress print.

RICH : A Rich progress bar.

ProgressTracker(style: ProgressStyle = ProgressStyle.AUTO, *, source: Source | None, cache: CacheBase | None, destination: Destination | None, expected_streams: list[str] | None = None) : A simple progress bar for the command line and IPython notebooks.

Initialize the progress tracker.

Instance variables

bytes_tracking_enabled: bool : Return True if bytes are being tracked.

destination_records_delivered_per_second: float : Return the number of records delivered per second.

elapsed_finalization_seconds: float : Return the number of seconds elapsed since the read operation started.

elapsed_finalization_time_str: str : Return duration as a string.

elapsed_read_seconds: float : Return the number of seconds elapsed since the read operation started.

elapsed_read_time_string: str : Return duration as a string.

elapsed_seconds: float : Return the number of seconds elapsed since the operation started.

elapsed_seconds_since_last_update: float | None : Return the number of seconds elapsed since the last update.

elapsed_time_string: str : Return duration as a string.

job_description: str : Return a description of the job, combining source, destination, and cache inputs.

total_bytes_read: int : Return the total number of bytes read.

Return None if bytes are not being tracked.

total_destination_records_confirmed: int : Return the total number of records confirmed by the destination.

total_destination_records_delivered: int : Return the total number of records delivered to the destination.

total_destination_write_time_seconds: float : Return the total time elapsed in seconds.

total_destination_write_time_str: str : Return the total time elapsed as a string.

total_megabytes_read: float : Return the total number of bytes read.

Return None if no bytes have been read, as this is generally due to bytes not being tracked.

total_time_elapsed_seconds: float : Return the total time elapsed in seconds.

total_time_elapsed_str: str : Return the total time elapsed as a string.

Methods

log_batch_written(self, stream_name: str, batch_size: int) ‑> None : Log that a batch has been written.

Args: stream_name: The name of the stream. batch_size: The number of records in the batch.

log_batches_finalized(self, stream_name: str, num_batches: int) ‑> None : Log that a batch has been finalized.

log_batches_finalizing(self, stream_name: str, num_batches: int) ‑> None : Log that batch are ready to be finalized.

In our current implementation, we ignore the stream name and number of batches. We just use this as a signal that we're finished reading and have begun to finalize any accumulated batches.

log_cache_processing_complete(self) ‑> None : Log that cache processing is complete.

log_failure(self, exception: Exception) ‑> None : Log the failure of a sync operation.

log_read_complete(self) ‑> None : Log that reading is complete.

log_stream_finalized(self, stream_name: str) ‑> None : Log that a stream has been finalized.

log_stream_start(self, stream_name: str) ‑> None : Log that a stream has started reading.

log_success(self) ‑> None : Log the success of a sync operation.

reset_progress_style(self, style: ProgressStyle = ProgressStyle.AUTO) ‑> None : Set the progress bar style.

You can call this method at any time to change the progress bar style as needed.

Usage:

from airbyte.progress import progress, ProgressStyle

progress.reset_progress_style(ProgressStyle.PLAIN)

tally_bytes_read(self, bytes_read: int, stream_name: str) ‑> None : Tally the number of bytes read.

Unlike the other tally methods, this method does not yield messages.

tally_confirmed_writes(self, messages: Iterable[AirbyteMessage]) ‑> Generator[AirbyteMessage, Any, None] : This method watches for state messages and tally records that are confirmed written.

The original messages are passed through unchanged.

tally_pending_writes(self, messages: IO[str] | AirbyteMessageIterator) ‑> Generator[AirbyteMessage, None, None] : This method simply tallies the number of records processed and yields the messages.

tally_records_read(self, messages: Iterable[AirbyteMessage], *, auto_close_streams: bool = False) ‑> Generator[AirbyteMessage, Any, None] : This method simply tallies the number of records processed and yields the messages.