xian-py
xian-py is the external Python SDK for talking to a Xian node from applications, services, wallets, and automation workflows.
If you want shell-first automation instead of Python code, use xian-cli. The xian client ... namespace in xian-cli wraps the same SDK model for command-line usage.
Installation
Base install:
uv add xian-tech-pyThe published PyPI package name is xian-tech-py. The import package remains xian_py.
Optional extras:
uv add "xian-tech-py[hd]" # mnemonic / HD wallet support
uv add "xian-tech-py[eth]" # Ethereum wallet helpersPublic API
The intended top-level imports include:
from xian_py import (
AbciError,
AsyncContractClient,
AsyncEventClient,
AsyncStateKeyClient,
AsyncTokenClient,
BdsStatus,
ContractClient,
DeveloperRewardSummary,
EventClient,
EventProjector,
EventProjectorError,
EventSource,
IndexedBlock,
IndexedEvent,
IndexedTransaction,
LiveEvent,
NodeStatus,
PerformanceStatus,
RetryEvent,
RetryPolicy,
RpcError,
ShieldedOutputTag,
ShieldedRelayerAsyncClient,
ShieldedRelayerAsyncPoolClient,
ShieldedRelayerCatalogEntry,
ShieldedRelayerClient,
ShieldedRelayerInfo,
ShieldedRelayerInfoPolicy,
ShieldedRelayerInfoResult,
ShieldedRelayerJob,
ShieldedRelayerJobResult,
ShieldedRelayerPoolClient,
ShieldedRelayerQuote,
ShieldedRelayerQuoteResult,
SimulationError,
SQLiteProjectionState,
StateKeyClient,
StateEntry,
SubmissionConfig,
TokenBalance,
TokenBalancePage,
TokenClient,
TransactionError,
TransactionReceipt,
TransactionSubmission,
TransportConfig,
TransportError,
TxTimeoutError,
Wallet,
WatcherConfig,
Xian,
XianAsync,
XianClientConfig,
XianException,
indexed_event_sort_key,
merged_event_payload,
run_sync,
to_contract_time,
)HDWallet and EthereumWallet live in xian_py.wallet; they are optional helpers, not part of the top-level API.
Xian and XianAsync require an Ed25519 Xian signer. EthereumWallet is a separate helper for Ethereum-style account workflows and is not valid for signing Xian transactions.
Wallets
Basic Wallet
from xian_py import Wallet
wallet = Wallet()
print(wallet.public_key)
print(wallet.private_key)Restore from an existing private key:
wallet = Wallet(private_key="your_private_key_hex")HD Wallet
from xian_py.wallet import HDWallet
wallet = HDWallet()
print(wallet.mnemonic_str)
child = wallet.get_wallet([0, 0])
print(child.public_key)HD wallet support requires xian-tech-py[hd].
Ethereum Wallet
from xian_py.wallet import EthereumWallet
wallet = EthereumWallet()
print(wallet.address)Ethereum wallet helpers require xian-tech-py[eth].
Synchronous Client
from xian_py import Wallet, Xian
wallet = Wallet()
with Xian("http://127.0.0.1:26657", wallet=wallet) as client:
balance = client.get_balance(wallet.public_key)Constructor parameters:
node_url- optional
chain_id - optional
wallet
If chain_id is omitted, the client fetches it from the node. If you pass chain_id explicitly, it must be a non-empty string.
Xian keeps a persistent background event loop and HTTP session for the life of the client. Prefer using it as a context manager or calling close() when you are done.
Client Configuration
The SDK now exposes explicit config types for transport, retry, submission, and watcher defaults:
from xian_py import (
RetryPolicy,
SubmissionConfig,
TransportConfig,
WatcherConfig,
Xian,
XianClientConfig,
)
config = XianClientConfig(
transport=TransportConfig(total_timeout_seconds=20.0),
retry=RetryPolicy(max_attempts=3, initial_delay_seconds=0.25),
submission=SubmissionConfig(wait_for_tx=True),
watcher=WatcherConfig(poll_interval_seconds=0.5, batch_limit=200),
)
with Xian("http://127.0.0.1:26657", config=config) as client:
status = client.get_node_status()Retry policy applies only to read-side operations such as status queries, ABCI reads, tx lookup, and watcher polling. Transaction broadcasts are not retried automatically.
If you need retry visibility, attach RetryPolicy(on_retry=...). The callback receives a typed RetryEvent with the operation kind, the failed attempt number, the next backoff delay, and the triggering exception.
Async Client
import asyncio
from xian_py import Wallet, XianAsync
async def main():
wallet = Wallet()
async with XianAsync("http://127.0.0.1:26657", wallet=wallet) as client:
return await client.get_balance(wallet.public_key)
asyncio.run(main())Use XianAsync directly inside async code. The sync wrapper intentionally raises if you call it from an already-running event loop.
Common Methods
get_balance
balance = client.get_balance(address=wallet.public_key)
balance = client.get_balance(contract="currency")get_state
get_state takes the contract name, variable name, and zero or more key parts:
value = client.get_state("currency", "balances", wallet.public_key)
allowance = client.get_state("currency", "approvals", wallet.public_key, "con_dex")get_contract
source = client.get_contract("currency")
runtime_code = client.get_contract_code("currency")send_tx
result = client.send_tx(
contract="currency",
function="transfer",
kwargs={"amount": 100, "to": "recipient_public_key"},
chi=50_000,
mode="checktx",
wait_for_tx=True,
)Transaction broadcast modes are explicit:
"async": submit to the node and return immediately"checktx": wait for mempool admission /CheckTx"commit": use CometBFTbroadcast_tx_commit
Returned fields now distinguish the lifecycle:
submittedacceptedfinalizedtx_hashresponsereceipt
The return type is TransactionSubmission, so these values are available as attributes:
result = client.send_tx(...)
print(result.submitted)
print(result.accepted)
print(result.tx_hash)
print(result.receipt)If chi is omitted, the SDK simulates the transaction first and adds a small configurable headroom to the estimated chi usage before submission.
You can set default submission behavior once through XianClientConfig.submission instead of repeating the same options on every call.
send
send is a convenience wrapper for token transfers:
result = client.send(
amount=100,
to_address="recipient_public_key",
token="currency",
mode="checktx",
wait_for_tx=True,
)approve
approve is a convenience wrapper that approves another contract to spend a token on behalf of the wallet:
result = client.approve(
contract="con_dex",
token="currency",
amount=1_000,
mode="checktx",
)get_approved_amount
amount = client.get_approved_amount("con_dex", token="currency")simulate
result = client.simulate(
contract="currency",
function="transfer",
kwargs={"amount": 100, "to": "recipient_public_key"},
)
print(result["status"])
print(result["chi_used"])
print(result["state"])The dry-run result currently comes from the node simulator and uses:
statuschi_usedstateresultpayload
The simulator is a readonly preview, not a consensus transaction. If you expose it to end users, treat it as free but rate-limited compute at the infrastructure layer rather than an unrestricted public endpoint.
Node operators can also refuse or cap simulations through simulation_enabled, simulation_max_concurrency, simulation_timeout_ms, and simulation_max_chi, so client code should expect structured failures when a node disables or limits the simulator.
call
Use call when you want the decoded readonly contract return value instead of the raw simulation envelope:
proposal = client.call(
"con_registry_approval",
"get_proposal",
{"proposal_id": 1},
)call runs through the same readonly simulation path, but it unwraps the successful return value and raises if the readonly execution itself fails.
submit_contract
code = """
counter = Variable()
@construct
def seed():
counter.set(0)
@export
def increment() -> int:
counter.set(counter.get() + 1)
return counter.get()
"""
result = client.submit_contract(
name="con_counter",
code=code,
args={},
chi=500_000,
)name must use lowercase ASCII letters, digits, and underscores only. For user contracts, keep the standard con_ prefix.
Other Helpers
Also available:
get_tx(tx_hash)wait_for_tx(tx_hash)refresh_nonce()estimate_chi(contract, function, kwargs)get_nodes()get_genesis()get_chain_id()get_node_status()get_perf_status()get_bds_status()get_developer_rewards(recipient_key)get_token_balances(address=None, limit=..., offset=..., include_zero=False)list_blocks(limit=..., offset=...)get_block(height)get_block_by_hash(block_hash)get_indexed_tx(tx_hash)list_txs_for_block(block_ref)list_txs_by_sender(sender, limit=..., offset=...)list_txs_by_contract(contract, limit=..., offset=...)list_shielded_wallet_history(tag_value, kind=..., limit=..., after_note_index=...)list_shielded_output_tags(tag_value, kind=..., limit=..., offset=..., after_id=...)get_events_for_tx(tx_hash)list_events(contract, event, limit=..., offset=..., after_id=...)get_state_history(key, limit=..., offset=...)get_state_for_tx(tx_hash)get_state_for_block(block_ref)watch_blocks(start_height=..., poll_interval_seconds=...)watch_events(contract, event, after_id=..., limit=..., poll_interval_seconds=...)watch_live_events(contract, event, poll_interval_seconds=...)
get_developer_rewards(recipient_key) uses the BDS aggregate query surface and returns the cumulative indexed developer_reward total for that recipient, along with reward row count, distinct transaction count, distinct contract count, and first/last indexed reward metadata. The contract count here is the count of distinct source contracts that actually earned developer rewards for that recipient, including called contracts when a transaction spans multiple contracts.
summary = client.get_developer_rewards("alice")
print(summary.total_rewards, summary.tx_count, summary.contract_count)get_tx(tx_hash) and wait_for_tx(tx_hash) now return a TransactionReceipt that exposes the two important pieces separately:
result.txis the original submitted transactionresult.tx_result.datais the decoded execution output- for convenience,
xian-pyalso surfaces these as typed attributes:receipt.transactionandreceipt.execution
wait_for_tx(tx_hash) first uses the normal node /tx lookup path. If that index lags briefly on a live node, xian-py now falls back to recent block inspection so a just-finalized transaction can still be recovered by hash.
get_bds_status() returns a typed BdsStatus model. The main high-signal fields are indexed_height, current_block_height, height_lag, catching_up, spool_pending_count, and alerts.
get_token_balances(address=None, ...) returns the BDS-backed token portfolio for an address. If address is omitted, the client uses the active wallet address. Set include_zero=True when you need zero-balance indexed tokens too.
list_shielded_wallet_history(tag_value, ...) is the higher-level shielded light-wallet recovery feed. It returns the canonical note-commitment sequence in note-index order and only exposes output_payload for outputs whose indexed tag matches the requested wallet tag. Use after_note_index as the resumable cursor.
list_shielded_output_tags(tag_value, ...) exposes the lower-level tagged output index directly. Prefer list_shielded_wallet_history(...) for wallet sync and recovery unless you specifically need tag-index rows.
Shielded Relayer Clients
Use the dedicated relayer clients when you are working with proof-bound shielded submission flows instead of hand-rolling raw HTTP calls:
from xian_py import ShieldedRelayerAsyncClient
async with ShieldedRelayerAsyncClient(
"http://127.0.0.1:8090",
auth_token="secret",
) as relayer:
info = await relayer.get_info()
quote = await relayer.get_quote(
kind="shielded_note_relay_transfer",
contract="con_shielded_note_token",
)Use ShieldedRelayerAsyncPoolClient or ShieldedRelayerPoolClient when you have a catalog of candidate relayers and want priority-ordered failover across them.
If you inject your own aiohttp session, the SDK leaves that session under caller ownership. If the relayer client creates the session itself, close() or the async context manager cleans it up.
Watching Blocks And Events
xian-py now includes polling-based watcher helpers for long-running application processes.
watch_blocks
watch_blocks uses raw node RPC and does not require BDS:
async for block in client.watch_blocks(start_height=101):
print(block.height, block.tx_count)If start_height is omitted, the watcher begins at the next block after the current node head. Persist the last seen height if you want resumable block consumers.
The default poll interval comes from XianClientConfig.watcher.
watch_events
watch_events uses the indexed BDS event surface and a stable event cursor:
async for event in client.watch_events(
"currency",
"Transfer",
after_id=500,
):
print(event.id, event.tx_hash, event.data)Resume by storing the last seen event id and passing it back as after_id.
Event watching requires BDS to be enabled on the node because the event stream comes from indexed reads rather than direct raw state queries.
For raw live WebSocket delivery without the indexed catch-up cursor, use watch_live_events(contract, event, ...). Application workers that need resumability should prefer watch_events(...).
The default watcher batch size and poll interval come from XianClientConfig.watcher.
Application Helper Clients
xian-py now includes thin helper clients that reduce repetitive application boilerplate without hiding the underlying network model.
Available factories:
client.contract("name")client.token("currency")client.events("contract", "EventName")client.state_key("contract", "variable", *keys)
These work on both Xian and XianAsync.
Contract Client
ledger = client.contract("con_ledger")
await ledger.send("add_entry", account="alice", amount=5)
balance = await ledger.get_state("balances", "alice")
history = await ledger.state_key("balances", "alice").history(limit=20)The contract client keeps the contract name fixed and lets you focus on the function call or state path you actually care about.
It also exposes call(...) for authoritative readonly hydration when the contract returns structured objects.
Token Client
currency = client.token()
balance = await currency.balance_of()
await currency.transfer("bob", 10)
await currency.approve("con_dex", amount=100)The token client is just a thin layer over the existing currency-style helper methods, but it keeps the token contract fixed and provides a cleaner application-facing shape.
Event Client
transfers = client.events("currency", "Transfer")
recent = transfers.list(after_id=500, limit=50)You can also watch from the same fixed event source:
async for transfer in transfers.watch(after_id=500):
print(transfer.data)State Key Client
balance_key = client.state_key("currency", "balances", "alice")
current = balance_key.get()
history = balance_key.history(limit=20)This is useful when an application works with one exact state key repeatedly and wants both the current value and the indexed history view.
Reusable Projector Primitives
xian-py now also includes a small reusable layer for event-driven read models and projector workers.
Available primitives:
merged_event_payload(event): merge BDSdata_indexedanddataonce at the SDK boundarySQLiteProjectionState: persist integer cursors in SQLite-backed read modelsEventSource: describe one indexed event streamEventProjector: poll one or more indexed event streams, sort events deterministically, optionally hydrate authoritative state, and call your apply handlerEventProjectorError: surface hydrate/apply failures together with the failing event context
These primitives are intentionally thin. The SDK owns the repetitive event polling, ordering, and cursor plumbing; application code still owns its own projection tables, hydration strategy, and domain-specific apply logic.
The three deeper reference-app slices now build on these shared primitives instead of each carrying their own event-loop and cursor implementation.
Service Integration Examples
The xian-py repo now includes application-facing examples under examples/ that show how the SDK fits into ordinary backend workflows.
It also includes the first solution example set under examples/credits_ledger/, which turns the generic SDK primitives into a concrete credits-ledger backend pattern.
It also includes examples/registry_approval/, which turns the same SDK primitives into a shared registry with proposal and approval flow.
It also includes examples/workflow_backend/, which turns the SDK into a job-style workflow backend with a service/write path and an event-driven worker.
FastAPI Service
examples/fastapi_service.py shows an async API service shape with:
- shared
XianAsynclifecycle management - typed node health reads
- token balance reads
- indexed transfer reads
- token transfer submission
Typical run:
uv run uvicorn examples.fastapi_service:app --reload --app-dir .This example expects normal app dependencies such as fastapi and uvicorn; install them with the SDK app extra:
uv sync --group dev --extra appEvent Worker
examples/event_worker.py shows a resumable background worker that:
- watches indexed BDS events
- stores the last seen
after_idcursor locally - resumes cleanly after restart
Typical run:
uv run python examples/event_worker.pyAdmin / Automation Job
examples/admin_job.py shows a synchronous operator-facing automation task that reads:
- node health
- peer count
- performance status
- optional BDS lag
and exits nonzero when a configured threshold is violated.
Typical run:
uv run python examples/admin_job.pyCredits Ledger Solution Examples
examples/credits_ledger/ adds a solution-specific set of examples for the first credits-ledger workflow and the first deeper reference-app slice:
admin_job.py: bootstrap or administercon_credits_ledgerapi_service.py: expose authoritative balances plus projected activity and summary views through a small FastAPI serviceprojector_worker.py: rebuild a local SQLite read model from indexedIssue,Transfer, andBurneventsevent_worker.py: compatibility wrapper aroundprojector_worker.py
Typical runs:
uv sync --group dev --extra app
uv run python examples/credits_ledger/admin_job.py
uv run uvicorn examples.credits_ledger.api_service:app --reload --app-dir .
uv run python examples/credits_ledger/projector_worker.pyThis is the first example set that demonstrates the full backend pattern:
- Xian as the authoritative ledger
- BDS-backed indexed events as the integration feed
- a local projected read model for application-specific queries
Registry / Approval Solution Examples
examples/registry_approval/ adds the second reference solution and the second deeper reference-app slice:
admin_job.py: deploycon_registry_recordsandcon_registry_approval, configure signers, top up approvers with native balance for the reference flow, and submit an initial proposalapi_service.py: combine authoritative proposal/record reads with projected workflow views for pending approvals and audit activityprojector_worker.py: rebuild a local SQLite workflow projection from indexed events and hydrate rich proposal/record state from authoritative contract readsevent_worker.py: compatibility wrapper aroundprojector_worker.py
Typical runs:
uv sync --group dev --extra app
uv run python examples/registry_approval/admin_job.py
uv run uvicorn examples.registry_approval.api_service:app --reload --app-dir .
uv run python examples/registry_approval/projector_worker.pyThis is the second example set that demonstrates the deeper backend pattern:
- indexed events as workflow triggers
- authoritative decoded readonly contract calls as hydration
- a local projected read model for application-oriented approval queries
Workflow Backend Solution Examples
examples/workflow_backend/ adds the third reference solution and the third deeper reference-app slice:
admin_job.py: deploycon_job_workflow, add workers, and optionally submit an initial workflow itemapi_service.py: combine authoritative item reads with projected queue and workflow activity viewsprocessor_worker.py: claim submitted items and complete or fail themprojector_worker.py: rebuild a local SQLite queue/activity projection from indexed events and authoritativeget_itemreadsevent_worker.py: compatibility wrapper aroundprocessor_worker.py
Typical runs:
uv sync --group dev --extra app
uv run python examples/workflow_backend/admin_job.py
uv run uvicorn examples.workflow_backend.api_service:app --reload --app-dir .
uv run python examples/workflow_backend/processor_worker.py
uv run python examples/workflow_backend/projector_worker.pyThis is the third example set that demonstrates the deeper backend pattern:
- a separate processor and projector worker model
- indexed events as workflow and projection triggers
- authoritative decoded readonly
get_itemcalls for projection hydration - a local projected read model for queue state and workflow activity
The workflow bootstrap now also tops up configured workers with native balance by default so the documented processor path can actually claim and complete items in local/reference networks.
Structured Errors
The SDK now exposes more precise error classes:
TransportErrorRpcErrorAbciErrorSimulationErrorTransactionErrorTxTimeoutError
All of them inherit from XianException.
See the repo README for package-level development and compatibility notes.