diff --git a/.github/workflows/builds.yml b/.github/workflows/builds.yml index 784687d2..05879d39 100644 --- a/.github/workflows/builds.yml +++ b/.github/workflows/builds.yml @@ -65,23 +65,27 @@ jobs: include: - os: ubuntu-latest name: linux-x64 - build_cmd: ./build.sh release-all + build_cmd: ./build.sh release-examples build_dir: build-release - os: ubuntu-24.04-arm name: linux-arm64 - build_cmd: ./build.sh release-all + build_cmd: ./build.sh release-examples build_dir: build-release - - os: macos-latest + # macos-latest currently maps to a lower-spec 3-core/7GB arm64 image; + # macos-26 gives us 4 cores/14GB on the standard free runner tier. + - os: macos-26 name: macos-arm64 - build_cmd: ./build.sh release-all + build_cmd: ./build.sh release-examples build_dir: build-release - - os: macos-latest + # Use the explicit Intel image for x64 with the same 4-core/14GB profile + # instead of cross-building on the lower-spec macos-latest arm64 runner. + - os: macos-26-intel name: macos-x64 - build_cmd: ./build.sh release-all --macos-arch x86_64 + build_cmd: ./build.sh release-examples --macos-arch x86_64 build_dir: build-release - os: windows-latest name: windows-x64 - build_cmd: .\build.cmd release-all + build_cmd: .\build.cmd release-examples build_dir: build-release name: Build (${{ matrix.name }}) @@ -149,22 +153,10 @@ jobs: run: rustup target add x86_64-apple-darwin # ---------- Cache Cargo ---------- - - name: Cache Cargo registry - uses: actions/cache@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5 + - name: Cache Cargo + uses: Swatinem/rust-cache@c19371144df3bb44fab255c43d04cbc2ab54d1c4 # v2.9.1 with: - path: | - ~/.cargo/registry - ~/.cargo/git - key: ${{ runner.os }}-${{ matrix.name }}-cargo-reg-${{ hashFiles('**/Cargo.lock') }} - restore-keys: ${{ runner.os }}-${{ matrix.name }}-cargo-reg- - - - name: Cache Cargo target - uses: actions/cache@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5 - with: - path: client-sdk-rust/target - key: ${{ runner.os }}-${{ matrix.name }}-cargo-target-${{ hashFiles('**/Cargo.lock') }} - restore-keys: | - ${{ runner.os }}-${{ matrix.name }}-cargo-target- + workspaces: client-sdk-rust -> target # ---------- Build environment setup ---------- - name: Set Linux build environment @@ -288,29 +280,6 @@ jobs: } if ($failed) { exit 1 } else { exit 0 } - # ---------- Run unit tests ---------- - - name: Run unit tests (Unix) - if: runner.os != 'Windows' - shell: bash - run: | - ${{ matrix.build_dir }}/bin/livekit_unit_tests \ - --gtest_output=xml:${{ matrix.build_dir }}/unit-test-results.xml - - - name: Run unit tests (Windows) - if: runner.os == 'Windows' - shell: pwsh - run: | - & "${{ matrix.build_dir }}/bin/livekit_unit_tests.exe" ` - --gtest_output=xml:${{ matrix.build_dir }}/unit-test-results.xml - - - name: Upload test results - if: always() - uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v7.0.1 - with: - name: test-results-${{ matrix.name }} - path: ${{ matrix.build_dir }}/unit-test-results.xml - retention-days: 7 - # ---------- Upload artifacts ---------- - name: Upload build artifacts uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v7.0.1 diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml new file mode 100644 index 00000000..94875227 --- /dev/null +++ b/.github/workflows/tests.yml @@ -0,0 +1,205 @@ +name: Tests + +on: + push: + branches: ["main"] + paths: + - src/** + - include/** + - client-sdk-rust/** + - CMakeLists.txt + - CMakePresets.json + - build.sh + - build.cmd + - vcpkg.json + - .token_helpers/** + - .github/workflows/tests.yml + pull_request: + branches: ["main"] + paths: + - src/** + - include/** + - client-sdk-rust/** + - CMakeLists.txt + - CMakePresets.json + - build.sh + - build.cmd + - vcpkg.json + - .token_helpers/** + - .github/workflows/tests.yml + workflow_dispatch: + +permissions: + contents: read + actions: read + packages: read + +env: + CARGO_TERM_COLOR: always + +jobs: + test: + strategy: + fail-fast: false + matrix: + include: + - os: ubuntu-latest + name: linux-x64 + - os: ubuntu-24.04-arm + name: linux-arm64 + # macos-latest currently maps to a lower-spec 3-core/7GB arm64 image; + # macos-26 gives us 4 cores/14GB on the standard free runner tier. + - os: macos-26 + name: macos-arm64 + + name: Test (${{ matrix.name }}) + runs-on: ${{ matrix.os }} + + steps: + - name: Checkout (with submodules) + uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + with: + submodules: recursive + fetch-depth: 1 + + - name: Pull LFS files + run: git lfs pull + + # ---------- OS-specific deps ---------- + - name: Install deps (Ubuntu) + if: runner.os == 'Linux' + run: | + set -eux + sudo apt-get update + sudo apt-get install -y \ + build-essential cmake ninja-build pkg-config \ + llvm-dev libclang-dev clang \ + libva-dev libdrm-dev libgbm-dev libx11-dev libgl1-mesa-dev \ + libxext-dev libxcomposite-dev libxdamage-dev libxfixes-dev \ + libxrandr-dev libxi-dev libxkbcommon-dev \ + libasound2-dev libpulse-dev \ + libssl-dev \ + libprotobuf-dev protobuf-compiler \ + libabsl-dev \ + libwayland-dev libdecor-0-dev \ + libspdlog-dev \ + jq + + - name: Install deps (macOS) + if: runner.os == 'macOS' + run: | + set -eux + brew update + brew install cmake ninja protobuf abseil spdlog jq + + # ---------- Rust toolchain ---------- + - name: Install Rust (stable) + uses: dtolnay/rust-toolchain@3c5f7ea28cd621ae0bf5283f0e981fb97b8a7af9 + with: + toolchain: stable + + # ---------- Cache Cargo ---------- + - name: Cache Cargo + uses: Swatinem/rust-cache@c19371144df3bb44fab255c43d04cbc2ab54d1c4 # v2.9.1 + with: + workspaces: client-sdk-rust -> target + + # ---------- Build environment setup ---------- + - name: Set Linux build environment + if: runner.os == 'Linux' + run: | + echo "CXXFLAGS=-Wno-deprecated-declarations" >> $GITHUB_ENV + echo "CFLAGS=-Wno-deprecated-declarations" >> $GITHUB_ENV + LLVM_VERSION=$(llvm-config --version | cut -d. -f1) + echo "LIBCLANG_PATH=/usr/lib/llvm-${LLVM_VERSION}/lib" >> $GITHUB_ENV + + # ---------- Build (release-tests: tests on, examples off) ---------- + - name: Build tests + shell: bash + run: | + chmod +x build.sh + ./build.sh release-tests + + # ---------- Run unit tests ---------- + - name: Run unit tests + timeout-minutes: 1 + shell: bash + run: | + build-release/bin/livekit_unit_tests \ + --gtest_output=xml:build-release/unit-test-results.xml + + # ---------- Install + start livekit-server for integration tests ---------- + - name: Install livekit-server and lk CLI + shell: bash + run: | + set -euxo pipefail + if [[ "$RUNNER_OS" == "Linux" ]]; then + # Linux: official install scripts. lk's installer parses the GitHub + # API JSON with jq (already installed above). + curl -sSL https://get.livekit.io | bash + curl -sSL https://get.livekit.io/cli | bash + else + # macOS: Homebrew formulas. Server install script aborts on Darwin. + brew install livekit livekit-cli + fi + livekit-server --version + lk --version + + - name: Start livekit-server + shell: bash + env: + LIVEKIT_CONFIG: "enable_data_tracks: true" + run: | + set -euxo pipefail + # Background the server with nohup so it survives this step's shell + # exit and remains running for the integration-test step. + nohup livekit-server --dev > livekit-server.log 2>&1 & + echo $! > livekit-server.pid + # Port 7880 is a WebSocket endpoint, so a TCP-connect probe is the + # most reliable readiness signal. + for i in $(seq 1 30); do + if nc -z 127.0.0.1 7880 >/dev/null 2>&1; then + echo "livekit-server is ready" + exit 0 + fi + sleep 1 + done + echo "::error::livekit-server failed to start within 30s" + tail -n 200 livekit-server.log || true + exit 1 + + - name: Run integration tests + timeout-minutes: 5 + shell: bash + run: | + set -euo pipefail + source .token_helpers/set_data_track_test_tokens.bash + build-release/bin/livekit_integration_tests \ + --gtest_output=xml:build-release/integration-test-results.xml + + - name: Stop livekit-server + if: always() + shell: bash + run: | + if [ -f livekit-server.pid ]; then + kill "$(cat livekit-server.pid)" 2>/dev/null || true + rm -f livekit-server.pid + fi + + - name: Dump livekit-server log on failure + if: failure() + shell: bash + run: tail -n 500 livekit-server.log || true + + # ---------- Upload results ---------- + - name: Upload test results + if: always() + uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v7.0.1 + with: + name: test-results-${{ matrix.name }} + path: | + build-release/unit-test-results.xml + build-release/integration-test-results.xml + livekit-server.log + if-no-files-found: ignore + retention-days: 7 diff --git a/.token_helpers/README.md b/.token_helpers/README.md index ebed99c1..d407a58f 100644 --- a/.token_helpers/README.md +++ b/.token_helpers/README.md @@ -5,4 +5,7 @@ Examples of generating tokens Generate tokens and then set them as env vars for the current terminal session ## set_data_track_test_tokens.bash -Generate tokens for data track integration tests and set them as env vars for the current terminal session. \ No newline at end of file +Generate the two participant tokens required by the C++ SDK's integration +and stress test suites (data tracks, RPC, media multistream, etc.) and +export them as `LK_TOKEN_TEST_A`, `LK_TOKEN_TEST_B`, and `LIVEKIT_URL` for +the current terminal session. \ No newline at end of file diff --git a/.token_helpers/set_data_track_test_tokens.bash b/.token_helpers/set_data_track_test_tokens.bash index b9bf99b8..99b43b11 100755 --- a/.token_helpers/set_data_track_test_tokens.bash +++ b/.token_helpers/set_data_track_test_tokens.bash @@ -14,10 +14,11 @@ # limitations under the License. # Generate two LiveKit access tokens via `lk` and set the environment variables -# required by src/tests/integration/test_data_track.cpp. +# required by the C++ SDK's integration and stress tests (data tracks, RPC, +# media multistream, etc.). # -# source examples/tokens/set_data_track_test_tokens.bash -# eval "$(bash examples/tokens/set_data_track_test_tokens.bash)" +# source .token_helpers/set_data_track_test_tokens.bash +# eval "$(bash .token_helpers/set_data_track_test_tokens.bash)" # # Exports: # LK_TOKEN_TEST_A @@ -47,8 +48,6 @@ fi LIVEKIT_ROOM="cpp_data_track_test" LIVEKIT_IDENTITY_A="cpp-test-a" LIVEKIT_IDENTITY_B="cpp-test-b" -LIVEKIT_CALLER_IDENTITY="caller" -LIVEKIT_RECEIVER_IDENTITY="receiver" if [[ $# -ne 0 ]]; then _fail "this script is hard-coded and does not accept arguments" 2 @@ -106,27 +105,22 @@ _create_token() { LK_TOKEN_TEST_A="$(_create_token "$LIVEKIT_IDENTITY_A")" LK_TOKEN_TEST_B="$(_create_token "$LIVEKIT_IDENTITY_B")" -LIVEKIT_CALLER_TOKEN="$(_create_token "$LIVEKIT_CALLER_IDENTITY")" -LIVEKIT_RECEIVER_TOKEN="$(_create_token "$LIVEKIT_RECEIVER_IDENTITY")" + _apply() { export LK_TOKEN_TEST_A export LK_TOKEN_TEST_B - export LIVEKIT_CALLER_TOKEN - export LIVEKIT_RECEIVER_TOKEN export LIVEKIT_URL } _emit_eval() { printf 'export LK_TOKEN_TEST_A=%q\n' "$LK_TOKEN_TEST_A" printf 'export LK_TOKEN_TEST_B=%q\n' "$LK_TOKEN_TEST_B" - printf 'export LIVEKIT_CALLER_TOKEN=%q\n' "$LIVEKIT_CALLER_TOKEN" - printf 'export LIVEKIT_RECEIVER_TOKEN=%q\n' "$LIVEKIT_RECEIVER_TOKEN" printf 'export LIVEKIT_URL=%q\n' "$LIVEKIT_URL" } if [[ "$_sourced" -eq 1 ]]; then _apply - echo "LK_TOKEN_TEST_A, LK_TOKEN_TEST_B, LIVEKIT_CALLER_TOKEN, LIVEKIT_RECEIVER_TOKEN, and LIVEKIT_URL set for this shell." >&2 + echo "LK_TOKEN_TEST_A, LK_TOKEN_TEST_B, and LIVEKIT_URL set for this shell." >&2 else _emit_eval echo "set_data_track_test_tokens.bash: for this shell run: source $0 or: eval \"\$(bash $0 ...)\"" >&2 diff --git a/AGENTS.md b/AGENTS.md index f6a0d883..e78838f7 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -107,6 +107,9 @@ Updates to ./build.sh and ./build.cmd should be accompanied by updates to this f ./build.sh clean-all # Full clean (C++ + Rust targets) ``` +The build scripts pass an explicit job count to `cmake --build --parallel`. Set +`CMAKE_BUILD_PARALLEL_LEVEL` to override the default detected logical CPU count. + **Requirements:** CMake 3.20+, C++17, Rust toolchain (cargo), protoc. On macOS: `brew install cmake ninja protobuf abseil spdlog`. On Linux: see the CI workflow for apt packages. ### SDK Packaging diff --git a/README.md b/README.md index 6136542b..4e402ebd 100644 --- a/README.md +++ b/README.md @@ -86,6 +86,9 @@ Using build scripts: .\build.cmd release-all # Build Release with tests + examples ``` +The build scripts pass an explicit job count to `cmake --build --parallel`. Set +`CMAKE_BUILD_PARALLEL_LEVEL` to override the default detected logical CPU count. + ### Windows build using cmake/vcpkg ```bash cmake -S . -B build -DCMAKE_TOOLCHAIN_FILE="$PWD/vcpkg/scripts/buildsystems/vcpkg.cmake" # Generate Makefiles in build folder @@ -418,25 +421,41 @@ ctest --output-on-failure | `livekit_integration_tests` | Quick tests (~1-2 minutes) for SDK functionality | | `livekit_stress_tests` | Long-running tests (configurable, default 1 hour) | -### RPC Test Environment Variables +### Integration & Stress Test Environment Variables -RPC integration and stress tests require a LiveKit server and two participant tokens: +The integration and stress test suites (data tracks, RPC, media multistream, +etc.) require a LiveKit server and two participant tokens: ```bash # Required -export LIVEKIT_URL="wss://your-server.livekit.cloud" -export LIVEKIT_CALLER_TOKEN="" -export LIVEKIT_RECEIVER_TOKEN="" +export LIVEKIT_URL="ws://localhost:7880" # or wss://your-server.livekit.cloud +export LK_TOKEN_TEST_A="" +export LK_TOKEN_TEST_B="" # Optional (for stress tests) export RPC_STRESS_DURATION_SECONDS=3600 # Test duration (default: 1 hour) export RPC_STRESS_CALLER_THREADS=4 # Concurrent caller threads (default: 4) ``` -**Generate tokens for RPC tests:** +**Generate tokens for the test suites:** + +The easiest path is to source the helper script, which will mint both +participant tokens against a local `livekit-server --dev` and export +`LK_TOKEN_TEST_A`, `LK_TOKEN_TEST_B`, and `LIVEKIT_URL` for the current shell: + ```bash -lk token create -r test -i rpc-caller --join --valid-for 99999h --dev --room=rpc-test-room -lk token create -r test -i rpc-receiver --join --valid-for 99999h --dev --room=rpc-test-room +source .token_helpers/set_data_track_test_tokens.bash +``` + +To generate tokens manually instead (e.g. against a non-default server): + +```bash +lk token create --api-key devkey --api-secret secret -i cpp-test-a \ + --join --valid-for 99999h --room cpp_data_track_test \ + --grant '{"canPublish":true,"canSubscribe":true,"canPublishData":true}' +lk token create --api-key devkey --api-secret secret -i cpp-test-b \ + --join --valid-for 99999h --room cpp_data_track_test \ + --grant '{"canPublish":true,"canSubscribe":true,"canPublishData":true}' ``` ### Test Coverage diff --git a/build.cmd b/build.cmd index 7008df93..da6f08c6 100644 --- a/build.cmd +++ b/build.cmd @@ -7,6 +7,7 @@ set "BUILD_TYPE=Release" set "PRESET=windows-release" set "LIVEKIT_VERSION=" set "CMAKE_EXTRA_ARGS=" +set "BUILD_PARALLEL_JOBS=" REM ============================================================ REM Auto-detect LIBCLANG_PATH if not already set @@ -214,8 +215,17 @@ if errorlevel 1 ( goto build_only :build_only -echo ==^> Building (%BUILD_TYPE%)... -cmake --build "%BUILD_DIR%" --config %BUILD_TYPE% +if not defined BUILD_PARALLEL_JOBS ( + if defined CMAKE_BUILD_PARALLEL_LEVEL ( + set "BUILD_PARALLEL_JOBS=%CMAKE_BUILD_PARALLEL_LEVEL%" + ) else if defined NUMBER_OF_PROCESSORS ( + set "BUILD_PARALLEL_JOBS=%NUMBER_OF_PROCESSORS%" + ) else ( + set "BUILD_PARALLEL_JOBS=2" + ) +) +echo ==^> Building (%BUILD_TYPE%) with %BUILD_PARALLEL_JOBS% parallel jobs... +cmake --build "%BUILD_DIR%" --config %BUILD_TYPE% --parallel "%BUILD_PARALLEL_JOBS%" if errorlevel 1 ( echo Build failed! exit /b 1 diff --git a/build.sh b/build.sh index 13b67274..2a9560c4 100755 --- a/build.sh +++ b/build.sh @@ -161,14 +161,39 @@ configure() { fi } +detect_parallel_jobs() { + if [[ -n "${CMAKE_BUILD_PARALLEL_LEVEL:-}" ]]; then + echo "${CMAKE_BUILD_PARALLEL_LEVEL}" + return + fi + + local jobs="" + if command -v getconf >/dev/null 2>&1; then + jobs="$(getconf _NPROCESSORS_ONLN 2>/dev/null || true)" + fi + if [[ -z "${jobs}" || ! "${jobs}" =~ ^[0-9]+$ || "${jobs}" -lt 1 ]]; then + if command -v sysctl >/dev/null 2>&1; then + jobs="$(sysctl -n hw.ncpu 2>/dev/null || true)" + fi + fi + if [[ -z "${jobs}" || ! "${jobs}" =~ ^[0-9]+$ || "${jobs}" -lt 1 ]]; then + jobs="2" + fi + + echo "${jobs}" +} + build() { - echo "==> Building (${BUILD_TYPE})..." + local parallel_jobs + parallel_jobs="$(detect_parallel_jobs)" + + echo "==> Building (${BUILD_TYPE}) with ${parallel_jobs} parallel jobs..." if [[ -n "${PRESET}" ]] && [[ -f "${PROJECT_ROOT}/CMakePresets.json" ]]; then # Use preset build if available - cmake --build --preset "${PRESET}" + cmake --build --preset "${PRESET}" --parallel "${parallel_jobs}" else # Fallback to traditional build - cmake --build "${BUILD_DIR}" + cmake --build "${BUILD_DIR}" --parallel "${parallel_jobs}" fi } diff --git a/src/data_track_stream.cpp b/src/data_track_stream.cpp index 30a355c8..d5abe636 100644 --- a/src/data_track_stream.cpp +++ b/src/data_track_stream.cpp @@ -19,7 +19,6 @@ #include "data_track.pb.h" #include "ffi.pb.h" #include "ffi_client.h" -#include "lk_log.h" #include diff --git a/src/tests/common/test_common.h b/src/tests/common/test_common.h index 495a5f56..2acdaa2b 100644 --- a/src/tests/common/test_common.h +++ b/src/tests/common/test_common.h @@ -70,8 +70,8 @@ constexpr char kLocalTestLiveKitUrl[] = "ws://localhost:7880"; * * Environment variables: * LIVEKIT_URL - WebSocket URL of the LiveKit server - * LIVEKIT_CALLER_TOKEN - Token for the caller/sender participant - * LIVEKIT_RECEIVER_TOKEN - Token for the receiver participant + * LK_TOKEN_TEST_A - Token for the first test participant + * LK_TOKEN_TEST_B - Token for the second test participant * TEST_ITERATIONS - Number of iterations for iterative tests (default: * 10) STRESS_DURATION_SECONDS - Duration for stress tests in seconds (default: * 600) STRESS_CALLER_THREADS - Number of caller threads for stress tests @@ -79,8 +79,8 @@ constexpr char kLocalTestLiveKitUrl[] = "ws://localhost:7880"; */ struct TestConfig { std::string url; - std::string caller_token; - std::string receiver_token; + std::string token_a; + std::string token_b; int test_iterations; int stress_duration_seconds; int num_caller_threads; @@ -89,16 +89,16 @@ struct TestConfig { static TestConfig fromEnv() { TestConfig config; const char *url = std::getenv("LIVEKIT_URL"); - const char *caller_token = std::getenv("LIVEKIT_CALLER_TOKEN"); - const char *receiver_token = std::getenv("LIVEKIT_RECEIVER_TOKEN"); + const char *token_a = std::getenv("LK_TOKEN_TEST_A"); + const char *token_b = std::getenv("LK_TOKEN_TEST_B"); const char *iterations_env = std::getenv("TEST_ITERATIONS"); const char *duration_env = std::getenv("STRESS_DURATION_SECONDS"); const char *threads_env = std::getenv("STRESS_CALLER_THREADS"); - if (url && caller_token && receiver_token) { + if (url && token_a && token_b) { config.url = url; - config.caller_token = caller_token; - config.receiver_token = receiver_token; + config.token_a = token_a; + config.token_b = token_b; config.available = true; } @@ -142,17 +142,17 @@ inline bool waitForParticipant(Room *room, const std::string &identity, } inline std::array getDataTrackTestTokens() { - const char *token_a = std::getenv("LIVEKIT_CALLER_TOKEN"); + const char *token_a = std::getenv("LK_TOKEN_TEST_A"); if (token_a == nullptr || std::string(token_a).empty()) { throw std::runtime_error( - "LIVEKIT_CALLER_TOKEN must be present and non-empty for data track E2E " + "LK_TOKEN_TEST_A must be present and non-empty for data track E2E " "tests"); } - const char *token_b = std::getenv("LIVEKIT_RECEIVER_TOKEN"); + const char *token_b = std::getenv("LK_TOKEN_TEST_B"); if (token_b == nullptr || std::string(token_b).empty()) { throw std::runtime_error( - "LIVEKIT_RECEIVER_TOKEN must be present and non-empty for data track E2E " + "LK_TOKEN_TEST_B must be present and non-empty for data track E2E " "tests"); } @@ -216,7 +216,7 @@ testRooms(const std::vector &room_configs) { if (room_configs.size() > 2) { throw std::invalid_argument( - "testRooms supports at most two rooms with LIVEKIT_CALLER_TOKEN/B"); + "testRooms supports at most two rooms with LK_TOKEN_TEST_A/B"); } auto tokens = getDataTrackTestTokens(); @@ -508,8 +508,8 @@ class LiveKitTestBase : public ::testing::Test { /// Skip the test if the required environment variables are not set void skipIfNotConfigured() { if (!config_.available) { - GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_CALLER_TOKEN, and " - "LIVEKIT_RECEIVER_TOKEN not set"; + GTEST_SKIP() + << "LIVEKIT_URL, LK_TOKEN_TEST_A, and LK_TOKEN_TEST_B not set"; } } diff --git a/src/tests/integration/test_data_track.cpp b/src/tests/integration/test_data_track.cpp index 31a9b15f..a3ef7a85 100644 --- a/src/tests/integration/test_data_track.cpp +++ b/src/tests/integration/test_data_track.cpp @@ -25,6 +25,7 @@ #include "ffi_client.h" +#include #include #include #include @@ -32,6 +33,7 @@ #include #include #include +#include #include namespace livekit::test { @@ -41,11 +43,11 @@ using namespace std::chrono_literals; namespace { constexpr char kTrackNamePrefix[] = "data_track_e2e"; -constexpr auto kPublishDuration = 5s; +constexpr auto kPublishDuration = 10s; constexpr auto kTrackWaitTimeout = 10s; -constexpr auto kReadTimeout = 30s; +constexpr auto kTransportTimeout = kPublishDuration + 25s; constexpr auto kPollingInterval = 10ms; -constexpr float kMinimumReceivedPercent = 0.95f; +constexpr float kMinimumReceivedPercent = 0.9f; constexpr int kResubscribeIterations = 10; constexpr int kPublishManyTrackCount = 256; constexpr auto kPublishManyTimeout = 5s; @@ -224,57 +226,53 @@ TEST_P(DataTrackTransportTest, PublishesAndReceivesFramesEndToEnd) { const auto publisher_identity = publisher_room->localParticipant()->identity(); + auto local_track = + requirePublishedTrack(publisher_room->localParticipant(), track_name); + ASSERT_TRUE(local_track->isPublished()); + EXPECT_FALSE(local_track->info().uses_e2ee); + EXPECT_EQ(local_track->info().name, track_name); + + auto remote_track = subscriber_delegate.waitForTrack(kTrackWaitTimeout); + ASSERT_NE(remote_track, nullptr) << "Timed out waiting for remote data track"; + EXPECT_TRUE(remote_track->isPublished()); + EXPECT_FALSE(remote_track->info().uses_e2ee); + EXPECT_EQ(remote_track->info().name, track_name); + EXPECT_EQ(remote_track->publisherIdentity(), publisher_identity); + + auto subscribe_result = remote_track->subscribe(); + if (!subscribe_result) { + FAIL() << describeDataTrackError(subscribe_result.error()); + } + auto subscription = subscribe_result.value(); + std::exception_ptr publish_error; std::thread publisher([&]() { try { - auto track = - requirePublishedTrack(publisher_room->localParticipant(), track_name); - if (!track->isPublished()) { - throw std::runtime_error("Publisher failed to publish data track"); - } - if (track->info().uses_e2ee) { - throw std::runtime_error("Unexpected E2EE on test data track"); - } - if (track->info().name != track_name) { - throw std::runtime_error("Published track name mismatch"); - } - const auto frame_interval = std::chrono::duration_cast( std::chrono::duration(1.0 / publish_fps)); auto next_send = std::chrono::steady_clock::now(); std::cout << "Publishing " << frame_count - << " frames with payload length " << payload_len << std::endl; + << " frames with payload length " << payload_len << '\n'; for (size_t index = 0; index < frame_count; ++index) { std::vector payload(payload_len, static_cast(index)); - requirePushSuccess(track->tryPush(std::move(payload)), + requirePushSuccess(local_track->tryPush(std::move(payload)), "Failed to push data frame"); next_send += frame_interval; std::this_thread::sleep_until(next_send); } - track->unpublishDataTrack(); + local_track->unpublishDataTrack(); } catch (...) { publish_error = std::current_exception(); } }); - auto remote_track = subscriber_delegate.waitForTrack(kTrackWaitTimeout); - ASSERT_NE(remote_track, nullptr) << "Timed out waiting for remote data track"; - EXPECT_TRUE(remote_track->isPublished()); - EXPECT_FALSE(remote_track->info().uses_e2ee); - EXPECT_EQ(remote_track->info().name, track_name); - EXPECT_EQ(remote_track->publisherIdentity(), publisher_identity); - - auto subscribe_result = remote_track->subscribe(); - if (!subscribe_result) { - FAIL() << describeDataTrackError(subscribe_result.error()); - } - auto subscription = subscribe_result.value(); - + const auto receive_min = static_cast( + static_cast(frame_count) * kMinimumReceivedPercent); std::promise receive_count_promise; auto receive_count_future = receive_count_promise.get_future(); std::exception_ptr subscribe_error; @@ -282,7 +280,7 @@ TEST_P(DataTrackTransportTest, PublishesAndReceivesFramesEndToEnd) { try { size_t received_count = 0; DataTrackFrame frame; - while (subscription->read(frame) && received_count < frame_count) { + while (subscription->read(frame) && received_count < receive_min) { if (frame.payload.empty()) { throw std::runtime_error("Received empty data frame"); } @@ -309,7 +307,7 @@ TEST_P(DataTrackTransportTest, PublishesAndReceivesFramesEndToEnd) { } }); - if (receive_count_future.wait_for(kReadTimeout) != + if (receive_count_future.wait_for(kTransportTimeout) != std::future_status::ready) { subscription->close(); ADD_FAILURE() << "Timed out waiting for data frames"; @@ -329,7 +327,7 @@ TEST_P(DataTrackTransportTest, PublishesAndReceivesFramesEndToEnd) { const auto received_percent = static_cast(received_count) / static_cast(frame_count); std::cout << "Received " << received_count << "/" << frame_count - << " frames (" << received_percent * 100.0f << "%)" << std::endl; + << " frames (" << received_percent * 100.0f << "%)" << '\n'; EXPECT_GE(received_percent, kMinimumReceivedPercent) << "Received " << received_count << "/" << frame_count << " frames"; @@ -578,7 +576,6 @@ TEST_F(DataTrackE2ETest, FfiClientSubscribeDataTrackReturnsSyncResult) { TEST_F(DataTrackE2ETest, PreservesUserTimestampEndToEnd) { const auto track_name = makeTrackName("user_timestamp"); - const auto sent_timestamp = getTimestampUs(); DataTrackPublishedDelegate subscriber_delegate; std::vector room_configs(2); @@ -619,21 +616,36 @@ TEST_F(DataTrackE2ETest, PreservesUserTimestampEndToEnd) { } }); - const auto push_result = - local_track->tryPush(std::vector(64, 0xFA), sent_timestamp); + std::atomic publishing{true}; + std::string push_error; + std::thread publisher([&]() { + while (publishing.load(std::memory_order_relaxed)) { + auto push_result = + local_track->tryPush(std::vector(64, 0xFA), + getTimestampUs()); + if (!push_result) { + push_error = describeDataTrackError(push_result.error()); + publishing.store(false, std::memory_order_relaxed); + return; + } + std::this_thread::sleep_for(50ms); + } + }); + const auto frame_status = frame_future.wait_for(5s); + publishing.store(false, std::memory_order_relaxed); if (frame_status != std::future_status::ready) { subscription->close(); } subscription->close(); + publisher.join(); reader.join(); local_track->unpublishDataTrack(); - if (!push_result) { - FAIL() << "Failed to push timestamped data frame: " - << describeDataTrackError(push_result.error()); + if (!push_error.empty()) { + FAIL() << "Failed to push timestamped data frame: " << push_error; } ASSERT_EQ(frame_status, std::future_status::ready) << "Timed out waiting for timestamped frame"; @@ -647,7 +659,9 @@ TEST_F(DataTrackE2ETest, PreservesUserTimestampEndToEnd) { ASSERT_FALSE(frame.payload.empty()); ASSERT_TRUE(frame.user_timestamp.has_value()); - EXPECT_EQ(frame.user_timestamp.value(), sent_timestamp); + const auto received_at = getTimestampUs(); + ASSERT_LE(frame.user_timestamp.value(), received_at); + EXPECT_LT(received_at - frame.user_timestamp.value(), 1000000u); } TEST_F(DataTrackE2ETest, PublishesAndReceivesEncryptedFramesEndToEnd) { diff --git a/src/tests/integration/test_media_multistream.cpp b/src/tests/integration/test_media_multistream.cpp index 8d48f3c5..46f0dea8 100644 --- a/src/tests/integration/test_media_multistream.cpp +++ b/src/tests/integration/test_media_multistream.cpp @@ -86,8 +86,8 @@ class MediaMultiStreamIntegrationTest : public LiveKitTestBase { void MediaMultiStreamIntegrationTest::runPublishTwoVideoAndTwoAudioTracks( bool single_peer_connection) { if (!config_.available) { - GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_CALLER_TOKEN, and " - "LIVEKIT_RECEIVER_TOKEN not set"; + GTEST_SKIP() + << "LIVEKIT_URL, LK_TOKEN_TEST_A, and LK_TOKEN_TEST_B not set"; } RoomOptions options; @@ -100,11 +100,11 @@ void MediaMultiStreamIntegrationTest::runPublishTwoVideoAndTwoAudioTracks( auto receiver_room = std::make_unique(); receiver_room->setDelegate(&receiver_delegate); ASSERT_TRUE( - receiver_room->Connect(config_.url, config_.receiver_token, options)) + receiver_room->Connect(config_.url, config_.token_b, options)) << "Receiver failed to connect"; auto sender_room = std::make_unique(); - ASSERT_TRUE(sender_room->Connect(config_.url, config_.caller_token, options)) + ASSERT_TRUE(sender_room->Connect(config_.url, config_.token_a, options)) << "Sender failed to connect"; const std::string receiver_identity = diff --git a/src/tests/integration/test_room.cpp b/src/tests/integration/test_room.cpp index a5f4fbe5..d2e08dea 100644 --- a/src/tests/integration/test_room.cpp +++ b/src/tests/integration/test_room.cpp @@ -98,14 +98,14 @@ TEST_F(RoomTest, RemoteParticipantLookupBeforeConnect) { << "Looking up participant before connect should return nullptr"; } -// Server-dependent tests - require LIVEKIT_URL and LIVEKIT_TOKEN env vars +// Server-dependent tests - require LIVEKIT_URL and LK_TOKEN_TEST_A env vars class RoomServerTest : public ::testing::Test { protected: void SetUp() override { livekit::initialize(livekit::LogLevel::Info, livekit::LogSink::kConsole); const char *url_env = std::getenv("LIVEKIT_URL"); - const char *token_env = std::getenv("LIVEKIT_CALLER_TOKEN"); + const char *token_env = std::getenv("LK_TOKEN_TEST_A"); if (url_env && token_env) { server_url_ = url_env; @@ -123,7 +123,7 @@ class RoomServerTest : public ::testing::Test { TEST_F(RoomServerTest, ConnectToServer) { if (!server_available_) { - GTEST_SKIP() << "LIVEKIT_URL and LIVEKIT_TOKEN not set, skipping server " + GTEST_SKIP() << "LIVEKIT_URL and LK_TOKEN_TEST_A not set, skipping server " "connection test"; } diff --git a/src/tests/integration/test_rpc.cpp b/src/tests/integration/test_rpc.cpp index 939a6f66..686b0579 100644 --- a/src/tests/integration/test_rpc.cpp +++ b/src/tests/integration/test_rpc.cpp @@ -36,20 +36,20 @@ constexpr size_t kMaxRpcPayloadSize = 15 * 1024; // Test configuration from environment variables struct RpcTestConfig { std::string url; - std::string caller_token; - std::string receiver_token; + std::string token_a; + std::string token_b; bool available = false; static RpcTestConfig fromEnv() { RpcTestConfig config; const char *url = std::getenv("LIVEKIT_URL"); - const char *caller_token = std::getenv("LIVEKIT_CALLER_TOKEN"); - const char *receiver_token = std::getenv("LIVEKIT_RECEIVER_TOKEN"); + const char *token_a = std::getenv("LK_TOKEN_TEST_A"); + const char *token_b = std::getenv("LK_TOKEN_TEST_B"); - if (url && caller_token && receiver_token) { + if (url && token_a && token_b) { config.url = url; - config.caller_token = caller_token; - config.receiver_token = receiver_token; + config.token_a = token_a; + config.token_b = token_b; config.available = true; } return config; @@ -117,8 +117,8 @@ class RpcIntegrationTest : public ::testing::Test { // Test basic RPC round-trip TEST_F(RpcIntegrationTest, BasicRpcRoundTrip) { if (!config_.available) { - GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_CALLER_TOKEN, and " - "LIVEKIT_RECEIVER_TOKEN not set"; + GTEST_SKIP() + << "LIVEKIT_URL, LK_TOKEN_TEST_A, and LK_TOKEN_TEST_B not set"; } // Create receiver room @@ -127,7 +127,7 @@ TEST_F(RpcIntegrationTest, BasicRpcRoundTrip) { receiver_options.auto_subscribe = true; bool receiver_connected = receiver_room->Connect( - config_.url, config_.receiver_token, receiver_options); + config_.url, config_.token_b, receiver_options); ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; std::string receiver_identity = receiver_room->localParticipant()->identity(); @@ -154,7 +154,7 @@ TEST_F(RpcIntegrationTest, BasicRpcRoundTrip) { caller_options.auto_subscribe = true; bool caller_connected = - caller_room->Connect(config_.url, config_.caller_token, caller_options); + caller_room->Connect(config_.url, config_.token_a, caller_options); ASSERT_TRUE(caller_connected) << "Caller failed to connect"; // Wait for receiver to be visible to caller @@ -187,8 +187,8 @@ TEST_F(RpcIntegrationTest, BasicRpcRoundTrip) { // Test maximum payload size (15KB) TEST_F(RpcIntegrationTest, MaxPayloadSize) { if (!config_.available) { - GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_CALLER_TOKEN, and " - "LIVEKIT_RECEIVER_TOKEN not set"; + GTEST_SKIP() + << "LIVEKIT_URL, LK_TOKEN_TEST_A, and LK_TOKEN_TEST_B not set"; } auto receiver_room = std::make_unique(); @@ -196,7 +196,7 @@ TEST_F(RpcIntegrationTest, MaxPayloadSize) { options.auto_subscribe = true; bool receiver_connected = - receiver_room->Connect(config_.url, config_.receiver_token, options); + receiver_room->Connect(config_.url, config_.token_b, options); ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; std::string receiver_identity = receiver_room->localParticipant()->identity(); @@ -210,7 +210,7 @@ TEST_F(RpcIntegrationTest, MaxPayloadSize) { auto caller_room = std::make_unique(); bool caller_connected = - caller_room->Connect(config_.url, config_.caller_token, options); + caller_room->Connect(config_.url, config_.token_a, options); ASSERT_TRUE(caller_connected) << "Caller failed to connect"; bool receiver_visible = @@ -232,8 +232,8 @@ TEST_F(RpcIntegrationTest, MaxPayloadSize) { // Test RPC timeout TEST_F(RpcIntegrationTest, RpcTimeout) { if (!config_.available) { - GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_CALLER_TOKEN, and " - "LIVEKIT_RECEIVER_TOKEN not set"; + GTEST_SKIP() + << "LIVEKIT_URL, LK_TOKEN_TEST_A, and LK_TOKEN_TEST_B not set"; } auto receiver_room = std::make_unique(); @@ -241,7 +241,7 @@ TEST_F(RpcIntegrationTest, RpcTimeout) { options.auto_subscribe = true; bool receiver_connected = - receiver_room->Connect(config_.url, config_.receiver_token, options); + receiver_room->Connect(config_.url, config_.token_b, options); ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; std::string receiver_identity = receiver_room->localParticipant()->identity(); @@ -256,7 +256,7 @@ TEST_F(RpcIntegrationTest, RpcTimeout) { auto caller_room = std::make_unique(); bool caller_connected = - caller_room->Connect(config_.url, config_.caller_token, options); + caller_room->Connect(config_.url, config_.token_a, options); ASSERT_TRUE(caller_connected) << "Caller failed to connect"; bool receiver_visible = @@ -279,8 +279,8 @@ TEST_F(RpcIntegrationTest, RpcTimeout) { // Test RPC with unsupported method TEST_F(RpcIntegrationTest, UnsupportedMethod) { if (!config_.available) { - GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_CALLER_TOKEN, and " - "LIVEKIT_RECEIVER_TOKEN not set"; + GTEST_SKIP() + << "LIVEKIT_URL, LK_TOKEN_TEST_A, and LK_TOKEN_TEST_B not set"; } auto receiver_room = std::make_unique(); @@ -288,14 +288,14 @@ TEST_F(RpcIntegrationTest, UnsupportedMethod) { options.auto_subscribe = true; bool receiver_connected = - receiver_room->Connect(config_.url, config_.receiver_token, options); + receiver_room->Connect(config_.url, config_.token_b, options); ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; std::string receiver_identity = receiver_room->localParticipant()->identity(); auto caller_room = std::make_unique(); bool caller_connected = - caller_room->Connect(config_.url, config_.caller_token, options); + caller_room->Connect(config_.url, config_.token_a, options); ASSERT_TRUE(caller_connected) << "Caller failed to connect"; bool receiver_visible = @@ -319,8 +319,8 @@ TEST_F(RpcIntegrationTest, UnsupportedMethod) { // Test RPC with application error TEST_F(RpcIntegrationTest, ApplicationError) { if (!config_.available) { - GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_CALLER_TOKEN, and " - "LIVEKIT_RECEIVER_TOKEN not set"; + GTEST_SKIP() + << "LIVEKIT_URL, LK_TOKEN_TEST_A, and LK_TOKEN_TEST_B not set"; } auto receiver_room = std::make_unique(); @@ -328,7 +328,7 @@ TEST_F(RpcIntegrationTest, ApplicationError) { options.auto_subscribe = true; bool receiver_connected = - receiver_room->Connect(config_.url, config_.receiver_token, options); + receiver_room->Connect(config_.url, config_.token_b, options); ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; std::string receiver_identity = receiver_room->localParticipant()->identity(); @@ -342,7 +342,7 @@ TEST_F(RpcIntegrationTest, ApplicationError) { auto caller_room = std::make_unique(); bool caller_connected = - caller_room->Connect(config_.url, config_.caller_token, options); + caller_room->Connect(config_.url, config_.token_a, options); ASSERT_TRUE(caller_connected) << "Caller failed to connect"; bool receiver_visible = @@ -366,8 +366,8 @@ TEST_F(RpcIntegrationTest, ApplicationError) { // Test multiple concurrent RPC calls TEST_F(RpcIntegrationTest, ConcurrentRpcCalls) { if (!config_.available) { - GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_CALLER_TOKEN, and " - "LIVEKIT_RECEIVER_TOKEN not set"; + GTEST_SKIP() + << "LIVEKIT_URL, LK_TOKEN_TEST_A, and LK_TOKEN_TEST_B not set"; } auto receiver_room = std::make_unique(); @@ -375,7 +375,7 @@ TEST_F(RpcIntegrationTest, ConcurrentRpcCalls) { options.auto_subscribe = true; bool receiver_connected = - receiver_room->Connect(config_.url, config_.receiver_token, options); + receiver_room->Connect(config_.url, config_.token_b, options); ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; std::string receiver_identity = receiver_room->localParticipant()->identity(); @@ -393,7 +393,7 @@ TEST_F(RpcIntegrationTest, ConcurrentRpcCalls) { auto caller_room = std::make_unique(); bool caller_connected = - caller_room->Connect(config_.url, config_.caller_token, options); + caller_room->Connect(config_.url, config_.token_a, options); ASSERT_TRUE(caller_connected) << "Caller failed to connect"; bool receiver_visible = @@ -434,8 +434,8 @@ TEST_F(RpcIntegrationTest, ConcurrentRpcCalls) { // Integration test: Run for approximately 1 minute TEST_F(RpcIntegrationTest, OneMinuteIntegration) { if (!config_.available) { - GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_CALLER_TOKEN, and " - "LIVEKIT_RECEIVER_TOKEN not set"; + GTEST_SKIP() + << "LIVEKIT_URL, LK_TOKEN_TEST_A, and LK_TOKEN_TEST_B not set"; } auto receiver_room = std::make_unique(); @@ -443,7 +443,7 @@ TEST_F(RpcIntegrationTest, OneMinuteIntegration) { options.auto_subscribe = true; bool receiver_connected = - receiver_room->Connect(config_.url, config_.receiver_token, options); + receiver_room->Connect(config_.url, config_.token_b, options); ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; std::string receiver_identity = receiver_room->localParticipant()->identity(); @@ -461,7 +461,7 @@ TEST_F(RpcIntegrationTest, OneMinuteIntegration) { auto caller_room = std::make_unique(); bool caller_connected = - caller_room->Connect(config_.url, config_.caller_token, options); + caller_room->Connect(config_.url, config_.token_a, options); ASSERT_TRUE(caller_connected) << "Caller failed to connect"; bool receiver_visible = diff --git a/src/tests/integration/test_video_frame_metadata.cpp b/src/tests/integration/test_video_frame_metadata.cpp index 59dc1557..faf8dcb8 100644 --- a/src/tests/integration/test_video_frame_metadata.cpp +++ b/src/tests/integration/test_video_frame_metadata.cpp @@ -15,12 +15,15 @@ */ #include "tests/common/test_common.h" -#include "video_utils.h" +#include +#include #include #include +#include #include #include +#include namespace livekit::test { @@ -35,8 +38,8 @@ TEST_F(VideoFrameMetadataServerTest, RoomOptions options; ASSERT_TRUE( - receiver_room.Connect(config_.url, config_.receiver_token, options)); - ASSERT_TRUE(sender_room.Connect(config_.url, config_.caller_token, options)); + receiver_room.Connect(config_.url, config_.token_b, options)); + ASSERT_TRUE(sender_room.Connect(config_.url, config_.token_a, options)); ASSERT_NE(sender_room.localParticipant(), nullptr); ASSERT_NE(receiver_room.localParticipant(), nullptr); @@ -54,8 +57,12 @@ TEST_F(VideoFrameMetadataServerTest, sender_identity, track_name, [&mutex, &cv, &received_user_timestamp_us](const VideoFrameEvent &event) { std::lock_guard lock(mutex); - if (event.metadata && event.metadata->user_timestamp_us.has_value()) { - received_user_timestamp_us = event.metadata->user_timestamp_us; + if (!event.metadata) { + return; + } + const auto &user_timestamp_us = event.metadata->user_timestamp_us; + if (user_timestamp_us.has_value() && *user_timestamp_us != 0) { + received_user_timestamp_us = user_timestamp_us; cv.notify_all(); } }); @@ -105,31 +112,57 @@ TEST_F(VideoFrameMetadataServerTest, ASSERT_TRUE(receiver_track_ready) << "Timed out waiting for receiver video track subscription"; - VideoFrame frame = VideoFrame::create(16, 16, VideoBufferType::RGBA); - std::fill(frame.data(), frame.data() + frame.dataSize(), 0x7f); - - const std::uint64_t expected_user_timestamp_us = getTimestampUs(); - VideoCaptureOptions capture_options; - capture_options.timestamp_us = - static_cast(expected_user_timestamp_us); - capture_options.metadata = VideoFrameMetadata{}; - capture_options.metadata->user_timestamp_us = expected_user_timestamp_us; - - source->captureFrame(frame, capture_options); + std::atomic publishing{true}; + std::thread publisher([&]() { + VideoFrame frame = VideoFrame::create(16, 16, VideoBufferType::RGBA); + std::fill(frame.data(), frame.data() + frame.dataSize(), 0x7f); + + while (publishing.load(std::memory_order_relaxed)) { + const std::uint64_t user_timestamp_us = getTimestampUs(); + VideoCaptureOptions capture_options; + capture_options.timestamp_us = + static_cast(user_timestamp_us); + capture_options.metadata = VideoFrameMetadata{}; + capture_options.metadata->user_timestamp_us = user_timestamp_us; + + try { + source->captureFrame(frame, capture_options); + } catch (...) { + publishing.store(false, std::memory_order_relaxed); + break; + } + std::this_thread::sleep_for(50ms); + } + }); + bool got_metadata = false; { std::unique_lock lock(mutex); - ASSERT_TRUE(cv.wait_for(lock, 10s, [&received_user_timestamp_us] { + got_metadata = cv.wait_for(lock, 10s, [&received_user_timestamp_us] { return received_user_timestamp_us.has_value(); - })) - << "Timed out waiting for user timestamp metadata"; - EXPECT_EQ(*received_user_timestamp_us, expected_user_timestamp_us); + }); + } + + publishing.store(false, std::memory_order_relaxed); + publisher.join(); + + std::optional received_user_timestamp_snapshot; + { + std::lock_guard lock(mutex); + received_user_timestamp_snapshot = received_user_timestamp_us; } receiver_room.clearOnVideoFrameCallback(sender_identity, track_name); if (track->publication()) { sender_room.localParticipant()->unpublishTrack(track->publication()->sid()); } + + ASSERT_TRUE(got_metadata) << "Timed out waiting for user timestamp metadata"; + ASSERT_TRUE(received_user_timestamp_snapshot.has_value()); + + const auto received_at = getTimestampUs(); + ASSERT_LE(*received_user_timestamp_snapshot, received_at); + EXPECT_LT(received_at - *received_user_timestamp_snapshot, 1000000u); } } // namespace livekit::test diff --git a/src/tests/stress/test_latency_measurement.cpp b/src/tests/stress/test_latency_measurement.cpp index 23e89bba..9b6b30f8 100644 --- a/src/tests/stress/test_latency_measurement.cpp +++ b/src/tests/stress/test_latency_measurement.cpp @@ -292,7 +292,7 @@ TEST_F(LatencyMeasurementTest, ConnectionTime) { auto start = std::chrono::high_resolution_clock::now(); // Room::Connect() has built-in TRACE_EVENT0 for automatic timing - bool connected = room->Connect(config_.url, config_.caller_token, options); + bool connected = room->Connect(config_.url, config_.token_a, options); auto end = std::chrono::high_resolution_clock::now(); if (connected) { @@ -395,16 +395,16 @@ TEST_F(LatencyMeasurementTest, AudioLatency) { options.auto_subscribe = true; bool receiver_connected = - receiver_room->Connect(config_.url, config_.receiver_token, options); + receiver_room->Connect(config_.url, config_.token_b, options); ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; std::string receiver_identity = receiver_room->localParticipant()->identity(); std::cout << "Receiver connected as: " << receiver_identity << std::endl; - // Create sender room (using caller_token) + // Create sender room (using token_a) auto sender_room = std::make_unique(); bool sender_connected = - sender_room->Connect(config_.url, config_.caller_token, options); + sender_room->Connect(config_.url, config_.token_a, options); ASSERT_TRUE(sender_connected) << "Sender failed to connect"; std::string sender_identity = sender_room->localParticipant()->identity(); @@ -605,9 +605,9 @@ TEST_F(LatencyMeasurementTest, FullDeplexAudioLatency) { RoomOptions options; options.auto_subscribe = true; - ASSERT_TRUE(room_a->Connect(config_.url, config_.caller_token, options)) + ASSERT_TRUE(room_a->Connect(config_.url, config_.token_a, options)) << "Participant A failed to connect"; - ASSERT_TRUE(room_b->Connect(config_.url, config_.receiver_token, options)) + ASSERT_TRUE(room_b->Connect(config_.url, config_.token_b, options)) << "Participant B failed to connect"; std::string id_a = room_a->localParticipant()->identity(); diff --git a/src/tests/stress/test_room_stress.cpp b/src/tests/stress/test_room_stress.cpp index 1f84db10..fb42d67d 100644 --- a/src/tests/stress/test_room_stress.cpp +++ b/src/tests/stress/test_room_stress.cpp @@ -192,7 +192,7 @@ class RoomServerStressTest : public ::testing::Test { livekit::initialize(livekit::LogLevel::Info, livekit::LogSink::kConsole); const char *url_env = std::getenv("LIVEKIT_URL"); - const char *token_env = std::getenv("LIVEKIT_CALLER_TOKEN"); + const char *token_env = std::getenv("LK_TOKEN_TEST_A"); if (url_env && token_env) { server_url_ = url_env; @@ -210,8 +210,8 @@ class RoomServerStressTest : public ::testing::Test { TEST_F(RoomServerStressTest, RepeatedConnectDisconnect) { if (!server_available_) { - GTEST_SKIP() - << "LIVEKIT_URL and LIVEKIT_TOKEN not set, skipping server stress test"; + GTEST_SKIP() << "LIVEKIT_URL and LK_TOKEN_TEST_A not set, skipping server " + "stress test"; } const int num_iterations = 10; diff --git a/src/tests/stress/test_rpc_stress.cpp b/src/tests/stress/test_rpc_stress.cpp index 9db7a9ec..89e249fd 100644 --- a/src/tests/stress/test_rpc_stress.cpp +++ b/src/tests/stress/test_rpc_stress.cpp @@ -164,7 +164,7 @@ TEST_F(RpcStressTest, MaxPayloadStress) { options.auto_subscribe = true; bool receiver_connected = - receiver_room->Connect(config_.url, config_.receiver_token, options); + receiver_room->Connect(config_.url, config_.token_b, options); ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; auto receiver_info = receiver_room->room_info(); @@ -189,7 +189,7 @@ TEST_F(RpcStressTest, MaxPayloadStress) { // Create caller room auto caller_room = std::make_unique(); bool caller_connected = - caller_room->Connect(config_.url, config_.caller_token, options); + caller_room->Connect(config_.url, config_.token_a, options); ASSERT_TRUE(caller_connected) << "Caller failed to connect"; auto caller_info = caller_room->room_info(); @@ -366,7 +366,7 @@ TEST_F(RpcStressTest, SmallPayloadStress) { options.auto_subscribe = true; bool receiver_connected = - receiver_room->Connect(config_.url, config_.receiver_token, options); + receiver_room->Connect(config_.url, config_.token_b, options); ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; auto receiver_info = receiver_room->room_info(); @@ -390,7 +390,7 @@ TEST_F(RpcStressTest, SmallPayloadStress) { // Create caller room auto caller_room = std::make_unique(); bool caller_connected = - caller_room->Connect(config_.url, config_.caller_token, options); + caller_room->Connect(config_.url, config_.token_a, options); ASSERT_TRUE(caller_connected) << "Caller failed to connect"; auto caller_info = caller_room->room_info(); @@ -553,11 +553,11 @@ TEST_F(RpcStressTest, BidirectionalRpcStress) { options.auto_subscribe = true; bool a_connected = - room_a->Connect(config_.url, config_.caller_token, options); + room_a->Connect(config_.url, config_.token_a, options); ASSERT_TRUE(a_connected) << "Room A failed to connect"; bool b_connected = - room_b->Connect(config_.url, config_.receiver_token, options); + room_b->Connect(config_.url, config_.token_b, options); ASSERT_TRUE(b_connected) << "Room B failed to connect"; std::string identity_a = room_a->localParticipant()->identity(); @@ -774,7 +774,7 @@ TEST_F(RpcStressTest, HighThroughputBurst) { options.auto_subscribe = true; bool receiver_connected = - receiver_room->Connect(config_.url, config_.receiver_token, options); + receiver_room->Connect(config_.url, config_.token_b, options); ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; std::string receiver_identity = receiver_room->localParticipant()->identity(); @@ -792,7 +792,7 @@ TEST_F(RpcStressTest, HighThroughputBurst) { auto caller_room = std::make_unique(); bool caller_connected = - caller_room->Connect(config_.url, config_.caller_token, options); + caller_room->Connect(config_.url, config_.token_a, options); ASSERT_TRUE(caller_connected) << "Caller failed to connect"; bool receiver_visible =