Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ Increment the:
* [SDK] MeterProvider: do not warn in destructor after explicit Shutdown
[#4085](https://github.com/open-telemetry/opentelemetry-cpp/pull/4085)

* [EXPORTER] Handle OTLP partial success response
[#4104](https://github.com/open-telemetry/opentelemetry-cpp/pull/4104)

## [1.27.0] 2026-05-13

* [RELEASE] Bump main branch to 1.27.0-dev
Expand Down
3 changes: 3 additions & 0 deletions exporters/otlp/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,7 @@ cc_test(
":otlp_grpc_exporter",
"//api",
"//sdk/src/metrics",
"//test_common:headers",
"@com_google_googletest//:gtest_main",
],
)
Expand Down Expand Up @@ -876,6 +877,7 @@ cc_test(
"//api",
"//sdk/src/logs",
"//sdk/src/metrics",
"//test_common:headers",
"@com_google_googletest//:gtest_main",
],
)
Expand Down Expand Up @@ -909,6 +911,7 @@ cc_test(
":otlp_grpc_metric_exporter",
"//api",
"//sdk/src/metrics",
"//test_common:headers",
"@com_google_googletest//:gtest_main",
],
)
Expand Down
24 changes: 16 additions & 8 deletions exporters/otlp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -907,7 +907,7 @@ if(BUILD_TESTING)
add_executable(otlp_grpc_exporter_test test/otlp_grpc_exporter_test.cc)
target_link_libraries(
otlp_grpc_exporter_test ${GTEST_BOTH_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}
${GMOCK_LIB} opentelemetry_exporter_otlp_grpc)
${GMOCK_LIB} opentelemetry_exporter_otlp_grpc opentelemetry_test_common)
gtest_add_tests(
TARGET otlp_grpc_exporter_test
TEST_PREFIX exporter.otlp.
Expand Down Expand Up @@ -942,7 +942,8 @@ if(BUILD_TESTING)
opentelemetry_exporter_otlp_grpc
opentelemetry_exporter_otlp_grpc_log
opentelemetry_trace
opentelemetry_logs)
opentelemetry_logs
opentelemetry_test_common)
gtest_add_tests(
TARGET otlp_grpc_log_record_exporter_test
TEST_PREFIX exporter.otlp.
Expand All @@ -962,9 +963,13 @@ if(BUILD_TESTING)
add_executable(otlp_grpc_metric_exporter_test
test/otlp_grpc_metric_exporter_test.cc)
target_link_libraries(
otlp_grpc_metric_exporter_test ${GTEST_BOTH_LIBRARIES}
${CMAKE_THREAD_LIBS_INIT} ${GMOCK_LIB} opentelemetry_exporter_otlp_grpc
opentelemetry_exporter_otlp_grpc_metrics)
otlp_grpc_metric_exporter_test
${GTEST_BOTH_LIBRARIES}
${CMAKE_THREAD_LIBS_INIT}
${GMOCK_LIB}
opentelemetry_exporter_otlp_grpc
opentelemetry_exporter_otlp_grpc_metrics
opentelemetry_test_common)
gtest_add_tests(
TARGET otlp_grpc_metric_exporter_test
TEST_PREFIX exporter.otlp.
Expand Down Expand Up @@ -992,7 +997,8 @@ if(BUILD_TESTING)
opentelemetry_exporter_otlp_http
opentelemetry_http_client_nosend
nlohmann_json::nlohmann_json
protobuf::libprotobuf)
protobuf::libprotobuf
opentelemetry_test_common)
gtest_add_tests(
TARGET otlp_http_exporter_test
TEST_PREFIX exporter.otlp.
Expand All @@ -1018,7 +1024,8 @@ if(BUILD_TESTING)
opentelemetry_exporter_otlp_http_log
opentelemetry_logs
opentelemetry_http_client_nosend
nlohmann_json::nlohmann_json)
nlohmann_json::nlohmann_json
opentelemetry_test_common)
gtest_add_tests(
TARGET otlp_http_log_record_exporter_test
TEST_PREFIX exporter.otlp.
Expand Down Expand Up @@ -1046,7 +1053,8 @@ if(BUILD_TESTING)
opentelemetry_metrics
opentelemetry_http_client_nosend
nlohmann_json::nlohmann_json
protobuf::libprotobuf)
protobuf::libprotobuf
opentelemetry_test_common)
gtest_add_tests(
TARGET otlp_http_metric_exporter_test
TEST_PREFIX exporter.otlp.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <grpcpp/support/status.h>
#include <atomic>
#include <chrono>
#include <functional>
#include <memory>
#include <string>

Expand All @@ -22,8 +23,6 @@
// clang-format on

#ifdef ENABLE_ASYNC_EXPORT
# include <functional>

# include "opentelemetry/sdk/common/exporter_utils.h"
#endif /* ENABLE_ASYNC_EXPORT */

Expand Down Expand Up @@ -151,21 +150,30 @@ class OtlpGrpcClient
std::unique_ptr<grpc::ClientContext> &&context,
std::unique_ptr<google::protobuf::Arena> &&arena,
proto::collector::trace::v1::ExportTraceServiceRequest *request,
proto::collector::trace::v1::ExportTraceServiceResponse *response);
proto::collector::trace::v1::ExportTraceServiceResponse *response,
std::function<void(std::unique_ptr<google::protobuf::Arena> &&,
proto::collector::trace::v1::ExportTraceServiceResponse *)> &&on_complete =
{});

