/*-------------------------------------------------------------------------- Project: HorizonSDK File: rdt_SenderImpl.cpp Copyright 2009 Nintendo. All rights reserved. These coded instructions, statements, and computer programs contain proprietary information of Nintendo of America Inc. and/or Nintendo Company Ltd., and are protected by Federal copyright law. They may not be disclosed to third parties or copied or duplicated in any form, in whole or in part, without the prior written consent of Nintendo. $Date:: 2011-02-07#$ $Rev: 34174 $ $Author: okubata_ryoma $ *-------------------------------------------------------------------------*/ #include "stdafx.h" #include "rdt_SenderImpl.h" #include #include #include "rdt_Stopwatch.h" #include "Test.h" namespace { } // end of anonymous namespace namespace nn { namespace rdt { namespace CTR { // 「状態」の基底クラス。 class SenderStateBase{ public: virtual ~SenderStateBase(void){} virtual void initialize(SenderImpl *pSender); // この状態が開始されるときに呼ばれる virtual void finalize (SenderImpl *pSender); // この状態が終了されるときに呼ばれる void update (SenderImpl *pSender); virtual void updatePrologue (SenderImpl *pSender); virtual void updateCore (SenderImpl *pSender); virtual void updateEpilogue (SenderImpl *pSender); virtual enum SenderState getStatus(void) const = 0; protected: SenderStateBase(void){} }; void SenderStateBase::initialize(SenderImpl *pSender) { (void)pSender; } void SenderStateBase::finalize(SenderImpl *pSender) { (void)pSender; } void SenderStateBase::updatePrologue(SenderImpl *pSender) { (void)pSender; } void SenderStateBase::updateCore(SenderImpl *pSender) { (void)pSender; } void SenderStateBase::updateEpilogue(SenderImpl *pSender) { (void)pSender; } void SenderStateBase::update(SenderImpl *pSender) { updatePrologue(pSender); updateCore(pSender); updateEpilogue(pSender); } class SenderStateOpenRequested : public SenderStateBase{ public: static SenderStateBase* getInstance(void); virtual void updateCore(SenderImpl *pSender); virtual enum SenderState getStatus(void) const; protected: SenderStateOpenRequested(void){} }; class SenderStateOpening : public SenderStateBase{ public: static SenderStateBase* getInstance(void); virtual void updateCore(SenderImpl *pSender); virtual enum SenderState getStatus(void) const; protected: SenderStateOpening(void){} }; class SenderStateOpened : public SenderStateBase{ public: static SenderStateBase* getInstance(void); virtual void updateCore(SenderImpl *pSender); virtual enum SenderState getStatus(void) const; protected: SenderStateOpened(void){} }; class SenderStateCloseRequested : public SenderStateBase{ public: static SenderStateBase* getInstance(void); virtual void updateCore(SenderImpl *pSender); virtual enum SenderState getStatus(void) const; protected: SenderStateCloseRequested(void){} }; class SenderStateClosing : public SenderStateBase{ public: static SenderStateBase* getInstance(void); virtual void updateCore(SenderImpl *pSender); virtual enum SenderState getStatus(void) const; protected: SenderStateClosing(void){} }; class SenderStateClosed : public SenderStateBase{ public: static SenderStateBase* getInstance(void); virtual void initialize(SenderImpl *pSender); virtual void updateCore(SenderImpl *pSender); virtual enum SenderState getStatus(void) const; protected: SenderStateClosed(void){} }; // 以下、State派生クラスの実装。 SenderStateBase* SenderStateClosed::getInstance(void) { static SenderStateClosed s_instance; return &s_instance; } enum SenderState SenderStateClosed::getStatus(void) const { return SENDER_STATE_CLOSED; } void SenderStateClosed::initialize(SenderImpl *pSender) { ASSERT(pSender!=NULL); pSender->clear(); } void SenderStateClosed::updateCore(SenderImpl *pSender) { ASSERT(pSender!=NULL); // CLOSED状態の時に受信した(RSTを含まない)セグメントに対しては、RSTで返答 Segment seg; if(pSender->pullSegment(&seg).IsSuccess() && !seg.IsRst()) { #if 0 LOG("SenderはCLOSED状態ですが、セグメントが届いてしまったので、RSTで返答します。\n"); LOG("受信セグメントの内容は、\n"); seg.PrintDebugInfo(); LOG("です。\n"); const u32 SEQ = 0; pSender->sendRstSegment(SEQ); #else LOG("Sender is in CLOSED state, but received segment. Ignored.\n"); #endif } } SenderStateBase* SenderStateOpenRequested::getInstance(void) { static SenderStateOpenRequested s_instance; return &s_instance; } enum SenderState SenderStateOpenRequested::getStatus(void) const { return SENDER_STATE_OPEN_REQUESTED; } void SenderStateOpenRequested::updateCore(SenderImpl *pSender) { // 接続要求セグメントを送る pSender->sendSynSegment(); // 次の状態へ。 pSender->setNextState(SenderStateOpening::getInstance()); } SenderStateBase* SenderStateOpening::getInstance(void) { static SenderStateOpening s_instance; return &s_instance; } enum SenderState SenderStateOpening::getStatus(void) const { return SENDER_STATE_OPENING; } void SenderStateOpening::updateCore(SenderImpl *pSender) { ASSERT(pSender!=NULL); // 受信したセグメントの内容を調べる。 Segment seg; if(pSender->pullSegment(&seg).IsSuccess()) { pSender->processReceivedSegment(seg); if(seg.IsRst()) { // こちらのSYNに対するRSTかどうかをチェック。SYNに対するものでなければ、セグメントは破棄。 if(seg.IsAck() && seg.GetAckNumber()==pSender->getInitialSequenceNumber()+1) { // SYNは拒否されたと見なす。CLOSEDへ。 LOG("SYN rejected. CLOSED... ack of segment = %u\n", seg.GetAckNumber()); pSender->setNextState(SenderStateClosed::getInstance()); pSender->errorHandling(ResultResetReceived()); return; } else { LOG("Ignored RST segment.\n"); } } else if(seg.IsAck()) { if(seg.GetAckNumber()==pSender->getInitialSequenceNumber()+1) { // SYNに対するACKが返ってきたので、次の状態に移行。 pSender->setNextState(SenderStateOpened::getInstance()); } else { LOG("ack : %u\n", seg.GetAckNumber()); LOG("una : %u\n", pSender->getUnacknowledgeNumber()); LOG("nxt : %u\n", pSender->getNextSequenceNumber()); LOG("It seems that it isn't ACK for SYN. Ignored\n"); } } } // 再送処理。 pSender->processResending(); } SenderStateBase* SenderStateOpened::getInstance(void) { static SenderStateOpened s_instance; return &s_instance; } enum SenderState SenderStateOpened::getStatus(void) const { return SENDER_STATE_OPENED; } void SenderStateOpened::updateCore(SenderImpl *pSender) { ASSERT(pSender!=NULL); // 受信したセグメントの内容を調べる。 Segment seg; if(pSender->pullSegment(&seg).IsSuccess()) { pSender->processReceivedSegment(seg); if(seg.IsRst()) { // RSTだったので、CLOSEDへ。 pSender->setNextState(SenderStateClosed::getInstance()); pSender->errorHandling(ResultResetReceived()); return; } } bool bSent = false; // 送信処理を実行したなら、trueにセットすること。 // 再送処理。 bSent = pSender->processResending(); // 送信処理が1回も無ければ、データ送信のチャンス。 if(!bSent) { if(!pSender->isSendBufferEmpty()) { pSender->sendData(); bSent = true; } } } SenderStateBase* SenderStateCloseRequested::getInstance(void) { static SenderStateCloseRequested s_instance; return &s_instance; } enum SenderState SenderStateCloseRequested::getStatus(void) const { return SENDER_STATE_CLOSE_REQUESTED; } void SenderStateCloseRequested::updateCore(SenderImpl *pSender) { ASSERT(pSender!=NULL); // 受信したセグメントの内容を調べる。このあたりはOpened状態と同様。 Segment seg; if(pSender->pullSegment(&seg).IsSuccess()) { pSender->processReceivedSegment(seg); if(seg.IsRst()) { // RSTだったので、CLOSEDへ。 pSender->setNextState(SenderStateClosed::getInstance()); pSender->errorHandling(ResultResetReceived()); return; } } bool bSent = false; // 送信処理を実行したなら、trueにセットすること。 // 再送処理。 bSent = pSender->processResending(); if(pSender->isSendBufferEmpty()) { // FINセグメントの送信 if(!bSent) { pSender->sendFinSegment(); bSent = true; // CLOSING状態へ。 pSender->setNextState(SenderStateClosing::getInstance()); } } else { // 送信バッファのデータを掃き出し if(!bSent) { pSender->sendData(); bSent = true; } } // 送信処理が1回も無ければ、データ送信のチャンス。 if(!bSent) { if(!pSender->isSendBufferEmpty()) { pSender->sendData(); bSent = true; } } } SenderStateBase* SenderStateClosing::getInstance(void) { static SenderStateClosing s_instance; return &s_instance; } enum SenderState SenderStateClosing::getStatus(void) const { return SENDER_STATE_CLOSING; } void SenderStateClosing::updateCore(SenderImpl *pSender) { ASSERT(pSender!=NULL); // 再送処理。 pSender->processResending(); // 受信したセグメントの内容を調べる。 Segment seg; if(pSender->pullSegment(&seg).IsSuccess()) { pSender->processReceivedSegment(seg); if(seg.IsRst()) { // RSTだったので、CLOSEDへ。 pSender->setNextState(SenderStateClosed::getInstance()); pSender->errorHandling(ResultResetReceived()); return; } else if(seg.IsAck() && seg.GetAckNumber()==pSender->m_sendBuffer.GetCurrentSequenceNumber()+1) { // 受信したセグメントの内容を調べる。(FINチェック) // FINに対するACKが返ってきたので、CLOSEDへ。 pSender->setNextState(SenderStateClosed::getInstance()); } else { VERBOSE("セグメントは受信しましたが、FINへのACKセグメントではないようです。破棄します。\n"); } } } ///< コンストラクタ SenderImpl::SenderImpl(void) throw() :m_initialized(false) { } ///< デストラクタ SenderImpl::~SenderImpl(void) { Finalize(); } #ifdef _WIN32 nn::Result SenderImpl::Initialize(SOCKET sock, void *pSendBuf, u16 sendBufSize) #else nn::Result SenderImpl::Initialize(u16 nodeId, u8 port, void *pSendBuf, u16 sendBufSize) #endif { if(m_initialized) { return ResultAlreadyInitialized(); } else { if(pSendBuf==NULL) { return ResultNullPointer(); } if(sendBufSize==0) { return ResultInvalidSize(); } #ifdef _WIN32 nn::Result result = HostBase::Initialize(sock); #elif defined(NN_PLATFORM_CTR) nn::Result result = HostBase::Initialize(nodeId, port); #endif if(result.IsFailure()) { return result; } else { m_sendBuffer.Initialize(pSendBuf, sendBufSize); m_pState = SenderStateClosed::getInstance(); m_pNextState = NULL; clear(); m_initialized = true; return ResultSuccess(); } } } void SenderImpl::Finalize(void) { if(m_initialized) { m_initialized = false; m_sendBuffer.Finalize(); HostBase::Finalize(); } else { // Do nothing. } } nn::Result SenderImpl::Process(void) { ASSERT(m_initialized); // 状態に応じたupdate() if(m_pState) { m_pState->update(this); } // 状態遷移 changeState(); // デバッグ用 // PrintDebugInfo(); // エラーはここで返る。 // 返したら、エラーはリセットする。 nn::Result ret = GetErrorCode(); errorHandling(ResultSuccess()); return ret; } nn::Result SenderImpl::Open(void) { ASSERT(m_initialized); if(GetStatus()==SENDER_STATE_CLOSED) { setNextState(SenderStateOpenRequested::getInstance()); changeState(); return ResultSuccess(); } else { return ResultUntimelyFunctionCall(); } } nn::Result SenderImpl::Close(void) { ASSERT(m_initialized); if(GetStatus()==SENDER_STATE_OPENED) { setNextState(SenderStateCloseRequested::getInstance()); changeState(); return ResultSuccess(); } else { // CLOSED状態の時にClose()が呼ばれたら、エラーにする。 // この挙動はTCPのRFCに準じているが、この挙動もここに含まれる。 return ResultUntimelyFunctionCall(); } } nn::Result SenderImpl::Send(const void *pBuf, size_t bufSize) { ASSERT(m_initialized); if(pBuf==NULL) { return ResultNullPointer(); } if(bufSize==0) { return ResultDoNothing(); } if(GetStatus()!=SENDER_STATE_OPENED) { return ResultUntimelyFunctionCall(); } if(m_sendBuffer.Push(pBuf, bufSize)) { return ResultSuccess(); } else { return ResultSendBufferIsNotAvailable(); } } void SenderImpl::Cancel(void) { ASSERT(m_initialized); // キャンセル処理はProcess()を待たずに即座に実行することにする。 // エラーハンドリングを実装したサンプルプログラムを記述している最中、 // そのほうがアプリ側にとって使いやすいように思えたので。 // RST送信 sendRstSegment(m_sendBuffer.GetCurrentSequenceNumber()); // 即座にClosed状態へ。 setNextState(SenderStateClosed::getInstance()); changeState(); } enum SenderState SenderImpl::GetStatus(void) const { ASSERT(m_initialized); ASSERT(m_pState!=NULL); return m_pState->getStatus(); } bool SenderImpl::processResending(void) { Segment seg; // タイムアウトを迎えたセグメントがキューに見つかったか、 // あるいはいったんそういうセグメントが見つかり、残りの // セグメントを送り終えるのを待っている状態か if(m_resendQueue.IsResendMode()) { bool ret = m_resendQueue.Front(&seg); ASSERT(ret); VERBOSE("新・再送処理を実行します。\n"); VERBOSE("送信セグメントのシーケンス番号は:%dです。\n", seg.GetSeqNumber()); putSegment(seg); static detail::Stopwatch s_sp("ResendQueue::TryAgain()"); s_sp.Start(); m_resendQueue.TryAgain(); // キューの先頭要素を最後尾に回す s_sp.Stop(); return true; } else { return false; } } void SenderImpl::sendData(void) { if(m_sendBuffer.IsEmpty()) { VERBOSE("送信バッファはカラッポでしたので、送信処理は行いません。\n"); return; } u32 unackDataSize = m_resendQueue.GetTotalDataSize(); if(unackDataSize >= m_remoteWindowSize) { VERBOSE("到達未確認のデータが受信ウィンドウサイズ以上なので、データ送信は見送ります。\n"); return; } if(m_resendQueue.IsFull()) { VERBOSE("再送キューが満杯なので、送信処理は行いません。\n"); return; } size_t vacant = m_remoteWindowSize - unackDataSize; size_t pullSize = min(Segment::PAYLOAD_SIZE, vacant); // 送信バッファから引き出す量 ASSERT(pullSize > 0); // 以下、データ送信処理 Segment seg; seg.ClearHeader(); u32 seq = 0; // データをセグメントに詰める処理を実行。 size_t sz = m_sendBuffer.Pull(seg.payload, &seq, pullSize); ASSERT(sz > 0); // ここにくるまでに、再送処理が確実に行われること(再送キューに余裕があること)などを // しっかり保証しておかないといけない。 seg.header.dataLength = sz; seg.SetSeqNumber(seq); putSegmentWithResend(seg); } void SenderImpl::sendSynSegment(void) { Segment seg; seg.ClearHeader(); seg.SetSeqNumber(getInitialSequenceNumber()); seg.SetSyn(); putSegmentWithResend(seg); } void SenderImpl::sendFinSegment(void) { ASSERTMSG(m_sendBuffer.IsEmpty(), "You cannot request FIN if send buffer is not empty."); VERBOSE("sendFinSegment() called.\n"); Segment seg; seg.ClearHeader(); seg.SetFin(); seg.SetSeqNumber(m_sendBuffer.GetCurrentSequenceNumber()); putSegmentWithResend(seg); } void SenderImpl::clear(void) { m_resendQueue.Clear(); m_remoteWindowSize = 0; #if 1 m_iss = static_cast(0xffffffff & GetCurrentTimeAsMillisecond()); #else m_iss = 0xffffffff - 100; #endif m_sendBuffer.Clear(m_iss); m_una = m_iss; m_nxt = m_iss; m_arrivals = 0; } bool SenderImpl::isValidAckNumber(u32 ack) const { s32 s32_ack = static_cast(ack); s32 s32_una = static_cast(getUnacknowledgeNumber()); s32 s32_nxt = static_cast(getNextSequenceNumber()); // 許容ACKの不等式、snd.una < seg.ack <= snd.nxtを表現する。 if((0 <= s32_una - s32_ack) || (0 < s32_ack - s32_nxt)) { return false; } else { return true; } } // 状態遷移。次の状態がセットされていないなら、特に何もしない。 void SenderImpl::changeState(void) { if(m_pNextState) { if(m_pState) { m_pState->finalize(this); } m_pState = m_pNextState; m_pNextState = NULL; m_pState->initialize(this); } } void SenderImpl::setNextState(SenderStateBase *p) { ASSERT(p!=NULL); ASSERT(m_pNextState==NULL); m_pNextState = p; } void SenderImpl::putSegmentWithResend(const Segment &seg) { // 再送キューが満杯では、信頼性のある通信が実現できない。 ASSERT(!m_resendQueue.IsFull()); putSegment(seg); // 全てのセグメントは、再送対象であることに依存している。 // もし、SYNセグメントを再送対象から外してしまうと、 // うまくいかない。 m_nxt = seg.GetLastSeqNumber() + 1; // 再送キューにセグメントを入れる。 // 基本的に送信側は、「届いた」ことが確認できなければ、再送しなければならない。 // 受信側は、自分の応答が相手に届くかどうか、あまり気にしないことにする。 // 届いていなかったら、相手の方から何か言ってくることを期待する。 bool ret = m_resendQueue.Push(seg); if(!ret) { errorHandling(ResultResendQueueIsFull()); PANIC("It seems that resend queue in RDT library is full."); } } void SenderImpl::processReceivedSegment(const Segment &seg) { // セグメントが到着したときに、必ず実行される処理を // ここにまとめる。 ++m_arrivals; // ACKフィールドチェック if(seg.IsAck()) { const u32 ack = seg.GetAckNumber(); if(isValidAckNumber(ack)) { m_una = ack; // 配達の確認ができたパケットは、再送キューから除去できる。 VERBOSE("ACK番号 %d を受け取りました。\n", ack); m_resendQueue.Remove(ack); // ウィンドウサイズ更新 m_remoteWindowSize = seg.GetWindowSize(); } else { VERBOSE("una : %d\n", getUnacknowledgeNumber()); VERBOSE("nxt : %d\n", getNextSequenceNumber()); VERBOSE("不正、あるいは受信済みのACK番号です。(%u)\n", ack); } } } void SenderImpl::PrintDebugInfo(void) const { ASSERT(m_initialized); m_resendQueue.PrintDebugInfo(); m_sendBuffer.PrintDebugInfo(); LOG("m_remoteWindowSize: %d\n", m_remoteWindowSize); LOG("m_arrivals: %u\n", m_arrivals); } void SenderImpl::PrintProperty(void) { LOG("m_sendBuffer: %d (%d)\n", offsetof(SenderImpl, m_sendBuffer), sizeof(SendBuffer)); LOG("m_remoteWindowSize: %d (%d)\n", offsetof(SenderImpl, m_remoteWindowSize), sizeof(u16)); LOG("m_resendQueue: %d (%d)\n", offsetof(SenderImpl, m_resendQueue), sizeof(ResendQueue)); LOG("m_pState: %d (%d)\n", offsetof(SenderImpl, m_pState), sizeof(SenderStateBase*)); LOG("m_pNextState: %d (%d)\n", offsetof(SenderImpl, m_pNextState), sizeof(SenderStateBase*)); LOG("m_arrivals: %d (%d)\n", offsetof(SenderImpl, m_arrivals), sizeof(u32)); LOG("sizeof(SenderImpl)=%ld\n", (long) sizeof(SenderImpl)); } // この単体テストは、Receiver抜きで実行できるテスト…を目指していたのですが、 // ペンディング。 void SenderImpl::Test(void) { /* const int BUFSIZE = 1024; char buf[BUFSIZE]; SenderImpl s; const int SUCCESS = 0; CU_ASSERT(s.GetStatus()==SENDER_STATE_CLOSED); CU_ASSERT(s.Open()==SUCCESS); for(int i=0; i<10; ++i) { CU_ASSERT(s.Process()==SUCCESS); CU_ASSERT(s.GetStatus()==SENDER_STATE_OPENING); SleepCurrentThread(50); } CU_ASSERT(s.Cancel()==SUCCESS); while(1) { CU_ASSERT(s.Process()==SUCCESS); CU_ASSERT(s.GetStatus()==SENDER_STATE_CLOSED); SleepCurrentThread(50); } */ } }}} // namespace nn::rdt::CTR