diff --git a/Cargo.toml b/Cargo.toml
index 471ffe19723f7a4bbe0a7eed0671925dcee6f348..cde69204f8f3b2caf31f6e8223f3fc516e5c8434 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -9,6 +9,7 @@ members = [
"./connection",
"./binlog_cli",
"./tests",
+ "./relay_log",
]
[workspace.package]
@@ -29,6 +30,7 @@ memory = { path = "memory", version = "0.0.2" }
common = { path = "common", version = "0.0.2" }
binlog = { path = "binlog", version = "0.0.2" }
binlog_cli = { path = "binlog_cli", version = "0.0.2" }
+relay_log = { path = "relay_log", version = "0.0.2" }
tokio = {version = "1.32.0", features = ["full"]}
async-trait = "0.1.73"
diff --git a/relay_log/Cargo.toml b/relay_log/Cargo.toml
new file mode 100644
index 0000000000000000000000000000000000000000..e23d34f7dce51bb47ee77c964b24d1b963c6d7dd
--- /dev/null
+++ b/relay_log/Cargo.toml
@@ -0,0 +1,33 @@
+[package]
+name = "relay_log"
+version = { workspace = true }
+authors = { workspace = true }
+edition = { workspace = true }
+license = { workspace = true }
+publish = { workspace = true }
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+common = { workspace = true }
+binlog = { workspace = true }
+
+serde = { workspace = true }
+flatbuffers = { workspace = true }
+tokio = { workspace = true }
+async-trait ={ workspace = true }
+lazy_static = { workspace = true }
+tracing = { workspace = true }
+tracing-subscriber = { workspace = true }
+num_enum = { workspace = true }
+byteorder = { workspace = true }
+dashmap = { workspace = true }
+ringbuffer = { workspace = true }
+pin-utils = { workspace = true }
+serde_json = { workspace = true }
+bincode = { workspace = true }
+chrono = { workspace = true }
+bytes = { workspace = true }
+getset = { workspace = true }
+memmap2 = { workspace = true }
+checksum = { workspace = true }
\ No newline at end of file
diff --git a/relay_log/README.md b/relay_log/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..e42dc95105f77bbcb8206d123f6edd6e6769519e
--- /dev/null
+++ b/relay_log/README.md
@@ -0,0 +1 @@
+# relay_log 中继日志模块
diff --git a/relay_log/src/apply/mod.rs b/relay_log/src/apply/mod.rs
new file mode 100644
index 0000000000000000000000000000000000000000..9bbcf46b67cefe7b15fa4b1edcf803ae6e07630f
--- /dev/null
+++ b/relay_log/src/apply/mod.rs
@@ -0,0 +1,2 @@
+pub mod relay_log_apply;
+pub mod relay_log_apply_snapshot;
\ No newline at end of file
diff --git a/relay_log/src/apply/relay_log_apply.rs b/relay_log/src/apply/relay_log_apply.rs
new file mode 100644
index 0000000000000000000000000000000000000000..4626a4690a7366ddcf499f6aa177c67da15cae0a
--- /dev/null
+++ b/relay_log/src/apply/relay_log_apply.rs
@@ -0,0 +1,5 @@
+
+/// todo
+pub struct RelayLogApply {
+
+}
\ No newline at end of file
diff --git a/relay_log/src/apply/relay_log_apply_snapshot.rs b/relay_log/src/apply/relay_log_apply_snapshot.rs
new file mode 100644
index 0000000000000000000000000000000000000000..10f71c1a200c190cd68d4d14d786038061fe2dcb
--- /dev/null
+++ b/relay_log/src/apply/relay_log_apply_snapshot.rs
@@ -0,0 +1,5 @@
+
+/// todo 日志Apply快照
+pub struct RelayLogApplySnapshot {
+
+}
\ No newline at end of file
diff --git a/relay_log/src/codec/binary_codec.rs b/relay_log/src/codec/binary_codec.rs
new file mode 100644
index 0000000000000000000000000000000000000000..e2feafe68deb17c670ef2bad19cd0092cfed430e
--- /dev/null
+++ b/relay_log/src/codec/binary_codec.rs
@@ -0,0 +1,158 @@
+use std::fmt::{Debug, Formatter};
+
+use bincode::{DefaultOptions, Options};
+use tracing::error;
+
+use common::err::CResult;
+use common::err::decode_error::ReError;
+
+use crate::codec::codec::Codec;
+
+#[derive(Clone)]
+pub struct BinaryCodec {
+ serialize_options: DefaultOptions,
+}
+
+pub enum CodecStyle {
+ // 小端定长
+ LittleFix(u64),
+ // 大端定长
+ BigFix(u64),
+ // 小端变长
+ LittleVar,
+ // 大端变长
+ BigVar,
+}
+
+impl Codec for BinaryCodec {
+ fn new() -> Self where Self: Sized {
+ BinaryCodec {
+ serialize_options: bincode::options(),
+ }
+ }
+
+ fn name(&self) -> String {
+ String::from("BinaryCodec")
+ }
+}
+
+impl BinaryCodec {
+ /// Serializes a serializable object into a `Vec` of bytes.
+ pub fn binary_serialize(&self, codec_style: &CodecStyle, value: &T) -> CResult>
+ where
+ T: serde::Serialize,
+ {
+ match codec_style {
+ CodecStyle::LittleFix(limit) => {
+ let bytes = self.serialize_options
+ .with_limit(*limit)
+ .with_little_endian()
+ .with_fixint_encoding()
+ .serialize(value).or_else(|e| {
+ error!("binary serialize err: {:?}", &e);
+ Err(ReError::Error(e.to_string()))
+ })?;
+ Ok(bytes)
+ }
+ CodecStyle::BigFix(limit) => {
+ let bytes = self.serialize_options
+ .with_limit(*limit)
+ .with_big_endian()
+ .with_fixint_encoding()
+ .serialize(value).or_else(|e| {
+ error!("binary serialize err: {:?}", &e);
+ Err(ReError::Error(e.to_string()))
+ })?;
+ Ok(bytes)
+ }
+ CodecStyle::LittleVar => {
+ let bytes = self.serialize_options
+ .allow_trailing_bytes()
+ .with_no_limit()
+ .with_little_endian()
+ .with_varint_encoding()
+ .serialize(value).or_else(|e| {
+ error!("binary serialize err: {:?}", &e);
+ Err(ReError::Error(e.to_string()))
+ })?;
+ Ok(bytes)
+ }
+ CodecStyle::BigVar => {
+ let bytes = self.serialize_options
+ .allow_trailing_bytes()
+ .with_no_limit()
+ .with_big_endian()
+ .with_varint_encoding()
+ .serialize(value).or_else(|e| {
+ error!("binary serialize err: {:?}", &e);
+ Err(ReError::Error(e.to_string()))
+ })?;
+ Ok(bytes)
+ }
+ }
+ }
+
+ /// Deserializes a slice of bytes into an instance of `T`.
+ pub fn binary_deserialize<'a, T>(&self, codec_style: &CodecStyle, bytes: &'a [u8]) -> CResult
+ where
+ T: serde::de::Deserialize<'a>,
+ {
+ match codec_style {
+ CodecStyle::LittleFix(limit) => {
+ let r = self.serialize_options
+ .with_limit(*limit)
+ .with_little_endian()
+ .with_fixint_encoding()
+ .deserialize::(bytes).or_else(|e| {
+ error!("binary deserialize err: {:?}", &e);
+ Err(ReError::Error(e.to_string()))
+ })?;
+ Ok(r)
+ }
+ CodecStyle::BigFix(limit) => {
+ let r = self.serialize_options
+ .with_limit(*limit)
+ .with_big_endian()
+ .with_fixint_encoding()
+ .deserialize::(bytes).or_else(|e| {
+ error!("binary deserialize err: {:?}", &e);
+ Err(ReError::Error(e.to_string()))
+ })?;
+ Ok(r)
+ }
+ CodecStyle::LittleVar => {
+ let r = self.serialize_options
+ .allow_trailing_bytes()
+ .with_no_limit()
+ .with_little_endian()
+ .with_varint_encoding()
+ .deserialize::(bytes).or_else(|e| {
+ error!("binary deserialize err: {:?}", &e);
+ Err(ReError::Error(e.to_string()))
+ })?;
+ Ok(r)
+ }
+ CodecStyle::BigVar => {
+ let r = self.serialize_options
+ .allow_trailing_bytes()
+ .with_no_limit()
+ .with_big_endian()
+ .with_varint_encoding()
+ .deserialize::(bytes).or_else(|e| {
+ error!("binary deserialize err: {:?}", &e);
+ Err(ReError::Error(e.to_string()))
+ })?;
+ Ok(r)
+ }
+ }
+ }
+}
+
+impl Debug for BinaryCodec {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("BinaryCodec")
+ .field("name", &self.name())
+ .field("serializer", &"bincode")
+ .finish()
+ }
+}
\ No newline at end of file
diff --git a/relay_log/src/codec/codec.rs b/relay_log/src/codec/codec.rs
new file mode 100644
index 0000000000000000000000000000000000000000..993842382db2c18ccbfd3c4cadcb0ab76664eac7
--- /dev/null
+++ b/relay_log/src/codec/codec.rs
@@ -0,0 +1,10 @@
+
+/// 编解码
+pub trait Codec {
+ /// 实例化
+ fn new() -> Self where Self: Sized;
+
+ /// 实例类型名称
+ fn name(&self) -> String;
+
+}
\ No newline at end of file
diff --git a/relay_log/src/codec/json_codec.rs b/relay_log/src/codec/json_codec.rs
new file mode 100644
index 0000000000000000000000000000000000000000..dfb48ab41fc742107561265fb48af4b457d37975
--- /dev/null
+++ b/relay_log/src/codec/json_codec.rs
@@ -0,0 +1,26 @@
+use std::fmt::{Debug, Formatter};
+use crate::codec::codec::Codec;
+
+#[derive(Clone)]
+pub struct JsonCodec {
+
+}
+
+impl Codec for JsonCodec {
+ fn new() -> Self where Self: Sized {
+ JsonCodec {
+
+ }
+ }
+
+ fn name(&self) -> String {
+ String::from("JsonCodec")
+ }
+}
+
+impl Debug for JsonCodec {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ todo!()
+ }
+}
+
diff --git a/relay_log/src/codec/mod.rs b/relay_log/src/codec/mod.rs
new file mode 100644
index 0000000000000000000000000000000000000000..0d223c8c6e2bd6850978b2482a53bd58bc339a3f
--- /dev/null
+++ b/relay_log/src/codec/mod.rs
@@ -0,0 +1,6 @@
+pub mod json_codec;
+pub mod codec;
+pub mod binary_codec;
+
+#[cfg(test)]
+mod test;
diff --git a/relay_log/src/lib.rs b/relay_log/src/lib.rs
new file mode 100644
index 0000000000000000000000000000000000000000..d5e54260ab7d4e02a2687050ca61db5491d4c4b9
--- /dev/null
+++ b/relay_log/src/lib.rs
@@ -0,0 +1,8 @@
+
+pub mod storage;
+pub mod apply;
+pub mod codec;
+pub mod relay_log_server;
+pub mod relay_log_server_machine;
+pub mod relay_log;
+
diff --git a/relay_log/src/relay_log.rs b/relay_log/src/relay_log.rs
new file mode 100644
index 0000000000000000000000000000000000000000..8d62b6f392cbbc5356bc504eb030ac8f212dcb71
--- /dev/null
+++ b/relay_log/src/relay_log.rs
@@ -0,0 +1,419 @@
+use chrono::{NaiveDate, NaiveDateTime, NaiveTime, Utc};
+use getset::{Getters, Setters};
+use serde::{Deserialize, Serialize};
+use tracing::warn;
+
+use binlog::events::binlog_event::BinlogEvent;
+use binlog::events::declare::log_event::LogEvent;
+use binlog::events::declare::rows_log_event::RowsLogEvent;
+use binlog::events::protocol::table_map_event::ColumnInfo;
+use binlog::row::row_data::RowData;
+use common::binlog::column::column_type::SrcColumnType;
+use common::binlog::column::column_value::SrcColumnValue;
+use common::binlog::src_meta::SrcType;
+use common::schema::data_type::{DstColumnType, Value};
+
+/// 中继日志信息
+#[derive(Serialize, Deserialize, Debug, Clone, Getters, Setters)]
+pub struct RelayLog {
+ /// src type
+ #[getset(get = "pub", set = "pub")]
+ src_type: SrcType,
+
+ /// binlog event position
+ #[getset(get = "pub", set = "pub")]
+ event_log_pos: u64,
+
+ /// binlog event name
+ #[getset(get = "pub", set = "pub")]
+ event_name: String,
+
+ /// database
+ #[getset(get = "pub", set = "pub")]
+ database_name: String,
+
+ /// table
+ #[getset(get = "pub", set = "pub")]
+ table_name: String,
+
+ /// column info
+ #[getset(get = "pub", set = "pub")]
+ columns: Vec,
+
+ /// replay command
+ #[getset(get = "pub", set = "pub")]
+ relay_command: RelayCommand,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub enum RelayCommand {
+ None,
+ CreateDatabase,
+ DropDatabase,
+ CreateTable,
+ DropTable,
+ AlterTable,
+ /// 插入数据
+ Insert(Vec),
+ /// 删除数据
+ Delete(Vec),
+ /// 更新数据: (deleteRows, insertRows)
+ Update(Vec<(RelayRowData, RelayRowData)>),
+}
+
+/// 列信息
+#[derive(Serialize, Deserialize, Debug, Clone, Getters, Setters)]
+pub struct RelayColumnInfo {
+ /// 列类型
+ #[getset(get = "pub", set = "pub")]
+ column_type: DstColumnType,
+
+ /// 列名称
+ #[getset(get = "pub", set = "pub")]
+ column_name: String,
+}
+
+impl RelayColumnInfo {
+ fn from_binlog_column_info(binlog_column_info: &ColumnInfo) -> Self {
+ let column_name = binlog_column_info.get_name();
+ if let Some(binlog_column_type) = binlog_column_info.get_c_type().take() {
+ let column_type = match binlog_column_type {
+ SrcColumnType::Null => {
+ DstColumnType::Null
+ }
+ // Decimal
+ SrcColumnType::Decimal |
+ SrcColumnType::NewDecimal => {
+ DstColumnType::Decimal
+ }
+ // Int
+ SrcColumnType::Tiny |
+ SrcColumnType::Int24 |
+ SrcColumnType::Year => {
+ DstColumnType::Int
+ }
+ SrcColumnType::Short => {
+ DstColumnType::Short
+ }
+ // Long
+ SrcColumnType::Long |
+ SrcColumnType::LongLong => {
+ DstColumnType::Long
+ }
+ SrcColumnType::Float => {
+ DstColumnType::Float
+ }
+ SrcColumnType::Double => {
+ DstColumnType::Double
+ }
+ // Timestamp
+ SrcColumnType::Timestamp |
+ SrcColumnType::Timestamp2 => {
+ DstColumnType::Timestamp
+ }
+ // Date
+ SrcColumnType::Date |
+ SrcColumnType::NewDate => {
+ DstColumnType::Date
+ }
+ // Time
+ SrcColumnType::Time |
+ SrcColumnType::Time2 => {
+ DstColumnType::Time
+ }
+ // DateTime
+ SrcColumnType::DateTime |
+ SrcColumnType::DateTime2 => {
+ DstColumnType::DateTime
+ }
+ // String
+ SrcColumnType::VarChar |
+ SrcColumnType::Enum |
+ SrcColumnType::Set |
+ SrcColumnType::VarString |
+ SrcColumnType::String |
+ SrcColumnType::Array => {
+ DstColumnType::String
+ }
+ SrcColumnType::Bit => {
+ DstColumnType::Bitmap
+ }
+ SrcColumnType::Bool => {
+ DstColumnType::Boolean
+ }
+ SrcColumnType::Json => {
+ DstColumnType::JSON
+ }
+ // Blob
+ SrcColumnType::TinyBlob |
+ SrcColumnType::MediumBlob |
+ SrcColumnType::LongBlob |
+ SrcColumnType::Blob => {
+ DstColumnType::Blob
+ }
+ SrcColumnType::Geometry => {
+ DstColumnType::Geometry
+ }
+ // todo 未知?
+ SrcColumnType::Invalid => {
+ DstColumnType::Other
+ }
+ };
+ Self {
+ column_type,
+ column_name,
+ }
+ } else {
+ Self::default()
+ }
+ }
+}
+
+impl Default for RelayColumnInfo {
+ fn default() -> Self {
+ Self {
+ column_type: DstColumnType::Null,
+ column_name: "".to_string(),
+ }
+ }
+}
+
+/// 一行数据
+#[derive(Serialize, Deserialize, Debug, Clone, Getters, Setters)]
+pub struct RelayRowData {
+ #[getset(get = "pub", set = "pub")]
+ values: Vec,
+}
+
+impl RelayRowData {
+ fn from_binlog_row(binlog_row: &RowData) -> Self {
+ let values: Vec = binlog_row.get_cells().iter().map(|c| {
+ if let Some(v) = c {
+ match v {
+ // `TinyInt`,`SmallInt`,`MediumInt`,`Int` => `Int`
+ SrcColumnValue::TinyInt(data) => {
+ Value::Int(*data as i32)
+ }
+ SrcColumnValue::SmallInt(data) => {
+ Value::Int(*data as i32)
+ }
+ SrcColumnValue::MediumInt(data) => {
+ Value::Int(*data as i32)
+ }
+ SrcColumnValue::Int(data) => {
+ Value::Int(*data as i32)
+ }
+ // `BigInt` => `Long`
+ SrcColumnValue::BigInt(data) => {
+ Value::Long(*data as i64)
+ }
+ SrcColumnValue::Float(data) => {
+ Value::Float(*data)
+ }
+ SrcColumnValue::Double(data) => {
+ Value::Double(*data)
+ }
+ SrcColumnValue::Decimal(data) => {
+ Value::Decimal(data.to_string())
+ }
+ SrcColumnValue::String(data) => {
+ Value::String(data.to_string())
+ }
+ SrcColumnValue::Blob(data) => {
+ Value::Blob(data.to_vec())
+ }
+ // Year => Int
+ SrcColumnValue::Year(data) => {
+ Value::Int(*data as i32)
+ }
+ SrcColumnValue::Date(data) => {
+ if let Some(naive_date) = NaiveDate::from_ymd_opt(data.year as i32, data.month as u32, data.day as u32) {
+ if let Some(naive_datetime) = naive_date.and_hms_milli_opt(0, 0, 0, 0) {
+ Value::Date(naive_datetime.timestamp_millis())
+ } else {
+ warn!("Date parse error.");
+ Value::Null
+ }
+ } else {
+ warn!("Date parse error.");
+ Value::Null
+ }
+ }
+ SrcColumnValue::Time(data) => {
+ if let Some(naive_time) = NaiveTime::from_hms_milli_opt(data.hour as u32, data.minute as u32, data.second as u32, data.millis) {
+ let now_date = Utc::now().date_naive();
+ Value::Time(NaiveDateTime::new(now_date, naive_time).timestamp_millis())
+ } else {
+ warn!("Time parse error.");
+ Value::Null
+ }
+ }
+ SrcColumnValue::DateTime(data) => {
+ if let Some(naive_date) = NaiveDate::from_ymd_opt(data.year as i32, data.month as u32, data.day as u32) {
+ if let Some(naive_datetime) = naive_date.and_hms_milli_opt(data.hour as u32, data.minute as u32, data.second as u32, data.millis) {
+ Value::DateTime(naive_datetime.timestamp_millis())
+ } else {
+ warn!("DateTime parse error.");
+ Value::Null
+ }
+ } else {
+ warn!("DateTime parse error.");
+ Value::Null
+ }
+ }
+ SrcColumnValue::Timestamp(data) => {
+ Value::Timestamp(*data as i64)
+ }
+
+ // todo 暂不支持的类型:`Bit`,`Enum`,`Set`
+ SrcColumnValue::Bit(data) => {
+ Value::Null
+ }
+ SrcColumnValue::Enum(data) => {
+ Value::Null
+ }
+ SrcColumnValue::Set(data) => {
+ Value::Null
+ }
+ }
+ } else {
+ Value::Null
+ }
+ }).collect();
+
+ Self {
+ values
+ }
+ }
+}
+
+impl Default for RelayRowData {
+ fn default() -> Self {
+ Self {
+ values: vec![]
+ }
+ }
+}
+
+impl RelayLog {
+ pub fn from_binlog_event(event: &BinlogEvent) -> Self {
+ // todo 暂时写死 Mysql 源
+ let src_type = SrcType::Mysql;
+ match event {
+ BinlogEvent::WriteRows(e) => {
+ if let Some(table) = e.get_table_map_event() {
+ let event_log_pos = e.get_header().get_log_pos();
+ let event_name = e.get_type_name();
+ let insert_rows: Vec = e.get_rows()
+ .iter()
+ .map(|r| {
+ RelayRowData::from_binlog_row(r)
+ }).collect();
+ let database_name = table.get_database_name();
+ let table_name = table.get_table_name();
+ let columns = table.get_column_infos()
+ .iter()
+ .map(|c| {
+ RelayColumnInfo::from_binlog_column_info(c)
+ }).collect();
+ let relay_command = RelayCommand::Insert(insert_rows);
+ Self {
+ src_type,
+ event_log_pos,
+ event_name,
+ database_name,
+ table_name,
+ columns,
+ relay_command,
+ }
+ } else {
+ Self::default()
+ }
+ }
+ BinlogEvent::UpdateRows(e) => {
+ if let Some(table) = e.get_table_map_event() {
+ let event_log_pos = e.get_header().get_log_pos();
+ let event_name = e.get_type_name();
+ let update_rows: Vec<(RelayRowData, RelayRowData)> = e.rows
+ .iter()
+ .map(|r| {
+ (RelayRowData::from_binlog_row(&(r.get_before_update())), RelayRowData::from_binlog_row(&(r.get_after_update())))
+ }).collect();
+ let database_name = table.get_database_name();
+ let table_name = table.get_table_name();
+ let columns = table.get_column_infos()
+ .iter()
+ .map(|c| {
+ RelayColumnInfo::from_binlog_column_info(c)
+ }).collect();
+ let relay_command = RelayCommand::Update(update_rows);
+ Self {
+ src_type,
+ event_log_pos,
+ event_name,
+ database_name,
+ table_name,
+ columns,
+ relay_command,
+ }
+ } else {
+ Self::default()
+ }
+ }
+ BinlogEvent::DeleteRows(e) => {
+ if let Some(table) = e.get_table_map_event() {
+ let event_log_pos = e.get_header().get_log_pos();
+ let event_name = e.get_type_name();
+ let delete_rows: Vec = e.get_rows().iter().map(|r| {
+ RelayRowData::from_binlog_row(r)
+ }).collect();
+ let database_name = table.get_database_name();
+ let table_name = table.get_table_name();
+ let columns = table.get_column_infos()
+ .iter()
+ .map(|c| {
+ RelayColumnInfo::from_binlog_column_info(c)
+ }).collect();
+ let relay_command = RelayCommand::Delete(delete_rows);
+ Self {
+ src_type,
+ event_log_pos,
+ event_name,
+ database_name,
+ table_name,
+ columns,
+ relay_command,
+ }
+ } else {
+ Self::default()
+ }
+ }
+ _ => {
+ // todo 其它event后续实现
+ Self::default()
+ }
+ }
+ }
+
+ pub fn get_database_name(&self) -> &str {
+ self.database_name.as_str()
+ }
+
+ pub fn get_table_name(&self) -> &str {
+ self.table_name.as_str()
+ }
+}
+
+impl Default for RelayLog {
+ fn default() -> Self {
+ Self {
+ src_type: SrcType::default(),
+ event_log_pos: 0,
+ event_name: "".to_string(),
+ database_name: "".to_string(),
+ table_name: "".to_string(),
+ columns: vec![],
+ relay_command: RelayCommand::None,
+ }
+ }
+}
\ No newline at end of file
diff --git a/relay_log/src/relay_log_server.rs b/relay_log/src/relay_log_server.rs
new file mode 100644
index 0000000000000000000000000000000000000000..fb7a46c66742754da5caac67476b3db98845c290
--- /dev/null
+++ b/relay_log/src/relay_log_server.rs
@@ -0,0 +1,139 @@
+use std::sync::{Arc, Mutex};
+
+use tokio::sync::mpsc::Receiver;
+use tokio::task::JoinHandle;
+use tracing::{error, info, warn};
+
+use binlog::events::binlog_event::BinlogEvent;
+use common::err::CResult;
+use common::err::decode_error::ReError;
+
+use crate::relay_log_server_machine::RelayLogServerMachine;
+
+#[derive(Debug)]
+pub struct RelayLogServer {
+ state: Arc>,
+}
+
+#[derive(Debug)]
+struct ServerState {
+ running: bool,
+ receiver_handle: Option>>,
+}
+
+impl RelayLogServer {
+ pub fn new() -> Self {
+ let state = Arc::new(Mutex::new(ServerState {
+ running: false,
+ receiver_handle: None,
+ }));
+ Self {
+ state,
+ }
+ }
+
+ /// 实例化,且开始监听Receiver
+ pub fn new_with_binlog_receiver(rx: Receiver) -> CResult {
+ let state = Arc::new(Mutex::new(ServerState {
+ running: false,
+ receiver_handle: None,
+ }));
+ Self::recv_event(Arc::clone(&state), rx)?;
+ Ok(Self {
+ state
+ })
+ }
+
+ /// 开启中继日志监听服务(监听上游binlog事件)
+ pub fn start(&self, rx: Receiver) -> CResult {
+ Self::recv_event(Arc::clone(&self.state), rx)
+ }
+
+ /// 监听binlog事件
+ fn recv_event(state: Arc>, mut rx: Receiver) -> CResult {
+ let mut s = state.lock().or_else(|e| {
+ error!("stare recv binlog event err: {:?}", &e);
+ Err(ReError::Error(e.to_string()))
+ })?;
+ if s.running == true {
+ Err(ReError::String("The RelayLogServer is already running.".to_string()))
+ } else {
+ let shard_state = Arc::clone(&state);
+ // 开启一个task监听Receiver
+ let handle: JoinHandle> = tokio::spawn(async move {
+ // 监听Receiver。
+ // 1. 若通道为空,但是发送端未关闭,则当前task放弃CPU使用权(不会阻塞线程)。
+ // 2. 若通道为空,且发送端已经关闭,则收到消息:None
+ // 2. 若通道不为空,则接收到消息:Some(event)
+ while let Some(event) = rx.recv().await {
+ match RelayLogServerMachine::process_binlog_event(&event) {
+ Ok(()) => {}
+ Err(e) => {
+ error!("Precess BinlogEvent: {:?}, err: {:?}", event, e);
+ }
+ }
+ };
+ warn!("binlog event sender closed, current receiving end is about to shutdown.");
+ let mut end_state = shard_state.lock().or_else(|e| {
+ error!("relay log server close err: {:?}", &e);
+ Err(ReError::Error(e.to_string()))
+ })?;
+ end_state.running = false;
+ end_state.receiver_handle = None;
+ Ok(())
+ });
+
+ s.running = true;
+ s.receiver_handle = Some(handle);
+ Ok(true)
+ }
+ }
+
+ ///
+ /// Receiver端异常close()后,只是半关闭状态,Receiver端仍然可以继续读取可能已经缓冲在通道中的消息,
+ /// close()只能保证Sender端无法再发送普通的消息,但Permit或OwnedPermit仍然可以向通道发送消息。
+ /// 只有通道已空且所有Sender端(包括Permit和OwnedPermit)都已经关闭的情况下,recv()才会返回None,此时代表通道完全关闭。
+ fn try_recv_event(&self, mut rx: Receiver) {
+ loop {
+ match rx.try_recv() {
+ Ok(event) => {
+ info!("收到binlog事件:{:?}", event);
+ }
+ Err(e) => {
+ warn!("binlog event Receiver try_recv: {:?}", e);
+ break;
+ }
+ }
+ }
+ }
+
+ /// 关闭中继日志监听服务。
+ /// 不主动关闭binlog监听端服务,receiver状态完全由sender决定。
+ /// 只要存在一个sender未关闭,RelayLogServer则一直运行。
+ // pub fn shutdown(&mut self) -> CResult {
+ // let state = self.state.lock();
+ // match state {
+ // Ok(mut s) => {
+ // if let Some(handle) = s.receiver_handle.take() {
+ // handle.abort();
+ // }
+ // s.running = false;
+ // Ok(true)
+ // }
+ // Err(e) => {
+ // Err(ReError::Error(e.to_string()))
+ // }
+ // }
+ // }
+
+ pub fn is_running(&self) -> CResult {
+ return match self.state.lock() {
+ Ok(s) => {
+ Ok(s.running)
+ }
+ Err(e) => {
+ Err(ReError::String(e.to_string()))
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/relay_log/src/relay_log_server_machine.rs b/relay_log/src/relay_log_server_machine.rs
new file mode 100644
index 0000000000000000000000000000000000000000..5b2a3955a0274fe6fda1d2f43b7ca6aeb4815756
--- /dev/null
+++ b/relay_log/src/relay_log_server_machine.rs
@@ -0,0 +1,59 @@
+use std::collections::HashMap;
+use std::sync::RwLock;
+
+use chrono::Local;
+use lazy_static::lazy_static;
+use tracing::{info, warn};
+
+use binlog::events::binlog_event::BinlogEvent;
+use common::err::CResult;
+use common::schema::rc_task::RcTask;
+use crate::relay_log::RelayLog;
+
+lazy_static! {
+ static ref INS: RelayLogServerMachine = RelayLogServerMachine {
+ t: Local::now().timestamp_millis(),
+ rc_task: RwLock::new(HashMap::::new()),
+ };
+}
+
+#[derive(Debug)]
+pub struct RelayLogServerMachine {
+ t: i64,
+ // task信息(线程安全)
+ rc_task: RwLock>,
+}
+
+unsafe impl Sync for RelayLogServerMachine {}
+
+impl RelayLogServerMachine {
+ /// 单例
+ pub fn get_instance() -> &'static RelayLogServerMachine {
+ &INS
+ }
+
+ /// 处理binlog事件
+ pub fn process_binlog_event(event: &BinlogEvent) -> CResult<()> {
+ let relay_entity = RelayLog::from_binlog_event(event);
+
+ let a = Self::get_instance();
+
+ info!("relay_entity: {:?}", relay_entity);
+
+ let db_name = relay_entity.get_database_name();
+ let table_name = relay_entity.get_table_name();
+
+ // todo..
+ Ok(())
+ }
+
+ pub fn add_task(&self, task: RcTask) -> CResult {
+ let mut tasks = self.rc_task.write().unwrap();
+ if tasks.contains_key(&task.task_id) {
+ tasks.remove(&task.task_id);
+ warn!("更新task..");
+ }
+ tasks.insert(task.task_id.clone(), task);
+ Ok(true)
+ }
+}
\ No newline at end of file
diff --git a/relay_log/src/storage/compactor.rs b/relay_log/src/storage/compactor.rs
new file mode 100644
index 0000000000000000000000000000000000000000..1dd790b3f1dcba4f8cb1cb7d768acc6ec9d264f5
--- /dev/null
+++ b/relay_log/src/storage/compactor.rs
@@ -0,0 +1,5 @@
+
+/// todo 日志整理,定时删除已经Apply完的segment文件
+pub struct Compactor {
+
+}
\ No newline at end of file
diff --git a/relay_log/src/storage/file_system.rs b/relay_log/src/storage/file_system.rs
new file mode 100644
index 0000000000000000000000000000000000000000..e2f0b5a2c30a6fceda9ef15dfdd922f112dca039
--- /dev/null
+++ b/relay_log/src/storage/file_system.rs
@@ -0,0 +1,9 @@
+use common::err::CResult;
+
+pub trait FileSystem: Sized {
+ /// 文件加载
+ fn from_file(file_path: &str, start_offset: u64, len: usize) -> CResult;
+
+ /// 强制刷盘(底层调用内核sync方法)
+ fn flush(&self) -> CResult<()>;
+}
\ No newline at end of file
diff --git a/relay_log/src/storage/mod.rs b/relay_log/src/storage/mod.rs
new file mode 100644
index 0000000000000000000000000000000000000000..f1ba5320cedf4ea611683930a8a0d820cbdea665
--- /dev/null
+++ b/relay_log/src/storage/mod.rs
@@ -0,0 +1,13 @@
+pub mod relay_log_storage;
+pub mod segment;
+pub mod segment_manager;
+pub mod storage_config;
+pub mod compactor;
+pub mod storage_event;
+pub mod storage_entry;
+pub mod segment_file;
+pub mod segment_header;
+pub mod segment_entry_position;
+pub mod file_system;
+
+
diff --git a/relay_log/src/storage/relay_log_storage.rs b/relay_log/src/storage/relay_log_storage.rs
new file mode 100644
index 0000000000000000000000000000000000000000..b72acd36009c5fd0bb75e6f6b08efa6fc75fb5fb
--- /dev/null
+++ b/relay_log/src/storage/relay_log_storage.rs
@@ -0,0 +1,125 @@
+use std::cell::RefCell;
+use std::rc::Rc;
+
+use common::err::CResult;
+
+use crate::relay_log::RelayLog;
+use crate::storage::segment::Segment;
+use crate::storage::segment_manager::SegmentManager;
+use crate::storage::storage_config::StorageConfig;
+use crate::storage::storage_entry::StorageEntry;
+
+/// todo 目标表日志存储
+pub struct RelayLogStorage {
+ // 目标库
+ dst_db_name: String,
+ // 目标表
+ dst_table_name: String,
+ // segment管理器
+ pub segment_manager: SegmentManager,
+ // 环形队列
+ entry_buffer: EntryRingBuffer,
+ // 日志整理
+ // log_compactor: Compactor,
+}
+
+struct EntryRingBuffer {
+ entry_buffer_num: usize,
+ // entry缓存
+ entry_buffer: Vec
+ /// 每个Entry块包含如下内容(字节大小 = 8 + 8 + 8 + 4 + {RelayLogSize}):
+ ///
+ /// ```txt
+ /// index: 索引id, 8字节
+ /// offset: 日志内容偏移量, 8字节
+ /// size: 日志内容大小, 8字节
+ /// checksum: 日志内容校验值, 4字节
+ /// relay_log: 日志内容, 动态大小
+ /// ```
+ pub fn append(&mut self, entry: &mut StorageEntry) -> CResult<()> {
+ match &mut self.status {
+ WriteRead(w) => {
+ // log serialize
+ let log_bytes = self.codec.binary_serialize(&self.codec_style, entry.relay_log())?;
+
+ let start_position = self.segment_file.size();
+ let mut entry_size = 0;
+
+ // index
+ let index = *entry.index();
+ w.write_u64::(*entry.index())?;
+ entry_size += 8;
+
+ // log size
+ let log_size = log_bytes.len() as u64;
+ entry.set_log_size(log_size);
+ w.write_u64::(log_size)?;
+ entry_size += 8;
+
+ // checksum
+ let checksum = Self::checksum(&log_bytes);
+ entry.set_checksum(checksum);
+ w.write_u32::(checksum)?;
+ entry_size += 4;
+
+ // log bytes
+ w.write_all(&log_bytes)?;
+ entry_size += log_size;
+
+ if self.is_empty() {
+ self.entry_position.add_position(0, start_position)?;
+ } else {
+ let offset = index - self.first_index();
+ self.entry_position.add_position(offset as usize, start_position)?;
+ }
+
+ self.segment_file.add_entry_size(entry_size);
+ Ok(())
+ }
+ ReadOnly => {
+ Err(ReError::String("segment read only.".to_string()))
+ }
+ }
+ }
+
+ /// read entry by index
+ pub fn get_entry(&mut self, index: u64) -> CResult {
+ if self.is_empty() {
+ return Err(ReError::String("segment is empty.".to_string()));
+ }
+ if index < self.first_index() {
+ return Err(ReError::String("index less than segment first index.".to_string()));
+ }
+ if index > self.last_index() {
+ return Err(ReError::String("index greater than segment last index.".to_string()));
+ }
+
+ let offset = index - self.first_index();
+ let entry_position = self.entry_position.get_position(offset as usize);
+
+ let r = Arc::clone(&self.reader);
+ let mut reader = r.lock().or_else(|e| {
+ error!("segment read lock err: {:?}", &e);
+ Err(ReError::Error(e.to_string()))
+ })?;
+
+ reader.seek(SeekFrom::Start(entry_position))?;
+
+ let idx = reader.read_u64::()?;
+ let log_size = reader.read_u64::()?;
+ let checksum = reader.read_u32::()?;
+
+ let mut log_bytes: Vec = vec![0; log_size as usize];
+ reader.read_exact(&mut log_bytes)?;
+
+ // 校验crc32值
+ if checksum != Self::checksum(&log_bytes) {
+ return Err(ReError::Error("log checksum err.".to_string()));
+ }
+
+ let relay_log = self.codec.binary_deserialize::(&self.codec_style, &log_bytes)?;
+ Ok(StorageEntry::new(idx, log_size, checksum, relay_log))
+ }
+
+ /// 是否可写
+ pub fn is_writable(&self) -> bool {
+ match self.status {
+ WriteRead(_) => {
+ true
+ }
+ ReadOnly => {
+ false
+ }
+ }
+ }
+
+ /// segment is full
+ pub fn is_full(&self) -> bool {
+ let max_size = *self.header.max_segment_size();
+ let max_entries = *self.header.max_entries();
+ let current_size = self.segment_file.size();
+ let current_entries = self.entry_position.get_entry_count();
+ if current_entries >= max_entries || current_size >= max_size {
+ true
+ } else {
+ false
+ }
+ }
+
+ /// flush the segment writer buf to disk
+ pub fn write_flush(&mut self) -> CResult<()> {
+ match &mut self.status {
+ WriteRead(w) => {
+ self.entry_position.flush()?;
+ w.flush()?;
+ }
+ ReadOnly => {}
+ }
+ Ok(())
+ }
+
+ /// 删除segment文件
+ pub fn delete(&self) -> CResult<()> {
+ let file_path = self.segment_file.path();
+ warn!("===删除segment文件: {:?}", file_path);
+ Ok(fs::remove_file(file_path)?)
+ }
+}
+
+impl Debug for Segment {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("Segment")
+ .field("segment_file", &self.segment_file.name())
+ .field("segment_id", &self.header.id())
+ .field("first_index", &self.header.first_index())
+ .field("entry_count", &self.entry_position.get_entry_count())
+ .field("status", &self.status.name())
+ .finish()
+ }
+}
diff --git a/relay_log/src/storage/segment_entry_position.rs b/relay_log/src/storage/segment_entry_position.rs
new file mode 100644
index 0000000000000000000000000000000000000000..b97341f04b9ed1d059f84174c14e4f26b688cb8c
--- /dev/null
+++ b/relay_log/src/storage/segment_entry_position.rs
@@ -0,0 +1,158 @@
+use std::fmt::{Debug, Formatter};
+use std::fs::OpenOptions;
+use std::io::{Cursor, Write};
+
+use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
+use memmap2::{MmapMut, MmapOptions};
+use tracing::error;
+
+use common::err::CResult;
+use common::file_util;
+
+use crate::storage::file_system::FileSystem;
+use crate::storage::storage_config::SEGMENT_HEADER_SIZE_BYTES;
+
+/// entry位置信息.
+///
+/// 固定字节大小 = 4 + max_entries * 8
+pub struct SegmentEntryPosition {
+ // 当前Segment中Entry数量
+ entry_count: u32,
+ // 当前Segment中Entry的位置信息
+ position_info: Vec,
+ // 文件缓存
+ file_buffer: MmapMut,
+}
+
+impl SegmentEntryPosition {
+ /// 初始化
+ pub fn new(file_path: &str, max_entries: u32) -> CResult {
+ let entry_count: u32 = 0;
+ let mut position_info: Vec = vec![0; max_entries as usize];
+
+ let len = 4 + max_entries * 8;
+ let bytes_buffer: Vec = vec![0; len as usize];
+
+ let start_offset = SEGMENT_HEADER_SIZE_BYTES as u64;
+ // 文件初始化
+ file_util::update_file_bytes(file_path, start_offset, &bytes_buffer)?;
+
+ // 打开文件内存映射
+ let mmap = Self::open_file_mem_map(file_path, start_offset, len as usize)?;
+
+ Ok(Self {
+ entry_count,
+ position_info,
+ file_buffer: mmap,
+ })
+ }
+
+ /// 打开文件内存映射(可读可写)
+ fn open_file_mem_map(file_path: &str, start_offset: u64, len: usize) -> CResult {
+ let file = OpenOptions::new()
+ .read(true)
+ .write(true)
+ .append(false)
+ .open(file_path)?;
+ return unsafe {
+ Ok(MmapOptions::new().offset(start_offset).len(len).map_mut(&file)?)
+ };
+ }
+
+ /// 添加entry位置信息
+ pub fn add_position(&mut self, offset: usize, position: u64) -> CResult<()> {
+ match self.update_file_position_info(offset, position) {
+ Ok(_) => {
+ self.entry_count += 1;
+ self.position_info[offset] = position;
+ match self.update_file_entry_count() {
+ Ok(_) => {
+ Ok(())
+ }
+ Err(e) => {
+ self.entry_count -= 1;
+ self.position_info[offset] = 0;
+
+ error!("update segment file err: {:?}", e);
+ Err(e)
+ }
+ }
+ }
+ Err(e) => {
+ Err(e)
+ }
+ }
+ }
+
+ /// 更新文件中entry数量统计
+ fn update_file_entry_count(&mut self) -> CResult<()> {
+ (&mut self.file_buffer[0..4]).write_u32::(self.entry_count)?;
+ Ok(())
+ }
+
+ /// 更新文件中entry位置信息
+ fn update_file_position_info(&mut self, offset: usize, position: u64) -> CResult<()> {
+ let pos_start = 4 + (offset * 8);
+ let pos_end = pos_start + 8;
+ (&mut self.file_buffer[pos_start..pos_end]).write_u64::(position)?;
+ Ok(())
+ }
+
+ /// 返回entry位置信息(内存)
+ pub fn get_position(&self, offset: usize) -> u64 {
+ self.position_info[offset]
+ }
+
+ /// 返回entry位置信息(磁盘)
+ pub fn get_position_disk(&self, offset: usize) -> CResult {
+ let pos_start = 4 + (offset * 8);
+ let pos_end = pos_start + 8;
+ let pos = (&self.file_buffer[pos_start..pos_end]).read_u64::()?;
+ Ok(pos)
+ }
+
+ /// 返回entry数量(内存)
+ pub fn get_entry_count(&self) -> u32 {
+ self.entry_count
+ }
+
+ /// 返回entry数量(磁盘)
+ pub fn get_entry_count_disk(&self) -> CResult {
+ let count = (&self.file_buffer[0..4]).read_u32::()?;
+ Ok(count)
+ }
+}
+
+impl FileSystem for SegmentEntryPosition {
+ fn from_file(file_path: &str, start_offset: u64, len: usize) -> CResult {
+ let mmap = Self::open_file_mem_map(file_path, start_offset, len)?;
+
+ let bytes_buffer = &mmap[0..];
+ let mut cusor = Cursor::new(bytes_buffer);
+
+ cusor.set_position(0);
+ let entry_count = cusor.read_u32::()?;
+
+ cusor.set_position(4);
+ let max_entries = (len - 4) / 8;
+ let mut position_info: Vec = vec![0; max_entries];
+ cusor.read_u64_into::(&mut position_info)?;
+ Ok(Self {
+ entry_count,
+ position_info,
+ file_buffer: mmap,
+ })
+ }
+
+ fn flush(&self) -> CResult<()> {
+ Ok(self.file_buffer.flush()?)
+ }
+}
+
+impl Debug for SegmentEntryPosition {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("SegmentEntryPosition")
+ .field("entry_count", &self.entry_count)
+ .finish()
+ }
+}
\ No newline at end of file
diff --git a/relay_log/src/storage/segment_file.rs b/relay_log/src/storage/segment_file.rs
new file mode 100644
index 0000000000000000000000000000000000000000..cc458530ef610aacb618cd4b3807463ee9c3ce1d
--- /dev/null
+++ b/relay_log/src/storage/segment_file.rs
@@ -0,0 +1,101 @@
+use std::path::Path;
+
+use getset::{Getters, Setters};
+
+use common::err::CResult;
+use common::err::decode_error::ReError;
+
+use crate::storage::storage_config::SEGMENT_FILE_PRE;
+
+#[derive(Debug, Clone, Getters, Setters)]
+pub struct SegmentFile {
+ // 文件的绝对路径: /x/x/x/x/rlog-{version}-{id}-{index}.log
+ #[getset(get = "pub")]
+ path: String,
+ // 文件名: rlog-{version}-{id}-{index}.log
+ #[getset(get = "pub")]
+ name: String,
+ // 文件大小
+ size: u64,
+}
+
+impl SegmentFile {
+ pub fn new(path: String, name: String, size: u64) -> Self {
+ Self {
+ path,
+ name,
+ size,
+ }
+ }
+
+ /// /a/s/rlog-{version}-{id}-{index}.log
+ pub fn from_path(file_path: &str) -> CResult {
+ let path = file_path.to_string();
+ let p = Path::new(file_path);
+ let size = p.metadata()?.len();
+ let os_name = p.file_name().ok_or(ReError::String("segment file not exists.".to_string()))?;
+ let name = os_name.to_str().ok_or(ReError::String("segment file not exists.".to_string()))?.to_string();
+ Ok(Self {
+ path,
+ name,
+ size,
+ })
+ }
+
+
+ /// 判断文件: rlog-{version}-{id}-{index}.log
+ pub fn is_segment_file(file_name: &str) -> CResult {
+ if !file_name.ends_with(".log") {
+ return Ok(false);
+ }
+ if !file_name.starts_with(SEGMENT_FILE_PRE) {
+ return Ok(false);
+ }
+ let parse = Self::segment_file_name_split(file_name)?;
+ if parse.0 == 0 || parse.1 == 0 || parse.2 == 0 {
+ return Ok(false);
+ }
+ Ok(true)
+ }
+
+ /// current version
+ pub fn version(&self) -> CResult {
+ Ok(Self::segment_file_name_split(self.name())?.0)
+ }
+
+ /// segment id
+ pub fn segment_id(&self) -> CResult {
+ Ok(Self::segment_file_name_split(self.name())?.1)
+ }
+
+ /// segment file first index
+ pub fn index(&self) -> CResult {
+ Ok(Self::segment_file_name_split(self.name())?.2)
+ }
+
+ /// segment文件名解析:`[{version},{id},{index}]`
+ fn segment_file_name_split(file_name: &str) -> CResult<(u32, u32, u64)> {
+ let s: Vec<&str> = file_name.split(".").collect();
+ let name_split: Vec<&str> = s[0].split("-").collect();
+ let version = name_split[1].parse::()?;
+ let segment_id = name_split[2].parse::()?;
+ let first_index = name_split[3].parse::()?;
+ Ok((version, segment_id, first_index))
+ }
+
+ /// 返回当前segment大小
+ pub fn size(&self) -> u64 {
+ self.size
+ }
+
+ /// segment文件大小
+ pub fn size_file(&self) -> CResult {
+ let p = Path::new(self.path());
+ Ok(p.metadata()?.len())
+ }
+
+ /// 增加entry大小
+ pub fn add_entry_size(&mut self, entry_size: u64) {
+ self.size += entry_size;
+ }
+}
\ No newline at end of file
diff --git a/relay_log/src/storage/segment_header.rs b/relay_log/src/storage/segment_header.rs
new file mode 100644
index 0000000000000000000000000000000000000000..ed7d5fae59643c0f2d062e6597456366f59cd414
--- /dev/null
+++ b/relay_log/src/storage/segment_header.rs
@@ -0,0 +1,102 @@
+use std::io::Cursor;
+
+use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
+use getset::{Getters, Setters};
+
+use common::err::CResult;
+use common::file_util;
+
+use crate::storage::file_system::FileSystem;
+use crate::storage::storage_config::{SEGMENT_HEADER_SIZE_BYTES, VERSION};
+
+/// segment文件头(不可更改,总大小:64byte).
+/// ```txt
+/// 4字节:segmentId,
+/// 4字节:version,
+/// 8字节:第一个entry的index值,
+/// 8字节:segment最大容量,
+/// 4字节:segment最多存Entry数量
+/// 36字节预留空间(用于后续扩展...)
+/// ```
+#[derive(Debug, Getters, Setters)]
+pub(crate) struct SegmentHeader {
+ // segmentId
+ #[getset(get = "pub")]
+ id: u32,
+
+ // version
+ #[getset(get = "pub")]
+ version: u32,
+
+ // 第一个entry的index值
+ #[getset(get = "pub")]
+ first_index: u64,
+
+ // segment最大容量(单位:byte)
+ #[getset(get = "pub")]
+ max_segment_size: u64,
+
+ // segment最大存Entry数量
+ #[getset(get = "pub")]
+ max_entries: u32,
+}
+
+impl SegmentHeader {
+ /// 初始化文件头
+ pub fn new(file_path: &str,
+ id: u32,
+ first_index: u64,
+ max_segment_size: u64,
+ max_entries: u32) -> CResult {
+ let mut bytes_buffer: [u8; SEGMENT_HEADER_SIZE_BYTES] = [0; SEGMENT_HEADER_SIZE_BYTES];
+ let mut c = Cursor::new(&mut bytes_buffer[0..]);
+ c.write_u32::(id)?;
+ c.write_u32::(VERSION)?;
+ c.write_u64::(first_index)?;
+ c.write_u64::(max_segment_size)?;
+ c.write_u32::(max_entries)?;
+ // 初始化
+ file_util::update_file_bytes(file_path, 0, &bytes_buffer)?;
+ Ok(Self {
+ id,
+ version: VERSION,
+ first_index,
+ max_segment_size,
+ max_entries,
+ })
+ }
+}
+
+impl FileSystem for SegmentHeader {
+ fn from_file(file_path: &str, _start_offset: u64, _len: usize) -> CResult {
+ let file_buffer = file_util::read_file_bytes(file_path, 0, SEGMENT_HEADER_SIZE_BYTES)?;
+
+ let mut cursor = Cursor::new(file_buffer);
+ cursor.set_position(0);
+ let id = cursor.read_u32::()?;
+
+ cursor.set_position(4);
+ let version = cursor.read_u32::()?;
+
+ cursor.set_position(8);
+ let first_index = cursor.read_u64::()?;
+
+ cursor.set_position(16);
+ let max_segment_size = cursor.read_u64::()?;
+
+ cursor.set_position(24);
+ let max_entries = cursor.read_u32::()?;
+
+ Ok(Self {
+ id,
+ version,
+ first_index,
+ max_segment_size,
+ max_entries,
+ })
+ }
+
+ fn flush(&self) -> CResult<()> {
+ Ok(())
+ }
+}
\ No newline at end of file
diff --git a/relay_log/src/storage/segment_manager.rs b/relay_log/src/storage/segment_manager.rs
new file mode 100644
index 0000000000000000000000000000000000000000..6f5012c97310bd108cd552e241691f4652529bcf
--- /dev/null
+++ b/relay_log/src/storage/segment_manager.rs
@@ -0,0 +1,173 @@
+use std::cell::RefCell;
+use std::collections::BTreeMap;
+use std::ffi::OsString;
+use std::fmt::{Debug, Display, Formatter};
+use std::fs;
+use std::path::PathBuf;
+use std::rc::Rc;
+
+use tracing::info;
+use tracing_subscriber::fmt::format;
+
+use common::err::CResult;
+use common::err::decode_error::ReError;
+
+use crate::storage::segment::Segment;
+use crate::storage::segment_file::SegmentFile;
+use crate::storage::storage_config::StorageConfig;
+
+/// 目标表segment文件管理.
+///
+pub struct SegmentManager {
+ // segment列表
+ segments: BTreeMap>>,
+ // 当前活跃的segment
+ current_segment: Rc>,
+ // segment文件夹
+ segment_dir: String,
+ // 单个segment最大值
+ max_segment_size: u64,
+ // 单个segment最多entry数
+ max_segment_entries: u32,
+}
+
+impl SegmentManager {
+ /// 初始化一个segment管理器
+ pub fn new(storage_config: &StorageConfig, dst_db_name: &str, dst_table_name: &str) -> CResult {
+ let segment_dir = Self::get_segment_dir_path(storage_config.relay_log_dir(), dst_db_name, dst_table_name)?;
+ let max_segment_size = *storage_config.max_segment_size();
+ let max_segment_entries = *storage_config.max_segment_entries();
+ // 加载已有的segment文件
+ let mut segments = Self::load_segment(segment_dir.as_str())?;
+ info!("load segments: {:?}", &segments);
+ if segments.is_empty() {
+ // 实例化第一个segment
+ let mut segment = Segment::new(&segment_dir,
+ 1,
+ 1,
+ max_segment_size,
+ max_segment_entries)?;
+ segment.write_open()?;
+ let index = segment.first_index();
+ let current_segment = Rc::new(RefCell::new(segment));
+ segments.insert(index, Rc::clone(¤t_segment));
+ Ok(Self {
+ current_segment,
+ segments,
+ segment_dir,
+ max_segment_size,
+ max_segment_entries,
+ })
+ } else {
+ let current_segment = Rc::clone(segments.last_entry().ok_or(ReError::Error("get last segment err.".to_string()))?.get());
+ Ok(Self {
+ current_segment,
+ segments,
+ segment_dir,
+ max_segment_size,
+ max_segment_entries,
+ })
+ }
+ }
+
+ /// 加载目标表所有segment文件
+ fn load_segment(segment_dir: &str) -> CResult>>> {
+ info!("++++start load segments: {:?}", segment_dir);
+ let path = PathBuf::from(segment_dir);
+ if !path.exists() {
+ fs::create_dir(path.as_path())?;
+ }
+ let mut segments: BTreeMap>> = BTreeMap::new();
+ let files = path.read_dir()?;
+ for file in files {
+ if let Ok(f) = file {
+ let file_path = PathBuf::from(f.path());
+ if file_path.is_file() {
+ if let Some(segment_file_name) = file_path.file_name().ok_or(ReError::String("".to_string()))?.to_str() {
+ if SegmentFile::is_segment_file(segment_file_name)? {
+ // segment文件全路径
+ let segment_file_path = file_path.to_str().ok_or(ReError::String("".to_string()))?;
+ if let Ok(mut segment) = Segment::from_file(segment_file_path) {
+ if !segment.is_full() {
+ segment.write_open()?;
+ }
+ // todo 校验segment合法性?
+ segments.insert(segment.first_index(), Rc::new(RefCell::new(segment)));
+ }
+ }
+ }
+ }
+ }
+ }
+ Ok(segments)
+ }
+
+ /// 获取目标表的日志文件夹路径
+ pub fn get_segment_dir_path(relay_log_dir: &str, dst_db_name: &str, dst_table_name: &str) -> CResult {
+ let log_dir_name = format!("{}#{}", dst_db_name, dst_table_name);
+ let path = PathBuf::from(relay_log_dir).join(&log_dir_name);
+ Ok(path.to_str().ok_or(ReError::String("".to_string()))?.to_string())
+ }
+
+ /// 当前segment
+ pub fn current_segment(&self) -> Rc> {
+ Rc::clone(&self.current_segment)
+ }
+
+ /// 最后一个segment
+ pub fn last_segment(&mut self) -> CResult>> {
+ Ok(Rc::clone(self.segments.last_entry().ok_or(ReError::Error("get last segment err.".to_string()))?.get()))
+ }
+
+ /// 第一个segment
+ pub fn first_segment(&mut self) -> CResult>> {
+ Ok(Rc::clone(self.segments.first_entry().ok_or(ReError::Error("get last segment err.".to_string()))?.get()))
+ }
+
+ /// 返回index所在的segment
+ pub fn segment(&mut self, index: u64) -> CResult>> {
+ if self.current_segment.borrow().contain_index(index) {
+ Ok(Rc::clone(&self.current_segment))
+ } else {
+ for (i, s) in &self.segments {
+ if s.borrow().contain_index(index) {
+ return Ok(Rc::clone(s));
+ }
+ }
+ Err(ReError::Error(format!("unknown index: {}.", index)))
+ }
+ }
+
+ /// 创建下一个segment
+ pub fn create_next_segment(&mut self) -> CResult>> {
+ let last_segment = self.last_segment()?;
+ if last_segment.borrow().is_writable() {
+ // 关闭最后一个segment可写模式
+ last_segment.borrow_mut().write_close()?;
+ }
+ let next_segment_id = last_segment.borrow().id() + 1;
+ let next_segment_first_index = last_segment.borrow().last_index() + 1;
+ let mut next_segment = Segment::new(&self.segment_dir,
+ next_segment_id,
+ next_segment_first_index,
+ self.max_segment_size,
+ self.max_segment_entries)?;
+ next_segment.write_open()?;
+ self.current_segment = Rc::new(RefCell::new(next_segment));
+ let r = self.segments.insert(next_segment_first_index, Rc::clone(&self.current_segment));
+ if let Some(old) = r {
+ drop(old);
+ }
+ Ok(Rc::clone(&self.current_segment))
+ }
+}
+
+impl Debug for SegmentManager {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("SegmentManager")
+ .field("segment_num", &self.segments.len())
+ .field("segments", &self.segments)
+ .field("current_segment", &self.current_segment)
+ .finish()
+ }
+}
\ No newline at end of file
diff --git a/relay_log/src/storage/storage_config.rs b/relay_log/src/storage/storage_config.rs
new file mode 100644
index 0000000000000000000000000000000000000000..03341abffc8c91fcbe0e6bf0e51110d80d409240
--- /dev/null
+++ b/relay_log/src/storage/storage_config.rs
@@ -0,0 +1,53 @@
+use getset::{Getters, Setters};
+
+/// 版本号
+pub(crate) const VERSION: u32 = 1;
+/// segment文件前缀
+pub(crate) const SEGMENT_FILE_PRE: &str = "rlog";
+/// segment文件头大小
+pub(crate) const SEGMENT_HEADER_SIZE_BYTES: usize = 64;
+
+/// 存储可配项
+#[derive(Debug, Clone, Getters, Setters)]
+pub struct StorageConfig {
+ // 中继日志存储路径
+ #[getset(get = "pub", set = "pub")]
+ relay_log_dir: String,
+
+ // 每个segment最大值
+ #[getset(get = "pub", set = "pub")]
+ max_segment_size: u64,
+
+ // 每个segment最多存实体数量
+ #[getset(get = "pub", set = "pub")]
+ max_segment_entries: u32,
+
+ // segment file buffer size
+ #[getset(get = "pub", set = "pub")]
+ entry_buffer_num: usize,
+
+ // 是否flush
+ #[getset(get = "pub", set = "pub")]
+ flush_on_commit: bool,
+
+ // 日志整理周期
+ #[getset(get = "pub", set = "pub")]
+ compact_interval_millisecond: u64,
+}
+
+impl Default for StorageConfig {
+ fn default() -> Self {
+ Self {
+ relay_log_dir: "".to_string(),
+ // 10M
+ max_segment_size: 10 * 1024 * 1024,
+ //
+ max_segment_entries: 100,
+ // 1k个
+ entry_buffer_num: 1024,
+ flush_on_commit: false,
+ // 5min
+ compact_interval_millisecond: 5 * 60 * 1000,
+ }
+ }
+}
\ No newline at end of file
diff --git a/relay_log/src/storage/storage_entry.rs b/relay_log/src/storage/storage_entry.rs
new file mode 100644
index 0000000000000000000000000000000000000000..cb58a98db39a408d72795b73602184bb39631322
--- /dev/null
+++ b/relay_log/src/storage/storage_entry.rs
@@ -0,0 +1,46 @@
+use getset::{Getters, Setters};
+
+use crate::relay_log::RelayLog;
+
+/// 日志存储块.
+/// ========================================
+/// 字节大小 = 8 + 8 + 4 + {RelayLogSize}.
+/// ```txt
+/// index: 索引id, 8字节
+/// log_size: 日志内容大小, 8字节
+/// checksum: 日志内容校验值, 4字节
+/// relay_log: 日志内容, 动态大小
+/// ```
+/// =========================================
+#[derive(Debug, Clone, Getters, Setters)]
+pub struct StorageEntry {
+ // entry索引id
+ #[getset(get = "pub")]
+ index: u64,
+
+ // 日志内容大小
+ #[getset(get = "pub", set = "pub")]
+ log_size: u64,
+
+ // 日志内容校验值
+ #[getset(get = "pub", set = "pub")]
+ checksum: u32,
+
+ // 中继日志实体
+ #[getset(get = "pub")]
+ relay_log: RelayLog,
+}
+
+impl StorageEntry {
+ pub fn new(index: u64,
+ log_size: u64,
+ checksum: u32,
+ relay_log: RelayLog) -> Self {
+ Self {
+ index,
+ log_size,
+ checksum,
+ relay_log
+ }
+ }
+}
\ No newline at end of file
diff --git a/relay_log/src/storage/storage_event.rs b/relay_log/src/storage/storage_event.rs
new file mode 100644
index 0000000000000000000000000000000000000000..e2a9fd5dae97182a67783268911fff1e11544a81
--- /dev/null
+++ b/relay_log/src/storage/storage_event.rs
@@ -0,0 +1,5 @@
+/// todo 存储事件
+pub(crate) struct StorageEvent {
+ segment_id: u32,
+ index: u64,
+}
\ No newline at end of file
diff --git a/tests/Cargo.toml b/tests/Cargo.toml
index c9341eb829ad841f70212b8210b310ba54b42f04..573337f86b36247c80a1d542b04b29e981fce4ae 100644
--- a/tests/Cargo.toml
+++ b/tests/Cargo.toml
@@ -11,6 +11,7 @@ publish = { workspace = true }
[dependencies]
common = { workspace = true }
binlog = { workspace = true }
+relay_log = { workspace = true }
tokio = { workspace = true }
async-trait ={ workspace = true }
diff --git a/tests/src/lib.rs b/tests/src/lib.rs
index d0e2b2410bc21bee51999b9e1e2fc35842920c92..35471f9ad3e895b7fbbae23329aa63fcddcaf3bc 100644
--- a/tests/src/lib.rs
+++ b/tests/src/lib.rs
@@ -4,4 +4,4 @@
#![feature(exact_size_is_empty)]
mod binlog;
-// mod relay_log;
\ No newline at end of file
+mod relay_log;
\ No newline at end of file
diff --git a/tests/src/relay_log/codec/mod.rs b/tests/src/relay_log/codec/mod.rs
new file mode 100644
index 0000000000000000000000000000000000000000..2f5cdfda551ce5edbfaaa31537d261caa4e03f5c
--- /dev/null
+++ b/tests/src/relay_log/codec/mod.rs
@@ -0,0 +1,2 @@
+#[cfg(test)]
+mod test_relay_log_codec;
\ No newline at end of file
diff --git a/tests/src/relay_log/codec/test_relay_log_codec.rs b/tests/src/relay_log/codec/test_relay_log_codec.rs
new file mode 100644
index 0000000000000000000000000000000000000000..a73ecb3f89de9c5bd73b82fa40f990383dda3905
--- /dev/null
+++ b/tests/src/relay_log/codec/test_relay_log_codec.rs
@@ -0,0 +1,25 @@
+use tracing::info;
+use common::log::tracing_factory::TracingFactory;
+use relay_log::codec::binary_codec::BinaryCodec;
+use relay_log::codec::binary_codec::CodecStyle::LittleVar;
+use relay_log::codec::codec::Codec;
+use relay_log::relay_log::RelayLog;
+
+#[test]
+fn test_binary_codec() {
+ TracingFactory::init_log(true);
+
+ let mut s = RelayLog::default();
+ s.set_database_name("db1".to_string());
+ s.set_table_name("t2".to_string());
+ s.set_event_log_pos(10);
+ s.set_event_name("binlog".to_string());
+
+ let codec = BinaryCodec::new();
+
+ let bytes = codec.binary_serialize(&LittleVar, &s).unwrap();
+ info!("序列化:{}-{:?}", bytes.len(), bytes);
+
+ let s2 = codec.binary_deserialize::(&LittleVar, &bytes).unwrap();
+ info!("反序列化:{:?}", s2);
+}
\ No newline at end of file
diff --git a/tests/src/relay_log/mod.rs b/tests/src/relay_log/mod.rs
new file mode 100644
index 0000000000000000000000000000000000000000..13c02f5caa1bb0587858e8ebfa528c33a32f3c1a
--- /dev/null
+++ b/tests/src/relay_log/mod.rs
@@ -0,0 +1,9 @@
+#[cfg(test)]
+mod test_relay_log;
+#[cfg(test)]
+mod test_relay_log_server;
+#[cfg(test)]
+mod test_relay_log_server_machine;
+
+mod storage;
+mod codec;
\ No newline at end of file
diff --git a/tests/src/relay_log/storage/mod.rs b/tests/src/relay_log/storage/mod.rs
new file mode 100644
index 0000000000000000000000000000000000000000..cd153ed22b1e09a6eb70c08a76a05725ae0873f1
--- /dev/null
+++ b/tests/src/relay_log/storage/mod.rs
@@ -0,0 +1,2 @@
+#[cfg(test)]
+mod test_relay_log_storage;
\ No newline at end of file
diff --git a/tests/src/relay_log/storage/test_relay_log_storage.rs b/tests/src/relay_log/storage/test_relay_log_storage.rs
new file mode 100644
index 0000000000000000000000000000000000000000..0c7ca1b207a48667ac30435a1cff8e902d8e6551
--- /dev/null
+++ b/tests/src/relay_log/storage/test_relay_log_storage.rs
@@ -0,0 +1,61 @@
+use tracing::info;
+use common::log::tracing_factory::TracingFactory;
+use relay_log::relay_log::RelayLog;
+use relay_log::storage::relay_log_storage::RelayLogStorage;
+use relay_log::storage::segment_file::SegmentFile;
+use relay_log::storage::segment_manager::SegmentManager;
+use relay_log::storage::storage_config::StorageConfig;
+
+#[test]
+pub fn test_segment_file() {
+ TracingFactory::init_log(true);
+ let segment_file = SegmentFile::from_path("/Users/zhangtao/tmp/db1#t1/rlog-1-1-1.log").unwrap();
+ info!("is_segment_file: {:?}", SegmentFile::is_segment_file("rlog-1-1-1.log"));
+ info!("version: {:?}", segment_file.version());
+ info!("segment_id: {:?}", segment_file.segment_id());
+ info!("index: {:?}", segment_file.index());
+}
+
+#[test]
+pub fn test_segment_manager() {
+ TracingFactory::init_log(true);
+ let mut storage_config = StorageConfig::default();
+ storage_config.set_relay_log_dir("/Users/zhangtao/tmp".to_string());
+ let segment_manager = SegmentManager::new(&storage_config, "db1", "t1").unwrap();
+ info!("{:?}", segment_manager);
+}
+
+#[test]
+pub fn test_log_storage_append() {
+ TracingFactory::init_log(true);
+ let mut storage_config = StorageConfig::default();
+ storage_config.set_relay_log_dir("/Users/zhangtao/tmp".to_string());
+ let db_name = "db1";
+ let tb_name = "t1";
+ let mut log_storage = RelayLogStorage::new(&storage_config, db_name.to_string(), tb_name.to_string()).unwrap();
+ for i in 0..203 {
+ let mut relay_log = RelayLog::default();
+ relay_log.set_database_name(db_name.to_string());
+ relay_log.set_table_name(tb_name.to_string());
+ relay_log.set_event_log_pos(i as u64);
+ relay_log.set_event_name(format!("binlog_{}", i));
+
+ log_storage.append_relay_log(relay_log).unwrap();
+ }
+
+ info!("{:?}", &log_storage.segment_manager);
+}
+
+#[test]
+pub fn test_log_storage_read() {
+ TracingFactory::init_log(true);
+ let mut storage_config = StorageConfig::default();
+ storage_config.set_relay_log_dir("/Users/zhangtao/tmp".to_string());
+ let db_name = "db1";
+ let tb_name = "t1";
+ let mut log_storage = RelayLogStorage::new(&storage_config, db_name.to_string(), tb_name.to_string()).unwrap();
+
+
+ let entry = log_storage.get_entry(202).unwrap();
+ info!("{:?}", *entry);
+}
\ No newline at end of file
diff --git a/tests/src/relay_log/test_relay_log.rs b/tests/src/relay_log/test_relay_log.rs
new file mode 100644
index 0000000000000000000000000000000000000000..4e01053effacd1f39699e9ac92e1e1c12c60d2ad
--- /dev/null
+++ b/tests/src/relay_log/test_relay_log.rs
@@ -0,0 +1,25 @@
+use tracing::info;
+use binlog::events::binlog_event::BinlogEvent;
+use binlog::factory::event_factory::{EventFactory, EventReaderOption, IEventFactory};
+use common::log::tracing_factory::TracingFactory;
+use relay_log::relay_log::RelayLog;
+
+
+pub fn get_table_map_event_write_rows_log_event() -> Vec {
+ let input = include_bytes!("../../events/8.0/19_30_Table_map_event_Write_rows_log_event/binlog.000018");
+ let mut factory = EventFactory::new(false);
+ let (_, output) = factory.parser_bytes(input, &EventReaderOption::default()).unwrap();
+ output
+}
+
+#[test]
+pub fn test_binlog_event_to_relay_entity() {
+ TracingFactory::init_log(true);
+ let local_events = get_table_map_event_write_rows_log_event();
+ let relay_entities: Vec = local_events.iter().map(|e| {
+ RelayLog::from_binlog_event(e)
+ }).collect();
+
+ info!("local_events: {:?}", local_events);
+ info!("relay_entities: {:?}", relay_entities);
+}
\ No newline at end of file
diff --git a/tests/src/relay_log/test_relay_log_server.rs b/tests/src/relay_log/test_relay_log_server.rs
new file mode 100644
index 0000000000000000000000000000000000000000..444c3b9f995bad7a88459a9dc6ed6cc35dbf2353
--- /dev/null
+++ b/tests/src/relay_log/test_relay_log_server.rs
@@ -0,0 +1,46 @@
+use std::time::Duration;
+
+use tokio::join;
+use tokio::sync::mpsc;
+use tracing::{info, warn};
+
+use binlog::events::binlog_event::BinlogEvent;
+use common::err::CResult;
+use common::log::tracing_factory::TracingFactory;
+use relay_log::relay_log_server::RelayLogServer;
+
+use crate::relay_log::test_relay_log;
+
+#[tokio::test]
+pub async fn test_server() -> CResult<()>{
+ TracingFactory::init_log(true);
+
+ let events = test_relay_log::get_table_map_event_write_rows_log_event();
+
+ // 多发送端,单一接收端通道
+ let (tx, rx) = mpsc::channel::(10);
+
+ let server = RelayLogServer::new_with_binlog_receiver(rx)?;
+
+ info!("{:?}", server);
+
+ let send_task = tokio::spawn(async move {
+ for e in events.into_iter() {
+ tokio::time::sleep(Duration::from_millis(500)).await;
+ if tx.send(e).await.is_err() {
+ warn!("receiver closed");
+ break;
+ }
+ }
+ });
+
+ info!("before relay log server state: {}", server.is_running().unwrap());
+
+ join!(send_task);
+
+ info!("after relay log server state: {}", server.is_running().unwrap());
+
+ tokio::time::sleep(Duration::from_secs(10)).await;
+
+ Ok(())
+}
\ No newline at end of file
diff --git a/tests/src/relay_log/test_relay_log_server_machine.rs b/tests/src/relay_log/test_relay_log_server_machine.rs
new file mode 100644
index 0000000000000000000000000000000000000000..12c9e41c5230c65d8240a8ab89fa4043e4048929
--- /dev/null
+++ b/tests/src/relay_log/test_relay_log_server_machine.rs
@@ -0,0 +1,38 @@
+use tokio::join;
+use tracing::info;
+
+use common::log::tracing_factory::TracingFactory;
+use common::schema::rc_task::RcTask;
+use relay_log::relay_log_server_machine::RelayLogServerMachine;
+
+#[tokio::test]
+pub async fn test_relay_log_server_machine() {
+ TracingFactory::init_log(true);
+
+ let h1 = tokio::task::spawn_blocking(|| {
+ add_task("1")
+ });
+
+ let h2 = tokio::task::spawn_blocking(|| {
+ add_task("2")
+ });
+
+ let h3 = tokio::task::spawn_blocking(|| {
+ add_task("3")
+ });
+
+ join!(h1, h2, h3);
+
+ info!("{:?}", RelayLogServerMachine::get_instance());
+}
+
+fn add_task(id: &str) {
+ let task = RcTask {
+ task_id: id.to_string(),
+ task_name: "".to_string(),
+ src_info: vec![],
+ dst_db_name: "".to_string(),
+ dst_table_name: "".to_string(),
+ };
+ RelayLogServerMachine::get_instance().add_task(task).unwrap();
+}
\ No newline at end of file