static grpc::Status DelegateExport(
proto::collector::metrics::v1::MetricsService::StubInterface *stub,
std::unique_ptr<grpc::ClientContext> &&context,
std::unique_ptr<google::protobuf::Arena> &&arena,
proto::collector::metrics::v1::ExportMetricsServiceRequest *request,
proto::collector::metrics::v1::ExportMetricsServiceResponse *response);
proto::collector::metrics::v1::ExportMetricsServiceResponse *response,
std::function<void(std::unique_ptr<google::protobuf::Arena> &&,
proto::collector::metrics::v1::ExportMetricsServiceResponse *)>
&&on_complete = {});

static grpc::Status DelegateExport(
proto::collector::logs::v1::LogsService::StubInterface *stub,
std::unique_ptr<grpc::ClientContext> &&context,
std::unique_ptr<google::protobuf::Arena> &&arena,
proto::collector::logs::v1::ExportLogsServiceRequest *request,
proto::collector::logs::v1::ExportLogsServiceResponse *response);
proto::collector::logs::v1::ExportLogsServiceResponse *response,
std::function<void(std::unique_ptr<google::protobuf::Arena> &&,
proto::collector::logs::v1::ExportLogsServiceResponse *)> &&on_complete =
{});

void AddReference(OtlpGrpcClientReferenceGuard &guard,
const OtlpGrpcClientOptions &options) noexcept;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@
#include "opentelemetry/sdk/common/thread_instrumentation.h"
#include "opentelemetry/version.h"

// forward declare google::protobuf::Message
// forward declare google::protobuf::Message and google::protobuf::Arena
namespace google
{
namespace protobuf
{
class Arena;
class Message;
}
} // namespace protobuf
} // namespace google

OPENTELEMETRY_BEGIN_NAMESPACE
Expand Down Expand Up @@ -204,6 +205,24 @@ class OtlpHttpClient
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback,
std::size_t max_running_requests) noexcept;

/**
* Async export with typed response
* @param message message to export, it should be ExportTraceServiceRequest,
* ExportMetricsServiceRequest or ExportLogsServiceRequest
* @param arena protobuf arena that owns response
* @param response the parsed body is written here on 2xx
* @param result_callback callback to call when the exporting is done
* @param max_running_requests wait for at most max_running_requests running requests
* @return return the status of this operation
*/
sdk::common::ExportResult Export(
const google::protobuf::Message &message,
std::unique_ptr<google::protobuf::Arena> &&arena,
google::protobuf::Message *response,
std::function<bool(opentelemetry::sdk::common::ExportResult, google::protobuf::Message *)>
&&result_callback,
std::size_t max_running_requests) noexcept;

/**
* Force flush the HTTP client.
*/
Expand Down Expand Up @@ -242,13 +261,20 @@ class OtlpHttpClient
std::shared_ptr<opentelemetry::ext::http::client::Session> session;
std::shared_ptr<opentelemetry::ext::http::client::EventHandler> event_handle;

HttpSessionData() = default;
std::unique_ptr<google::protobuf::Arena> arena;
google::protobuf::Message *response = nullptr;

