Agent Skill
2/7/2026

integration-architect

**Master Skill**: Integration & Event Systems Architect. Covers Distributed Transactions (Sagas), Event Sourcing, Kafka/AMQ Streams engineering, CDC (Debezium), CloudEvents, Schema Registry, and Exactly-Once semantics.

fajjarnr
npx skills add fajjarnr/payu

SKILL.md


name: integration-architect
version: 2.0.0
maturity: stable
updated: 2026-01-30
author: payu-platform-team
requires: [core-banking-engineer]
tags: [events, kafka, integration, cdc, saga, event-sourcing, debezium]
related: [core-banking-engineer, data-architect, platform-engineer]
description: "Master Skill: Integration & Event Systems Architect. Covers Distributed Transactions (Sagas), Event Sourcing, Kafka/AMQ Streams engineering, CDC (Debezium), CloudEvents, Schema Registry, and Exactly-Once semantics."

📚 Reference Implementation Patterns

For detailed patterns and historical context on PayU integration, see:

PayU Integration Architect Master Skill

You are the Lead Events & Messaging Architect (AI) for the PayU Platform. You design the nervous system of the bank, ensuring ultra-reliable, high-throughput asynchronous communication between microservices using AMQ Streams (Kafka).


⚡ 2026 Integration Trends & Standards

  1. Durable Execution (Temporal): Moving beyond pure Kafka Sagas to stateful workflows for high-stakes business logic.
  2. KRaft Mode: Kafka clusters run without ZooKeeper (Apache Kafka 4.0 removed ZooKeeper support entirely).
  3. CloudEvents 1.0.2: Strict adherence for cross-cloud and cross-language compatibility.
  4. Exactly-Once Semantics (EOS): Default for all financial ledger operations.

📬 AMQ Streams & Kafka Engineering

1. Producer Configuration (Financial Grade)

// KafkaProducerConfig.java
@Configuration
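public class KafkaProducerConfig {

    // Broker list injected from configuration; the property name is an assumption
    @Value("${spring.kafka.bootstrap-servers}")
    private String kafkaBootstrap;
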
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        
        // Bootstrap
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrap);
        
        // ✅ CRITICAL: Exactly-Once Semantics for financial data
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "payu-wallet-txn");
        config.put(ProducerConfig.ACKS_CONFIG, "all");  // Wait for all replicas
        
        // Reliability
        config.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        config.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
        
        // Serialization
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        
        return new DefaultKafkaProducerFactory<>(config);
    }
    
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

2. Consumer Configuration (Read Committed)

// KafkaConsumerConfig.java
@Configuration
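public class KafkaConsumerConfig {

    // Broker list injected from configuration; the property name is an assumption
    @Value("${spring.kafka.bootstrap-servers}")
    private String kafkaBootstrap;
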
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrap);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "wallet-service-group");
        
        // ✅ CRITICAL: Only read committed transactions
        config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        
        // Offset management
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        
        // Performance tuning
        config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        config.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);
        config.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        
        return new DefaultKafkaConsumerFactory<>(config);
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        
        // Manual ack for reliability
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        
        // Concurrency based on partitions
        factory.setConcurrency(3);
        
        return factory;
    }
}

3. Topic Naming Convention

payu.<domain>.<event-type>.<version>

Examples:
- payu.wallet.transfer-initiated.v1
- payu.wallet.transfer-completed.v1
- payu.wallet.transfer-failed.v1
- payu.transaction.payment-received.v1
- payu.kyc.verification-completed.v1

DLQ Topics:
- payu.wallet.transfer-initiated.v1.dlq

4. Topic Configuration & Deep-Dive Internals

This topic configuration is designed for high throughput with no-compromise data durability guarantees.

# Strimzi KafkaTopic CR
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: payu.wallet.transfer-initiated.v1
  labels:
    strimzi.io/cluster: payu-kafka
spec:
  partitions: 12  # Formula: Max(Consumer Group Parallelism) * Buffer
  replicas: 3     # HA standard (survives 1 node failure)
  config:
    # DURABILITY
    min.insync.replicas: 2       # Required! Producers using acks=all fail if only 1 replica is in sync.
    unclean.leader.election.enable: "false" # Never promote a lagging replica.
    
    # RETENTION
    retention.ms: 604800000       # 7 days
    retention.bytes: -1          # Unlimited size (storage bound)
    
    # PERFORMANCE & BATCHING
    segment.bytes: 1073741824    # 1GB log segments
    max.message.bytes: 1048576   # 1MB cap (prevents payload bloat)
    compression.type: lz4        # Best balance CPU vs Size
    
    # CLEANUP (compact for state, delete for events)
    cleanup.policy: delete

🧠 Partitioning Strategy Calculator

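Don't guess the partition count. Use this formula, rounding the result up:

$$ \text{Partitions} = \max\left(\frac{T}{T_p}, \frac{T}{T_c}\right) $$

  • $T$: target throughput for the topic (MB/s)
  • $T_p$: throughput a producer can sustain on a single partition (MB/s)
  • $T_c$: throughput a single consumer can sustain on a single partition (MB/s)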

PayU scenario:

  • Target: 50,000 TPS (Transactions Per Second)
  • Avg Msg Size: 1KB
  • Throughput: 50 MB/s
  • Single Consumer speed: 10 MB/s (heavy processing)
  • Result: at least 5 parallel consumers are needed (50 / 10) -> set 6 or 12 partitions (for scaling room).
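
The arithmetic in this scenario can be captured in a few lines. This is a minimal sketch under stated assumptions: `PartitionCalculator` is a hypothetical helper, and the 100 MB/s per-partition producer rate is an illustrative figure that does not come from the scenario (only the 50 MB/s target and the 10 MB/s consumer speed do).

```java
// Hypothetical helper; not part of any Kafka or PayU library.
final class PartitionCalculator {

    private PartitionCalculator() {}

    /**
     * Minimum partition count so that neither the produce side nor the
     * consume side becomes the bottleneck: target throughput divided by
     * per-partition throughput on each side, taking the larger, rounded up.
     */
    static int requiredPartitions(double targetMbPerSec,
                                  double producerMbPerSecPerPartition,
                                  double consumerMbPerSecPerPartition) {
        if (targetMbPerSec <= 0
                || producerMbPerSecPerPartition <= 0
                || consumerMbPerSecPerPartition <= 0) {
            throw new IllegalArgumentException("Throughput values must be positive");
        }
        double forProducers = targetMbPerSec / producerMbPerSecPerPartition;
        double forConsumers = targetMbPerSec / consumerMbPerSecPerPartition;
        return (int) Math.ceil(Math.max(forProducers, forConsumers));
    }

    public static void main(String[] args) {
        // 50,000 msg/s at about 1 KB each is roughly 50 MB/s; one consumer handles 10 MB/s.
        // The 100 MB/s producer figure is an assumption for illustration only.
        int minimum = requiredPartitions(50.0, 100.0, 10.0);
        System.out.println("Minimum partitions: " + minimum);
    }
}
```

The result is a floor, not a recommendation: picking 6 or 12 on top of it leaves room to add consumers later without repartitioning.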

โ˜ฃ๏ธ Poison Pill Handling Strategy

A corrupt message (malformed JSON) can stall a consumer forever.

ErrorHandlingDeserializer implementation:

# Spring Boot properties (application.properties)
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer

// Dead Letter Publishing (Dlt)
@RetryableTopic(
    attempts = "3",
    backoff = @Backoff(delay = 1000, multiplier = 2.0),
    topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE
)
@KafkaListener(topics = "payu.wallet.transfer")
public void listen(TransferEvent event) {
    // Process
}
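
To reason about how long a failing message lingers before reaching the dead-letter topic, it helps to compute the backoff schedule explicitly. The sketch below is plain Java with no Spring dependency. `RetrySchedule` is a hypothetical helper, and it assumes that `attempts` counts the first delivery, so three attempts mean two delayed retries.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that mirrors an exponential backoff configuration.
final class RetrySchedule {

    private RetrySchedule() {}

    /**
     * Delays (in ms) applied before each retry.
     * Assumption: `attempts` includes the initial delivery, so there are attempts - 1 retries.
     */
    static List<Long> delaysMs(int attempts, long initialDelayMs, double multiplier) {
        if (attempts < 1 || initialDelayMs < 0 || multiplier < 1.0) {
            throw new IllegalArgumentException("Invalid retry settings");
        }
        List<Long> delays = new ArrayList<>();
        double delay = initialDelayMs;
        for (int retry = 1; retry < attempts; retry++) {
            delays.add((long) delay);
            delay *= multiplier; // exponential growth between retries
        }
        return delays;
    }

    /** Total time spent waiting before the message is given up on. */
    static long totalDelayMs(int attempts, long initialDelayMs, double multiplier) {
        long total = 0;
        for (long d : delaysMs(attempts, initialDelayMs, multiplier)) {
            total += d;
        }
        return total;
    }
}
```

With the settings shown above (`attempts = "3"`, `delay = 1000`, `multiplier = 2.0`), `delaysMs(3, 1000, 2.0)` yields 1000 ms and 2000 ms, about three seconds of waiting in total before dead-lettering.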

📋 CloudEvents Standard

Event Envelope Structure

{
  "specversion": "1.0",
  "id": "evt-550e8400-e29b-41d4-a716-446655440000",
  "source": "payu://wallet-service/wallets/wallet-123",
  "type": "com.payu.wallet.transfer.completed.v1",
  "datacontenttype": "application/json",
  "time": "2026-01-30T10:45:00.000Z",
  "subject": "wallet-123",
  "data": {
    "transferId": "txn-456",
    "fromWalletId": "wallet-123",
    "toWalletId": "wallet-789",
    "amount": {
      "value": 500000,
      "currency": "IDR"
    },
    "status": "COMPLETED",
    "completedAt": "2026-01-30T10:45:00.000Z"
  },
  "payutracecontext": "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01",
  "payucorrelationid": "corr-abc-123"
}
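
A consumer or gateway can cheaply reject malformed envelopes before any business logic runs. The sketch below checks only what the CloudEvents 1.0 specification makes mandatory (`specversion`, `id`, `source`, `type`) plus the rule that attribute names use lower-case letters and digits. `CloudEventEnvelopeCheck` is a hypothetical helper operating on an already-parsed map, not part of the CloudEvents SDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical validator for a parsed CloudEvents JSON envelope.
final class CloudEventEnvelopeCheck {

    // Context attributes the CloudEvents 1.0 spec marks as REQUIRED.
    private static final List<String> REQUIRED = List.of("specversion", "id", "source", "type");

    private CloudEventEnvelopeCheck() {}

    /** Returns a list of human-readable problems; an empty list means the envelope passed. */
    static List<String> violations(Map<String, ?> envelope) {
        List<String> problems = new ArrayList<>();
        for (String attr : REQUIRED) {
            Object value = envelope.get(attr);
            if (!(value instanceof String) || ((String) value).isEmpty()) {
                problems.add("missing required attribute: " + attr);
            }
        }
        if (envelope.containsKey("specversion") && !"1.0".equals(envelope.get("specversion"))) {
            problems.add("specversion must be \"1.0\"");
        }
        for (String name : envelope.keySet()) {
            // Attribute names (including extensions) must be lower-case alphanumeric.
            if (!name.matches("[a-z0-9]+")) {
                problems.add("invalid attribute name: " + name);
            }
        }
        return problems;
    }
}
```

This is why the extension attributes in the envelope are spelled `payutracecontext` and `payucorrelationid` rather than with hyphens or camel case.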

Java CloudEvents Implementation

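// CloudEvent builder
import com.fasterxml.jackson.core.JsonProcessingException;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import java.net.URI;
import java.time.OffsetDateTime;
import java.util.UUID;

// objectMapper, traceContext and correlationId are assumed to be fields of the enclosing class
public CloudEvent createTransferEvent(TransferCompleted transfer) throws JsonProcessingException {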
    return CloudEventBuilder.v1()
        .withId(UUID.randomUUID().toString())
        .withSource(URI.create("payu://wallet-service/wallets/" + transfer.getFromWalletId()))
        .withType("com.payu.wallet.transfer.completed.v1")
        .withDataContentType("application/json")
        .withTime(OffsetDateTime.now())
        .withSubject(transfer.getFromWalletId())
        .withData(objectMapper.writeValueAsBytes(transfer))
        .withExtension("payutracecontext", traceContext)
        .withExtension("payucorrelationid", correlationId)
        .build();
}

📊 Schema Registry (Apicurio)

Avro Schema Definition

{
  "type": "record",
  "name": "TransferCompleted",
  "namespace": "com.payu.wallet.events",
  "doc": "Event emitted when a wallet transfer completes successfully",
  "fields": [
    {"name": "transferId", "type": "string"},
    {"name": "fromWalletId", "type": "string"},
    {"name": "toWalletId", "type": "string"},
    {
      "name": "amount",
      "type": {
        "type": "record",
        "name": "Money",
        "fields": [
          {"name": "value", "type": "long", "doc": "Amount in smallest unit (cents)"},
          {"name": "currency", "type": "string", "default": "IDR"}
        ]
      }
    },
    {"name": "status", "type": {"type": "enum", "name": "TransferStatus", "symbols": ["COMPLETED", "REVERSED"]}},
    {"name": "completedAt", "type": {"type": "long", "logicalType": "timestamp-millis"}}
  ]
}

Schema Compatibility Rules

# apicurio-registry-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: apicurio-registry-config
data:
  # Schema compatibility mode
  registry.rules.validity: FULL
  registry.rules.compatibility: BACKWARD_TRANSITIVE
  
  # Schema groups
  # payu-events: BACKWARD (consumers on the new schema can read data written with the previous schema)
  # payu-commands: FORWARD (consumers on the previous schema can read data written with the new schema)

🔄 Transactional Outbox Pattern

1. Outbox Table Schema

CREATE TABLE outbox_events (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type VARCHAR(100) NOT NULL,
    aggregate_id VARCHAR(100) NOT NULL,
    event_type VARCHAR(100) NOT NULL,
    payload JSONB NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    published_at TIMESTAMPTZ,
    
    -- For ordering
    sequence_num BIGSERIAL
);

-- Partial index for a polling relay or cleanup job (Debezium reads the WAL and does not query this table)
CREATE INDEX idx_outbox_unpublished 
ON outbox_events(created_at) 
WHERE published_at IS NULL;

2. Transactional Write Pattern

@Service
@Transactional
public class WalletService {
    
    @Autowired private WalletRepository walletRepository;
    @Autowired private OutboxRepository outboxRepository;
    
    public void transfer(TransferRequest request) {
        // 1. Update wallet balance
        Wallet fromWallet = walletRepository.findById(request.getFromWalletId())
            .orElseThrow(() -> new WalletNotFoundException());
        
        fromWallet.debit(request.getAmount());
        walletRepository.save(fromWallet);
        
        // 2. Write to outbox (same transaction!)
        OutboxEvent event = OutboxEvent.builder()
            .aggregateType("Wallet")
            .aggregateId(request.getFromWalletId())
            .eventType("TransferInitiated")
            .payload(objectMapper.writeValueAsString(request))
            .build();
        
        outboxRepository.save(event);
        
        // ✅ Both succeed or both fail (ACID)
    }
}

3. Debezium Outbox Connector

{
  "name": "outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "wallet-db",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "${env:DB_PASSWORD}",
    "database.dbname": "wallet",
    "plugin.name": "pgoutput",
    "slot.name": "outbox_slot",
    "table.include.list": "public.outbox_events",
    
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.field.event.id": "id",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.type": "event_type",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.route.by.field": "aggregate_type",
    "transforms.outbox.route.topic.replacement": "payu.${routedByValue}.events"
  }
}

🔀 Saga Orchestration

1. Transfer Saga State Machine

public enum TransferSagaState {
    INITIATED,
    SOURCE_DEBITED,
    DESTINATION_CREDITED,
    COMPLETED,
    
    // Compensation states
    DEBIT_COMPENSATION_PENDING,
    CREDIT_COMPENSATION_PENDING,
    COMPENSATED,
    FAILED
}

@Entity
@Table(name = "transfer_sagas")
public class TransferSaga {
    @Id
    private String sagaId;
    
    @Enumerated(EnumType.STRING)
    private TransferSagaState state;
    
    private String fromWalletId;
    private String toWalletId;
    private BigDecimal amount;
    private String currency;
    
    // Audit
    @Version
    private Long version;
    private Instant createdAt;
    private Instant updatedAt;
    
    // Compensation data
    private String debitTransactionId;
    private String creditTransactionId;
}
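
The enum alone does not say which transitions are legal. One way to make that explicit is a transition table that the orchestrator consults before every state change. The sketch below is an assumption about the intended flow, not PayU's actual rules: the nested `State` enum mirrors `TransferSagaState` so the example is self-contained, and the allowed edges are inferred from the state names.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical transition table; the allowed edges are assumptions for illustration.
final class TransferSagaTransitions {

    // Mirrors TransferSagaState so this sketch compiles on its own.
    enum State {
        INITIATED, SOURCE_DEBITED, DESTINATION_CREDITED, COMPLETED,
        DEBIT_COMPENSATION_PENDING, CREDIT_COMPENSATION_PENDING, COMPENSATED, FAILED
    }

    private static final Map<State, Set<State>> ALLOWED = new EnumMap<>(State.class);

    static {
        ALLOWED.put(State.INITIATED, EnumSet.of(State.SOURCE_DEBITED, State.FAILED));
        ALLOWED.put(State.SOURCE_DEBITED,
            EnumSet.of(State.DESTINATION_CREDITED, State.DEBIT_COMPENSATION_PENDING));
        ALLOWED.put(State.DESTINATION_CREDITED,
            EnumSet.of(State.COMPLETED, State.CREDIT_COMPENSATION_PENDING));
        ALLOWED.put(State.CREDIT_COMPENSATION_PENDING,
            EnumSet.of(State.DEBIT_COMPENSATION_PENDING, State.FAILED));
        ALLOWED.put(State.DEBIT_COMPENSATION_PENDING, EnumSet.of(State.COMPENSATED, State.FAILED));
        // Terminal states have no outgoing edges.
        ALLOWED.put(State.COMPLETED, EnumSet.noneOf(State.class));
        ALLOWED.put(State.COMPENSATED, EnumSet.noneOf(State.class));
        ALLOWED.put(State.FAILED, EnumSet.noneOf(State.class));
    }

    private TransferSagaTransitions() {}

    static boolean canTransition(State from, State to) {
        return ALLOWED.get(from).contains(to);
    }

    /** Returns the new state or throws if the move is not in the table. */
    static State transition(State from, State to) {
        if (!canTransition(from, to)) {
            throw new IllegalStateException("Illegal saga transition: " + from + " -> " + to);
        }
        return to;
    }

    static boolean isTerminal(State state) {
        return ALLOWED.get(state).isEmpty();
    }
}
```

Centralizing the table keeps duplicate or out-of-order events from moving a saga backwards, because every handler goes through the same `transition` check.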

2. Saga Orchestrator

@Service
public class TransferSagaOrchestrator {
    
    @Transactional
    public void handleDebitCompleted(DebitCompletedEvent event) {
        TransferSaga saga = sagaRepository.findById(event.getSagaId())
            .orElseThrow(() -> new SagaNotFoundException());
        
        if (saga.getState() != TransferSagaState.INITIATED) {
            log.warn("Invalid state transition for saga {}", saga.getSagaId());
            return;
        }
        
        // Store compensation data
        saga.setDebitTransactionId(event.getTransactionId());
        saga.setState(TransferSagaState.SOURCE_DEBITED);
        sagaRepository.save(saga);
        
        // Proceed to credit destination
        CreditCommand command = CreditCommand.builder()
            .sagaId(saga.getSagaId())
            .walletId(saga.getToWalletId())
            .amount(saga.getAmount())
            .build();
        
        kafkaTemplate.send("payu.wallet.commands.v1", command);
    }
    
    @Transactional
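    public void handleCreditFailed(CreditFailedEvent event) {
        TransferSaga saga = sagaRepository.findById(event.getSagaId())
            .orElseThrow(() -> new SagaNotFoundException());
        
        // Guard against duplicate or out-of-order events, as in handleDebitCompleted
        if (saga.getState() != TransferSagaState.SOURCE_DEBITED) {
            log.warn("Invalid state transition for saga {}", saga.getSagaId());
            return;
        }
        
        // Start compensation
        saga.setState(TransferSagaState.DEBIT_COMPENSATION_PENDING);
        sagaRepository.save(saga);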
        
        // Compensate: reverse the debit
        ReversalCommand reversal = ReversalCommand.builder()
            .sagaId(saga.getSagaId())
            .originalTransactionId(saga.getDebitTransactionId())
            .reason("Credit failed: " + event.getReason())
            .build();
        
        kafkaTemplate.send("payu.wallet.reversals.v1", reversal);
    }
}

3. Compensation Table

| Step | Action | Compensation |
|---|---|---|
| 1 | Debit Source Wallet | Credit Source Wallet (Reversal) |
| 2 | Credit Destination Wallet | Debit Destination Wallet (Reversal) |
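
The rule behind this table is that compensations run in reverse order, and only for steps that actually completed. The following is a minimal in-memory sketch of that rule. `CompensatingSaga`, `Step`, and the `step` factory are hypothetical names for illustration; a real orchestrator would persist progress and send commands over Kafka rather than call methods directly.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical in-memory saga runner illustrating reverse-order compensation.
final class CompensatingSaga {

    interface Step {
        String name();
        void execute() throws Exception;
        void compensate();
    }

    private CompensatingSaga() {}

    /** Runs steps in order; on failure, compensates completed steps newest-first. */
    static List<String> run(List<Step> steps) {
        List<String> log = new ArrayList<>();
        Deque<Step> completed = new ArrayDeque<>();
        for (Step step : steps) {
            try {
                step.execute();
                completed.push(step); // most recent step sits on top of the stack
                log.add("done:" + step.name());
            } catch (Exception e) {
                log.add("failed:" + step.name());
                while (!completed.isEmpty()) {
                    Step undo = completed.pop();
                    undo.compensate();
                    log.add("compensated:" + undo.name());
                }
                return log;
            }
        }
        return log;
    }

    /** Test double: a step that optionally fails and whose compensation is a no-op. */
    static Step step(String name, boolean fails) {
        return new Step() {
            @Override public String name() { return name; }
            @Override public void execute() throws Exception {
                if (fails) throw new Exception(name + " failed");
            }
            @Override public void compensate() { /* a real step would issue the reversal here */ }
        };
    }
}
```

If the credit to the destination wallet fails, only the debit is reversed. The failed step itself is never compensated, because it never took effect.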

4. Durable Execution (Temporal)

For complex, long-running sagas (e.g., Cross-border transfers taking 2 days), PayU uses Temporal. It provides automatic retries, state persistence, and "infinite" timeouts.

// TransferWorkflowImpl.java
public void executeTransfer(TransferRequest request) {
    try {
        walletActivity.debit(request.from(), request.amount());
        bankActivity.sendWire(request.to(), request.amount());
        notificationActivity.sendSuccess(request.user());
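    } catch (Exception e) {
        // Manual compensation: Temporal retries activities but does not roll back for you.
        // Caution: this branch also runs if the debit itself failed or if only the
        // notification failed; production code should undo only the steps that completed.
        walletActivity.credit(request.from(), request.amount());
        throw e;
    }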
}

๐Ÿ—๏ธ CDC with Debezium

Full CDC Configuration

{
  "name": "wallet-cdc-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "wallet-db",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "${env:DB_PASSWORD}",
    "database.dbname": "wallet",
    "database.server.name": "payu-wallet",
    
    "plugin.name": "pgoutput",
    "slot.name": "wallet_cdc_slot",
    "publication.name": "wallet_publication",
    
    "table.include.list": "public.wallets,public.ledger_entries",
    "column.exclude.list": "public.wallets.encrypted_data",
    
    "transforms": "route,unwrap",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "payu-wallet.public.(.*)",
    "transforms.route.replacement": "payu.wallet.cdc.$1",
    
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.add.fields": "op,source.ts_ms",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    
    "heartbeat.interval.ms": 10000,
    "snapshot.mode": "initial"
  }
}

📈 Consumer Lag Monitoring

Prometheus Metrics

# prometheus/kafka-alerts.yaml
groups:
  - name: kafka-consumer-alerts
    rules:
      - alert: KafkaConsumerLagHigh
        expr: kafka_consumergroup_lag > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Kafka consumer lag is high"
          description: "Consumer group {{ $labels.consumergroup }} has lag {{ $value }} on topic {{ $labels.topic }}"

      - alert: KafkaConsumerLagCritical
        expr: kafka_consumergroup_lag > 100000
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Kafka consumer lag is critical"
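
For readers less familiar with the metric, lag is the distance between the newest offset in a partition and the offset the group has committed. The sketch below restates the two alert thresholds in code. `ConsumerLag` is a hypothetical helper, and it ignores the `for:` hold duration that Prometheus applies before firing.

```java
// Hypothetical helper mirroring the alert thresholds above.
final class ConsumerLag {

    enum Severity { OK, WARNING, CRITICAL }

    static final long WARNING_THRESHOLD = 10_000;
    static final long CRITICAL_THRESHOLD = 100_000;

    private ConsumerLag() {}

    /** Lag for one partition: log end offset minus committed offset, never negative. */
    static long lag(long logEndOffset, long committedOffset) {
        return Math.max(0, logEndOffset - committedOffset);
    }

    /** Maps a lag value onto the same thresholds the alert rules use. */
    static Severity classify(long lag) {
        if (lag > CRITICAL_THRESHOLD) {
            return Severity.CRITICAL;
        }
        if (lag > WARNING_THRESHOLD) {
            return Severity.WARNING;
        }
        return Severity.OK;
    }
}
```

Absolute thresholds like these fit a known steady-state volume; on a 50,000 TPS topic a lag of 10,000 is a fraction of a second of traffic, so the numbers may need tuning per topic.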

Grafana Dashboard Query

# Consumer lag by group
sum by (consumergroup, topic) (kafka_consumergroup_lag)

# Messages per second
rate(kafka_topic_partition_current_offset[5m])

# Consumer throughput
rate(kafka_consumer_fetch_manager_records_consumed_total[5m])

Event Systems Checklist

Producer

  • acks=all configured for financial data
  • Idempotent producer enabled
  • Transactional ID set for EOS
  • Retry and timeout configured

Consumer

  • isolation.level=read_committed configured
  • Manual acknowledgment implemented
  • DLQ configured for poison pills
  • Consumer lag monitored

Events

  • CloudEvents spec followed
  • Schema registered in Apicurio
  • Backward compatibility verified
  • Message size optimized

Sagas

  • Every step has compensation defined
  • Saga state persisted in database
  • Timeout handling implemented
  • Idempotency keys used
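
The "idempotency keys" item deserves a concrete shape, because at-least-once delivery and saga retries both redeliver messages. The sketch below deduplicates by key in memory. `IdempotentHandler` is a hypothetical class, and the in-memory set is an assumption made to keep the example small; a production version would store the key in the same database transaction as the business change.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical wrapper that runs a handler at most once per idempotency key.
final class IdempotentHandler<T> {

    // Assumption: in-memory store for illustration; it does not survive restarts.
    private final Set<String> seenKeys = ConcurrentHashMap.newKeySet();
    private final Consumer<T> delegate;

    IdempotentHandler(Consumer<T> delegate) {
        this.delegate = delegate;
    }

    /**
     * Processes the message unless its key was already handled.
     * Returns true if the delegate ran, false for a duplicate.
     */
    boolean handle(String idempotencyKey, T message) {
        if (!seenKeys.add(idempotencyKey)) {
            return false; // duplicate delivery, skip side effects
        }
        try {
            delegate.accept(message);
            return true;
        } catch (RuntimeException e) {
            // Forget the key so a redelivery can retry the failed work.
            seenKeys.remove(idempotencyKey);
            throw e;
        }
    }
}
```

Using the transfer or saga ID as the key means a redelivered `DebitCommand` cannot debit the same wallet twice.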

📚 References

Local Reference Files

| Category | Topic | File |
|---|---|---|
| Broker Selection | Kafka vs Pulsar vs Redpanda comparison | broker-selection.md |
| Stream Processing | Flink vs Spark vs Kafka Streams | processor-selection.md |
| Delivery | At-least-once, exactly-once patterns | delivery-guarantees.md |
| Exactly-Once | Transactional processing | exactly-once.md |
| Event Sourcing | Event store patterns | event-sourcing.md |
| CDC | Debezium integration | cdc-patterns.md |
| Error Handling | DLQ, retries, backpressure | error-handling.md |
| Sagas | Choreography vs Orchestration patterns | saga_patterns.md |
| Task Queues | BullMQ for TypeScript background jobs | bullmq.md |
| Task Queues | Celery for Python background jobs | celery.md |
| Workflows | Temporal for durable execution & sagas | temporal-workflows.md |
| Messaging | NATS for cloud-native request-reply | nats.md |
| Messaging | RabbitMQ for complex routing | rabbitmq.md |
| Messaging | Redis Streams for simple job queues | redis-streams.md |
| Java | Kafka Java client patterns | java-patterns.md |
| TypeScript | KafkaJS patterns | typescript-patterns.md |
| Python | confluent-kafka-python patterns | python-patterns.md |
| Go | kafka-go patterns | go-patterns.md |

External Documentation


🚨 P19 Audit Status: Integration Gaps (Feb 2026)

CRITICAL: Read .agent/context/P19-AUDIT-STATUS.md for full details. Event-First Architecture: ⚠️ PARTIAL. Shared starters exist but are dead code.

Dead Code Alert: Shared Starters NOT Integrated

| Starter | Status | Built | Consumers | Impact |
|---|---|---|---|---|
| events-starter (CloudEvents) | 🔴 Dead | ✅ | 0 services | Services publish raw Kafka, not CloudEvents |
| outbox-starter (Transactional Outbox) | 🔴 Dead | ✅ | 0 services | Financial txns can lose events during failures |
| saga-starter (Saga Orchestration) | 🔴 Dead | ✅ | 0 services | transaction-service has raw saga logic instead |

What Services Actually Do (vs What Starters Provide)

Current Reality:

  • transaction-service: Publishes Kafka events directly (no outbox, no CloudEvents envelope)
  • wallet-service: Publishes Kafka events directly (same issue)
  • lending-service: No event publishing at all
  • saga-starter: Full saga orchestration engine built but unused; transaction-service has inline saga logic instead

What SHOULD Happen (Target State):

  1. transaction-service → Use outbox-starter for atomic event publishing
  2. wallet-service → Use outbox-starter for balance change events
  3. lending-service → Use saga-starter for loan disbursement workflow
  4. ALL events → Use events-starter CloudEvents envelope format

Integration Patterns to Apply

When asked to implement any event publishing:

// โŒ WRONG โ€” Current state (direct Kafka, no atomicity)
@Transactional
public Transfer execute(TransferCommand cmd) {
    Transfer t = processTransfer(cmd);
    transferRepo.save(t);
    kafkaTemplate.send("transfers", t); // NOT atomic! Can lose events
    return t;
}

// ✅ CORRECT: use outbox-starter (atomic with DB transaction)
@Transactional
public Transfer execute(TransferCommand cmd) {
    Transfer t = processTransfer(cmd);
    transferRepo.save(t);
    outboxRepository.save(OutboxEvent.of("Transfer", t.getId(), "TransferCompleted", t));
    return t;
    // Debezium CDC picks up outbox entry and publishes to Kafka
}

Outbox Starter Integration Steps

See docs/guides/LESSONS.md § "Transactional Outbox Pattern Integration" for full guide:

  1. Add outbox-starter dependency to service pom.xml
  2. Include Flyway migration for outbox_events table
  3. Inject OutboxRepository into application services
  4. Replace direct kafkaTemplate.send() with outboxRepository.save()
  5. Configure Debezium CDC connector for the service's database
  6. Write integration test with Testcontainers (PostgreSQL + Kafka + Debezium)

Saga Starter Integration Steps

See docs/guides/LESSONS.md § "Saga Orchestration Pattern Integration":

  1. Add saga-starter dependency
  2. Define saga steps as SagaStep<> implementations
  3. Use SagaOrchestrator to execute the saga
  4. Implement compensation logic for each step
  5. Replace inline saga logic in transaction-service

Last Updated: February 2026 (P19 Audit)

Skills Info
Original Name: integration-architect
Author: fajjarnr