This chapter describes configuring the Oracle Database components that are used by XStream. This chapter also includes sample client applications that communicate with an XStream outbound server and inbound server.
This chapter contains these topics:
See Also:
This section describes a sample XStream client application. This application illustrates the basic tasks that are required of an XStream Out and XStream In application.
The application performs the following tasks:
It attaches to an XStream outbound server and inbound server and waits for LCRs from the outbound server. The outbound server and inbound server are in two different databases.
When it receives an LCR from the outbound server, it immediately sends the LCR to the inbound server.
It periodically gets the processed low position from the inbound server and sends this value to the outbound server.
It periodically sends a "ping" LCR from the outbound server to the inbound server to move the inbound server's processed low position forward in times of low activity.
In an XStream Out configuration that does not send LCRs to an inbound server, the client application must obtain the processed low position in another way.
This application waits indefinitely for transactions from the outbound server. To interrupt the application, enter the interrupt command for your operating system. For example, the interrupt command on some operating systems is control-C
. If the program is restarted, then the outbound server starts sending LCRs from the processed low position that was set during the previous run.
Figure A-1 provides an overview of the XStream environment configured in this section.
Before running the sample application, ensure that the following components exist:
Two Oracle databases with network connectivity between them
An XStream administrator on both databases
An outbound server configuration on one database, including a capture process, queue, and outbound server
An inbound server configuration on another database
If you are running the sample application with a multitenant container database (CDB), then ensure that the client application connects to the correct container:
When the client application connects to the outbound server, it must connect to the root.
When the client application connects to the inbound server, it must connect to the container in which the inbound server was created.
The sample applications in the following sections perform the same tasks. One sample application uses the OCI API, and the other uses the Java API.
To run the sample XStream client application for the OCI API, compile and link the application file, and enter the following on a command line:
xio -ob_svr xout_name -ob_db sn_xout_db -ob_usr xout_cu -ob_pwd xout_cu_pass -ib_svr xin_name -ib_db sn_xin_db -ib_usr xin_au -ib_pwd xin_au_pass
Substitute the appropriate values for the following placeholders:
xout_name is the name of the outbound server.
sn_xout_db is the service name for the outbound server's database.
xout_cu is the outbound server's connect user.
xout_cu_pass is the password for the outbound server's connect user.
xin_name is the name of the inbound server.
sn_xin_db is the service name for the inbound server's database.
xin_au is the inbound server's apply user.
xin_au_pass is the password for the inbound server's apply user.
When the sample client application is running, it prints information about the row LCRs it is processing. The output looks similar to the following:
----------- ROW LCR Header ----------------- src_db_name=DB.EXAMPLE.COM cmd_type=UPDATE txid=17.0.74 owner=HR oname=COUNTRIES ----------- ROW LCR Header ----------------- src_db_name=DB.EXAMPLE.COM cmd_type=COMMIT txid=17.0.74 ----------- ROW LCR Header ----------------- src_db_name=DB.EXAMPLE.COM cmd_type=UPDATE txid=12.25.77 owner=OE oname=ORDERS ----------- ROW LCR Header ----------------- src_db_name=DB.EXAMPLE.COM cmd_type=UPDATE txid=12.25.77 owner=OE oname=ORDERS
This output contains the following information for each row LCR:
src_db_name
shows the source database for the change encapsulated in the row LCR.
cmd_type
shows the type of SQL statement that made the change.
txid
shows the transaction ID of the transaction that includes the row LCR.
owner
shows the owner of the database object that was changed.
oname
shows the name of the database object that was changed.
This demo is available in the following location:
$ORACLE_HOME/rdbms/demo/xstream/oci
The file name for the demo is xio.c
. See the README.txt
file in the demo directory for more information about compiling and running the application.
The code for the sample application that uses the OCI API follows:
#ifndef OCI_ORACLE #include <oci.h> #endif #ifndef _STDIO_H #include <stdio.h> #endif #ifndef _STDLIB_H #include <stdlib.h> #endif #ifndef _STRING_H #include <string.h> #endif #ifndef _MALLOC_H #include <malloc.h> #endif /*---------------------------------------------------------------------- * Internal structures *----------------------------------------------------------------------*/ #define M_DBNAME_LEN (128) typedef struct conn_info /* connect info */ { oratext * user; ub4 userlen; oratext * passw; ub4 passwlen; oratext * dbname; ub4 dbnamelen; oratext * svrnm; ub4 svrnmlen; } conn_info_t; typedef struct params { conn_info_t xout; /* outbound info */ conn_info_t xin; /* inbound info */ } params_t; typedef struct oci /* OCI handles */ { OCIEnv *envp; /* Environment handle */ OCIError *errp; /* Error handle */ OCIServer *srvp; /* Server handle */ OCISvcCtx *svcp; /* Service handle */ OCISession *authp; OCIStmt *stmtp; boolean attached; boolean outbound; } oci_t; static void connect_db(conn_info_t *opt_params_p, oci_t ** ocip, ub2 char_csid, ub2 nchar_csid); static void disconnect_db(oci_t * ocip); static void ocierror(oci_t * ocip, char * msg); static void attach(oci_t * ocip, conn_info_t *conn, boolean outbound); static void detach(oci_t *ocip); static void get_lcrs(oci_t *xin_ocip, oci_t *xout_ocip); static void get_chunks(oci_t *xin_ocip, oci_t *xout_ocip); static void print_lcr(oci_t *ocip, void *lcrp, ub1 lcrtype, oratext **src_db_name, ub2 *src_db_namel); static void print_chunk (ub1 *chunk_ptr, ub4 chunk_len, ub2 dty); static void get_inputs(conn_info_t *xout_params, conn_info_t *xin_params, int argc, char ** argv); static void get_db_charsets(conn_info_t *params_p, ub2 *char_csid, ub2 *nchar_csid); static void set_client_charset(oci_t *outbound_ocip); #define OCICALL(ocip, function) do {\ sword status=function;\ if (OCI_SUCCESS==status) break;\ else if (OCI_ERROR==status) \ {ocierror(ocip, (char *)"OCI_ERROR");\ exit(1);}\ else {printf("Error encountered %d\n", status);\ exit(1);}\ } while(0) /*--------------------------------------------------------------------- * M A I N P R O G R A M *---------------------------------------------------------------------*/ main(int argc, char **argv) { /* Outbound and inbound connection info */ conn_info_t xout_params; conn_info_t xin_params; oci_t *xout_ocip = (oci_t *)NULL; oci_t *xin_ocip = (oci_t *)NULL; ub2 obdb_char_csid = 0; /* outbound db char csid */ ub2 obdb_nchar_csid = 0; /* outbound db nchar csid */ /* parse command line arguments */ get_inputs(&xout_params, &xin_params, argc, argv); /* Get the outbound database CHAR and NCHAR character set info */ get_db_charsets(&xout_params, &obdb_char_csid, &obdb_nchar_csid); /* Connect to the outbound db and set the client env to the outbound charsets * to minimize character conversion when transferring LCRs from outbound * directly to inbound server. */ connect_db(&xout_params, &xout_ocip, obdb_char_csid, obdb_nchar_csid); /* Attach to outbound server */ attach(xout_ocip, &xout_params, TRUE); /* connect to inbound db and set the client charsets the same as the * outbound db charsets. */ connect_db(&xin_params, &xin_ocip, obdb_char_csid, obdb_nchar_csid); /* Attach to inbound server */ attach(xin_ocip, &xin_params, FALSE); /* Get lcrs from outbound server and send to inbound server */ get_lcrs(xin_ocip, xout_ocip); /* Detach from XStream servers */ detach(xout_ocip); detach(xin_ocip); /* Disconnect from both databases */ disconnect_db(xout_ocip); disconnect_db(xin_ocip); free(xout_ocip); free(xin_ocip); exit (0); } /*--------------------------------------------------------------------- * connect_db - Connect to the database and set the env to the given * char and nchar character set ids. *---------------------------------------------------------------------*/ static void connect_db(conn_info_t *params_p, oci_t **ociptr, ub2 char_csid, ub2 nchar_csid) { oci_t *ocip; printf ("Connect to Oracle as %.*s@%.*s ", params_p->userlen, params_p->user, params_p->dbnamelen, params_p->dbname); if (char_csid && nchar_csid) printf ("using char csid=%d and nchar csid=%d", char_csid, nchar_csid); printf("\n"); ocip = (oci_t *)malloc(sizeof(oci_t)); if (OCIEnvNlsCreate(&ocip->envp, OCI_OBJECT, (dvoid *)0, (dvoid * (*)(dvoid *, size_t)) 0, (dvoid * (*)(dvoid *, dvoid *, size_t))0, (void (*)(dvoid *, dvoid *)) 0, (size_t) 0, (dvoid **) 0, char_csid, nchar_csid)) { ocierror(ocip, (char *)"OCIEnvCreate() failed"); } if (OCIHandleAlloc((dvoid *) ocip->envp, (dvoid **) &ocip->errp, (ub4) OCI_HTYPE_ERROR, (size_t) 0, (dvoid **) 0)) { ocierror(ocip, (char *)"OCIHandleAlloc(OCI_HTYPE_ERROR) failed"); } /* Logon to database */ OCICALL(ocip, OCILogon(ocip->envp, ocip->errp, &ocip->svcp, params_p->user, params_p->userlen, params_p->passw, params_p->passwlen, params_p->dbname, params_p->dbnamelen)); /* allocate the server handle */ OCICALL(ocip, OCIHandleAlloc((dvoid *) ocip->envp, (dvoid **) &ocip->srvp, OCI_HTYPE_SERVER, (size_t) 0, (dvoid **) 0)); OCICALL(ocip, OCIHandleAlloc((dvoid *) ocip->envp, (dvoid **) &ocip->stmtp, (ub4) OCI_HTYPE_STMT, (size_t) 0, (dvoid **) 0)); if (*ociptr == (oci_t *)NULL) { *ociptr = ocip; } } /*--------------------------------------------------------------------- * get_db_charsets - Get the database CHAR and NCHAR character set ids. *---------------------------------------------------------------------*/ static const oratext GET_DB_CHARSETS[] = \ "select parameter, value from nls_database_parameters where parameter = \ 'NLS_CHARACTERSET' or parameter = 'NLS_NCHAR_CHARACTERSET'"; #define PARM_BUFLEN (30) static void get_db_charsets(conn_info_t *params_p, ub2 *char_csid, ub2 *nchar_csid) { OCIDefine *defnp1 = (OCIDefine *) NULL; OCIDefine *defnp2 = (OCIDefine *) NULL; oratext parm[PARM_BUFLEN]; oratext value[OCI_NLS_MAXBUFSZ]; ub2 parm_len = 0; ub2 value_len = 0; oci_t ocistruct; oci_t *ocip = &ocistruct; *char_csid = 0; *nchar_csid = 0; memset (ocip, 0, sizeof(ocistruct)); if (OCIEnvCreate(&ocip->envp, OCI_OBJECT, (dvoid *)0, (dvoid * (*)(dvoid *, size_t)) 0, (dvoid * (*)(dvoid *, dvoid *, size_t))0, (void (*)(dvoid *, dvoid *)) 0, (size_t) 0, (dvoid **) 0)) { ocierror(ocip, (char *)"OCIEnvCreate() failed"); } if (OCIHandleAlloc((dvoid *) ocip->envp, (dvoid **) &ocip->errp, (ub4) OCI_HTYPE_ERROR, (size_t) 0, (dvoid **) 0)) { ocierror(ocip, (char *)"OCIHandleAlloc(OCI_HTYPE_ERROR) failed"); } OCICALL(ocip, OCILogon(ocip->envp, ocip->errp, &ocip->svcp, params_p->user, params_p->userlen, params_p->passw, params_p->passwlen, params_p->dbname, params_p->dbnamelen)); OCICALL(ocip, OCIHandleAlloc((dvoid *) ocip->envp, (dvoid **) &ocip->stmtp, (ub4) OCI_HTYPE_STMT, (size_t) 0, (dvoid **) 0)); /* Execute stmt to select the db nls char and nchar character set */ OCICALL(ocip, OCIStmtPrepare(ocip->stmtp, ocip->errp, (CONST text *)GET_DB_CHARSETS, (ub4)strlen((char *)GET_DB_CHARSETS), (ub4)OCI_NTV_SYNTAX, (ub4)OCI_DEFAULT)); OCICALL(ocip, OCIDefineByPos(ocip->stmtp, &defnp1, ocip->errp, (ub4) 1, parm, PARM_BUFLEN, SQLT_CHR, (void*) 0, &parm_len, (ub2 *)0, OCI_DEFAULT)); OCICALL(ocip, OCIDefineByPos(ocip->stmtp, &defnp2, ocip->errp, (ub4) 2, value, OCI_NLS_MAXBUFSZ, SQLT_CHR, (void*) 0, &value_len, (ub2 *)0, OCI_DEFAULT)); OCICALL(ocip, OCIStmtExecute(ocip->svcp, ocip->stmtp, ocip->errp, (ub4)0, (ub4)0, (const OCISnapshot *)0, (OCISnapshot *)0, (ub4)OCI_DEFAULT)); while (OCIStmtFetch(ocip->stmtp, ocip->errp, 1, OCI_FETCH_NEXT, OCI_DEFAULT) == OCI_SUCCESS) { value[value_len] = '\0'; if (parm_len == strlen("NLS_CHARACTERSET") && !memcmp(parm, "NLS_CHARACTERSET", parm_len)) { *char_csid = OCINlsCharSetNameToId(ocip->envp, value); printf("Outbound database NLS_CHARACTERSET = %.*s (csid = %d) \n", value_len, value, *char_csid); } else if (parm_len == strlen("NLS_NCHAR_CHARACTERSET") && !memcmp(parm, "NLS_NCHAR_CHARACTERSET", parm_len)) { *nchar_csid = OCINlsCharSetNameToId(ocip->envp, value); printf("Outbound database NLS_NCHAR_CHARACTERSET = %.*s (csid = %d) \n", value_len, value, *nchar_csid); } } disconnect_db(ocip); } /*--------------------------------------------------------------------- * attach - Attach to XStream server specified in connection info *---------------------------------------------------------------------*/ static void attach(oci_t * ocip, conn_info_t *conn, boolean outbound) { sword err; printf ("Attach to XStream %s server '%.*s'\n", outbound ? "outbound" : "inbound", conn->svrnmlen, conn->svrnm); if (outbound) { OCICALL(ocip, OCIXStreamOutAttach(ocip->svcp, ocip->errp, conn->svrnm, (ub2)conn->svrnmlen, (ub1 *)0, 0, OCI_DEFAULT)); } else { OCICALL(ocip, OCIXStreamInAttach(ocip->svcp, ocip->errp, conn->svrnm, (ub2)conn->svrnmlen, (oratext *)"From_XOUT", 9, (ub1 *)0, 0, OCI_DEFAULT)); } ocip->attached = TRUE; ocip->outbound = outbound; } /*--------------------------------------------------------------------- * ping_svr - Ping inbound server by sending a commit LCR. *---------------------------------------------------------------------*/ static void ping_svr(oci_t *xin_ocip, void *commit_lcr, ub1 *cmtpos, ub2 cmtpos_len, oratext *source_db, ub2 source_db_len) { OCIDate src_time; oratext txid[128]; OCICALL(xin_ocip, OCIDateSysDate(xin_ocip->errp, &src_time)); sprintf((char *)txid, "Ping %2d:%2d:%2d", src_time.OCIDateTime.OCITimeHH, src_time.OCIDateTime.OCITimeMI, src_time.OCIDateTime.OCITimeSS); /* Initialize LCR with new txid and commit position */ OCICALL(xin_ocip, OCILCRHeaderSet(xin_ocip->svcp, xin_ocip->errp, source_db, source_db_len, (oratext *)OCI_LCR_ROW_CMD_COMMIT, (ub2)strlen(OCI_LCR_ROW_CMD_COMMIT), (oratext *)0, 0, /* null owner */ (oratext *)0, 0, /* null object */ (ub1 *)0, 0, /* null tag */ txid, (ub2)strlen((char *)txid), &src_time, cmtpos, cmtpos_len, 0, commit_lcr, OCI_DEFAULT)); /* Send commit lcr to inbound server. */ if (OCIXStreamInLCRSend(xin_ocip->svcp, xin_ocip->errp, commit_lcr, OCI_LCR_XROW, 0, OCI_DEFAULT) == OCI_ERROR) { ocierror(xin_ocip, (char *)"OCIXStreamInLCRSend failed in ping_svr()"); } } /*--------------------------------------------------------------------- * get_lcrs - Get LCRs from outbound server and send to inbound server. *---------------------------------------------------------------------*/ static void get_lcrs(oci_t *xin_ocip, oci_t *xout_ocip) { sword status = OCI_SUCCESS; void *lcr; ub1 lcrtype; oraub8 flag; ub1 proclwm[OCI_LCR_MAX_POSITION_LEN]; ub2 proclwm_len = 0; ub1 sv_pingpos[OCI_LCR_MAX_POSITION_LEN]; ub2 sv_pingpos_len = 0; ub1 fetchlwm[OCI_LCR_MAX_POSITION_LEN]; ub2 fetchlwm_len = 0; void *commit_lcr = (void *)0; oratext *lcr_srcdb = (oratext *)0; ub2 lcr_srcdb_len = 0; oratext source_db[M_DBNAME_LEN]; ub2 source_db_len = 0; ub4 lcrcnt = 0; /* create an lcr to ping the inbound server periodically by sending a * commit lcr. */ commit_lcr = (void*)0; OCICALL(xin_ocip, OCILCRNew(xin_ocip->svcp, xin_ocip->errp, OCI_DURATION_SESSION, OCI_LCR_XROW, &commit_lcr, OCI_DEFAULT)); while (status == OCI_SUCCESS) { lcrcnt = 0; /* reset lcr count before each batch */ while ((status = OCIXStreamOutLCRReceive(xout_ocip->svcp, xout_ocip->errp, &lcr, &lcrtype, &flag, fetchlwm, &fetchlwm_len, OCI_DEFAULT)) == OCI_STILL_EXECUTING) { lcrcnt++; /* print header of LCR just received */ print_lcr(xout_ocip, lcr, lcrtype, &lcr_srcdb, &lcr_srcdb_len); /* save the source db to construct ping lcr later */ if (!source_db_len && lcr_srcdb_len) { memcpy(source_db, lcr_srcdb, lcr_srcdb_len); source_db_len = lcr_srcdb_len; } /* send the LCR just received */ if (OCIXStreamInLCRSend(xin_ocip->svcp, xin_ocip->errp, lcr, lcrtype, flag, OCI_DEFAULT) == OCI_ERROR) { ocierror(xin_ocip, (char *)"OCIXStreamInLCRSend failed"); } /* If LCR has chunked columns (i.e, has LOB/Long/XMLType columns) */ if (flag & OCI_XSTREAM_MORE_ROW_DATA) { /* receive and send chunked columns */ get_chunks(xin_ocip, xout_ocip); } } if (status == OCI_ERROR) ocierror(xout_ocip, (char *)"OCIXStreamOutLCRReceive failed"); /* clear the saved ping position if we just received some new lcrs */ if (lcrcnt) { sv_pingpos_len = 0; } /* If no lcrs received during previous WHILE loop and got a new fetch * LWM then send a commit lcr to ping the inbound server with the new * fetch LWM position. */ else if (fetchlwm_len > 0 && source_db_len > 0 && (fetchlwm_len != sv_pingpos_len || memcmp(sv_pingpos, fetchlwm, fetchlwm_len))) { /* To ensure we don't send multiple lcrs with duplicate position, send * a new ping only if we have saved the last ping position. */ if (sv_pingpos_len > 0) { ping_svr(xin_ocip, commit_lcr, fetchlwm, fetchlwm_len, source_db, source_db_len); } /* save the position just sent to inbound server */ memcpy(sv_pingpos, fetchlwm, fetchlwm_len); sv_pingpos_len = fetchlwm_len; } /* flush inbound network to flush all lcrs to inbound server */ OCICALL(xin_ocip, OCIXStreamInFlush(xin_ocip->svcp, xin_ocip->errp, OCI_DEFAULT)); /* get processed LWM of inbound server */ OCICALL(xin_ocip, OCIXStreamInProcessedLWMGet(xin_ocip->svcp, xin_ocip->errp, proclwm, &proclwm_len, OCI_DEFAULT)); if (proclwm_len > 0) { /* Set processed LWM for outbound server */ OCICALL(xout_ocip, OCIXStreamOutProcessedLWMSet(xout_ocip->svcp, xout_ocip->errp, proclwm, proclwm_len, OCI_DEFAULT)); } } if (status != OCI_SUCCESS) ocierror(xout_ocip, (char *)"get_lcrs() encounters error"); } /*--------------------------------------------------------------------- * get_chunks - Get each chunk for the current LCR and send it to * the inbound server. *---------------------------------------------------------------------*/ static void get_chunks(oci_t *xin_ocip, oci_t *xout_ocip) { oratext *colname; ub2 colname_len; ub2 coldty; oraub8 col_flags; ub2 col_csid; ub4 chunk_len; ub1 *chunk_ptr; oraub8 row_flag; sword err; sb4 rtncode; do { /* Get a chunk from outbound server */ OCICALL(xout_ocip, OCIXStreamOutChunkReceive(xout_ocip->svcp, xout_ocip->errp, &colname, &colname_len, &coldty, &col_flags, &col_csid, &chunk_len, &chunk_ptr, &row_flag, OCI_DEFAULT)); /* print chunked column info */ printf( " Chunked column name=%.*s DTY=%d chunk len=%d csid=%d col_flag=0x%lx\n", colname_len, colname, coldty, chunk_len, col_csid, col_flags); /* print chunk data */ print_chunk(chunk_ptr, chunk_len, coldty); /* Send the chunk just received to inbound server */ OCICALL(xin_ocip, OCIXStreamInChunkSend(xin_ocip->svcp, xin_ocip->errp, colname, colname_len, coldty, col_flags, col_csid, chunk_len, chunk_ptr, row_flag, OCI_DEFAULT)); } while (row_flag & OCI_XSTREAM_MORE_ROW_DATA); } /*--------------------------------------------------------------------- * print_chunk - Print chunked column information. Only print the first * 50 bytes for each chunk. *---------------------------------------------------------------------*/ static void print_chunk (ub1 *chunk_ptr, ub4 chunk_len, ub2 dty) { #define MAX_PRINT_BYTES (50) /* print max of 50 bytes per chunk */ ub4 print_bytes; if (chunk_len == 0) return; print_bytes = chunk_len > MAX_PRINT_BYTES ? MAX_PRINT_BYTES : chunk_len; printf(" Data = "); if (dty == SQLT_CHR) printf("%.*s", print_bytes, chunk_ptr); else { ub2 idx; for (idx = 0; idx < print_bytes; idx++) printf("%02x", chunk_ptr[idx]); } printf("\n"); } /*--------------------------------------------------------------------- * print_lcr - Print header information of given lcr. *---------------------------------------------------------------------*/ static void print_lcr(oci_t *ocip, void *lcrp, ub1 lcrtype, oratext **src_db_name, ub2 *src_db_namel) { oratext *cmd_type; ub2 cmd_type_len; oratext *owner; ub2 ownerl; oratext *oname; ub2 onamel; oratext *txid; ub2 txidl; sword ret; printf("\n ----------- %s LCR Header -----------------\n", lcrtype == OCI_LCR_XDDL ? "DDL" : "ROW"); /* Get LCR Header information */ ret = OCILCRHeaderGet(ocip->svcp, ocip->errp, src_db_name, src_db_namel, /* source db */ &cmd_type, &cmd_type_len, /* command type */ &owner, &ownerl, /* owner name */ &oname, &onamel, /* object name */ (ub1 **)0, (ub2 *)0, /* lcr tag */ &txid, &txidl, (OCIDate *)0, /* txn id & src time */ (ub2 *)0, (ub2 *)0, /* OLD/NEW col cnts */ (ub1 **)0, (ub2 *)0, /* LCR position */ (oraub8*)0, lcrp, OCI_DEFAULT); if (ret != OCI_SUCCESS) ocierror(ocip, (char *)"OCILCRHeaderGet failed"); else { printf(" src_db_name=%.*s\n cmd_type=%.*s txid=%.*s\n", *src_db_namel, *src_db_name, cmd_type_len, cmd_type, txidl, txid ); if (ownerl > 0) printf(" owner=%.*s oname=%.*s \n", ownerl, owner, onamel, oname); } } /*--------------------------------------------------------------------- * detach - Detach from XStream server *---------------------------------------------------------------------*/ static void detach(oci_t * ocip) { sword err = OCI_SUCCESS; printf ("Detach from XStream %s server\n", ocip->outbound ? "outbound" : "inbound" ); if (ocip->outbound) { OCICALL(ocip, OCIXStreamOutDetach(ocip->svcp, ocip->errp, OCI_DEFAULT)); } else { OCICALL(ocip, OCIXStreamInDetach(ocip->svcp, ocip->errp, (ub1 *)0, (ub2 *)0, /* processed LWM */ OCI_DEFAULT)); } } /*--------------------------------------------------------------------- * disconnect_db - Logoff from the database *---------------------------------------------------------------------*/ static void disconnect_db(oci_t * ocip) { if (OCILogoff(ocip->svcp, ocip->errp)) { ocierror(ocip, (char *)"OCILogoff() failed"); } if (ocip->errp) OCIHandleFree((dvoid *) ocip->errp, (ub4) OCI_HTYPE_ERROR); if (ocip->envp) OCIHandleFree((dvoid *) ocip->envp, (ub4) OCI_HTYPE_ENV); } /*--------------------------------------------------------------------- * ocierror - Print error status and exit program *---------------------------------------------------------------------*/ static void ocierror(oci_t * ocip, char * msg) { sb4 errcode=0; text bufp[4096]; if (ocip->errp) { OCIErrorGet((dvoid *) ocip->errp, (ub4) 1, (text *) NULL, &errcode, bufp, (ub4) 4096, (ub4) OCI_HTYPE_ERROR); printf("%s\n%s", msg, bufp); } else puts(msg); printf ("\n"); exit(1); } /*-------------------------------------------------------------------- * print_usage - Print command usage *---------------------------------------------------------------------*/ static void print_usage(int exitcode) { puts("\nUsage: xio -ob_svr <outbound_svr> -ob_db <outbound_db>\n" " -ob_usr <conn_user> -ob_pwd <conn_user_pwd>\n" " -ib_svr <inbound_svr> -ib_db <inbound_db>\n" " -ib_usr <apply_user> -ib_pwd <apply_user_pwd>\n"); puts(" ob_svr : outbound server name\n" " ob_db : database name of outbound server\n" " ob_usr : connect user to outbound server\n" " ob_pwd : password of outbound's connect user\n" " ib_svr : inbound server name\n" " ib_db : database name of inbound server\n" " ib_usr : apply user for inbound server\n" " ib_pwd : password of inbound's apply user\n"); exit(exitcode); } /*-------------------------------------------------------------------- * get_inputs - Get user inputs from command line *---------------------------------------------------------------------*/ static void get_inputs(conn_info_t *xout_params, conn_info_t *xin_params, int argc, char ** argv) { char * option; char * value; memset (xout_params, 0, sizeof(*xout_params)); memset (xin_params, 0, sizeof(*xin_params)); while(--argc) { /* get the option name */ argv++; option = *argv; /* check that the option begins with a "-" */ if (!strncmp(option, (char *)"-", 1)) { option ++; } else { printf("Error: bad argument '%s'\n", option); print_usage(1); } /* get the value of the option */ --argc; argv++; value = *argv; if (!strncmp(option, (char *)"ob_db", 5)) { xout_params->dbname = (oratext *)value; xout_params->dbnamelen = (ub4)strlen(value); } else if (!strncmp(option, (char *)"ob_usr", 6)) { xout_params->user = (oratext *)value; xout_params->userlen = (ub4)strlen(value); } else if (!strncmp(option, (char *)"ob_pwd", 6)) { xout_params->passw = (oratext *)value; xout_params->passwlen = (ub4)strlen(value); } else if (!strncmp(option, (char *)"ob_svr", 6)) { xout_params->svrnm = (oratext *)value; xout_params->svrnmlen = (ub4)strlen(value); } else if (!strncmp(option, (char *)"ib_db", 5)) { xin_params->dbname = (oratext *)value; xin_params->dbnamelen = (ub4)strlen(value); } else if (!strncmp(option, (char *)"ib_usr", 6)) { xin_params->user = (oratext *)value; xin_params->userlen = (ub4)strlen(value); } else if (!strncmp(option, (char *)"ib_pwd", 6)) { xin_params->passw = (oratext *)value; xin_params->passwlen = (ub4)strlen(value); } else if (!strncmp(option, (char *)"ib_svr", 6)) { xin_params->svrnm = (oratext *)value; xin_params->svrnmlen = (ub4)strlen(value); } else { printf("Error: unknown option '%s'.\n", option); print_usage(1); } } /* print usage and exit if any argument is not specified */ if (!xout_params->svrnmlen || !xout_params->passwlen || !xout_params->userlen || !xout_params->dbnamelen || !xin_params->svrnmlen || !xin_params->passwlen || !xin_params->userlen || !xin_params->dbnamelen) { printf("Error: missing command arguments. \n"); print_usage(1); } }
To run the sample XStream client application for the Java API, compile and link the application file, and enter the following on a command line:
java xio xsin_oraclesid xsin_host xsin_port xsin_username xsin_passwd xin_servername xsout_oraclesid xsout_host xsout_port xsout_username xsout_passwd xsout_servername
Substitute the appropriate values for the following placeholders:
xsin_oraclesid is the Oracle SID of the inbound server's database.
xsin_host is the host name of the computer system running the inbound server.
xsin_port is the port number of the listener for the inbound server's database.
xsin_username is the inbound server's apply user.
xsin_passwd is the password for the inbound server's apply user.
xin_servername is the name of the inbound server.
xsout_oraclesid is the Oracle SID of the outbound server's database.
xsout_host is the host name of the computer system running the outbound server.
xsout_port is the port number of the listener for the outbound server's database.
xsout_username is the outbound server's connect user.
xsout_passwd is the password for the outbound server's connect user.
xsout_servername is the name of the outbound server.
When the sample client application is running, it prints information about attaching to the inbound server and outbound server, along with the last position for each server. The output looks similar to the following:
xsin_host = server2.example.com xsin_port = 1482 xsin_ora_sid = db2 xsin connection url: jdbc:oracle:oci:@server2.example.com:1482:db2 xsout_host = server1.example.com xsout_port = 1481 xsout_ora_sid = db1 xsout connection url: jdbc:oracle:oci:@server1.example.com:1481:db1 Attached to inbound server:xin Inbound Server Last Position is: 0000000920250000000100000001000000092025000000010000000101 Attached to outbound server:xout Last Position is: 0000000920250000000100000001000000092025000000010000000101
This demo is available in the following location:
$ORACLE_HOME/rdbms/demo/xstream/java
The file name for the demo is xio.java
. See the README.txt
file in the demo directory for more information about compiling and running the application.
The code for the sample application that uses the Java API follows:
import oracle.streams.*; import oracle.jdbc.internal.OracleConnection; import oracle.jdbc.*; import oracle.sql.*; import java.sql.*; import java.util.*; public class xio { public static String xsinusername = null; public static String xsinpasswd = null; public static String xsinName = null; public static String xsoutusername = null; public static String xsoutpasswd = null; public static String xsoutName = null; public static String in_url = null; public static String out_url = null; public static Connection in_conn = null; public static Connection out_conn = null; public static XStreamIn xsIn = null; public static XStreamOut xsOut = null; public static byte[] lastPosition = null; public static byte[] processedLowPosition = null; public static void main(String args[]) { // get connection url to inbound and outbound server in_url = parseXSInArguments(args); out_url = parseXSOutArguments(args); // create connection to inbound and outbound server in_conn = createConnection(in_url, xsinusername, xsinpasswd); out_conn = createConnection(out_url, xsoutusername, xsoutpasswd); // attach to inbound and outbound server xsIn = attachInbound(in_conn); xsOut = attachOutbound(out_conn); // main loop to get lcrs get_lcrs(xsIn, xsOut); // detach from inbound and outbound server detachInbound(xsIn); detachOutbound(xsOut); } // parse the arguments to get the conncetion url to inbound db public static String parseXSInArguments(String args[]) { String trace, pref; String orasid, host, port; if (args.length != 12) { printUsage(); System.exit(0); } orasid = args[0]; host = args[1]; port = args[2]; xsinusername = args[3]; xsinpasswd = args[4]; xsinName = args[5]; System.out.println("xsin_host = "+host); System.out.println("xsin_port = "+port); System.out.println("xsin_ora_sid = "+orasid); String in_url = "jdbc:oracle:oci:@"+host+":"+port+":"+orasid; System.out.println("xsin connection url: "+ in_url); return in_url; } // parse the arguments to get the conncetion url to outbound db public static String parseXSOutArguments(String args[]) { String trace, pref; String orasid, host, port; if (args.length != 12) { printUsage(); System.exit(0); } orasid = args[6]; host = args[7]; port = args[8]; xsoutusername = args[9]; xsoutpasswd = args[10]; xsoutName = args[11]; System.out.println("xsout_host = "+host); System.out.println("xsout_port = "+port); System.out.println("xsout_ora_sid = "+orasid); String out_url = "jdbc:oracle:oci:@"+host+":"+port+":"+orasid; System.out.println("xsout connection url: "+ out_url); return out_url; } // print out sample program usage message public static void printUsage() { System.out.println(""); System.out.println("Usage: java xio "+"<xsin_oraclesid> " + "<xsin_host> " + "<xsin_port> "); System.out.println(" "+"<xsin_username> " + "<xsin_passwd> " + "<xsin_servername> "); System.out.println(" "+"<xsout_oraclesid> " + "<xsout_host> " + "<xsout_port> "); System.out.println(" "+"<xsout_username> " + "<xsout_passwd> " + "<xsout_servername> "); } // create a connection to an Oracle Database public static Connection createConnection(String url, String username, String passwd) { try { DriverManager.registerDriver(new oracle.jdbc.OracleDriver()); return DriverManager.getConnection(url, username, passwd); } catch(Exception e) { System.out.println("fail to establish DB connection to: " +url); e.printStackTrace(); return null; } } // attach to the XStream Inbound Server public static XStreamIn attachInbound(Connection in_conn) { XStreamIn xsIn = null; try { xsIn = XStreamIn.attach((OracleConnection)in_conn, xsinName, "XSDEMOINCLIENT" , XStreamIn.DEFAULT_MODE); // use last position to decide where should we start sending LCRs lastPosition = xsIn.getLastPosition(); System.out.println("Attached to inbound server:"+xsinName); System.out.print("Inbound Server Last Position is: "); if (null == lastPosition) { System.out.println("null"); } else { printHex(lastPosition); } return xsIn; } catch(Exception e) { System.out.println("cannot attach to inbound server: "+xsinName); System.out.println(e.getMessage()); e.printStackTrace(); return null; } } // attach to the XStream Outbound Server public static XStreamOut attachOutbound(Connection out_conn) { XStreamOut xsOut = null; try { // when attach to an outbound server, client needs to tell outbound // server the last position. xsOut = XStreamOut.attach((OracleConnection)out_conn, xsoutName, lastPosition, XStreamOut.DEFAULT_MODE); System.out.println("Attached to outbound server:"+xsoutName); System.out.print("Last Position is: "); if (lastPosition != null) { printHex(lastPosition); } else { System.out.println("NULL"); } return xsOut; } catch(Exception e) { System.out.println("cannot attach to outbound server: "+xsoutName); System.out.println(e.getMessage()); e.printStackTrace(); return null; } } // detach from the XStream Inbound Server public static void detachInbound(XStreamIn xsIn) { byte[] processedLowPosition = null; try { processedLowPosition = xsIn.detach(XStreamIn.DEFAULT_MODE); System.out.print("Inbound server processed low Position is: "); if (processedLowPosition != null) { printHex(processedLowPosition); } else { System.out.println("NULL"); } } catch(Exception e) { System.out.println("cannot detach from the inbound server: "+xsinName); System.out.println(e.getMessage()); e.printStackTrace(); } } // detach from the XStream Outbound Server public static void detachOutbound(XStreamOut xsOut) { try { xsOut.detach(XStreamOut.DEFAULT_MODE); } catch(Exception e) { System.out.println("cannot detach from the outbound server: "+xsoutName); System.out.println(e.getMessage()); e.printStackTrace(); } } public static void get_lcrs(XStreamIn xsIn, XStreamOut xsOut) { if (null == xsIn) { System.out.println("xstreamIn is null"); System.exit(0); } if (null == xsOut) { System.out.println("xstreamOut is null"); System.exit(0); } try { while(true) { // receive an LCR from outbound server LCR alcr = xsOut.receiveLCR(XStreamOut.DEFAULT_MODE); if (xsOut.getBatchStatus() == XStreamOut.EXECUTING) // batch is active { assert alcr != null; // send the LCR to the inbound server xsIn.sendLCR(alcr, XStreamIn.DEFAULT_MODE); // also get chunk data for this LCR if any if (alcr instanceof RowLCR) { // receive chunk from outbound then send to inbound if (((RowLCR)alcr).hasChunkData()) { ChunkColumnValue chunk = null; do { chunk = xsOut.receiveChunk(XStreamOut.DEFAULT_MODE); xsIn.sendChunk(chunk, XStreamIn.DEFAULT_MODE); } while (!chunk.isEndOfRow()); } } processedLowPosition = alcr.getPosition(); } else // batch is end { assert alcr == null; // flush the network xsIn.flush(XStreamIn.DEFAULT_MODE); // get the processed_low_position from inbound server processedLowPosition = xsIn.getProcessedLowWatermark(); // update the processed_low_position at oubound server if (null != processedLowPosition) xsOut.setProcessedLowWatermark(processedLowPosition, XStreamOut.DEFAULT_MODE); } } } catch(Exception e) { System.out.println("exception when processing LCRs"); System.out.println(e.getMessage()); e.printStackTrace(); } } public static void printHex(byte[] b) { for (int i = 0; i < b.length; ++i) { System.out.print( Integer.toHexString((b[i]&0xFF) | 0x100).substring(1,3)); } System.out.println(""); } }