high-throughput system တွေကိုဘယ်လို design မလဲ

microservice တွေအများကြီးရှိတဲ့ system တခု (ဥပမာ medium-sized ကုမ္ပဏီတခုရဲ့ microservice 80-100 လောက်ရှိတဲ့ Ecommerce Platform မျိုး) မှာအဓိကကြုံရမယ့် challenge က request tracing, mesh, progressive rollout နဲ့ middleware management တွေဖြစ်တယ်။ database bottleneck ကလည်း စဥ်းစားစရာ ကိစ္စတခုဖြစ်ပေမယ့် 10K/sec write လောက်အထိ database အများစုက ကောင်းကောင်း handle လုပ်နိုင်သေးတယ်။ ဒါပေမယ့် တစက္ကန့်ကို read request 2.5M နဲ့ write 500K ထိရှိတဲ့ system တခုဖြစ်လာမယ်ဆိုရင်တော့ challenge အသစ်တွေကို ဖြေရှင်းရဖို့ ဖြစ်သွားပြီ။

ဒီလိုမျိုး high-throughput I/O intensive ဖြစ်တဲ့ system တွေကို ဆောက်တဲ့အခါ အခြေခံကျတဲ့ အချက်တခုက ingestion နဲ့ processing ကိုခွဲထုတ်ပစ်ဖို့ပဲ။ ဘာလို့လဲဆိုတော့ write-optimized ​ဖြစ်တဲ့ Cassandra တို့လို LSM-tree database မျိုးကိုသုံးခဲ့တယ်ဆိုရင်တောင်မှ write ops တခုချင်းစီက consensus ရှိဖို့ quorum အရေအတွက်ပြည့်အောင် node တခုထက်မက လှမ်းရေးရတဲ့အတွက် Network RTT များမယ်။ ဒီတော့ 500K/sec ကို ပုံမှန် write path ဖြစ်တဲ့ Ingress => API => Database ဆိုတဲ့ DB-first (direct persistence) ပုံစံနဲ့သွားရင် request တွေက database ကြောင့် block ဖြစ်နေလိမ့်မယ်။ ဆိုလိုချင်တာက end-to-end latency ပိုတက်လာမယ်။ SLA breach ဖြစ်မယ်ပေါ့။

ဒါကြောင့် throughput 10K နဲ့အထက်ဖြစ်လာပြီဆိုရင် Event sourcing pattern ကိုစဥ်းစားရတယ်။ Event sourcing pattern ရဲ့ယေဘုယျ သဘောတရားက system ဆီ၀င်လာတဲ့ request တွေကို persistent layer တခုမှာအရင် log လိုက်ပြီး client ကို ACK ချက်ချင်းပြန်ပို့မယ်။ အဲဒီ layer က random I/O မဖြစ်ရဘူး။ sequential I/O ဖြစ်ရမယ်။ partitioned ဖြစ်ရမယ်။ ဒါမှ throughput ကောင်းကောင်းရမယ်။ နောက်ပြီး အဲဒီ log တွေကို downstream consumer တွေဆီပြန် stream လို့ရရမယ်။ အရိုးရှင်းဆုံးပုံစံဆိုရင် write တွေကို batch လိုက် database ဆီပို့ဖို့ consumer အုပ်စုတစုပဲရှိမယ်။ write 100 စီ batch ပြီးရေးတယ်ဆိုရင်တောင် စောစောက 500K က 5K ဖြစ်လာမယ်။ ဆိုတော့ 2 orders of magnitude လျော့သွားတယ်ပေါ့။ တခုရှိတာက aggressive ဖြစ်ပြီး အများကြီး ဇွတ် batch လို့တော့မရဘူး။ တအား batch ထားလိုက်ရင် end-to-end latency ဆောင့်တက်လာမယ်။ ဒီတော့ ဘယ်လောက် batch မလဲဆိုတာ ကိုယ့် throughput နဲ့ချိန်တွက်ဖို့ လိုအပ်တယ်။

လက်တွေ့မှာ အဲဒီ append-log layer အတွက် Apache Kafka ကအသုံးအများဆုံးပဲ။ AWS Kinesis, GCP Pub/Sub စတဲ့ option တွေလည်းရှိတယ်။ ဒါပေမယ့် durability, scalablity, orderability, replayability လေးချက်ထဲမှာ Kafka ကိုလူကြိုက်များစေတာက replayability ဆိုတဲ့ log အဟောင်းတွေကို ပြန် poll ပြီး reprocess လုပ်လို့ရနိုင်တာရယ်၊ partition တခုထဲမှာ ordering guarantee လုပ်နိုင်တာရယ်ဖြစ်တယ်။

