Skip to content

Commit 0733c4d

Browse files
authored
PYTHON-4925 Fix test bugs in $$matchAsDocument and $$matchAsRoot (#1988)
Fixes a bug where the driverConnectionId field was missing from "server heartbeat failed" log messages. Avoids sending "upsert": False since various client.bulkWrite spec tests assume this field is only sent when it's True.
1 parent 466d0a1 commit 0733c4d

20 files changed

+609
-65
lines changed

pymongo/asynchronous/bulk.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -140,18 +140,18 @@ def add_update(
140140
self,
141141
selector: Mapping[str, Any],
142142
update: Union[Mapping[str, Any], _Pipeline],
143-
multi: bool = False,
144-
upsert: bool = False,
143+
multi: bool,
144+
upsert: Optional[bool],
145145
collation: Optional[Mapping[str, Any]] = None,
146146
array_filters: Optional[list[Mapping[str, Any]]] = None,
147147
hint: Union[str, dict[str, Any], None] = None,
148148
sort: Optional[Mapping[str, Any]] = None,
149149
) -> None:
150150
"""Create an update document and add it to the list of ops."""
151151
validate_ok_for_update(update)
152-
cmd: dict[str, Any] = dict( # noqa: C406
153-
[("q", selector), ("u", update), ("multi", multi), ("upsert", upsert)]
154-
)
152+
cmd: dict[str, Any] = {"q": selector, "u": update, "multi": multi}
153+
if upsert is not None:
154+
cmd["upsert"] = upsert
155155
if collation is not None:
156156
self.uses_collation = True
157157
cmd["collation"] = collation
@@ -173,14 +173,16 @@ def add_replace(
173173
self,
174174
selector: Mapping[str, Any],
175175
replacement: Mapping[str, Any],
176-
upsert: bool = False,
176+
upsert: Optional[bool],
177177
collation: Optional[Mapping[str, Any]] = None,
178178
hint: Union[str, dict[str, Any], None] = None,
179179
sort: Optional[Mapping[str, Any]] = None,
180180
) -> None:
181181
"""Create a replace document and add it to the list of ops."""
182182
validate_ok_for_replace(replacement)
183-
cmd = {"q": selector, "u": replacement, "multi": False, "upsert": upsert}
183+
cmd: dict[str, Any] = {"q": selector, "u": replacement}
184+
if upsert is not None:
185+
cmd["upsert"] = upsert
184186
if collation is not None:
185187
self.uses_collation = True
186188
cmd["collation"] = collation
@@ -200,7 +202,7 @@ def add_delete(
200202
hint: Union[str, dict[str, Any], None] = None,
201203
) -> None:
202204
"""Create a delete document and add it to the list of ops."""
203-
cmd = {"q": selector, "limit": limit}
205+
cmd: dict[str, Any] = {"q": selector, "limit": limit}
204206
if collation is not None:
205207
self.uses_collation = True
206208
cmd["collation"] = collation

pymongo/asynchronous/client_bulk.py

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -106,20 +106,13 @@ def __init__(
106106
self.bypass_doc_val = bypass_document_validation
107107
self.comment = comment
108108
self.verbose_results = verbose_results
109-
110109
self.ops: list[tuple[str, Mapping[str, Any]]] = []
111110
self.namespaces: list[str] = []
112111
self.idx_offset: int = 0
113112
self.total_ops: int = 0
114-
115113
self.executed = False
116-
self.uses_upsert = False
117114
self.uses_collation = False
118115
self.uses_array_filters = False
119-
self.uses_hint_update = False
120-
self.uses_hint_delete = False
121-
self.uses_sort = False
122-
123116
self.is_retryable = self.client.options.retry_writes
124117
self.retrying = False
125118
self.started_retryable_write = False
@@ -144,7 +137,7 @@ def add_update(
144137
namespace: str,
145138
selector: Mapping[str, Any],
146139
update: Union[Mapping[str, Any], _Pipeline],
147-
multi: bool = False,
140+
multi: bool,
148141
upsert: Optional[bool] = None,
149142
collation: Optional[Mapping[str, Any]] = None,
150143
array_filters: Optional[list[Mapping[str, Any]]] = None,
@@ -160,19 +153,16 @@ def add_update(
160153
"multi": multi,
161154
}
162155
if upsert is not None:
163-
self.uses_upsert = True
164156
cmd["upsert"] = upsert
165157
if array_filters is not None:
166158
self.uses_array_filters = True
167159
cmd["arrayFilters"] = array_filters
168160
if hint is not None:
169-
self.uses_hint_update = True
170161
cmd["hint"] = hint
171162
if collation is not None:
172163
self.uses_collation = True
173164
cmd["collation"] = collation
174165
if sort is not None:
175-
self.uses_sort = True
176166
cmd["sort"] = sort
177167
if multi:
178168
# A bulk_write containing an update_many is not retryable.
@@ -200,16 +190,13 @@ def add_replace(
200190
"multi": False,
201191
}
202192
if upsert is not None:
203-
self.uses_upsert = True
204193
cmd["upsert"] = upsert
205194
if hint is not None:
206-
self.uses_hint_update = True
207195
cmd["hint"] = hint
208196
if collation is not None:
209197
self.uses_collation = True
210198
cmd["collation"] = collation
211199
if sort is not None:
212-
self.uses_sort = True
213200
cmd["sort"] = sort
214201
self.ops.append(("replace", cmd))
215202
self.namespaces.append(namespace)
@@ -226,7 +213,6 @@ def add_delete(
226213
"""Create a delete document and add it to the list of ops."""
227214
cmd = {"delete": -1, "filter": selector, "multi": multi}
228215
if hint is not None:
229-
self.uses_hint_delete = True
230216
cmd["hint"] = hint
231217
if collation is not None:
232218
self.uses_collation = True

pymongo/asynchronous/monitor.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ def __init__(
149149
self._listeners = self._settings._pool_options._event_listeners
150150
self._publish = self._listeners is not None and self._listeners.enabled_for_server_heartbeat
151151
self._cancel_context: Optional[_CancellationContext] = None
152+
self._conn_id: Optional[int] = None
152153
self._rtt_monitor = _RttMonitor(
153154
topology,
154155
topology_settings,
@@ -243,6 +244,7 @@ async def _check_server(self) -> ServerDescription:
243244
244245
Returns a ServerDescription.
245246
"""
247+
self._conn_id = None
246248
start = time.monotonic()
247249
try:
248250
try:
@@ -272,6 +274,7 @@ async def _check_server(self) -> ServerDescription:
272274
awaited=awaited,
273275
durationMS=duration * 1000,
274276
failure=error,
277+
driverConnectionId=self._conn_id,
275278
message=_SDAMStatusMessage.HEARTBEAT_FAIL,
276279
)
277280
await self._reset_connection()
@@ -314,6 +317,8 @@ async def _check_once(self) -> ServerDescription:
314317
)
315318

316319
self._cancel_context = conn.cancel_context
320+
# Record the connection id so we can later attach it to the failed log message.
321+
self._conn_id = conn.id
317322
response, round_trip_time = await self._check_with_socket(conn)
318323
if not response.awaitable:
319324
self._rtt_monitor.add_sample(round_trip_time)

pymongo/operations.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ def __init__(
332332
self,
333333
filter: Mapping[str, Any],
334334
replacement: Union[_DocumentType, RawBSONDocument],
335-
upsert: bool = False,
335+
upsert: Optional[bool] = None,
336336
collation: Optional[_CollationIn] = None,
337337
hint: Optional[_IndexKeyHint] = None,
338338
namespace: Optional[str] = None,
@@ -693,7 +693,7 @@ def _add_to_bulk(self, bulkobj: _AgnosticBulk) -> None:
693693
self._filter,
694694
self._doc,
695695
True,
696-
bool(self._upsert),
696+
self._upsert,
697697
collation=validate_collation_or_none(self._collation),
698698
array_filters=self._array_filters,
699699
hint=self._hint,

pymongo/synchronous/bulk.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -140,18 +140,18 @@ def add_update(
140140
self,
141141
selector: Mapping[str, Any],
142142
update: Union[Mapping[str, Any], _Pipeline],
143-
multi: bool = False,
144-
upsert: bool = False,
143+
multi: bool,
144+
upsert: Optional[bool],
145145
collation: Optional[Mapping[str, Any]] = None,
146146
array_filters: Optional[list[Mapping[str, Any]]] = None,
147147
hint: Union[str, dict[str, Any], None] = None,
148148
sort: Optional[Mapping[str, Any]] = None,
149149
) -> None:
150150
"""Create an update document and add it to the list of ops."""
151151
validate_ok_for_update(update)
152-
cmd: dict[str, Any] = dict( # noqa: C406
153-
[("q", selector), ("u", update), ("multi", multi), ("upsert", upsert)]
154-
)
152+
cmd: dict[str, Any] = {"q": selector, "u": update, "multi": multi}
153+
if upsert is not None:
154+
cmd["upsert"] = upsert
155155
if collation is not None:
156156
self.uses_collation = True
157157
cmd["collation"] = collation
@@ -173,14 +173,16 @@ def add_replace(
173173
self,
174174
selector: Mapping[str, Any],
175175
replacement: Mapping[str, Any],
176-
upsert: bool = False,
176+
upsert: Optional[bool],
177177
collation: Optional[Mapping[str, Any]] = None,
178178
hint: Union[str, dict[str, Any], None] = None,
179179
sort: Optional[Mapping[str, Any]] = None,
180180
) -> None:
181181
"""Create a replace document and add it to the list of ops."""
182182
validate_ok_for_replace(replacement)
183-
cmd = {"q": selector, "u": replacement, "multi": False, "upsert": upsert}
183+
cmd: dict[str, Any] = {"q": selector, "u": replacement}
184+
if upsert is not None:
185+
cmd["upsert"] = upsert
184186
if collation is not None:
185187
self.uses_collation = True
186188
cmd["collation"] = collation
@@ -200,7 +202,7 @@ def add_delete(
200202
hint: Union[str, dict[str, Any], None] = None,
201203
) -> None:
202204
"""Create a delete document and add it to the list of ops."""
203-
cmd = {"q": selector, "limit": limit}
205+
cmd: dict[str, Any] = {"q": selector, "limit": limit}
204206
if collation is not None:
205207
self.uses_collation = True
206208
cmd["collation"] = collation

pymongo/synchronous/client_bulk.py

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -106,20 +106,13 @@ def __init__(
106106
self.bypass_doc_val = bypass_document_validation
107107
self.comment = comment
108108
self.verbose_results = verbose_results
109-
110109
self.ops: list[tuple[str, Mapping[str, Any]]] = []
111110
self.namespaces: list[str] = []
112111
self.idx_offset: int = 0
113112
self.total_ops: int = 0
114-
115113
self.executed = False
116-
self.uses_upsert = False
117114
self.uses_collation = False
118115
self.uses_array_filters = False
119-
self.uses_hint_update = False
120-
self.uses_hint_delete = False
121-
self.uses_sort = False
122-
123116
self.is_retryable = self.client.options.retry_writes
124117
self.retrying = False
125118
self.started_retryable_write = False
@@ -144,7 +137,7 @@ def add_update(
144137
namespace: str,
145138
selector: Mapping[str, Any],
146139
update: Union[Mapping[str, Any], _Pipeline],
147-
multi: bool = False,
140+
multi: bool,
148141
upsert: Optional[bool] = None,
149142
collation: Optional[Mapping[str, Any]] = None,
150143
array_filters: Optional[list[Mapping[str, Any]]] = None,
@@ -160,19 +153,16 @@ def add_update(
160153
"multi": multi,
161154
}
162155
if upsert is not None:
163-
self.uses_upsert = True
164156
cmd["upsert"] = upsert
165157
if array_filters is not None:
166158
self.uses_array_filters = True
167159
cmd["arrayFilters"] = array_filters
168160
if hint is not None:
169-
self.uses_hint_update = True
170161
cmd["hint"] = hint
171162
if collation is not None:
172163
self.uses_collation = True
173164
cmd["collation"] = collation
174165
if sort is not None:
175-
self.uses_sort = True
176166
cmd["sort"] = sort
177167
if multi:
178168
# A bulk_write containing an update_many is not retryable.
@@ -200,16 +190,13 @@ def add_replace(
200190
"multi": False,
201191
}
202192
if upsert is not None:
203-
self.uses_upsert = True
204193
cmd["upsert"] = upsert
205194
if hint is not None:
206-
self.uses_hint_update = True
207195
cmd["hint"] = hint
208196
if collation is not None:
209197
self.uses_collation = True
210198
cmd["collation"] = collation
211199
if sort is not None:
212-
self.uses_sort = True
213200
cmd["sort"] = sort
214201
self.ops.append(("replace", cmd))
215202
self.namespaces.append(namespace)
@@ -226,7 +213,6 @@ def add_delete(
226213
"""Create a delete document and add it to the list of ops."""
227214
cmd = {"delete": -1, "filter": selector, "multi": multi}
228215
if hint is not None:
229-
self.uses_hint_delete = True
230216
cmd["hint"] = hint
231217
if collation is not None:
232218
self.uses_collation = True

pymongo/synchronous/monitor.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ def __init__(
149149
self._listeners = self._settings._pool_options._event_listeners
150150
self._publish = self._listeners is not None and self._listeners.enabled_for_server_heartbeat
151151
self._cancel_context: Optional[_CancellationContext] = None
152+
self._conn_id: Optional[int] = None
152153
self._rtt_monitor = _RttMonitor(
153154
topology,
154155
topology_settings,
@@ -243,6 +244,7 @@ def _check_server(self) -> ServerDescription:
243244
244245
Returns a ServerDescription.
245246
"""
247+
self._conn_id = None
246248
start = time.monotonic()
247249
try:
248250
try:
@@ -272,6 +274,7 @@ def _check_server(self) -> ServerDescription:
272274
awaited=awaited,
273275
durationMS=duration * 1000,
274276
failure=error,
277+
driverConnectionId=self._conn_id,
275278
message=_SDAMStatusMessage.HEARTBEAT_FAIL,
276279
)
277280
self._reset_connection()
@@ -314,6 +317,8 @@ def _check_once(self) -> ServerDescription:
314317
)
315318

316319
self._cancel_context = conn.cancel_context
320+
# Record the connection id so we can later attach it to the failed log message.
321+
self._conn_id = conn.id
317322
response, round_trip_time = self._check_with_socket(conn)
318323
if not response.awaitable:
319324
self._rtt_monitor.add_sample(round_trip_time)

test/asynchronous/unified_format.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1328,8 +1328,8 @@ def format_logs(log_list):
13281328
if log.module == "ocsp_support":
13291329
continue
13301330
data = json_util.loads(log.getMessage())
1331-
client = data.pop("clientId") if "clientId" in data else data.pop("topologyId")
1332-
client_to_log[client].append(
1331+
client_id = data.get("clientId", data.get("topologyId"))
1332+
client_to_log[client_id].append(
13331333
{
13341334
"level": log.levelname.lower(),
13351335
"component": log.name.replace("pymongo.", "", 1),

test/discovery_and_monitoring/unified/logging-replicaset.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,7 @@
357357
},
358358
"durationMS": {
359359
"$$type": [
360+
"double",
360361
"int",
361362
"long"
362363
]
@@ -398,6 +399,7 @@
398399
},
399400
"durationMS": {
400401
"$$type": [
402+
"double",
401403
"int",
402404
"long"
403405
]
@@ -439,6 +441,7 @@
439441
},
440442
"durationMS": {
441443
"$$type": [
444+
"double",
442445
"int",
443446
"long"
444447
]
@@ -589,6 +592,7 @@
589592
},
590593
"durationMS": {
591594
"$$type": [
595+
"double",
592596
"int",
593597
"long"
594598
]

test/discovery_and_monitoring/unified/logging-sharded.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,7 @@
324324
},
325325
"durationMS": {
326326
"$$type": [
327+
"double",
327328
"int",
328329
"long"
329330
]
@@ -475,6 +476,7 @@
475476
},
476477
"durationMS": {
477478
"$$type": [
479+
"double",
478480
"int",
479481
"long"
480482
]

0 commit comments

Comments
 (0)