实现 KCP + UDP 连接与通信
This commit is contained in:
parent
39be28a997
commit
6212fae716
@ -1,5 +1,6 @@
|
||||
#include "RSHWNetworkClient.h"
|
||||
|
||||
#include "KCPWrap.h"
|
||||
#include "Logging.h"
|
||||
#include "Sockets.h"
|
||||
#include "IPAddress.h"
|
||||
@ -15,6 +16,13 @@ bool URSHWNetworkClient::Send(const TArray<uint8>& Data)
|
||||
{
|
||||
if (!IsActive() || !(ClientPass.ID | ClientPass.Key)) return false;
|
||||
|
||||
return KCPUnit->Send(Data.GetData(), Data.Num());
|
||||
}
|
||||
|
||||
int32 URSHWNetworkClient::UDPSend(const uint8 * Data, int32 Count)
|
||||
{
|
||||
if (!IsActive() || !(ClientPass.ID | ClientPass.Key)) return false;
|
||||
|
||||
SendBuffer.SetNumUninitialized(8, false);
|
||||
|
||||
SendBuffer[0] = ClientPass.ID >> 0;
|
||||
@ -27,10 +35,11 @@ bool URSHWNetworkClient::Send(const TArray<uint8>& Data)
|
||||
SendBuffer[6] = ClientPass.Key >> 16;
|
||||
SendBuffer[7] = ClientPass.Key >> 24;
|
||||
|
||||
SendBuffer.Append(Data);
|
||||
SendBuffer.Append(Data, Count);
|
||||
|
||||
int32 BytesSend;
|
||||
return SocketPtr->SendTo(SendBuffer.GetData(), SendBuffer.Num(), BytesSend, *ServerAddrPtr) && BytesSend == SendBuffer.Num();
|
||||
SocketPtr->SendTo(SendBuffer.GetData(), SendBuffer.Num(), BytesSend, *ServerAddrPtr);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void URSHWNetworkClient::BeginPlay()
|
||||
@ -56,6 +65,14 @@ void URSHWNetworkClient::TickComponent(float DeltaTime, ELevelTick TickType, FAc
|
||||
|
||||
const FDateTime NowTime = FDateTime::Now();
|
||||
|
||||
// update kcp
|
||||
if (KCPUnit)
|
||||
{
|
||||
int32 Current = FPlatformTime::Cycles64() / 1000;
|
||||
|
||||
KCPUnit->Update(Current);
|
||||
}
|
||||
|
||||
// send heartbeat
|
||||
{
|
||||
if (NowTime - LastHeartbeat > Heartbeat)
|
||||
@ -85,7 +102,7 @@ void URSHWNetworkClient::TickComponent(float DeltaTime, ELevelTick TickType, FAc
|
||||
int32 BytesRead;
|
||||
TSharedRef<FInternetAddr> SourceAddr = SocketSubsystem->CreateInternetAddr();
|
||||
|
||||
while (true) {
|
||||
while (SocketPtr) {
|
||||
|
||||
RecvBuffer.SetNumUninitialized(65535, false);
|
||||
|
||||
@ -112,6 +129,16 @@ void URSHWNetworkClient::TickComponent(float DeltaTime, ELevelTick TickType, FAc
|
||||
if (!(ClientPass.ID | ClientPass.Key))
|
||||
{
|
||||
ClientPass = SourcePass;
|
||||
|
||||
KCPUnit = MakeShared<FKCPWrap>(ClientPass.ID, GetName());
|
||||
KCPUnit->SetTurboMode();
|
||||
KCPUnit->GetKCPCB().logmask = KCPLogMask;
|
||||
|
||||
KCPUnit->OutputFunc = [this](const uint8* Data, int32 Count)->int32
|
||||
{
|
||||
return UDPSend(Data, Count);
|
||||
};
|
||||
|
||||
OnLogin.Broadcast();
|
||||
}
|
||||
|
||||
@ -126,13 +153,31 @@ void URSHWNetworkClient::TickComponent(float DeltaTime, ELevelTick TickType, FAc
|
||||
// is server request
|
||||
if (SourcePass.ID == ClientPass.ID && SourcePass.Key == ClientPass.Key)
|
||||
{
|
||||
DataBuffer.SetNumUninitialized(RecvBuffer.Num() - 8, false);
|
||||
FMemory::Memcpy(DataBuffer.GetData(), RecvBuffer.GetData() + 8, RecvBuffer.Num() - 8);
|
||||
OnRecv.Broadcast(DataBuffer);
|
||||
KCPUnit->Input(RecvBuffer.GetData() + 8, RecvBuffer.Num() - 8);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handle kcp recv
|
||||
{
|
||||
while (KCPUnit)
|
||||
{
|
||||
int32 Size = KCPUnit->PeekSize();
|
||||
|
||||
if (Size <= 0) break;
|
||||
|
||||
RecvBuffer.SetNumUninitialized(Size, false);
|
||||
|
||||
Size = KCPUnit->Recv(RecvBuffer.GetData(), RecvBuffer.Num());
|
||||
|
||||
if (Size <= 0) break;
|
||||
|
||||
RecvBuffer.SetNumUninitialized(Size, false);
|
||||
|
||||
OnRecv.Broadcast(RecvBuffer);
|
||||
}
|
||||
}
|
||||
|
||||
// handle timeout
|
||||
{
|
||||
if ((ClientPass.ID | ClientPass.Key) && NowTime - LastRecvTime > TimeoutLimit)
|
||||
@ -140,6 +185,8 @@ void URSHWNetworkClient::TickComponent(float DeltaTime, ELevelTick TickType, FAc
|
||||
ClientPass.ID = 0;
|
||||
ClientPass.Key = 0;
|
||||
|
||||
KCPUnit = nullptr;
|
||||
|
||||
UE_LOG(LogRSHWNetwork, Warning, TEXT("RSHW network client '%s' timeout."), *GetName());
|
||||
|
||||
OnUnlogin.Broadcast();
|
||||
@ -221,6 +268,8 @@ void URSHWNetworkClient::Deactivate()
|
||||
ClientPass.ID = 0;
|
||||
ClientPass.Key = 0;
|
||||
|
||||
KCPUnit = nullptr;
|
||||
|
||||
UE_LOG(LogRSHWNetwork, Log, TEXT("RSHW network client '%s' deactivate."), *GetName());
|
||||
|
||||
SetComponentTickEnabled(false);
|
||||
|
@ -1,5 +1,6 @@
|
||||
#include "RSHWNetworkServer.h"
|
||||
|
||||
#include "KCPWrap.h"
|
||||
#include "Logging.h"
|
||||
#include "Sockets.h"
|
||||
#include "IPAddress.h"
|
||||
@ -18,6 +19,15 @@ bool URSHWNetworkServer::Send(int32 ClientID, const TArray<uint8>& Data)
|
||||
|
||||
const FRegistrationInfo& Info = Registration[ClientID];
|
||||
|
||||
return !Info.KCPUnit->Send(Data.GetData(), Data.Num());
|
||||
}
|
||||
|
||||
int32 URSHWNetworkServer::UDPSend(int32 ClientID, const uint8* Data, int32 Count)
|
||||
{
|
||||
if (!IsActive() || !Registration.Contains(ClientID)) return false;
|
||||
|
||||
const FRegistrationInfo& Info = Registration[ClientID];
|
||||
|
||||
SendBuffer.SetNumUninitialized(8, false);
|
||||
|
||||
SendBuffer[0] = Info.Pass.ID >> 0;
|
||||
@ -30,10 +40,11 @@ bool URSHWNetworkServer::Send(int32 ClientID, const TArray<uint8>& Data)
|
||||
SendBuffer[6] = Info.Pass.Key >> 16;
|
||||
SendBuffer[7] = Info.Pass.Key >> 24;
|
||||
|
||||
SendBuffer.Append(Data);
|
||||
SendBuffer.Append(Data, Count);
|
||||
|
||||
int32 BytesSend;
|
||||
return SocketPtr->SendTo(SendBuffer.GetData(), SendBuffer.Num(), BytesSend, *Info.Addr) && BytesSend == SendBuffer.Num();
|
||||
SocketPtr->SendTo(SendBuffer.GetData(), SendBuffer.Num(), BytesSend, *Info.Addr);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void URSHWNetworkServer::BeginPlay()
|
||||
@ -59,6 +70,19 @@ void URSHWNetworkServer::TickComponent(float DeltaTime, ELevelTick TickType, FAc
|
||||
|
||||
const FDateTime NowTime = FDateTime::Now();
|
||||
|
||||
// update kcp
|
||||
{
|
||||
TArray<int32> RegistrationAddr;
|
||||
Registration.GetKeys(RegistrationAddr);
|
||||
|
||||
int32 Current = FPlatformTime::Cycles64() / 1000;
|
||||
|
||||
for (int32 ID : RegistrationAddr)
|
||||
{
|
||||
Registration[ID].KCPUnit->Update(Current);
|
||||
}
|
||||
}
|
||||
|
||||
// send heartbeat
|
||||
{
|
||||
TArray<int32> RegistrationAddr;
|
||||
@ -94,7 +118,7 @@ void URSHWNetworkServer::TickComponent(float DeltaTime, ELevelTick TickType, FAc
|
||||
int32 BytesRead;
|
||||
TSharedRef<FInternetAddr> SourceAddr = SocketSubsystem->CreateInternetAddr();
|
||||
|
||||
while (true) {
|
||||
while (SocketPtr) {
|
||||
|
||||
RecvBuffer.SetNumUninitialized(65535, false);
|
||||
|
||||
@ -190,6 +214,15 @@ void URSHWNetworkServer::TickComponent(float DeltaTime, ELevelTick TickType, FAc
|
||||
NewRegistration.Heartbeat = FDateTime::MinValue();
|
||||
NewRegistration.Addr = SourceAddr;
|
||||
|
||||
NewRegistration.KCPUnit = MakeShared<FKCPWrap>(NewRegistration.Pass.ID, FString::Printf(TEXT("%s[%i]"), *GetName(), NewRegistration.Pass.ID));
|
||||
NewRegistration.KCPUnit->SetTurboMode();
|
||||
NewRegistration.KCPUnit->GetKCPCB().logmask = KCPLogMask;
|
||||
|
||||
NewRegistration.KCPUnit->OutputFunc = [this, ID = NewRegistration.Pass.ID](const uint8* Data, int32 Count) -> int32
|
||||
{
|
||||
return UDPSend(ID, Data, Count);
|
||||
};
|
||||
|
||||
Registration.Add(SourcePass.ID, NewRegistration);
|
||||
|
||||
PreRegistration.Remove(SourceAddrStr);
|
||||
@ -212,9 +245,7 @@ void URSHWNetworkServer::TickComponent(float DeltaTime, ELevelTick TickType, FAc
|
||||
// is client request
|
||||
if (Registration.Contains(SourcePass.ID))
|
||||
{
|
||||
DataBuffer.SetNumUninitialized(RecvBuffer.Num() - 8, false);
|
||||
FMemory::Memcpy(DataBuffer.GetData(), RecvBuffer.GetData() + 8, RecvBuffer.Num() - 8);
|
||||
OnRecv.Broadcast(SourcePass.ID, DataBuffer);
|
||||
Registration[SourcePass.ID].KCPUnit->Input(RecvBuffer.GetData() + 8, RecvBuffer.Num() - 8);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -252,6 +283,32 @@ void URSHWNetworkServer::TickComponent(float DeltaTime, ELevelTick TickType, FAc
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handle kcp recv
|
||||
{
|
||||
TArray<int32> RegistrationAddr;
|
||||
Registration.GetKeys(RegistrationAddr);
|
||||
|
||||
for (int32 ID : RegistrationAddr)
|
||||
{
|
||||
while (Registration[ID].KCPUnit)
|
||||
{
|
||||
int32 Size = Registration[ID].KCPUnit->PeekSize();
|
||||
|
||||
if (Size <= 0) break;
|
||||
|
||||
RecvBuffer.SetNumUninitialized(Size, false);
|
||||
|
||||
Size = Registration[ID].KCPUnit->Recv(RecvBuffer.GetData(), RecvBuffer.Num());
|
||||
|
||||
if (Size <= 0) break;
|
||||
|
||||
RecvBuffer.SetNumUninitialized(Size, false);
|
||||
|
||||
OnRecv.Broadcast(ID, RecvBuffer);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void URSHWNetworkServer::Activate(bool bReset)
|
||||
|
@ -6,6 +6,7 @@
|
||||
#include "Components/ActorComponent.h"
|
||||
#include "RSHWNetworkClient.generated.h"
|
||||
|
||||
class FKCPWrap;
|
||||
class FInternetAddr;
|
||||
|
||||
UCLASS(BlueprintType, hidecategories = ("Cooking", "ComponentReplication"), meta = (BlueprintSpawnableComponent))
|
||||
@ -50,6 +51,9 @@ public:
|
||||
UPROPERTY(BlueprintReadWrite, EditAnywhere, Category = "RSHW|Network")
|
||||
FTimespan TimeoutLimit = FTimespan::FromSeconds(8.0);
|
||||
|
||||
UPROPERTY(BlueprintReadWrite, EditAnywhere, Category = "RSHW|Network")
|
||||
int32 KCPLogMask = 0;
|
||||
|
||||
private:
|
||||
|
||||
TSharedPtr<FInternetAddr> ServerAddrPtr;
|
||||
@ -65,6 +69,10 @@ private:
|
||||
FDateTime LastRecvTime;
|
||||
FDateTime LastHeartbeat;
|
||||
|
||||
TSharedPtr<FKCPWrap> KCPUnit;
|
||||
|
||||
int32 UDPSend(const uint8* Data, int32 Count);
|
||||
|
||||
private:
|
||||
|
||||
//~ Begin UActorComponent Interface
|
||||
|
@ -1,11 +1,13 @@
|
||||
#pragma once
|
||||
|
||||
#include "CoreMinimal.h"
|
||||
#include "Misc/DateTime.h"
|
||||
#include "RSHWNetworkType.h"
|
||||
#include "Components/ActorComponent.h"
|
||||
#include "RSHWNetworkServer.generated.h"
|
||||
|
||||
class FSocket;
|
||||
class FKCPWrap;
|
||||
|
||||
UCLASS(BlueprintType, hidecategories = ("Cooking", "ComponentReplication"), meta = (BlueprintSpawnableComponent))
|
||||
class RSHWNETWORK_API URSHWNetworkServer : public UActorComponent
|
||||
@ -49,6 +51,9 @@ public:
|
||||
UPROPERTY(BlueprintReadWrite, EditAnywhere, Category = "RSHW|Network")
|
||||
FTimespan TimeoutLimit = FTimespan::FromSeconds(8.0);
|
||||
|
||||
UPROPERTY(BlueprintReadWrite, EditAnywhere, Category = "RSHW|Network")
|
||||
int32 KCPLogMask = 0;
|
||||
|
||||
private:
|
||||
|
||||
FSocket* SocketPtr;
|
||||
@ -73,10 +78,13 @@ private:
|
||||
FDateTime RecvTime;
|
||||
FDateTime Heartbeat;
|
||||
TSharedPtr<FInternetAddr> Addr;
|
||||
TSharedPtr<FKCPWrap> KCPUnit;
|
||||
};
|
||||
|
||||
TMap<int32, FRegistrationInfo> Registration;
|
||||
|
||||
int32 UDPSend(int32 ClientID, const uint8* Data, int32 Count);
|
||||
|
||||
private:
|
||||
|
||||
//~ Begin UActorComponent Interface
|
||||
|
Reference in New Issue
Block a user