Kubernetes context နဲ့ပြောရရင် system ထဲဝင်လာတဲ့ client request တွေက Cloud Load Balancer ကနေတဆင့် Ingress Controller (nginx, trafik, envoy စသဖြင့်) မှာ TLS termination လုပ်မယ်။ request တွေကို backpressure ရှိရင်ရှိသလို leaky bucket ဒါမှမဟုတ် sliding window algorithm တွေသုံးပြီး rate limiting လုပ်မယ်။​ နောက် critical request တွေဆိုရင်တော့ priority queue ကနေသွားမယ်ပေါ့။ Ingress ကနေမှ API backend ဆီရောက်မယ်။ backend က redis-backed materialized view တွေကိုလှမ်း query ပြီး domain invariant constraint တွေစစ်မယ်။ pass ဖြစ်တယ်ဆိုရင် intent (event) ကို Kafka ဆီလှမ်း publish လိုက်မယ်။ publish လုပ်ဖို့အတွက် backend အနေနဲ့ request ထဲက key တခုကို hash ပြီး publish လုပ်ရမယ့် partition ကိုရွေးရမယ်။ hash တဲ့အခါ modulo-based hashing, consistent hashing စသဖြင့် ရှိတဲ့အထဲက modulo-based ကတော့ rebalance လုပ်တဲ့အခါ key အကုန်လုံးနီးပါး partition အသစ်ဆီ ရွှေ့ရနိုင်လို့ မကောင်းဘူး။ သူ့ထက်စာရင် consistent hashing ကနည်းနည်းပို stable ဖြစ်တယ်။

service discovery အတွက် backend က bootstrap broker ဆီ metadata တွေ (ဥပမာ topic partition, partition leader စသဖြင့်) တခေါက်လှမ်း query ပြီး interval အလိုက် sync နေရမယ်။ Kafka က internal မှာ အရင်တုန်းက consensus (total order broadcast), cluster-metadata bookkeeping လုပ်ဖို့ Zookeeper ကိုသုံးခဲ့တယ်။ အခုနောက်ပိုင်း version တွေမှာတော့ Zookeeper အစား KRaft ဖြစ်သွားပြီ။

broker တလုံးက partition တခုထက်မက host ထားနိုင်တယ်။ ဒီနေရာမှာ တခုမှတ်ထားရမှာက partition ဆိုတာ parallelism ပဲ။ partition များလေ producer တွေအတွက်ရော၊ consumer တွေအတွက်ပါ throughput ကောင်းလေလေမို့ partition key ကိုသေချာရွေးဖို့လိုတယ်။ partition ကောင်းကောင်းလုပ်ထားနိုင်ရင် producer တွေ consumer တွေကို သီးသန့်စီ scale out လို့ရတယ်။ ဒါမျိုး linear scalability က database ကိုတိုက်ရိုက်ရေးတဲ့ direct persistence model မှာမရနိုင်ဘူး။ နောက်ပြီး hotspot တွေကိုလည်း partition တခုတည်းမှာ skew ဖြစ်မနေအောင် manage လုပ်ရမယ်။ တကယ်လို့ hot key ကိုပါ partition လုပ်ဖို့ရွေးလိုက်ရင် per key ordering ပျက်မှာမို့လို့ downstream consumer တွေက merge & reorder လုပ်ပေးနိုင်ရမယ်။

latency တတ်နိုင်သလောက် လျော့ချဖို့ processor နဲ့ broker ကို colocate လုပ်ထားရင် အကောင်းဆုံးပဲ။ ဒါပေမယ့် Kafka ကို Kubernetes cluster ထဲမှာ Persistent Volume သုံးပြီး Stateful Set အဖြစ်ထားရမယ် မဟုတ်ဘူး။ လက်တွေ့မှာ Kafka ကိုသီးသန့် VM တွေနဲ့ deploy လုပ်တာက အများကြီး ပိုကောင်းတယ်။ producer နဲ့ Kakfa ကို data center တခုတည်းမှာ အတူတူထားလိုက်ရင် Network RTT က sub millisecond ပဲရှိတာမို့ ACK latency ကပြဿနာမဟုတ်တော့ဘူး။ တကယ်တမ်း စိတ်ပူရမှာက Disk I/O ပဲ။ အဲဒါကြောင့် SSD NVMe (AWS ပေါ်မှာဆိုရင် throughput ကောင်းကောင်းရနိုင်တဲ့ gp3, io2 စသဖြင့်) ကိုသုံးပြီး OS အတွက် disk နဲ့ Kafka disk ကိုနှစ်ခုခွဲပြီး ထားသင့်တယ်။ နောက်တချက်က Kafka log တွေ compact/delete လုပ်ဖို့အတွက် segment size, retention period နဲ့ size, cleanup worker အရေအတွက် စသဖြင့် သေချာသတ်မှတ်ရမယ်။ retention များရင် disk consumption တက်လာမယ်။ နည်းရင်လည်း time travel (event replay) လုပ်လို့ရတဲ့ အတိုင်းအတာ နည်းသွားမယ်။

