Skip to content

Commit fafd173

Browse files
authored
Merge pull request #27 from lalitb/preallocate-plus-logrecord-cow
Preallocate plus logrecord cow
2 parents 2f64386 + ae33b5e commit fafd173

File tree

9 files changed

+134
-53
lines changed

9 files changed

+134
-53
lines changed

opentelemetry-otlp/src/logs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ impl LogExporter {
100100
impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
101101
async fn export<'a>(
102102
&mut self,
103-
batch: Vec<std::borrow::Cow<'a, LogData>>,
103+
batch: Vec<std::borrow::Cow<'a, LogData<'a>>>,
104104
) -> opentelemetry::logs::LogResult<()> {
105105
self.client.export(batch).await
106106
}

opentelemetry-proto/src/transform/common.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,40 @@ pub mod tonic {
108108
}
109109
}
110110

111+
impl
112+
From<(
113+
Cow<'_, opentelemetry_sdk::InstrumentationLibrary>,
114+
Option<Cow<'static, str>>,
115+
)> for InstrumentationScope
116+
{
117+
fn from(
118+
data: (
119+
Cow<'_, opentelemetry_sdk::InstrumentationLibrary>,
120+
Option<Cow<'static, str>>,
121+
),
122+
) -> Self {
123+
let (library, target) = data;
124+
if let Some(t) = target {
125+
InstrumentationScope {
126+
name: t.to_string(),
127+
version: String::new(),
128+
attributes: vec![],
129+
..Default::default()
130+
}
131+
} else {
132+
InstrumentationScope {
133+
name: library.name.clone().into_owned(),
134+
version: library
135+
.version
136+
.as_ref()
137+
.map(ToString::to_string)
138+
.unwrap_or_default(),
139+
attributes: Attributes::from(library.attributes.clone()).0,
140+
..Default::default()
141+
}
142+
}
143+
}
144+
}
111145
/// Wrapper type for Vec<`KeyValue`>
112146
#[derive(Default, Debug)]
113147
pub struct Attributes(pub ::std::vec::Vec<crate::proto::tonic::common::v1::KeyValue>);

opentelemetry-proto/src/transform/logs.rs

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ pub mod tonic {
139139

140140
impl
141141
From<(
142-
opentelemetry_sdk::export::logs::LogData,
142+
opentelemetry_sdk::export::logs::LogData<'_>,
143143
&ResourceAttributesWithSchema,
144144
)> for ResourceLogs
145145
{
@@ -164,30 +164,35 @@ pub mod tonic {
164164
.clone()
165165
.map(Into::into)
166166
.unwrap_or_default(),
167-
scope: Some((log_data.instrumentation, log_data.record.target.clone()).into()),
168-
log_records: vec![log_data.record.into()],
167+
scope: Some(
168+
(
169+
log_data.instrumentation.into_owned(),
170+
log_data.record.target.clone(),
171+
)
172+
.into(),
173+
),
174+
log_records: vec![log_data.record.into_owned().into()],
169175
}],
170176
}
171177
}
172178
}
173179

