From 6212fae7164f7610ab5e2be26afc3414fe94848f Mon Sep 17 00:00:00 2001 From: _Redstone_c_ <2824517378@qq.com> Date: Sun, 3 Jan 2021 18:36:45 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=20KCP=20+=20UDP=20=E8=BF=9E?= =?UTF-8?q?=E6=8E=A5=E4=B8=8E=E9=80=9A=E4=BF=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../RSHWNetwork/Private/RSHWNetworkClient.cpp | 61 ++++++++++++++-- .../RSHWNetwork/Private/RSHWNetworkServer.cpp | 69 +++++++++++++++++-- Source/RSHWNetwork/Public/RSHWNetworkClient.h | 8 +++ Source/RSHWNetwork/Public/RSHWNetworkServer.h | 8 +++ 4 files changed, 134 insertions(+), 12 deletions(-) diff --git a/Source/RSHWNetwork/Private/RSHWNetworkClient.cpp b/Source/RSHWNetwork/Private/RSHWNetworkClient.cpp index 084f77c..efdcf03 100644 --- a/Source/RSHWNetwork/Private/RSHWNetworkClient.cpp +++ b/Source/RSHWNetwork/Private/RSHWNetworkClient.cpp @@ -1,5 +1,6 @@ #include "RSHWNetworkClient.h" +#include "KCPWrap.h" #include "Logging.h" #include "Sockets.h" #include "IPAddress.h" @@ -15,6 +16,13 @@ bool URSHWNetworkClient::Send(const TArray& Data) { if (!IsActive() || !(ClientPass.ID | ClientPass.Key)) return false; + return KCPUnit->Send(Data.GetData(), Data.Num()); +} + +int32 URSHWNetworkClient::UDPSend(const uint8 * Data, int32 Count) +{ + if (!IsActive() || !(ClientPass.ID | ClientPass.Key)) return false; + SendBuffer.SetNumUninitialized(8, false); SendBuffer[0] = ClientPass.ID >> 0; @@ -27,10 +35,11 @@ bool URSHWNetworkClient::Send(const TArray& Data) SendBuffer[6] = ClientPass.Key >> 16; SendBuffer[7] = ClientPass.Key >> 24; - SendBuffer.Append(Data); + SendBuffer.Append(Data, Count); int32 BytesSend; - return SocketPtr->SendTo(SendBuffer.GetData(), SendBuffer.Num(), BytesSend, *ServerAddrPtr) && BytesSend == SendBuffer.Num(); + SocketPtr->SendTo(SendBuffer.GetData(), SendBuffer.Num(), BytesSend, *ServerAddrPtr); + return 0; } void URSHWNetworkClient::BeginPlay() @@ -56,6 +65,14 @@ void URSHWNetworkClient::TickComponent(float DeltaTime, ELevelTick TickType, FAc const FDateTime NowTime = FDateTime::Now(); + // update kcp + if (KCPUnit) + { + int32 Current = FPlatformTime::Cycles64() / 1000; + + KCPUnit->Update(Current); + } + // send heartbeat { if (NowTime - LastHeartbeat > Heartbeat) @@ -85,7 +102,7 @@ void URSHWNetworkClient::TickComponent(float DeltaTime, ELevelTick TickType, FAc int32 BytesRead; TSharedRef SourceAddr = SocketSubsystem->CreateInternetAddr(); - while (true) { + while (SocketPtr) { RecvBuffer.SetNumUninitialized(65535, false); @@ -112,6 +129,16 @@ void URSHWNetworkClient::TickComponent(float DeltaTime, ELevelTick TickType, FAc if (!(ClientPass.ID | ClientPass.Key)) { ClientPass = SourcePass; + + KCPUnit = MakeShared(ClientPass.ID, GetName()); + KCPUnit->SetTurboMode(); + KCPUnit->GetKCPCB().logmask = KCPLogMask; + + KCPUnit->OutputFunc = [this](const uint8* Data, int32 Count)->int32 + { + return UDPSend(Data, Count); + }; + OnLogin.Broadcast(); } @@ -126,13 +153,31 @@ void URSHWNetworkClient::TickComponent(float DeltaTime, ELevelTick TickType, FAc // is server request if (SourcePass.ID == ClientPass.ID && SourcePass.Key == ClientPass.Key) { - DataBuffer.SetNumUninitialized(RecvBuffer.Num() - 8, false); - FMemory::Memcpy(DataBuffer.GetData(), RecvBuffer.GetData() + 8, RecvBuffer.Num() - 8); - OnRecv.Broadcast(DataBuffer); + KCPUnit->Input(RecvBuffer.GetData() + 8, RecvBuffer.Num() - 8); } } } + // handle kcp recv + { + while (KCPUnit) + { + int32 Size = KCPUnit->PeekSize(); + + if (Size <= 0) break; + + RecvBuffer.SetNumUninitialized(Size, false); + + Size = KCPUnit->Recv(RecvBuffer.GetData(), RecvBuffer.Num()); + + if (Size <= 0) break; + + RecvBuffer.SetNumUninitialized(Size, false); + + OnRecv.Broadcast(RecvBuffer); + } + } + // handle timeout { if ((ClientPass.ID | ClientPass.Key) && NowTime - LastRecvTime > TimeoutLimit) @@ -140,6 +185,8 @@ void URSHWNetworkClient::TickComponent(float DeltaTime, ELevelTick TickType, FAc ClientPass.ID = 0; ClientPass.Key = 0; + KCPUnit = nullptr; + UE_LOG(LogRSHWNetwork, Warning, TEXT("RSHW network client '%s' timeout."), *GetName()); OnUnlogin.Broadcast(); @@ -221,6 +268,8 @@ void URSHWNetworkClient::Deactivate() ClientPass.ID = 0; ClientPass.Key = 0; + KCPUnit = nullptr; + UE_LOG(LogRSHWNetwork, Log, TEXT("RSHW network client '%s' deactivate."), *GetName()); SetComponentTickEnabled(false); diff --git a/Source/RSHWNetwork/Private/RSHWNetworkServer.cpp b/Source/RSHWNetwork/Private/RSHWNetworkServer.cpp index ee949ea..ba83e13 100644 --- a/Source/RSHWNetwork/Private/RSHWNetworkServer.cpp +++ b/Source/RSHWNetwork/Private/RSHWNetworkServer.cpp @@ -1,5 +1,6 @@ #include "RSHWNetworkServer.h" +#include "KCPWrap.h" #include "Logging.h" #include "Sockets.h" #include "IPAddress.h" @@ -18,6 +19,15 @@ bool URSHWNetworkServer::Send(int32 ClientID, const TArray& Data) const FRegistrationInfo& Info = Registration[ClientID]; + return !Info.KCPUnit->Send(Data.GetData(), Data.Num()); +} + +int32 URSHWNetworkServer::UDPSend(int32 ClientID, const uint8* Data, int32 Count) +{ + if (!IsActive() || !Registration.Contains(ClientID)) return false; + + const FRegistrationInfo& Info = Registration[ClientID]; + SendBuffer.SetNumUninitialized(8, false); SendBuffer[0] = Info.Pass.ID >> 0; @@ -30,10 +40,11 @@ bool URSHWNetworkServer::Send(int32 ClientID, const TArray& Data) SendBuffer[6] = Info.Pass.Key >> 16; SendBuffer[7] = Info.Pass.Key >> 24; - SendBuffer.Append(Data); + SendBuffer.Append(Data, Count); int32 BytesSend; - return SocketPtr->SendTo(SendBuffer.GetData(), SendBuffer.Num(), BytesSend, *Info.Addr) && BytesSend == SendBuffer.Num(); + SocketPtr->SendTo(SendBuffer.GetData(), SendBuffer.Num(), BytesSend, *Info.Addr); + return 0; } void URSHWNetworkServer::BeginPlay() @@ -59,6 +70,19 @@ void URSHWNetworkServer::TickComponent(float DeltaTime, ELevelTick TickType, FAc const FDateTime NowTime = FDateTime::Now(); + // update kcp + { + TArray RegistrationAddr; + Registration.GetKeys(RegistrationAddr); + + int32 Current = FPlatformTime::Cycles64() / 1000; + + for (int32 ID : RegistrationAddr) + { + Registration[ID].KCPUnit->Update(Current); + } + } + // send heartbeat { TArray RegistrationAddr; @@ -94,7 +118,7 @@ void URSHWNetworkServer::TickComponent(float DeltaTime, ELevelTick TickType, FAc int32 BytesRead; TSharedRef SourceAddr = SocketSubsystem->CreateInternetAddr(); - while (true) { + while (SocketPtr) { RecvBuffer.SetNumUninitialized(65535, false); @@ -190,6 +214,15 @@ void URSHWNetworkServer::TickComponent(float DeltaTime, ELevelTick TickType, FAc NewRegistration.Heartbeat = FDateTime::MinValue(); NewRegistration.Addr = SourceAddr; + NewRegistration.KCPUnit = MakeShared(NewRegistration.Pass.ID, FString::Printf(TEXT("%s[%i]"), *GetName(), NewRegistration.Pass.ID)); + NewRegistration.KCPUnit->SetTurboMode(); + NewRegistration.KCPUnit->GetKCPCB().logmask = KCPLogMask; + + NewRegistration.KCPUnit->OutputFunc = [this, ID = NewRegistration.Pass.ID](const uint8* Data, int32 Count) -> int32 + { + return UDPSend(ID, Data, Count); + }; + Registration.Add(SourcePass.ID, NewRegistration); PreRegistration.Remove(SourceAddrStr); @@ -212,9 +245,7 @@ void URSHWNetworkServer::TickComponent(float DeltaTime, ELevelTick TickType, FAc // is client request if (Registration.Contains(SourcePass.ID)) { - DataBuffer.SetNumUninitialized(RecvBuffer.Num() - 8, false); - FMemory::Memcpy(DataBuffer.GetData(), RecvBuffer.GetData() + 8, RecvBuffer.Num() - 8); - OnRecv.Broadcast(SourcePass.ID, DataBuffer); + Registration[SourcePass.ID].KCPUnit->Input(RecvBuffer.GetData() + 8, RecvBuffer.Num() - 8); } } } @@ -252,6 +283,32 @@ void URSHWNetworkServer::TickComponent(float DeltaTime, ELevelTick TickType, FAc } } } + + // handle kcp recv + { + TArray RegistrationAddr; + Registration.GetKeys(RegistrationAddr); + + for (int32 ID : RegistrationAddr) + { + while (Registration[ID].KCPUnit) + { + int32 Size = Registration[ID].KCPUnit->PeekSize(); + + if (Size <= 0) break; + + RecvBuffer.SetNumUninitialized(Size, false); + + Size = Registration[ID].KCPUnit->Recv(RecvBuffer.GetData(), RecvBuffer.Num()); + + if (Size <= 0) break; + + RecvBuffer.SetNumUninitialized(Size, false); + + OnRecv.Broadcast(ID, RecvBuffer); + } + } + } } void URSHWNetworkServer::Activate(bool bReset) diff --git a/Source/RSHWNetwork/Public/RSHWNetworkClient.h b/Source/RSHWNetwork/Public/RSHWNetworkClient.h index 6bef111..f31e86b 100644 --- a/Source/RSHWNetwork/Public/RSHWNetworkClient.h +++ b/Source/RSHWNetwork/Public/RSHWNetworkClient.h @@ -6,6 +6,7 @@ #include "Components/ActorComponent.h" #include "RSHWNetworkClient.generated.h" +class FKCPWrap; class FInternetAddr; UCLASS(BlueprintType, hidecategories = ("Cooking", "ComponentReplication"), meta = (BlueprintSpawnableComponent)) @@ -50,6 +51,9 @@ public: UPROPERTY(BlueprintReadWrite, EditAnywhere, Category = "RSHW|Network") FTimespan TimeoutLimit = FTimespan::FromSeconds(8.0); + UPROPERTY(BlueprintReadWrite, EditAnywhere, Category = "RSHW|Network") + int32 KCPLogMask = 0; + private: TSharedPtr ServerAddrPtr; @@ -65,6 +69,10 @@ private: FDateTime LastRecvTime; FDateTime LastHeartbeat; + TSharedPtr KCPUnit; + + int32 UDPSend(const uint8* Data, int32 Count); + private: //~ Begin UActorComponent Interface diff --git a/Source/RSHWNetwork/Public/RSHWNetworkServer.h b/Source/RSHWNetwork/Public/RSHWNetworkServer.h index f5dd5d3..448d2b1 100644 --- a/Source/RSHWNetwork/Public/RSHWNetworkServer.h +++ b/Source/RSHWNetwork/Public/RSHWNetworkServer.h @@ -1,11 +1,13 @@ #pragma once #include "CoreMinimal.h" +#include "Misc/DateTime.h" #include "RSHWNetworkType.h" #include "Components/ActorComponent.h" #include "RSHWNetworkServer.generated.h" class FSocket; +class FKCPWrap; UCLASS(BlueprintType, hidecategories = ("Cooking", "ComponentReplication"), meta = (BlueprintSpawnableComponent)) class RSHWNETWORK_API URSHWNetworkServer : public UActorComponent @@ -49,6 +51,9 @@ public: UPROPERTY(BlueprintReadWrite, EditAnywhere, Category = "RSHW|Network") FTimespan TimeoutLimit = FTimespan::FromSeconds(8.0); + UPROPERTY(BlueprintReadWrite, EditAnywhere, Category = "RSHW|Network") + int32 KCPLogMask = 0; + private: FSocket* SocketPtr; @@ -73,10 +78,13 @@ private: FDateTime RecvTime; FDateTime Heartbeat; TSharedPtr Addr; + TSharedPtr KCPUnit; }; TMap Registration; + int32 UDPSend(int32 ClientID, const uint8* Data, int32 Count); + private: //~ Begin UActorComponent Interface