API backend ဖက်ပြန်လှည့်ရအောင်။ backend နဲ့ပတ်သက်ပြီး ပြောစရာ ၃ ခုရှိတယ်။ နံပါတ်တစ်က message ရဲ့ format ရွေးရမယ်။ Kafka ကတော့ message ရဲ့ key နဲ့ value ကို byte array လို့ပဲသဘောထားတဲ့အတွက် producer ကဘယ် format ဘယ် structure သုံးလဲဆိုတာ အရေးမကြီးဘူး။ process လုပ်မယ့် consumer တွေ message ကိုပြန်ပြီး decode လုပ်နိုင်ဖို့ပဲလိုတယ်။ ဒါပေမယ့် throughput 500K/sec ရှိတဲ့ system တခုမှာ payload size ကိုတတ်နိုင်သလောက် လျှော့ချဖို့ json ထက် binary protocol တွေကိုပဲ သုံးသင့်တယ်ဆိုတာကတော့ common sense ပေါ့လေ။ binary protocol တွေအထဲက Apache Kafka အတွက် protobuf, thrift တို့ထက်စာရင် forward ရော backward ပါ compatible ဖြစ်အောင် လုပ်ရလွယ်တဲ့ Avro ကို Confluent Schema Registry နဲ့တွဲသုံးလေ့ရှိတယ်။

နံပါတ်နှစ်က message ရဲ့ size ကို back-of-the-envelope တွက်ထားရမယ်။ Order Submitted event ကို Avro's JSON schema နဲ့ဒီလိုသတ်မှတ်ကြမယ်။

{
  "type": "record",
  "name": "OrderSubmitted",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "user_id", "type": "string"},
    {"name": "items", "type": {
      "type": "array",
      "items": {
        "type": "record",
        "name": "LineItem",
        "fields": [
          {"name": "product_id", "type": "string"},
          {"name": "quantity", "type": "int"},
          {"name": "price_cents", "type": "int"}
        ]
      }
    }},
    {"name": "currency", "type": "string"},
    {"name": "status", "type": "string"},
    {"name": "timestamp", "type": "long"},
    {"name": "shipping_address", "type": {
      "type": "record",
      "name": "Address",
      "fields": [
        {"name": "street", "type": "string"},
        {"name": "city", "type": "string"},
        {"name": "country", "type": "string"},
        {"name": "postal_code", "type": "string"}
      ]
    }}
  ]
}

နမူနာ payload ကဒီလိုဆိုပါတော့။

{
  "order_id": "o_1234567890",
  "user_id": "u_987654321",
  "items": [
    {"product_id": "p_111111", "quantity": 2, "price_cents": 1299},
    {"product_id": "p_222222", "quantity": 1, "price_cents": 4999}
  ],
  "currency": "USD",
  "status": "PENDING",
  "timestamp": 1694352000000,
  "shipping_address": {
    "street": "Big Ben, Westminster",
    "city": "London",
    "country": "UK",
    "postal_code": "SW1A 0AA"
  }
}

200 bytes လောက်ရှိတဲ့ဒီ payload က LZ4 တို့ snappy တို့လို lossless compression algorithm သုံးပြီး compress လုပ်လိုက်လို့ 40% သက်သာသွားတယ်ဆိုရင်ပဲ 120 bytes ကျန်မယ်။ ဒါက 500K/sec နဲ့တွက်ရင် တစက္ကန့်ကို 60MB တနာရီကို 216GB တရက်ကို 5.18TB ကျော်ကျော်ဖြစ်သွားမယ်။ နောက်တချက်က replication factor ကိုထည့်စဥ်းစားဖို့လိုတယ်။ log တွေကို partition တဲ့အပြင် partition တခုချင်းစီမှာလည်း replicate လုပ်ကြသေးတယ်။ တကယ်လို့ replica 3 ခုရှိမယ်ဆိုရင် RF 3 ပေါ့။ ဒီတော့ write throughput * 3 ဖြစ်ပြီး စောစောက ပမာဏရဲ့ 3 ဆရှိလာမယ်။ နောက် consumer group တခုချင်းစီကပြန် poll တာကိုလည်း ထည့်စဥ်းစားရမယ်။ traffic spike တွေမှာ bursted throughput က sustained throughput ထက်အဆပေါင်းများစွာ ပိုနိုင်တဲ့အချက်ကိုပါ ထည့်တွက်ရင် NIC bandwidth က 1Gbps အောက်တော့ နည်းလို့မရဘူး။