HttpSessionData() noexcept;
HttpSessionData(std::shared_ptr<opentelemetry::ext::http::client::Session> &&input_session,
std::shared_ptr<opentelemetry::ext::http::client::EventHandler> &&input_handle,
std::unique_ptr<google::protobuf::Arena> &&input_arena,
google::protobuf::Message *input_response) noexcept;

explicit HttpSessionData(
std::shared_ptr<opentelemetry::ext::http::client::Session> &&input_session,
std::shared_ptr<opentelemetry::ext::http::client::EventHandler> &&input_handle)
: session(std::move(input_session)), event_handle(std::move(input_handle))
{}
~HttpSessionData();
HttpSessionData(HttpSessionData &&) noexcept;
HttpSessionData &operator=(HttpSessionData &&) noexcept;
HttpSessionData(const HttpSessionData &) = delete;
HttpSessionData &operator=(const HttpSessionData &) = delete;
};

/**
Expand All @@ -260,6 +286,21 @@ class OtlpHttpClient
const google::protobuf::Message &message,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept;

/**
* @brief Create a Session object that deserializes the response body or return an error result.
*
* @param message The message to send
* @param arena Protobuf arena that owns response
* @param response the parsed body is written here on 2xx
* @param result_callback Callback for the export result; receives the populated response
*/
nostd::variant<sdk::common::ExportResult, HttpSessionData> createSession(
const google::protobuf::Message &message,
std::unique_ptr<google::protobuf::Arena> &&arena,
google::protobuf::Message *response,
std::function<bool(opentelemetry::sdk::common::ExportResult, google::protobuf::Message *)>
&&result_callback) noexcept;

