Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issue-2434: DescribeSessions method should return OrphanSessions as well as active sessions to prevent filesystem destruction right after tablet reboot #2484

Merged
merged 3 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,9 @@ void TDestroySessionActor::HandleDestroySessionResponse(
if (msg->GetStatus() == E_REJECTED) {
// Pipe error
if (ctx.Now() < Deadline) {
return ctx.Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup());
return ctx.Schedule(
TDuration::Seconds(1),
new TEvents::TEvWakeup());
}
}

Expand Down Expand Up @@ -161,7 +163,8 @@ void TDestroySessionActor::Notify(
SessionId.Quote().c_str(),
FormatError(error).c_str());

auto response = std::make_unique<TEvServicePrivate::TEvSessionDestroyed>(error);
auto response =
std::make_unique<TEvServicePrivate::TEvSessionDestroyed>(error);
response->SessionId = SessionId;
response->SeqNo = SeqNo;

Expand All @@ -174,7 +177,9 @@ STFUNC(TDestroySessionActor::StateWork)
HFunc(TEvents::TEvPoisonPill, HandlePoisonPill);
HFunc(TEvents::TEvWakeup, HandleWakeUp);

HFunc(TEvIndexTablet::TEvDestroySessionResponse, HandleDestroySessionResponse);
HFunc(
TEvIndexTablet::TEvDestroySessionResponse,
HandleDestroySessionResponse);
IgnoreFunc(TEvServicePrivate::TEvPingSession);