နံပါတ်သုံးက performance အတွက် backend node တွေမှာ buffer tuning လုပ်သင့်တယ်။ အကြောင်းအရင်းက kernel က NIC အတွက် data ဘယ်လောက် queue နိုင်လဲဆိုတာနဲ့ network speed နှစ်ခုပေါင်းပြီး throughput ကိုဆုံးဖြတ်ပေးမှာမို့ပဲ။ socket buffer ဘယ်လောက်ထားရမလဲဆိုရင် ကိုယ့် application က Web Socket သုံးထားရင် အနည်းဆုံးတော့ OS ကို write syscall ခေါ်နေတဲ့ buffer ပမာဏထက် ကျော်အောင်တော့ OS buffer ကိုထားသင့်တယ်။

ဥပမာ app buffer က connection တခုကို 1MB ထားထားပြီး OS ကကျ 256KB ပဲရှိတယ်ဆိုပါတော့။ ဒါဆို application က write ပထမတခါ ခေါ်ပြီးရင် နောက်ထပ် 744KB ကိုဆက် flush ဖို့ TCP window စောင့်ရမယ်။ ဒီတော့ sensible ဖြစ်တဲ့ ပမာဏက app buffer ကို 256KB လောက်ထားပြီး OS မှာ net.core.rmem_max နဲ့ net.core.wmem_max တွေကို 1MB လောက်ထားလိုက်တာပဲ။ TCP retransmit တွေမြင်ရမှ နည်းနည်းချင်း ဆက်တိုးပေါ့။ အဓိကက ဘယ်ဟာကိုမှ မျက်လုံးမှိတ်မ max ဖို့ပဲ။ app buffer တအားတင်လိုက်ရင် syscall နည်းနည်းနဲ့ ပို efficient ဖြစ်သွားပေမယ့် batch ထဲအစောဆုံးရောက်တဲ့ request တွေက အကြာဆုံး စောင့်ရပြီး tail latency တက်လာမယ်။ garbage collector ပိုအလုပ်လုပ်ရမယ်။ OS ရဲ့ socket buffer ကိုဇွတ်တင်လိုက်ရင်လည်း memory တအားစားလိမ့်မယ်။ ပြီးတော့ application မှာ zero-copy တွေကို တတ်နိုင်သလောက် သုံးသင့်တယ်။

Kafka မှာ replication factor က topic level မှာသတ်မှတ်ရတာဖြစ်လို့ topic တခုရဲ့ partition တွေက RF အတူတူပဲ။ ဒီ replicated partition တွေက leader broker ဆီက partition နဲ့ asynchronously synchronize လုပ်တာဖြစ်ပြီး maximum lag time သတ်မှတ်ထားနိုင်တယ်။ ဆိုလိုချင်တာက follower broker တခုက အဲဒီသတ်မှတ်ပမာဏထက်တော့ သူ့ရဲ့ partition leader နောက်မှာ ကျန်နေခဲ့လို့မရဘူး။ အဲထက်ပိုနောက်ကျရင် ISR (in-sync replica) စာရင်းကနေ ထုတ်ခံရမယ်။

consistency model ဖက်ကကြည့်ရင် producer က acks=0 နဲ့ publish လိုက်တဲ့ message တွေက broker ဆီက confirmation ကိုမစောင့်ဘူး။ performance အမြန်ဆုံးဖြစ်ပေမယ့် data loss ဖြစ်နိုင်တဲ့ risk ကြီးတယ်။ acks=1 က အနည်းဆုံးတော့ partition leader ဆီက confirmation ကိုစောင့်တယ်။ acks=all ဒါမှမဟုတ် acks=-1 ဆိုရင်တော့ ISR အကုန်လုံး synced ဖြစ်တဲ့အထိ စောင့်တာမို့ consistency အကောင်းဆုံးပေမယ့် performance အဆိုးဆုံးဖြစ်တယ်။ ဒါပေမယ့် acks=0 ဆိုရင်လည်းပဲ message တခုက ISR အကုန် commited ဖြစ်မှ consumer တွေဆီကို visible ဖြစ်မှာမို့ acks=0 က end-to-end latency ကိုတော့ ကောင်းလာစေမှာ မဟုတ်ဘူး။

လက်တွေ့မှာ ISR shrink (ISR < RF) ဖြစ်တဲ့အခါ သိနိုင်အောင် alert တွေဆင်ထားရမယ်။ ISR shrink ရခြင်းက broker overloaded ဖြစ်နေလို့ ဖြစ်နိုင်သလို network issue တွေ​ကြောင့်လည်း ဖြစ်နိုင်တယ်။ ဒါပေမယ့် တော်ရုံနဲ့ ISR shrink မဖြစ်အောင်လို့ maximum lag time ကိုခပ်မြင့်မြင့် (ဥပမာ ၁၀ စက္ကန့်လောက်) တင်ထားလိုက်ရင် acks=all နဲ့ရေးတဲ့ producer တွေအတွက် tail latency ငရဲဖြစ်သွားလိမ့်မယ်။ Apache Kafka က Java Ecosystem ထဲက application ​ဖြစ်တဲ့အတွက် metrics တွေကို Prometheus format နဲ့ expose မလုပ်ပဲ JMX API ကိုသုံးတယ်။ အဲဒီတော့ ကိုယ်က Prometheus stack သုံးနေပြီးသားဆိုရင် Prometheus JMX Exporter ကိုသုံးပြီး Kafka ရဲ့ metrics တွေကို proxy လို့ရတယ်။

