Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/object/srv_obj.c
Original file line number Diff line number Diff line change
Expand Up @@ -2118,6 +2118,9 @@ obj_local_rw(crt_rpc_t *rpc, struct obj_io_context *ioc, struct dtx_handle *dth)
}
}

if (rc != 0 && retry > 0)
vos_dtx_cleanup(dth, true);

return rc;
}

Expand Down Expand Up @@ -3822,6 +3825,9 @@ obj_local_punch(struct obj_punch_in *opi, crt_opcode_t opc, uint32_t shard_nr, u
}

out:
if (rc != 0 && retry > 0)
vos_dtx_cleanup(dth, true);

return rc;
}

Expand Down Expand Up @@ -5138,6 +5144,9 @@ ds_cpd_handle_one_wrap(crt_rpc_t *rpc, struct daos_cpd_sub_head *dcsh,
}
}

if (rc != 0 && retry > 0)
vos_dtx_cleanup(dth, true);

return rc;
}

Expand Down
56 changes: 34 additions & 22 deletions src/vos/vos_dtx.c
Original file line number Diff line number Diff line change
Expand Up @@ -1437,6 +1437,23 @@ vos_dtx_alloc(struct umem_instance *umm, struct dtx_handle *dth)
DAE_VER(dae) = dth->dth_ver;

if (dth->dth_mbs != NULL) {
if (dth->dth_mbs->dm_data_size <= sizeof(DAE_MBS_INLINE(dae))) {
memcpy(DAE_MBS_INLINE(dae), dth->dth_mbs->dm_data,
dth->dth_mbs->dm_data_size);
} else {
umem_off_t rec_off = umem_zalloc(umm, DAE_MBS_DSIZE(dae));

if (UMOFF_IS_NULL(rec_off)) {
D_ERROR("No space to store DTX mbs for " DF_DTI "\n",
DP_DTI(&dth->dth_xid));
D_GOTO(out, rc = -DER_NOSPACE);
}

memcpy(umem_off2ptr(umm, rec_off), dth->dth_mbs->dm_data,
dth->dth_mbs->dm_data_size);
DAE_MBS_OFF(dae) = rec_off;
}

DAE_TGT_CNT(dae) = dth->dth_mbs->dm_tgt_cnt;
DAE_GRP_CNT(dae) = dth->dth_mbs->dm_grp_cnt;
DAE_MBS_DSIZE(dae) = dth->dth_mbs->dm_data_size;
Expand Down Expand Up @@ -1473,6 +1490,7 @@ vos_dtx_alloc(struct umem_instance *umm, struct dtx_handle *dth)
d_list_add_tail(&dae->dae_order_link, &cont->vc_dtx_unsorted_list);
dth->dth_ent = dae;
} else {
out:
dtx_evict_lid(cont, dae);
}

Expand Down Expand Up @@ -2150,22 +2168,6 @@ vos_dtx_prepared(struct dtx_handle *dth, struct vos_dtx_cmt_ent **dce_p)
(dth->dth_modification_cnt > 0))
dth->dth_sync = 1;

if (DAE_MBS_DSIZE(dae) <= sizeof(DAE_MBS_INLINE(dae))) {
memcpy(DAE_MBS_INLINE(dae), dth->dth_mbs->dm_data,
DAE_MBS_DSIZE(dae));
} else {
rec_off = umem_zalloc(umm, DAE_MBS_DSIZE(dae));
if (UMOFF_IS_NULL(rec_off)) {
D_ERROR("No space to store DTX mbs "
DF_DTI"\n", DP_DTI(&DAE_XID(dae)));
return -DER_NOSPACE;
}

memcpy(umem_off2ptr(umm, rec_off),
dth->dth_mbs->dm_data, DAE_MBS_DSIZE(dae));
DAE_MBS_OFF(dae) = rec_off;
}

if (dae->dae_records != NULL) {
count = DAE_REC_CNT(dae) - DTX_INLINE_REC_CNT;
D_ASSERTF(count > 0, "Invalid DTX rec count %d\n", count);
Expand Down Expand Up @@ -2229,13 +2231,17 @@ vos_dtx_pack_mbs(struct umem_instance *umm, struct vos_dtx_act_ent *dae)
tmp->dm_dte_flags = DAE_FLAGS(dae);

/* The DTX is not prepared yet, copy the MBS from DTX handle. */
if (dth != NULL)
if (dth != NULL) {
memcpy(tmp->dm_data, dth->dth_mbs->dm_data, tmp->dm_data_size);
else if (tmp->dm_data_size <= sizeof(DAE_MBS_INLINE(dae)))
} else if (tmp->dm_data_size <= sizeof(DAE_MBS_INLINE(dae))) {
memcpy(tmp->dm_data, DAE_MBS_INLINE(dae), tmp->dm_data_size);
else
} else {
D_ASSERTF(!UMOFF_IS_NULL(DAE_MBS_OFF(dae)), "Unexpected empty MBS info for DTX "
DF_DTI "\n", DP_DTI(&DAE_XID(dae)));

memcpy(tmp->dm_data, umem_off2ptr(umm, DAE_MBS_OFF(dae)),
tmp->dm_data_size);
}

return tmp;
}
Expand Down Expand Up @@ -2383,6 +2389,11 @@ vos_dtx_load_mbs(daos_handle_t coh, struct dtx_id *dti, daos_unit_oid_t *oid,
rc = dbtree_lookup(cont->vc_dtx_active_hdl, &kiov, &riov);
if (rc == 0) {
dae = riov.iov_buf;

/* That can happen under the case of vos_dtx_refresh_mbs() failure. */
if (unlikely(DAE_MBS_DSIZE(dae) == 0))
return -DER_NONEXIST;

tmp = vos_dtx_pack_mbs(vos_cont2umm(cont), dae);
if (tmp == NULL) {
rc = -DER_NOMEM;
Expand Down Expand Up @@ -2477,7 +2488,8 @@ vos_dtx_refresh_mbs(daos_handle_t coh, struct dtx_id *dti, struct dtx_membership
if (rc != 0)
goto out;

dae_df->dae_mbs_off = UMOFF_NULL;
dae_df->dae_mbs_off = UMOFF_NULL;
dae_df->dae_mbs_dsize = 0;
}

if (new_inline) {
Expand All @@ -2500,10 +2512,10 @@ vos_dtx_refresh_mbs(daos_handle_t coh, struct dtx_id *dti, struct dtx_membership
out:
if (started) {
if (rc == 0) {
memcpy(&dae->dae_base, dae_df, sizeof(*dae_df));

rc = umem_tx_commit(umm);
D_ASSERTF(rc == 0, "local TX commit failure: %d\n", rc);

memcpy(&dae->dae_base, dae_df, sizeof(*dae_df));
} else {
rc = umem_tx_abort(umm, rc);
}
Expand Down
Loading