default:
Expand Down Expand Up @@ -215,7 +220,27 @@ void TStorageServiceActor::HandleDestroySession(

auto* session = State->FindSession(sessionId, seqNo);
if (!session) {
return reply(MakeError(S_ALREADY, "session doesn't exist"));
// session still needs to be destroyed in index tablet
LOG_INFO(ctx, TFileStoreComponents::SERVICE,
"%s DestroySession - tablet only",
LogTag(fsId, clientId, sessionId, seqNo).c_str());

auto requestInfo = CreateRequestInfo(
SelfId(),
cookie,
msg->CallContext);

auto actor = std::make_unique<TDestroySessionActor>(
StorageConfig,
std::move(requestInfo),
clientId,
fsId,
sessionId,
seqNo);

NCloud::Register(ctx, std::move(actor));

return;
}

if (session->CreateDestroyState != ESessionCreateDestroyState::STATE_NONE) {
Expand All @@ -231,7 +256,8 @@ void TStorageServiceActor::HandleDestroySession(
}

if (session->ShouldStop) {
return reply(MakeError(E_REJECTED, "session destruction is in progress"));
return reply(
MakeError(E_REJECTED, "session destruction is in progress"));
}

LOG_INFO(ctx, TFileStoreComponents::SERVICE,
Expand All @@ -254,7 +280,8 @@ void TStorageServiceActor::HandleDestroySession(
session->ShouldStop = true;
}

session->CreateDestroyState = ESessionCreateDestroyState::STATE_DESTROY_SESSION;
session->CreateDestroyState =
ESessionCreateDestroyState::STATE_DESTROY_SESSION;

auto requestInfo = CreateRequestInfo(
SelfId(),
Expand Down Expand Up @@ -290,7 +317,7 @@ void TStorageServiceActor::HandleSessionDestroyed(
// shutdown or stop before start
}

auto inflight = FindInFlightRequest(ev->Cookie);
auto* inflight = FindInFlightRequest(ev->Cookie);
if (!inflight) {
LOG_CRIT(ctx, TFileStoreComponents::SERVICE,
"[%s] failed to complete DestroySession: invalid cookie (%lu)",
Expand All @@ -304,7 +331,8 @@ void TStorageServiceActor::HandleSessionDestroyed(
msg->SessionId.Quote().c_str(),
FormatError(msg->GetError()).c_str());

auto response = std::make_unique<TEvService::TEvDestroySessionResponse>(msg->GetError());
auto response = std::make_unique<TEvService::TEvDestroySessionResponse>(
msg->GetError());
NCloud::Reply(ctx, *inflight, std::move(response));

inflight->Complete(ctx.Now(), msg->GetError());
Expand Down
149 changes: 123 additions & 26 deletions cloud/filestore/libs/storage/service/service_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1496,49 +1496,146 @@ Y_UNIT_TEST_SUITE(TStorageServiceTest)

ui32 nodeIdx = env.CreateNode("nfs");

// delaying pipe creation response
ui64 tabletId = -1;
env.GetRuntime().SetEventFilter(
[&] (TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event) {
switch (event->GetTypeRewrite()) {
case TEvSSProxy::EvDescribeFileStoreResponse: {
using TResponse =
TEvSSProxy::TEvDescribeFileStoreResponse;
const auto* msg = event->Get<TResponse>();
const auto& desc =
msg->PathDescription.GetFileStoreDescription();
tabletId = desc.GetIndexTabletId();
break;
}
}

return false;
});

TServiceClient service(env.GetRuntime(), nodeIdx);
service.CreateFileStore("test", 1'000);

THeaders headers = {"test", "client", "session", 3};
THeaders headers1 = {"test", "client", "session", 3};
service.CreateSession(
headers,
headers1,
"", // checkpointId
false, // restoreClientSession
headers.SessionSeqNo);
service.ResetSession(headers, "some_state");
headers1.SessionSeqNo);
service.ResetSession(headers1, "some_state");

headers = {"test", "client2", "session2", 4};
THeaders headers2 = {"test", "client2", "session2", 4};
service.CreateSession(
headers,
headers2,
"", // checkpointId
false, // restoreClientSession
headers.SessionSeqNo);
service.ResetSession(headers, "some_state2");
headers2.SessionSeqNo);
service.ResetSession(headers2, "some_state2");

NProtoPrivate::TDescribeSessionsRequest request;
request.SetFileSystemId("test");

TString buf;
google::protobuf::util::MessageToJsonString(request, &buf);
auto jsonResponse = service.ExecuteAction("describesessions", buf);
NProtoPrivate::TDescribeSessionsResponse response;
UNIT_ASSERT(google::protobuf::util::JsonStringToMessage(
jsonResponse->Record.GetOutput(), &response).ok());
{
TString buf;
google::protobuf::util::MessageToJsonString(request, &buf);
auto jsonResponse = service.ExecuteAction("describesessions", buf);
NProtoPrivate::TDescribeSessionsResponse response;
UNIT_ASSERT(google::protobuf::util::JsonStringToMessage(
jsonResponse->Record.GetOutput(), &response).ok());

const auto& sessions = response.GetSessions();
UNIT_ASSERT_VALUES_EQUAL(2, sessions.size());
const auto& sessions = response.GetSessions();
UNIT_ASSERT_VALUES_EQUAL(2, sessions.size());

UNIT_ASSERT_VALUES_EQUAL("session", sessions[0].GetSessionId());
UNIT_ASSERT_VALUES_EQUAL("client", sessions[0].GetClientId());
UNIT_ASSERT_VALUES_EQUAL("some_state", sessions[0].GetSessionState());
UNIT_ASSERT_VALUES_EQUAL(3, sessions[0].GetMaxSeqNo());
UNIT_ASSERT_VALUES_EQUAL(3, sessions[0].GetMaxRwSeqNo());
UNIT_ASSERT_VALUES_EQUAL("session", sessions[0].GetSessionId());
UNIT_ASSERT_VALUES_EQUAL("client", sessions[0].GetClientId());
UNIT_ASSERT_VALUES_EQUAL(
"some_state",
sessions[0].GetSessionState());
UNIT_ASSERT_VALUES_EQUAL(3, sessions[0].GetMaxSeqNo());
UNIT_ASSERT_VALUES_EQUAL(3, sessions[0].GetMaxRwSeqNo());
UNIT_ASSERT(!sessions[0].GetIsOrphan());

UNIT_ASSERT_VALUES_EQUAL("session2", sessions[1].GetSessionId());
UNIT_ASSERT_VALUES_EQUAL("client2", sessions[1].GetClientId());
UNIT_ASSERT_VALUES_EQUAL(
"some_state2",
sessions[1].GetSessionState());
UNIT_ASSERT_VALUES_EQUAL(4, sessions[1].GetMaxSeqNo());
UNIT_ASSERT_VALUES_EQUAL(4, sessions[1].GetMaxRwSeqNo());
UNIT_ASSERT(!sessions[1].GetIsOrphan());
}

UNIT_ASSERT_VALUES_EQUAL("session2", sessions[1].GetSessionId());
UNIT_ASSERT_VALUES_EQUAL("client2", sessions[1].GetClientId());
UNIT_ASSERT_VALUES_EQUAL("some_state2", sessions[1].GetSessionState());
UNIT_ASSERT_VALUES_EQUAL(4, sessions[1].GetMaxSeqNo());
UNIT_ASSERT_VALUES_EQUAL(4, sessions[1].GetMaxRwSeqNo());
TIndexTabletClient tablet(env.GetRuntime(), nodeIdx, tabletId);
// rebooting tablet to destroy the pipe
tablet.RebootTablet();

{
TString buf;
google::protobuf::util::MessageToJsonString(request, &buf);
auto jsonResponse = service.ExecuteAction("describesessions", buf);
NProtoPrivate::TDescribeSessionsResponse response;
UNIT_ASSERT(google::protobuf::util::JsonStringToMessage(
jsonResponse->Record.GetOutput(), &response).ok());

const auto& sessions = response.GetSessions();
UNIT_ASSERT_VALUES_EQUAL(2, sessions.size());

UNIT_ASSERT_VALUES_EQUAL("session", sessions[0].GetSessionId());
UNIT_ASSERT_VALUES_EQUAL("client", sessions[0].GetClientId());
UNIT_ASSERT_VALUES_EQUAL(
"some_state",
sessions[0].GetSessionState());
UNIT_ASSERT_VALUES_EQUAL(3, sessions[0].GetMaxSeqNo());
UNIT_ASSERT_VALUES_EQUAL(3, sessions[0].GetMaxRwSeqNo());
UNIT_ASSERT(sessions[0].GetIsOrphan());

UNIT_ASSERT_VALUES_EQUAL("session2", sessions[1].GetSessionId());
UNIT_ASSERT_VALUES_EQUAL("client2", sessions[1].GetClientId());
UNIT_ASSERT_VALUES_EQUAL(
"some_state2",
sessions[1].GetSessionState());
UNIT_ASSERT_VALUES_EQUAL(4, sessions[1].GetMaxSeqNo());
UNIT_ASSERT_VALUES_EQUAL(4, sessions[1].GetMaxRwSeqNo());
UNIT_ASSERT(sessions[1].GetIsOrphan());
}

{
auto response = service.DestroySession(headers1);
UNIT_ASSERT_VALUES_EQUAL_C(
S_OK,
response->GetStatus(),
response->GetErrorReason());
}

{
auto response = service.DestroySession(headers1);
UNIT_ASSERT_VALUES_EQUAL_C(
// this request should still reach index tablet which responds
// with S_OK
S_OK,
response->GetStatus(),
response->GetErrorReason());
}

{
auto response = service.DestroySession(headers2);
UNIT_ASSERT_VALUES_EQUAL_C(
S_OK,
response->GetStatus(),
response->GetErrorReason());
}

{
auto response = service.DestroySession(headers2);
UNIT_ASSERT_VALUES_EQUAL_C(
// this request should still reach index tablet which responds
// with S_OK
S_OK,
response->GetStatus(),
response->GetErrorReason());
}
}

Y_UNIT_TEST(ShouldValidateBlockSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,9 @@ void TIndexTabletActor::HandleSyncShardSessions(
{
THashSet<TString> filter;
for (auto& s: *ev->Get()->Sessions.MutableSessions()) {
filter.insert(*s.MutableSessionId());
if (!s.GetIsOrphan()) {
filter.insert(*s.MutableSessionId());
}
}
TEvIndexTabletPrivate::TShardSessionsInfo info;
info.ShardId = std::move(ev->Get()->ShardId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,17 @@ void TIndexTabletActor::HandleDestroySession(

auto* session = FindSession(sessionId);
if (!session) {
auto response = std::make_unique<TEvIndexTablet::TEvDestroySessionResponse>();
auto response =
std::make_unique<TEvIndexTablet::TEvDestroySessionResponse>();

NCloud::Reply(ctx, *ev, std::move(response));
return;
}

if (session->GetClientId() != clientId) {
auto response = std::make_unique<TEvIndexTablet::TEvDestroySessionResponse>(
ErrorInvalidSession(clientId, sessionId, sessionSeqNo));
auto response =
std::make_unique<TEvIndexTablet::TEvDestroySessionResponse>(
ErrorInvalidSession(clientId, sessionId, sessionSeqNo));

NCloud::Reply(ctx, *ev, std::move(response));
return;
Expand Down
28 changes: 18 additions & 10 deletions cloud/filestore/libs/storage/tablet/tablet_state_sessions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,18 +392,26 @@ TVector<TSession*> TIndexTabletState::GetSessionsToNotify(
return result;
}

TVector<NProtoPrivate::TTabletSessionInfo> TIndexTabletState::DescribeSessions() const
auto TIndexTabletState::DescribeSessions() const
-> TVector<NProtoPrivate::TTabletSessionInfo>
{
TVector<NProtoPrivate::TTabletSessionInfo> sessionInfos;
for (const auto& session: Impl->Sessions) {
NProtoPrivate::TTabletSessionInfo sessionInfo;
sessionInfo.SetSessionId(session.GetSessionId());
sessionInfo.SetClientId(session.GetClientId());
sessionInfo.SetSessionState(session.GetSessionState());
sessionInfo.SetMaxSeqNo(session.GetMaxSeqNo());
sessionInfo.SetMaxRwSeqNo(session.GetMaxRwSeqNo());
sessionInfos.push_back(std::move(sessionInfo));
}
auto addSessionInfos = [&] (const TSessionList& sessions, bool isOrphan) {
for (const auto& session: sessions) {
NProtoPrivate::TTabletSessionInfo sessionInfo;
sessionInfo.SetSessionId(session.GetSessionId());
sessionInfo.SetClientId(session.GetClientId());
sessionInfo.SetSessionState(session.GetSessionState());
sessionInfo.SetMaxSeqNo(session.GetMaxSeqNo());
sessionInfo.SetMaxRwSeqNo(session.GetMaxRwSeqNo());
sessionInfo.SetIsOrphan(isOrphan);
sessionInfos.push_back(std::move(sessionInfo));
}
};

addSessionInfos(Impl->Sessions, false);
addSessionInfos(Impl->OrphanSessions, true);

return sessionInfos;
}

Expand Down
1 change: 1 addition & 0 deletions cloud/filestore/private/api/protos/tablet.proto
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ message TTabletSessionInfo
bytes SessionState = 3;
uint64 MaxSeqNo = 4;
uint64 MaxRwSeqNo = 5;
bool IsOrphan = 6;
}

message TDescribeSessionsResponse
Expand Down
Loading