2.5M/sec ရှိတဲ့ read request တွေကိုတော့ database ဆီတိုက်ရိုက်မ query ပဲ precomputed aggregate တွေ materialized view (denormalized) တွေနဲ့ consumer-provided local cache တွေကနေ serve မယ်။ အထူးသဖြင့် strongly consistent မဖြစ်တဲ့ ဘယ် system မှာမဆို read-after-write ရဖို့အတွက် local cache ရှိရလိမ့်မယ်။ အဲဒီအတွက် Kafka ဆီက ack ရပြီးတာနဲ့ redis ကိုလှမ်း cache မယ်။ client ဆီက read request ထပ်ရောက်လာခဲ့ရင် cache နဲ့ materialized view ကို merge ပြီးပြမယ်။ materialized view ကိုတော့ per key နဲ့ထားပြီး processor က update လုပ်တဲ့အခါ local cache ကို invalidate လုပ်လိုက်မယ်။

အရမ်း hot တဲ့ key အချို့ကိုတော့ fan-out on read သုံးပြီး Twitter လိုမျိုး hybrid စတိုင် serve မယ်။ ဥပမာ follower 9 သိန်းရှိတဲ့ merchant တယောက်က promotion တခုဖန်တီးလိုက်ရင် သူ့ follower 9 သိန်းစလုံးရဲ့ product feed ဆီအဲဒီ promotion item ပြပေးဖို့ write ops 900K လိုတယ်။ အဲဒါက ပုံမှန် throughput 500K/sec နဲ့ပေါင်းရင် 1400K/sec ဖြစ်ပြီမို့လို့ order of magnitude တခုပိုသွားပြီ။ ဒါကြောင့် hybrid model ကိုစဥ်းစားဖို့လိုတယ်။

ဒီတော့ read path 2 ခုရှိမှာပေါ့။ ပထမတခုက user-facing query read တွေ။ အဲဒါကတော့ အပေါ်မှာပြောခဲ့သလို local cache နဲ့ materialized view ကို merge ပြီးအဖြေပေးမယ်။ နောက်တခုက domain invariant-check read တွေ။ ဥပမာ warehouse ထဲ stock ကျန်သေးရဲ့လားလို့ မကြည့်ပဲ OrderSubmitted ကိုတန်း emit ပေးလိုက်လို့မရဘူး။ အဲတော့ backend ကသူ့ဆီ client request ရောက်လာတဲ့အခါ precomputed aggregate တွေ၊ လိုအပ်တဲ့ materialized view တွေနဲ့အရင် check ကြည့်ဖို့လိုတယ်။ တခုရှိတာက အဲဒီ data တွေက stale ဖြစ်ကောင်းဖြစ်နေလိမ့်မယ်။ ဘာလို့လဲဆိုတော့ downstream processor တွေဖြစ်တဲ့ Kafka Streams တို့ Apache Flink တို့တွေ partition ဆီက log ပြန် poll ပြီးနောက်ကလိုက် process လုပ်ရတဲ့အခါ lag ရှိမှာ။ အဲဒီ lag ကသိပ်မဟဖို့တော့ လိုတာပေါ့။ အရမ်းဟနေရင် stale data တွေပိုများလာမယ်။ transaction တွေကမောက်ကမဖြစ်မယ်။ transactional integrity ပျက်ရင် compensating intent တွေလိုက်ထည့်ရမယ်။ ဥပမာ OrderFailed, OrderCanceled စသဖြင့်။ customer ကိုအော်ဒါမှာလို့မရတော့ကြောင်း mail ပို့ပြီး အသိပေးတာ၊ refund လုပ်တာနဲ့ discount ကူပွန်တွေပေးတာ စသဖြင့်ပေါ့။ ဒါမျိုးတွေ မကြာခဏ ဖြစ်နေမယ်ဆို ကုမ္ပဏီငွေကြေးဆုံးရှုံးတာ၊ reputation ထိခိုက်တာတွေဖြစ်လာမယ်။ အဲဒီတော့ partition ရဲ့ committed offset (sequential ID) နဲ့ latest offset ကြားက ခြားနားချက် သိပ်ကြီးလာပြီဆိုရင် အဲဒါ stream processor တွေကို scale သင့်ပြီလို့ အချက်ပြနေတာပဲ။ ရုတ်တရက်ကြီး consumer lag မတရား တက်လာတဲ့အခါ control plane ကနေ ingress layer မှာ backpressure ပေးနိုင်အောင်လည်း လုပ်ထားသင့်တယ်။