/**
* Add http session and hold it's lifetime.
* @param session_data the session to add
Expand Down
37 changes: 31 additions & 6 deletions exporters/otlp/src/otlp_grpc_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -515,35 +515,60 @@ grpc::Status OtlpGrpcClient::DelegateExport(
std::unique_ptr<grpc::ClientContext> &&context,
std::unique_ptr<google::protobuf::Arena> &&arena,
proto::collector::trace::v1::ExportTraceServiceRequest *request,
proto::collector::trace::v1::ExportTraceServiceResponse *response)
proto::collector::trace::v1::ExportTraceServiceResponse *response,
std::function<void(std::unique_ptr<google::protobuf::Arena> &&,
proto::collector::trace::v1::ExportTraceServiceResponse *)> &&on_complete)
{
auto trace_grpc_context = std::move(context);
auto trace_arena = std::move(arena);
return stub->Export(trace_grpc_context.get(), *request, response);
auto status = stub->Export(trace_grpc_context.get(), *request, response);
if (status.ok() && on_complete)
{
auto callback = std::move(on_complete);
callback(std::move(trace_arena), response);
}
return status;
}

grpc::Status OtlpGrpcClient::DelegateExport(
proto::collector::metrics::v1::MetricsService::StubInterface *stub,
std::unique_ptr<grpc::ClientContext> &&context,
std::unique_ptr<google::protobuf::Arena> &&arena,
proto::collector::metrics::v1::ExportMetricsServiceRequest *request,
proto::collector::metrics::v1::ExportMetricsServiceResponse *response)
proto::collector::metrics::v1::ExportMetricsServiceResponse *response,
std::function<void(std::unique_ptr<google::protobuf::Arena> &&,
proto::collector::metrics::v1::ExportMetricsServiceResponse *)>
&&on_complete)
{
auto metrics_grpc_context = std::move(context);
auto metrics_arena = std::move(arena);
return stub->Export(metrics_grpc_context.get(), *request, response);
auto status = stub->Export(metrics_grpc_context.get(), *request, response);
if (status.ok() && on_complete)
{
auto callback = std::move(on_complete);
callback(std::move(metrics_arena), response);
}
return status;
}

grpc::Status OtlpGrpcClient::DelegateExport(
proto::collector::logs::v1::LogsService::StubInterface *stub,
std::unique_ptr<grpc::ClientContext> &&context,
std::unique_ptr<google::protobuf::Arena> &&arena,
proto::collector::logs::v1::ExportLogsServiceRequest *request,
proto::collector::logs::v1::ExportLogsServiceResponse *response)
proto::collector::logs::v1::ExportLogsServiceResponse *response,
std::function<void(std::unique_ptr<google::protobuf::Arena> &&,
proto::collector::logs::v1::ExportLogsServiceResponse *)> &&on_complete)
{
auto logs_grpc_context = std::move(context);
auto logs_arena = std::move(arena);
return stub->Export(logs_grpc_context.get(), *request, response);
auto status = stub->Export(logs_grpc_context.get(), *request, response);
if (status.ok() && on_complete)
{
auto callback = std::move(on_complete);
callback(std::move(logs_arena), response);
}
return status;
}

void OtlpGrpcClient::AddReference(OtlpGrpcClientReferenceGuard &guard,
Expand Down
41 changes: 30 additions & 11 deletions exporters/otlp/src/otlp_grpc_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <grpcpp/support/status.h>
#include <atomic>
#include <chrono>
#include <functional>
#include <memory>
#include <new>
#include <ostream>
Expand Down Expand Up @@ -32,10 +33,6 @@
#include "opentelemetry/exporters/otlp/protobuf_include_suffix.h" // IWYU pragma: keep
// clang-format on

#ifdef ENABLE_ASYNC_EXPORT
# include <functional>
#endif

OPENTELEMETRY_BEGIN_NAMESPACE
namespace exporter
{
Expand Down Expand Up @@ -151,14 +148,23 @@ sdk::common::ExportResult OtlpGrpcExporter::Export(
opentelemetry::sdk::common::ExportResult result,
std::unique_ptr<google::protobuf::Arena> &&arena,
const proto::collector::trace::v1::ExportTraceServiceRequest &request,
proto::collector::trace::v1::ExportTraceServiceResponse *) {
proto::collector::trace::v1::ExportTraceServiceResponse *response) {
auto trace_arena = std::move(arena);
if (result != opentelemetry::sdk::common::ExportResult::kSuccess)
{
OTEL_INTERNAL_LOG_ERROR("[OTLP TRACE GRPC Exporter] ERROR: Export "
<< request.resource_spans_size()
<< " trace span(s) error: " << static_cast<int>(result));
}
else if (response->has_partial_success() &&
(response->partial_success().rejected_spans() != 0 ||
!response->partial_success().error_message().empty()))
{
const auto &partial = response->partial_success();
OTEL_INTERNAL_LOG_ERROR("[OTLP TRACE GRPC Exporter] Export partial success: "
<< partial.rejected_spans() << " span(s) rejected: \""
<< partial.error_message() << "\"");
}
else
{
OTEL_INTERNAL_LOG_DEBUG("[OTLP TRACE GRPC Exporter] Export "
Expand All @@ -175,19 +181,32 @@ sdk::common::ExportResult OtlpGrpcExporter::Export(
google::protobuf::Arena::Create<proto::collector::trace::v1::ExportTraceServiceResponse>(
arena.get());
grpc::Status status = OtlpGrpcClient::DelegateExport(
trace_service_stub_.get(), std::move(context), std::move(arena), request, response);
trace_service_stub_.get(), std::move(context), std::move(arena), request, response,
[resource_spans_size](std::unique_ptr<google::protobuf::Arena> &&arena,
proto::collector::trace::v1::ExportTraceServiceResponse *response) {
auto trace_arena = std::move(arena);
if (response->has_partial_success() &&
(response->partial_success().rejected_spans() != 0 ||
!response->partial_success().error_message().empty()))
{
const auto &partial = response->partial_success();
OTEL_INTERNAL_LOG_ERROR("[OTLP TRACE GRPC Exporter] Export partial success: "
<< partial.rejected_spans() << " span(s) rejected: \""
<< partial.error_message() << "\"");
}
else
{
OTEL_INTERNAL_LOG_DEBUG("[OTLP TRACE GRPC Exporter] Export "
<< resource_spans_size << " trace span(s) success");
}
});
if (!status.ok())
{
OTEL_INTERNAL_LOG_ERROR("[OTLP TRACE GRPC Exporter] Export() failed with status_code: \""
<< grpc_utils::grpc_status_code_to_string(status.error_code())
<< "\" error_message: \"" << status.error_message() << "\"");
return sdk::common::ExportResult::kFailure;
}
else
{
OTEL_INTERNAL_LOG_DEBUG("[OTLP TRACE GRPC Exporter] Export " << resource_spans_size
<< " trace span(s) success");
}
#ifdef ENABLE_ASYNC_EXPORT
}
#endif
Expand Down
Loading
Loading