... // Creates a new service loader for the given service type and class loader. publicstatic <S> ServiceLoader<S> load(Class<S> service, ClassLoader loader){ returnnew ServiceLoader<>(service, loader); } ... // Creates a new service loader for the given service type, using the current // thread's context class loader. publicstatic <S> ServiceLoader<S> load(Class<S> service){ ClassLoader cl = Thread.currentThread().getContextClassLoader(); return ServiceLoader.load(service, cl); } ... // Creates a new service loader for the given service type, using the extension // class loader. publicstatic <S> ServiceLoader<S> loadInstalled(Class<S> service){ ClassLoader cl = ClassLoader.getSystemClassLoader(); ClassLoader prev = null; while (cl != null) { prev = cl; cl = cl.getParent(); } return ServiceLoader.load(service, prev); } ...
获得 ServiceLoader 实例后,便能通过调用其 iterator() 方法来获取所有加载到的 service providers。
// The class or interface representing the service being loaded privatefinal Class<S> service;
// The class loader used to locate, load, and instantiate providers privatefinal ClassLoader loader;
// The access control context taken when the ServiceLoader is created privatefinal AccessControlContext acc;
// Cached providers, in instantiation order // 缓存的服务提供者实现 map,<class qualifier name, ServiceT> private LinkedHashMap<String,S> providers = new LinkedHashMap<>();
// The current lazy-lookup iterator private LazyIterator lookupIterator;
/** * Clear this loader's provider cache so that all providers will be * reloaded. * * <p> After invoking this method, subsequent invocations of the {@link * #iterator() iterator} method will lazily look up and instantiate * providers from scratch, just as is done by a newly-created loader. * * <p> This method is intended for use in situations in which new providers * can be installed into a running Java virtual machine. */ // 我们还可以显式地调用该 reload 方法来重新加载服务提供者实现 publicvoidreload(){ providers.clear(); lookupIterator = new LazyIterator(service, loader); } privateServiceLoader(Class<S> svc, ClassLoader cl){ service = Objects.requireNonNull(svc, "Service interface cannot be null"); loader = (cl == null) ? ClassLoader.getSystemClassLoader() : cl; acc = (System.getSecurityManager() != null) ? AccessController.getContext() : null; reload(); }
@Override public Optional<String> defaultDriverName(){ return Optional.of("oracle.jdbc.OracleDriver"); }
@Override public String quoteIdentifier(String identifier){ // if we use double-quotes identifier then Oracle becomes case-sensitive return identifier; }
// FIXME: this implementation is not been well tested // maybe provides canHandle and quoteIdentifier methods can meet the minimum requirements.
/** * Fetch the JDBCDialect class corresponding to a given database url. */ publicstatic Optional<JDBCDialect> get(String url){ return DIALECTS.stream() .filter(e -> e.canHandle(url)) .findFirst(); } }
@Override public StreamTableSink<Tuple2<Boolean, Row>> createStreamTableSink(Map<String, String> properties) { final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
final JDBCUpsertTableSink.Builder builder = JDBCUpsertTableSink.builder() .setOptions(getJDBCOptions(descriptorProperties)) .setTableSchema(descriptorProperties.getTableSchema(SCHEMA));
descriptorProperties.getOptionalInt(CONNECTOR_WRITE_FLUSH_MAX_ROWS).ifPresent(builder::setFlushMaxSize); descriptorProperties.getOptionalDuration(CONNECTOR_WRITE_FLUSH_INTERVAL).ifPresent( s -> builder.setFlushIntervalMills(s.toMillis())); descriptorProperties.getOptionalInt(CONNECTOR_WRITE_MAX_RETRIES).ifPresent(builder::setMaxRetryTimes);
return builder.build(); }
private DescriptorProperties getValidatedProperties(Map<String, String> properties){ final DescriptorProperties descriptorProperties = new DescriptorProperties(true); descriptorProperties.putProperties(properties);
new SchemaValidator(true, false, false).validate(descriptorProperties); // disabling the JDBCValidator provided by flink 1.9 // using our own JDBCValidator, a modified version of original new CustomizedJDBCValidator().validate(descriptorProperties);
return descriptorProperties; }
private JDBCOptions getJDBCOptions(DescriptorProperties descriptorProperties){ final String url = descriptorProperties.getString(CONNECTOR_URL); final JDBCOptions.Builder builder = JDBCOptions.builder() .setDBUrl(url) .setTableName(descriptorProperties.getString(CONNECTOR_TABLE)) // using customized JDBCDialects to set the specified dialect. // 使用自定义的 JDBCDialectSerice 来加载 JDBCDialect .setDialect(JDBCDialectService.get(url).get());
/** * A modified version of {@link JDBCValidator}. * NOTICE: this implementation has the difference to check jdbc.url only. * (using our CustomizedJDBCDialects instead of the original one {@link JDBCDialects}). * * @author rovo98 */ publicclassCustomizedJDBCValidatorextendsConnectorDescriptorValidator{
final String url = properties.getString(CONNECTOR_URL);
// using JDBCDialectService to find JDBCDialects here. final Optional<JDBCDialect> dialect = JDBCDialectService.get(url); Preconditions.checkState(dialect.isPresent(), "Cannot handle such jdbc url: " + url);
Optional<String> password = properties.getOptionalString(CONNECTOR_PASSWORD); if (password.isPresent()) { Preconditions.checkArgument( properties.getOptionalString(CONNECTOR_USERNAME).isPresent(), "Database username must be provided when database password is provided"); } }
privatevoidcheckAllOrNone(DescriptorProperties properties, String[] propertyNames){ int presentCount = 0; for (String name : propertyNames) { if (properties.getOptionalString(name).isPresent()) { presentCount++; } } Preconditions.checkArgument(presentCount == 0 || presentCount == propertyNames.length, "Either all or none of the following properties should be provided:\n" + String.join("\n", propertyNames)); } }
当然,别忘了把修改后的 service providers 放到 META-INF/services 目录下。
tableEnv.registerDataStream("processed_users", userDataStream); tableEnv.sqlUpdate("insert into " + registeredTblName + "select id, name, age from processed_users"); ...