stream processor တွေအတွက်က stateful stream processor သုံးမယ်။ Apache Flink, Kafka Streams စသဖြင့်ပေါ့။ ဒါတွေက batch workload တွေအတွက် ရည်ရွယ်ခဲ့တဲ့ Apache Hadoop ကနေ inspire ဖြစ်ပြီး လိုက်ဆောက်ထားတဲ့ data processing software တွေ။ Hadoop ကိုယ်တိုင်ကလည်း Google ရဲ့ MapReduce ကနေ ဖြစ်လာတာပါပဲ။ batch နဲ့ stream အဓိက ကွာသွားတဲ့တချက်က batch processor တွေမှာ state မရှိဘူး။ batch job တခုက finite input နဲ့အလုပ်လုပ်ရတယ်။ stream processor တွေကတော့ state ရှိနိုင်တယ်။ နောက်ပြီး stream job တွေက infinite (unbounded) input တွေနဲ့ အလုပ်လုပ်ရတယ်။ ဒါပေမယ့် နောက်ပိုင်း software တွေက နှစ်မျိုးလုံးကို အတိုင်းအတာတခုထိ လုပ်လာနိုင်ကြတယ်။ ဥပမာ Apache Flink က stream ကို batch ပြီးအလုပ်လုပ်နိုင်သလို Apache Spark ကလည်း micro batch တွေအဖြစ် stream နိုင်တယ်။

အခုဒီ system မှာဆိုရင် processing job 2 ခုရှိမယ်။ ပထမတခုက system-of-record ကို persist လုပ်မယ့် stream processor ဖြစ်တယ်။ အဲဒီ database ကလည်း throughput ကိုခံနိုင်ဖို့ sharded, partitioned ဖြစ်ရမယ်။ 500K ကို 100 စီ batch ပြီးလှမ်းရေးတယ်ဆိုရင်တောင်မှ 5K/sec ဆိုတဲ့ write ops ကတော်ရုံတန်ရုံ ပမာဏ မဟုတ်သေးဘူး။ ဒုတိယ processing job ကတော့ derived dataset တွေဖြစ်တဲ့ aggregate တွက်ဖို့၊ materialized view တွေထုတ်ဖို့နဲ့ နောက် backend ရဲ့ local cache ကို invalidate လုပ်ဖို့ဖြစ်မယ်။

ဒီ stream processor တွေက validation, dedupe, enrichment နဲ့ transactional write တွေကိုလုပ်နိုင်ရမယ်။ ဒါပေမယ့် critical transaction တွေကို exactly-once semantics (EOS) ရဖို့ dedupe လုပ်တဲ့ process က client မှာစပြီး API ကလည်းပါဝင်တဲ့ end-to-end process ဖြစ်ဖို့လိုတယ်။ downstream processor ဆီရောက်မှ enforce သွားလုပ်လို့မရဘူး။ client မှာကတည်းက transaction id ထုတ်ပြီး server ကိုယူလာခဲ့ရမှာဖြစ်သလို API ကလည်း client id နဲ့ transaction id အတွဲကို TTL-based expiry နဲ့ cache ထားသင့်တယ်။ ​ဒါမှ subsequent request တွေမှာ transaction id ကို cache ထဲတွေ့တဲ့အခါ duplicated ဆိုပြီး response ပြန်နိုင်မယ်။ နောက်ပြီး stream job တွေက aggregate တွေတွက်ရမှာရယ်၊ batching လုပ်ရမှာရယ်ကြောင့် stateful ဖြစ်နေဖို့လိုတယ်။ ဒီလိုမျိုး ingestion နဲ့ processing ကိုခွဲလိုက်တဲ့အတွက် ကောင်းတဲ့အချက်တခုက တကယ်လို့ bug တွေပါသွားခဲ့တာ ဒါမှမဟုတ် processor's logic ကိုပြောင်းချင်တာတို့ဆိုရင် event log တွေကို replay လုပ်ပြီး reprocess လုပ်ရုံပဲ။ ဒီအတွက် stream processing job တွေက idempotency ရှိဖို့တော့ လိုမယ်။

