Skip to content
Draft
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,486 changes: 769 additions & 717 deletions Cargo.lock

Large diffs are not rendered by default.

44 changes: 44 additions & 0 deletions crates/engineioxide-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
[package]
name = "engineioxide-client"
description = "Engine IO client implementation in rust"
version = "0.17.0"
edition.workspace = true
rust-version.workspace = true
authors.workspace = true
repository.workspace = true
homepage.workspace = true
keywords.workspace = true
categories.workspace = true
license.workspace = true
readme = "README.md"

[dependencies]
engineioxide-core = { path = "../engineioxide-core", version = "0.2" }
bytes.workspace = true
futures-core.workspace = true
futures-util.workspace = true
http.workspace = true
http-body.workspace = true
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["rt", "time"] }
hyper = { workspace = true, features = ["client", "http1"] }
tokio-tungstenite.workspace = true
http-body-util.workspace = true
pin-project-lite.workspace = true
smallvec.workspace = true
hyper-util = { workspace = true, features = ["tokio"] }

# Tracing
tracing = { workspace = true, optional = true }

[dev-dependencies]
tokio = { workspace = true, features = ["macros", "parking_lot"] }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
engineioxide = { path = "../engineioxide", features = ["tracing", "v3"] }

[features]
v3 = ["engineioxide-core/v3"]
tracing = ["dep:tracing", "engineioxide-core/tracing"]
__test_harness = []
Empty file.
112 changes: 112 additions & 0 deletions crates/engineioxide-client/src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
use std::{
fmt,
pin::Pin,
sync::Mutex,
Comment thread Fixed
Comment thread Fixed
task::{Context, Poll},
};

use engineioxide_core::{Packet, PacketBuf, PacketParseError, Sid};
Comment thread Fixed
Comment thread Fixed
use futures_core::Stream;
use futures_util::{
Sink, SinkExt, StreamExt,
Comment thread Fixed
Comment thread Fixed
stream::{SplitSink, SplitStream},
};
use tokio::sync::mpsc::{self, error::TrySendError};
Comment thread Fixed
Comment thread Fixed
Comment thread Fixed
Comment thread Fixed

use crate::{
HttpClient, poll,
transport::{Transport, polling::PollingSvc},
};

type SendPongFut<S> = Pin<
Comment thread Fixed
Comment thread Fixed
Box<
dyn Future<Output = Result<(), <SplitSink<Transport<S>, Packet> as Sink<Packet>>::Error>>
+ 'static,
>,
>;

pin_project_lite::pin_project! {
pub struct Client<S: PollingSvc> {
#[pin]
pub transport_rx: SplitStream<Transport<S>>,
// TODO: is this the right implementation? We need something that can be driven itself.
// Otherwise we need a way to drive the transport_tx. Normally it should be driven by the user.
// But what if we need to send a PONG packet from the inner lib?
#[pin]
pub transport_tx: SplitSink<Transport<S>, Packet>,

pub sid: Sid,
// pub tx: mpsc::Sender<PacketBuf>,
// pub(crate) rx: Mutex<mpsc::Receiver<PacketBuf>>,
}
}

impl<S: PollingSvc> Client<S>
where
S::Error: fmt::Debug,
<S::Body as http_body::Body>::Error: fmt::Debug,
{
pub async fn connect(svc: S) -> Result<Self, ()> {
let mut inner = HttpClient::new(svc);
let packet = inner.handshake().await.unwrap();

let transport = Transport::Polling { inner };
let (transport_tx, transport_rx) = transport.split();
let client = Client {
transport_tx,
transport_rx,
sid: packet.sid,
};

Ok(client)
}
}

impl<S: PollingSvc> Stream for Client<S>
where
S::Error: fmt::Debug,
<S::Body as http_body::Body>::Error: fmt::Debug,
{
type Item = Result<Packet, PacketParseError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
Comment thread Fixed
Comment thread Fixed
match poll!(this.transport_rx.poll_next(cx)) {
Some(Ok(Packet::Ping)) => {
cx.waker().wake_by_ref();
// let mut tx = self.transport_tx.clone();
// let fut = async move {
// tx.send(Packet::Pong).await?;
// tx.flush().await
// };
// this.pending_pong.set(Some(Box::pin(fut)));

Poll::Pending
}
packet => Poll::Ready(packet),
}
}
}

impl<S: PollingSvc> Sink<Packet> for Client<S>
where
S::Error: fmt::Debug,
<S::Body as http_body::Body>::Error: fmt::Debug,
{
type Error = ();

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().transport_tx.poll_ready(cx)
}

fn start_send(self: Pin<&mut Self>, item: Packet) -> Result<(), Self::Error> {
self.project().transport_tx.start_send(item)
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().transport_tx.poll_flush(cx)
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().transport_tx.poll_close(cx)
}
}
1 change: 1 addition & 0 deletions crates/engineioxide-client/src/io.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

19 changes: 19 additions & 0 deletions crates/engineioxide-client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// #![warn(clippy::pedantic)]
#![allow(clippy::similar_names)]
//! Engine.IO client library for Rust.

mod client;
mod io;
mod transport;
pub use crate::client::Client;
pub use crate::transport::polling::HttpClient;

#[macro_export]
macro_rules! poll {
($expr:expr) => {
match $expr {
std::task::Poll::Pending => return std::task::Poll::Pending,
std::task::Poll::Ready(value) => value,
}
};
}
81 changes: 81 additions & 0 deletions crates/engineioxide-client/src/transport/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use std::{
fmt,
pin::Pin,
task::{Context, Poll},
};

use engineioxide_core::{Packet, PacketParseError, TransportType};
use futures_core::Stream;
use futures_util::Sink;

use crate::{HttpClient, transport::polling::PollingSvc};

pub mod polling;

pin_project_lite::pin_project! {
#[project = TransportProj]
pub enum Transport<S: PollingSvc> {
Polling {
#[pin]
inner: HttpClient<S>
},
Websocket {
#[pin]
inner: HttpClient<S>
}
}
}

impl<S: PollingSvc> Transport<S> {
pub fn transport_type(&self) -> TransportType {
match self {
Transport::Polling { .. } => TransportType::Polling,
Transport::Websocket { .. } => TransportType::Websocket,
}
}
}

impl<S: PollingSvc> Stream for Transport<S>
where
S::Error: fmt::Debug,
<S::Body as http_body::Body>::Error: fmt::Debug,
{
type Item = Result<Packet, PacketParseError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.as_mut().project() {
TransportProj::Polling { inner } => inner.poll_next(cx),
TransportProj::Websocket { inner } => inner.poll_next(cx),
}
}
}
impl<S: PollingSvc> Sink<Packet> for Transport<S> {
type Error = ();

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.project() {
TransportProj::Polling { inner } => inner.poll_ready(cx),
TransportProj::Websocket { inner } => inner.poll_ready(cx),
}
}

fn start_send(self: Pin<&mut Self>, item: Packet) -> Result<(), Self::Error> {
match self.project() {
TransportProj::Polling { inner } => inner.start_send(item),
TransportProj::Websocket { inner } => inner.start_send(item),
}
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.project() {
TransportProj::Polling { inner } => inner.poll_flush(cx),
TransportProj::Websocket { inner } => inner.poll_flush(cx),
}
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.project() {
TransportProj::Polling { inner } => inner.poll_close(cx),
TransportProj::Websocket { inner } => inner.poll_close(cx),
}
}
}
Loading
Loading