174180
pub fn group_logs_by_resource_and_scope(
175-
logs: Vec<opentelemetry_sdk::export::logs::LogData>,
181+
logs: Vec<opentelemetry_sdk::export::logs::LogData<'_>>,
176182
resource: &ResourceAttributesWithSchema,
177183
) -> Vec<ResourceLogs> {
178184
// Group logs by target or instrumentation name
179185
let scope_map = logs.iter().fold(
180186
HashMap::new(),
181187
|mut scope_map: HashMap<
182188
Cow<'static, str>,
183-
Vec<&opentelemetry_sdk::export::logs::LogData>,
189+
Vec<&opentelemetry_sdk::export::logs::LogData<'_>>,
184190
>,
185191
log| {
186-
let key = log
187-
.record
188-
.target
189-
.clone()
190-
.unwrap_or_else(|| log.instrumentation.name.clone());
192+
let key =
193+
log.record.target.clone().unwrap_or_else(|| {
194+
Cow::Owned(log.instrumentation.name.clone().into_owned())
195+
});
191196
scope_map.entry(key).or_default().push(log);
192197
scope_map
193198
},
@@ -197,13 +202,20 @@ pub mod tonic {
197202
.into_iter()
198203
.map(|(key, log_data)| ScopeLogs {
199204
scope: Some(InstrumentationScope::from((
200-
&log_data.first().unwrap().instrumentation,
201-
Some(key),
205+
Cow::Owned(
206+
log_data
207+
.first()
208+
.unwrap()
209+
.instrumentation
210+
.clone()
211+
.into_owned(),
212+
),
213+
Some(key.into_owned().into()),
202214
))),
203215
schema_url: resource.schema_url.clone().unwrap_or_default(),
204216
log_records: log_data
205217
.into_iter()
206-
.map(|log_data| log_data.record.clone().into())
218+
.map(|log_data| log_data.record.clone().into_owned().into())
207219
.collect(),
208220
})
209221
.collect();
@@ -225,18 +237,21 @@ mod tests {
225237
use opentelemetry::logs::LogRecord as _;
226238
use opentelemetry_sdk::export::logs::LogData;
227239
use opentelemetry_sdk::{logs::LogRecord, Resource};
240+
use std::borrow::Cow;
228241
use std::time::SystemTime;
229242

230-
fn create_test_log_data(instrumentation_name: &str, _message: &str) -> LogData {
243+
fn create_test_log_data<'a>(instrumentation_name: &str, _message: &str) -> LogData<'a> {
231244
let mut logrecord = LogRecord::default();
232245
logrecord.set_timestamp(SystemTime::now());
233246
logrecord.set_observed_timestamp(SystemTime::now());
234247
LogData {
235-
instrumentation: opentelemetry_sdk::InstrumentationLibrary::builder(
236-
instrumentation_name.to_string(),
237-
)
238-
.build(),
239-
record: logrecord,
248+
instrumentation: Cow::Owned(
249+
opentelemetry_sdk::InstrumentationLibrary::builder(
250+
instrumentation_name.to_string(),
251+
)
252+
.build(),
253+
),
254+
record: Cow::Owned(logrecord),
240255
}
241256
}
242257

opentelemetry-sdk/src/export/logs/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use std::fmt::Debug;
1515
#[async_trait]
1616
pub trait LogExporter: Send + Sync + Debug {
1717
/// Exports a batch of [`LogData`].
18-
async fn export<'a>(&mut self, batch: Vec<Cow<'a, LogData>>) -> LogResult<()>;
18+
async fn export<'a>(&mut self, batch: Vec<Cow<'a, LogData<'a>>>) -> LogResult<()>;
1919
/// Shuts down the exporter.
2020
fn shutdown(&mut self) {}
2121
#[cfg(feature = "logs_level_enabled")]
@@ -29,11 +29,11 @@ pub trait LogExporter: Send + Sync + Debug {
2929

3030
/// `LogData` represents a single log event without resource context.
3131
#[derive(Clone, Debug)]
32-
pub struct LogData {
33-
/// Log record
34-
pub record: LogRecord,
32+
pub struct LogData<'a> {
33+
/// Log record, which can be borrowed or owned.
34+
pub record: Cow<'a, LogRecord>,
3535
/// Instrumentation details for the emitter who produced this `LogEvent`.
36-
pub instrumentation: InstrumentationLibrary,
36+
pub instrumentation: Cow<'a, InstrumentationLibrary>,
3737
}
3838

3939
/// Describes the result of an export.

opentelemetry-sdk/src/logs/log_emitter.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -266,8 +266,8 @@ impl opentelemetry::logs::Logger for Logger {
266266
}
267267

268268
let mut data = LogData {
269-
record: log_record,
270-
instrumentation: self.instrumentation_library().clone(),
269+
record: Cow::Borrowed(&log_record),
270+
instrumentation: Cow::Borrowed(self.instrumentation_library()),
271271
};
272272

273273
for p in processors {
@@ -328,7 +328,7 @@ mod tests {
328328
}
329329

330330
impl LogProcessor for ShutdownTestLogProcessor {
331-
fn emit(&self, _data: &mut LogData) {
331+
fn emit(&self, _data: &mut LogData<'_>) {
332332
self.is_shutdown
333333
.lock()
334334
.map(|is_shutdown| {
@@ -563,7 +563,7 @@ mod tests {
563563
}
564564

565565
impl LogProcessor for LazyLogProcessor {
566-
fn emit(&self, _data: &mut LogData) {
566+
fn emit(&self, _data: &mut LogData<'_>) {
567567
// nothing to do.
568568
}
569569

opentelemetry-sdk/src/logs/log_processor.rs

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ pub trait LogProcessor: Send + Sync + Debug {
5555
///
5656
/// # Parameters
5757
/// - `data`: A mutable reference to `LogData` representing the log record.
58-
fn emit(&self, data: &mut LogData);
58+
fn emit(&self, data: &mut LogData<'_>);
5959
/// Force the logs lying in the cache to be exported.
6060
fn force_flush(&self) -> LogResult<()>;
6161
/// Shuts down the processor.
@@ -90,7 +90,7 @@ impl SimpleLogProcessor {
9090
}
9191

9292
impl LogProcessor for SimpleLogProcessor {
93-
fn emit(&self, data: &mut LogData) {
93+
fn emit(&self, data: &mut LogData<'_>) {
9494
// noop after shutdown
9595
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
9696
return;
@@ -152,10 +152,14 @@ impl<R: RuntimeChannel> Debug for BatchLogProcessor<R> {
152152
}
153153

154154
impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
155-
fn emit(&self, data: &mut LogData) {
155+
fn emit(&self, data: &mut LogData<'_>) {
156+
let owned_data = LogData {
157+
record: Cow::Owned(data.record.clone().into_owned()),
158+
instrumentation: Cow::Owned(data.instrumentation.clone().into_owned()),
159+
};
156160
let result = self
157161
.message_sender
158-
.try_send(BatchMessage::ExportLog(data.clone()));
162+
.try_send(BatchMessage::ExportLog(owned_data));
159163

160164
if let Err(err) = result {
161165
global::handle_error(LogError::Other(err.into()));
@@ -307,7 +311,7 @@ async fn export_with_timeout<'a, R, E>(
307311
time_out: Duration,
308312
exporter: &mut E,
309313
runtime: &R,
310-
batch: Vec<Cow<'a, LogData>>,
314+
batch: Vec<Cow<'a, LogData<'a>>>,
311315
) -> ExportResult
312316
where
313317
R: RuntimeChannel,
@@ -497,7 +501,7 @@ where
497501
#[derive(Debug)]
498502
enum BatchMessage {
499503
/// Export logs, usually called when the log is emitted.
500-
ExportLog(LogData),
504+
ExportLog(LogData<'static>),
501505
/// Flush the current buffer to the backend, it can be triggered by
502506
/// pre configured interval or a call to `force_push` function.
503507
Flush(Option<oneshot::Sender<ExportResult>>),
@@ -545,7 +549,7 @@ mod tests {
545549

546550
#[async_trait]
547551
impl LogExporter for MockLogExporter {
548-
async fn export<'a>(&mut self, _batch: Vec<Cow<'a, LogData>>) -> LogResult<()> {
552+
async fn export<'a>(&mut self, _batch: Vec<Cow<'a, LogData<'a>>>) -> LogResult<()> {
549553
Ok(())
550554
}
551555

@@ -814,21 +818,29 @@ mod tests {
814818

815819
#[derive(Debug)]
816820
struct FirstProcessor {
817-
pub(crate) logs: Arc<Mutex<Vec<LogData>>>,
821+
pub(crate) logs: Arc<Mutex<Vec<LogData<'static>>>>,
818822
}
819823

820824
impl LogProcessor for FirstProcessor {
821-
fn emit(&self, data: &mut LogData) {
825+
fn emit(&self, data: &mut LogData<'_>) {
826+
// Ensure the record is owned before modifying
827+
let record = data.record.to_mut();
822828
// Add attribute
823-
data.record.add_attribute(
829+
record.add_attribute(
824830
Key::from_static_str("processed_by"),
825831
AnyValue::String("FirstProcessor".into()),
826832
);
827833

828834
// Update body
829-
data.record.body = Some(AnyValue::String("Updated by FirstProcessor".into()));
835+
record.body = Some(AnyValue::String("Updated by FirstProcessor".into()));
836+
837+
// Convert the modified LogData to an owned version
838+
let owned_data = LogData {
839+
record: Cow::Owned(record.clone()), // Since record is already owned, no need to clone deeply
840+
instrumentation: Cow::Owned(data.instrumentation.clone().into_owned()),
841+
};
830842

831-
self.logs.lock().unwrap().push(data.clone()); // Clone as the LogProcessor is storing the data.
843+
self.logs.lock().unwrap().push(owned_data); // Clone as the LogProcessor is storing the data.
832844
}
833845

834846
#[cfg(feature = "logs_level_enabled")]
@@ -847,11 +859,11 @@ mod tests {
847859

848860
#[derive(Debug)]
849861
struct SecondProcessor {
850-
pub(crate) logs: Arc<Mutex<Vec<LogData>>>,
862+
pub(crate) logs: Arc<Mutex<Vec<LogData<'static>>>>,
851863
}
852864

853865
impl LogProcessor for SecondProcessor {
854-
fn emit(&self, data: &mut LogData) {
866+
fn emit(&self, data: &mut LogData<'_>) {
855867
assert!(data.record.attributes_contains(
856868
&Key::from_static_str("processed_by"),
857869
&AnyValue::String("FirstProcessor".into())
@@ -860,7 +872,13 @@ mod tests {
860872
data.record.body.clone().unwrap()
861873
== AnyValue::String("Updated by FirstProcessor".into())
862874
);
863-
self.logs.lock().unwrap().push(data.clone());
875+
// Convert the modified LogData to an owned version before storing it
876+
let record = data.record.to_mut();
877+
let owned_data = LogData {
878+
record: Cow::Owned(record.clone()), // Convert the record to owned
879+
instrumentation: Cow::Owned(data.instrumentation.clone().into_owned()),
880+
};
881+
self.logs.lock().unwrap().push(owned_data);
864882
}
865883

866884
#[cfg(feature = "logs_level_enabled")]

opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,20 @@ use std::sync::{Arc, Mutex};
3939
///
4040
#[derive(Clone, Debug)]
4141
pub struct InMemoryLogsExporter {
42-
logs: Arc<Mutex<Vec<LogData>>>,
42+
logs: Arc<Mutex<Vec<OwnedLogData>>>,
4343
resource: Arc<Mutex<Resource>>,
4444
should_reset_on_shutdown: bool,
4545
}
4646

47+
/// `OwnedLogData` represents a single log event without resource context.
48+
#[derive(Debug, Clone)]
49+
pub struct OwnedLogData {
50+
/// Log record, which can be borrowed or owned.
51+
pub record: LogRecord,
52+
/// Instrumentation details for the emitter who produced this `LogEvent`.
53+
pub instrumentation: InstrumentationLibrary,
54+
}
55+
4756
impl Default for InMemoryLogsExporter {
4857
fn default() -> Self {
4958
InMemoryLogsExporterBuilder::new().build()
@@ -175,10 +184,14 @@ impl InMemoryLogsExporter {
175184

176185
#[async_trait]
177186
impl LogExporter for InMemoryLogsExporter {
178-
async fn export<'a>(&mut self, batch: Vec<Cow<'a, LogData>>) -> LogResult<()> {
187+
async fn export<'a>(&mut self, batch: Vec<Cow<'a, LogData<'a>>>) -> LogResult<()> {
179188
let mut logs_guard = self.logs.lock().map_err(LogError::from)?;
180189
for log in batch.into_iter() {
181-
logs_guard.push(log.into_owned());
190+
let owned_log = OwnedLogData {
191+
record: log.record.clone().into_owned(),
192+
instrumentation: log.instrumentation.clone().into_owned(),
193+
};
194+
logs_guard.push(owned_log);
182195
}
183196
Ok(())
184197
}

opentelemetry-stdout/src/logs/exporter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ impl fmt::Debug for LogExporter {
4545
#[async_trait]
4646
impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
4747
/// Export spans to stdout
48-
async fn export<'a>(&mut self, batch: Vec<Cow<'a, LogData>>) -> ExportResult {
48+
async fn export<'a>(&mut self, batch: Vec<Cow<'a, LogData<'a>>>) -> ExportResult {
4949
if let Some(writer) = &mut self.writer {
5050
// TODO - Avoid cloning logdata if it is borrowed.
5151
let log_data = crate::logs::transform::LogData::from((

0 commit comments

Comments
 (0)