streaming platform တွေမှာ idempotency နဲ့ပတ်သက်ပြီး သတိထားစရာတခုက transient error တွေဖြစ်တယ်။ ဆိုပါတော့။ producer တယောက်က partition တခုဆီကို connection တခုတည်းမှာ in-flight batch နှစ်ခုလှမ်းရေးလိုက်တဲ့အခါ ပထမ message batch က failed သွားပြီး ဒုတိယတခုက success ဖြစ်သွားတယ်။ failed သွားရတဲ့ အကြောင်းအရင်းက transient network error ဖြစ်တဲ့အတွက် producer ကအဲဒီ batch ကို retry လုပ်လိုက်တဲ့အခါ နောက်အခေါက်တွေမှာ success ဖြစ်သွားတယ်ဆိုပါစို့။

ဒါပေမယ့် log layer ရဲ့ POV ကကြည့်ရင် ဒုတိယ batch ကအရင်ရောက်လာတာမို့ ordering ပျက်သွားတယ်။ ဒီပြဿနာကို ဖြေရှင်းဖို့အတွက် retry မလုပ်ပဲနေမလား ဒါမှမဟုတ် in-flight ကို single batch ပဲထားမှာလားစတဲ့ option တွေထက် producer ကို batch တိုင်းအတွက် sequential ID ထုတ်ခိုင်းလိုက်တာက ပို robust ဖြစ်တဲ့အဖြေဖြစ်တယ်။ ဒါပေမယ့် အဲလိုဆိုရင်တောင်မှ ဒါက producer တခုစီအတွက်ပဲဆိုတာ နားလည်ထားဖို့လိုတယ်။ လုံးဝ per key ordering အစစ်ရချင်တယ်ဆိုရင်တော့ ingress level မှာကတည်းက key routing ကိုထိန်းရလိမ့်မယ်။

ဒီနေရာမှာ CQRS အကြောင်း နည်းနည်းထည့်ပြောချင်တယ်။ Event sourcing နဲ့ CQRS နှစ်ခုက မတူဘူး။ တခု implement လုပ်လိုက်တာနဲ့ နောက်တခု တန်းရသွားတာ မဟုတ်ပါဘူး။ Event sourcing system ဆိုတာက state ရဲ့ latest snapshot ကိုပဲ သိမ်းရုံတင်မဟုတ်ပဲ state-changing event တိုင်းကို persist လုပ်တဲ့ system မျိုးကို ဆိုလိုတယ်။ CQRS (Command Query Responsibility Segregation) system ကကျ read နဲ့ write model ကိုခွဲပစ်တဲ့ပုံစံဖြစ်တယ်။ ဒီတော့ CQRS မပါတဲ့ Event souring system က read ရော write ကိုပါ system-of-record datastore တခုတည်းကနေ လုပ်လိမ့်မယ်။ Event sourcing မပါတဲ့ CQRS system က DB-first design ဖြစ်နေလိမ့်မယ်။

နောက်တချက်က CQRS မှာလည်း command နဲ့ query ကိုကိုယ်လိုချင်တဲ့ အတိုင်းအတာထိ ခွဲထုတ်လို့ရတယ်။ ဆိုပါတော့။ write database နဲ့ read replica တွေအဖြစ် responsibility ခွဲထားပေမယ့် data ကိုတော့ဒဲ့ replicate လုပ်တဲ့ CQRS ဖြစ်နိုင်တယ်။ ဒါမှမဟုတ် simplified query ဖြစ်ဖို့ table တွေကြားက join တွေလျှော့ချဖို့ data ကို denormalized လုပ်ပြီးမှ read replica တွေဆီ replicate လုပ်တဲ့ CQRS ဖြစ်နိုင်တယ်။ ဒါမှမဟုတ် command နဲ့ query အတွက် မတူညီတဲ့ ployglot database တွေသုံးထားတဲ့ CQRS လည်းဖြစ်နိုင်တယ်။ ဥပမာ command အတွက် Postgresql ကိုသုံးပြီး query ကိုတော့ Redis, Cassandra ကနေတဆင့်သွားနိုင်ဖို့ လုပ်ထားတာမျိုးပေါ့။ ဒီလောက်ဆို ဒီ pattern နှစ်ခုက မတူဘူးဆိုတာ နားလည်သွားမယ်ထင်တယ်။

ဒီနေ့ပြောသွားတဲ့ Event sourcing system ရဲ့အားသာချက်က CQRS ကိုလွယ်လွယ်လုပ်လို့ရတာပဲ။ တကယ်လို့ ကိုယ့်ကုမ္ပဏီက business analyst တွေက analytics query တွေ run ပြီး report ထုတ်ဖို့လိုတယ်ဆိုပါတော့။ analytics query တွေက OLTP database မှာထက် columnar OLAP database တွေက ပိုသင့်တော်၊ ပို efficient ဖြစ်တယ်။ ဒီအတွက် DB-first system တွေမှာဆိုရင် ETL pipeline တွေဆောက်ပြီး OLTP ကနေ OLAP ကိုပေါင်းကူးပေးဖို့ လုပ်ရတယ်။ ဒါပေမယ့် အခုလို Event sourcing system နဲ့ဆိုရင်တော့ downstream processor တွေထဲမှာ columnar db တခုခု (ဥပမာ Amazon Redshift, ClickHouse, Google BigQuery စသဖြင့်) ကို star schema သုံးပြီး တိုက်ရိုက် feed တဲ့ consumer group တခုထည့်ထားလိုက်လို့ရတယ်။ ဒီပုံစံက application ရဲ့ read path တွေနဲ့ resource competition မလုပ်ရသလို တခြား consumer တွေနဲ့လည်း couple မဖြစ်ပဲ သီးသန့် scale လို့ရတယ်။

