diff --git a/src/object/srv_obj.c b/src/object/srv_obj.c index ad141e816fe..5f3e9f4158e 100644 --- a/src/object/srv_obj.c +++ b/src/object/srv_obj.c @@ -2118,6 +2118,9 @@ obj_local_rw(crt_rpc_t *rpc, struct obj_io_context *ioc, struct dtx_handle *dth) } } + if (rc != 0 && retry > 0) + vos_dtx_cleanup(dth, true); + return rc; } @@ -3822,6 +3825,9 @@ obj_local_punch(struct obj_punch_in *opi, crt_opcode_t opc, uint32_t shard_nr, u } out: + if (rc != 0 && retry > 0) + vos_dtx_cleanup(dth, true); + return rc; } @@ -5138,6 +5144,9 @@ ds_cpd_handle_one_wrap(crt_rpc_t *rpc, struct daos_cpd_sub_head *dcsh, } } + if (rc != 0 && retry > 0) + vos_dtx_cleanup(dth, true); + return rc; } diff --git a/src/vos/vos_dtx.c b/src/vos/vos_dtx.c index 55bf163f2a6..0f4ca90086a 100644 --- a/src/vos/vos_dtx.c +++ b/src/vos/vos_dtx.c @@ -1437,6 +1437,23 @@ vos_dtx_alloc(struct umem_instance *umm, struct dtx_handle *dth) DAE_VER(dae) = dth->dth_ver; if (dth->dth_mbs != NULL) { + if (dth->dth_mbs->dm_data_size <= sizeof(DAE_MBS_INLINE(dae))) { + memcpy(DAE_MBS_INLINE(dae), dth->dth_mbs->dm_data, + dth->dth_mbs->dm_data_size); + } else { + umem_off_t rec_off = umem_zalloc(umm, DAE_MBS_DSIZE(dae)); + + if (UMOFF_IS_NULL(rec_off)) { + D_ERROR("No space to store DTX mbs for " DF_DTI "\n", + DP_DTI(&dth->dth_xid)); + D_GOTO(out, rc = -DER_NOSPACE); + } + + memcpy(umem_off2ptr(umm, rec_off), dth->dth_mbs->dm_data, + dth->dth_mbs->dm_data_size); + DAE_MBS_OFF(dae) = rec_off; + } + DAE_TGT_CNT(dae) = dth->dth_mbs->dm_tgt_cnt; DAE_GRP_CNT(dae) = dth->dth_mbs->dm_grp_cnt; DAE_MBS_DSIZE(dae) = dth->dth_mbs->dm_data_size; @@ -1473,6 +1490,7 @@ vos_dtx_alloc(struct umem_instance *umm, struct dtx_handle *dth) d_list_add_tail(&dae->dae_order_link, &cont->vc_dtx_unsorted_list); dth->dth_ent = dae; } else { +out: dtx_evict_lid(cont, dae); } @@ -2150,22 +2168,6 @@ vos_dtx_prepared(struct dtx_handle *dth, struct vos_dtx_cmt_ent **dce_p) (dth->dth_modification_cnt > 0)) dth->dth_sync = 1; - if (DAE_MBS_DSIZE(dae) <= sizeof(DAE_MBS_INLINE(dae))) { - memcpy(DAE_MBS_INLINE(dae), dth->dth_mbs->dm_data, - DAE_MBS_DSIZE(dae)); - } else { - rec_off = umem_zalloc(umm, DAE_MBS_DSIZE(dae)); - if (UMOFF_IS_NULL(rec_off)) { - D_ERROR("No space to store DTX mbs " - DF_DTI"\n", DP_DTI(&DAE_XID(dae))); - return -DER_NOSPACE; - } - - memcpy(umem_off2ptr(umm, rec_off), - dth->dth_mbs->dm_data, DAE_MBS_DSIZE(dae)); - DAE_MBS_OFF(dae) = rec_off; - } - if (dae->dae_records != NULL) { count = DAE_REC_CNT(dae) - DTX_INLINE_REC_CNT; D_ASSERTF(count > 0, "Invalid DTX rec count %d\n", count); @@ -2229,13 +2231,17 @@ vos_dtx_pack_mbs(struct umem_instance *umm, struct vos_dtx_act_ent *dae) tmp->dm_dte_flags = DAE_FLAGS(dae); /* The DTX is not prepared yet, copy the MBS from DTX handle. */ - if (dth != NULL) + if (dth != NULL) { memcpy(tmp->dm_data, dth->dth_mbs->dm_data, tmp->dm_data_size); - else if (tmp->dm_data_size <= sizeof(DAE_MBS_INLINE(dae))) + } else if (tmp->dm_data_size <= sizeof(DAE_MBS_INLINE(dae))) { memcpy(tmp->dm_data, DAE_MBS_INLINE(dae), tmp->dm_data_size); - else + } else { + D_ASSERTF(!UMOFF_IS_NULL(DAE_MBS_OFF(dae)), "Unexpected empty MBS info for DTX " + DF_DTI "\n", DP_DTI(&DAE_XID(dae))); + memcpy(tmp->dm_data, umem_off2ptr(umm, DAE_MBS_OFF(dae)), tmp->dm_data_size); + } return tmp; } @@ -2383,6 +2389,11 @@ vos_dtx_load_mbs(daos_handle_t coh, struct dtx_id *dti, daos_unit_oid_t *oid, rc = dbtree_lookup(cont->vc_dtx_active_hdl, &kiov, &riov); if (rc == 0) { dae = riov.iov_buf; + + /* That can happen under the case of vos_dtx_refresh_mbs() failure. */ + if (unlikely(DAE_MBS_DSIZE(dae) == 0)) + return -DER_NONEXIST; + tmp = vos_dtx_pack_mbs(vos_cont2umm(cont), dae); if (tmp == NULL) { rc = -DER_NOMEM; @@ -2477,7 +2488,8 @@ vos_dtx_refresh_mbs(daos_handle_t coh, struct dtx_id *dti, struct dtx_membership if (rc != 0) goto out; - dae_df->dae_mbs_off = UMOFF_NULL; + dae_df->dae_mbs_off = UMOFF_NULL; + dae_df->dae_mbs_dsize = 0; } if (new_inline) { @@ -2500,10 +2512,10 @@ vos_dtx_refresh_mbs(daos_handle_t coh, struct dtx_id *dti, struct dtx_membership out: if (started) { if (rc == 0) { + memcpy(&dae->dae_base, dae_df, sizeof(*dae_df)); + rc = umem_tx_commit(umm); D_ASSERTF(rc == 0, "local TX commit failure: %d\n", rc); - - memcpy(&dae->dae_base, dae_df, sizeof(*dae_df)); } else { rc = umem_tx_abort(umm, rc); }