diff --git a/Cargo.toml b/Cargo.toml index 471ffe19723f7a4bbe0a7eed0671925dcee6f348..cde69204f8f3b2caf31f6e8223f3fc516e5c8434 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ members = [ "./connection", "./binlog_cli", "./tests", + "./relay_log", ] [workspace.package] @@ -29,6 +30,7 @@ memory = { path = "memory", version = "0.0.2" } common = { path = "common", version = "0.0.2" } binlog = { path = "binlog", version = "0.0.2" } binlog_cli = { path = "binlog_cli", version = "0.0.2" } +relay_log = { path = "relay_log", version = "0.0.2" } tokio = {version = "1.32.0", features = ["full"]} async-trait = "0.1.73" diff --git a/relay_log/Cargo.toml b/relay_log/Cargo.toml new file mode 100644 index 0000000000000000000000000000000000000000..e23d34f7dce51bb47ee77c964b24d1b963c6d7dd --- /dev/null +++ b/relay_log/Cargo.toml @@ -0,0 +1,33 @@ +[package] +name = "relay_log" +version = { workspace = true } +authors = { workspace = true } +edition = { workspace = true } +license = { workspace = true } +publish = { workspace = true } + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +common = { workspace = true } +binlog = { workspace = true } + +serde = { workspace = true } +flatbuffers = { workspace = true } +tokio = { workspace = true } +async-trait ={ workspace = true } +lazy_static = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +num_enum = { workspace = true } +byteorder = { workspace = true } +dashmap = { workspace = true } +ringbuffer = { workspace = true } +pin-utils = { workspace = true } +serde_json = { workspace = true } +bincode = { workspace = true } +chrono = { workspace = true } +bytes = { workspace = true } +getset = { workspace = true } +memmap2 = { workspace = true } +checksum = { workspace = true } \ No newline at end of file diff --git a/relay_log/README.md b/relay_log/README.md new file mode 100644 index 0000000000000000000000000000000000000000..e42dc95105f77bbcb8206d123f6edd6e6769519e --- /dev/null +++ b/relay_log/README.md @@ -0,0 +1 @@ +# relay_log 中继日志模块 diff --git a/relay_log/src/apply/mod.rs b/relay_log/src/apply/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..9bbcf46b67cefe7b15fa4b1edcf803ae6e07630f --- /dev/null +++ b/relay_log/src/apply/mod.rs @@ -0,0 +1,2 @@ +pub mod relay_log_apply; +pub mod relay_log_apply_snapshot; \ No newline at end of file diff --git a/relay_log/src/apply/relay_log_apply.rs b/relay_log/src/apply/relay_log_apply.rs new file mode 100644 index 0000000000000000000000000000000000000000..4626a4690a7366ddcf499f6aa177c67da15cae0a --- /dev/null +++ b/relay_log/src/apply/relay_log_apply.rs @@ -0,0 +1,5 @@ + +/// todo +pub struct RelayLogApply { + +} \ No newline at end of file diff --git a/relay_log/src/apply/relay_log_apply_snapshot.rs b/relay_log/src/apply/relay_log_apply_snapshot.rs new file mode 100644 index 0000000000000000000000000000000000000000..10f71c1a200c190cd68d4d14d786038061fe2dcb --- /dev/null +++ b/relay_log/src/apply/relay_log_apply_snapshot.rs @@ -0,0 +1,5 @@ + +/// todo 日志Apply快照 +pub struct RelayLogApplySnapshot { + +} \ No newline at end of file diff --git a/relay_log/src/codec/binary_codec.rs b/relay_log/src/codec/binary_codec.rs new file mode 100644 index 0000000000000000000000000000000000000000..e2feafe68deb17c670ef2bad19cd0092cfed430e --- /dev/null +++ b/relay_log/src/codec/binary_codec.rs @@ -0,0 +1,158 @@ +use std::fmt::{Debug, Formatter}; + +use bincode::{DefaultOptions, Options}; +use 
tracing::error; + +use common::err::CResult; +use common::err::decode_error::ReError; + +use crate::codec::codec::Codec; + +#[derive(Clone)] +pub struct BinaryCodec { + serialize_options: DefaultOptions, +} + +pub enum CodecStyle { + // 小端定长 + LittleFix(u64), + // 大端定长 + BigFix(u64), + // 小端变长 + LittleVar, + // 大端变长 + BigVar, +} + +impl Codec for BinaryCodec { + fn new() -> Self where Self: Sized { + BinaryCodec { + serialize_options: bincode::options(), + } + } + + fn name(&self) -> String { + String::from("BinaryCodec") + } +} + +impl BinaryCodec { + /// Serializes a serializable object into a `Vec` of bytes. + pub fn binary_serialize(&self, codec_style: &CodecStyle, value: &T) -> CResult> + where + T: serde::Serialize, + { + match codec_style { + CodecStyle::LittleFix(limit) => { + let bytes = self.serialize_options + .with_limit(*limit) + .with_little_endian() + .with_fixint_encoding() + .serialize(value).or_else(|e| { + error!("binary serialize err: {:?}", &e); + Err(ReError::Error(e.to_string())) + })?; + Ok(bytes) + } + CodecStyle::BigFix(limit) => { + let bytes = self.serialize_options + .with_limit(*limit) + .with_big_endian() + .with_fixint_encoding() + .serialize(value).or_else(|e| { + error!("binary serialize err: {:?}", &e); + Err(ReError::Error(e.to_string())) + })?; + Ok(bytes) + } + CodecStyle::LittleVar => { + let bytes = self.serialize_options + .allow_trailing_bytes() + .with_no_limit() + .with_little_endian() + .with_varint_encoding() + .serialize(value).or_else(|e| { + error!("binary serialize err: {:?}", &e); + Err(ReError::Error(e.to_string())) + })?; + Ok(bytes) + } + CodecStyle::BigVar => { + let bytes = self.serialize_options + .allow_trailing_bytes() + .with_no_limit() + .with_big_endian() + .with_varint_encoding() + .serialize(value).or_else(|e| { + error!("binary serialize err: {:?}", &e); + Err(ReError::Error(e.to_string())) + })?; + Ok(bytes) + } + } + } + + /// Deserializes a slice of bytes into an instance of `T`. 
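+    ///
+    /// Illustrative round-trip (a sketch mirroring the usage in
+    /// `tests/src/relay_log/codec`; `LittleVar` is the style the tests exercise):
+    ///
+    /// ```ignore
+    /// let codec = BinaryCodec::new();
+    /// let log = RelayLog::default();
+    /// let bytes = codec.binary_serialize(&CodecStyle::LittleVar, &log)?;
+    /// let decoded = codec.binary_deserialize::<RelayLog>(&CodecStyle::LittleVar, &bytes)?;
+    /// ```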
+ pub fn binary_deserialize<'a, T>(&self, codec_style: &CodecStyle, bytes: &'a [u8]) -> CResult + where + T: serde::de::Deserialize<'a>, + { + match codec_style { + CodecStyle::LittleFix(limit) => { + let r = self.serialize_options + .with_limit(*limit) + .with_little_endian() + .with_fixint_encoding() + .deserialize::(bytes).or_else(|e| { + error!("binary deserialize err: {:?}", &e); + Err(ReError::Error(e.to_string())) + })?; + Ok(r) + } + CodecStyle::BigFix(limit) => { + let r = self.serialize_options + .with_limit(*limit) + .with_big_endian() + .with_fixint_encoding() + .deserialize::(bytes).or_else(|e| { + error!("binary deserialize err: {:?}", &e); + Err(ReError::Error(e.to_string())) + })?; + Ok(r) + } + CodecStyle::LittleVar => { + let r = self.serialize_options + .allow_trailing_bytes() + .with_no_limit() + .with_little_endian() + .with_varint_encoding() + .deserialize::(bytes).or_else(|e| { + error!("binary deserialize err: {:?}", &e); + Err(ReError::Error(e.to_string())) + })?; + Ok(r) + } + CodecStyle::BigVar => { + let r = self.serialize_options + .allow_trailing_bytes() + .with_no_limit() + .with_big_endian() + .with_varint_encoding() + .deserialize::(bytes).or_else(|e| { + error!("binary deserialize err: {:?}", &e); + Err(ReError::Error(e.to_string())) + })?; + Ok(r) + } + } + } +} + +impl Debug for BinaryCodec { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BinaryCodec") + .field("name", &self.name()) + .field("serializer", &"bincode") + .finish() + } +} \ No newline at end of file diff --git a/relay_log/src/codec/codec.rs b/relay_log/src/codec/codec.rs new file mode 100644 index 0000000000000000000000000000000000000000..993842382db2c18ccbfd3c4cadcb0ab76664eac7 --- /dev/null +++ b/relay_log/src/codec/codec.rs @@ -0,0 +1,10 @@ + +/// 编解码 +pub trait Codec { + /// 实例化 + fn new() -> Self where Self: Sized; + + /// 实例类型名称 + fn name(&self) -> String; + +} \ No newline at end of file diff --git a/relay_log/src/codec/json_codec.rs b/relay_log/src/codec/json_codec.rs new file mode 100644 index 0000000000000000000000000000000000000000..dfb48ab41fc742107561265fb48af4b457d37975 --- /dev/null +++ b/relay_log/src/codec/json_codec.rs @@ -0,0 +1,26 @@ +use std::fmt::{Debug, Formatter}; +use crate::codec::codec::Codec; + +#[derive(Clone)] +pub struct JsonCodec { + +} + +impl Codec for JsonCodec { + fn new() -> Self where Self: Sized { + JsonCodec { + + } + } + + fn name(&self) -> String { + String::from("JsonCodec") + } +} + +impl Debug for JsonCodec { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + todo!() + } +} + diff --git a/relay_log/src/codec/mod.rs b/relay_log/src/codec/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..0d223c8c6e2bd6850978b2482a53bd58bc339a3f --- /dev/null +++ b/relay_log/src/codec/mod.rs @@ -0,0 +1,6 @@ +pub mod json_codec; +pub mod codec; +pub mod binary_codec; + +#[cfg(test)] +mod test; diff --git a/relay_log/src/lib.rs b/relay_log/src/lib.rs new file mode 100644 index 0000000000000000000000000000000000000000..d5e54260ab7d4e02a2687050ca61db5491d4c4b9 --- /dev/null +++ b/relay_log/src/lib.rs @@ -0,0 +1,8 @@ + +pub mod storage; +pub mod apply; +pub mod codec; +pub mod relay_log_server; +pub mod relay_log_server_machine; +pub mod relay_log; + diff --git a/relay_log/src/relay_log.rs b/relay_log/src/relay_log.rs new file mode 100644 index 0000000000000000000000000000000000000000..8d62b6f392cbbc5356bc504eb030ac8f212dcb71 --- /dev/null +++ b/relay_log/src/relay_log.rs @@ -0,0 +1,419 @@ +use 
chrono::{NaiveDate, NaiveDateTime, NaiveTime, Utc}; +use getset::{Getters, Setters}; +use serde::{Deserialize, Serialize}; +use tracing::warn; + +use binlog::events::binlog_event::BinlogEvent; +use binlog::events::declare::log_event::LogEvent; +use binlog::events::declare::rows_log_event::RowsLogEvent; +use binlog::events::protocol::table_map_event::ColumnInfo; +use binlog::row::row_data::RowData; +use common::binlog::column::column_type::SrcColumnType; +use common::binlog::column::column_value::SrcColumnValue; +use common::binlog::src_meta::SrcType; +use common::schema::data_type::{DstColumnType, Value}; + +/// 中继日志信息 +#[derive(Serialize, Deserialize, Debug, Clone, Getters, Setters)] +pub struct RelayLog { + /// src type + #[getset(get = "pub", set = "pub")] + src_type: SrcType, + + /// binlog event position + #[getset(get = "pub", set = "pub")] + event_log_pos: u64, + + /// binlog event name + #[getset(get = "pub", set = "pub")] + event_name: String, + + /// database + #[getset(get = "pub", set = "pub")] + database_name: String, + + /// table + #[getset(get = "pub", set = "pub")] + table_name: String, + + /// column info + #[getset(get = "pub", set = "pub")] + columns: Vec, + + /// replay command + #[getset(get = "pub", set = "pub")] + relay_command: RelayCommand, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum RelayCommand { + None, + CreateDatabase, + DropDatabase, + CreateTable, + DropTable, + AlterTable, + /// 插入数据 + Insert(Vec), + /// 删除数据 + Delete(Vec), + /// 更新数据: (deleteRows, insertRows) + Update(Vec<(RelayRowData, RelayRowData)>), +} + +/// 列信息 +#[derive(Serialize, Deserialize, Debug, Clone, Getters, Setters)] +pub struct RelayColumnInfo { + /// 列类型 + #[getset(get = "pub", set = "pub")] + column_type: DstColumnType, + + /// 列名称 + #[getset(get = "pub", set = "pub")] + column_name: String, +} + +impl RelayColumnInfo { + fn from_binlog_column_info(binlog_column_info: &ColumnInfo) -> Self { + let column_name = binlog_column_info.get_name(); + if let Some(binlog_column_type) = binlog_column_info.get_c_type().take() { + let column_type = match binlog_column_type { + SrcColumnType::Null => { + DstColumnType::Null + } + // Decimal + SrcColumnType::Decimal | + SrcColumnType::NewDecimal => { + DstColumnType::Decimal + } + // Int + SrcColumnType::Tiny | + SrcColumnType::Int24 | + SrcColumnType::Year => { + DstColumnType::Int + } + SrcColumnType::Short => { + DstColumnType::Short + } + // Long + SrcColumnType::Long | + SrcColumnType::LongLong => { + DstColumnType::Long + } + SrcColumnType::Float => { + DstColumnType::Float + } + SrcColumnType::Double => { + DstColumnType::Double + } + // Timestamp + SrcColumnType::Timestamp | + SrcColumnType::Timestamp2 => { + DstColumnType::Timestamp + } + // Date + SrcColumnType::Date | + SrcColumnType::NewDate => { + DstColumnType::Date + } + // Time + SrcColumnType::Time | + SrcColumnType::Time2 => { + DstColumnType::Time + } + // DateTime + SrcColumnType::DateTime | + SrcColumnType::DateTime2 => { + DstColumnType::DateTime + } + // String + SrcColumnType::VarChar | + SrcColumnType::Enum | + SrcColumnType::Set | + SrcColumnType::VarString | + SrcColumnType::String | + SrcColumnType::Array => { + DstColumnType::String + } + SrcColumnType::Bit => { + DstColumnType::Bitmap + } + SrcColumnType::Bool => { + DstColumnType::Boolean + } + SrcColumnType::Json => { + DstColumnType::JSON + } + // Blob + SrcColumnType::TinyBlob | + SrcColumnType::MediumBlob | + SrcColumnType::LongBlob | + SrcColumnType::Blob => { + DstColumnType::Blob + } + 
SrcColumnType::Geometry => { + DstColumnType::Geometry + } + // todo 未知? + SrcColumnType::Invalid => { + DstColumnType::Other + } + }; + Self { + column_type, + column_name, + } + } else { + Self::default() + } + } +} + +impl Default for RelayColumnInfo { + fn default() -> Self { + Self { + column_type: DstColumnType::Null, + column_name: "".to_string(), + } + } +} + +/// 一行数据 +#[derive(Serialize, Deserialize, Debug, Clone, Getters, Setters)] +pub struct RelayRowData { + #[getset(get = "pub", set = "pub")] + values: Vec, +} + +impl RelayRowData { + fn from_binlog_row(binlog_row: &RowData) -> Self { + let values: Vec = binlog_row.get_cells().iter().map(|c| { + if let Some(v) = c { + match v { + // `TinyInt`,`SmallInt`,`MediumInt`,`Int` => `Int` + SrcColumnValue::TinyInt(data) => { + Value::Int(*data as i32) + } + SrcColumnValue::SmallInt(data) => { + Value::Int(*data as i32) + } + SrcColumnValue::MediumInt(data) => { + Value::Int(*data as i32) + } + SrcColumnValue::Int(data) => { + Value::Int(*data as i32) + } + // `BigInt` => `Long` + SrcColumnValue::BigInt(data) => { + Value::Long(*data as i64) + } + SrcColumnValue::Float(data) => { + Value::Float(*data) + } + SrcColumnValue::Double(data) => { + Value::Double(*data) + } + SrcColumnValue::Decimal(data) => { + Value::Decimal(data.to_string()) + } + SrcColumnValue::String(data) => { + Value::String(data.to_string()) + } + SrcColumnValue::Blob(data) => { + Value::Blob(data.to_vec()) + } + // Year => Int + SrcColumnValue::Year(data) => { + Value::Int(*data as i32) + } + SrcColumnValue::Date(data) => { + if let Some(naive_date) = NaiveDate::from_ymd_opt(data.year as i32, data.month as u32, data.day as u32) { + if let Some(naive_datetime) = naive_date.and_hms_milli_opt(0, 0, 0, 0) { + Value::Date(naive_datetime.timestamp_millis()) + } else { + warn!("Date parse error."); + Value::Null + } + } else { + warn!("Date parse error."); + Value::Null + } + } + SrcColumnValue::Time(data) => { + if let Some(naive_time) = NaiveTime::from_hms_milli_opt(data.hour as u32, data.minute as u32, data.second as u32, data.millis) { + let now_date = Utc::now().date_naive(); + Value::Time(NaiveDateTime::new(now_date, naive_time).timestamp_millis()) + } else { + warn!("Time parse error."); + Value::Null + } + } + SrcColumnValue::DateTime(data) => { + if let Some(naive_date) = NaiveDate::from_ymd_opt(data.year as i32, data.month as u32, data.day as u32) { + if let Some(naive_datetime) = naive_date.and_hms_milli_opt(data.hour as u32, data.minute as u32, data.second as u32, data.millis) { + Value::DateTime(naive_datetime.timestamp_millis()) + } else { + warn!("DateTime parse error."); + Value::Null + } + } else { + warn!("DateTime parse error."); + Value::Null + } + } + SrcColumnValue::Timestamp(data) => { + Value::Timestamp(*data as i64) + } + + // todo 暂不支持的类型:`Bit`,`Enum`,`Set` + SrcColumnValue::Bit(data) => { + Value::Null + } + SrcColumnValue::Enum(data) => { + Value::Null + } + SrcColumnValue::Set(data) => { + Value::Null + } + } + } else { + Value::Null + } + }).collect(); + + Self { + values + } + } +} + +impl Default for RelayRowData { + fn default() -> Self { + Self { + values: vec![] + } + } +} + +impl RelayLog { + pub fn from_binlog_event(event: &BinlogEvent) -> Self { + // todo 暂时写死 Mysql 源 + let src_type = SrcType::Mysql; + match event { + BinlogEvent::WriteRows(e) => { + if let Some(table) = e.get_table_map_event() { + let event_log_pos = e.get_header().get_log_pos(); + let event_name = e.get_type_name(); + let insert_rows: Vec = e.get_rows() + .iter() + 
.map(|r| { + RelayRowData::from_binlog_row(r) + }).collect(); + let database_name = table.get_database_name(); + let table_name = table.get_table_name(); + let columns = table.get_column_infos() + .iter() + .map(|c| { + RelayColumnInfo::from_binlog_column_info(c) + }).collect(); + let relay_command = RelayCommand::Insert(insert_rows); + Self { + src_type, + event_log_pos, + event_name, + database_name, + table_name, + columns, + relay_command, + } + } else { + Self::default() + } + } + BinlogEvent::UpdateRows(e) => { + if let Some(table) = e.get_table_map_event() { + let event_log_pos = e.get_header().get_log_pos(); + let event_name = e.get_type_name(); + let update_rows: Vec<(RelayRowData, RelayRowData)> = e.rows + .iter() + .map(|r| { + (RelayRowData::from_binlog_row(&(r.get_before_update())), RelayRowData::from_binlog_row(&(r.get_after_update()))) + }).collect(); + let database_name = table.get_database_name(); + let table_name = table.get_table_name(); + let columns = table.get_column_infos() + .iter() + .map(|c| { + RelayColumnInfo::from_binlog_column_info(c) + }).collect(); + let relay_command = RelayCommand::Update(update_rows); + Self { + src_type, + event_log_pos, + event_name, + database_name, + table_name, + columns, + relay_command, + } + } else { + Self::default() + } + } + BinlogEvent::DeleteRows(e) => { + if let Some(table) = e.get_table_map_event() { + let event_log_pos = e.get_header().get_log_pos(); + let event_name = e.get_type_name(); + let delete_rows: Vec = e.get_rows().iter().map(|r| { + RelayRowData::from_binlog_row(r) + }).collect(); + let database_name = table.get_database_name(); + let table_name = table.get_table_name(); + let columns = table.get_column_infos() + .iter() + .map(|c| { + RelayColumnInfo::from_binlog_column_info(c) + }).collect(); + let relay_command = RelayCommand::Delete(delete_rows); + Self { + src_type, + event_log_pos, + event_name, + database_name, + table_name, + columns, + relay_command, + } + } else { + Self::default() + } + } + _ => { + // todo 其它event后续实现 + Self::default() + } + } + } + + pub fn get_database_name(&self) -> &str { + self.database_name.as_str() + } + + pub fn get_table_name(&self) -> &str { + self.table_name.as_str() + } +} + +impl Default for RelayLog { + fn default() -> Self { + Self { + src_type: SrcType::default(), + event_log_pos: 0, + event_name: "".to_string(), + database_name: "".to_string(), + table_name: "".to_string(), + columns: vec![], + relay_command: RelayCommand::None, + } + } +} \ No newline at end of file diff --git a/relay_log/src/relay_log_server.rs b/relay_log/src/relay_log_server.rs new file mode 100644 index 0000000000000000000000000000000000000000..fb7a46c66742754da5caac67476b3db98845c290 --- /dev/null +++ b/relay_log/src/relay_log_server.rs @@ -0,0 +1,139 @@ +use std::sync::{Arc, Mutex}; + +use tokio::sync::mpsc::Receiver; +use tokio::task::JoinHandle; +use tracing::{error, info, warn}; + +use binlog::events::binlog_event::BinlogEvent; +use common::err::CResult; +use common::err::decode_error::ReError; + +use crate::relay_log_server_machine::RelayLogServerMachine; + +#[derive(Debug)] +pub struct RelayLogServer { + state: Arc>, +} + +#[derive(Debug)] +struct ServerState { + running: bool, + receiver_handle: Option>>, +} + +impl RelayLogServer { + pub fn new() -> Self { + let state = Arc::new(Mutex::new(ServerState { + running: false, + receiver_handle: None, + })); + Self { + state, + } + } + + /// 实例化,且开始监听Receiver + pub fn new_with_binlog_receiver(rx: Receiver) -> CResult { + let state = 
Arc::new(Mutex::new(ServerState { + running: false, + receiver_handle: None, + })); + Self::recv_event(Arc::clone(&state), rx)?; + Ok(Self { + state + }) + } + + /// 开启中继日志监听服务(监听上游binlog事件) + pub fn start(&self, rx: Receiver) -> CResult { + Self::recv_event(Arc::clone(&self.state), rx) + } + + /// 监听binlog事件 + fn recv_event(state: Arc>, mut rx: Receiver) -> CResult { + let mut s = state.lock().or_else(|e| { + error!("stare recv binlog event err: {:?}", &e); + Err(ReError::Error(e.to_string())) + })?; + if s.running == true { + Err(ReError::String("The RelayLogServer is already running.".to_string())) + } else { + let shard_state = Arc::clone(&state); + // 开启一个task监听Receiver + let handle: JoinHandle> = tokio::spawn(async move { + // 监听Receiver。 + // 1. 若通道为空,但是发送端未关闭,则当前task放弃CPU使用权(不会阻塞线程)。 + // 2. 若通道为空,且发送端已经关闭,则收到消息:None + // 2. 若通道不为空,则接收到消息:Some(event) + while let Some(event) = rx.recv().await { + match RelayLogServerMachine::process_binlog_event(&event) { + Ok(()) => {} + Err(e) => { + error!("Precess BinlogEvent: {:?}, err: {:?}", event, e); + } + } + }; + warn!("binlog event sender closed, current receiving end is about to shutdown."); + let mut end_state = shard_state.lock().or_else(|e| { + error!("relay log server close err: {:?}", &e); + Err(ReError::Error(e.to_string())) + })?; + end_state.running = false; + end_state.receiver_handle = None; + Ok(()) + }); + + s.running = true; + s.receiver_handle = Some(handle); + Ok(true) + } + } + + /// + /// Receiver端异常close()后,只是半关闭状态,Receiver端仍然可以继续读取可能已经缓冲在通道中的消息, + /// close()只能保证Sender端无法再发送普通的消息,但Permit或OwnedPermit仍然可以向通道发送消息。 + /// 只有通道已空且所有Sender端(包括Permit和OwnedPermit)都已经关闭的情况下,recv()才会返回None,此时代表通道完全关闭。 + fn try_recv_event(&self, mut rx: Receiver) { + loop { + match rx.try_recv() { + Ok(event) => { + info!("收到binlog事件:{:?}", event); + } + Err(e) => { + warn!("binlog event Receiver try_recv: {:?}", e); + break; + } + } + } + } + + /// 关闭中继日志监听服务。 + /// 不主动关闭binlog监听端服务,receiver状态完全由sender决定。 + /// 只要存在一个sender未关闭,RelayLogServer则一直运行。 + // pub fn shutdown(&mut self) -> CResult { + // let state = self.state.lock(); + // match state { + // Ok(mut s) => { + // if let Some(handle) = s.receiver_handle.take() { + // handle.abort(); + // } + // s.running = false; + // Ok(true) + // } + // Err(e) => { + // Err(ReError::Error(e.to_string())) + // } + // } + // } + + pub fn is_running(&self) -> CResult { + return match self.state.lock() { + Ok(s) => { + Ok(s.running) + } + Err(e) => { + Err(ReError::String(e.to_string())) + } + }; + } +} \ No newline at end of file diff --git a/relay_log/src/relay_log_server_machine.rs b/relay_log/src/relay_log_server_machine.rs new file mode 100644 index 0000000000000000000000000000000000000000..5b2a3955a0274fe6fda1d2f43b7ca6aeb4815756 --- /dev/null +++ b/relay_log/src/relay_log_server_machine.rs @@ -0,0 +1,59 @@ +use std::collections::HashMap; +use std::sync::RwLock; + +use chrono::Local; +use lazy_static::lazy_static; +use tracing::{info, warn}; + +use binlog::events::binlog_event::BinlogEvent; +use common::err::CResult; +use common::schema::rc_task::RcTask; +use crate::relay_log::RelayLog; + +lazy_static! 
{ + static ref INS: RelayLogServerMachine = RelayLogServerMachine { + t: Local::now().timestamp_millis(), + rc_task: RwLock::new(HashMap::::new()), + }; +} + +#[derive(Debug)] +pub struct RelayLogServerMachine { + t: i64, + // task信息(线程安全) + rc_task: RwLock>, +} + +unsafe impl Sync for RelayLogServerMachine {} + +impl RelayLogServerMachine { + /// 单例 + pub fn get_instance() -> &'static RelayLogServerMachine { + &INS + } + + /// 处理binlog事件 + pub fn process_binlog_event(event: &BinlogEvent) -> CResult<()> { + let relay_entity = RelayLog::from_binlog_event(event); + + let a = Self::get_instance(); + + info!("relay_entity: {:?}", relay_entity); + + let db_name = relay_entity.get_database_name(); + let table_name = relay_entity.get_table_name(); + + // todo.. + Ok(()) + } + + pub fn add_task(&self, task: RcTask) -> CResult { + let mut tasks = self.rc_task.write().unwrap(); + if tasks.contains_key(&task.task_id) { + tasks.remove(&task.task_id); + warn!("更新task.."); + } + tasks.insert(task.task_id.clone(), task); + Ok(true) + } +} \ No newline at end of file diff --git a/relay_log/src/storage/compactor.rs b/relay_log/src/storage/compactor.rs new file mode 100644 index 0000000000000000000000000000000000000000..1dd790b3f1dcba4f8cb1cb7d768acc6ec9d264f5 --- /dev/null +++ b/relay_log/src/storage/compactor.rs @@ -0,0 +1,5 @@ + +/// todo 日志整理,定时删除已经Apply完的segment文件 +pub struct Compactor { + +} \ No newline at end of file diff --git a/relay_log/src/storage/file_system.rs b/relay_log/src/storage/file_system.rs new file mode 100644 index 0000000000000000000000000000000000000000..e2f0b5a2c30a6fceda9ef15dfdd922f112dca039 --- /dev/null +++ b/relay_log/src/storage/file_system.rs @@ -0,0 +1,9 @@ +use common::err::CResult; + +pub trait FileSystem: Sized { + /// 文件加载 + fn from_file(file_path: &str, start_offset: u64, len: usize) -> CResult; + + /// 强制刷盘(底层调用内核sync方法) + fn flush(&self) -> CResult<()>; +} \ No newline at end of file diff --git a/relay_log/src/storage/mod.rs b/relay_log/src/storage/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..f1ba5320cedf4ea611683930a8a0d820cbdea665 --- /dev/null +++ b/relay_log/src/storage/mod.rs @@ -0,0 +1,13 @@ +pub mod relay_log_storage; +pub mod segment; +pub mod segment_manager; +pub mod storage_config; +pub mod compactor; +pub mod storage_event; +pub mod storage_entry; +pub mod segment_file; +pub mod segment_header; +pub mod segment_entry_position; +pub mod file_system; + + diff --git a/relay_log/src/storage/relay_log_storage.rs b/relay_log/src/storage/relay_log_storage.rs new file mode 100644 index 0000000000000000000000000000000000000000..b72acd36009c5fd0bb75e6f6b08efa6fc75fb5fb --- /dev/null +++ b/relay_log/src/storage/relay_log_storage.rs @@ -0,0 +1,125 @@ +use std::cell::RefCell; +use std::rc::Rc; + +use common::err::CResult; + +use crate::relay_log::RelayLog; +use crate::storage::segment::Segment; +use crate::storage::segment_manager::SegmentManager; +use crate::storage::storage_config::StorageConfig; +use crate::storage::storage_entry::StorageEntry; + +/// todo 目标表日志存储 +pub struct RelayLogStorage { + // 目标库 + dst_db_name: String, + // 目标表 + dst_table_name: String, + // segment管理器 + pub segment_manager: SegmentManager, + // 环形队列 + entry_buffer: EntryRingBuffer, + // 日志整理 + // log_compactor: Compactor, +} + +struct EntryRingBuffer { + entry_buffer_num: usize, + // entry缓存 + entry_buffer: Vec>>, + +} + +impl EntryRingBuffer { + pub fn new(entry_buffer_num: usize) -> Self { + let entry_buffer: Vec>> = vec![None; entry_buffer_num]; + Self { + 
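+            // Slots are addressed by `index % entry_buffer_num` (see `offset()`),
+            // so a newer entry overwrites whatever previously occupied its slot.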
entry_buffer_num, + entry_buffer, + } + } + + pub fn add(&mut self, entry: StorageEntry) { + let offset = self.offset(*entry.index()); + self.entry_buffer[offset] = Some(Rc::new(entry)); + } + + pub fn get(&self, index: u64) -> Option> { + let e = &self.entry_buffer[self.offset(index)]; + match e { + None => { + None + } + Some(e) => { + if *e.index() == index { + Some(Rc::clone(e)) + } else { + None + } + } + } + } + + fn offset(&self, index: u64) -> usize { + let mut offset = index as usize % self.entry_buffer_num; + if offset < 0 { + offset = 0; + } + offset + } +} + +impl RelayLogStorage { + pub fn new(storage_config: &StorageConfig, dst_db_name: String, dst_table_name: String) -> CResult { + let segment_manager = SegmentManager::new(storage_config, &dst_db_name, &dst_table_name)?; + let entry_buffer = EntryRingBuffer::new(*storage_config.entry_buffer_num()); + Ok(Self { + dst_db_name, + dst_table_name, + segment_manager, + entry_buffer, + }) + } + + /// 追加中继日志 + pub fn append_relay_log(&mut self, log: RelayLog) -> CResult<()> { + let mut entry = self.create_entry(log)?; + let current_segment = self.current_usable_segment()?; + // append to disk + current_segment.borrow_mut().append(&mut entry)?; + // add buf + self.entry_buffer.add(entry); + + // todo send storage_event + + Ok(()) + } + + /// get an entry by index + pub fn get_entry(&mut self, index: u64) -> CResult> { + if let Some(entry) = &self.entry_buffer.get(index) { + Ok(Rc::clone(entry)) + } else { + let segment = self.segment_manager.segment(index)?; + let entry = segment.borrow_mut().get_entry(index)?; + Ok(Rc::new(entry)) + } + } + + /// 创建中继日志存储实体 + fn create_entry(&mut self, log: RelayLog) -> CResult { + let current_segment = self.current_usable_segment()?; + let index = current_segment.borrow().next_index(); + Ok(StorageEntry::new(index, 0, 0, log)) + } + + /// 当前可用的segment + fn current_usable_segment(&mut self) -> CResult>> { + let mut current_segment = self.segment_manager.current_segment(); + if current_segment.borrow().is_full() { + current_segment.borrow_mut().write_flush()?; + current_segment = self.segment_manager.create_next_segment()?; + } + Ok(current_segment) + } +} \ No newline at end of file diff --git a/relay_log/src/storage/segment.rs b/relay_log/src/storage/segment.rs new file mode 100644 index 0000000000000000000000000000000000000000..7e6ddc02cc4cf25fea3198f9962e2b40916f7f12 --- /dev/null +++ b/relay_log/src/storage/segment.rs @@ -0,0 +1,369 @@ +use std::fmt::{Debug, Formatter}; +use std::fs; +use std::fs::{File, OpenOptions}; +use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write}; +use std::path::PathBuf; +use std::sync::{Arc, Mutex}; + +use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; +use checksum::crc32::Crc32; +use tracing::{error, warn}; + +use common::err::CResult; +use common::err::decode_error::ReError; + +use crate::codec::binary_codec::{BinaryCodec, CodecStyle}; +use crate::codec::binary_codec::CodecStyle::LittleVar; +use crate::codec::codec::Codec; +use crate::relay_log::RelayLog; +use crate::storage::file_system::FileSystem; +use crate::storage::segment::SegmentStatus::{ReadOnly, WriteRead}; +use crate::storage::segment_entry_position::SegmentEntryPosition; +use crate::storage::segment_file::SegmentFile; +use crate::storage::segment_header::SegmentHeader; +use crate::storage::storage_config::{SEGMENT_FILE_PRE, SEGMENT_HEADER_SIZE_BYTES, VERSION}; +use crate::storage::storage_entry::StorageEntry; + +const FILE_WRITE_BUFFER_SIZE: usize = 4 * 1024; +const 
FILE_READ_BUFFER_SIZE: usize = 16 * 1024; + +/// 日志存储文件. +/// +/// 文件结构: +/// ```txt +/// |=================| +/// | header | -> SegmentHeader +/// |-----------------| +/// | entry position | -> SegmentEntryPosition +/// |-----------------| +/// | StorageEntry | +/// | StorageEntry | +/// | ... | +/// |=================| +/// ``` +pub(crate) struct Segment { + // 文件信息(文件名、文件路径、文件大小) + segment_file: SegmentFile, + // 文件头 + header: SegmentHeader, + // entry偏移量 + entry_position: SegmentEntryPosition, + // 编解码 + codec: BinaryCodec, + // 编解码风格 + codec_style: CodecStyle, + // 文件读取 + reader: Arc>>, + // segment状态 + status: SegmentStatus, +} + +/// Segment Status +/// - WriteRead: 可读可写 +/// - ReadOnly: 只读 +pub(crate) enum SegmentStatus { + // 可读可写模式 + WriteRead(BufWriter), + // 只读模式 + ReadOnly, +} + +impl SegmentStatus { + pub fn name(&self) -> String { + match self { + WriteRead(_) => { + String::from("WriteRead") + } + ReadOnly => { + String::from("ReadOnly") + } + } + } +} + +impl Segment { + /// 新建一个segment + pub fn new(segment_dir_path: &str, + id: u32, + first_index: u64, + max_segment_size: u64, + max_entries: u32) -> CResult { + // rlog-{version}-{id}-{index}.log + let segment_file_name = format!("{}-{}-{}-{}.log", SEGMENT_FILE_PRE, VERSION, id, first_index); + // /x/x/x/x/rlog-{version}-{id}-{index}.log + let segment_file_path = PathBuf::from(segment_dir_path).join(&segment_file_name); + if !segment_file_path.exists() { + File::create_new(segment_file_path.as_path())?; + } + let segment_file_path_str = segment_file_path.to_str().ok_or(ReError::String("segment file not exists.".to_string()))?; + let header = SegmentHeader::new(segment_file_path_str, id, first_index, max_segment_size, max_entries)?; + let entry_position = SegmentEntryPosition::new(segment_file_path_str, max_entries)?; + let init_segment_size = SEGMENT_HEADER_SIZE_BYTES as u64 + (4 + max_entries as u64 * 8); + let segment_file = SegmentFile::new(segment_file_path_str.to_string(), segment_file_name, init_segment_size); + + let reader = BufReader::with_capacity(FILE_READ_BUFFER_SIZE, File::open(segment_file_path_str)?); + Ok(Self { + segment_file, + header, + entry_position, + codec: BinaryCodec::new(), + codec_style: LittleVar, + reader: Arc::new(Mutex::new(reader)), + status: ReadOnly, + }) + } + + /// 从文件初始化segment + pub fn from_file(file_path: &str) -> CResult { + let segment_file = SegmentFile::from_path(file_path)?; + let header = SegmentHeader::from_file(file_path, 0, SEGMENT_HEADER_SIZE_BYTES)?; + + let start_offset = SEGMENT_HEADER_SIZE_BYTES as u64; + let bytes_size = 4 + (*header.max_entries()) * 8; + let entry_position = SegmentEntryPosition::from_file(file_path, start_offset, bytes_size as usize)?; + + let reader = BufReader::with_capacity(FILE_READ_BUFFER_SIZE, File::open(file_path)?); + Ok(Self { + segment_file, + header, + entry_position, + codec: BinaryCodec::new(), + codec_style: LittleVar, + reader: Arc::new(Mutex::new(reader)), + status: ReadOnly, + }) + } + + /// 计算切片crc32值 + fn checksum(buf: &[u8]) -> u32 { + let mut crc = Crc32::new(); + crc.checksum(buf) + } + + /// 开启可写模式 + pub fn write_open(&mut self) -> CResult<()> { + let f1 = OpenOptions::new() + .write(true) + .append(true) + .open(self.segment_file.path())?; + let writer = BufWriter::with_capacity(FILE_WRITE_BUFFER_SIZE, f1); + self.status = WriteRead(writer); + Ok(()) + } + + /// 关闭可写模式 + pub fn write_close(&mut self) -> CResult<()> { + match &mut self.status { + WriteRead(w) => { + self.entry_position.flush()?; + w.flush()?; + self.status = 
ReadOnly; + } + ReadOnly => {} + } + Ok(()) + } + + /// 返回当前segment第一个index值 + pub fn first_index(&self) -> u64 { + if self.is_empty() { + 0 + } else { + *self.header.first_index() + } + } + + /// segment id + pub fn id(&self) -> u32 { + *self.header.id() + } + + /// 返回当前segment最后一个index值 + pub fn last_index(&self) -> u64 { + if self.is_empty() { + *self.header.first_index() - 1 + } else { + *self.header.first_index() + self.entry_position.get_entry_count() as u64 - 1 + } + } + + /// 返回当前segment下一个index值 + pub fn next_index(&self) -> u64 { + if self.is_empty() { + *self.header.first_index() + } else { + self.last_index() + 1 + } + } + + /// 验证当前segment是否包含index + pub fn contain_index(&self, index: u64) -> bool { + if self.is_empty() { + false + } else { + index >= self.first_index() && index <= self.last_index() + } + } + + /// 当前segment是否为空 + pub fn is_empty(&self) -> bool { + self.entry_position.get_entry_count() == 0 + } + + /// 当前segment字节大小 + pub fn current_segment_size(&self) -> u64 { + self.segment_file.size() + } + + /// append entry(非线程安全,只能单线程写) + ///
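+    /// Returns an error while the segment is in `ReadOnly` status; `write_open()`
+    /// must have been called first.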

+ /// 每个Entry块包含如下内容(字节大小 = 8 + 8 + 8 + 4 + {RelayLogSize}): + /// + /// ```txt + /// index: 索引id, 8字节 + /// offset: 日志内容偏移量, 8字节 + /// size: 日志内容大小, 8字节 + /// checksum: 日志内容校验值, 4字节 + /// relay_log: 日志内容, 动态大小 + /// ``` + pub fn append(&mut self, entry: &mut StorageEntry) -> CResult<()> { + match &mut self.status { + WriteRead(w) => { + // log serialize + let log_bytes = self.codec.binary_serialize(&self.codec_style, entry.relay_log())?; + + let start_position = self.segment_file.size(); + let mut entry_size = 0; + + // index + let index = *entry.index(); + w.write_u64::(*entry.index())?; + entry_size += 8; + + // log size + let log_size = log_bytes.len() as u64; + entry.set_log_size(log_size); + w.write_u64::(log_size)?; + entry_size += 8; + + // checksum + let checksum = Self::checksum(&log_bytes); + entry.set_checksum(checksum); + w.write_u32::(checksum)?; + entry_size += 4; + + // log bytes + w.write_all(&log_bytes)?; + entry_size += log_size; + + if self.is_empty() { + self.entry_position.add_position(0, start_position)?; + } else { + let offset = index - self.first_index(); + self.entry_position.add_position(offset as usize, start_position)?; + } + + self.segment_file.add_entry_size(entry_size); + Ok(()) + } + ReadOnly => { + Err(ReError::String("segment read only.".to_string())) + } + } + } + + /// read entry by index + pub fn get_entry(&mut self, index: u64) -> CResult { + if self.is_empty() { + return Err(ReError::String("segment is empty.".to_string())); + } + if index < self.first_index() { + return Err(ReError::String("index less than segment first index.".to_string())); + } + if index > self.last_index() { + return Err(ReError::String("index greater than segment last index.".to_string())); + } + + let offset = index - self.first_index(); + let entry_position = self.entry_position.get_position(offset as usize); + + let r = Arc::clone(&self.reader); + let mut reader = r.lock().or_else(|e| { + error!("segment read lock err: {:?}", &e); + Err(ReError::Error(e.to_string())) + })?; + + reader.seek(SeekFrom::Start(entry_position))?; + + let idx = reader.read_u64::()?; + let log_size = reader.read_u64::()?; + let checksum = reader.read_u32::()?; + + let mut log_bytes: Vec = vec![0; log_size as usize]; + reader.read_exact(&mut log_bytes)?; + + // 校验crc32值 + if checksum != Self::checksum(&log_bytes) { + return Err(ReError::Error("log checksum err.".to_string())); + } + + let relay_log = self.codec.binary_deserialize::(&self.codec_style, &log_bytes)?; + Ok(StorageEntry::new(idx, log_size, checksum, relay_log)) + } + + /// 是否可写 + pub fn is_writable(&self) -> bool { + match self.status { + WriteRead(_) => { + true + } + ReadOnly => { + false + } + } + } + + /// segment is full + pub fn is_full(&self) -> bool { + let max_size = *self.header.max_segment_size(); + let max_entries = *self.header.max_entries(); + let current_size = self.segment_file.size(); + let current_entries = self.entry_position.get_entry_count(); + if current_entries >= max_entries || current_size >= max_size { + true + } else { + false + } + } + + /// flush the segment writer buf to disk + pub fn write_flush(&mut self) -> CResult<()> { + match &mut self.status { + WriteRead(w) => { + self.entry_position.flush()?; + w.flush()?; + } + ReadOnly => {} + } + Ok(()) + } + + /// 删除segment文件 + pub fn delete(&self) -> CResult<()> { + let file_path = self.segment_file.path(); + warn!("===删除segment文件: {:?}", file_path); + Ok(fs::remove_file(file_path)?) 
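+        // Deletion is expected to be driven by the (todo) `Compactor`, which is
+        // meant to remove a segment only after its entries have been applied.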
+ } +} + +impl Debug for Segment { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Segment") + .field("segment_file", &self.segment_file.name()) + .field("segment_id", &self.header.id()) + .field("first_index", &self.header.first_index()) + .field("entry_count", &self.entry_position.get_entry_count()) + .field("status", &self.status.name()) + .finish() + } +} diff --git a/relay_log/src/storage/segment_entry_position.rs b/relay_log/src/storage/segment_entry_position.rs new file mode 100644 index 0000000000000000000000000000000000000000..b97341f04b9ed1d059f84174c14e4f26b688cb8c --- /dev/null +++ b/relay_log/src/storage/segment_entry_position.rs @@ -0,0 +1,158 @@ +use std::fmt::{Debug, Formatter}; +use std::fs::OpenOptions; +use std::io::{Cursor, Write}; + +use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; +use memmap2::{MmapMut, MmapOptions}; +use tracing::error; + +use common::err::CResult; +use common::file_util; + +use crate::storage::file_system::FileSystem; +use crate::storage::storage_config::SEGMENT_HEADER_SIZE_BYTES; + +/// entry位置信息. +/// +/// 固定字节大小 = 4 + max_entries * 8 +pub struct SegmentEntryPosition { + // 当前Segment中Entry数量 + entry_count: u32, + // 当前Segment中Entry的位置信息 + position_info: Vec, + // 文件缓存 + file_buffer: MmapMut, +} + +impl SegmentEntryPosition { + /// 初始化 + pub fn new(file_path: &str, max_entries: u32) -> CResult { + let entry_count: u32 = 0; + let mut position_info: Vec = vec![0; max_entries as usize]; + + let len = 4 + max_entries * 8; + let bytes_buffer: Vec = vec![0; len as usize]; + + let start_offset = SEGMENT_HEADER_SIZE_BYTES as u64; + // 文件初始化 + file_util::update_file_bytes(file_path, start_offset, &bytes_buffer)?; + + // 打开文件内存映射 + let mmap = Self::open_file_mem_map(file_path, start_offset, len as usize)?; + + Ok(Self { + entry_count, + position_info, + file_buffer: mmap, + }) + } + + /// 打开文件内存映射(可读可写) + fn open_file_mem_map(file_path: &str, start_offset: u64, len: usize) -> CResult { + let file = OpenOptions::new() + .read(true) + .write(true) + .append(false) + .open(file_path)?; + return unsafe { + Ok(MmapOptions::new().offset(start_offset).len(len).map_mut(&file)?) 
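+            // Safety note: `map_mut` is `unsafe` because the underlying file can be
+            // modified or truncated outside of this mapping, which may invalidate
+            // the mapped memory while it is in use.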
+ }; + } + + /// 添加entry位置信息 + pub fn add_position(&mut self, offset: usize, position: u64) -> CResult<()> { + match self.update_file_position_info(offset, position) { + Ok(_) => { + self.entry_count += 1; + self.position_info[offset] = position; + match self.update_file_entry_count() { + Ok(_) => { + Ok(()) + } + Err(e) => { + self.entry_count -= 1; + self.position_info[offset] = 0; + + error!("update segment file err: {:?}", e); + Err(e) + } + } + } + Err(e) => { + Err(e) + } + } + } + + /// 更新文件中entry数量统计 + fn update_file_entry_count(&mut self) -> CResult<()> { + (&mut self.file_buffer[0..4]).write_u32::(self.entry_count)?; + Ok(()) + } + + /// 更新文件中entry位置信息 + fn update_file_position_info(&mut self, offset: usize, position: u64) -> CResult<()> { + let pos_start = 4 + (offset * 8); + let pos_end = pos_start + 8; + (&mut self.file_buffer[pos_start..pos_end]).write_u64::(position)?; + Ok(()) + } + + /// 返回entry位置信息(内存) + pub fn get_position(&self, offset: usize) -> u64 { + self.position_info[offset] + } + + /// 返回entry位置信息(磁盘) + pub fn get_position_disk(&self, offset: usize) -> CResult { + let pos_start = 4 + (offset * 8); + let pos_end = pos_start + 8; + let pos = (&self.file_buffer[pos_start..pos_end]).read_u64::()?; + Ok(pos) + } + + /// 返回entry数量(内存) + pub fn get_entry_count(&self) -> u32 { + self.entry_count + } + + /// 返回entry数量(磁盘) + pub fn get_entry_count_disk(&self) -> CResult { + let count = (&self.file_buffer[0..4]).read_u32::()?; + Ok(count) + } +} + +impl FileSystem for SegmentEntryPosition { + fn from_file(file_path: &str, start_offset: u64, len: usize) -> CResult { + let mmap = Self::open_file_mem_map(file_path, start_offset, len)?; + + let bytes_buffer = &mmap[0..]; + let mut cusor = Cursor::new(bytes_buffer); + + cusor.set_position(0); + let entry_count = cusor.read_u32::()?; + + cusor.set_position(4); + let max_entries = (len - 4) / 8; + let mut position_info: Vec = vec![0; max_entries]; + cusor.read_u64_into::(&mut position_info)?; + Ok(Self { + entry_count, + position_info, + file_buffer: mmap, + }) + } + + fn flush(&self) -> CResult<()> { + Ok(self.file_buffer.flush()?) 
+ } +} + +impl Debug for SegmentEntryPosition { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SegmentEntryPosition") + .field("entry_count", &self.entry_count) + .finish() + } +} \ No newline at end of file diff --git a/relay_log/src/storage/segment_file.rs b/relay_log/src/storage/segment_file.rs new file mode 100644 index 0000000000000000000000000000000000000000..cc458530ef610aacb618cd4b3807463ee9c3ce1d --- /dev/null +++ b/relay_log/src/storage/segment_file.rs @@ -0,0 +1,101 @@ +use std::path::Path; + +use getset::{Getters, Setters}; + +use common::err::CResult; +use common::err::decode_error::ReError; + +use crate::storage::storage_config::SEGMENT_FILE_PRE; + +#[derive(Debug, Clone, Getters, Setters)] +pub struct SegmentFile { + // 文件的绝对路径: /x/x/x/x/rlog-{version}-{id}-{index}.log + #[getset(get = "pub")] + path: String, + // 文件名: rlog-{version}-{id}-{index}.log + #[getset(get = "pub")] + name: String, + // 文件大小 + size: u64, +} + +impl SegmentFile { + pub fn new(path: String, name: String, size: u64) -> Self { + Self { + path, + name, + size, + } + } + + /// /a/s/rlog-{version}-{id}-{index}.log + pub fn from_path(file_path: &str) -> CResult { + let path = file_path.to_string(); + let p = Path::new(file_path); + let size = p.metadata()?.len(); + let os_name = p.file_name().ok_or(ReError::String("segment file not exists.".to_string()))?; + let name = os_name.to_str().ok_or(ReError::String("segment file not exists.".to_string()))?.to_string(); + Ok(Self { + path, + name, + size, + }) + } + + + /// 判断文件: rlog-{version}-{id}-{index}.log + pub fn is_segment_file(file_name: &str) -> CResult { + if !file_name.ends_with(".log") { + return Ok(false); + } + if !file_name.starts_with(SEGMENT_FILE_PRE) { + return Ok(false); + } + let parse = Self::segment_file_name_split(file_name)?; + if parse.0 == 0 || parse.1 == 0 || parse.2 == 0 { + return Ok(false); + } + Ok(true) + } + + /// current version + pub fn version(&self) -> CResult { + Ok(Self::segment_file_name_split(self.name())?.0) + } + + /// segment id + pub fn segment_id(&self) -> CResult { + Ok(Self::segment_file_name_split(self.name())?.1) + } + + /// segment file first index + pub fn index(&self) -> CResult { + Ok(Self::segment_file_name_split(self.name())?.2) + } + + /// segment文件名解析:`[{version},{id},{index}]` + fn segment_file_name_split(file_name: &str) -> CResult<(u32, u32, u64)> { + let s: Vec<&str> = file_name.split(".").collect(); + let name_split: Vec<&str> = s[0].split("-").collect(); + let version = name_split[1].parse::()?; + let segment_id = name_split[2].parse::()?; + let first_index = name_split[3].parse::()?; + Ok((version, segment_id, first_index)) + } + + /// 返回当前segment大小 + pub fn size(&self) -> u64 { + self.size + } + + /// segment文件大小 + pub fn size_file(&self) -> CResult { + let p = Path::new(self.path()); + Ok(p.metadata()?.len()) + } + + /// 增加entry大小 + pub fn add_entry_size(&mut self, entry_size: u64) { + self.size += entry_size; + } +} \ No newline at end of file diff --git a/relay_log/src/storage/segment_header.rs b/relay_log/src/storage/segment_header.rs new file mode 100644 index 0000000000000000000000000000000000000000..ed7d5fae59643c0f2d062e6597456366f59cd414 --- /dev/null +++ b/relay_log/src/storage/segment_header.rs @@ -0,0 +1,102 @@ +use std::io::Cursor; + +use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; +use getset::{Getters, Setters}; + +use common::err::CResult; +use common::file_util; + +use crate::storage::file_system::FileSystem; +use 
crate::storage::storage_config::{SEGMENT_HEADER_SIZE_BYTES, VERSION}; + +/// segment文件头(不可更改,总大小:64byte). +/// ```txt +/// 4字节:segmentId, +/// 4字节:version, +/// 8字节:第一个entry的index值, +/// 8字节:segment最大容量, +/// 4字节:segment最多存Entry数量 +/// 36字节预留空间(用于后续扩展...) +/// ``` +#[derive(Debug, Getters, Setters)] +pub(crate) struct SegmentHeader { + // segmentId + #[getset(get = "pub")] + id: u32, + + // version + #[getset(get = "pub")] + version: u32, + + // 第一个entry的index值 + #[getset(get = "pub")] + first_index: u64, + + // segment最大容量(单位:byte) + #[getset(get = "pub")] + max_segment_size: u64, + + // segment最大存Entry数量 + #[getset(get = "pub")] + max_entries: u32, +} + +impl SegmentHeader { + /// 初始化文件头 + pub fn new(file_path: &str, + id: u32, + first_index: u64, + max_segment_size: u64, + max_entries: u32) -> CResult { + let mut bytes_buffer: [u8; SEGMENT_HEADER_SIZE_BYTES] = [0; SEGMENT_HEADER_SIZE_BYTES]; + let mut c = Cursor::new(&mut bytes_buffer[0..]); + c.write_u32::(id)?; + c.write_u32::(VERSION)?; + c.write_u64::(first_index)?; + c.write_u64::(max_segment_size)?; + c.write_u32::(max_entries)?; + // 初始化 + file_util::update_file_bytes(file_path, 0, &bytes_buffer)?; + Ok(Self { + id, + version: VERSION, + first_index, + max_segment_size, + max_entries, + }) + } +} + +impl FileSystem for SegmentHeader { + fn from_file(file_path: &str, _start_offset: u64, _len: usize) -> CResult { + let file_buffer = file_util::read_file_bytes(file_path, 0, SEGMENT_HEADER_SIZE_BYTES)?; + + let mut cursor = Cursor::new(file_buffer); + cursor.set_position(0); + let id = cursor.read_u32::()?; + + cursor.set_position(4); + let version = cursor.read_u32::()?; + + cursor.set_position(8); + let first_index = cursor.read_u64::()?; + + cursor.set_position(16); + let max_segment_size = cursor.read_u64::()?; + + cursor.set_position(24); + let max_entries = cursor.read_u32::()?; + + Ok(Self { + id, + version, + first_index, + max_segment_size, + max_entries, + }) + } + + fn flush(&self) -> CResult<()> { + Ok(()) + } +} \ No newline at end of file diff --git a/relay_log/src/storage/segment_manager.rs b/relay_log/src/storage/segment_manager.rs new file mode 100644 index 0000000000000000000000000000000000000000..6f5012c97310bd108cd552e241691f4652529bcf --- /dev/null +++ b/relay_log/src/storage/segment_manager.rs @@ -0,0 +1,173 @@ +use std::cell::RefCell; +use std::collections::BTreeMap; +use std::ffi::OsString; +use std::fmt::{Debug, Display, Formatter}; +use std::fs; +use std::path::PathBuf; +use std::rc::Rc; + +use tracing::info; +use tracing_subscriber::fmt::format; + +use common::err::CResult; +use common::err::decode_error::ReError; + +use crate::storage::segment::Segment; +use crate::storage::segment_file::SegmentFile; +use crate::storage::storage_config::StorageConfig; + +/// 目标表segment文件管理. 
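+/// Segments live under `{relay_log_dir}/{db}#{table}/` and are named
+/// `rlog-{version}-{id}-{first_index}.log` (see `get_segment_dir_path` and `Segment::new`).
+///
+/// Illustrative setup (a sketch mirroring `tests/src/relay_log/storage`; the
+/// directory is a placeholder):
+///
+/// ```ignore
+/// let mut config = StorageConfig::default();
+/// config.set_relay_log_dir("/tmp/relay".to_string());
+/// let manager = SegmentManager::new(&config, "db1", "t1")?;
+/// ```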
+/// +pub struct SegmentManager { + // segment列表 + segments: BTreeMap>>, + // 当前活跃的segment + current_segment: Rc>, + // segment文件夹 + segment_dir: String, + // 单个segment最大值 + max_segment_size: u64, + // 单个segment最多entry数 + max_segment_entries: u32, +} + +impl SegmentManager { + /// 初始化一个segment管理器 + pub fn new(storage_config: &StorageConfig, dst_db_name: &str, dst_table_name: &str) -> CResult { + let segment_dir = Self::get_segment_dir_path(storage_config.relay_log_dir(), dst_db_name, dst_table_name)?; + let max_segment_size = *storage_config.max_segment_size(); + let max_segment_entries = *storage_config.max_segment_entries(); + // 加载已有的segment文件 + let mut segments = Self::load_segment(segment_dir.as_str())?; + info!("load segments: {:?}", &segments); + if segments.is_empty() { + // 实例化第一个segment + let mut segment = Segment::new(&segment_dir, + 1, + 1, + max_segment_size, + max_segment_entries)?; + segment.write_open()?; + let index = segment.first_index(); + let current_segment = Rc::new(RefCell::new(segment)); + segments.insert(index, Rc::clone(¤t_segment)); + Ok(Self { + current_segment, + segments, + segment_dir, + max_segment_size, + max_segment_entries, + }) + } else { + let current_segment = Rc::clone(segments.last_entry().ok_or(ReError::Error("get last segment err.".to_string()))?.get()); + Ok(Self { + current_segment, + segments, + segment_dir, + max_segment_size, + max_segment_entries, + }) + } + } + + /// 加载目标表所有segment文件 + fn load_segment(segment_dir: &str) -> CResult>>> { + info!("++++start load segments: {:?}", segment_dir); + let path = PathBuf::from(segment_dir); + if !path.exists() { + fs::create_dir(path.as_path())?; + } + let mut segments: BTreeMap>> = BTreeMap::new(); + let files = path.read_dir()?; + for file in files { + if let Ok(f) = file { + let file_path = PathBuf::from(f.path()); + if file_path.is_file() { + if let Some(segment_file_name) = file_path.file_name().ok_or(ReError::String("".to_string()))?.to_str() { + if SegmentFile::is_segment_file(segment_file_name)? { + // segment文件全路径 + let segment_file_path = file_path.to_str().ok_or(ReError::String("".to_string()))?; + if let Ok(mut segment) = Segment::from_file(segment_file_path) { + if !segment.is_full() { + segment.write_open()?; + } + // todo 校验segment合法性? 
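+                                    // Keyed by `first_index`, so the BTreeMap stays ordered by
+                                    // log position and `last_entry()` resolves to the newest segment.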
+ segments.insert(segment.first_index(), Rc::new(RefCell::new(segment))); + } + } + } + } + } + } + Ok(segments) + } + + /// 获取目标表的日志文件夹路径 + pub fn get_segment_dir_path(relay_log_dir: &str, dst_db_name: &str, dst_table_name: &str) -> CResult { + let log_dir_name = format!("{}#{}", dst_db_name, dst_table_name); + let path = PathBuf::from(relay_log_dir).join(&log_dir_name); + Ok(path.to_str().ok_or(ReError::String("".to_string()))?.to_string()) + } + + /// 当前segment + pub fn current_segment(&self) -> Rc> { + Rc::clone(&self.current_segment) + } + + /// 最后一个segment + pub fn last_segment(&mut self) -> CResult>> { + Ok(Rc::clone(self.segments.last_entry().ok_or(ReError::Error("get last segment err.".to_string()))?.get())) + } + + /// 第一个segment + pub fn first_segment(&mut self) -> CResult>> { + Ok(Rc::clone(self.segments.first_entry().ok_or(ReError::Error("get last segment err.".to_string()))?.get())) + } + + /// 返回index所在的segment + pub fn segment(&mut self, index: u64) -> CResult>> { + if self.current_segment.borrow().contain_index(index) { + Ok(Rc::clone(&self.current_segment)) + } else { + for (i, s) in &self.segments { + if s.borrow().contain_index(index) { + return Ok(Rc::clone(s)); + } + } + Err(ReError::Error(format!("unknown index: {}.", index))) + } + } + + /// 创建下一个segment + pub fn create_next_segment(&mut self) -> CResult>> { + let last_segment = self.last_segment()?; + if last_segment.borrow().is_writable() { + // 关闭最后一个segment可写模式 + last_segment.borrow_mut().write_close()?; + } + let next_segment_id = last_segment.borrow().id() + 1; + let next_segment_first_index = last_segment.borrow().last_index() + 1; + let mut next_segment = Segment::new(&self.segment_dir, + next_segment_id, + next_segment_first_index, + self.max_segment_size, + self.max_segment_entries)?; + next_segment.write_open()?; + self.current_segment = Rc::new(RefCell::new(next_segment)); + let r = self.segments.insert(next_segment_first_index, Rc::clone(&self.current_segment)); + if let Some(old) = r { + drop(old); + } + Ok(Rc::clone(&self.current_segment)) + } +} + +impl Debug for SegmentManager { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SegmentManager") + .field("segment_num", &self.segments.len()) + .field("segments", &self.segments) + .field("current_segment", &self.current_segment) + .finish() + } +} \ No newline at end of file diff --git a/relay_log/src/storage/storage_config.rs b/relay_log/src/storage/storage_config.rs new file mode 100644 index 0000000000000000000000000000000000000000..03341abffc8c91fcbe0e6bf0e51110d80d409240 --- /dev/null +++ b/relay_log/src/storage/storage_config.rs @@ -0,0 +1,53 @@ +use getset::{Getters, Setters}; + +/// 版本号 +pub(crate) const VERSION: u32 = 1; +/// segment文件前缀 +pub(crate) const SEGMENT_FILE_PRE: &str = "rlog"; +/// segment文件头大小 +pub(crate) const SEGMENT_HEADER_SIZE_BYTES: usize = 64; + +/// 存储可配项 +#[derive(Debug, Clone, Getters, Setters)] +pub struct StorageConfig { + // 中继日志存储路径 + #[getset(get = "pub", set = "pub")] + relay_log_dir: String, + + // 每个segment最大值 + #[getset(get = "pub", set = "pub")] + max_segment_size: u64, + + // 每个segment最多存实体数量 + #[getset(get = "pub", set = "pub")] + max_segment_entries: u32, + + // segment file buffer size + #[getset(get = "pub", set = "pub")] + entry_buffer_num: usize, + + // 是否flush + #[getset(get = "pub", set = "pub")] + flush_on_commit: bool, + + // 日志整理周期 + #[getset(get = "pub", set = "pub")] + compact_interval_millisecond: u64, +} + +impl Default for StorageConfig { + fn default() -> Self { + 
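+        // Defaults: 10 MiB per segment, at most 100 entries per segment,
+        // a 1024-entry in-memory cache, no flush-on-commit, compaction every 5 minutes.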
Self { + relay_log_dir: "".to_string(), + // 10M + max_segment_size: 10 * 1024 * 1024, + // + max_segment_entries: 100, + // 1k个 + entry_buffer_num: 1024, + flush_on_commit: false, + // 5min + compact_interval_millisecond: 5 * 60 * 1000, + } + } +} \ No newline at end of file diff --git a/relay_log/src/storage/storage_entry.rs b/relay_log/src/storage/storage_entry.rs new file mode 100644 index 0000000000000000000000000000000000000000..cb58a98db39a408d72795b73602184bb39631322 --- /dev/null +++ b/relay_log/src/storage/storage_entry.rs @@ -0,0 +1,46 @@ +use getset::{Getters, Setters}; + +use crate::relay_log::RelayLog; + +/// 日志存储块. +/// ======================================== +/// 字节大小 = 8 + 8 + 4 + {RelayLogSize}. +/// ```txt +/// index: 索引id, 8字节 +/// log_size: 日志内容大小, 8字节 +/// checksum: 日志内容校验值, 4字节 +/// relay_log: 日志内容, 动态大小 +/// ``` +/// ========================================= +#[derive(Debug, Clone, Getters, Setters)] +pub struct StorageEntry { + // entry索引id + #[getset(get = "pub")] + index: u64, + + // 日志内容大小 + #[getset(get = "pub", set = "pub")] + log_size: u64, + + // 日志内容校验值 + #[getset(get = "pub", set = "pub")] + checksum: u32, + + // 中继日志实体 + #[getset(get = "pub")] + relay_log: RelayLog, +} + +impl StorageEntry { + pub fn new(index: u64, + log_size: u64, + checksum: u32, + relay_log: RelayLog) -> Self { + Self { + index, + log_size, + checksum, + relay_log + } + } +} \ No newline at end of file diff --git a/relay_log/src/storage/storage_event.rs b/relay_log/src/storage/storage_event.rs new file mode 100644 index 0000000000000000000000000000000000000000..e2a9fd5dae97182a67783268911fff1e11544a81 --- /dev/null +++ b/relay_log/src/storage/storage_event.rs @@ -0,0 +1,5 @@ +/// todo 存储事件 +pub(crate) struct StorageEvent { + segment_id: u32, + index: u64, +} \ No newline at end of file diff --git a/tests/Cargo.toml b/tests/Cargo.toml index c9341eb829ad841f70212b8210b310ba54b42f04..573337f86b36247c80a1d542b04b29e981fce4ae 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -11,6 +11,7 @@ publish = { workspace = true } [dependencies] common = { workspace = true } binlog = { workspace = true } +relay_log = { workspace = true } tokio = { workspace = true } async-trait ={ workspace = true } diff --git a/tests/src/lib.rs b/tests/src/lib.rs index d0e2b2410bc21bee51999b9e1e2fc35842920c92..35471f9ad3e895b7fbbae23329aa63fcddcaf3bc 100644 --- a/tests/src/lib.rs +++ b/tests/src/lib.rs @@ -4,4 +4,4 @@ #![feature(exact_size_is_empty)] mod binlog; -// mod relay_log; \ No newline at end of file +mod relay_log; \ No newline at end of file diff --git a/tests/src/relay_log/codec/mod.rs b/tests/src/relay_log/codec/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..2f5cdfda551ce5edbfaaa31537d261caa4e03f5c --- /dev/null +++ b/tests/src/relay_log/codec/mod.rs @@ -0,0 +1,2 @@ +#[cfg(test)] +mod test_relay_log_codec; \ No newline at end of file diff --git a/tests/src/relay_log/codec/test_relay_log_codec.rs b/tests/src/relay_log/codec/test_relay_log_codec.rs new file mode 100644 index 0000000000000000000000000000000000000000..a73ecb3f89de9c5bd73b82fa40f990383dda3905 --- /dev/null +++ b/tests/src/relay_log/codec/test_relay_log_codec.rs @@ -0,0 +1,25 @@ +use tracing::info; +use common::log::tracing_factory::TracingFactory; +use relay_log::codec::binary_codec::BinaryCodec; +use relay_log::codec::binary_codec::CodecStyle::LittleVar; +use relay_log::codec::codec::Codec; +use relay_log::relay_log::RelayLog; + +#[test] +fn test_binary_codec() { + TracingFactory::init_log(true); + + let 
diff --git a/relay_log/src/storage/storage_event.rs b/relay_log/src/storage/storage_event.rs
new file mode 100644
index 0000000000000000000000000000000000000000..e2a9fd5dae97182a67783268911fff1e11544a81
--- /dev/null
+++ b/relay_log/src/storage/storage_event.rs
@@ -0,0 +1,5 @@
+/// todo: storage event
+pub(crate) struct StorageEvent {
+    segment_id: u32,
+    index: u64,
+}
\ No newline at end of file
diff --git a/tests/Cargo.toml b/tests/Cargo.toml
index c9341eb829ad841f70212b8210b310ba54b42f04..573337f86b36247c80a1d542b04b29e981fce4ae 100644
--- a/tests/Cargo.toml
+++ b/tests/Cargo.toml
@@ -11,6 +11,7 @@ publish = { workspace = true }
 
 [dependencies]
 common = { workspace = true }
 binlog = { workspace = true }
+relay_log = { workspace = true }
 tokio = { workspace = true }
 async-trait ={ workspace = true }
diff --git a/tests/src/lib.rs b/tests/src/lib.rs
index d0e2b2410bc21bee51999b9e1e2fc35842920c92..35471f9ad3e895b7fbbae23329aa63fcddcaf3bc 100644
--- a/tests/src/lib.rs
+++ b/tests/src/lib.rs
@@ -4,4 +4,4 @@
 #![feature(exact_size_is_empty)]
 
 mod binlog;
-// mod relay_log;
\ No newline at end of file
+mod relay_log;
\ No newline at end of file
diff --git a/tests/src/relay_log/codec/mod.rs b/tests/src/relay_log/codec/mod.rs
new file mode 100644
index 0000000000000000000000000000000000000000..2f5cdfda551ce5edbfaaa31537d261caa4e03f5c
--- /dev/null
+++ b/tests/src/relay_log/codec/mod.rs
@@ -0,0 +1,2 @@
+#[cfg(test)]
+mod test_relay_log_codec;
\ No newline at end of file
diff --git a/tests/src/relay_log/codec/test_relay_log_codec.rs b/tests/src/relay_log/codec/test_relay_log_codec.rs
new file mode 100644
index 0000000000000000000000000000000000000000..a73ecb3f89de9c5bd73b82fa40f990383dda3905
--- /dev/null
+++ b/tests/src/relay_log/codec/test_relay_log_codec.rs
@@ -0,0 +1,25 @@
+use tracing::info;
+use common::log::tracing_factory::TracingFactory;
+use relay_log::codec::binary_codec::BinaryCodec;
+use relay_log::codec::binary_codec::CodecStyle::LittleVar;
+use relay_log::codec::codec::Codec;
+use relay_log::relay_log::RelayLog;
+
+#[test]
+fn test_binary_codec() {
+    TracingFactory::init_log(true);
+
+    let mut s = RelayLog::default();
+    s.set_database_name("db1".to_string());
+    s.set_table_name("t2".to_string());
+    s.set_event_log_pos(10);
+    s.set_event_name("binlog".to_string());
+
+    let codec = BinaryCodec::new();
+
+    let bytes = codec.binary_serialize(&LittleVar, &s).unwrap();
+    info!("serialized: {}-{:?}", bytes.len(), bytes);
+
+    let s2 = codec.binary_deserialize::<RelayLog>(&LittleVar, &bytes).unwrap();
+    info!("deserialized: {:?}", s2);
+}
\ No newline at end of file
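The round-trip above goes through the crate's `BinaryCodec` with `CodecStyle::LittleVar`. Judging by the `bincode::{DefaultOptions, Options}` import in `binary_codec.rs`, that style presumably maps onto bincode's little-endian, varint-encoded configuration; a standalone sketch of that underlying encoding (bincode 1.x API, with a stand-in struct rather than the real `RelayLog`):

```rust
use bincode::{DefaultOptions, Options};
use serde::{Deserialize, Serialize};

// Stand-in for RelayLog; the real type lives in relay_log::relay_log.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct DemoLog {
    database_name: String,
    table_name: String,
    event_log_pos: u64,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let log = DemoLog {
        database_name: "db1".to_string(),
        table_name: "t2".to_string(),
        event_log_pos: 10,
    };

    // Varint encoding keeps small integers and short strings compact.
    let bytes = DefaultOptions::new()
        .with_little_endian()
        .with_varint_encoding()
        .serialize(&log)?;

    let decoded: DemoLog = DefaultOptions::new()
        .with_little_endian()
        .with_varint_encoding()
        .deserialize(&bytes)?;

    assert_eq!(log, decoded);
    Ok(())
}
```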
diff --git a/tests/src/relay_log/mod.rs b/tests/src/relay_log/mod.rs
new file mode 100644
index 0000000000000000000000000000000000000000..13c02f5caa1bb0587858e8ebfa528c33a32f3c1a
--- /dev/null
+++ b/tests/src/relay_log/mod.rs
@@ -0,0 +1,9 @@
+#[cfg(test)]
+mod test_relay_log;
+#[cfg(test)]
+mod test_relay_log_server;
+#[cfg(test)]
+mod test_relay_log_server_machine;
+
+mod storage;
+mod codec;
\ No newline at end of file
diff --git a/tests/src/relay_log/storage/mod.rs b/tests/src/relay_log/storage/mod.rs
new file mode 100644
index 0000000000000000000000000000000000000000..cd153ed22b1e09a6eb70c08a76a05725ae0873f1
--- /dev/null
+++ b/tests/src/relay_log/storage/mod.rs
@@ -0,0 +1,2 @@
+#[cfg(test)]
+mod test_relay_log_storage;
\ No newline at end of file
diff --git a/tests/src/relay_log/storage/test_relay_log_storage.rs b/tests/src/relay_log/storage/test_relay_log_storage.rs
new file mode 100644
index 0000000000000000000000000000000000000000..0c7ca1b207a48667ac30435a1cff8e902d8e6551
--- /dev/null
+++ b/tests/src/relay_log/storage/test_relay_log_storage.rs
@@ -0,0 +1,61 @@
+use tracing::info;
+use common::log::tracing_factory::TracingFactory;
+use relay_log::relay_log::RelayLog;
+use relay_log::storage::relay_log_storage::RelayLogStorage;
+use relay_log::storage::segment_file::SegmentFile;
+use relay_log::storage::segment_manager::SegmentManager;
+use relay_log::storage::storage_config::StorageConfig;
+
+#[test]
+pub fn test_segment_file() {
+    TracingFactory::init_log(true);
+    let segment_file = SegmentFile::from_path("/Users/zhangtao/tmp/db1#t1/rlog-1-1-1.log").unwrap();
+    info!("is_segment_file: {:?}", SegmentFile::is_segment_file("rlog-1-1-1.log"));
+    info!("version: {:?}", segment_file.version());
+    info!("segment_id: {:?}", segment_file.segment_id());
+    info!("index: {:?}", segment_file.index());
+}
+
+#[test]
+pub fn test_segment_manager() {
+    TracingFactory::init_log(true);
+    let mut storage_config = StorageConfig::default();
+    storage_config.set_relay_log_dir("/Users/zhangtao/tmp".to_string());
+    let segment_manager = SegmentManager::new(&storage_config, "db1", "t1").unwrap();
+    info!("{:?}", segment_manager);
+}
+
+#[test]
+pub fn test_log_storage_append() {
+    TracingFactory::init_log(true);
+    let mut storage_config = StorageConfig::default();
+    storage_config.set_relay_log_dir("/Users/zhangtao/tmp".to_string());
+    let db_name = "db1";
+    let tb_name = "t1";
+    let mut log_storage = RelayLogStorage::new(&storage_config, db_name.to_string(), tb_name.to_string()).unwrap();
+    for i in 0..203 {
+        let mut relay_log = RelayLog::default();
+        relay_log.set_database_name(db_name.to_string());
+        relay_log.set_table_name(tb_name.to_string());
+        relay_log.set_event_log_pos(i as u64);
+        relay_log.set_event_name(format!("binlog_{}", i));
+
+        log_storage.append_relay_log(relay_log).unwrap();
+    }
+
+    info!("{:?}", &log_storage.segment_manager);
+}
+
+#[test]
+pub fn test_log_storage_read() {
+    TracingFactory::init_log(true);
+    let mut storage_config = StorageConfig::default();
+    storage_config.set_relay_log_dir("/Users/zhangtao/tmp".to_string());
+    let db_name = "db1";
+    let tb_name = "t1";
+    let mut log_storage = RelayLogStorage::new(&storage_config, db_name.to_string(), tb_name.to_string()).unwrap();
+
+
+    let entry = log_storage.get_entry(202).unwrap();
+    info!("{:?}", *entry);
+}
\ No newline at end of file
diff --git a/tests/src/relay_log/test_relay_log.rs b/tests/src/relay_log/test_relay_log.rs
new file mode 100644
index 0000000000000000000000000000000000000000..4e01053effacd1f39699e9ac92e1e1c12c60d2ad
--- /dev/null
+++ b/tests/src/relay_log/test_relay_log.rs
@@ -0,0 +1,25 @@
+use tracing::info;
+use binlog::events::binlog_event::BinlogEvent;
+use binlog::factory::event_factory::{EventFactory, EventReaderOption, IEventFactory};
+use common::log::tracing_factory::TracingFactory;
+use relay_log::relay_log::RelayLog;
+
+
+pub fn get_table_map_event_write_rows_log_event() -> Vec<BinlogEvent> {
+    let input = include_bytes!("../../events/8.0/19_30_Table_map_event_Write_rows_log_event/binlog.000018");
+    let mut factory = EventFactory::new(false);
+    let (_, output) = factory.parser_bytes(input, &EventReaderOption::default()).unwrap();
+    output
+}
+
+#[test]
+pub fn test_binlog_event_to_relay_entity() {
+    TracingFactory::init_log(true);
+    let local_events = get_table_map_event_write_rows_log_event();
+    let relay_entities: Vec<RelayLog> = local_events.iter().map(|e| {
+        RelayLog::from_binlog_event(e)
+    }).collect();
+
+    info!("local_events: {:?}", local_events);
+    info!("relay_entities: {:?}", relay_entities);
+}
\ No newline at end of file
diff --git a/tests/src/relay_log/test_relay_log_server.rs b/tests/src/relay_log/test_relay_log_server.rs
new file mode 100644
index 0000000000000000000000000000000000000000..444c3b9f995bad7a88459a9dc6ed6cc35dbf2353
--- /dev/null
+++ b/tests/src/relay_log/test_relay_log_server.rs
@@ -0,0 +1,46 @@
+use std::time::Duration;
+
+use tokio::join;
+use tokio::sync::mpsc;
+use tracing::{info, warn};
+
+use binlog::events::binlog_event::BinlogEvent;
+use common::err::CResult;
+use common::log::tracing_factory::TracingFactory;
+use relay_log::relay_log_server::RelayLogServer;
+
+use crate::relay_log::test_relay_log;
+
+#[tokio::test]
+pub async fn test_server() -> CResult<()> {
+    TracingFactory::init_log(true);
+
+    let events = test_relay_log::get_table_map_event_write_rows_log_event();
+
+    // Multi-producer, single-consumer channel.
+    let (tx, rx) = mpsc::channel::<BinlogEvent>(10);
+
+    let server = RelayLogServer::new_with_binlog_receiver(rx)?;
+
+    info!("{:?}", server);
+
+    let send_task = tokio::spawn(async move {
+        for e in events.into_iter() {
+            tokio::time::sleep(Duration::from_millis(500)).await;
+            if tx.send(e).await.is_err() {
+                warn!("receiver closed");
+                break;
+            }
+        }
+    });
+
+    info!("before relay log server state: {}", server.is_running().unwrap());
+
+    join!(send_task);
+
+    info!("after relay log server state: {}", server.is_running().unwrap());
+
+    tokio::time::sleep(Duration::from_secs(10)).await;
+
+    Ok(())
+}
\ No newline at end of file
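`RelayLogServer` itself is not shown in this diff; the test above only exercises the handoff, a bounded tokio mpsc channel whose receiving end is handed to the server. A self-contained sketch of that producer/consumer pattern (tokio 1.x, no relay_log types):

```rust
use std::time::Duration;

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Bounded channel: senders are back-pressured once 10 events are queued,
    // mirroring mpsc::channel::<BinlogEvent>(10) in the test above.
    let (tx, mut rx) = mpsc::channel::<String>(10);

    // Consumer side: roughly what a server task does with the receiver.
    let consumer = tokio::spawn(async move {
        while let Some(event) = rx.recv().await {
            println!("applied event: {event}");
        }
        // recv() returns None once every sender has been dropped.
    });

    // Producer side, analogous to send_task in the test.
    let producer = tokio::spawn(async move {
        for i in 0..5 {
            tokio::time::sleep(Duration::from_millis(50)).await;
            if tx.send(format!("binlog_{i}")).await.is_err() {
                break; // receiver closed
            }
        }
    });

    let _ = tokio::join!(producer, consumer);
}
```

The bound of 10 gives producers back-pressure once ten events are queued, and `recv()` returning `None` is the natural shutdown signal once every sender is dropped.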
diff --git a/tests/src/relay_log/test_relay_log_server_machine.rs b/tests/src/relay_log/test_relay_log_server_machine.rs
new file mode 100644
index 0000000000000000000000000000000000000000..12c9e41c5230c65d8240a8ab89fa4043e4048929
--- /dev/null
+++ b/tests/src/relay_log/test_relay_log_server_machine.rs
@@ -0,0 +1,38 @@
+use tokio::join;
+use tracing::info;
+
+use common::log::tracing_factory::TracingFactory;
+use common::schema::rc_task::RcTask;
+use relay_log::relay_log_server_machine::RelayLogServerMachine;
+
+#[tokio::test]
+pub async fn test_relay_log_server_machine() {
+    TracingFactory::init_log(true);
+
+    let h1 = tokio::task::spawn_blocking(|| {
+        add_task("1")
+    });
+
+    let h2 = tokio::task::spawn_blocking(|| {
+        add_task("2")
+    });
+
+    let h3 = tokio::task::spawn_blocking(|| {
+        add_task("3")
+    });
+
+    join!(h1, h2, h3);
+
+    info!("{:?}", RelayLogServerMachine::get_instance());
+}
+
+fn add_task(id: &str) {
+    let task = RcTask {
+        task_id: id.to_string(),
+        task_name: "".to_string(),
+        src_info: vec![],
+        dst_db_name: "".to_string(),
+        dst_table_name: "".to_string(),
+    };
+    RelayLogServerMachine::get_instance().add_task(task).unwrap();
+}
\ No newline at end of file
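`RelayLogServerMachine::get_instance()` is called from several `spawn_blocking` threads at once, so it has to be a process-wide, thread-safe singleton. Its implementation is not part of this section of the diff; purely as an illustration, here is one way such a registry could be built from the `lazy_static` and `dashmap` dependencies already declared in `relay_log/Cargo.toml`, with all names below hypothetical:

```rust
use dashmap::DashMap;
use lazy_static::lazy_static;

// Hypothetical stand-in for the task metadata; the real type is
// common::schema::rc_task::RcTask.
#[derive(Debug, Clone)]
struct Task {
    task_id: String,
}

#[derive(Debug, Default)]
struct ServerMachine {
    // DashMap allows concurrent inserts, so add_task can be called
    // from several spawn_blocking threads at the same time.
    tasks: DashMap<String, Task>,
}

impl ServerMachine {
    fn get_instance() -> &'static ServerMachine {
        lazy_static! {
            static ref INSTANCE: ServerMachine = ServerMachine::default();
        }
        &INSTANCE
    }

    fn add_task(&self, task: Task) {
        self.tasks.insert(task.task_id.clone(), task);
    }
}

fn main() {
    ServerMachine::get_instance().add_task(Task { task_id: "1".to_string() });
    println!("{:?}", ServerMachine::get_instance());
}
```

Whether the real machine uses `DashMap`, a mutex-guarded map or something else is not visible here; the point is only that `add_task` must be safe to call concurrently.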