ဒါပြီးရင် operational efficiency နဲ့ cost optimization အကြောင်းဆက်ကြမယ်။

ဒီ system မှာအဓိက operational bottleneck ဖြစ်မှာကတော့ Kafka cluster ဖြစ်လိမ့်မယ်။ ဒါကြောင့် automation တွေပါပြီးသား managed service တွေကို စဥ်းစားသင့်တယ်။ ဥပမာ Confluent မှာဆိုရင် auto partitioning (autoscaling partition), elastic scaling (autoscaling broker), storage scaling စတဲ့ feature တွေက out-of-the-box ပါတာမို့လို့ တော်တော်လေး ဝန်ပေါ့သွားတယ်။

နောက်တချက်က တနေ့ 10-15TB ဆိုတဲ့ ပမာဏက disk ပေါ်မှာ ရေရှည်ထားလို့ကောင်းတဲ့ ပမာဏမဟုတ်ပါဘူး။ ဒီတော့ archive လုပ်ပြီး cold storage တွေဆီ ရွှေ့ပစ်ဖို့လုပ်ရမယ်။ Kafka ကဒီ feature အတွက် KIP-405 အောက်မှာ development စခဲ့တာ version 3.9 မှာ GA ဖြစ်သွားပြီမို့လို့ remote storage plugin သုံးပြီး S3 လိုမျိုး Object Storage တွေဆီ log အဟောင်းတွေ ပို့ဖို့လုပ်ထားသင့်တယ်။ စကားမစပ် ဒီ feature ကလည်း Confluent မှာ managed service အနေနဲ့ ပါပြီးသား။

storage ရွေးတဲ့အခါ Kafka က database တွေလို IOPS များများသုံးလို့ ပိုကောင်းသွားတာမျိုးမဟုတ်ပဲ throughput ကိုသာ ပိုလိုအပ်တဲ့အတွက် IOPS အတွက် ပိုက်ဆံမဖြုန်းသင့်ပါဘူး။ AWS ရဲ့ block storage တွေထဲမှာဆိုရင် gp3 ကို throughput ထပ်ထည့်လိုက်တာနဲ့တင် လုံလောက်နေပါပြီ။ အဲဒါကိုမှ performance လိုအပ်ချက် ထပ်ရှိလာရင် io2 ကိုစဥ်းစားလို့ရတယ်။ ပြီးတော့ data တွေကို in-transit မှာရော at-rest မှာပါ secure ဖြစ်စေဖို့ SPIRE (SPIFEE ရဲ့ reference implementation) သုံးပြီးကိုယ့် Kubernetes mesh ရဲ့ boundary ကို broker node တွေဆီ ချဲ့လို့ရသလို disk ပေါ်မှာလည်း AES-256 ကိုသုံးပြီး encrypt လုပ်ထားသင့်တယ်။

နောက်ဆုံးပြောချင်တဲ့အချက်က ဒီလို high-throughput system တခုကို တည်ဆောက်တဲ့အခါ monitoring နဲ့ control plane ရဲ့ အရေးကြီးပုံကို မမေ့ဖို့ပါပဲ။ client ACK, system-of-record ထဲရောက်တဲ့ end-to-end latency, materialized view တွေရဲ့ p95, p99, p999 စတဲ့ tail latency တွေကို မပြတ်စောင့်ကြည့်နေပြီး လိုသလို action ယူနိုင်ဖို့လိုတယ်။ Kafka နဲ့ downstream processor တွေဆီက metrics တွေကို သုံးပြီးတော့လည်း consumer scaling နဲ့ gateway ကနေ backpressure လုပ်ရမယ်။ ဥပမာ Kafka ရဲ့ consumer lag နဲ့ Flink ရဲ့ backpressure time, checkpoint duration စတဲ့ metric တွေကိုကြည့်ပြီး Apache Flink ရဲ့ Task Manager တွေကို Prometheus HPA adapter နဲ့ scale သွားနိုင်ဖို့ လုပ်ထားသင့်တယ်။ နောက်တပုဒ်မှာ downstream processor ကသုံးထားတဲ့ system-of-record အတွက် multi-region database design အကြောင်းဆက်ကြပါမယ်။